Signed-off-by: discord9 <discord9@163.com>
Code Review
This pull request introduces a remote dynamic filter protocol, enabling the serialization and transmission of DataFusion physical expressions between components. Key additions include the DynFilterPayload and DynFilterUpdate structures, along with client and server-side handlers for dynamic filter updates and unregistrations. The PR also implements automatic generation of a unique remote_query_id using UUID v7 within the QueryContext for better request tracking. Feedback highlights a naming inconsistency between internal structures and gRPC messages regarding the 'epoch' field, as well as a potential issue with strict column name validation when qualifiers are present.
    pub protocol_version: u32,
    pub query_id: String,
    pub filter_id: String,
    pub epoch: u64,
    if field.name() != column.name() {
        return Err(DataFusionError::Plan(format!(
            "Decoded Column name/index mismatch: payload has '{}' at index {}, but schema field is '{}'",
            column.name(),
            column.index(),
            field.name()
        )));
    }
The strict name check field.name() != column.name() might be too restrictive if the DataFusion Column expression contains qualifiers (e.g., table.column) while the input schema fields are unqualified. Consider using a more robust comparison or ensuring that qualifiers are handled consistently during serialization and deserialization.
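One possible way to relax the check is sketched below. It is not DataFusion's API: DecodedColumn, unqualified, and validate_column are hypothetical stand-ins, and the schema is reduced to a slice of field names. The sketch compares only the unqualified part of each name and also guards against an out-of-range index.

```rust
/// Hypothetical stand-in for a decoded physical column reference.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedColumn {
    pub name: String,
    pub index: usize,
}

/// Returns the part of `name` after the last `.`, or `name` itself if it has no dot.
/// Assumption: qualifiers are dot-separated and field names contain no dots.
fn unqualified(name: &str) -> &str {
    name.rsplit('.').next().unwrap_or(name)
}

/// Checks that the column's index exists in the schema and that the
/// unqualified names agree, so `table.column` matches a field named `column`.
pub fn validate_column(column: &DecodedColumn, schema_fields: &[&str]) -> Result<(), String> {
    let field = schema_fields.get(column.index).ok_or_else(|| {
        format!(
            "Decoded Column index {} is out of bounds for a schema with {} fields",
            column.index,
            schema_fields.len()
        )
    })?;
    if unqualified(field) != unqualified(&column.name) {
        return Err(format!(
            "Decoded Column name/index mismatch: payload has '{}' at index {}, but schema field is '{}'",
            column.name, column.index, field
        ));
    }
    Ok(())
}
```

The naive split would misread field names that legitimately contain dots, so normalizing qualifiers at serialization time may be the safer of the two options the comment mentions.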
Signed-off-by: discord9 <discord9@163.com>
    - .channel(channel);
    + .channel(channel)
    + .set_extension(
    +     REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
remote_query_id is being introduced here as the internal correlation key for remote dynamic filters, but this builder still applies every client-supplied hint after generating it. Because x-greptime-hints accepts arbitrary keys, an external gRPC caller can set remote_query_id=... and force collisions with another query's control-plane traffic. The same overwrite path exists for HTTP via http::hints::extract_hints(). Once follow-up PRs start using this ID to register/update/unregister filters, this becomes a spoofable internal identifier.
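A minimal sketch of one mitigation follows, assuming the extensions behave like a string map. build_extensions is a hypothetical helper, the value of REMOTE_QUERY_ID_EXTENSION_KEY is assumed to be "remote_query_id", and the real QueryContext builder may look quite different. The idea is to drop the reserved key from client hints and write the generated ID last.

```rust
use std::collections::HashMap;

/// Assumed value; the PR refers to this key only by its constant name.
const REMOTE_QUERY_ID_EXTENSION_KEY: &str = "remote_query_id";

/// Copies client hints into the extension map, skipping the reserved key,
/// then sets the server-generated ID so a client can never override it.
pub fn build_extensions(
    client_hints: &[(&str, &str)],
    generated_id: &str,
) -> HashMap<String, String> {
    let mut extensions = HashMap::new();
    for (key, value) in client_hints {
        // Reject the reserved key regardless of ASCII case.
        if key.eq_ignore_ascii_case(REMOTE_QUERY_ID_EXTENSION_KEY) {
            continue;
        }
        extensions.insert(key.to_string(), value.to_string());
    }
    // Written last, so it wins even if the filter above is ever loosened.
    extensions.insert(
        REMOTE_QUERY_ID_EXTENSION_KEY.to_string(),
        generated_id.to_string(),
    );
    extensions
}
```

The same filtering would need to apply to both the gRPC and the HTTP hint paths for the identifier to stay internal.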
Signed-off-by: discord9 <discord9@163.com>
    #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
    #[non_exhaustive]
    #[serde(tag = "kind", content = "payload", rename_all = "snake_case")]
    pub enum DynFilterPayload {
Always document the public structs and functions.
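As an illustration of the kind of documentation being asked for, here is a sketch with doc comments on a simplified enum. DynFilterPayloadSketch, its PhysicalExpr variant, and the kind method are hypothetical; the real variants are not shown in this hunk, and the serde derives are omitted so the snippet compiles with the standard library alone.

```rust
/// Payload carried by a remote dynamic filter update.
///
/// With the serde attributes from the PR (`tag = "kind"`, `content = "payload"`,
/// `rename_all = "snake_case"`), each value would serialize as an adjacently
/// tagged object of the form `{"kind": "<variant>", "payload": <data>}`.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum DynFilterPayloadSketch {
    /// Hypothetical variant: an encoded physical expression as raw bytes.
    PhysicalExpr(Vec<u8>),
}

impl DynFilterPayloadSketch {
    /// Returns the snake_case tag that identifies this variant.
    pub fn kind(&self) -> &'static str {
        match self {
            DynFilterPayloadSketch::PhysicalExpr(_) => "physical_expr",
        }
    }
}
```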
    .as_ref()
    .context(error::MissingRequiredFieldSnafu { name: "action" })?
    {
        api::v1::region::remote_dyn_filter_request::Action::Update(update) => {
Add use api::v1::region::remote_dyn_filter_request::Action; so the match arms can refer to Action::Update directly instead of repeating the full path. That keeps the code clean.
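The suggestion could look roughly like the sketch below. The module here is a local mock of the generated proto module, the String payloads are placeholders, and describe is a hypothetical handler; only the Action enum with Update and Unregister variants mirrors names mentioned in this PR.

```rust
// Local mock standing in for api::v1::region::remote_dyn_filter_request.
mod remote_dyn_filter_request {
    #[derive(Debug)]
    pub enum Action {
        Update(String),
        Unregister(String),
    }
}

// Importing the enum once keeps every match arm short.
use remote_dyn_filter_request::Action;

/// Dispatches on the requested action, failing if the field is absent.
pub fn describe(action: Option<&Action>) -> Result<String, String> {
    match action.ok_or_else(|| "missing required field: action".to_string())? {
        Action::Update(filter_id) => Ok(format!("update {}", filter_id)),
        Action::Unregister(filter_id) => Ok(format!("unregister {}", filter_id)),
    }
}
```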
Signed-off-by: discord9 <discord9@163.com>
I hereby agree to the terms of the GreptimeDB CLA.
Refer to a related PR or issue link (optional)
GreptimeTeam/greptime-proto#314
Summary
- remote_query_id, DynFilterUpdate, and shared payload encode/decode helpers
- update/unregister handling between frontend and datanode
Why
This splits out the protocol and control-plane groundwork into a smaller reviewable PR before the larger end-to-end remote dynamic filter implementation. It gives us a stable ABI and RPC entrypoint first, so later work can build on a reviewed contract instead of mixing transport and runtime changes together.
Included in this PR
Not included