
feat: flow inc query terminal metrics transport#8045

Open
discord9 wants to merge 9 commits into main from flow-inc-pr2b-terminal-metrics-transport

@discord9
Contributor

@discord9 discord9 commented Apr 28, 2026

I hereby agree to the terms of the GreptimeDB CLA.

Refer to a related PR or issue link (optional)

What's changed and what's your intention?

This PR wires terminal record-batch metrics through the Flight/client/Flow consumer path.

It builds on #8015, which introduced RecordBatchMetrics.region_watermarks and query-side terminal metric collection.

Changes

  • Adds Flight transport support for optional trailing terminal Metrics messages.
  • Adds client-side OutputWithMetrics / OutputMetrics helpers so callers can consume terminal metrics without breaking existing Output APIs.
  • Adds Flow frontend-client support for requesting and reading terminal region watermarks in both standalone and distributed modes.
  • Validates Flow query extensions at the Flight boundary before dispatch.
  • Keeps normal query/affected-row behavior backward-compatible when terminal metrics are absent.
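A minimal, self-contained model of the message ordering described above. `FlightMessage`, `OutputMetrics`, `OutputWithMetrics` and `into_output` reuse names from this PR, but the variants, fields and the `consume` function are hypothetical simplifications (batches are `Vec<i64>`, watermarks a `BTreeMap` from region id to sequence), not the actual `client` crate API. It models a stream as a schema, zero or more record batches, and at most one trailing `Metrics` message, with `into_output` discarding metrics for callers of the existing APIs.

```rust
use std::collections::BTreeMap;

/// Simplified stand-in for the Flight messages a client may receive (hypothetical shape).
#[derive(Debug, Clone, PartialEq)]
enum FlightMessage {
    Schema(Vec<String>),
    RecordBatch(Vec<i64>),
    /// Optional terminal metrics: region id -> watermark sequence (hypothetical encoding).
    Metrics(BTreeMap<u64, u64>),
}

/// Hypothetical terminal metrics carried after the last batch.
#[derive(Debug, Clone, Default, PartialEq)]
struct OutputMetrics {
    region_watermarks: BTreeMap<u64, u64>,
}

/// Query output plus optional terminal metrics.
#[derive(Debug, PartialEq)]
struct OutputWithMetrics {
    batches: Vec<Vec<i64>>,
    metrics: Option<OutputMetrics>,
}

impl OutputWithMetrics {
    /// Drops the metrics so existing callers keep receiving plain output.
    fn into_output(self) -> Vec<Vec<i64>> {
        self.batches
    }
}

/// Consumes a message sequence, enforcing that `Metrics` (if present) is the last message.
fn consume(messages: Vec<FlightMessage>) -> Result<OutputWithMetrics, String> {
    let mut iter = messages.into_iter();
    match iter.next() {
        Some(FlightMessage::Schema(_)) => {}
        other => return Err(format!("expected Schema first, got {other:?}")),
    }
    let mut batches = Vec::new();
    let mut metrics: Option<OutputMetrics> = None;
    for msg in iter {
        // Terminal metrics must not be followed by anything else.
        if metrics.is_some() {
            return Err("no message may follow terminal Metrics".to_string());
        }
        match msg {
            FlightMessage::RecordBatch(b) => batches.push(b),
            FlightMessage::Metrics(m) => {
                metrics = Some(OutputMetrics { region_watermarks: m })
            }
            FlightMessage::Schema(_) => return Err("unexpected second Schema".to_string()),
        }
    }
    Ok(OutputWithMetrics { batches, metrics })
}
```

Under this model, a stream without a trailing `Metrics` message still decodes successfully with `metrics: None`, which mirrors the backward-compatibility goal stated above.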

Scope

This PR is limited to terminal metrics transport and consumption:

  • src/servers/src/grpc/flight.rs
  • src/client/src/database.rs
  • src/flow/src/batching_mode/frontend_client.rs
  • related error/test updates

It intentionally excludes the later stale-cursor, incremental-after-seq, and benchmark work.

Compatibility

Existing client APIs (sql, query, create, alter, etc.) continue to return plain Output.
Terminal metrics are opt-in through the new metrics-aware helper path.

Malformed terminal metrics are rejected as transport/parsing errors instead of being silently ignored.
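To illustrate rejecting rather than ignoring malformed terminal metrics, here is a sketch that assumes a hypothetical `region:sequence;region:sequence` text encoding. The real payload handled by the PR's `parse_terminal_metrics` is not shown here, and `parse_region_watermarks` is an illustrative name. A single bad entry fails the whole payload, so a caller never acts on a partially parsed set of watermarks.

```rust
use std::collections::BTreeMap;

/// Parses a hypothetical `region:sequence;region:sequence` payload.
/// Any malformed entry fails the whole payload instead of being skipped.
fn parse_region_watermarks(payload: &str) -> Result<BTreeMap<u64, u64>, String> {
    let mut watermarks = BTreeMap::new();
    if payload.is_empty() {
        return Ok(watermarks);
    }
    for entry in payload.split(';') {
        let (region, seq) = entry
            .split_once(':')
            .ok_or_else(|| format!("malformed entry {entry:?}"))?;
        // The closures below still see the original `&str` bindings.
        let region: u64 = region
            .trim()
            .parse()
            .map_err(|e| format!("bad region id {region:?}: {e}"))?;
        let seq: u64 = seq
            .trim()
            .parse()
            .map_err(|e| format!("bad sequence {seq:?}: {e}"))?;
        // A region reported twice is ambiguous, so treat it as malformed too.
        if watermarks.insert(region, seq).is_some() {
            return Err(format!("duplicate region {region}"));
        }
    }
    Ok(watermarks)
}
```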

Tests

  • cargo test -p client terminal_metrics --lib
  • cargo test -p flow query_with_terminal_metrics --lib

Coverage includes:

  • stream terminal metrics roundtrip
  • affected-rows terminal metrics roundtrip
  • invalid terminal metrics rejection
  • invalid Flow extension rejection
  • standalone and distributed Flow consumer paths

PR Checklist

Please convert it to a draft if some of the following conditions are not met.

  • I have written the necessary rustdoc comments.
  • I have added the necessary unit tests and integration tests.
  • This PR requires documentation updates.
  • API changes are backward compatible.
  • Schema or data changes are backward compatible.

Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
@github-actions github-actions Bot added size/M docs-not-required This change does not impact docs. labels Apr 28, 2026
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces OutputMetrics and OutputWithMetrics to track and expose terminal metrics, such as region watermarks, from query results across the client, frontend, and gRPC layers. It updates the gRPC Flight stream handling to support interleaved metrics messages and adds new API methods to retrieve these metrics. A critical issue was identified in the client's stream processing where a valid RecordBatch could be dropped if a subsequent Metrics message is malformed; yielding the batch before processing the next message is recommended to prevent data loss.

Comment thread src/client/src/database.rs Outdated
Comment on lines +683 to +725
FlightMessage::RecordBatch(arrow_batch) => {
    let result_to_yield = RecordBatch::from_df_record_batch(
        schema_cloned.clone(),
        arrow_batch,
    );

    if let Some(next_flight_message_result) =
        flight_message_stream.next().await
    {
        match next_flight_message_result {
            Ok(FlightMessage::Metrics(s)) => {
                match parse_terminal_metrics(&s) {
                    Ok(m) => {
                        metrics_ref.swap(Some(Arc::new(m)));
                    }
                    Err(e) => {
                        yield Err(BoxedError::new(e))
                            .context(ExternalSnafu);
                        break;
                    }
                };
            }
            Ok(FlightMessage::RecordBatch(rb)) => {
                buffered_message = Some(FlightMessage::RecordBatch(rb));
            }
            Ok(_) => {
                yield IllegalFlightMessagesSnafu {reason: "A RecordBatch message can only be succeeded by a Metrics message or another RecordBatch message"}
                    .fail()
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu);
                break;
            }
            Err(e) => {
                yield Err(BoxedError::new(e)).context(ExternalSnafu);
                break;
            }
        }
    } else {
        stream_ended = true;
    }

    yield Ok(result_to_yield)
}

Contributor

high

The current implementation for handling FlightMessage::RecordBatch can lead to data loss. If an error occurs while processing the message following a RecordBatch (e.g., an invalid Metrics message), the stream yields an error and terminates, but the last valid RecordBatch that was received is dropped without being yielded to the consumer.

While the test test_invalid_terminal_metrics_after_record_batch_fails_before_yielding_batch seems to confirm this behavior, dropping valid data due to subsequent invalid metadata is not ideal for clients. The consumer should receive all valid data batches, and then an error if one occurs.

To fix this, the RecordBatch should be yielded immediately after it's created, before processing the next message in the stream. This ensures no data is lost if a subsequent error occurs.

                            FlightMessage::RecordBatch(arrow_batch) => {
                                let result_to_yield = RecordBatch::from_df_record_batch(
                                    schema_cloned.clone(),
                                    arrow_batch,
                                );
                                yield Ok(result_to_yield);

                                if let Some(next_flight_message_result) =
                                    flight_message_stream.next().await
                                {
                                    match next_flight_message_result {
                                        Ok(FlightMessage::Metrics(s)) => {
                                            match parse_terminal_metrics(&s) {
                                                Ok(m) => {
                                                    metrics_ref.swap(Some(Arc::new(m)));
                                                }
                                                Err(e) => {
                                                    yield Err(BoxedError::new(e))
                                                        .context(ExternalSnafu);
                                                }
                                            };
                                            break;
                                        }
                                        Ok(FlightMessage::RecordBatch(rb)) => {
                                            buffered_message = Some(FlightMessage::RecordBatch(rb));
                                        }
                                        Ok(_) => {
                                            yield IllegalFlightMessagesSnafu {reason: "A RecordBatch message can only be succeeded by a Metrics message or another RecordBatch message"}
                                                .fail()
                                                .map_err(BoxedError::new)
                                                .context(ExternalSnafu);
                                            break;
                                        }
                                        Err(e) => {
                                            yield Err(BoxedError::new(e)).context(ExternalSnafu);
                                            break;
                                        }
                                    }
                                } else {
                                    break;
                                }
                            }
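The ordering the review recommends can be reduced to a synchronous sketch in which a `Vec` of results stands in for the async stream's `yield`s. `Msg`, `metrics_valid` and `drain` are hypothetical names; the point is only that each batch is emitted before the following message is inspected, so a malformed trailing `Metrics` message produces an error after the batch rather than instead of it. Like the suggestion, it treats `Metrics` as terminal.

```rust
/// Simplified incoming message (hypothetical; metrics kept as raw text).
#[derive(Debug, Clone)]
enum Msg {
    RecordBatch(Vec<i64>),
    Metrics(String),
}

/// Hypothetical validity check standing in for terminal-metrics parsing.
fn metrics_valid(s: &str) -> bool {
    !s.is_empty() && s.bytes().all(|b| b.is_ascii_digit())
}

/// Emits every valid batch before looking at the following message,
/// so a later malformed `Metrics` message cannot swallow the batch.
fn drain(messages: Vec<Msg>) -> Vec<Result<Vec<i64>, String>> {
    let mut out = Vec::new();
    for msg in messages {
        match msg {
            // "Yield" the batch immediately.
            Msg::RecordBatch(b) => out.push(Ok(b)),
            Msg::Metrics(s) => {
                if !metrics_valid(&s) {
                    out.push(Err(format!("invalid terminal metrics: {s:?}")));
                }
                // Metrics are terminal: stop after them either way.
                break;
            }
        }
    }
    out
}
```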

Comment thread src/client/src/database.rs Outdated
Comment thread src/servers/src/grpc/flight.rs Outdated
Comment thread src/servers/src/grpc/flight.rs Outdated
Signed-off-by: discord9 <discord9@163.com>
@github-actions github-actions Bot added size/L and removed size/M labels Apr 29, 2026
@discord9 discord9 marked this pull request as ready for review April 29, 2026 09:04
@discord9 discord9 requested review from a team and waynexia as code owners April 29, 2026 09:04
@discord9 discord9 requested a review from killme2008 April 29, 2026 09:17
Signed-off-by: discord9 <discord9@163.com>
Avoid routing Flow-specific query extensions through comma-separated hints so checkpoint JSON values remain intact over Flight.

Signed-off-by: discord9 <discord9@163.com>
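The commit above is about delimiter collisions: a checkpoint encoded as JSON contains commas, so any hint format that splits on `,` cuts it apart. The sketch below assumes a hypothetical `key=value,key=value` hint header and models Flight metadata as a `HashMap`; only the key string `x-greptime-flow-extensions` comes from this PR (it appears as `FLOW_EXTENSIONS_METADATA_KEY` in the diff context further down). `parse_hints_naively` and `flow_extensions` are illustrative names.

```rust
use std::collections::HashMap;

/// Metadata key name as it appears in this PR's diff.
const FLOW_EXTENSIONS_METADATA_KEY: &str = "x-greptime-flow-extensions";

/// Hypothetical hint parser: splits `key=value` pairs on commas.
/// Any value that itself contains a comma (such as JSON) gets cut apart.
fn parse_hints_naively(header: &str) -> Vec<(String, String)> {
    header
        .split(',')
        .filter_map(|pair| pair.split_once('='))
        .map(|(k, v)| (k.trim().to_string(), v.trim().to_string()))
        .collect()
}

/// Reads the Flow extensions from their own metadata entry, so the JSON value is never split.
fn flow_extensions(metadata: &HashMap<String, String>) -> Option<&str> {
    metadata.get(FLOW_EXTENSIONS_METADATA_KEY).map(String::as_str)
}
```

With a checkpoint such as `{"1":10,"2":20}`, the naive parser keeps only `{"1":10` and silently drops the rest, while the dedicated entry returns the document unchanged.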
@github-actions github-actions Bot added size/XL and removed size/L labels Apr 30, 2026
Comment thread src/client/src/database.rs Outdated

const FLOW_EXTENSIONS_METADATA_KEY: &str = "x-greptime-flow-extensions";

#[derive(Debug, Clone, Default)]

Member

Always document pub structs.

Comment thread src/client/src/database.rs Outdated
}
Some(FlightMessage::Metrics(_)) => {
return IllegalFlightMessagesSnafu {
reason: "'AffectedRows' Flight metadata already carries Metrics and cannot be followed by another Metrics message".to_string(),

Member

Suggested change
reason: "'AffectedRows' Flight metadata already carries Metrics and cannot be followed by another Metrics message".to_string(),
reason: "'AffectedRows' Flight metadata already carries Metrics and cannot be followed by another Metrics message",
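A sketch of the rule stated in this error message: when the `AffectedRows` message already carries its terminal metrics inline, a following `Metrics` message is a protocol violation. The `rows` and `metrics` fields and the `read_affected_rows` helper are hypothetical; the diff context elsewhere in this review only shows that `AffectedRows(usize)` becomes a struct-like variant.

```rust
/// Simplified messages for the affected-rows path (hypothetical fields).
#[derive(Debug)]
enum FlightMessage {
    /// Row count plus optional inline terminal metrics.
    AffectedRows { rows: usize, metrics: Option<String> },
    Metrics(String),
}

/// Returns the row count and its inline metrics; a trailing `Metrics` message is rejected.
fn read_affected_rows(messages: &[FlightMessage]) -> Result<(usize, Option<String>), String> {
    match messages {
        [FlightMessage::AffectedRows { rows, metrics }] => Ok((*rows, metrics.clone())),
        [FlightMessage::AffectedRows { .. }, FlightMessage::Metrics(_), ..] => Err(
            "'AffectedRows' Flight metadata already carries Metrics and cannot be followed by another Metrics message"
                .to_string(),
        ),
        _ => Err("unexpected message sequence".to_string()),
    }
}
```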

.map(OutputWithMetrics::into_output)
}

pub async fn sql_with_terminal_metrics<S>(

Member

Document the new pub function.

Schema(SchemaRef),
RecordBatch(DfRecordBatch),
AffectedRows(usize),
AffectedRows {

Member

Is this a breaking change to the protocol?

Comment thread src/datanode/src/region_server.rs Outdated
Comment thread src/flow/src/batching_mode/frontend_client.rs Outdated
Comment thread src/query/src/datafusion.rs
Comment thread src/query/src/metrics.rs
Comment thread src/client/src/database.rs Outdated
Comment thread src/client/src/database.rs Outdated
Comment thread src/client/src/database.rs
Comment thread src/client/src/database.rs Outdated
Comment thread src/client/src/database.rs Outdated
Comment thread src/client/src/database.rs Outdated
Comment thread src/client/src/database.rs Outdated
Comment thread src/common/grpc/src/flight.rs
Comment thread src/flow/src/batching_mode/frontend_client.rs Outdated
Comment thread src/query/src/options.rs
Comment on lines +202 to +207
fn should_collect_region_watermark(
return_region_seq: bool,
has_incremental_after_seqs: bool,
) -> bool {
return_region_seq || has_incremental_after_seqs
}

Contributor

Is it necessary to wrap this into a function?

Contributor Author

Is it necessary to wrap this into a function?

It's used twice, so it seems worth keeping as a function.

Signed-off-by: discord9 <discord9@163.com>
@github-actions github-actions Bot added size/L and removed size/XL labels May 6, 2026
discord9 added 3 commits May 6, 2026 12:35
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
Signed-off-by: discord9 <discord9@163.com>
@github-actions github-actions Bot added size/XL and removed size/L labels May 6, 2026

Labels

docs-not-required This change does not impact docs. size/XL

4 participants