-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat](group commit) add group_commit_mode in table property #61242
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -84,6 +84,9 @@ bvar::LatencyRecorder g_stream_load_commit_and_publish_latency_ms("stream_load", | |
|
|
||
| static constexpr size_t MIN_CHUNK_SIZE = 64 * 1024; | ||
| static const std::string CHUNK = "chunked"; | ||
| static const std::string OFF_MODE = "off_mode"; | ||
| static const std::string SYNC_MODE = "sync_mode"; | ||
| static const std::string ASYNC_MODE = "async_mode"; | ||
|
|
||
| #ifdef BE_TEST | ||
| TStreamLoadPutResult k_stream_load_put_result; | ||
|
|
@@ -254,6 +257,7 @@ int StreamLoadAction::on_header(HttpRequest* req) { | |
|
|
||
| LOG(INFO) << "new income streaming load request." << ctx->brief() << ", db=" << ctx->db | ||
| << ", tbl=" << ctx->table << ", group_commit=" << ctx->group_commit | ||
| << ", group_commit_mode=" << ctx->group_commit_mode | ||
| << ", HTTP headers=" << req->get_all_headers(); | ||
| ctx->begin_receive_and_read_data_cost_nanos = MonotonicNanos(); | ||
|
|
||
|
|
@@ -366,6 +370,9 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, std::shared_ptr<Strea | |
| int64_t begin_txn_start_time = MonotonicNanos(); | ||
| RETURN_IF_ERROR(_exec_env->stream_load_executor()->begin_txn(ctx.get())); | ||
| ctx->begin_txn_cost_nanos = MonotonicNanos() - begin_txn_start_time; | ||
| if (ctx->group_commit) { | ||
| RETURN_IF_ERROR(_check_wal_space(ctx->group_commit_mode, ctx->body_bytes)); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Low] At this point in `_on_header`, `ctx->body_bytes` does not yet reflect the request body (the body has not been received), so the size passed to the check is 0. This means the WAL space check is effectively a no-op for the table-property group commit path. The explicit header path has a proper check in `_can_group_commit`. Consider using the Content-Length from the HTTP header instead: if (ctx->group_commit) {
int64_t content_length = http_req->header(HttpHeaders::CONTENT_LENGTH).empty()
? 0 : std::stoll(http_req->header(HttpHeaders::CONTENT_LENGTH));
RETURN_IF_ERROR(_check_wal_space(ctx->group_commit_mode, content_length));
} |
||
| } | ||
|
|
||
| // process put file | ||
|
|
@@ -767,12 +774,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, | |
| request.__set_stream_per_node(stream_per_node); | ||
| } | ||
| if (ctx->group_commit) { | ||
| if (!http_req->header(HTTP_GROUP_COMMIT).empty()) { | ||
| request.__set_group_commit_mode(http_req->header(HTTP_GROUP_COMMIT)); | ||
| } else { | ||
| // used for wait_internal_group_commit_finish | ||
| request.__set_group_commit_mode("sync_mode"); | ||
| } | ||
| request.__set_group_commit_mode(ctx->group_commit_mode); | ||
| } | ||
|
|
||
| if (!http_req->header(HTTP_COMPUTE_GROUP).empty()) { | ||
|
|
@@ -811,7 +813,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, | |
| if (config::is_cloud_mode() && ctx->two_phase_commit && ctx->is_mow_table()) { | ||
| return Status::NotSupported("stream load 2pc is unsupported for mow table"); | ||
| } | ||
| if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") { | ||
| if (iequal(ctx->group_commit_mode, ASYNC_MODE)) { | ||
| // FIXME find a way to avoid chunked stream load write large WALs | ||
| size_t content_length = 0; | ||
| if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) { | ||
|
|
@@ -886,17 +888,24 @@ void StreamLoadAction::_save_stream_load_record(std::shared_ptr<StreamLoadContex | |
| } | ||
| } | ||
|
|
||
| Status StreamLoadAction::_handle_group_commit(HttpRequest* req, | ||
| std::shared_ptr<StreamLoadContext> ctx) { | ||
| std::string group_commit_mode = req->header(HTTP_GROUP_COMMIT); | ||
| if (!group_commit_mode.empty() && !iequal(group_commit_mode, "sync_mode") && | ||
| !iequal(group_commit_mode, "async_mode") && !iequal(group_commit_mode, "off_mode")) { | ||
| return Status::InvalidArgument( | ||
| "group_commit can only be [async_mode, sync_mode, off_mode]"); | ||
| } | ||
| if (config::wait_internal_group_commit_finish) { | ||
| group_commit_mode = "sync_mode"; | ||
| Status StreamLoadAction::_check_wal_space(const std::string& group_commit_mode, | ||
| int64_t content_length) { | ||
| if (iequal(group_commit_mode, ASYNC_MODE) && | ||
| !load_size_smaller_than_wal_limit(content_length)) { | ||
| std::stringstream ss; | ||
| ss << "There is no space for group commit stream load async WAL. This stream load " | ||
| "size is " | ||
| << content_length | ||
| << ". WAL dir info: " << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); | ||
| LOG(WARNING) << ss.str(); | ||
| return Status::Error<EXCEEDED_LIMIT>(ss.str()); | ||
| } | ||
| return Status::OK(); | ||
| } | ||
|
|
||
| Status StreamLoadAction::_can_group_commit(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx, | ||
| std::string& group_commit_header, | ||
| bool& can_group_commit) { | ||
| int64_t content_length = req->header(HttpHeaders::CONTENT_LENGTH).empty() | ||
| ? 0 | ||
| : std::stoll(req->header(HttpHeaders::CONTENT_LENGTH)); | ||
|
|
@@ -907,13 +916,11 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, | |
| LOG(WARNING) << ss.str(); | ||
| return Status::InvalidArgument(ss.str()); | ||
| } | ||
| // allow chunked stream load in flink | ||
| auto is_chunk = !req->header(HttpHeaders::TRANSFER_ENCODING).empty() && | ||
| req->header(HttpHeaders::TRANSFER_ENCODING).find(CHUNK) != std::string::npos; | ||
| if (group_commit_mode.empty() || iequal(group_commit_mode, "off_mode") || | ||
| (content_length == 0 && !is_chunk)) { | ||
| if (content_length == 0 && !is_chunk) { | ||
| // off_mode and empty | ||
| ctx->group_commit = false; | ||
| can_group_commit = false; | ||
| return Status::OK(); | ||
| } | ||
| if (is_chunk) { | ||
|
|
@@ -930,20 +937,47 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req, | |
| iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FLEXIBLE_COLUMNS")); | ||
| if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit && | ||
| !update_mode) { | ||
| if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) { | ||
| if (!config::wait_internal_group_commit_finish && !group_commit_header.empty() && | ||
| !ctx->label.empty()) { | ||
| return Status::InvalidArgument("label and group_commit can't be set at the same time"); | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [High] Label check incorrectly rejects labeled stream loads on tables with the `group_commit_mode` property set. When the user does NOT set the `group_commit` header but does set a label, this check still returns an error. The problem: the user never explicitly requested group commit — they just have a table property. This is a behavioral regression because existing users who always set labels would suddenly get errors after someone sets the table property. Suggestion: skip this check when `group_commit_header` is empty: if (!group_commit_header.empty() && !config::wait_internal_group_commit_finish && !ctx->label.empty()) {
return Status::InvalidArgument("label and group_commit can't be set at the same time");
} This ensures only explicit header-based group commit rejects labels, while the table-property path allows labels (and falls through to a normal non-group-commit load when a label is present). |
||
| ctx->group_commit = true; | ||
| if (iequal(group_commit_mode, "async_mode")) { | ||
| if (!load_size_smaller_than_wal_limit(content_length)) { | ||
| std::stringstream ss; | ||
| ss << "There is no space for group commit stream load async WAL. This stream load " | ||
| "size is " | ||
| << content_length << ". WAL dir info: " | ||
| << ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string(); | ||
| LOG(WARNING) << ss.str(); | ||
| return Status::Error<EXCEEDED_LIMIT>(ss.str()); | ||
| } | ||
| RETURN_IF_ERROR(_check_wal_space(group_commit_header, content_length)); | ||
| can_group_commit = true; | ||
| } | ||
| return Status::OK(); | ||
| } | ||
|
|
||
| Status StreamLoadAction::_handle_group_commit(HttpRequest* req, | ||
| std::shared_ptr<StreamLoadContext> ctx) { | ||
| std::string group_commit_header = req->header(HTTP_GROUP_COMMIT); | ||
| if (!group_commit_header.empty() && !iequal(group_commit_header, SYNC_MODE) && | ||
| !iequal(group_commit_header, ASYNC_MODE) && !iequal(group_commit_header, OFF_MODE)) { | ||
| return Status::InvalidArgument( | ||
| "group_commit can only be [async_mode, sync_mode, off_mode]"); | ||
| } | ||
| if (config::wait_internal_group_commit_finish) { | ||
| group_commit_header = SYNC_MODE; | ||
| } | ||
|
|
||
| // if group_commit_header is off_mode, we will not use group commit | ||
| if (iequal(group_commit_header, OFF_MODE)) { | ||
| ctx->group_commit_mode = OFF_MODE; | ||
| ctx->group_commit = false; | ||
| return Status::OK(); | ||
| } | ||
| bool can_group_commit = false; | ||
| RETURN_IF_ERROR(_can_group_commit(req, ctx, group_commit_header, can_group_commit)); | ||
| if (!can_group_commit) { | ||
| ctx->group_commit_mode = OFF_MODE; | ||
| ctx->group_commit = false; | ||
| } else { | ||
| if (!group_commit_header.empty()) { | ||
| ctx->group_commit_mode = group_commit_header; | ||
| ctx->group_commit = true; | ||
| } else { | ||
| // use table property to decide group commit or not | ||
| ctx->group_commit_mode = ""; | ||
|
Contributor
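There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. [Medium] Parallel code path: the other stream load endpoint does not appear to receive the same table-property handling in this change. However, the feature description says "For stream load, if it not set group_commit header, use the table property as the group commit mode". This should apply to both stream load endpoints. Consider either applying the same table-property fallback to that endpoint or documenting that it is not supported there.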
|
||
| ctx->group_commit = false; | ||
| } | ||
| } | ||
| return Status::OK(); | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.