Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 49 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ datafusion-orc = "0.7"
datafusion-pg-catalog = "0.15.1"
datafusion-physical-expr = "=52.1"
datafusion-physical-plan = "=52.1"
datafusion-proto = "=52.1"
datafusion-sql = "=52.1"
datafusion-substrait = "=52.1"
deadpool = "0.12"
Expand All @@ -154,7 +155,7 @@ etcd-client = { version = "0.17", features = [
fst = "0.4.7"
futures = "0.3"
futures-util = "0.3"
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "092ba1d01e2da676dca66cca7eebb55009da8ef8" }
greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "9423b6ae25e8e64b1c57ef2594a6a7698efb3c5a" }
hex = "0.4"
http = "1"
humantime = "2.1"
Expand Down Expand Up @@ -251,7 +252,7 @@ tracing-appender = "0.2"
tracing-opentelemetry = "0.31.0"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json", "fmt"] }
typetag = "0.2"
uuid = { version = "1.17", features = ["serde", "v4", "fast-rng"] }
uuid = { version = "1.17", features = ["serde", "v4", "v7", "fast-rng"] }
vrl = "0.25"
zstd = "0.13"
# DO_NOT_REMOVE_THIS: END_OF_EXTERNAL_DEPENDENCIES
Expand Down Expand Up @@ -341,6 +342,7 @@ datafusion-optimizer = { git = "https://github.com/GreptimeTeam/datafusion.git",
datafusion-physical-expr = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-physical-expr-common = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-physical-plan = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-proto = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-datasource = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-sql = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
datafusion-substrait = { git = "https://github.com/GreptimeTeam/datafusion.git", rev = "02b82535e0160c4545667f36a03e1ff9d1d2e51f" }
Expand Down
74 changes: 73 additions & 1 deletion src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use std::sync::Arc;

use api::region::RegionResponse;
use api::v1::ResponseHeader;
use api::v1::region::RegionRequest;
use api::v1::region::{
RegionRequest, RegionRequestHeader, RemoteDynFilterRequest, RemoteDynFilterUnregister,
RemoteDynFilterUpdate, region_request, remote_dyn_filter_request,
};
use arc_swap::ArcSwapOption;
use arrow_flight::Ticket;
use async_stream::stream;
Expand Down Expand Up @@ -284,6 +287,48 @@ impl RegionRequester {
pub async fn handle(&self, request: RegionRequest) -> Result<RegionResponse> {
self.handle_inner(request).await
}

pub async fn handle_remote_dyn_filter_update(
&self,
query_id: impl Into<String>,
update: RemoteDynFilterUpdate,
) -> Result<RegionResponse> {
self.handle_inner(build_remote_dyn_filter_request(
query_id.into(),
remote_dyn_filter_request::Action::Update(update),
))
.await
}

pub async fn handle_remote_dyn_filter_unregister(
&self,
query_id: impl Into<String>,
unregister: RemoteDynFilterUnregister,
) -> Result<RegionResponse> {
self.handle_inner(build_remote_dyn_filter_request(
query_id.into(),
remote_dyn_filter_request::Action::Unregister(unregister),
))
.await
}
}

fn build_remote_dyn_filter_request(
query_id: String,
action: remote_dyn_filter_request::Action,
) -> RegionRequest {
RegionRequest {
header: Some(RegionRequestHeader {
tracing_context: TracingContext::from_current_span().to_w3c(),
..Default::default()
}),
body: Some(region_request::Body::RemoteDynFilter(
RemoteDynFilterRequest {
query_id,
action: Some(action),
},
)),
}
}

pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
Expand Down Expand Up @@ -312,6 +357,7 @@ pub fn check_response_header(header: &Option<ResponseHeader>) -> Result<()> {
#[cfg(test)]
mod test {
use api::v1::Status as PbStatus;
use api::v1::region::{RemoteDynFilterUpdate, region_request, remote_dyn_filter_request};

use super::*;
use crate::Error::{IllegalDatabaseResponse, Server};
Expand Down Expand Up @@ -361,4 +407,30 @@ mod test {
assert_eq!(code, StatusCode::Internal);
assert_eq!(msg, "blabla");
}

#[test]
fn test_build_remote_dyn_filter_request_sets_header_and_body() {
let request = build_remote_dyn_filter_request(
"query-1".to_string(),
remote_dyn_filter_request::Action::Update(RemoteDynFilterUpdate {
filter_id: "filter-1".to_string(),
payload: vec![1, 2, 3],
generation: 7,
is_complete: false,
}),
);

request.header.expect("remote dyn filter header must exist");

let body = request.body.expect("remote dyn filter body must exist");
let region_request::Body::RemoteDynFilter(remote_request) = body else {
panic!("expected remote dyn filter request body");
};

assert_eq!(remote_request.query_id, "query-1");
assert!(matches!(
remote_request.action,
Some(remote_dyn_filter_request::Action::Update(_))
));
}
}
3 changes: 3 additions & 0 deletions src/common/query/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ common-time.workspace = true
datafusion.workspace = true
datafusion-common.workspace = true
datafusion-expr.workspace = true
datafusion-proto.workspace = true
datatypes.workspace = true
once_cell.workspace = true
prost.workspace = true
serde.workspace = true
snafu.workspace = true
sqlparser.workspace = true
Expand All @@ -33,4 +35,5 @@ store-api.workspace = true
[dev-dependencies]
common-base.workspace = true
futures-util.workspace = true
serde_json.workspace = true
tokio.workspace = true
Loading
Loading