diff --git a/be/src/load/stream_load/stream_load_context.cpp b/be/src/load/stream_load/stream_load_context.cpp index 12e5c39f0d7683..4fee22ca057d62 100644 --- a/be/src/load/stream_load/stream_load_context.cpp +++ b/be/src/load/stream_load/stream_load_context.cpp @@ -58,6 +58,10 @@ std::string StreamLoadContext::to_json() const { } else { writer.Key("GroupCommit"); writer.Bool(true); + writer.Key("GroupCommitMode"); + writer.String(group_commit_mode.c_str()); + writer.Key("LoadId"); + writer.String(id.to_string().c_str()); } // status diff --git a/be/src/load/stream_load/stream_load_context.h b/be/src/load/stream_load/stream_load_context.h index 85b4c4d146804c..4e257c86e7d9b4 100644 --- a/be/src/load/stream_load/stream_load_context.h +++ b/be/src/load/stream_load/stream_load_context.h @@ -196,6 +196,7 @@ class StreamLoadContext { TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN; TFileCompressType::type compress_type = TFileCompressType::UNKNOWN; bool group_commit = false; + std::string group_commit_mode = ""; std::shared_ptr body_sink; std::shared_ptr pipe; diff --git a/be/src/load/stream_load/stream_load_executor.cpp b/be/src/load/stream_load/stream_load_executor.cpp index b24333941c25da..08fc3bb34b15a9 100644 --- a/be/src/load/stream_load/stream_load_executor.cpp +++ b/be/src/load/stream_load/stream_load_executor.cpp @@ -189,6 +189,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { } request.__set_request_id(ctx->id.to_thrift()); request.__set_backend_id(_exec_env->cluster_info()->backend_id); + if (ctx->group_commit_mode.empty()) { + request.__set_use_table_group_commit_mode(true); + } TLoadTxnBeginResult result; Status status; @@ -217,6 +220,15 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { } return status; } + if (ctx->group_commit_mode.empty() && result.__isset.table_group_commit_mode) { + auto table_group_commit_mode = result.table_group_commit_mode; + if (iequal(table_group_commit_mode, "async_mode") || + iequal(table_group_commit_mode, "sync_mode")) { + ctx->group_commit = true; + ctx->group_commit_mode = table_group_commit_mode; + return Status::OK(); + } + } ctx->txn_id = result.txnId; if (result.__isset.db_id) { ctx->db_id = result.db_id; diff --git a/be/src/service/http/action/stream_load.cpp b/be/src/service/http/action/stream_load.cpp index f483e11258ac91..8582ced02a72e9 100644 --- a/be/src/service/http/action/stream_load.cpp +++ b/be/src/service/http/action/stream_load.cpp @@ -84,6 +84,9 @@ bvar::LatencyRecorder g_stream_load_commit_and_publish_latency_ms("stream_load", static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024; static const std::string CHUNK = "chunked"; +static const std::string OFF_MODE = "off_mode"; +static const std::string SYNC_MODE = "sync_mode"; +static const std::string ASYNC_MODE = "async_mode"; #ifdef BE_TEST TStreamLoadPutResult k_stream_load_put_result; @@ -254,6 +257,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db << ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit + << ", group_commit_mode=" << ctx->group_commit_mode << ", HTTP headers=" << req->get_all_headers(); ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos(); @@ -366,6 +370,9 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptrstream_load_executor()->begin_txn(ctx.get())); ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time; + if (ctx->group_commit) { + RETURN_IF_ERROR(_check_wal_space(ctx->group_commit_mode, ctx->body_bytes)); + } } // process put file @@ -767,12 +774,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_stream_per_node(stream_per_node); } if (ctx->group_commit) { - if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { - request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); - } else { - // used for wait_internal_group_commit_finish - request.__set_group_commit_mode("sync_mode"); - } + request.__set_group_commit_mode(ctx->group_commit_mode); } if (!http_req->header(HTTP_COMPUTE_GROUP).empty()) { @@ -811,7 +813,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) { return Status::NotSupported("stream load 2pc is unsupported for mow table"); } - if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { + if (iequal(ctx->group_commit_mode, ASYNC_MODE)) { // FIXME find a way to avoid chunked stream load write large WALs size_t content_length = 0; if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { @@ -886,17 +888,24 @@ void StreamLoadAction::_save_stream_load_record(std::shared_ptr ctx) { - std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT); - if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") && - !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) { - return Status::InvalidArgument( - "group_commit can only be [async_mode, sync_mode, off_mode]"); - } - if (config::wait_internal_group_commit_finish) { - group_commit_mode = "sync_mode"; +Status StreamLoadAction::_check_wal_space(const std::string& group_commit_mode, + int64_t content_length) { + if (iequal(group_commit_mode, ASYNC_MODE) && + !load_size_smaller_than_wal_limit(content_length)) { + std::stringstream ss; + ss << "There is no space for group commit stream load async WAL. This stream load " + "size is " + << content_length + << ". WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); + LOG(WARNING) << ss.str(); + return Status::Error(ss.str()); } + return Status::OK(); +} + +Status StreamLoadAction::_can_group_commit(HttpRequest* req, std::shared_ptr ctx, + std::string& group_commit_header, + bool& can_group_commit) { int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() ? 0 : std::stoll(req->header(HttpHeaders::CONTENT_LENGTH)); @@ -907,13 +916,11 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, LOG(WARNING) << ss.str(); return Status::InvalidArgument(ss.str()); } - // allow chunked stream load in flink auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() && req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos; - if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || - (content_length == 0 && !is_chunk)) { + if (content_length == 0 && !is_chunk) { // off_mode and empty - ctx->group_commit = false; + can_group_commit = false; return Status::OK(); } if (is_chunk) { @@ -930,20 +937,47 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FLEXIBLE_COLUMNS")); if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit && !update_mode) { - if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) { + if (!config::wait_internal_group_commit_finish && !group_commit_header.empty() && + !ctx->label.empty()) { return Status::InvalidArgument("label and group_commit can't be set at the same time"); } - ctx->group_commit = true; - if (iequal(group_commit_mode, "async_mode")) { - if (!load_size_smaller_than_wal_limit(content_length)) { - std::stringstream ss; - ss << "There is no space for group commit stream load async WAL. This stream load " - "size is " - << content_length << ". WAL dir info: " - << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); - LOG(WARNING) << ss.str(); - return Status::Error(ss.str()); - } + RETURN_IF_ERROR(_check_wal_space(group_commit_header, content_length)); + can_group_commit = true; + } + return Status::OK(); +} + +Status StreamLoadAction::_handle_group_commit(HttpRequest* req, + std::shared_ptr ctx) { + std::string group_commit_header = req->header(HTTP_GROUP_COMMIT); + if (!group_commit_header.empty() && !iequal(group_commit_header, SYNC_MODE) && + !iequal(group_commit_header, ASYNC_MODE) && !iequal(group_commit_header, OFF_MODE)) { + return Status::InvalidArgument( + "group_commit can only be [async_mode, sync_mode, off_mode]"); + } + if (config::wait_internal_group_commit_finish) { + group_commit_header = SYNC_MODE; + } + + // if group_commit_header is off_mode, we will not use group commit + if (iequal(group_commit_header, OFF_MODE)) { + ctx->group_commit_mode = OFF_MODE; + ctx->group_commit = false; + return Status::OK(); + } + bool can_group_commit = false; + RETURN_IF_ERROR(_can_group_commit(req, ctx, group_commit_header, can_group_commit)); + if (!can_group_commit) { + ctx->group_commit_mode = OFF_MODE; + ctx->group_commit = false; + } else { + if (!group_commit_header.empty()) { + ctx->group_commit_mode = group_commit_header; + ctx->group_commit = true; + } else { + // use table property to decide group commit or not + ctx->group_commit_mode = ""; + ctx->group_commit = false; } } return Status::OK(); diff --git a/be/src/service/http/action/stream_load.h b/be/src/service/http/action/stream_load.h index e36b960f379266..29fc92065ed8d7 100644 --- a/be/src/service/http/action/stream_load.h +++ b/be/src/service/http/action/stream_load.h @@ -50,7 +50,10 @@ class StreamLoadAction : public HttpHandler { Status _handle(std::shared_ptr ctx, HttpRequest* req); Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t file_bytes); Status _process_put(HttpRequest* http_req, std::shared_ptr ctx); + Status _can_group_commit(HttpRequest* http_req, std::shared_ptr ctx, + std::string& group_commit_header, bool& can_group_commit); void _save_stream_load_record(std::shared_ptr ctx, const std::string& str); + Status _check_wal_space(const std::string& group_commit_mode, int64_t content_length); Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr ctx); void _on_finish(std::shared_ptr ctx, HttpRequest* req); void _send_reply(std::shared_ptr ctx, HttpRequest* req); diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index 113f32d7ae819b..55ebf57ee78144 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2569,6 +2569,7 @@ public void updateTableProperties(Database db, String tableName, Map rowStoreColumns) { if (rowStoreColumns != null && !rowStoreColumns.isEmpty()) { modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN, "true"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java index caf5f93f772957..c2155963273e09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java @@ -100,6 +100,7 @@ public void updateTableProperties(Database db, String tableName, Map allowedProps = new HashSet() { { + add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE); add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS); add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES); add(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS); @@ -152,6 +153,21 @@ public void updateTableProperties(Database db, String tableName, Map properties, bo return groupCommitDataBytes; } + public static String analyzeGroupCommitMode(Map properties, boolean removeProperty) + throws AnalysisException { + String groupCommitMode = GROUP_COMMIT_MODE_OFF; + if (properties != null && properties.containsKey(PROPERTIES_GROUP_COMMIT_MODE)) { + groupCommitMode = properties.get(PROPERTIES_GROUP_COMMIT_MODE); + if (!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_OFF) + && !groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_ASYNC) + && !groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_SYNC)) { + throw new AnalysisException("Invalid group_commit_mode: " + groupCommitMode + + ". Valid values: " + GROUP_COMMIT_MODE_OFF + ", " + GROUP_COMMIT_MODE_ASYNC + + ", " + GROUP_COMMIT_MODE_SYNC); + } + if (removeProperty) { + properties.remove(PROPERTIES_GROUP_COMMIT_MODE); + } + } + return groupCommitMode.toLowerCase(); + } + /** * Check the type property of the catalog props. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 6a7a2950beb721..e56c0826bb6a9f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -2966,6 +2966,13 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th throw new DdlException(e.getMessage()); } + try { + String groupCommitMode = PropertyAnalyzer.analyzeGroupCommitMode(properties, true); + olapTable.setGroupCommitMode(groupCommitMode); + } catch (Exception e) { + throw new DdlException(e.getMessage()); + } + try { TEncryptionAlgorithm tdeAlgorithm = PropertyAnalyzer.analyzeTDEAlgorithm(properties); olapTable.setEncryptionAlgorithm(tdeAlgorithm); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java index e2189ed0696c52..8ae93531a0f5a0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java @@ -334,6 +334,10 @@ public void validate(ConnectContext ctx) throws UserException { PropertyAnalyzer.analyzeGroupCommitDataBytes(properties, false); this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)) { + PropertyAnalyzer.analyzeGroupCommitMode(properties, false); + this.needTableStable = false; + this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)) { this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 33784ec9c52d86..2d35d55edc2579 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1240,7 +1240,12 @@ public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TEx try { TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr); - result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId()); + if (tmpRes.isSetTableGroupCommitMode()) { + // if use table group commit mode, just return the mode info, no need to begin txn + result.setTableGroupCommitMode(tmpRes.getTableGroupCommitMode()).setDbId(tmpRes.getDbId()); + } else { + result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId()); + } } catch (DuplicatedRequestException e) { // this is a duplicate request, just return previous txn id LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(), @@ -1297,6 +1302,16 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin } OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP); + // check if use table group_commit_mode property + if (request.isUseTableGroupCommitMode()) { + String tableGroupCommitMode = table.getGroupCommitMode(); + if (tableGroupCommitMode != null && !tableGroupCommitMode.equalsIgnoreCase( + PropertyAnalyzer.GROUP_COMMIT_MODE_OFF)) { + TLoadTxnBeginResult result = new TLoadTxnBeginResult(); + result.setTableGroupCommitMode(tableGroupCommitMode).setDbId(db.getId()); + return result; + } + } // begin long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 60596a5882850a..34422d6c1e306f 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -664,6 +664,7 @@ message TabletMetaInfoPB { // For update tablet meta optional bool disable_auto_compaction = 13; optional bool enable_mow_light_delete = 14; optional int32 vertical_compaction_num_columns_per_group = 15; + optional string group_commit_mode = 16; } message TabletCompactionJobPB { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 7e520751ee170a..ccd082e7f5261f 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -466,6 +466,8 @@ struct TLoadTxnBeginRequest { 14: optional i64 table_id 15: optional i64 backend_id 16: optional TCertBasedAuth cert_based_auth + // If set to true: use table group_commit_mode property + 17: optional bool use_table_group_commit_mode } struct TLoadTxnBeginResult { @@ -473,6 +475,9 @@ struct TLoadTxnBeginResult { 2: optional i64 txnId 3: optional string job_status // if label already used, set status of existing job 4: optional i64 db_id + // If use_table_group_commit_mode is true in TLoadTxnBeginRequest, and table group_commit_mode property is + // async_mode or sync_mode, return table group_commit_mode (begin_txn is skipped) + 5: optional string table_group_commit_mode } struct TBeginTxnRequest { diff --git a/regression-test/data/query_p0/system/test_table_properties.out b/regression-test/data/query_p0/system/test_table_properties.out index 0f8b1a2acafd57..9f6e334cc9abdf 100644 --- a/regression-test/data/query_p0/system/test_table_properties.out +++ b/regression-test/data/query_p0/system/test_table_properties.out @@ -1,6 +1,6 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !select_check_1 -- -111 +114 -- !select_check_2 -- internal test_table_properties_db duplicate_table _auto_bucket false @@ -19,6 +19,7 @@ internal test_table_properties_db duplicate_table enable_unique_key_merge_on_wri internal test_table_properties_db duplicate_table file_cache_ttl_seconds 0 internal test_table_properties_db duplicate_table group_commit_data_bytes 134217728 internal test_table_properties_db duplicate_table group_commit_interval_ms 10000 +internal test_table_properties_db duplicate_table group_commit_mode off_mode internal test_table_properties_db duplicate_table in_memory false internal test_table_properties_db duplicate_table inverted_index_storage_format V3 internal test_table_properties_db duplicate_table is_being_synced false @@ -55,6 +56,7 @@ internal test_table_properties_db listtable enable_unique_key_merge_on_write fal internal test_table_properties_db listtable file_cache_ttl_seconds 0 internal test_table_properties_db listtable group_commit_data_bytes 134217728 internal test_table_properties_db listtable group_commit_interval_ms 10000 +internal test_table_properties_db listtable group_commit_mode off_mode internal test_table_properties_db listtable in_memory false internal test_table_properties_db listtable inverted_index_storage_format V3 internal test_table_properties_db listtable is_being_synced false @@ -91,6 +93,7 @@ internal test_table_properties_db unique_table enable_unique_key_merge_on_write internal test_table_properties_db unique_table file_cache_ttl_seconds 0 internal test_table_properties_db unique_table group_commit_data_bytes 134217728 internal test_table_properties_db unique_table group_commit_interval_ms 10000 +internal test_table_properties_db unique_table group_commit_mode off_mode internal test_table_properties_db unique_table in_memory false internal test_table_properties_db unique_table inverted_index_storage_format V3 internal test_table_properties_db unique_table is_being_synced false @@ -129,6 +132,7 @@ internal test_table_properties_db duplicate_table enable_unique_key_merge_on_wri internal test_table_properties_db duplicate_table file_cache_ttl_seconds 0 internal test_table_properties_db duplicate_table group_commit_data_bytes 134217728 internal test_table_properties_db duplicate_table group_commit_interval_ms 10000 +internal test_table_properties_db duplicate_table group_commit_mode off_mode internal test_table_properties_db duplicate_table in_memory false internal test_table_properties_db duplicate_table inverted_index_storage_format V3 internal test_table_properties_db duplicate_table is_being_synced false @@ -165,6 +169,7 @@ internal test_table_properties_db unique_table enable_unique_key_merge_on_write internal test_table_properties_db unique_table file_cache_ttl_seconds 0 internal test_table_properties_db unique_table group_commit_data_bytes 134217728 internal test_table_properties_db unique_table group_commit_interval_ms 10000 +internal test_table_properties_db unique_table group_commit_mode off_mode internal test_table_properties_db unique_table in_memory false internal test_table_properties_db unique_table inverted_index_storage_format V3 internal test_table_properties_db unique_table is_being_synced false @@ -205,6 +210,7 @@ internal test_table_properties_db duplicate_table enable_unique_key_merge_on_wri internal test_table_properties_db duplicate_table file_cache_ttl_seconds 0 internal test_table_properties_db duplicate_table group_commit_data_bytes 134217728 internal test_table_properties_db duplicate_table group_commit_interval_ms 10000 +internal test_table_properties_db duplicate_table group_commit_mode off_mode internal test_table_properties_db duplicate_table in_memory false internal test_table_properties_db duplicate_table inverted_index_storage_format V3 internal test_table_properties_db duplicate_table is_being_synced false diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index ff8aaeeefc7dfc..9f18e9a813c839 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -513,6 +513,69 @@ suite("insert_group_commit_into") { """ exception """group_commit_data_bytes must be greater than 0""" } + + // Test group_commit_mode property + sql """ drop table if exists ${table}_mode; """ + sql """ + CREATE TABLE IF NOT EXISTS ${table}_mode ( + k1 INT not null, + k2 varchar(50) + ) + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) + BUCKETS 1 PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "group_commit_interval_ms" = "200", + "group_commit_mode" = "async_mode" + ); + """ + + // Verify SHOW CREATE TABLE displays group_commit_mode + def createStmt = sql """ SHOW CREATE TABLE ${table}_mode """ + logger.info("SHOW CREATE TABLE result: " + createStmt) + assertTrue(createStmt.toString().contains('group_commit_mode'), "SHOW CREATE TABLE should contain group_commit_mode") + assertTrue(createStmt.toString().contains('async_mode'), "SHOW CREATE TABLE should contain async_mode") + + // Test ALTER TABLE to change group_commit_mode + sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" = "SYNC_MODE"); """ + + def createStmt2 = sql """ SHOW CREATE TABLE ${table}_mode """ + logger.info("SHOW CREATE TABLE result after alter: " + createStmt2) + assertTrue(createStmt2.toString().contains('SYNC_MODE'), "SHOW CREATE TABLE should contain sync_mode after alter") + + // Test ALTER TABLE to change back to off_mode - should NOT show in SHOW CREATE TABLE + sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" = "OFF_MODE"); """ + + def createStmt3 = sql """ SHOW CREATE TABLE ${table}_mode """ + logger.info("SHOW CREATE TABLE result after alter to off_mode: " + createStmt3) + // off_mode should NOT be shown in SHOW CREATE TABLE + assertTrue(!createStmt3.toString().contains('group_commit_mode'), + "off_mode should NOT be shown in SHOW CREATE TABLE") + + // Test invalid group_commit_mode value + test { + sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" = "invalid_mode"); """ + exception """Invalid group_commit_mode""" + } + + // Test default value (off_mode) - should NOT show group_commit_mode in SHOW CREATE TABLE + sql """ drop table if exists ${table}_mode_default; """ + sql """ + CREATE TABLE IF NOT EXISTS ${table}_mode_default ( + k1 INT not null + ) + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) + BUCKETS 1 PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + def createStmt4 = sql """ SHOW CREATE TABLE ${table}_mode_default """ + logger.info("SHOW CREATE TABLE result for default: " + createStmt4) + // When default is off_mode, it should NOT be shown in SHOW CREATE TABLE + assertTrue(!createStmt4.toString().contains('group_commit_mode'), + "Default off_mode should NOT be shown in SHOW CREATE TABLE") + } finally { } } diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index a2de2a04b98d9d..342b87f13960c0 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -31,6 +31,22 @@ suite("test_group_commit_stream_load") { } } + def getTableRowCount = { tableName1, expectedRowCount -> + def retry = 0 + def row = 0 + while (retry < 30) { + sleep(2000) + def rowCount = sql "select count(*) from ${tableName1}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + row = rowCount[0][0] + if (row >= expectedRowCount) { + break + } + retry++ + } + assertTrue(row >= expectedRowCount, "Expected at least ${expectedRowCount} rows, but got ${row}") + } + def getAlterTableState = { waitForSchemaChangeDone { sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${tableName}' ORDER BY createtime DESC LIMIT 1 """ @@ -342,4 +358,164 @@ suite("test_group_commit_stream_load") { } qt_read_json_by_line "select k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} order by k;" + // Test: stream load using table property group_commit_mode (async_mode) + // When HTTP header 'group_commit' is not set, should use table's group_commit_mode property + def tableNameAsync = "test_group_commit_stream_load_table_property_async" + try { + sql """ drop table if exists ${tableNameAsync}; """ + + sql """ + CREATE TABLE `${tableNameAsync}` ( + `id` int(11) NOT NULL, + `name` varchar(100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "200", + "group_commit_mode" = "async_mode" + ); + """ + + // Verify SHOW CREATE TABLE contains group_commit_mode + def createStmt1 = sql """ SHOW CREATE TABLE ${tableNameAsync} """ + logger.info("SHOW CREATE TABLE for async: " + createStmt1) + assertTrue(createStmt1.toString().contains('async_mode'), "Table should have async_mode") + + // Stream load WITHOUT setting group_commit header - should use table property + streamLoad { + table "${tableNameAsync}" + set 'column_separator', ',' + // NOT setting 'group_commit' header - should use table property + set 'columns', 'id, name' + file "test_stream_load1.csv" + unset 'label' + time 10000 + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } + } + // Check data is loaded + getTableRowCount(tableNameAsync, 2) + + streamLoad { + table "${tableNameAsync}" + set 'column_separator', ',' + // NOT setting 'group_commit' header - should use table property, but set partitions + set 'partitions', "${tableNameAsync}" + set 'columns', 'id, name' + file "test_stream_load1.csv" + time 10000 + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result (header override): ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + def label = json.Label + assertTrue(!label.startsWith("group_commit")) + } + } + } finally { + } + + // Test: stream load using table property group_commit_mode (sync_mode) + def tableNameSync = "test_group_commit_stream_load_table_property_sync" + try { + sql """ drop table if exists ${tableNameSync}; """ + + sql """ + CREATE TABLE `${tableNameSync}` ( + `id` int(11) NOT NULL, + `name` varchar(100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "200", + "group_commit_mode" = "SYNC_MODE" + ); + """ + + // Verify SHOW CREATE TABLE contains group_commit_mode + def createStmt2 = sql """ SHOW CREATE TABLE ${tableNameSync} """ + logger.info("SHOW CREATE TABLE for sync: " + createStmt2) + assertTrue(createStmt2.toString().contains('sync_mode'), "Table should have sync_mode") + + // Stream load WITHOUT setting group_commit header - should use table property (sync_mode) + streamLoad { + table "${tableNameSync}" + set 'column_separator', ',' + // NOT setting 'group_commit' header - should use table property + set 'columns', 'id, name' + file "test_stream_load1.csv" + unset 'label' + time 10000 + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } + } + + // Check data is loaded + def rowCount2 = sql "select count(*) from ${tableNameSync}" + logger.info("Row count for sync table: " + rowCount2) + assertTrue(rowCount2[0][0] > 0, "Data should be loaded") + + } finally { + } + + // Test: stream load with header override table property + // Table has async_mode, but header sets off_mode + def tableNameOverride = "test_group_commit_stream_load_override" + try { + sql """ drop table if exists ${tableNameOverride}; """ + + sql """ + CREATE TABLE `${tableNameOverride}` ( + `id` int(11) NOT NULL, + `name` varchar(100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "200", + "group_commit_mode" = "async_mode" + ); + """ + + // Stream load with header setting group_commit=off_mode - should override table property + streamLoad { + table "${tableNameOverride}" + set 'column_separator', ',' + set 'columns', 'id, name' + set 'group_commit', 'off_mode' // Override table property + file "test_stream_load1.csv" + set 'label', 'test_override_' + System.currentTimeMillis() + time 10000 + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result (header override): ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + def label = json.Label + assertTrue(label.startsWith("test_override_"), "Label should start with test_override_") + // When off_mode, GroupCommit should be false or label should not start with group_commit_ + // Note: GroupCommit field behavior may vary, but label should NOT be group_commit_ when off_mode + } + } + } finally { + } }