Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
685 changes: 606 additions & 79 deletions src/client/src/database.rs

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
use snafu::OptionExt;

pub use self::client::Client;
pub use self::database::Database;
pub use self::database::{Database, OutputMetrics, OutputWithMetrics};
pub use self::error::{Error, Result};
use crate::error::{IllegalDatabaseResponseSnafu, ServerSnafu};

Expand Down
59 changes: 55 additions & 4 deletions src/common/grpc/src/flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ use crate::error::{DecodeFlightDataSnafu, InvalidFlightDataSnafu, Result};
pub enum FlightMessage {
Schema(SchemaRef),
RecordBatch(DfRecordBatch),
AffectedRows(usize),
AffectedRows {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it a breaking change in protocol?

rows: usize,
metrics: Option<String>,
Comment thread
discord9 marked this conversation as resolved.
},
Metrics(String),
}

Expand Down Expand Up @@ -116,10 +119,12 @@ impl FlightEncoder {
encoded_batch.into(),
)
}
FlightMessage::AffectedRows(rows) => {
FlightMessage::AffectedRows { rows, metrics } => {
let metadata = FlightMetadata {
affected_rows: Some(AffectedRows { value: rows as _ }),
metrics: None,
metrics: metrics.map(|s| Metrics {
metrics: s.into_bytes(),
}),
}
.encode_to_vec();
vec1![FlightData {
Expand Down Expand Up @@ -223,7 +228,12 @@ impl FlightDecoder {
let metadata = FlightMetadata::decode(flight_data.app_metadata.clone())
.context(DecodeFlightDataSnafu)?;
if let Some(AffectedRows { value }) = metadata.affected_rows {
return Ok(Some(FlightMessage::AffectedRows(value as _)));
return Ok(Some(FlightMessage::AffectedRows {
rows: value as _,
metrics: metadata
.metrics
.map(|m| String::from_utf8_lossy(&m.metrics).to_string()),
}));
}
if let Some(Metrics { metrics }) = metadata.metrics {
return Ok(Some(FlightMessage::Metrics(
Expand Down Expand Up @@ -426,6 +436,47 @@ mod test {
Ok(())
}

#[test]
fn test_affected_rows_metrics_encode_decode() -> Result<()> {
    let metrics_json = r#"{"region_watermarks":[{"region_id":42,"watermark":7}]}"#;
    let mut encoder = FlightEncoder::default();
    let mut decoder = FlightDecoder::default();

    // Round-trip an AffectedRows message that carries a metrics payload.
    let frames = encoder.encode(FlightMessage::AffectedRows {
        rows: 3,
        metrics: Some(metrics_json.to_string()),
    });
    assert_eq!(frames.len(), 1);
    match decoder.try_decode(frames.first())?.unwrap() {
        FlightMessage::AffectedRows { rows, metrics } => {
            assert_eq!(rows, 3);
            assert_eq!(metrics.as_deref(), Some(metrics_json));
        }
        _ => unreachable!(),
    }

    // Round-trip an AffectedRows message without metrics; the option must survive as None.
    let frames = encoder.encode(FlightMessage::AffectedRows {
        rows: 5,
        metrics: None,
    });
    match decoder.try_decode(frames.first())?.unwrap() {
        FlightMessage::AffectedRows { rows, metrics } => {
            assert_eq!(rows, 5);
            assert!(metrics.is_none());
        }
        _ => unreachable!(),
    }

    Ok(())
}

#[test]
fn test_flight_messages_to_recordbatches() {
let schema = Arc::new(Schema::new(vec![Field::new("m", DataType::Int32, true)]));
Expand Down
143 changes: 138 additions & 5 deletions src/datanode/src/region_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ mod catalog;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::Deref;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll};
use std::time::Duration;

use api::region::RegionResponse;
Expand All @@ -36,7 +38,8 @@ use common_error::status_code::StatusCode;
use common_meta::datanode::TopicStatsReporter;
use common_query::OutputData;
use common_query::request::QueryRequest;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::adapter::RecordBatchMetrics;
use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
use common_runtime::Runtime;
use common_telemetry::tracing::{self, info_span};
use common_telemetry::tracing_context::{FutureExt, TracingContext};
Expand All @@ -45,6 +48,7 @@ use dashmap::DashMap;
use datafusion::datasource::TableProvider;
use datafusion_common::tree_node::TreeNode;
use either::Either;
use futures_util::Stream;
use futures_util::future::try_join_all;
use metric_engine::engine::MetricEngine;
use mito2::engine::{MITO_ENGINE_NAME, MitoEngine};
Expand All @@ -53,6 +57,7 @@ use query::QueryEngineRef;
pub use query::dummy_catalog::{
DummyCatalogList, DummyTableProviderFactory, TableProviderFactoryRef,
};
use query::options::should_collect_region_watermark_from_extensions;
use serde_json;
use servers::error::{
self as servers_error, ExecuteGrpcRequestSnafu, Result as ServerResult, SuspendedSnafu,
Expand Down Expand Up @@ -278,16 +283,31 @@ impl RegionServer {
.await
.context(DecodeLogicalPlanSnafu)?;

self.inner
let stream = self
.inner
.handle_read(
QueryRequest {
header: request.header,
region_id,
plan,
},
query_ctx,
query_ctx.clone(),
)
.await
.await?;

let region_latest_seq =
if should_collect_region_watermark_from_extensions(&query_ctx.extensions()) {
query_ctx.get_snapshot(region_id.as_u64())
} else {
None
};

if let Some(seq) = region_latest_seq {
Ok(Box::pin(RegionWatermarkStream::new(stream, region_id, seq))
as SendableRecordBatchStream)
} else {
Ok(stream)
}
}

#[tracing::instrument(skip_all)]
Expand Down Expand Up @@ -749,6 +769,83 @@ impl RegionServer {
}
}

/// Wraps a region read stream so terminal metrics can carry the scan-open watermark.
struct RegionWatermarkStream {
    // Inner stream whose batches and metrics are delegated to.
    stream: SendableRecordBatchStream,
    // Numeric form of the scanned region's id (`RegionId::as_u64`).
    region_id: u64,
    // Sequence number captured when the scan was opened; reported as the watermark.
    // NOTE(review): obtained from `query_ctx.get_snapshot(...)` at the call site — confirm semantics.
    snapshot_seq: u64,
    // Flipped to true when the inner stream yields `Poll::Ready(None)`;
    // gates watermark injection in `metrics()`.
    finished: AtomicBool,
}

impl RegionWatermarkStream {
    /// Wraps `stream`, remembering the region and the snapshot sequence to report.
    fn new(stream: SendableRecordBatchStream, region_id: RegionId, snapshot_seq: u64) -> Self {
        Self {
            stream,
            region_id: region_id.as_u64(),
            snapshot_seq,
            finished: AtomicBool::new(false),
        }
    }

    /// Returns `metrics` with this stream's watermark entry inserted or updated.
    fn merged_metrics(&self, mut metrics: RecordBatchMetrics) -> RecordBatchMetrics {
        let watermarks = &mut metrics.region_watermarks;
        // Locate an existing entry for this region, or append a fresh one.
        let idx = match watermarks
            .iter()
            .position(|entry| entry.region_id == self.region_id)
        {
            Some(idx) => idx,
            None => {
                watermarks.push(common_recordbatch::adapter::RegionWatermarkEntry {
                    region_id: self.region_id,
                    watermark: None,
                });
                watermarks.len() - 1
            }
        };
        watermarks[idx].watermark = Some(self.snapshot_seq);
        metrics
    }
}

impl RecordBatchStream for RegionWatermarkStream {
    fn name(&self) -> &str {
        self.stream.name()
    }

    fn schema(&self) -> datatypes::schema::SchemaRef {
        self.stream.schema()
    }

    fn output_ordering(&self) -> Option<&[OrderOption]> {
        self.stream.output_ordering()
    }

    /// Delegates to the inner stream until it is exhausted, then merges in the watermark.
    fn metrics(&self) -> Option<RecordBatchMetrics> {
        let inner = self.stream.metrics();
        if self.finished.load(Ordering::Relaxed) {
            // Stream completed: inject the watermark even if the inner stream had no metrics.
            Some(self.merged_metrics(inner.unwrap_or_default()))
        } else {
            inner
        }
    }
}

impl Stream for RegionWatermarkStream {
    type Item = common_recordbatch::error::Result<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let polled = Pin::new(&mut self.stream).poll_next(cx);
        if let Poll::Ready(None) = polled {
            // Inner stream is exhausted: mark completion so `metrics()` starts
            // reporting the watermark in the terminal metrics.
            self.finished.store(true, Ordering::Relaxed);
        }
        polled
    }
}

#[async_trait]
impl RegionServerHandler for RegionServer {
async fn handle(&self, request: region_request::Body) -> ServerResult<RegionResponseV1> {
Expand Down Expand Up @@ -1669,10 +1766,16 @@ impl RegionAttribute {
mod tests {

use std::assert_matches;
use std::sync::Arc;

use api::v1::SemanticType;
use common_error::ext::ErrorExt;
use datatypes::prelude::ConcreteDataType;
use common_recordbatch::RecordBatches;
use common_recordbatch::adapter::RegionWatermarkEntry;
use datatypes::prelude::{ConcreteDataType, VectorRef};
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::Int32Vector;
use futures_util::StreamExt;
use mito2::test_util::CreateRequestBuilder;
use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
use store_api::region_engine::RegionEngine;
Expand All @@ -1685,6 +1788,36 @@ mod tests {
use crate::error::Result;
use crate::tests::{MockRegionEngine, mock_region_server};

#[tokio::test]
async fn test_region_watermark_stream_only_sets_terminal_metrics() {
    let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
        "v",
        ConcreteDataType::int32_datatype(),
        false,
    )]));
    let column: VectorRef = Arc::new(Int32Vector::from_slice([1, 2]));
    let batch = RecordBatch::new(schema.clone(), vec![column]).unwrap();
    let inner = RecordBatches::try_new(schema, vec![batch])
        .unwrap()
        .as_stream();

    let region_id = RegionId::new(42, 7);
    let mut stream = Box::pin(RegionWatermarkStream::new(inner, region_id, 99));

    // Before exhaustion the wrapper reports no metrics at all.
    assert!(stream.as_ref().get_ref().metrics().is_none());

    // Drain the stream to completion.
    while stream.next().await.is_some() {}

    // Terminal metrics now carry the snapshot watermark for the scanned region.
    let expected = vec![RegionWatermarkEntry {
        region_id: region_id.as_u64(),
        watermark: Some(99),
    }];
    let metrics = stream.as_ref().get_ref().metrics().unwrap();
    assert_eq!(metrics.region_watermarks, expected);
}

#[tokio::test]
async fn test_region_registering() {
common_telemetry::init_default_ut_logging();
Expand Down
Loading
Loading