From 05b5b3212a81acd431da2250a6e06da9a4005b33 Mon Sep 17 00:00:00 2001 From: meiyi Date: Mon, 9 Mar 2026 19:31:13 +0800 Subject: [PATCH 1/3] [fix](group commit) forbid set group_commit_interval_ms and group_commit_data_bytes to 0 (#61034) --- .../cloud/alter/CloudSchemaChangeHandler.java | 6 ++-- .../doris/common/util/PropertyAnalyzer.java | 23 +++++++++---- .../doris/datasource/InternalCatalog.java | 4 +-- .../info/ModifyTablePropertiesOp.java | 26 ++------------ .../insert_p0/insert_group_commit_into.groovy | 34 +++++++++++++++++++ 5 files changed, 57 insertions(+), 36 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java index 21860a7772d8ef..caf5f93f772957 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java @@ -153,8 +153,7 @@ public void updateTableProperties(Database db, String tableName, Map p * 1000 * * @param properties - * @param defaultValue * @return * @throws AnalysisException */ - public static int analyzeGroupCommitIntervalMs(Map properties) throws AnalysisException { + public static int analyzeGroupCommitIntervalMs(Map properties, boolean removeProperty) + throws AnalysisException { int groupCommitIntervalMs = PROPERTIES_GROUP_COMMIT_INTERVAL_MS_DEFAULT_VALUE; if (properties != null && properties.containsKey(PROPERTIES_GROUP_COMMIT_INTERVAL_MS)) { String groupIntervalCommitMsStr = properties.get(PROPERTIES_GROUP_COMMIT_INTERVAL_MS); @@ -1736,24 +1736,35 @@ public static int analyzeGroupCommitIntervalMs(Map properties) t } catch (Exception e) { throw new AnalysisException("parse group_commit_interval_ms format error"); } + if (groupCommitIntervalMs <= 0) { + throw new AnalysisException("group_commit_interval_ms must be greater than 0"); + } - properties.remove(PROPERTIES_GROUP_COMMIT_INTERVAL_MS); + if (removeProperty) { + properties.remove(PROPERTIES_GROUP_COMMIT_INTERVAL_MS); + } } return groupCommitIntervalMs; } - public static int analyzeGroupCommitDataBytes(Map properties) throws AnalysisException { + public static int analyzeGroupCommitDataBytes(Map properties, boolean removeProperty) + throws AnalysisException { int groupCommitDataBytes = PROPERTIES_GROUP_COMMIT_DATA_BYTES_DEFAULT_VALUE; if (properties != null && properties.containsKey(PROPERTIES_GROUP_COMMIT_DATA_BYTES)) { String groupIntervalCommitDataBytesStr = properties.get(PROPERTIES_GROUP_COMMIT_DATA_BYTES); try { groupCommitDataBytes = Integer.parseInt(groupIntervalCommitDataBytesStr); } catch (Exception e) { - throw new AnalysisException("parse group_commit_interval_ms format error"); + throw new AnalysisException("parse group_commit_data_bytes format error"); + } + if (groupCommitDataBytes <= 0) { + throw new AnalysisException("group_commit_data_bytes must be greater than 0"); } - properties.remove(PROPERTIES_GROUP_COMMIT_DATA_BYTES); + if (removeProperty) { + properties.remove(PROPERTIES_GROUP_COMMIT_DATA_BYTES); + } } return groupCommitDataBytes; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 74dfe12ba68d73..e23fac83674380 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3035,14 +3035,14 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th } try { - int groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties); + int groupCommitIntervalMs = PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties, true); olapTable.setGroupCommitIntervalMs(groupCommitIntervalMs); } catch (Exception e) { throw new DdlException(e.getMessage()); } try { - int groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties); + int groupCommitDataBytes = PropertyAnalyzer.analyzeGroupCommitDataBytes(properties, true); olapTable.setGroupCommitDataBytes(groupCommitDataBytes); } catch (Exception e) { throw new DdlException(e.getMessage()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java index 1e74a816543d43..e1fce11e594e87 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java @@ -329,33 +329,11 @@ public void validate(ConnectContext ctx) throws UserException { this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS)) { - long groupCommitIntervalMs; - String groupCommitIntervalMsStr = properties.get(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS); - try { - groupCommitIntervalMs = Long.parseLong(groupCommitIntervalMsStr); - if (groupCommitIntervalMs < 0) { - throw new AnalysisException("group_commit_interval_ms can not be less than 0:" - + groupCommitIntervalMsStr); - } - } catch (NumberFormatException e) { - throw new AnalysisException("Invalid group_commit_interval_ms format: " - + groupCommitIntervalMsStr); - } + PropertyAnalyzer.analyzeGroupCommitIntervalMs(properties, false); this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES)) { - long groupCommitDataBytes; - String groupCommitDataBytesStr = properties.get(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES); - try { - groupCommitDataBytes = Long.parseLong(groupCommitDataBytesStr); - if (groupCommitDataBytes < 0) { - throw new AnalysisException("group_commit_data_bytes can not be less than 0:" - + groupCommitDataBytesStr); - } - } catch (NumberFormatException e) { - throw new AnalysisException("Invalid group_commit_data_bytes format: " - + groupCommitDataBytesStr); - } + PropertyAnalyzer.analyzeGroupCommitDataBytes(properties, false); this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)) { diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index 1b9b31db1d3410..ff8aaeeefc7dfc 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -479,6 +479,40 @@ suite("insert_group_commit_into") { exception """null value for not null column""" } getRowCount(2) + + // test invalid group_commit_interval_ms and group_commit_data_bytes + test { + sql """ ALTER TABLE ${table} SET ("group_commit_interval_ms" = "0"); """ + exception """group_commit_interval_ms must be greater than 0""" + } + test { + sql """ ALTER TABLE ${table} SET ("group_commit_data_bytes" = "0"); """ + exception """group_commit_data_bytes must be greater than 0""" + } + test { + sql """ drop table if exists ${table}_invalid; """ + sql """ CREATE TABLE IF NOT EXISTS ${table}_invalid ( k1 INT not null ) + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) + BUCKETS 1 PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "group_commit_interval_ms" = "0", + "group_commit_data_bytes" = "100" + ); + """ + exception """group_commit_interval_ms must be greater than 0""" + } + test { + sql """ CREATE TABLE IF NOT EXISTS ${table}_invalid ( k1 INT not null ) + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) + BUCKETS 1 PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "group_commit_data_bytes" = "0" + ); + """ + exception """group_commit_data_bytes must be greater than 0""" + } } finally { } } From 53cb8de234ab4b881c80ca32bcbaf3e8965c7bfd Mon Sep 17 00:00:00 2001 From: meiyi Date: Mon, 16 Mar 2026 15:09:19 +0800 Subject: [PATCH 2/3] [feat](group commit) add group_commit_mode in table property (#61242) ### What problem does this PR solve? Issue Number: close #xxx Related PR: #xxx Problem Summary: 1. Create table support set `group_commit_mode` table property ``` CREATE TABLE ... PROPERTIES( "group_commit_mode" = "async_mode" ); ``` 2. Support alter this property ``` ALTER TABLE ... SET ("group_commit_mode" = "off_mode"); ``` 3. Show create table shows this property if its value is not `off_mode` 4. For stream load, if it not set `group_commit` header, use the table property as the group commit mode; if it set `group_commit` header, use the header value. 5. doc: https://github.com/apache/doris-website/pull/3465 ### Release note None ### Check List (For Author) - Test - [ ] Regression test - [ ] Unit Test - [ ] Manual test (add detailed scripts or steps below) - [ ] No need to test or manual test. Explain why: - [ ] This is a refactor/code format and no logic has been changed. - [ ] Previous test can cover this change. - [ ] No code files have been changed. - [ ] Other reason - Behavior changed: - [ ] No. - [ ] Yes. - Does this need documentation? - [ ] No. - [ ] Yes. ### Check List (For Reviewer who merge this PR) - [ ] Confirm the release note - [ ] Confirm test cases - [ ] Confirm document - [ ] Add branch pick label --- be/src/http/action/stream_load.cpp | 100 ++++++---- be/src/http/action/stream_load.h | 3 + .../stream_load/stream_load_context.cpp | 4 + .../runtime/stream_load/stream_load_context.h | 1 + .../stream_load/stream_load_executor.cpp | 12 ++ .../doris/alter/SchemaChangeHandler.java | 2 + .../java/org/apache/doris/catalog/Env.java | 6 + .../org/apache/doris/catalog/OlapTable.java | 8 + .../apache/doris/catalog/TableProperty.java | 9 + .../cloud/alter/CloudSchemaChangeHandler.java | 21 +++ .../doris/common/util/PropertyAnalyzer.java | 24 +++ .../doris/datasource/InternalCatalog.java | 7 + .../info/ModifyTablePropertiesOp.java | 4 + .../doris/service/FrontendServiceImpl.java | 17 +- gensrc/proto/cloud.proto | 1 + gensrc/thrift/FrontendService.thrift | 5 + .../query_p0/system/test_table_properties.out | 8 +- .../insert_p0/insert_group_commit_into.groovy | 63 +++++++ .../test_group_commit_stream_load.groovy | 176 ++++++++++++++++++ 19 files changed, 436 insertions(+), 35 deletions(-) diff --git a/be/src/http/action/stream_load.cpp b/be/src/http/action/stream_load.cpp index ee660d9a1bf617..092088bf44aa5b 100644 --- a/be/src/http/action/stream_load.cpp +++ b/be/src/http/action/stream_load.cpp @@ -82,6 +82,9 @@ bvar::LatencyRecorder g_stream_load_commit_and_publish_latency_ms("stream_load", static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024; static const std::string CHUNK = "chunked"; +static const std::string OFF_MODE = "off_mode"; +static const std::string SYNC_MODE = "sync_mode"; +static const std::string ASYNC_MODE = "async_mode"; #ifdef BE_TEST TStreamLoadPutResult k_stream_load_put_result; @@ -219,6 +222,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db << ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit + << ", group_commit_mode=" << ctx->group_commit_mode << ", HTTP headers=" << req->get_all_headers(); ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos(); @@ -345,6 +349,9 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptrstream_load_executor()->begin_txn(ctx.get())); ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time; + if (ctx->group_commit) { + RETURN_IF_ERROR(_check_wal_space(ctx->group_commit_mode, ctx->body_bytes)); + } } // process put file @@ -746,12 +753,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, request.__set_stream_per_node(stream_per_node); } if (ctx->group_commit) { - if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { - request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); - } else { - // used for wait_internal_group_commit_finish - request.__set_group_commit_mode("sync_mode"); - } + request.__set_group_commit_mode(ctx->group_commit_mode); } if (!http_req->header(HTTP_COMPUTE_GROUP).empty()) { @@ -790,7 +792,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) { return Status::NotSupported("stream load 2pc is unsupported for mow table"); } - if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { + if (iequal(ctx->group_commit_mode, ASYNC_MODE)) { // FIXME find a way to avoid chunked stream load write large WALs size_t content_length = 0; if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { @@ -861,17 +863,24 @@ void StreamLoadAction::_save_stream_load_record(std::shared_ptr ctx) { - std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT); - if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") && - !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) { - return Status::InvalidArgument( - "group_commit can only be [async_mode, sync_mode, off_mode]"); - } - if (config::wait_internal_group_commit_finish) { - group_commit_mode = "sync_mode"; +Status StreamLoadAction::_check_wal_space(const std::string& group_commit_mode, + int64_t content_length) { + if (iequal(group_commit_mode, ASYNC_MODE) && + !load_size_smaller_than_wal_limit(content_length)) { + std::stringstream ss; + ss << "There is no space for group commit stream load async WAL. This stream load " + "size is " + << content_length + << ". WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); + LOG(WARNING) << ss.str(); + return Status::Error(ss.str()); } + return Status::OK(); +} + +Status StreamLoadAction::_can_group_commit(HttpRequest* req, std::shared_ptr ctx, + std::string& group_commit_header, + bool& can_group_commit) { int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() ? 0 : std::stoll(req->header(HttpHeaders::CONTENT_LENGTH)); @@ -882,13 +891,11 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, LOG(WARNING) << ss.str(); return Status::InvalidArgument(ss.str()); } - // allow chunked stream load in flink auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() && req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos; - if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || - (content_length == 0 && !is_chunk)) { + if (content_length == 0 && !is_chunk) { // off_mode and empty - ctx->group_commit = false; + can_group_commit = false; return Status::OK(); } if (is_chunk) { @@ -905,20 +912,47 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FLEXIBLE_COLUMNS")); if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit && !update_mode) { - if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) { + if (!config::wait_internal_group_commit_finish && !group_commit_header.empty() && + !ctx->label.empty()) { return Status::InvalidArgument("label and group_commit can't be set at the same time"); } - ctx->group_commit = true; - if (iequal(group_commit_mode, "async_mode")) { - if (!load_size_smaller_than_wal_limit(content_length)) { - std::stringstream ss; - ss << "There is no space for group commit stream load async WAL. This stream load " - "size is " - << content_length << ". WAL dir info: " - << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); - LOG(WARNING) << ss.str(); - return Status::Error(ss.str()); - } + RETURN_IF_ERROR(_check_wal_space(group_commit_header, content_length)); + can_group_commit = true; + } + return Status::OK(); +} + +Status StreamLoadAction::_handle_group_commit(HttpRequest* req, + std::shared_ptr ctx) { + std::string group_commit_header = req->header(HTTP_GROUP_COMMIT); + if (!group_commit_header.empty() && !iequal(group_commit_header, SYNC_MODE) && + !iequal(group_commit_header, ASYNC_MODE) && !iequal(group_commit_header, OFF_MODE)) { + return Status::InvalidArgument( + "group_commit can only be [async_mode, sync_mode, off_mode]"); + } + if (config::wait_internal_group_commit_finish) { + group_commit_header = SYNC_MODE; + } + + // if group_commit_header is off_mode, we will not use group commit + if (iequal(group_commit_header, OFF_MODE)) { + ctx->group_commit_mode = OFF_MODE; + ctx->group_commit = false; + return Status::OK(); + } + bool can_group_commit = false; + RETURN_IF_ERROR(_can_group_commit(req, ctx, group_commit_header, can_group_commit)); + if (!can_group_commit) { + ctx->group_commit_mode = OFF_MODE; + ctx->group_commit = false; + } else { + if (!group_commit_header.empty()) { + ctx->group_commit_mode = group_commit_header; + ctx->group_commit = true; + } else { + // use table property to decide group commit or not + ctx->group_commit_mode = ""; + ctx->group_commit = false; } } return Status::OK(); diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h index bf359317f94ba1..bf06a5d6ca8dfd 100644 --- a/be/src/http/action/stream_load.h +++ b/be/src/http/action/stream_load.h @@ -49,7 +49,10 @@ class StreamLoadAction : public HttpHandler { Status _handle(std::shared_ptr ctx); Status _data_saved_path(HttpRequest* req, std::string* file_path, int64_t file_bytes); Status _process_put(HttpRequest* http_req, std::shared_ptr ctx); + Status _can_group_commit(HttpRequest* http_req, std::shared_ptr ctx, + std::string& group_commit_header, bool& can_group_commit); void _save_stream_load_record(std::shared_ptr ctx, const std::string& str); + Status _check_wal_space(const std::string& group_commit_mode, int64_t content_length); Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr ctx); private: diff --git a/be/src/runtime/stream_load/stream_load_context.cpp b/be/src/runtime/stream_load/stream_load_context.cpp index 65ec117149344f..ac362b3f0373f8 100644 --- a/be/src/runtime/stream_load/stream_load_context.cpp +++ b/be/src/runtime/stream_load/stream_load_context.cpp @@ -58,6 +58,10 @@ std::string StreamLoadContext::to_json() const { } else { writer.Key("GroupCommit"); writer.Bool(true); + writer.Key("GroupCommitMode"); + writer.String(group_commit_mode.c_str()); + writer.Key("LoadId"); + writer.String(id.to_string().c_str()); } // status diff --git a/be/src/runtime/stream_load/stream_load_context.h b/be/src/runtime/stream_load/stream_load_context.h index d66602a8d2387f..c8869323331974 100644 --- a/be/src/runtime/stream_load/stream_load_context.h +++ b/be/src/runtime/stream_load/stream_load_context.h @@ -195,6 +195,7 @@ class StreamLoadContext { TFileFormatType::type format = TFileFormatType::FORMAT_CSV_PLAIN; TFileCompressType::type compress_type = TFileCompressType::UNKNOWN; bool group_commit = false; + std::string group_commit_mode = ""; std::shared_ptr body_sink; std::shared_ptr pipe; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index 907fbdf7410301..fd2e78cb620e1d 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -172,6 +172,9 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { } request.__set_request_id(ctx->id.to_thrift()); request.__set_backend_id(_exec_env->cluster_info()->backend_id); + if (ctx->group_commit_mode.empty()) { + request.__set_use_table_group_commit_mode(true); + } TLoadTxnBeginResult result; Status status; @@ -200,6 +203,15 @@ Status StreamLoadExecutor::begin_txn(StreamLoadContext* ctx) { } return status; } + if (ctx->group_commit_mode.empty() && result.__isset.table_group_commit_mode) { + auto table_group_commit_mode = result.table_group_commit_mode; + if (iequal(table_group_commit_mode, "async_mode") || + iequal(table_group_commit_mode, "sync_mode")) { + ctx->group_commit = true; + ctx->group_commit_mode = table_group_commit_mode; + return Status::OK(); + } + } ctx->txn_id = result.txnId; if (result.__isset.db_id) { ctx->db_id = result.db_id; diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java index fadd689c04aa6b..42e25ea8f0622d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeHandler.java @@ -2361,6 +2361,7 @@ public void updateTableProperties(Database db, String tableName, Map rowStoreColumns) { if (rowStoreColumns != null && !rowStoreColumns.isEmpty()) { modifyTableProperties(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN, "true"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java index caf5f93f772957..c2155963273e09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/cloud/alter/CloudSchemaChangeHandler.java @@ -100,6 +100,7 @@ public void updateTableProperties(Database db, String tableName, Map allowedProps = new HashSet() { { + add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE); add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS); add(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES); add(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS); @@ -152,6 +153,21 @@ public void updateTableProperties(Database db, String tableName, Map properties, bo return groupCommitDataBytes; } + public static String analyzeGroupCommitMode(Map properties, boolean removeProperty) + throws AnalysisException { + String groupCommitMode = GROUP_COMMIT_MODE_OFF; + if (properties != null && properties.containsKey(PROPERTIES_GROUP_COMMIT_MODE)) { + groupCommitMode = properties.get(PROPERTIES_GROUP_COMMIT_MODE); + if (!groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_OFF) + && !groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_ASYNC) + && !groupCommitMode.equalsIgnoreCase(GROUP_COMMIT_MODE_SYNC)) { + throw new AnalysisException("Invalid group_commit_mode: " + groupCommitMode + + ". Valid values: " + GROUP_COMMIT_MODE_OFF + ", " + GROUP_COMMIT_MODE_ASYNC + + ", " + GROUP_COMMIT_MODE_SYNC); + } + if (removeProperty) { + properties.remove(PROPERTIES_GROUP_COMMIT_MODE); + } + } + return groupCommitMode.toLowerCase(); + } + /** * Check the type property of the catalog props. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index e23fac83674380..102060ae2e6d60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -3048,6 +3048,13 @@ private boolean createOlapTable(Database db, CreateTableInfo createTableInfo) th throw new DdlException(e.getMessage()); } + try { + String groupCommitMode = PropertyAnalyzer.analyzeGroupCommitMode(properties, true); + olapTable.setGroupCommitMode(groupCommitMode); + } catch (Exception e) { + throw new DdlException(e.getMessage()); + } + try { TEncryptionAlgorithm tdeAlgorithm = PropertyAnalyzer.analyzeTDEAlgorithm(properties); olapTable.setEncryptionAlgorithm(tdeAlgorithm); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java index e1fce11e594e87..5f20285ed5bf75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyTablePropertiesOp.java @@ -336,6 +336,10 @@ public void validate(ConnectContext ctx) throws UserException { PropertyAnalyzer.analyzeGroupCommitDataBytes(properties, false); this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; + } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_MODE)) { + PropertyAnalyzer.analyzeGroupCommitMode(properties, false); + this.needTableStable = false; + this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; } else if (properties.containsKey(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS)) { this.needTableStable = false; this.opType = AlterOpType.MODIFY_TABLE_PROPERTY_SYNC; diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 4a0b486dc62c44..89bc3fd289ec6b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -1220,7 +1220,12 @@ public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TEx try { TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr); - result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId()); + if (tmpRes.isSetTableGroupCommitMode()) { + // if use table group commit mode, just return the mode info, no need to begin txn + result.setTableGroupCommitMode(tmpRes.getTableGroupCommitMode()).setDbId(tmpRes.getDbId()); + } else { + result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId()); + } } catch (DuplicatedRequestException e) { // this is a duplicate request, just return previous txn id LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(), @@ -1277,6 +1282,16 @@ private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, Strin } OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP); + // check if use table group_commit_mode property + if (request.isUseTableGroupCommitMode()) { + String tableGroupCommitMode = table.getGroupCommitMode(); + if (tableGroupCommitMode != null && !tableGroupCommitMode.equalsIgnoreCase( + PropertyAnalyzer.GROUP_COMMIT_MODE_OFF)) { + TLoadTxnBeginResult result = new TLoadTxnBeginResult(); + result.setTableGroupCommitMode(tableGroupCommitMode).setDbId(db.getId()); + return result; + } + } // begin long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second; Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId()); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index c8d8a40487632b..b60f330dcfeb4b 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -650,6 +650,7 @@ message TabletMetaInfoPB { // For update tablet meta optional bool disable_auto_compaction = 13; optional bool enable_mow_light_delete = 14; optional int32 vertical_compaction_num_columns_per_group = 15; + optional string group_commit_mode = 16; } message TabletCompactionJobPB { diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift index 73bf949c3b4f5c..9eed2cd26af2c5 100644 --- a/gensrc/thrift/FrontendService.thrift +++ b/gensrc/thrift/FrontendService.thrift @@ -464,6 +464,8 @@ struct TLoadTxnBeginRequest { 14: optional i64 table_id 15: optional i64 backend_id 16: optional TCertBasedAuth cert_based_auth + // If set to true: use table group_commit_mode property + 17: optional bool use_table_group_commit_mode } struct TLoadTxnBeginResult { @@ -471,6 +473,9 @@ struct TLoadTxnBeginResult { 2: optional i64 txnId 3: optional string job_status // if label already used, set status of existing job 4: optional i64 db_id + // If use_table_group_commit_mode is true in TLoadTxnBeginRequest, and table group_commit_mode property is + // async_mode or sync_mode, return table group_commit_mode (begin_txn is skipped) + 5: optional string table_group_commit_mode } struct TBeginTxnRequest { diff --git a/regression-test/data/query_p0/system/test_table_properties.out b/regression-test/data/query_p0/system/test_table_properties.out index 0f8b1a2acafd57..9f6e334cc9abdf 100644 --- a/regression-test/data/query_p0/system/test_table_properties.out +++ b/regression-test/data/query_p0/system/test_table_properties.out @@ -1,6 +1,6 @@ -- This file is automatically generated. You should know what you did if you want to edit this -- !select_check_1 -- -111 +114 -- !select_check_2 -- internal test_table_properties_db duplicate_table _auto_bucket false @@ -19,6 +19,7 @@ internal test_table_properties_db duplicate_table enable_unique_key_merge_on_wri internal test_table_properties_db duplicate_table file_cache_ttl_seconds 0 internal test_table_properties_db duplicate_table group_commit_data_bytes 134217728 internal test_table_properties_db duplicate_table group_commit_interval_ms 10000 +internal test_table_properties_db duplicate_table group_commit_mode off_mode internal test_table_properties_db duplicate_table in_memory false internal test_table_properties_db duplicate_table inverted_index_storage_format V3 internal test_table_properties_db duplicate_table is_being_synced false @@ -55,6 +56,7 @@ internal test_table_properties_db listtable enable_unique_key_merge_on_write fal internal test_table_properties_db listtable file_cache_ttl_seconds 0 internal test_table_properties_db listtable group_commit_data_bytes 134217728 internal test_table_properties_db listtable group_commit_interval_ms 10000 +internal test_table_properties_db listtable group_commit_mode off_mode internal test_table_properties_db listtable in_memory false internal test_table_properties_db listtable inverted_index_storage_format V3 internal test_table_properties_db listtable is_being_synced false @@ -91,6 +93,7 @@ internal test_table_properties_db unique_table enable_unique_key_merge_on_write internal test_table_properties_db unique_table file_cache_ttl_seconds 0 internal test_table_properties_db unique_table group_commit_data_bytes 134217728 internal test_table_properties_db unique_table group_commit_interval_ms 10000 +internal test_table_properties_db unique_table group_commit_mode off_mode internal test_table_properties_db unique_table in_memory false internal test_table_properties_db unique_table inverted_index_storage_format V3 internal test_table_properties_db unique_table is_being_synced false @@ -129,6 +132,7 @@ internal test_table_properties_db duplicate_table enable_unique_key_merge_on_wri internal test_table_properties_db duplicate_table file_cache_ttl_seconds 0 internal test_table_properties_db duplicate_table group_commit_data_bytes 134217728 internal test_table_properties_db duplicate_table group_commit_interval_ms 10000 +internal test_table_properties_db duplicate_table group_commit_mode off_mode internal test_table_properties_db duplicate_table in_memory false internal test_table_properties_db duplicate_table inverted_index_storage_format V3 internal test_table_properties_db duplicate_table is_being_synced false @@ -165,6 +169,7 @@ internal test_table_properties_db unique_table enable_unique_key_merge_on_write internal test_table_properties_db unique_table file_cache_ttl_seconds 0 internal test_table_properties_db unique_table group_commit_data_bytes 134217728 internal test_table_properties_db unique_table group_commit_interval_ms 10000 +internal test_table_properties_db unique_table group_commit_mode off_mode internal test_table_properties_db unique_table in_memory false internal test_table_properties_db unique_table inverted_index_storage_format V3 internal test_table_properties_db unique_table is_being_synced false @@ -205,6 +210,7 @@ internal test_table_properties_db duplicate_table enable_unique_key_merge_on_wri internal test_table_properties_db duplicate_table file_cache_ttl_seconds 0 internal test_table_properties_db duplicate_table group_commit_data_bytes 134217728 internal test_table_properties_db duplicate_table group_commit_interval_ms 10000 +internal test_table_properties_db duplicate_table group_commit_mode off_mode internal test_table_properties_db duplicate_table in_memory false internal test_table_properties_db duplicate_table inverted_index_storage_format V3 internal test_table_properties_db duplicate_table is_being_synced false diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index ff8aaeeefc7dfc..9f18e9a813c839 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -513,6 +513,69 @@ suite("insert_group_commit_into") { """ exception """group_commit_data_bytes must be greater than 0""" } + + // Test group_commit_mode property + sql """ drop table if exists ${table}_mode; """ + sql """ + CREATE TABLE IF NOT EXISTS ${table}_mode ( + k1 INT not null, + k2 varchar(50) + ) + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) + BUCKETS 1 PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "group_commit_interval_ms" = "200", + "group_commit_mode" = "async_mode" + ); + """ + + // Verify SHOW CREATE TABLE displays group_commit_mode + def createStmt = sql """ SHOW CREATE TABLE ${table}_mode """ + logger.info("SHOW CREATE TABLE result: " + createStmt) + assertTrue(createStmt.toString().contains('group_commit_mode'), "SHOW CREATE TABLE should contain group_commit_mode") + assertTrue(createStmt.toString().contains('async_mode'), "SHOW CREATE TABLE should contain async_mode") + + // Test ALTER TABLE to change group_commit_mode + sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" = "SYNC_MODE"); """ + + def createStmt2 = sql """ SHOW CREATE TABLE ${table}_mode """ + logger.info("SHOW CREATE TABLE result after alter: " + createStmt2) + assertTrue(createStmt2.toString().contains('SYNC_MODE'), "SHOW CREATE TABLE should contain sync_mode after alter") + + // Test ALTER TABLE to change back to off_mode - should NOT show in SHOW CREATE TABLE + sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" = "OFF_MODE"); """ + + def createStmt3 = sql """ SHOW CREATE TABLE ${table}_mode """ + logger.info("SHOW CREATE TABLE result after alter to off_mode: " + createStmt3) + // off_mode should NOT be shown in SHOW CREATE TABLE + assertTrue(!createStmt3.toString().contains('group_commit_mode'), + "off_mode should NOT be shown in SHOW CREATE TABLE") + + // Test invalid group_commit_mode value + test { + sql """ ALTER TABLE ${table}_mode SET ("group_commit_mode" = "invalid_mode"); """ + exception """Invalid group_commit_mode""" + } + + // Test default value (off_mode) - should NOT show group_commit_mode in SHOW CREATE TABLE + sql """ drop table if exists ${table}_mode_default; """ + sql """ + CREATE TABLE IF NOT EXISTS ${table}_mode_default ( + k1 INT not null + ) + DUPLICATE KEY(`k1`) + DISTRIBUTED BY HASH(`k1`) + BUCKETS 1 PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + def createStmt4 = sql """ SHOW CREATE TABLE ${table}_mode_default """ + logger.info("SHOW CREATE TABLE result for default: " + createStmt4) + // When default is off_mode, it should NOT be shown in SHOW CREATE TABLE + assertTrue(!createStmt4.toString().contains('group_commit_mode'), + "Default off_mode should NOT be shown in SHOW CREATE TABLE") + } finally { } } diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy index a2de2a04b98d9d..342b87f13960c0 100644 --- a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy +++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy @@ -31,6 +31,22 @@ suite("test_group_commit_stream_load") { } } + def getTableRowCount = { tableName1, expectedRowCount -> + def retry = 0 + def row = 0 + while (retry < 30) { + sleep(2000) + def rowCount = sql "select count(*) from ${tableName1}" + logger.info("rowCount: " + rowCount + ", retry: " + retry) + row = rowCount[0][0] + if (row >= expectedRowCount) { + break + } + retry++ + } + assertTrue(row >= expectedRowCount, "Expected at least ${expectedRowCount} rows, but got ${row}") + } + def getAlterTableState = { waitForSchemaChangeDone { sql """ SHOW ALTER TABLE COLUMN WHERE tablename='${tableName}' ORDER BY createtime DESC LIMIT 1 """ @@ -342,4 +358,164 @@ suite("test_group_commit_stream_load") { } qt_read_json_by_line "select k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} order by k;" + // Test: stream load using table property group_commit_mode (async_mode) + // When HTTP header 'group_commit' is not set, should use table's group_commit_mode property + def tableNameAsync = "test_group_commit_stream_load_table_property_async" + try { + sql """ drop table if exists ${tableNameAsync}; """ + + sql """ + CREATE TABLE `${tableNameAsync}` ( + `id` int(11) NOT NULL, + `name` varchar(100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "200", + "group_commit_mode" = "async_mode" + ); + """ + + // Verify SHOW CREATE TABLE contains group_commit_mode + def createStmt1 = sql """ SHOW CREATE TABLE ${tableNameAsync} """ + logger.info("SHOW CREATE TABLE for async: " + createStmt1) + assertTrue(createStmt1.toString().contains('async_mode'), "Table should have async_mode") + + // Stream load WITHOUT setting group_commit header - should use table property + streamLoad { + table "${tableNameAsync}" + set 'column_separator', ',' + // NOT setting 'group_commit' header - should use table property + set 'columns', 'id, name' + file "test_stream_load1.csv" + unset 'label' + time 10000 + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } + } + // Check data is loaded + getTableRowCount(tableNameAsync, 2) + + streamLoad { + table "${tableNameAsync}" + set 'column_separator', ',' + // NOT setting 'group_commit' header - should use table property, but set partitions + set 'partitions', "${tableNameAsync}" + set 'columns', 'id, name' + file "test_stream_load1.csv" + time 10000 + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result (header override): ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + def label = json.Label + assertTrue(!label.startsWith("group_commit")) + } + } + } finally { + } + + // Test: stream load using table property group_commit_mode (sync_mode) + def tableNameSync = "test_group_commit_stream_load_table_property_sync" + try { + sql """ drop table if exists ${tableNameSync}; """ + + sql """ + CREATE TABLE `${tableNameSync}` ( + `id` int(11) NOT NULL, + `name` varchar(100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "200", + "group_commit_mode" = "SYNC_MODE" + ); + """ + + // Verify SHOW CREATE TABLE contains group_commit_mode + def createStmt2 = sql """ SHOW CREATE TABLE ${tableNameSync} """ + logger.info("SHOW CREATE TABLE for sync: " + createStmt2) + assertTrue(createStmt2.toString().contains('sync_mode'), "Table should have sync_mode") + + // Stream load WITHOUT setting group_commit header - should use table property (sync_mode) + streamLoad { + table "${tableNameSync}" + set 'column_separator', ',' + // NOT setting 'group_commit' header - should use table property + set 'columns', 'id, name' + file "test_stream_load1.csv" + unset 'label' + time 10000 + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 2, 2, 0, 0) + } + } + + // Check data is loaded + def rowCount2 = sql "select count(*) from ${tableNameSync}" + logger.info("Row count for sync table: " + rowCount2) + assertTrue(rowCount2[0][0] > 0, "Data should be loaded") + + } finally { + } + + // Test: stream load with header override table property + // Table has async_mode, but header sets off_mode + def tableNameOverride = "test_group_commit_stream_load_override" + try { + sql """ drop table if exists ${tableNameOverride}; """ + + sql """ + CREATE TABLE `${tableNameOverride}` ( + `id` int(11) NOT NULL, + `name` varchar(100) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + DUPLICATE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "group_commit_interval_ms" = "200", + "group_commit_mode" = "async_mode" + ); + """ + + // Stream load with header setting group_commit=off_mode - should override table property + streamLoad { + table "${tableNameOverride}" + set 'column_separator', ',' + set 'columns', 'id, name' + set 'group_commit', 'off_mode' // Override table property + file "test_stream_load1.csv" + set 'label', 'test_override_' + System.currentTimeMillis() + time 10000 + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result (header override): ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + def label = json.Label + assertTrue(label.startsWith("test_override_"), "Label should start with test_override_") + // When off_mode, GroupCommit should be false or label should not start with group_commit_ + // Note: GroupCommit field behavior may vary, but label should NOT be group_commit_ when off_mode + } + } + } finally { + } } From a1268238f7928747d9239aa1a012be2dfe559fad Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 18 Mar 2026 20:40:46 +0800 Subject: [PATCH 3/3] fix --- .../main/java/org/apache/doris/service/FrontendServiceImpl.java | 1 + 1 file changed, 1 insertion(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 89bc3fd289ec6b..f5d5fc3af56268 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -74,6 +74,7 @@ import org.apache.doris.common.io.Text; import org.apache.doris.common.util.DebugPointUtil; import org.apache.doris.common.util.DebugPointUtil.DebugPoint; +import org.apache.doris.common.util.PropertyAnalyzer; import org.apache.doris.common.util.Util; import org.apache.doris.cooldown.CooldownDelete; import org.apache.doris.datasource.CatalogIf;