diff --git a/docs/rfcs/2026-04-08-aggregate-stats-physical-pass.md b/docs/rfcs/2026-04-08-aggregate-stats-physical-pass.md new file mode 100644 index 000000000000..15c2c9bd3b60 --- /dev/null +++ b/docs/rfcs/2026-04-08-aggregate-stats-physical-pass.md @@ -0,0 +1,363 @@ +--- +Feature Name: Aggregate Statistics Physical Optimizer Pass +Tracking Issue: TBD +Date: 2026-04-08 +Author: @OpenAgent +--- + +# Summary + +This RFC proposes a new physical-plan optimization pass for aggregate queries in GreptimeDB. +Instead of relying primarily on DataFusion's relation-level `Statistics/ColumnStatistics` to optimize whole-query planning, the new pass opportunistically consumes per-file statistics during physical optimization and execution planning. + +If some files have usable statistics for an aggregate, GreptimeDB will avoid scanning those files and synthesize partial aggregate state from their metadata. Files whose statistics are missing, incompatible, or unsafe to use will still be scanned normally. The two paths are then merged with the existing state/merge aggregate wrapper mechanism so the query remains correct while extracting as much value as possible from available statistics. + +# Motivation + +Today GreptimeDB's aggregate optimization is still mostly constrained by DataFusion's `ColumnStatistics/Statistics` model. +That model is useful for planner-level estimation and some coarse optimizations, but it is not a good fit for GreptimeDB's desired behavior: + +1. We want to optimize at the **physical file level**, not only at the relation level. +2. We want to support **mixed execution**: + - files with usable statistics -> answer from stats + - files without usable statistics -> fall back to scan +3. We want this to work even when only **part of the input** can benefit. +4. We want the optimization to degrade gracefully when file statistics become unavailable or semantically unusable, for example after repartition or other transformations. 
+ +A physical pass is a better fit because it can inspect the concrete scan node, the concrete aggregate node, and the exact file set that would otherwise be scanned. + +# Goals + +1. Add a GreptimeDB physical optimizer rule that rewrites eligible aggregate plans to use file statistics opportunistically. +2. Allow `RegionScan` / `ScanInput` to skip files already covered by statistics-derived aggregate state. +3. Reuse the existing step-aggregate state/merge mechanism instead of inventing a separate aggregation framework. +4. Preserve correctness by falling back to normal scan whenever statistics are unavailable or unsafe. +5. Make mixed execution explicit and observable in `EXPLAIN` and metrics. + +# Non-Goals + +1. This RFC does **not** attempt to replace all DataFusion statistics usage. +2. This RFC does **not** try to optimize arbitrary expressions above aggregates. +3. This RFC does **not** guarantee support for every aggregate function in v1. +4. This RFC does **not** require exact group-by-from-statistics optimization in v1. +5. This RFC does **not** require statistics reconstruction for files whose metadata has already lost the needed semantics. +6. This RFC does **not** require support for non-append-only tables in v1. + +# Proposal + +## 1. Add a physical optimizer pass + +Add a new `PhysicalOptimizerRule` in GreptimeDB's query engine, alongside existing rules such as `ParallelizeScan` and `PassDistribution`. + +Conceptually, the rule targets plans of the form: + +```text +AggregateExec + RegionScanExec +``` + +or other small variants where the aggregate is still directly attributable to a single `RegionScanExec` input. + +The rule will: + +1. inspect the aggregate expressions; +2. inspect the underlying region scan and candidate files; +3. classify each file as either: + - **stats-eligible**, or + - **scan-required**; +4. 
rewrite the physical plan into a mixed plan that merges: + - partial aggregate state synthesized from file statistics, and + - partial aggregate state produced by scanning only the remaining files. + +This is a GreptimeDB-specific optimization and should live in GreptimeDB's physical optimizer pipeline, not in generic DataFusion statistics estimation. + +## 2. Optimization shape + +The core idea is to convert one aggregate over raw rows into one merge aggregate over two partial-state sources. + +For an original query like: + +```sql +select max(v) from t; +``` + +the optimized execution is conceptually equivalent to: + +1. compute state from scanned files: `__max_state(v)` +2. compute state from stats-only files: `__max_state(file_max(v))` +3. merge them with `__max_merge(...)` + +The exact expression syntax above is illustrative only. The physical rewrite should use the existing state/merge wrapper machinery rather than depend on SQL syntax. + +A conceptual physical shape is: + +```text +AggregateExec(mode=Final, aggr=[__max_merge(state_col)]) + UnionExec / MixedPartialSourceExec + AggregateExec(mode=Partial, aggr=[__max_state(v)]) + RegionScanExec(files = scan-required) + Literal partial-state input / values=[__max_state(file_max(v)), ...] +``` + +The same pattern extends to other supported aggregates. + +For v1, this RFC prefers **literal partial-state input** over a dedicated `StatisticsStateExec`. +The main reason is that the needed statistics are easiest to collect during optimization / rewrite, when the optimizer still has direct access to the concrete aggregate shape and the concrete file set under `RegionScanExec`. +At that point the optimizer can classify files, compute stats-derived partial states, and bake those states directly into the rewritten plan as constants or precomputed batches. 
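The three conceptual steps above (state from scan, state from stats, merge) can be sketched as a self-contained toy in plain Rust. None of these names are the actual GreptimeDB or DataFusion APIs; partial `MAX` state is modeled as a bare `Option<i64>` purely for illustration:

```rust
// Toy model of the mixed plan for `select max(v) from t`.
// Real partial state lives behind the state/merge wrapper machinery;
// here it is just Option<i64>.

/// Partial `MAX` state synthesized from a stats-eligible file's max statistic.
fn max_state_from_stats(file_max: i64) -> Option<i64> {
    Some(file_max)
}

/// Partial `MAX` state produced by actually scanning a scan-required file.
fn max_state_from_scan(rows: &[i64]) -> Option<i64> {
    rows.iter().copied().max()
}

/// The final merge aggregate: an order-insensitive merge of partial states.
fn max_merge(states: impl IntoIterator<Item = Option<i64>>) -> Option<i64> {
    states.into_iter().flatten().max()
}

fn main() {
    // Two stats-eligible files contribute literal partial state...
    let stats_side = vec![max_state_from_stats(40), max_state_from_stats(17)];
    // ...while one scan-required file is still scanned normally.
    let scan_side = max_state_from_scan(&[3, 42, 9]);

    // The merge side cannot tell the two sources apart: both are partial state.
    let answer = max_merge(stats_side.into_iter().chain([scan_side]));
    assert_eq!(answer, Some(42));
}
```

Because both sources are normalized into the same partial-state shape before the merge, correctness does not depend on how many files took the stats path versus the scan path.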
+ +By contrast, a dedicated `StatisticsStateExec` would need its own execution-time path to rediscover or reload the same file statistics, which adds another metadata access boundary and makes the feature more complex without changing the core semantics. + +## 3. File classification + +For each candidate file in the underlying scan, the optimizer classifies whether it can contribute via statistics. + +A file is **stats-eligible** for a given aggregate only if all required conditions hold. +Typical examples: + +- `MAX(col)`: file has usable max statistics for `col` +- `MIN(col)`: file has usable min statistics for `col` +- `COUNT(*)`: file has exact row count +- `COUNT(col)`: file has exact row count and null count for `col` + +A file is **scan-required** if any of the following apply: + +- required statistic is missing; +- required statistic is known to be inexact or semantically unsafe; +- the file contains semantics not captured by the statistic needed by this aggregate; +- the file has gone through transformations where the available metadata can no longer safely answer the aggregate (for example, after repartition and before compaction); +- the query shape prevents file-level attribution. + +This classification is per file, not all-or-nothing for the whole query. + +## 4. Why physical pass instead of `Statistics/ColumnStatistics` + +The current `RegionScanExec::partition_statistics()` integration is relation-oriented and coarse. +It is useful for estimation and some generic optimizer decisions, but it cannot naturally express: + +- a query answered by **some files from stats and some files from scan**; +- skipping specific files in `ScanInput.files` while still scanning others; +- building synthetic partial aggregate input from per-file metadata; +- graceful fallback when a subset of files lose usable statistics after repartition or similar operations. 
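The per-file classification from section 3 can be sketched as a pure function over file metadata. The `FileStats` shape and field names below are illustrative assumptions for this sketch, not the real `FileHandle` metadata:

```rust
/// Illustrative per-file statistics; in GreptimeDB the real metadata
/// lives on the SST file handle, not in a struct like this.
#[derive(Default)]
struct FileStats {
    min: Option<i64>,
    max: Option<i64>,
    exact_num_rows: Option<u64>,
    exact_null_count: Option<u64>,
    /// Set when metadata can no longer safely answer aggregates,
    /// e.g. after repartition and before compaction.
    metadata_degraded: bool,
}

enum Aggregate {
    Min,
    Max,
    CountStar,
    CountCol,
}

/// Per-file, per-aggregate decision: stats-eligible (true) or scan-required (false).
fn stats_eligible(stats: &FileStats, aggr: &Aggregate) -> bool {
    if stats.metadata_degraded {
        return false; // degraded metadata always falls back to scan
    }
    match aggr {
        Aggregate::Min => stats.min.is_some(),
        Aggregate::Max => stats.max.is_some(),
        Aggregate::CountStar => stats.exact_num_rows.is_some(),
        Aggregate::CountCol => {
            stats.exact_num_rows.is_some() && stats.exact_null_count.is_some()
        }
    }
}

fn main() {
    let good = FileStats {
        max: Some(10),
        exact_num_rows: Some(100),
        ..Default::default()
    };
    let degraded = FileStats {
        max: Some(10),
        metadata_degraded: true,
        ..Default::default()
    };

    assert!(stats_eligible(&good, &Aggregate::Max));
    assert!(stats_eligible(&good, &Aggregate::CountStar));
    assert!(!stats_eligible(&good, &Aggregate::Min)); // missing min stat
    assert!(!stats_eligible(&good, &Aggregate::CountCol)); // missing null count
    assert!(!stats_eligible(&degraded, &Aggregate::Max)); // scan-required
}
```

The important property is that the decision is per file and per aggregate: the same file can be stats-eligible for `COUNT(*)` yet scan-required for `COUNT(col)`.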
+ +In other words, `Statistics/ColumnStatistics` is still useful metadata, but it is no longer the right execution boundary for this feature. + +## 5. Supported aggregates in v1 + +The recommended v1 scope is deliberately narrow: + +1. `MAX(col)` +2. `MIN(col)` +3. `COUNT(*)` +4. `COUNT(col)` + +These functions map naturally to common file statistics. + +### Deferred from v1 + +- `SUM(col)` +- `AVG(col)` +- `FIRST_VALUE` / `LAST_VALUE` +- `DISTINCT` aggregates +- grouped aggregation from file statistics + +`SUM/AVG` are intentionally deferred unless GreptimeDB has exact, semantics-preserving file-level summaries for them. Reusing the step-aggregate framework does not by itself make them safe; the underlying statistics must still be correct and complete enough. + +# Detailed Design + +## 1. Eligibility rules + +The physical rule should only fire when all of the following hold: + +1. The aggregate node is recognized and all aggregate expressions are in the supported set. +2. The aggregate input can still be traced to a concrete `RegionScanExec` file set. +3. The query shape is single-stage or can be safely rewritten into partial/final form. +4. There is no intermediate operator that destroys file-level attribution needed by this optimization. +5. At least one file is stats-eligible. +6. The underlying table is append-only in v1. + +The optimizer should bail out if the input has already crossed a boundary where "which file contributes which rows" is no longer meaningful for this optimization, for example after repartition or exchange that hides the original file set. + +## 2. `RegionScan` and `ScanInput` changes + +This RFC proposes that the physical rewrite eventually drives scan execution by excluding files already covered by statistics-derived partial state. + +At a high level, the scan path needs one of these equivalent capabilities: + +1. construct a new `RegionScanExec` whose scanner produces a `ScanInput` containing only `scan-required` files; or +2. 
pass an explicit `excluded_file_ids` / `stats_covered_files` hint into the scanner so `ScanInput.files` omits those files. + +The key requirement is: + +- **stats-eligible files must not be scanned again**. + +The memtable path remains unchanged in v1 and is always scanned normally. + +## 3. Materializing statistics-derived partial state + +For v1, the recommended design is to materialize stats-derived partial state during optimization and embed it into the rewritten plan as literal values or precomputed batches. + +Responsibilities of this materialization step: + +1. compute stats-derived partial aggregate states during rewrite; +2. expose a schema compatible with the upper merge aggregate; +3. feed one or more state rows into the merge side of the aggregate; +4. preserve enough explainability to show how many files were answered from statistics. + +This keeps the feature optimizer-driven: the same rewrite pass that classifies files also decides which files are skipped and what partial state replaces them. + +If a future version needs lazy metadata access or reusable stats computation, GreptimeDB can still introduce a dedicated `StatisticsStateExec` later. + +## 4. Reusing state/merge wrappers + +GreptimeDB already has step aggregate infrastructure in `aggr_wrapper` and distributed planning. +This RFC proposes reusing that model directly. + +Instead of introducing a separate "stats aggregate result" merge path, the optimizer should normalize both sources into the same intermediate representation: + +- scan path -> ordinary partial aggregate state +- stats path -> synthetic partial aggregate state + +Then the upper merge aggregate remains the single correctness boundary. + +This has two advantages: + +1. mixed execution becomes structurally uniform; +2. future aggregate extensions can piggyback on the same state/merge contract. + +## 5. Correctness rules + +Correctness is more important than hit rate. 
+The optimizer must fall back to scan whenever correctness cannot be proven. + +### 5.1 Null semantics + +Statistics-based answers must preserve SQL null semantics. +For example: + +- `COUNT(*)` uses exact row count +- `COUNT(col)` requires exact null count semantics +- `MIN/MAX(col)` must not treat missing stats as real values + +### 5.2 Delete / merge semantics + +If a file's visible query result can differ from simple file statistics because of deletion markers, merge semantics, or other storage-level visibility rules not reflected in the statistic, that file is scan-required. + +For that reason, a conservative v1 can explicitly restrict the optimization to **append-only** tables. +In append-only mode, the correctness surface is much smaller because files do not need stats-based reasoning across delete markers or row replacement semantics. +This significantly reduces the chance of classifying an unsafe file as stats-eligible. + +### 5.3 Mixed correctness + +The final answer must be the same as scanning all files. +The mixed plan is valid because it partitions the input file set into disjoint subsets: + +- subset A -> answered by stats-derived state +- subset B -> answered by scan-derived state + +and merges both through the same aggregate state contract. + +### 5.4 Repartition and degraded metadata + +After repartition or similar operations, some files may no longer have usable metadata for this optimization. +That is an expected case, not an error. +The rule should simply classify those files as scan-required and proceed with a mixed or pure-scan plan. + +One concrete example is the period **after repartition and before compaction**. +In that window, files may still physically originate from older partitioning/layout assumptions, while the current read path is already interpreting them under the new region layout. +Even if some original file statistics still exist, they may no longer be a safe answer source for the current aggregate optimization contract. 
+So those files should be treated as scan-required until a later compaction regenerates files and metadata under the new layout. + +## 6. Explain and observability + +The optimized plan should be visible in `EXPLAIN`. +At minimum we should be able to tell: + +- the aggregate was rewritten by the stats physical pass; +- how many files are answered from statistics; +- how many files remain in scan; +- whether the stats side is literal/precomputed input; +- which aggregate functions are optimized. + +Recommended metrics: + +- aggregate-stats eligible files +- aggregate-stats skipped files +- aggregate-stats fallback files +- aggregate-stats queries hit/miss + +# Rollout Plan + +## Phase 1: MVP + +1. Add the physical optimizer rule. +2. Restrict the optimization to append-only tables. +3. Support `MIN/MAX/COUNT(*)/COUNT(col)`. +4. Materialize stats-derived partial state as literal/precomputed input during rewrite. +5. Add the ability for scan planning to skip stats-covered files. +6. Add `EXPLAIN` output and metrics. + +## Phase 2: Scope expansion + +1. Revisit support for `SUM/AVG` only if exact semantics are available. +2. Revisit non-append-only tables once delete / merge visibility semantics are modeled safely. +3. Consider grouped aggregation if file-level summaries can safely support it. +4. Explore better costing / heuristics when using statistics is possible but not necessarily profitable. + +# Testing Plan + +1. Unit tests for file classification by aggregate type. +2. Unit tests for stats-state materialization. +3. Plan rewrite tests for: + - pure stats + - mixed stats + scan + - pure fallback scan +4. Correctness tests comparing optimized vs non-optimized answers. +5. Edge-case tests for: + - null-heavy columns + - missing statistics + - memtable + SST mixed inputs + - repartitioned / degraded-stat files + - append-only gating +6. `EXPLAIN` tests to verify plan visibility. + +# Risks + +1. 
Incorrectly classifying a file as stats-eligible would produce wrong answers. +2. Forcing this optimization too broadly may complicate aggregate planning and debugging. +3. The physical rewrite may become awkward if state/merge wrappers remain only partially exposed at the physical layer. +4. If scan skipping is not wired cleanly into `RegionScan` / `ScanInput`, the implementation may accidentally double count files. +5. Embedding too much precomputed state directly in the plan may become awkward if future workloads rely on much larger stats-derived inputs. + +# Alternatives + +## 1. Continue to rely on `Statistics/ColumnStatistics` + +Rejected for this feature because it cannot naturally express file-level mixed execution with scan skipping. + +## 2. Add a storage-side aggregate API only + +This would hide some complexity in storage, but it makes the optimization less transparent at the query layer and harder to compose with existing state/merge aggregate infrastructure. + +## 3. Require all files to have usable statistics before optimizing + +Rejected because it gives up the main benefit of this design: partial wins are still wins. + +## 4. Introduce `StatisticsStateExec` in v1 + +Deferred. +It may become useful later, but for the current scope the optimizer already has the most convenient place to read and classify the relevant file statistics. + +# Open Questions + +1. Is `UnionExec` over scan-state and literal/precomputed stats-state sufficient, or do we still want a dedicated mixed-source helper node? +2. Where is the cleanest API boundary for excluding stats-covered files from `RegionScanExec`? +3. Should v1 support only aggregates without `GROUP BY`, or should we allow a narrow grouped case when grouping columns align with file partition metadata? +4. Do we want a session option to disable this pass for debugging and staged rollout? 
+
+# Conclusion
+
+The desired behavior is not "use statistics instead of scan when relation-level statistics happen to be complete".
+The desired behavior is "at physical planning time, exploit file statistics wherever they are correct and fall back to scan for the rest".
+
+A dedicated GreptimeDB physical optimizer pass is the right abstraction boundary for that behavior.
+It matches the concrete file-level execution model, composes naturally with `RegionScan` / `ScanInput`, and can reuse the existing step aggregate state/merge design to keep mixed execution both efficient and correct.
diff --git a/src/common/function/src/aggrs/aggr_wrapper.rs b/src/common/function/src/aggrs/aggr_wrapper.rs
index 6242ab9454f0..7b67e11fbab3 100644
--- a/src/common/function/src/aggrs/aggr_wrapper.rs
+++ b/src/common/function/src/aggrs/aggr_wrapper.rs
@@ -330,6 +330,39 @@ impl StateWrapper {
         acc_args.return_field = self.deduce_aggr_return_type(&acc_args)?;
         Ok(acc_args)
     }
+
+    /// Builds a state scalar from explicit state-field values.
+    ///
+    /// The caller must provide one scalar per state field in the wrapper's state layout.
+    /// This method is responsible only for validating the current wrapper state type and
+    /// assembling the final struct scalar from those explicit field values.
+    pub fn value_from_custom_state_fields(
+        &self,
+        arg_types: &[DataType],
+        state_values: Vec<ScalarValue>,
+    ) -> datafusion_common::Result<ScalarValue> {
+        let DataType::Struct(fields) = self.return_type(arg_types)?
else {
+            return Err(datafusion_common::DataFusionError::Internal(format!(
+                "Expected struct state type for {}, got non-struct return type",
+                self.name()
+            )));
+        };
+        if fields.len() != state_values.len() {
+            return Err(datafusion_common::DataFusionError::Internal(format!(
+                "Expected {} state fields for {}, got {}",
+                fields.len(),
+                self.name(),
+                state_values.len()
+            )));
+        }
+
+        let arrays = state_values
+            .into_iter()
+            .map(|value| value.to_array())
+            .collect::<datafusion_common::Result<Vec<_>>>()?;
+        let struct_array = build_state_struct_array(&fields, arrays)?;
+        Ok(ScalarValue::Struct(Arc::new(struct_array)))
+    }
 }
 
 impl AggregateUDFImpl for StateWrapper {
@@ -472,13 +505,59 @@
         };
         let array = ret.to_array().ok()?;
-        let struct_array = StructArray::new(fields.clone(), vec![array], None);
         let ret = ScalarValue::Struct(Arc::new(struct_array));
         Some(ret)
     }
 }
 
+fn build_state_struct_array(
+    fields: &Fields,
+    arrays: Vec<ArrayRef>,
+) -> datafusion_common::Result<StructArray> {
+    let array_type = arrays
+        .iter()
+        .map(|array| array.data_type().clone())
+        .collect::<Vec<_>>();
+    let expected_type = fields
+        .iter()
+        .map(|field| field.data_type().clone())
+        .collect::<Vec<_>>();
+    if array_type != expected_type {
+        // Keep this fallback intentionally lenient.
+        //
+        // Historically the wrapper path has tolerated state-schema drift as long as the
+        // physical state columns remain positionally compatible. This shows up most clearly
+        // in order-sensitive aggregates such as first_value/last_value, where DataFusion-side
+        // state metadata and the arrays we need to wrap may not line up exactly. The merge
+        // path consumes state columns by position, not by field metadata, so preserving a
+        // struct wrapper here is more compatible than failing eagerly on field/type mismatch.
debug!(
+            "State mismatch, expected: {}, got: {} for expected fields: {:?} and given array types: {:?}",
+            fields.len(),
+            arrays.len(),
+            fields,
+            array_type,
+        );
+        let guess_schema = arrays
+            .iter()
+            .enumerate()
+            .map(|(index, array)| {
+                Field::new(
+                    format!("col_{index}[mismatch_state]").as_str(),
+                    array.data_type().clone(),
+                    true,
+                )
+            })
+            .collect::<Fields>();
+        return StructArray::try_new(guess_schema, arrays, None)
+            .map_err(|err| datafusion_common::DataFusionError::ArrowError(Box::new(err), None));
+    }
+
+    StructArray::try_new(fields.clone(), arrays, None)
+        .map_err(|err| datafusion_common::DataFusionError::ArrowError(Box::new(err), None))
+}
+
 /// The wrapper's input is the same as the original aggregate function's input,
 /// and the output is the state function's output.
 #[derive(Debug)]
@@ -510,42 +589,9 @@ impl StateGroupsAccum {
     }
 
     fn wrap_state_arrays(&self, arrays: Vec<ArrayRef>) -> datafusion_common::Result<ArrayRef> {
-        let array_type = arrays
-            .iter()
-            .map(|array| array.data_type().clone())
-            .collect::<Vec<_>>();
-        let expected_type = self
-            .state_fields
-            .iter()
-            .map(|field| field.data_type().clone())
-            .collect::<Vec<_>>();
-        if array_type != expected_type {
-            debug!(
-                "State mismatch, expected: {}, got: {} for expected fields: {:?} and given array types: {:?}",
-                self.state_fields.len(),
-                arrays.len(),
-                self.state_fields,
-                array_type,
-            );
-            let guess_schema = arrays
-                .iter()
-                .enumerate()
-                .map(|(index, array)| {
-                    Field::new(
-                        format!("col_{index}[mismatch_state]").as_str(),
-                        array.data_type().clone(),
-                        true,
-                    )
-                })
-                .collect::<Fields>();
-            let array = StructArray::try_new(guess_schema, arrays, None)?;
-            return Ok(Arc::new(array));
-        }
-
-        Ok(Arc::new(StructArray::try_new(
-            self.state_fields.clone(),
+        Ok(Arc::new(build_state_struct_array(
+            &self.state_fields,
             arrays,
-            None,
         )?))
     }
 }
@@ -621,44 +667,11 @@ impl Accumulator for StateAccum {
     fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
         let state = self.inner.state()?;
-        let
arrays = state
             .iter()
             .map(|s| s.to_array())
             .collect::<Result<Vec<_>, _>>()?;
-        let array_type = array
-            .iter()
-            .map(|a| a.data_type().clone())
-            .collect::<Vec<_>>();
-        let expected_type: Vec<_> = self
-            .state_fields
-            .iter()
-            .map(|f| f.data_type().clone())
-            .collect();
-        if array_type != expected_type {
-            debug!(
-                "State mismatch, expected: {}, got: {} for expected fields: {:?} and given array types: {:?}",
-                self.state_fields.len(),
-                array.len(),
-                self.state_fields,
-                array_type,
-            );
-            let guess_schema = array
-                .iter()
-                .enumerate()
-                .map(|(index, array)| {
-                    Field::new(
-                        format!("col_{index}[mismatch_state]").as_str(),
-                        array.data_type().clone(),
-                        true,
-                    )
-                })
-                .collect::<Fields>();
-            let arr = StructArray::try_new(guess_schema, array, None)?;
-
-            return Ok(ScalarValue::Struct(Arc::new(arr)));
-        }
-
-        let struct_array = StructArray::try_new(self.state_fields.clone(), array, None)?;
+        let struct_array = build_state_struct_array(&self.state_fields, arrays)?;
 
         Ok(ScalarValue::Struct(Arc::new(struct_array)))
     }
@@ -860,7 +873,10 @@ impl Accumulator for MergeAccum {
             "State fields mismatch, expected: {:?}, got: {:?}",
             self.state_fields, fields
         );
-        // state fields mismatch might be acceptable by datafusion, continue
+        // Intentionally continue here for compatibility with the wrapper's historical
+        // behavior: downstream merge logic uses the struct columns positionally, and some
+        // DataFusion/order-sensitive aggregate paths can produce equivalent state payloads
+        // whose field metadata does not exactly match our locally expected schema.
} // now fields should be the same, so we can merge the batch diff --git a/src/common/function/src/aggrs/aggr_wrapper/tests.rs b/src/common/function/src/aggrs/aggr_wrapper/tests.rs index de3a77df6b9d..516babb114cf 100644 --- a/src/common/function/src/aggrs/aggr_wrapper/tests.rs +++ b/src/common/function/src/aggrs/aggr_wrapper/tests.rs @@ -28,6 +28,7 @@ use datafusion::datasource::DefaultTableSource; use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext}; use datafusion::functions_aggregate::average::avg_udaf; use datafusion::functions_aggregate::count::count_udaf; +use datafusion::functions_aggregate::min_max::max_udaf; use datafusion::functions_aggregate::sum::sum_udaf; use datafusion::optimizer::AnalyzerRule; use datafusion::optimizer::analyzer::type_coercion::TypeCoercion; @@ -291,6 +292,50 @@ fn create_avg_state_groups_accumulator() -> Box { state_wrapper.create_groups_accumulator(acc_args).unwrap() } +fn test_state_scalar_for_type(data_type: &DataType) -> ScalarValue { + match data_type { + DataType::Float64 => ScalarValue::Float64(Some(1.5)), + DataType::UInt64 => ScalarValue::UInt64(Some(2)), + DataType::Int64 => ScalarValue::Int64(Some(3)), + _ => panic!("unsupported test data type: {data_type:?}"), + } +} + +#[test] +fn test_value_from_custom_state_fields_single_field() { + let wrapper = StateWrapper::new((*max_udaf()).clone()).unwrap(); + let value = wrapper + .value_from_custom_state_fields(&[DataType::Int64], vec![ScalarValue::Int64(Some(7))]) + .unwrap(); + + let ScalarValue::Struct(array) = value else { + panic!("expected struct state") + }; + assert_eq!(1, array.columns().len()); + assert_eq!(DataType::Int64, array.column(0).data_type().clone()); +} + +#[test] +fn test_value_from_custom_state_fields_multi_field() { + let wrapper = StateWrapper::new((*avg_udaf()).clone()).unwrap(); + let DataType::Struct(fields) = wrapper.return_type(&[DataType::Float64]).unwrap() else { + panic!("expected struct state type") + }; + + 
let values = fields
+        .iter()
+        .map(|field| test_state_scalar_for_type(field.data_type()))
+        .collect::<Vec<_>>();
+    let value = wrapper
+        .value_from_custom_state_fields(&[DataType::Float64], values)
+        .unwrap();
+
+    let ScalarValue::Struct(array) = value else {
+        panic!("expected struct state")
+    };
+    assert_eq!(fields.len(), array.columns().len());
+}
+
 #[tokio::test]
 async fn test_sum_udaf() {
     let ctx = SessionContext::new();
diff --git a/src/mito2/src/read.rs b/src/mito2/src/read.rs
index 240a99c247db..57507c86f49a 100644
--- a/src/mito2/src/read.rs
+++ b/src/mito2/src/read.rs
@@ -28,6 +28,7 @@ pub(crate) mod prune;
 pub(crate) mod pruner;
 pub mod range;
 pub(crate) mod range_cache;
+pub(crate) mod scan_input_stats;
 pub mod scan_region;
 pub mod scan_util;
 pub(crate) mod seq_scan;
diff --git a/src/mito2/src/read/scan_input_stats.rs b/src/mito2/src/read/scan_input_stats.rs
new file mode 100644
index 000000000000..0e83296782f2
--- /dev/null
+++ b/src/mito2/src/read/scan_input_stats.rs
@@ -0,0 +1,434 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashMap;
+use std::sync::Arc;
+
+use api::v1::SemanticType;
+use common_error::ext::{BoxedError, PlainError};
+use common_error::status_code::StatusCode;
+use datafusion_common::pruning::PruningStatistics;
+use datafusion_common::{Column, ScalarValue};
+use datatypes::arrow::array::{Array, AsArray, UInt64Array};
+use datatypes::arrow::compute::{cast, max, max_boolean, max_string, min, min_boolean, min_string};
+use datatypes::arrow::datatypes::{
+    DataType as ArrowDataType, Float32Type, Float64Type, Int32Type, Int64Type,
+    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
+    TimestampSecondType, UInt32Type, UInt64Type,
+};
+use datatypes::value::Value;
+use store_api::metadata::RegionMetadata;
+use store_api::scan_stats::{
+    RegionScanColumnStats as RegionScanColumnInputStats,
+    RegionScanFileStats as RegionScanFileInputStats, RegionScanStats as RegionScanInputStats,
+};
+
+use crate::read::scan_region::ScanInput;
+use crate::sst::file::FileHandle;
+use crate::sst::parquet::format::ReadFormat;
+use crate::sst::parquet::stats::RowGroupPruningStats;
+
+pub fn build_scan_input_stats(
+    input: &ScanInput,
+    metadata: &RegionMetadata,
+) -> std::result::Result<RegionScanInputStats, BoxedError> {
+    let files = input
+        .files
+        .iter()
+        .enumerate()
+        .map(|(index, file)| {
+            let partition_expr_matches_region = file_partition_expr_matches_region(metadata, file)?;
+            Ok(RegionScanFileInputStats {
+                file_id: file.file_id().file_id(),
+                file_ordinal: index,
+                exact_num_rows: exact_file_num_rows(file),
+                time_range: exact_file_time_range(file),
+                field_stats: build_file_field_stats(input, metadata, file)?,
+                partition_expr_matches_region,
+            })
+        })
+        .collect::<Result<Vec<_>, BoxedError>>()?;
+
+    Ok(RegionScanInputStats { files })
+}
+
+fn exact_file_num_rows(file: &FileHandle) -> Option<u64> {
+    Some(file.num_rows())
+}
+
+fn exact_file_time_range(
+    file: &FileHandle,
+) -> Option<(common_time::Timestamp, common_time::Timestamp)> {
+    (file.meta_ref().num_row_groups !=
0).then_some(file.time_range())
+}
+
+fn build_file_field_stats(
+    input: &ScanInput,
+    metadata: &RegionMetadata,
+    file: &FileHandle,
+) -> std::result::Result<HashMap<String, RegionScanColumnInputStats>, BoxedError> {
+    // TODO(ruihang): extract stats only for columns referenced by the supported aggregates
+    // instead of eagerly materializing every field column for every file.
+    let Some(parquet_meta) = input
+        .cache_strategy
+        .get_parquet_meta_data_from_mem_cache(file.file_id())
+    else {
+        return Ok(HashMap::new());
+    };
+
+    let region_metadata = Arc::new(metadata.clone());
+    let file_path = format!("{:?}", file.file_id());
+    let read_format = ReadFormat::new(
+        region_metadata.clone(),
+        None,
+        input.flat_format,
+        Some(parquet_meta.file_metadata().schema_descr().num_columns()),
+        &file_path,
+        false,
+    )
+    .map_err(BoxedError::new)?;
+    let row_groups = parquet_meta.row_groups();
+    let pruning_stats =
+        RowGroupPruningStats::new(row_groups, &read_format, Some(region_metadata), false);
+
+    metadata
+        .column_metadatas
+        .iter()
+        .filter(|column| column.semantic_type == SemanticType::Field)
+        .filter_map(|column| {
+            let stats = build_field_column_stats(
+                column.column_schema.name.as_str(),
+                row_groups,
+                &pruning_stats,
+            )
+            .transpose();
+            match stats {
+                Some(Ok(stats)) => Some(Ok((column.column_schema.name.to_string(), stats))),
+                Some(Err(err)) => Some(Err(err)),
+                None => None,
+            }
+        })
+        .collect()
+}
+
+fn build_field_column_stats(
+    column_name: &str,
+    row_groups: &[parquet::file::metadata::RowGroupMetaData],
+    pruning_stats: &impl PruningStatistics,
+) -> std::result::Result<Option<RegionScanColumnInputStats>, BoxedError> {
+    let column = Column::from_name(column_name);
+    let min_value = aggregate_column_min_value(pruning_stats.min_values(&column).as_deref())?;
+    let max_value = aggregate_column_max_value(pruning_stats.max_values(&column).as_deref())?;
+    let exact_non_null_rows =
+        aggregate_exact_non_null_rows(pruning_stats.null_counts(&column).as_deref(), row_groups)?;
+
+    if min_value.is_none() && max_value.is_none()
&& exact_non_null_rows.is_none() {
+        return Ok(None);
+    }
+
+    Ok(Some(RegionScanColumnInputStats {
+        min_value,
+        max_value,
+        exact_non_null_rows,
+    }))
+}
+
+fn aggregate_column_min_value(
+    values: Option<&dyn Array>,
+) -> std::result::Result<Option<Value>, BoxedError> {
+    aggregate_column_extreme_value(values, true)
+}
+
+fn aggregate_column_max_value(
+    values: Option<&dyn Array>,
+) -> std::result::Result<Option<Value>, BoxedError> {
+    aggregate_column_extreme_value(values, false)
+}
+
+fn aggregate_column_extreme_value(
+    values: Option<&dyn Array>,
+    is_min: bool,
+) -> std::result::Result<Option<Value>, BoxedError> {
+    let Some(values) = values else {
+        return Ok(None);
+    };
+    if values.is_empty() || values.null_count() > 0 {
+        return Ok(None);
+    }
+
+    if let Some(value) = aggregate_column_extreme_value_with_compute(values, is_min)? {
+        return Ok(Some(value));
+    }
+
+    aggregate_column_extreme_value_fallback(values, is_min)
+}
+
+fn aggregate_column_extreme_value_with_compute(
+    values: &dyn Array,
+    is_min: bool,
+) -> std::result::Result<Option<Value>, BoxedError> {
+    if let ArrowDataType::Dictionary(_, value_type) = values.data_type() {
+        let casted = cast(values, value_type.as_ref()).map_err(|err| {
+            BoxedError::new(PlainError::new(
+                format!("failed to cast dictionary stats array to value type: {err}"),
+                StatusCode::Unexpected,
+            ))
+        })?;
+        return aggregate_column_extreme_value_with_compute(casted.as_ref(), is_min);
+    }
+
+    macro_rules! compute_primitive_extreme {
+        ($array_ty:ty, $variant:ident) => {{
+            let array = values.as_primitive::<$array_ty>();
+            let scalar = if is_min {
+                min(array).map(|value| ScalarValue::$variant(Some(value)))
+            } else {
+                max(array).map(|value| ScalarValue::$variant(Some(value)))
+            };
+            scalar
+                .map(|value| Value::try_from(value).map_err(BoxedError::new))
+                .transpose()
+        }};
+    }
+
+    macro_rules!
compute_timestamp_extreme { + ($array_ty:ty, $variant:ident, $tz:expr) => {{ + let array = values.as_primitive::<$array_ty>(); + let scalar = if is_min { + min(array).map(|value| ScalarValue::$variant(Some(value), $tz.clone())) + } else { + max(array).map(|value| ScalarValue::$variant(Some(value), $tz.clone())) + }; + scalar + .map(|value| Value::try_from(value).map_err(BoxedError::new)) + .transpose() + }}; + } + + match values.data_type() { + ArrowDataType::Boolean => { + let array = values.as_boolean(); + let scalar = if is_min { + min_boolean(array).map(|value| ScalarValue::Boolean(Some(value))) + } else { + max_boolean(array).map(|value| ScalarValue::Boolean(Some(value))) + }; + scalar + .map(|value| Value::try_from(value).map_err(BoxedError::new)) + .transpose() + } + ArrowDataType::Utf8 => { + let array = values.as_string::(); + let scalar = if is_min { + min_string(array) + } else { + max_string(array) + } + .map(|value| ScalarValue::Utf8(Some(value.to_string()))); + scalar + .map(|value| Value::try_from(value).map_err(BoxedError::new)) + .transpose() + } + ArrowDataType::LargeUtf8 => { + let array = values.as_string::(); + let scalar = if is_min { + min_string(array) + } else { + max_string(array) + } + .map(|value| ScalarValue::LargeUtf8(Some(value.to_string()))); + scalar + .map(|value| Value::try_from(value).map_err(BoxedError::new)) + .transpose() + } + ArrowDataType::UInt32 => compute_primitive_extreme!(UInt32Type, UInt32), + ArrowDataType::UInt64 => compute_primitive_extreme!(UInt64Type, UInt64), + ArrowDataType::Int32 => compute_primitive_extreme!(Int32Type, Int32), + ArrowDataType::Int64 => compute_primitive_extreme!(Int64Type, Int64), + ArrowDataType::Float32 => compute_primitive_extreme!(Float32Type, Float32), + ArrowDataType::Float64 => compute_primitive_extreme!(Float64Type, Float64), + ArrowDataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Second, tz) => { + compute_timestamp_extreme!(TimestampSecondType, TimestampSecond, tz) + } + 
ArrowDataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, tz) => { + compute_timestamp_extreme!(TimestampMillisecondType, TimestampMillisecond, tz) + } + ArrowDataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Microsecond, tz) => { + compute_timestamp_extreme!(TimestampMicrosecondType, TimestampMicrosecond, tz) + } + ArrowDataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Nanosecond, tz) => { + compute_timestamp_extreme!(TimestampNanosecondType, TimestampNanosecond, tz) + } + _ => Ok(None), + } +} + +fn aggregate_column_extreme_value_fallback( + values: &dyn Array, + is_min: bool, +) -> std::result::Result, BoxedError> { + let scalars = (0..values.len()) + .map(|index| { + ScalarValue::try_from_array(values, index).map_err(|err| { + BoxedError::new(PlainError::new( + format!("failed to extract scalar value from stats array: {err}"), + StatusCode::Unexpected, + )) + }) + }) + .collect::, _>>()?; + let mut iter = scalars + .into_iter() + .map(|value| Value::try_from(value).map_err(BoxedError::new)); + let Some(first) = iter.next() else { + return Ok(None); + }; + let first = first?; + + iter.try_fold(first, |current, value| { + let value = value?; + let next = if is_min { + Value::min(current, value) + } else { + Value::max(current, value) + }; + Ok::<_, BoxedError>(next) + }) + .map(Some) +} + +fn aggregate_exact_non_null_rows( + null_counts: Option<&dyn Array>, + row_groups: &[parquet::file::metadata::RowGroupMetaData], +) -> std::result::Result, BoxedError> { + let Some(null_counts) = null_counts else { + return Ok(None); + }; + if null_counts.null_count() > 0 { + return Ok(None); + } + let Some(null_counts) = null_counts.as_any().downcast_ref::() else { + return Ok(None); + }; + + row_groups + .iter() + .zip(null_counts.iter()) + .try_fold(0usize, |acc, (row_group, null_count)| { + let row_count = usize::try_from(row_group.num_rows()).map_err(|err| { + BoxedError::new(PlainError::new( + format!("failed to convert row group row 
count to usize: {err}"), + StatusCode::Unexpected, + )) + })?; + let null_count = usize::try_from(null_count.unwrap_or_default()).map_err(|err| { + BoxedError::new(PlainError::new( + format!("failed to convert parquet null count to usize: {err}"), + StatusCode::Unexpected, + )) + })?; + Ok::<_, BoxedError>(acc + row_count.saturating_sub(null_count)) + }) + .map(Some) +} + +fn file_partition_expr_matches_region( + metadata: &RegionMetadata, + file: &FileHandle, +) -> std::result::Result { + let file_partition_expr = file + .meta_ref() + .partition_expr + .as_ref() + .map(|expr| expr.as_json_str()) + .transpose() + .map_err(BoxedError::new)?; + Ok(file_partition_expr == metadata.partition_expr) +} + +#[cfg(test)] +mod tests { + use common_time::timestamp::TimeUnit as TimestampUnit; + use datatypes::arrow::array::{ + DictionaryArray, Int64Array, StringArray, TimestampMillisecondArray, UInt64Array, + }; + use datatypes::arrow::datatypes::Int32Type; + + use super::*; + + #[test] + fn test_aggregate_column_extreme_value_uses_numeric_fast_path() { + let values = UInt64Array::from(vec![Some(7), Some(2), Some(11)]); + + let min_value = aggregate_column_min_value(Some(&values)).unwrap(); + let max_value = aggregate_column_max_value(Some(&values)).unwrap(); + + assert_eq!(Some(Value::UInt64(2)), min_value); + assert_eq!(Some(Value::UInt64(11)), max_value); + } + + #[test] + fn test_aggregate_column_extreme_value_uses_string_fast_path() { + let values = StringArray::from(vec![Some("delta"), Some("alpha"), Some("gamma")]); + + let min_value = aggregate_column_min_value(Some(&values)).unwrap(); + let max_value = aggregate_column_max_value(Some(&values)).unwrap(); + + assert_eq!(Some(Value::String("alpha".into())), min_value); + assert_eq!(Some(Value::String("gamma".into())), max_value); + } + + #[test] + fn test_aggregate_column_extreme_value_uses_timestamp_fast_path() { + let values = TimestampMillisecondArray::from(vec![Some(7), Some(2), Some(11)]); + + let min_value = 
aggregate_column_min_value(Some(&values)).unwrap(); + let max_value = aggregate_column_max_value(Some(&values)).unwrap(); + + assert_eq!( + Some(Value::Timestamp(common_time::Timestamp::new( + 2, + TimestampUnit::Millisecond + ))), + min_value + ); + assert_eq!( + Some(Value::Timestamp(common_time::Timestamp::new( + 11, + TimestampUnit::Millisecond + ))), + max_value + ); + } + + #[test] + fn test_aggregate_column_extreme_value_dictionary_falls_back() { + let values = + DictionaryArray::::from_iter([Some("delta"), Some("alpha"), Some("gamma")]); + + let min_value = aggregate_column_min_value(Some(&values)).unwrap(); + let max_value = aggregate_column_max_value(Some(&values)).unwrap(); + + assert_eq!(Some(Value::String("alpha".into())), min_value); + assert_eq!(Some(Value::String("gamma".into())), max_value); + } + + #[test] + fn test_aggregate_column_extreme_value_returns_none_when_any_stats_are_null() { + let values = Int64Array::from(vec![Some(7), None, Some(11)]); + + assert_eq!(None, aggregate_column_min_value(Some(&values)).unwrap()); + assert_eq!(None, aggregate_column_max_value(Some(&values)).unwrap()); + } +} diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index 5cb2d75e25e5..227a1a55e47d 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -806,6 +806,7 @@ fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool { } /// Common input for different scanners. +#[derive(Clone)] pub struct ScanInput { /// Region SST access layer. access_layer: AccessLayerRef, @@ -929,6 +930,26 @@ impl ScanInput { self } + /// Excludes SST files by their original ordinal in this scan input. 
+    #[must_use]
+    pub(crate) fn with_excluded_file_ordinals(mut self, excluded_file_ordinals: &[usize]) -> Self {
+        if excluded_file_ordinals.is_empty() {
+            return self;
+        }
+
+        let excluded = excluded_file_ordinals
+            .iter()
+            .copied()
+            .collect::<HashSet<_>>();
+        self.files = self
+            .files
+            .into_iter()
+            .enumerate()
+            .filter_map(|(ordinal, file)| (!excluded.contains(&ordinal)).then_some(file))
+            .collect();
+        self
+    }
+
     /// Sets cache for this query.
     #[must_use]
     pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
@@ -1916,6 +1937,12 @@ mod tests {
             .with_files(vec![file])
     }
 
+    fn new_test_file(file_id: store_api::storage::FileId) -> FileHandle {
+        let mut meta = crate::sst::file::FileMeta::default();
+        meta.file_id = file_id;
+        FileHandle::new(meta, Arc::new(crate::sst::file_purger::NoopFilePurger))
+    }
+
     #[tokio::test]
     async fn test_build_read_column_ids_includes_filters() {
         let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
@@ -2122,6 +2149,27 @@ mod tests {
         assert!(build_scan_fingerprint(&no_files).is_none());
     }
 
+    #[tokio::test]
+    async fn test_scan_input_excludes_file_ordinals_only_from_ssts() {
+        use store_api::storage::FileId;
+
+        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
+        let input = new_scan_input(metadata, vec![]).await.with_files(vec![
+            new_test_file(FileId::random()),
+            new_test_file(FileId::random()),
+            new_test_file(FileId::random()),
+        ]);
+
+        let remaining = input.clone().with_excluded_file_ordinals(&[1]);
+
+        assert_eq!(input.num_memtables(), remaining.num_memtables());
+        assert_eq!(remaining.num_files(), 2);
+        assert_eq!(
+            remaining.file_ids(),
+            vec![input.file_ids()[0], input.file_ids()[2]]
+        );
+    }
+
     #[tokio::test]
     async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
         let base = metadata_with_primary_key(vec![0, 1], false);
diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs
index a1b3b8f350a8..04b271c87985 100644
--- a/src/mito2/src/read/seq_scan.rs
+++ b/src/mito2/src/read/seq_scan.rs
@@ -32,6 +32,7 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{
     PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
 };
+use store_api::scan_stats::RegionScanStats;
 use store_api::storage::TimeSeriesRowSelector;
 use tokio::sync::Semaphore;
@@ -43,6 +44,7 @@ use crate::read::last_row::{FlatLastRowReader, LastRowReader};
 use crate::read::merge::MergeReaderBuilder;
 use crate::read::pruner::{PartitionPruner, Pruner};
 use crate::read::range::RangeMeta;
+use crate::read::scan_input_stats::build_scan_input_stats;
 use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::scan_util::{
     PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
@@ -114,6 +116,18 @@ impl SeqScan {
         Ok(Box::pin(aggr_stream))
     }
 
+    fn rebuild_input(&mut self, input: ScanInput) {
+        // IMPORTANT: when `ScanInput.files` changes, all derived state must move together.
+        // `partition_ranges()` identifiers and `Pruner` internals are tied to this exact
+        // `StreamContext`, so reusing the old ones would make them point at stale file/range
+        // layouts.
+        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
+        self.properties.partitions = vec![stream_ctx.partition_ranges()];
+        let num_workers = common_stat::get_total_cpu_cores().max(1);
+        self.pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
+        self.stream_ctx = stream_ctx;
+    }
+
     /// Scan [`Batch`] in all partitions one by one.
     pub(crate) fn scan_all_partitions(&self) -> Result<SendableRecordBatchStream> {
         let metrics_set = ExecutionPlanMetricsSet::new();
@@ -653,6 +667,19 @@ impl RegionScanner for SeqScan {
     }
 
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
+        request.validate()?;
+
+        if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.clone() {
+            // IMPORTANT: exclusion changes the underlying file set, so rebuild the input-bound
+            // runtime state first, then apply ordinary property updates from `prepare()`.
+            let input = self
+                .stream_ctx
+                .input
+                .clone()
+                .with_excluded_file_ordinals(&excluded_file_ordinals);
+            self.rebuild_input(input);
+        }
+
         self.properties.prepare(request);
 
         self.check_scan_limit().map_err(BoxedError::new)?;
@@ -669,6 +696,14 @@ impl RegionScanner for SeqScan {
         predicate.is_some()
     }
 
+    fn scan_input_stats(&self) -> Result<Option<RegionScanStats>, BoxedError> {
+        build_scan_input_stats(
+            &self.stream_ctx.input,
+            self.stream_ctx.input.mapper.metadata(),
+        )
+        .map(Some)
+    }
+
     fn add_dyn_filter_to_predicate(
         &mut self,
         filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
diff --git a/src/mito2/src/read/series_scan.rs b/src/mito2/src/read/series_scan.rs
index 2d6994d0afbb..d3996e796109 100644
--- a/src/mito2/src/read/series_scan.rs
+++ b/src/mito2/src/read/series_scan.rs
@@ -36,6 +36,7 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{
     PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
 };
+use store_api::scan_stats::RegionScanStats;
 use tokio::sync::Semaphore;
 use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
 use tokio::sync::mpsc::{self, Receiver, Sender};
@@ -45,6 +46,7 @@ use crate::error::{
     ScanSeriesSnafu, TooManyFilesToReadSnafu,
 };
 use crate::read::pruner::{PartitionPruner, Pruner};
+use crate::read::scan_input_stats::build_scan_input_stats;
 use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::scan_util::{PartitionMetrics, PartitionMetricsList, SeriesDistributorMetrics};
 use crate::read::seq_scan::{SeqScan, build_flat_sources, build_sources};
@@ -308,6 +310,18 @@ impl SeriesScan {
         Ok(())
     }
+
+    fn rebuild_input(&mut self, input: ScanInput) {
+        // IMPORTANT: `SeriesScan` keeps extra runtime state (`receivers`) on top of the shared
+        // `StreamContext`/`Pruner` pair. Once the file set changes, all of them must be reset
+        // together so the distributor does not keep using channels from the old partition layout.
+        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
+        self.properties.partitions = vec![stream_ctx.partition_ranges()];
+        let num_workers = common_stat::get_total_cpu_cores().max(1);
+        self.pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
+        self.stream_ctx = stream_ctx;
+        self.receivers = Mutex::new(Vec::new());
+    }
 }
 
 fn new_channel_list(num_partitions: usize) -> (SenderList, ReceiverList) {
@@ -348,6 +362,19 @@ impl RegionScanner for SeriesScan {
     }
 
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
+        request.validate()?;
+
+        if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.clone() {
+            // IMPORTANT: exclusion is effectively a new scan input for `SeriesScan`, so rebuild
+            // before applying the remaining prepare options.
+            let input = self
+                .stream_ctx
+                .input
+                .clone()
+                .with_excluded_file_ordinals(&excluded_file_ordinals);
+            self.rebuild_input(input);
+        }
+
         self.properties.prepare(request);
 
         self.check_scan_limit().map_err(BoxedError::new)?;
@@ -364,6 +391,14 @@ impl RegionScanner for SeriesScan {
         predicate.is_some()
     }
 
+    fn scan_input_stats(&self) -> Result<Option<RegionScanStats>, BoxedError> {
+        build_scan_input_stats(
+            &self.stream_ctx.input,
+            self.stream_ctx.input.mapper.metadata(),
+        )
+        .map(Some)
+    }
+
     fn add_dyn_filter_to_predicate(
         &mut self,
         filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
diff --git a/src/mito2/src/read/unordered_scan.rs b/src/mito2/src/read/unordered_scan.rs
index 2d557e887176..276f01f71733 100644
--- a/src/mito2/src/read/unordered_scan.rs
+++ b/src/mito2/src/read/unordered_scan.rs
@@ -32,9 +32,11 @@ use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{
     PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
 };
+use store_api::scan_stats::RegionScanStats;
 
 use crate::error::{PartitionOutOfRangeSnafu, Result};
 use crate::read::pruner::{PartitionPruner, Pruner};
+use crate::read::scan_input_stats::build_scan_input_stats;
 use crate::read::scan_region::{ScanInput, StreamContext};
 use crate::read::scan_util::{
     PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
@@ -450,6 +452,16 @@ impl UnorderedScan {
         };
         Ok(Box::pin(stream))
     }
+
+    fn rebuild_input(&mut self, input: ScanInput) {
+        // IMPORTANT: file exclusion changes the range layout seen by this scanner, so the
+        // `StreamContext`, partition ranges, and `Pruner` must be rebuilt as one snapshot.
+        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
+        self.properties.partitions = vec![stream_ctx.partition_ranges()];
+        let num_workers = common_stat::get_total_cpu_cores().max(1);
+        self.pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
+        self.stream_ctx = stream_ctx;
+    }
 }
 
 impl RegionScanner for UnorderedScan {
@@ -470,6 +482,19 @@ impl RegionScanner for UnorderedScan {
     }
 
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
+        request.validate()?;
+
+        if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.clone() {
+            // IMPORTANT: rebuild first so later property updates apply to the new input shape,
+            // not the stale one.
+            let input = self
+                .stream_ctx
+                .input
+                .clone()
+                .with_excluded_file_ordinals(&excluded_file_ordinals);
+            self.rebuild_input(input);
+        }
+
         self.properties.prepare(request);
 
         Ok(())
@@ -495,6 +520,14 @@ impl RegionScanner for UnorderedScan {
         predicate.is_some()
     }
 
+    fn scan_input_stats(&self) -> Result<Option<RegionScanStats>, BoxedError> {
+        build_scan_input_stats(
+            &self.stream_ctx.input,
+            self.stream_ctx.input.mapper.metadata(),
+        )
+        .map(Some)
+    }
+
     fn add_dyn_filter_to_predicate(
         &mut self,
         filter_exprs: Vec<Arc<dyn PhysicalExpr>>,
diff --git a/src/query/src/metrics.rs b/src/query/src/metrics.rs
index e0d02e9a3d1f..bb3589614baf 100644
--- a/src/query/src/metrics.rs
+++ b/src/query/src/metrics.rs
@@ -74,6 +74,44 @@ lazy_static! {
         "total number of query memory allocations rejected"
     )
     .unwrap();
+
+    pub static ref AGGREGATE_STATS_STATS_INPUT_FILES_TOTAL: IntCounter = register_int_counter!(
+        "greptime_aggregate_stats_stats_input_files_total",
+        "aggregate stats literal or precomputed stats input files total"
+    )
+    .unwrap();
+    pub static ref AGGREGATE_STATS_SCANNED_INPUT_FILES_TOTAL: IntCounter = register_int_counter!(
+        "greptime_aggregate_stats_scanned_input_files_total",
+        "aggregate stats scanned files total"
+    )
+    .unwrap();
+}
+
+#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
+pub struct AggregateStatsMetricsSnapshot {
+    pub stats_input_files: u64,
+    pub scanned_input_files: u64,
+}
+
+#[cfg(test)]
+impl AggregateStatsMetricsSnapshot {
+    pub fn delta(self, before: Self) -> Self {
+        Self {
+            stats_input_files: self
+                .stats_input_files
+                .saturating_sub(before.stats_input_files),
+            scanned_input_files: self
+                .scanned_input_files
+                .saturating_sub(before.scanned_input_files),
+        }
+    }
+}
+
+pub fn aggregate_stats_metrics_snapshot() -> AggregateStatsMetricsSnapshot {
+    AggregateStatsMetricsSnapshot {
+        stats_input_files: AGGREGATE_STATS_STATS_INPUT_FILES_TOTAL.get(),
+        scanned_input_files: AGGREGATE_STATS_SCANNED_INPUT_FILES_TOTAL.get(),
+    }
+}
 
 /// A stream to call the callback once a RecordBatch stream is done.
diff --git a/src/query/src/optimizer.rs b/src/query/src/optimizer.rs
index 4259b587ba11..646a8ef36cea 100644
--- a/src/query/src/optimizer.rs
+++ b/src/query/src/optimizer.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
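> Note on the counters above: they are process-wide, monotonically increasing totals, which is why the diff pairs them with `AggregateStatsMetricsSnapshot` and a `delta` helper so tests compare before/after snapshots instead of absolute values. A minimal standalone sketch of that pattern (hypothetical `Snapshot` type, no Prometheus dependency):

```rust
// Sketch of the snapshot/delta pattern: counters only grow, so a test takes a
// snapshot before and after an operation and asserts on the difference.
#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)]
struct Snapshot {
    stats_input_files: u64,
    scanned_input_files: u64,
}

impl Snapshot {
    // Saturating subtraction keeps the delta well-defined even if the
    // "before" snapshot is somehow newer (e.g. counters reset between runs).
    fn delta(self, before: Self) -> Self {
        Self {
            stats_input_files: self.stats_input_files.saturating_sub(before.stats_input_files),
            scanned_input_files: self
                .scanned_input_files
                .saturating_sub(before.scanned_input_files),
        }
    }
}

fn main() {
    let before = Snapshot { stats_input_files: 10, scanned_input_files: 4 };
    let after = Snapshot { stats_input_files: 13, scanned_input_files: 9 };
    assert_eq!(
        after.delta(before),
        Snapshot { stats_input_files: 3, scanned_input_files: 5 }
    );
}
```

> This keeps tests correct even when other tests running in the same process have already bumped the shared counters.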
+pub mod aggr_stats;
 pub mod constant_term;
 pub mod count_wildcard;
 pub mod parallelize_scan;
diff --git a/src/query/src/optimizer/aggr_stats.rs b/src/query/src/optimizer/aggr_stats.rs
new file mode 100644
index 000000000000..8a0d4ddd7070
--- /dev/null
+++ b/src/query/src/optimizer/aggr_stats.rs
@@ -0,0 +1,454 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef, StructArray};
+use arrow::record_batch::RecordBatch;
+use common_error::ext::BoxedError;
+use common_telemetry::debug;
+use datafusion::config::ConfigOptions;
+use datafusion::datasource::memory::MemorySourceConfig;
+use datafusion::datasource::source::DataSourceExec;
+use datafusion::physical_optimizer::PhysicalOptimizerRule;
+use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
+use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
+use datafusion::physical_plan::union::UnionExec;
+use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
+use datafusion_common::tree_node::{Transformed, TreeNode};
+use datafusion_common::{DataFusionError, Result as DfResult, ScalarValue};
+use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
+use datatypes::arrow::datatypes::DataType;
+use table::table::scan::{AggregateStatsExplain, RegionScanExec};
+
+mod check;
+mod split;
+#[cfg(test)]
+mod tests;
+
+use check::RewriteCheck;
+use split::{common_stats_file_ordinals, filter_stats_by_file_ordinals, partial_state_from_stats};
+
+use crate::metrics::{
+    AGGREGATE_STATS_SCANNED_INPUT_FILES_TOTAL, AGGREGATE_STATS_STATS_INPUT_FILES_TOTAL,
+};
+
+enum RewriteTarget<'a> {
+    SingleStage {
+        aggregate_exec: &'a AggregateExec,
+        region_scan: &'a RegionScanExec,
+    },
+    FinalOverPartial {
+        final_exec: &'a AggregateExec,
+        partial_exec: &'a AggregateExec,
+        region_scan: &'a RegionScanExec,
+        keep_coalesce: bool,
+    },
+}
+
+impl<'a> RewriteTarget<'a> {
+    fn extract(plan: &'a Arc<dyn ExecutionPlan>) -> Option<Self> {
+        let aggregate_exec = plan.as_any().downcast_ref::<AggregateExec>()?;
+
+        if matches!(
+            aggregate_exec.mode(),
+            AggregateMode::Single | AggregateMode::SinglePartitioned
+        ) {
+            let region_scan = AggregateStats::extract_region_scan(aggregate_exec.input())?;
+            return Some(Self::SingleStage {
+                aggregate_exec,
+                region_scan,
+            });
+        }
+
+        if !matches!(aggregate_exec.mode(), AggregateMode::Final) {
+            return None;
+        }
+
+        if let Some(coalesce) = aggregate_exec
+            .input()
+            .as_any()
+            .downcast_ref::<CoalescePartitionsExec>()
+        {
+            let partial_exec = coalesce.input().as_any().downcast_ref::<AggregateExec>()?;
+            if !matches!(partial_exec.mode(), AggregateMode::Partial) {
+                return None;
+            }
+
+            let region_scan = AggregateStats::extract_region_scan(partial_exec.input())?;
+            return Some(Self::FinalOverPartial {
+                final_exec: aggregate_exec,
+                partial_exec,
+                region_scan,
+                keep_coalesce: true,
+            });
+        }
+
+        let partial_exec = aggregate_exec
+            .input()
+            .as_any()
+            .downcast_ref::<AggregateExec>()?;
+        if !matches!(partial_exec.mode(), AggregateMode::Partial) {
+            return None;
+        }
+
+        let region_scan = AggregateStats::extract_region_scan(partial_exec.input())?;
+        Some(Self::FinalOverPartial {
+            final_exec: aggregate_exec,
+            partial_exec,
+            region_scan,
+            keep_coalesce: false,
+        })
+    }
+
+    fn first_stage_aggregate(&self) -> &'a AggregateExec {
+        match self {
+            RewriteTarget::SingleStage { aggregate_exec, .. } => aggregate_exec,
+            RewriteTarget::FinalOverPartial { partial_exec, .. } => partial_exec,
+        }
+    }
+
+    fn region_scan(&self) -> &'a RegionScanExec {
+        match self {
+            RewriteTarget::SingleStage { region_scan, .. }
+            | RewriteTarget::FinalOverPartial { region_scan, .. } => region_scan,
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct AggregateStats;
+
+/// All supported aggregates that can be answered from statistics.
+#[derive(Debug, Clone, PartialEq, Eq)]
+enum StatsAgg {
+    CountStar,
+    CountField {
+        column_name: String,
+        arg_type: DataType,
+    },
+    CountTimeIndex {
+        arg_type: DataType,
+    },
+    MinField {
+        column_name: String,
+        arg_type: DataType,
+    },
+    MinTimeIndex {
+        arg_type: DataType,
+    },
+    MaxField {
+        column_name: String,
+        arg_type: DataType,
+    },
+    MaxTimeIndex {
+        arg_type: DataType,
+    },
+}
+
+impl PhysicalOptimizerRule for AggregateStats {
+    fn optimize(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        _config: &ConfigOptions,
+    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+        Self::do_optimize(plan)
+    }
+
+    fn name(&self) -> &str {
+        "aggregate_stats"
+    }
+
+    fn schema_check(&self) -> bool {
+        true
+    }
+}
+
+impl AggregateStats {
+    fn do_optimize(plan: Arc<dyn ExecutionPlan>) -> DfResult<Arc<dyn ExecutionPlan>> {
+        let result = plan
+            .transform_down(|plan| {
+                let Some(target) = RewriteTarget::extract(&plan) else {
+                    return Ok(Transformed::no(plan));
+                };
+                let Some(scan_input_stats) = target.region_scan().scan_input_stats()? else {
+                    Self::record_rewrite_miss(None);
+                    return Ok(Transformed::no(plan));
+                };
+
+                let check = RewriteCheck::new(target.first_stage_aggregate(), target.region_scan());
+                if let Some(reason) = check.skip_reason()? {
+                    debug!("Skip aggregate stats optimization: {reason}");
+                    Self::record_rewrite_miss(
+                        target
+                            .region_scan()
+                            .scan_input_stats()
+                            .ok()
+                            .flatten()
+                            .map(|stats| stats.files.len()),
+                    );
+                    return Ok(Transformed::no(plan));
+                }
+
+                let aggs = check.parse_aggs().map_err(|reason| {
+                    DataFusionError::Internal(format!(
+                        "aggregate stats rewrite became ineligible after eligibility check: {reason}"
+                    ))
+                })?;
+
+                let excluded_file_ordinals = common_stats_file_ordinals(&aggs, &scan_input_stats);
+                if excluded_file_ordinals.is_empty() {
+                    debug!(
+                        "Skip aggregate stats optimization: no shared stats-covered files across aggregates"
+                    );
+                    Self::record_rewrite_miss(Some(scan_input_stats.files.len()));
+                    return Ok(Transformed::no(plan));
+                }
+
+                let rewritten = Self::rewrite_aggregate(
+                    &target,
+                    &aggs,
+                    &scan_input_stats,
+                    &excluded_file_ordinals,
+                )?;
+
+                Self::record_rewrite_hit(scan_input_stats.files.len(), excluded_file_ordinals.len());
+
+                Ok(Transformed::yes(rewritten))
+            })?
+            .data;
+
+        Ok(result)
+    }
+
+    fn extract_region_scan(plan: &Arc<dyn ExecutionPlan>) -> Option<&RegionScanExec> {
+        plan.as_any().downcast_ref::<RegionScanExec>()
+    }
+
+    fn rewrite_aggregate(
+        target: &RewriteTarget<'_>,
+        aggs: &[StatsAgg],
+        scan_input_stats: &store_api::scan_stats::RegionScanStats,
+        excluded_file_ordinals: &[usize],
+    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+        match target {
+            RewriteTarget::SingleStage {
+                aggregate_exec,
+                region_scan,
+            } => Self::rewrite_single_stage(
+                aggregate_exec,
+                region_scan,
+                aggs,
+                scan_input_stats,
+                excluded_file_ordinals,
+            ),
+            RewriteTarget::FinalOverPartial {
+                final_exec,
+                partial_exec,
+                region_scan,
+                keep_coalesce,
+            } => Self::rewrite_final_over_partial(
+                final_exec,
+                partial_exec,
+                region_scan,
+                *keep_coalesce,
+                aggs,
+                scan_input_stats,
+                excluded_file_ordinals,
+            ),
+        }
+    }
+
+    fn rewrite_single_stage(
+        aggregate_exec: &AggregateExec,
+        region_scan: &RegionScanExec,
+        aggs: &[StatsAgg],
+        scan_input_stats: &store_api::scan_stats::RegionScanStats,
+        excluded_file_ordinals: &[usize],
+    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+        let union = Self::build_partial_union_source(
+            aggregate_exec,
+            region_scan,
+            aggs,
+            scan_input_stats,
+            excluded_file_ordinals,
+        )?;
+        let union = Self::coalesce_if_needed(union);
+
+        Ok(Arc::new(
+            AggregateExec::try_new(
+                AggregateMode::Final,
+                aggregate_exec.group_expr().clone(),
+                aggregate_exec.aggr_expr().to_vec(),
+                vec![None; aggregate_exec.aggr_expr().len()],
+                union,
+                aggregate_exec.input_schema(),
+            )?
+            .with_limit_options(aggregate_exec.limit_options()),
+        ))
+    }
+
+    fn rewrite_final_over_partial(
+        final_exec: &AggregateExec,
+        partial_exec: &AggregateExec,
+        region_scan: &RegionScanExec,
+        keep_coalesce: bool,
+        aggs: &[StatsAgg],
+        scan_input_stats: &store_api::scan_stats::RegionScanStats,
+        excluded_file_ordinals: &[usize],
+    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+        let partial_source = Self::build_partial_union_source(
+            partial_exec,
+            region_scan,
+            aggs,
+            scan_input_stats,
+            excluded_file_ordinals,
+        )?;
+        let final_input = if keep_coalesce {
+            Arc::new(CoalescePartitionsExec::new(partial_source)) as Arc<dyn ExecutionPlan>
+        } else if partial_source.output_partitioning().partition_count() > 1 {
+            Arc::new(CoalescePartitionsExec::new(partial_source)) as Arc<dyn ExecutionPlan>
+        } else {
+            partial_source
+        };
+
+        Ok(Arc::new(
+            AggregateExec::try_new(
+                *final_exec.mode(),
+                final_exec.group_expr().clone(),
+                final_exec.aggr_expr().to_vec(),
+                final_exec.filter_expr().to_vec(),
+                final_input,
+                final_exec.input_schema(),
+            )?
+            .with_limit_options(final_exec.limit_options()),
+        ))
+    }
+
+    fn build_partial_union_source(
+        aggregate_exec: &AggregateExec,
+        region_scan: &RegionScanExec,
+        aggs: &[StatsAgg],
+        scan_input_stats: &store_api::scan_stats::RegionScanStats,
+        excluded_file_ordinals: &[usize],
+    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+        let stats_scan_input =
+            filter_stats_by_file_ordinals(scan_input_stats, excluded_file_ordinals);
+        let stats_states = aggs
+            .iter()
+            .map(|agg| {
+                partial_state_from_stats(agg, &stats_scan_input)?.ok_or_else(|| {
+                    DataFusionError::Internal(
+                        "missing stats-derived partial state for excluded files".to_string(),
+                    )
+                })
+            })
+            .collect::<DfResult<Vec<_>>>()?;
+
+        let filtered_scan = Arc::new(
+            region_scan
+                .with_excluded_file_ordinals(excluded_file_ordinals.to_vec())
+                .map_err(boxed_external)?
+                .with_aggregate_stats_explain(AggregateStatsExplain {
+                    stats_file_ids: stats_scan_input
+                        .files
+                        .iter()
+                        .map(|file| file.file_id)
+                        .collect(),
+                }),
+        );
+
+        let partial_scan = Arc::new(
+            AggregateExec::try_new(
+                AggregateMode::Partial,
+                aggregate_exec.group_expr().clone(),
+                aggregate_exec.aggr_expr().to_vec(),
+                aggregate_exec.filter_expr().to_vec(),
+                filtered_scan,
+                aggregate_exec.input_schema(),
+            )?
+            .with_limit_options(aggregate_exec.limit_options()),
+        );
+
+        let stats_input = Self::build_stats_input(aggregate_exec.aggr_expr(), stats_states)?;
+        UnionExec::try_new(vec![partial_scan, stats_input])
+    }
+
+    fn coalesce_if_needed(plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
+        if plan.output_partitioning().partition_count() > 1 {
+            Arc::new(CoalescePartitionsExec::new(plan))
+        } else {
+            plan
+        }
+    }
+
+    fn build_stats_input(
+        aggr_exprs: &[Arc<AggregateFunctionExpr>],
+        stats_states: Vec<ScalarValue>,
+    ) -> DfResult<Arc<dyn ExecutionPlan>> {
+        let fields = aggr_exprs.iter().try_fold(Vec::new(), |mut fields, expr| {
+            fields.extend(expr.state_fields()?);
+            Ok::<_, DataFusionError>(fields)
+        })?;
+        let schema = Arc::new(arrow::datatypes::Schema::new(fields));
+
+        let columns = stats_states
+            .into_iter()
+            .try_fold(Vec::new(), |mut columns, state| {
+                columns.extend(Self::state_columns(state)?);
+                Ok::<_, DataFusionError>(columns)
+            })?;
+        let batch = RecordBatch::try_new(schema.clone(), columns)
+            .map_err(|err| DataFusionError::ArrowError(Box::new(err), None))?;
+
+        Ok(DataSourceExec::from_data_source(
+            MemorySourceConfig::try_new(&[vec![batch]], schema, None)?,
+        ))
+    }
+
+    fn state_columns(state: ScalarValue) -> DfResult<Vec<ArrayRef>> {
+        let ScalarValue::Struct(array) = state else {
+            return Err(DataFusionError::Internal(
+                "aggregate stats rewrite expected a struct partial state".to_string(),
+            ));
+        };
+
+        let struct_array = array
+            .as_ref()
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .ok_or_else(|| {
+                DataFusionError::Internal(
+                    "aggregate stats rewrite expected a struct array partial state".to_string(),
+                )
+            })?;
+
+        Ok(struct_array.columns().to_vec())
+    }
+
+    fn record_rewrite_hit(total_files: usize, excluded_files: usize) {
+        let fallback_files = total_files.saturating_sub(excluded_files);
+        AGGREGATE_STATS_STATS_INPUT_FILES_TOTAL.inc_by(excluded_files as u64);
+        AGGREGATE_STATS_SCANNED_INPUT_FILES_TOTAL.inc_by(fallback_files as u64);
+    }
+
+    fn record_rewrite_miss(total_files: Option<usize>) {
+        if let Some(total_files) = total_files {
+            AGGREGATE_STATS_SCANNED_INPUT_FILES_TOTAL.inc_by(total_files as u64);
+        }
+    }
+}
+
+fn boxed_external(err: BoxedError) -> DataFusionError {
+    DataFusionError::External(Box::new(err))
+}
diff --git a/src/query/src/optimizer/aggr_stats/check.rs b/src/query/src/optimizer/aggr_stats/check.rs
new file mode 100644
index 000000000000..e5b61a83e6d0
--- /dev/null
+++ b/src/query/src/optimizer/aggr_stats/check.rs
@@ -0,0 +1,229 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use common_telemetry::debug;
+use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode};
+use datafusion_common::Result as DfResult;
+use datafusion_common::utils::expr::COUNT_STAR_EXPANSION;
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
+use datafusion_physical_expr::expressions::{Column, Literal};
+use table::table::scan::RegionScanExec;
+
+use super::StatsAgg;
+use super::split::{StatsAggExt, has_partition_expr_mismatch};
+
+#[derive(Debug)]
+pub(super) struct RewriteCheck<'a> {
+    aggregate_exec: &'a AggregateExec,
+    region_scan: &'a RegionScanExec,
+}
+
+impl<'a> RewriteCheck<'a> {
+    pub(super) fn new(aggregate_exec: &'a AggregateExec, region_scan: &'a RegionScanExec) -> Self {
+        Self {
+            aggregate_exec,
+            region_scan,
+        }
+    }
+
+    pub(super) fn skip_reason(&self) -> DfResult<Option<RejectReason>> {
+        // MVP only handles global first-stage aggregates over append-mode region
+        // scans. Anything else falls back to the normal execution path.
+        if !self.region_scan.append_mode()
+            || !self.aggregate_exec.group_expr().is_empty()
+            || !matches!(
+                self.aggregate_exec.mode(),
+                AggregateMode::Partial | AggregateMode::Single | AggregateMode::SinglePartitioned
+            )
+            || self
+                .aggregate_exec
+                .filter_expr()
+                .iter()
+                .any(|expr| expr.is_some())
+        {
+            return Ok(Some(RejectReason::UnsupportedPlan));
+        }
+
+        let aggs = match self.parse_aggs() {
+            Ok(aggs) => aggs,
+            Err(reason) => return Ok(Some(reason)),
+        };
+
+        let scan_input_stats = self.try_scan_stats();
+        if self.stats_unavailable(scan_input_stats.as_ref()) {
+            return Ok(Some(RejectReason::StatsUnavailable));
+        }
+
+        if !aggs
+            .iter()
+            .all(|agg| agg.has_stats_files(scan_input_stats.as_ref().unwrap()))
+        {
+            return Ok(Some(RejectReason::NoStatsFiles));
+        }
+
+        Ok(None)
+    }
+
+    fn try_scan_stats(&self) -> Option<store_api::scan_stats::RegionScanStats> {
+        match self.region_scan.scan_input_stats() {
+            Ok(stats) => stats,
+            Err(err) => {
+                debug!(
+                    "Skip aggregate stats optimization: failed to collect scan input stats: {err}"
+                );
+                None
+            }
+        }
+    }
+
+    pub(super) fn parse_aggs(&self) -> Result<Vec<StatsAgg>, RejectReason> {
+        let aggr_exprs = self.aggregate_exec.aggr_expr();
+        if aggr_exprs.is_empty() {
+            return Err(RejectReason::UnsupportedAggregate);
+        }
+
+        aggr_exprs.iter().map(|expr| self.parse_agg(expr)).collect()
+    }
+
+    fn parse_agg(&self, expr: &AggregateFunctionExpr) -> Result<StatsAgg, RejectReason> {
+        if !is_supported_aggregate_name(expr.fun().name()) {
+            return Err(RejectReason::UnsupportedAggregate);
+        }
+
+        Self::check_agg_shape(expr)?;
+
+        let inputs = expr.expressions();
+        let name = expr.fun().name().to_ascii_lowercase();
+
+        // COUNT(*) is usually rewritten to COUNT(time-index) before this physical
+        // optimizer runs, so CountStar is mostly a defensive fallback.
+        if name == "count" && is_count_star_expr(&inputs) {
+            return Ok(StatsAgg::CountStar);
+        }
+
+        if inputs.len() != 1 {
+            return Err(RejectReason::UnsupportedAggregate);
+        }
+
+        let Some(column) = inputs[0].as_any().downcast_ref::<Column>() else {
+            return 
Err(RejectReason::UnsupportedAggregate); + }; + let column_name = column.name().to_string(); + let arg_type = inputs[0] + .data_type(self.aggregate_exec.input_schema().as_ref()) + .map_err(|_| RejectReason::UnsupportedAggregate)?; + + if self.is_tag_column(&column_name) { + return Err(RejectReason::UnsupportedPlan); + } + + let is_time_index = column_name == self.region_scan.time_index(); + match (name.as_str(), is_time_index) { + ("count", true) => Ok(StatsAgg::CountTimeIndex { arg_type }), + ("count", false) => Ok(StatsAgg::CountField { + column_name, + arg_type, + }), + ("min", true) => Ok(StatsAgg::MinTimeIndex { arg_type }), + ("min", false) => Ok(StatsAgg::MinField { + column_name, + arg_type, + }), + ("max", true) => Ok(StatsAgg::MaxTimeIndex { arg_type }), + ("max", false) => Ok(StatsAgg::MaxField { + column_name, + arg_type, + }), + _ => Err(RejectReason::UnsupportedAggregate), + } + } + + pub(super) fn check_agg_shape(expr: &AggregateFunctionExpr) -> Result<(), RejectReason> { + if expr.is_distinct() || expr.is_reversed() || !expr.order_bys().is_empty() { + return Err(RejectReason::UnsupportedAggregate); + } + + Ok(()) + } + + fn is_tag_column(&self, column_name: &str) -> bool { + self.region_scan + .tag_columns() + .iter() + .any(|tag| tag == column_name) + } + + fn stats_unavailable( + &self, + scan_input_stats: Option<&store_api::scan_stats::RegionScanStats>, + ) -> bool { + // These cases keep the plan shape eligible in principle, but the scan cannot + // provide metadata that is trustworthy enough for a safe stats-only rewrite. + self.region_scan.has_predicate_without_region() + || scan_input_stats.is_none() + || has_partition_expr_mismatch(scan_input_stats) + } +} + +#[derive(Debug)] +pub(super) enum RejectReason { + /// The physical plan shape is outside the current safe rewrite envelope. + UnsupportedPlan, + /// At least one aggregate function or aggregate shape is unsupported. 
+    UnsupportedAggregate,
+    /// The scan cannot provide trustworthy stats for this rewrite attempt.
+    StatsUnavailable,
+    /// The aggregate shape is supported, but no file contributes metadata-only results.
+    NoStatsFiles,
+}
+
+impl std::fmt::Display for RejectReason {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            RejectReason::UnsupportedPlan => write!(
+                f,
+                "aggregate stats MVP does not support this plan shape safely"
+            ),
+            RejectReason::UnsupportedAggregate => write!(
+                f,
+                "aggregate stats MVP only supports the narrowed no-GROUP-BY count/min/max matrix"
+            ),
+            RejectReason::StatsUnavailable => write!(
+                f,
+                "aggregate stats MVP found a supported shape, but trustworthy statistics are unavailable for it"
+            ),
+            RejectReason::NoStatsFiles => write!(
+                f,
+                "aggregate stats rewrite requires at least one stats-backed file after safety checks"
+            ),
+        }
+    }
+}
+
+pub(super) fn is_supported_aggregate_name(name: &str) -> bool {
+    matches!(name.to_ascii_lowercase().as_str(), "min" | "max" | "count")
+}
+
+fn is_count_star_expr(inputs: &[std::sync::Arc<dyn PhysicalExpr>]) -> bool {
+    match inputs {
+        // Keep the legacy empty-input shape as a compatibility fallback.
+        [] => true,
+        [arg] => arg
+            .as_any()
+            .downcast_ref::<Literal>()
+            .is_some_and(|lit| lit.value() == &COUNT_STAR_EXPANSION),
+        _ => false,
+    }
+}
diff --git a/src/query/src/optimizer/aggr_stats/split.rs b/src/query/src/optimizer/aggr_stats/split.rs
new file mode 100644
index 000000000000..d5d9bb98bc7b
--- /dev/null
+++ b/src/query/src/optimizer/aggr_stats/split.rs
@@ -0,0 +1,465 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+
+use common_function::aggrs::aggr_wrapper::StateWrapper;
+use common_time::Timestamp;
+use datafusion::functions_aggregate::count::count_udaf;
+use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf};
+use datafusion_common::{Result as DfResult, ScalarValue};
+use datatypes::arrow::datatypes::DataType;
+use datatypes::data_type::ConcreteDataType;
+use datatypes::value::Value;
+use store_api::scan_stats::RegionScanStats;
+
+use super::StatsAgg;
+
+#[cfg(test)]
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(super) enum FileStatsRequirement {
+    FileExactRowCount,
+    FileTimeRange,
+    RowGroupMinMax,
+    RowGroupNullCount,
+}
+
+/// Splits scan input files into two buckets for one aggregate rewrite path.
+///
+/// `stats_file_ordinals` and `scan_file_ordinals` store
+/// `RegionScanFileStats::file_ordinal`, not indices into
+/// `RegionScanStats::files`. The optimizer later uses these ordinals to decide
+/// which physical files can be answered from metadata and which still need a
+/// real scan.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub(super) struct FileSplit<T> {
+    /// File ordinals whose stats are sufficient to contribute to the rewrite.
+    pub stats_file_ordinals: Vec<usize>,
+    /// File ordinals that still need to be scanned because required stats are missing.
+    pub scan_file_ordinals: Vec<usize>,
+    /// Aggregate contribution computed from `stats_file_ordinals` only.
+    pub stats: T,
+}
+
+impl<T> FileSplit<T> {
+    fn has_stats_files(&self) -> bool {
+        !self.stats_file_ordinals.is_empty()
+    }
+}
+
+/// File-level time bounds collected from metadata-only files.
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub(super) struct TimeBounds {
+    pub min: Option<Timestamp>,
+    pub max: Option<Timestamp>,
+}
+
+/// Field value bounds collected from metadata-only files.
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub(super) struct ValueBounds {
+    pub min: Option<Value>,
+    pub max: Option<Value>,
+}
+
+pub(super) type CountStarFileSplit = FileSplit<u64>;
+pub(super) type FieldCountFileSplit = FileSplit<u64>;
+pub(super) type TimeFileSplit = FileSplit<TimeBounds>;
+pub(super) type FieldMinMaxFileSplit = FileSplit<ValueBounds>;
+
+fn stats_file_ordinals(aggregate: &StatsAgg, scan_input_stats: &RegionScanStats) -> Vec<usize> {
+    match aggregate {
+        StatsAgg::CountStar => split_count_star_files(scan_input_stats).stats_file_ordinals,
+        StatsAgg::CountField { column_name, .. } => {
+            split_count_field_files(scan_input_stats, column_name).stats_file_ordinals
+        }
+        StatsAgg::CountTimeIndex { .. }
+        | StatsAgg::MinTimeIndex { .. }
+        | StatsAgg::MaxTimeIndex { .. } => split_time_files(scan_input_stats).stats_file_ordinals,
+        StatsAgg::MinField { column_name, .. } | StatsAgg::MaxField { column_name, .. } => {
+            split_min_max_field_files(scan_input_stats, column_name).stats_file_ordinals
+        }
+    }
+}
+
+/// Returns file ordinals that every aggregate in the list can answer from stats.
+#[allow(dead_code)]
+pub(super) fn common_stats_file_ordinals(
+    aggregates: &[StatsAgg],
+    scan_input_stats: &RegionScanStats,
+) -> Vec<usize> {
+    let Some(first) = aggregates.first() else {
+        return Vec::new();
+    };
+
+    let mut common = stats_file_ordinals(first, scan_input_stats)
+        .into_iter()
+        .collect::<HashSet<_>>();
+
+    for aggregate in &aggregates[1..] 
{
+        let ordinals = stats_file_ordinals(aggregate, scan_input_stats)
+            .into_iter()
+            .collect::<HashSet<_>>();
+        common.retain(|ordinal| ordinals.contains(ordinal));
+    }
+
+    scan_input_stats
+        .files
+        .iter()
+        .filter_map(|file| {
+            common
+                .contains(&file.file_ordinal)
+                .then_some(file.file_ordinal)
+        })
+        .collect()
+}
+
+pub(super) fn filter_stats_by_file_ordinals(
+    scan_input_stats: &RegionScanStats,
+    file_ordinals: &[usize],
+) -> RegionScanStats {
+    let selected = file_ordinals
+        .iter()
+        .copied()
+        .collect::<HashSet<_>>();
+
+    RegionScanStats {
+        files: scan_input_stats
+            .files
+            .iter()
+            .filter(|file| selected.contains(&file.file_ordinal))
+            .cloned()
+            .collect(),
+    }
+}
+
+pub(super) trait StatsAggExt {
+    fn has_stats_files(&self, scan_input_stats: &RegionScanStats) -> bool;
+}
+
+impl StatsAggExt for StatsAgg {
+    fn has_stats_files(&self, scan_input_stats: &RegionScanStats) -> bool {
+        match self {
+            StatsAgg::CountStar => split_count_star_files(scan_input_stats).has_stats_files(),
+            StatsAgg::CountField { column_name, .. } => {
+                split_count_field_files(scan_input_stats, column_name).has_stats_files()
+            }
+            StatsAgg::CountTimeIndex { .. }
+            | StatsAgg::MinTimeIndex { .. }
+            | StatsAgg::MaxTimeIndex { .. } => split_time_files(scan_input_stats).has_stats_files(),
+            StatsAgg::MinField { column_name, .. } | StatsAgg::MaxField { column_name, .. } => {
+                split_min_max_field_files(scan_input_stats, column_name).has_stats_files()
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+impl StatsAgg {
+    pub(super) fn file_stats_requirement(&self) -> FileStatsRequirement {
+        match self {
+            StatsAgg::CountStar => FileStatsRequirement::FileExactRowCount,
+            StatsAgg::CountField { .. } => FileStatsRequirement::RowGroupNullCount,
+            StatsAgg::CountTimeIndex { .. }
+            | StatsAgg::MinTimeIndex { .. }
+            | StatsAgg::MaxTimeIndex { .. } => FileStatsRequirement::FileTimeRange,
+            StatsAgg::MinField { .. } | StatsAgg::MaxField { .. 
} => { + FileStatsRequirement::RowGroupMinMax + } + } + } +} + +pub(super) fn has_partition_expr_mismatch(scan_input_stats: Option<&RegionScanStats>) -> bool { + scan_input_stats + .map(|stats| { + stats + .files + .iter() + .any(|file| !file.partition_expr_matches_region) + }) + .unwrap_or(false) +} + +pub(super) fn split_count_star_files(scan_input_stats: &RegionScanStats) -> CountStarFileSplit { + scan_input_stats.files.iter().fold( + FileSplit { + stats_file_ordinals: Vec::new(), + scan_file_ordinals: Vec::new(), + stats: 0, + }, + |mut split, file| { + if file.partition_expr_matches_region + && let Some(num_rows) = file.exact_num_rows + { + split.stats_file_ordinals.push(file.file_ordinal); + split.stats += num_rows; + return split; + } + + split.scan_file_ordinals.push(file.file_ordinal); + split + }, + ) +} + +pub(super) fn split_time_files(scan_input_stats: &RegionScanStats) -> TimeFileSplit { + scan_input_stats.files.iter().fold( + FileSplit { + stats_file_ordinals: Vec::new(), + scan_file_ordinals: Vec::new(), + stats: TimeBounds::default(), + }, + |mut split, file| { + if file.partition_expr_matches_region + && let Some((min_ts, max_ts)) = file.time_range + { + split.stats_file_ordinals.push(file.file_ordinal); + split.stats.min = Some(match split.stats.min { + Some(current) => Timestamp::min(current, min_ts), + None => min_ts, + }); + split.stats.max = Some(match split.stats.max { + Some(current) => Timestamp::max(current, max_ts), + None => max_ts, + }); + return split; + } + + split.scan_file_ordinals.push(file.file_ordinal); + split + }, + ) +} + +pub(super) fn split_count_field_files( + scan_input_stats: &RegionScanStats, + column_name: &str, +) -> FieldCountFileSplit { + scan_input_stats.files.iter().fold( + FileSplit { + stats_file_ordinals: Vec::new(), + scan_file_ordinals: Vec::new(), + stats: 0, + }, + |mut split, file| { + if file.partition_expr_matches_region + && let Some(non_null_rows) = file + .field_stats + .get(column_name) + 
.and_then(|stats| stats.exact_non_null_rows) + { + split.stats_file_ordinals.push(file.file_ordinal); + split.stats += non_null_rows; + return split; + } + + split.scan_file_ordinals.push(file.file_ordinal); + split + }, + ) +} + +pub(super) fn split_min_max_field_files( + scan_input_stats: &RegionScanStats, + column_name: &str, +) -> FieldMinMaxFileSplit { + scan_input_stats.files.iter().fold( + FileSplit { + stats_file_ordinals: Vec::new(), + scan_file_ordinals: Vec::new(), + stats: ValueBounds::default(), + }, + |mut split, file| { + if file.partition_expr_matches_region + && let Some(stats) = file.field_stats.get(column_name) + && let (Some(min_value), Some(max_value)) = + (stats.min_value.clone(), stats.max_value.clone()) + { + split.stats_file_ordinals.push(file.file_ordinal); + split.stats.min = Some(match split.stats.min { + Some(current) => Value::min(current, min_value), + None => min_value, + }); + split.stats.max = Some(match split.stats.max { + Some(current) => Value::max(current, max_value), + None => max_value, + }); + return split; + } + + split.scan_file_ordinals.push(file.file_ordinal); + split + }, + ) +} + +// These helpers are implemented for the upcoming mixed stats-plus-scan rewrite in task 3. 
+#[allow(dead_code)]
+pub(super) fn partial_state_from_stats(
+    aggregate: &StatsAgg,
+    scan_input_stats: &RegionScanStats,
+) -> DfResult<Option<ScalarValue>> {
+    match aggregate {
+        StatsAgg::CountStar => {
+            let split = split_count_star_files(scan_input_stats);
+            if !split.has_stats_files() {
+                return Ok(None);
+            }
+
+            let wrapper = StateWrapper::new((*count_udaf()).clone())?;
+            wrapper
+                .value_from_custom_state_fields(
+                    &[],
+                    vec![ScalarValue::Int64(Some(split.stats as i64))],
+                )
+                .map(Some)
+        }
+        StatsAgg::CountField {
+            arg_type,
+            column_name,
+        } => {
+            let split = split_count_field_files(scan_input_stats, column_name);
+            if !split.has_stats_files() {
+                return Ok(None);
+            }
+
+            let wrapper = StateWrapper::new((*count_udaf()).clone())?;
+            wrapper
+                .value_from_custom_state_fields(
+                    std::slice::from_ref(arg_type),
+                    vec![ScalarValue::Int64(Some(split.stats as i64))],
+                )
+                .map(Some)
+        }
+        StatsAgg::CountTimeIndex { arg_type } => {
+            let split = split_count_star_files(scan_input_stats);
+            if !split.has_stats_files() {
+                return Ok(None);
+            }
+
+            let wrapper = StateWrapper::new((*count_udaf()).clone())?;
+            wrapper
+                .value_from_custom_state_fields(
+                    std::slice::from_ref(arg_type),
+                    vec![ScalarValue::Int64(Some(split.stats as i64))],
+                )
+                .map(Some)
+        }
+        StatsAgg::MinField {
+            arg_type,
+            column_name,
+        } => {
+            let split = split_min_max_field_files(scan_input_stats, column_name);
+            let Some(value) = split.stats.min.as_ref() else {
+                return Ok(None);
+            };
+
+            let wrapper = StateWrapper::new((*min_udaf()).clone())?;
+            wrapper
+                .value_from_custom_state_fields(
+                    std::slice::from_ref(arg_type),
+                    vec![stats_value_scalar(value, arg_type)?],
+                )
+                .map(Some)
+        }
+        StatsAgg::MinTimeIndex { arg_type } => {
+            let split = split_time_files(scan_input_stats);
+            let Some(timestamp) = split.stats.min else {
+                return Ok(None);
+            };
+
+            let wrapper = StateWrapper::new((*min_udaf()).clone())?;
+            wrapper
+                .value_from_custom_state_fields(
+                    std::slice::from_ref(arg_type),
+                    
vec![timestamp_scalar_value(&timestamp, arg_type)?],
+                )
+                .map(Some)
+        }
+        StatsAgg::MaxField {
+            arg_type,
+            column_name,
+        } => {
+            let split = split_min_max_field_files(scan_input_stats, column_name);
+            let Some(value) = split.stats.max.as_ref() else {
+                return Ok(None);
+            };
+
+            let wrapper = StateWrapper::new((*max_udaf()).clone())?;
+            wrapper
+                .value_from_custom_state_fields(
+                    std::slice::from_ref(arg_type),
+                    vec![stats_value_scalar(value, arg_type)?],
+                )
+                .map(Some)
+        }
+        StatsAgg::MaxTimeIndex { arg_type } => {
+            let split = split_time_files(scan_input_stats);
+            let Some(timestamp) = split.stats.max else {
+                return Ok(None);
+            };
+
+            let wrapper = StateWrapper::new((*max_udaf()).clone())?;
+            wrapper
+                .value_from_custom_state_fields(
+                    std::slice::from_ref(arg_type),
+                    vec![timestamp_scalar_value(&timestamp, arg_type)?],
+                )
+                .map(Some)
+        }
+    }
+}
+
+#[allow(dead_code)]
+fn timestamp_scalar_value(timestamp: &Timestamp, arg_type: &DataType) -> DfResult<ScalarValue> {
+    match arg_type {
+        DataType::Timestamp(unit, tz) => {
+            let converted = timestamp.convert_to((*unit).into()).ok_or_else(|| {
+                datafusion_common::DataFusionError::Internal(format!(
+                    "failed to convert timestamp {timestamp:?} to {unit:?}"
+                ))
+            })?;
+            Ok(match unit {
+                datatypes::arrow::datatypes::TimeUnit::Second => {
+                    ScalarValue::TimestampSecond(Some(converted.value()), tz.clone())
+                }
+                datatypes::arrow::datatypes::TimeUnit::Millisecond => {
+                    ScalarValue::TimestampMillisecond(Some(converted.value()), tz.clone())
+                }
+                datatypes::arrow::datatypes::TimeUnit::Microsecond => {
+                    ScalarValue::TimestampMicrosecond(Some(converted.value()), tz.clone())
+                }
+                datatypes::arrow::datatypes::TimeUnit::Nanosecond => {
+                    ScalarValue::TimestampNanosecond(Some(converted.value()), tz.clone())
+                }
+            })
+        }
+        _ => Err(datafusion_common::DataFusionError::Internal(format!(
+            "expected timestamp arg type, got {arg_type:?}"
+        ))),
+    }
+}
+
+#[allow(dead_code)]
+fn stats_value_scalar(value: &Value, arg_type: &DataType) -> 
DfResult<ScalarValue> {
+    let output_type = ConcreteDataType::try_from(arg_type).map_err(|err| {
+        datafusion_common::DataFusionError::Internal(format!(
+            "failed to convert arrow type {arg_type:?} to concrete type: {err}"
+        ))
+    })?;
+    value.try_to_scalar_value(&output_type).map_err(|err| {
+        datafusion_common::DataFusionError::Internal(format!(
+            "failed to convert stats value {value:?} to scalar for {arg_type:?}: {err}"
+        ))
+    })
+}
diff --git a/src/query/src/optimizer/aggr_stats/tests.rs b/src/query/src/optimizer/aggr_stats/tests.rs
new file mode 100644
index 000000000000..69b0a937f9da
--- /dev/null
+++ b/src/query/src/optimizer/aggr_stats/tests.rs
@@ -0,0 +1,1730 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +use std::collections::HashMap; +use std::fmt; +use std::sync::Arc; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use arrow::array::{Int64Array, TimestampMillisecondArray}; +use arrow::datatypes::{Field, Schema}; +use common_error::ext::BoxedError; +use common_recordbatch::{ + EmptyRecordBatchStream, RecordBatch as CommonRecordBatch, RecordBatches, + SendableRecordBatchStream, +}; +use common_time::Timestamp; +use common_time::timestamp::TimeUnit as TimestampUnit; +use datafusion::execution::context::SessionContext; +use datafusion::functions_aggregate::count::count_udaf; +use datafusion::functions_aggregate::min_max::{max_udaf, min_udaf}; +use datafusion::physical_plan::displayable; +use datafusion::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; +use datafusion::physical_plan::union::UnionExec; +use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; +use datafusion_common::utils::expr::COUNT_STAR_EXPANSION; +use datafusion_expr::expr::{AggregateFunction, AggregateFunctionParams}; +use datafusion_expr::{Expr, LogicalPlan}; +use datafusion_physical_expr::PhysicalExpr; +use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; +use datafusion_physical_expr::expressions::{Column, Literal, PhysicalSortExpr}; +use datatypes::arrow::array::AsArray; +use datatypes::arrow::datatypes::{DataType, TimeUnit}; +use datatypes::prelude::ConcreteDataType; +use datatypes::schema::{ColumnSchema, Schema as GreptimeSchema}; +use datatypes::value::Value; +use session::context::QueryContext; +use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef}; +use store_api::region_engine::{ + PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties, +}; +use store_api::scan_stats::{ + RegionScanColumnStats as 
RegionScanColumnInputStats,
+    RegionScanFileStats as RegionScanFileInputStats, RegionScanStats as RegionScanInputStats,
+};
+use store_api::storage::{FileId, RegionId, ScanRequest};
+use table::metadata::{TableInfoBuilder, TableMetaBuilder};
+use table::table::scan::RegionScanExec;
+use table::test_util::EmptyTable;
+
+use super::check::{RejectReason, RewriteCheck, is_supported_aggregate_name};
+use super::split::{
+    FileStatsRequirement, StatsAggExt, common_stats_file_ordinals, filter_stats_by_file_ordinals,
+    has_partition_expr_mismatch, partial_state_from_stats, split_count_field_files,
+    split_count_star_files, split_min_max_field_files, split_time_files,
+};
+use super::{AggregateStats, StatsAgg};
+use crate::parser::QueryLanguageParser;
+use crate::tests::new_query_engine_with_table;
+
+fn test_timestamp(value: i64) -> Timestamp {
+    Timestamp::new(value, TimestampUnit::Millisecond)
+}
+
+fn test_file_id(seed: usize) -> FileId {
+    FileId::parse_str(&format!("00000000-0000-0000-0000-{:012x}", seed + 1)).unwrap()
+}
+
+fn field_stats(
+    exact_non_null_rows: Option<u64>,
+    min_value: Option<Value>,
+    max_value: Option<Value>,
+) -> HashMap<String, RegionScanColumnInputStats> {
+    HashMap::from([(
+        "value".to_string(),
+        RegionScanColumnInputStats {
+            min_value,
+            max_value,
+            exact_non_null_rows,
+        },
+    )])
+}
+
+#[derive(Debug)]
+struct StatsRecordingScanner {
+    schema: datatypes::schema::SchemaRef,
+    metadata: RegionMetadataRef,
+    properties: ScannerProperties,
+    base_stats: RegionScanInputStats,
+    excluded_count: Arc<AtomicUsize>,
+    file_batches: Vec<(usize, CommonRecordBatch)>,
+    excluded_file_ordinals: Vec<usize>,
+}
+
+impl StatsRecordingScanner {
+    fn new(
+        schema: datatypes::schema::SchemaRef,
+        metadata: RegionMetadataRef,
+        base_stats: RegionScanInputStats,
+        excluded_count: Arc<AtomicUsize>,
+    ) -> Self {
+        Self {
+            schema,
+            metadata,
+            properties: ScannerProperties::default().with_append_mode(true),
+            base_stats,
+            excluded_count,
+            file_batches: Vec::new(),
+            excluded_file_ordinals: Vec::new(),
+        }
+    }
+
+    fn with_file_batches(mut 
self, file_batches: Vec<(usize, CommonRecordBatch)>) -> Self {
+        self.file_batches = file_batches;
+        self
+    }
+}
+
+impl DisplayAs for StatsRecordingScanner {
+    fn fmt_as(&self, _: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "StatsRecordingScanner")
+    }
+}
+
+impl RegionScanner for StatsRecordingScanner {
+    fn name(&self) -> &str {
+        "StatsRecordingScanner"
+    }
+
+    fn properties(&self) -> &ScannerProperties {
+        &self.properties
+    }
+
+    fn schema(&self) -> datatypes::schema::SchemaRef {
+        self.schema.clone()
+    }
+
+    fn metadata(&self) -> RegionMetadataRef {
+        self.metadata.clone()
+    }
+
+    fn prepare(&mut self, request: PrepareRequest) -> Result<(), common_error::ext::BoxedError> {
+        request.validate()?;
+        if let Some(excluded_file_ordinals) = request.excluded_file_ordinals.as_ref() {
+            self.excluded_count
+                .store(excluded_file_ordinals.len(), Ordering::Relaxed);
+            self.excluded_file_ordinals = excluded_file_ordinals.clone();
+        }
+        self.properties.prepare(request);
+        Ok(())
+    }
+
+    fn scan_partition(
+        &self,
+        _: &QueryScanContext,
+        _: &ExecutionPlanMetricsSet,
+        _: usize,
+    ) -> Result<SendableRecordBatchStream, common_error::ext::BoxedError> {
+        let batches = self
+            .file_batches
+            .iter()
+            .filter(|(file_ordinal, _)| !self.excluded_file_ordinals.contains(file_ordinal))
+            .map(|(_, batch)| batch.clone())
+            .collect::<Vec<_>>();
+
+        if batches.is_empty() {
+            Ok(Box::pin(EmptyRecordBatchStream::new(self.schema.clone())))
+        } else {
+            Ok(RecordBatches::try_new(self.schema.clone(), batches)
+                .map_err(BoxedError::new)?
+                .as_stream())
+        }
+    }
+
+    fn has_predicate_without_region(&self) -> bool {
+        false
+    }
+
+    fn scan_input_stats(
+        &self,
+    ) -> Result<Option<RegionScanInputStats>, common_error::ext::BoxedError> {
+        Ok(Some(filter_stats_by_file_ordinals(
+            &self.base_stats,
+            &self
+                .base_stats
+                .files
+                .iter()
+                .filter_map(|file| {
+                    (!self.excluded_file_ordinals.contains(&file.file_ordinal))
+                        .then_some(file.file_ordinal)
+                })
+                .collect::<Vec<_>>(),
+        )))
+    }
+
+    fn add_dyn_filter_to_predicate(
+        &mut self,
+        _: Vec<Arc<dyn PhysicalExpr>>,
+    ) -> Vec<Arc<dyn PhysicalExpr>> {
+        Vec::new()
+    }
+
+    fn set_logical_region(&mut self, logical_region: bool) {
+        self.properties.set_logical_region(logical_region);
+    }
+}
+
+fn test_region_metadata() -> RegionMetadataRef {
+    let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
+    builder
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new(
+                "ts",
+                ConcreteDataType::timestamp_millisecond_datatype(),
+                false,
+            )
+            .with_time_index(true),
+            semantic_type: api::v1::SemanticType::Timestamp,
+            column_id: 1,
+        })
+        .push_column_metadata(ColumnMetadata {
+            column_schema: ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true),
+            semantic_type: api::v1::SemanticType::Field,
+            column_id: 2,
+        })
+        .primary_key(vec![]);
+    Arc::new(builder.build().unwrap())
+}
+
+fn scan_batch(
+    schema: datatypes::schema::SchemaRef,
+    timestamps: Vec<i64>,
+    values: Vec<Option<i64>>,
+) -> CommonRecordBatch {
+    let df_record_batch = datatypes::arrow::record_batch::RecordBatch::try_new(
+        schema.arrow_schema().clone(),
+        vec![
+            Arc::new(TimestampMillisecondArray::from_iter_values(timestamps)),
+            Arc::new(Int64Array::from(values)),
+        ],
+    )
+    .unwrap();
+
+    CommonRecordBatch::from_df_record_batch(schema, df_record_batch)
+}
+
+fn build_test_aggr_expr(
+    distinct: bool,
+    ignore_nulls: bool,
+    order_by: bool,
+) -> datafusion_common::Result<AggregateFunctionExpr> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            false,
+        ),
+        Field::new("value", DataType::Int64, 
true),
+    ]));
+    let args = vec![Arc::new(Column::new("value", 1)) as Arc<dyn PhysicalExpr>];
+    let mut builder = AggregateExprBuilder::new(Arc::new((*count_udaf()).clone()), args)
+        .schema(schema)
+        .alias("count(value)");
+
+    if distinct {
+        builder = builder.with_distinct(true);
+    }
+    if ignore_nulls {
+        builder = builder.ignore_nulls();
+    }
+    if order_by {
+        builder = builder.order_by(vec![PhysicalSortExpr {
+            expr: Arc::new(Column::new("value", 0)),
+            options: Default::default(),
+        }]);
+    }
+
+    builder.build()
+}
+
+fn build_min_field_aggr_expr_with_ignore_nulls(
+    ignore_nulls: bool,
+) -> datafusion_common::Result<AggregateFunctionExpr> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            false,
+        ),
+        Field::new("value", DataType::Int64, true),
+    ]));
+    let args = vec![Arc::new(Column::new("value", 1)) as Arc<dyn PhysicalExpr>];
+    let mut builder = AggregateExprBuilder::new(Arc::new((*min_udaf()).clone()), args)
+        .schema(schema)
+        .alias("min(value)");
+    if ignore_nulls {
+        builder = builder.ignore_nulls();
+    }
+
+    builder.build()
+}
+
+fn build_max_field_aggr_expr_with_ignore_nulls(
+    ignore_nulls: bool,
+) -> datafusion_common::Result<AggregateFunctionExpr> {
+    let schema = Arc::new(Schema::new(vec![
+        Field::new(
+            "ts",
+            DataType::Timestamp(TimeUnit::Millisecond, None),
+            false,
+        ),
+        Field::new("value", DataType::Int64, true),
+    ]));
+    let args = vec![Arc::new(Column::new("value", 1)) as Arc<dyn PhysicalExpr>];
+    let mut builder = AggregateExprBuilder::new(Arc::new((*max_udaf()).clone()), args)
+        .schema(schema)
+        .alias("max(value)");
+    if ignore_nulls {
+        builder = builder.ignore_nulls();
+    }
+
+    builder.build()
+}
+
+fn build_count_star_aggr_expr() -> datafusion_common::Result<AggregateFunctionExpr> {
+    let schema = Arc::new(Schema::empty());
+    let args = vec![Arc::new(Literal::new(COUNT_STAR_EXPANSION)) as Arc<dyn PhysicalExpr>];
+
+    AggregateExprBuilder::new(Arc::new((*count_udaf()).clone()), args)
+        .schema(schema)
+        .alias("count(*)")
+        .build()
+}
+
+fn new_test_engine() -> crate::QueryEngineRef {
+    let columns = 
vec![ + ColumnSchema::new( + "ts", + ConcreteDataType::timestamp_millisecond_datatype(), + false, + ), + ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true), + ]; + let schema = Arc::new(GreptimeSchema::new(columns)); + let table_meta = TableMetaBuilder::empty() + .schema(schema) + .primary_key_indices(vec![0]) + .value_indices(vec![1]) + .next_column_id(1024) + .build() + .unwrap(); + let table_info = TableInfoBuilder::new("test", table_meta).build().unwrap(); + let table = EmptyTable::from_table_info(&table_info); + + new_query_engine_with_table(table) +} + +async fn parse_sql_to_plan(sql: &str) -> LogicalPlan { + let query_ctx = QueryContext::arc(); + let stmt = QueryLanguageParser::parse_sql(sql, &query_ctx).unwrap(); + new_test_engine() + .planner() + .plan(&stmt, query_ctx) + .await + .unwrap() +} + +#[test] +fn test_supported_aggregate_names() { + assert!(is_supported_aggregate_name("min")); + assert!(is_supported_aggregate_name("MAX")); + assert!(is_supported_aggregate_name("count")); + assert!(!is_supported_aggregate_name("sum")); + assert!(!is_supported_aggregate_name("avg(value)")); +} + +#[test] +fn test_file_stats_requirement_matrix() { + let count_star = StatsAgg::CountStar; + assert_eq!( + count_star.file_stats_requirement(), + FileStatsRequirement::FileExactRowCount + ); + + let time_min = StatsAgg::MinTimeIndex { + arg_type: DataType::Timestamp(TimeUnit::Millisecond, None), + }; + assert_eq!( + time_min.file_stats_requirement(), + FileStatsRequirement::FileTimeRange + ); + + let field_count = StatsAgg::CountField { + column_name: "value".to_string(), + arg_type: DataType::Int64, + }; + assert_eq!( + field_count.file_stats_requirement(), + FileStatsRequirement::RowGroupNullCount + ); +} + +#[test] +fn test_count_star_file_stats_eligibility() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + exact_num_rows: None, + time_range: Some((test_timestamp(10), 
test_timestamp(20))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: Some(42), + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: false, + }, + RegionScanFileInputStats { + file_ordinal: 2, + file_id: test_file_id(2), + exact_num_rows: Some(7), + time_range: Some((test_timestamp(50), test_timestamp(60))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + ], + }; + + assert!(StatsAgg::CountStar.has_stats_files(&stats)); +} + +#[test] +fn test_split_count_star_files() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + exact_num_rows: Some(3), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: None, + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 2, + file_id: test_file_id(2), + exact_num_rows: Some(5), + time_range: Some((test_timestamp(50), test_timestamp(60))), + field_stats: HashMap::new(), + partition_expr_matches_region: false, + }, + ], + }; + + let split = split_count_star_files(&stats); + assert_eq!(split.stats_file_ordinals, vec![0]); + assert_eq!(split.scan_file_ordinals, vec![1, 2]); + assert_eq!(split.stats, 3); +} + +#[test] +fn test_split_count_star_files_keeps_zero_row_files_stats_eligible() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + exact_num_rows: Some(0), + time_range: None, + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + 
RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: None, + time_range: None, + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + ], + }; + + let split = split_count_star_files(&stats); + assert_eq!(split.stats_file_ordinals, vec![0]); + assert_eq!(split.scan_file_ordinals, vec![1]); + assert_eq!(split.stats, 0); +} + +#[test] +fn test_supported_aggregate_shape_allows_ignore_nulls_for_count_min_max() { + let distinct = build_test_aggr_expr(true, false, false).unwrap(); + let count_ignore_nulls = build_test_aggr_expr(false, true, false).unwrap(); + let min_ignore_nulls = build_min_field_aggr_expr_with_ignore_nulls(true).unwrap(); + let max_ignore_nulls = build_max_field_aggr_expr_with_ignore_nulls(true).unwrap(); + let order_by = build_test_aggr_expr(false, false, true).unwrap(); + + let reject_reason = |expr: AggregateFunctionExpr| { + let schema = execution_test_schema(); + let region_scan = Arc::new( + RegionScanExec::new( + Box::new(StatsRecordingScanner::new( + schema.clone(), + test_region_metadata(), + execution_test_stats(), + Arc::new(AtomicUsize::new(0)), + )), + ScanRequest::default(), + None, + ) + .unwrap(), + ); + let aggregate = Arc::new( + AggregateExec::try_new( + AggregateMode::Partial, + PhysicalGroupBy::new_single(vec![]), + vec![Arc::new(expr)], + vec![None], + region_scan.clone(), + schema.arrow_schema().clone(), + ) + .unwrap(), + ); + let check = RewriteCheck::new(aggregate.as_ref(), region_scan.as_ref()); + check.skip_reason().unwrap() + }; + + assert!(matches!( + RewriteCheck::check_agg_shape(&distinct), + Err(RejectReason::UnsupportedAggregate) + )); + assert!(reject_reason(count_ignore_nulls).is_none()); + assert!(reject_reason(min_ignore_nulls).is_none()); + assert!(reject_reason(max_ignore_nulls).is_none()); + assert!(matches!( + RewriteCheck::check_agg_shape(&order_by), + Err(RejectReason::UnsupportedAggregate) + )); +} + +#[test] +fn 
test_count_star_expansion_is_treated_as_count_star() {
+    let expr = build_count_star_aggr_expr().unwrap();
+
+    assert_eq!(expr.fun().name(), "count");
+    assert_eq!(expr.expressions().len(), 1);
+    assert!(
+        expr.expressions()[0]
+            .as_any()
+            .downcast_ref::<Literal>()
+            .is_some_and(|lit| lit.value() == &COUNT_STAR_EXPANSION)
+    );
+}
+
+#[tokio::test]
+async fn test_sql_count_star_is_planned_with_count_star_expansion() {
+    let plan = parse_sql_to_plan("select count(*) from test").await;
+    let LogicalPlan::Projection(projection) = plan else {
+        panic!("expected projection over aggregate plan, got {plan:?}");
+    };
+    let LogicalPlan::Aggregate(aggregate) = projection.input.as_ref() else {
+        panic!("expected aggregate input, got {:?}", projection.input);
+    };
+
+    assert_eq!(aggregate.aggr_expr.len(), 1);
+    let Expr::AggregateFunction(AggregateFunction {
+        func,
+        params: AggregateFunctionParams { args, .. },
+    }) = &aggregate.aggr_expr[0]
+    else {
+        panic!(
+            "expected aggregate function expr, got {:?}",
+            aggregate.aggr_expr[0]
+        );
+    };
+
+    assert_eq!(func.name(), "count");
+    assert_eq!(args.len(), 1);
+    assert!(matches!(
+        &args[0],
+        Expr::Literal(value, _) if value == &COUNT_STAR_EXPANSION
+    ));
+}
+
+#[test]
+fn test_partition_expr_mismatch_detection() {
+    let stats = RegionScanInputStats {
+        files: vec![
+            RegionScanFileInputStats {
+                file_ordinal: 0,
+                file_id: test_file_id(0),
+                exact_num_rows: Some(1),
+                time_range: Some((test_timestamp(10), test_timestamp(20))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 1,
+                file_id: test_file_id(1),
+                exact_num_rows: Some(2),
+                time_range: Some((test_timestamp(30), test_timestamp(40))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: false,
+            },
+        ],
+    };
+
+    assert!(has_partition_expr_mismatch(Some(&stats)));
+}
+
+#[test]
+fn test_time_file_stats_eligibility() {
+    let stats = RegionScanInputStats {
+        files: vec![
+            
RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + exact_num_rows: Some(3), + time_range: None, + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: None, + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + ], + }; + + assert!( + StatsAgg::MinTimeIndex { + arg_type: DataType::Timestamp(TimeUnit::Millisecond, None), + } + .has_stats_files(&stats) + ); +} + +#[test] +fn test_split_time_files() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + exact_num_rows: Some(3), + time_range: Some((test_timestamp(50), test_timestamp(70))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: Some(4), + time_range: None, + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 2, + file_id: test_file_id(2), + exact_num_rows: Some(5), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 3, + file_id: test_file_id(3), + exact_num_rows: Some(6), + time_range: Some((test_timestamp(5), test_timestamp(100))), + field_stats: HashMap::new(), + partition_expr_matches_region: false, + }, + ], + }; + + let split = split_time_files(&stats); + assert_eq!(split.stats_file_ordinals, vec![0, 2]); + assert_eq!(split.scan_file_ordinals, vec![1, 3]); + assert_eq!(split.stats.min, Some(test_timestamp(10))); + assert_eq!(split.stats.max, Some(test_timestamp(70))); +} + +#[test] +fn test_count_field_file_stats_eligibility() { + let stats = RegionScanInputStats { + files: vec![ + 
RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + exact_num_rows: Some(3), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: Some(5), + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: field_stats(Some(4), Some(Value::Int64(1)), Some(Value::Int64(9))), + partition_expr_matches_region: true, + }, + ], + }; + + assert!( + StatsAgg::CountField { + column_name: "value".to_string(), + arg_type: DataType::Int64, + } + .has_stats_files(&stats) + ); +} + +#[test] +fn test_split_count_field_files() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + exact_num_rows: Some(3), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: field_stats(Some(2), Some(Value::Int64(1)), Some(Value::Int64(3))), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: Some(4), + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 2, + file_id: test_file_id(2), + exact_num_rows: Some(5), + time_range: Some((test_timestamp(50), test_timestamp(60))), + field_stats: field_stats(Some(4), Some(Value::Int64(5)), Some(Value::Int64(8))), + partition_expr_matches_region: true, + }, + ], + }; + + let split = split_count_field_files(&stats, "value"); + assert_eq!(split.stats_file_ordinals, vec![0, 2]); + assert_eq!(split.scan_file_ordinals, vec![1]); + assert_eq!(split.stats, 6); +} + +#[test] +fn test_min_max_field_stats_eligibility() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + 
exact_num_rows: Some(3), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: field_stats(Some(2), Some(Value::Int64(1)), Some(Value::Int64(3))), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: Some(4), + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: false, + }, + ], + }; + + assert!( + StatsAgg::MinField { + column_name: "value".to_string(), + arg_type: DataType::Int64, + } + .has_stats_files(&stats) + ); +} + +#[test] +fn test_split_min_max_field_files() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + exact_num_rows: Some(3), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: field_stats(Some(2), Some(Value::Int64(4)), Some(Value::Int64(9))), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: Some(4), + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 2, + file_id: test_file_id(2), + exact_num_rows: Some(5), + time_range: Some((test_timestamp(50), test_timestamp(60))), + field_stats: field_stats(Some(4), Some(Value::Int64(1)), Some(Value::Int64(7))), + partition_expr_matches_region: true, + }, + ], + }; + + let split = split_min_max_field_files(&stats, "value"); + assert_eq!(split.stats_file_ordinals, vec![0, 2]); + assert_eq!(split.scan_file_ordinals, vec![1]); + assert_eq!(split.stats.min, Some(Value::Int64(1))); + assert_eq!(split.stats.max, Some(Value::Int64(9))); +} + +#[test] +fn test_common_stats_file_ordinals_intersects_supported_aggregates() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + 
file_id: test_file_id(0), + exact_num_rows: Some(3), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: field_stats(Some(2), Some(Value::Int64(4)), Some(Value::Int64(9))), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: Some(4), + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 2, + file_id: test_file_id(2), + exact_num_rows: Some(5), + time_range: Some((test_timestamp(50), test_timestamp(60))), + field_stats: field_stats(Some(4), Some(Value::Int64(1)), Some(Value::Int64(7))), + partition_expr_matches_region: true, + }, + ], + }; + + let aggregates = vec![ + StatsAgg::CountStar, + StatsAgg::CountField { + column_name: "value".to_string(), + arg_type: DataType::Int64, + }, + StatsAgg::MaxField { + column_name: "value".to_string(), + arg_type: DataType::Int64, + }, + ]; + + assert_eq!(common_stats_file_ordinals(&aggregates, &stats), vec![0, 2]); +} + +#[test] +fn test_common_stats_file_ordinals_returns_only_shared_stats_eligible_files() { + let stats = RegionScanInputStats { + files: vec![ + RegionScanFileInputStats { + file_ordinal: 0, + file_id: test_file_id(0), + exact_num_rows: Some(3), + time_range: Some((test_timestamp(10), test_timestamp(20))), + field_stats: field_stats(Some(2), Some(Value::Int64(4)), Some(Value::Int64(9))), + partition_expr_matches_region: true, + }, + RegionScanFileInputStats { + file_ordinal: 1, + file_id: test_file_id(1), + exact_num_rows: Some(4), + time_range: Some((test_timestamp(30), test_timestamp(40))), + field_stats: HashMap::new(), + partition_expr_matches_region: true, + }, + ], + }; + + let aggregates = vec![ + StatsAgg::CountStar, + StatsAgg::CountField { + column_name: "value".to_string(), + arg_type: DataType::Int64, + }, + ]; + + assert_eq!(common_stats_file_ordinals(&aggregates, 
&stats), vec![0]);
+}
+
+#[test]
+fn test_filter_stats_by_file_ordinals_keeps_only_selected_files() {
+    let stats = RegionScanInputStats {
+        files: vec![
+            RegionScanFileInputStats {
+                file_ordinal: 0,
+                file_id: test_file_id(0),
+                exact_num_rows: Some(3),
+                time_range: Some((test_timestamp(10), test_timestamp(20))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 1,
+                file_id: test_file_id(1),
+                exact_num_rows: Some(4),
+                time_range: Some((test_timestamp(30), test_timestamp(40))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+        ],
+    };
+
+    let filtered = filter_stats_by_file_ordinals(&stats, &[1]);
+    assert_eq!(filtered.files.len(), 1);
+    assert_eq!(filtered.files[0].file_ordinal, 1);
+}
+
+#[test]
+fn test_partial_state_from_stats_count_star() {
+    let aggregate = StatsAgg::CountStar;
+    let stats = RegionScanInputStats {
+        files: vec![
+            RegionScanFileInputStats {
+                file_ordinal: 0,
+                file_id: test_file_id(0),
+                exact_num_rows: Some(3),
+                time_range: Some((test_timestamp(10), test_timestamp(20))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 1,
+                file_id: test_file_id(1),
+                exact_num_rows: Some(4),
+                time_range: Some((test_timestamp(30), test_timestamp(40))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+        ],
+    };
+
+    let value = partial_state_from_stats(&aggregate, &stats)
+        .unwrap()
+        .unwrap();
+    let array = value.to_array().unwrap();
+    let struct_array = array.as_struct();
+    let count_values = struct_array
+        .column(0)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .unwrap();
+    assert_eq!(count_values.value(0), 7);
+}
+
+#[test]
+fn test_partial_state_from_stats_count_field() {
+    let aggregate = StatsAgg::CountField {
+        column_name: "value".to_string(),
+        arg_type: DataType::Int64,
+    };
+    let stats = RegionScanInputStats {
+        files: vec![
+            RegionScanFileInputStats {
+                
file_ordinal: 0,
+                file_id: test_file_id(0),
+                exact_num_rows: Some(3),
+                time_range: Some((test_timestamp(10), test_timestamp(20))),
+                field_stats: field_stats(Some(2), Some(Value::Int64(1)), Some(Value::Int64(3))),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 1,
+                file_id: test_file_id(1),
+                exact_num_rows: Some(4),
+                time_range: Some((test_timestamp(30), test_timestamp(40))),
+                field_stats: field_stats(Some(4), Some(Value::Int64(5)), Some(Value::Int64(9))),
+                partition_expr_matches_region: true,
+            },
+        ],
+    };
+
+    let value = partial_state_from_stats(&aggregate, &stats)
+        .unwrap()
+        .unwrap();
+    let array = value.to_array().unwrap();
+    let struct_array = array.as_struct();
+    let count_values = struct_array
+        .column(0)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .unwrap();
+    assert_eq!(count_values.value(0), 6);
+}
+
+#[test]
+fn test_partial_state_from_stats_min_time() {
+    let aggregate = StatsAgg::MinTimeIndex {
+        arg_type: DataType::Timestamp(TimeUnit::Millisecond, None),
+    };
+    let stats = RegionScanInputStats {
+        files: vec![
+            RegionScanFileInputStats {
+                file_ordinal: 0,
+                file_id: test_file_id(0),
+                exact_num_rows: Some(3),
+                time_range: Some((test_timestamp(50), test_timestamp(70))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 1,
+                file_id: test_file_id(1),
+                exact_num_rows: Some(5),
+                time_range: Some((test_timestamp(10), test_timestamp(20))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+        ],
+    };
+
+    let value = partial_state_from_stats(&aggregate, &stats)
+        .unwrap()
+        .unwrap();
+    let array = value.to_array().unwrap();
+    let struct_array = array.as_struct();
+    let ts_values = struct_array
+        .column(0)
+        .as_any()
+        .downcast_ref::<TimestampMillisecondArray>()
+        .unwrap();
+    assert_eq!(ts_values.value(0), 10);
+}
+
+#[test]
+fn test_partial_state_from_stats_max_field() {
+    let aggregate = StatsAgg::MaxField {
+        column_name: 
"value".to_string(),
+        arg_type: DataType::Int64,
+    };
+    let stats = RegionScanInputStats {
+        files: vec![
+            RegionScanFileInputStats {
+                file_ordinal: 0,
+                file_id: test_file_id(0),
+                exact_num_rows: Some(3),
+                time_range: Some((test_timestamp(10), test_timestamp(20))),
+                field_stats: field_stats(Some(2), Some(Value::Int64(1)), Some(Value::Int64(3))),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 1,
+                file_id: test_file_id(1),
+                exact_num_rows: Some(4),
+                time_range: Some((test_timestamp(30), test_timestamp(40))),
+                field_stats: field_stats(Some(4), Some(Value::Int64(5)), Some(Value::Int64(9))),
+                partition_expr_matches_region: true,
+            },
+        ],
+    };
+
+    let value = partial_state_from_stats(&aggregate, &stats)
+        .unwrap()
+        .unwrap();
+    let array = value.to_array().unwrap();
+    let struct_array = array.as_struct();
+    let max_values = struct_array
+        .column(0)
+        .as_any()
+        .downcast_ref::<Int64Array>()
+        .unwrap();
+    assert_eq!(max_values.value(0), 9);
+}
+
+#[test]
+fn test_optimizer_rewrites_into_final_union_partial_scan_and_stats() {
+    let schema = Arc::new(GreptimeSchema::new(vec![
+        ColumnSchema::new(
+            "ts",
+            ConcreteDataType::timestamp_millisecond_datatype(),
+            false,
+        ),
+        ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true),
+    ]));
+    let metadata = test_region_metadata();
+    let excluded_count = Arc::new(AtomicUsize::new(0));
+    let base_stats = RegionScanInputStats {
+        files: vec![
+            RegionScanFileInputStats {
+                file_ordinal: 0,
+                file_id: test_file_id(0),
+                exact_num_rows: Some(3),
+                time_range: Some((test_timestamp(10), test_timestamp(20))),
+                field_stats: field_stats(Some(2), Some(Value::Int64(1)), Some(Value::Int64(3))),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 1,
+                file_id: test_file_id(1),
+                exact_num_rows: Some(4),
+                time_range: Some((test_timestamp(30), test_timestamp(40))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+            
RegionScanFileInputStats {
+                file_ordinal: 2,
+                file_id: test_file_id(2),
+                exact_num_rows: Some(5),
+                time_range: Some((test_timestamp(50), test_timestamp(60))),
+                field_stats: field_stats(Some(4), Some(Value::Int64(5)), Some(Value::Int64(9))),
+                partition_expr_matches_region: true,
+            },
+        ],
+    };
+    let scanner = Box::new(StatsRecordingScanner::new(
+        schema.clone(),
+        metadata,
+        base_stats,
+        excluded_count.clone(),
+    ));
+    let scan = Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap());
+    let aggr_expr = Arc::new(build_test_aggr_expr(false, false, false).unwrap());
+    let plan: Arc<dyn ExecutionPlan> = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Single,
+            PhysicalGroupBy::new_single(vec![]),
+            vec![aggr_expr],
+            vec![None],
+            scan,
+            schema.arrow_schema().clone(),
+        )
+        .unwrap(),
+    );
+
+    let optimized = AggregateStats::do_optimize(plan).unwrap();
+    let final_agg = optimized.as_any().downcast_ref::<AggregateExec>().unwrap();
+    assert_eq!(final_agg.mode(), &AggregateMode::Final);
+
+    let coalesce = final_agg
+        .input()
+        .as_any()
+        .downcast_ref::<CoalescePartitionsExec>()
+        .unwrap();
+    let union = coalesce
+        .input()
+        .as_any()
+        .downcast_ref::<UnionExec>()
+        .unwrap();
+    assert_eq!(union.inputs().len(), 2);
+
+    let partial_agg = union.inputs()[0]
+        .as_any()
+        .downcast_ref::<AggregateExec>()
+        .unwrap();
+    assert_eq!(partial_agg.mode(), &AggregateMode::Partial);
+    let partial_scan = partial_agg
+        .input()
+        .as_any()
+        .downcast_ref::<RegionScanExec>()
+        .unwrap();
+    let remaining = partial_scan.scan_input_stats().unwrap().unwrap();
+    assert_eq!(
+        remaining
+            .files
+            .iter()
+            .map(|file| file.file_ordinal)
+            .collect::<Vec<_>>(),
+        vec![1]
+    );
+    assert_eq!(excluded_count.load(Ordering::Relaxed), 2);
+
+    let explain = displayable(optimized.as_ref()).indent(true).to_string();
+    assert!(
+        explain.contains("aggregate_stats: rewritten=true"),
+        "explain: {explain}"
+    );
+    assert!(explain.contains("stats_files=2"), "explain: {explain}");
+    assert!(explain.contains(&test_file_id(0).to_string()), "explain: {explain}");
+    
assert!(explain.contains(&test_file_id(2).to_string()), "explain: {explain}");
+}
+
+#[test]
+fn test_optimizer_rewrites_final_partial_plan_by_replacing_partial_input() {
+    let schema = Arc::new(GreptimeSchema::new(vec![
+        ColumnSchema::new(
+            "ts",
+            ConcreteDataType::timestamp_millisecond_datatype(),
+            false,
+        ),
+        ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true),
+    ]));
+    let metadata = test_region_metadata();
+    let excluded_count = Arc::new(AtomicUsize::new(0));
+    let base_stats = RegionScanInputStats {
+        files: vec![
+            RegionScanFileInputStats {
+                file_ordinal: 0,
+                file_id: test_file_id(0),
+                exact_num_rows: Some(3),
+                time_range: Some((test_timestamp(10), test_timestamp(20))),
+                field_stats: field_stats(Some(2), Some(Value::Int64(1)), Some(Value::Int64(3))),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 1,
+                file_id: test_file_id(1),
+                exact_num_rows: Some(4),
+                time_range: Some((test_timestamp(30), test_timestamp(40))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 2,
+                file_id: test_file_id(2),
+                exact_num_rows: Some(5),
+                time_range: Some((test_timestamp(50), test_timestamp(60))),
+                field_stats: field_stats(Some(4), Some(Value::Int64(5)), Some(Value::Int64(9))),
+                partition_expr_matches_region: true,
+            },
+        ],
+    };
+    let scanner = Box::new(StatsRecordingScanner::new(
+        schema.clone(),
+        metadata,
+        base_stats,
+        excluded_count.clone(),
+    ));
+    let scan = Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap());
+    let aggr_expr = Arc::new(build_test_aggr_expr(false, false, false).unwrap());
+    let partial: Arc<dyn ExecutionPlan> = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Partial,
+            PhysicalGroupBy::new_single(vec![]),
+            vec![aggr_expr.clone()],
+            vec![None],
+            scan,
+            schema.arrow_schema().clone(),
+        )
+        .unwrap(),
+    );
+    let plan: Arc<dyn ExecutionPlan> = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Final,
+            
PhysicalGroupBy::new_single(vec![]),
+            vec![aggr_expr],
+            vec![None],
+            Arc::new(CoalescePartitionsExec::new(partial.clone())),
+            partial.schema(),
+        )
+        .unwrap(),
+    );
+
+    let optimized = AggregateStats::do_optimize(plan).unwrap();
+    let final_agg = optimized.as_any().downcast_ref::<AggregateExec>().unwrap();
+    assert_eq!(final_agg.mode(), &AggregateMode::Final);
+
+    let coalesce = final_agg
+        .input()
+        .as_any()
+        .downcast_ref::<CoalescePartitionsExec>()
+        .unwrap();
+    let union = coalesce
+        .input()
+        .as_any()
+        .downcast_ref::<UnionExec>()
+        .unwrap();
+    assert_eq!(union.inputs().len(), 2);
+
+    let partial_agg = union.inputs()[0]
+        .as_any()
+        .downcast_ref::<AggregateExec>()
+        .unwrap();
+    assert_eq!(partial_agg.mode(), &AggregateMode::Partial);
+    let partial_scan = partial_agg
+        .input()
+        .as_any()
+        .downcast_ref::<RegionScanExec>()
+        .unwrap();
+    let remaining = partial_scan.scan_input_stats().unwrap().unwrap();
+    assert_eq!(
+        remaining
+            .files
+            .iter()
+            .map(|file| file.file_ordinal)
+            .collect::<Vec<_>>(),
+        vec![1]
+    );
+    assert_eq!(excluded_count.load(Ordering::Relaxed), 2);
+}
+
+#[derive(Clone, Copy)]
+enum ExecutionAggExprCase {
+    CountValue { ignore_nulls: bool },
+    CountStar,
+    MinValue { ignore_nulls: bool },
+    MaxValue { ignore_nulls: bool },
+}
+
+impl ExecutionAggExprCase {
+    fn build(self) -> AggregateFunctionExpr {
+        match self {
+            ExecutionAggExprCase::CountValue { ignore_nulls } => {
+                build_test_aggr_expr(false, ignore_nulls, false).unwrap()
+            }
+            ExecutionAggExprCase::CountStar => build_count_star_aggr_expr().unwrap(),
+            ExecutionAggExprCase::MinValue { ignore_nulls } => {
+                build_min_field_aggr_expr_with_ignore_nulls(ignore_nulls).unwrap()
+            }
+            ExecutionAggExprCase::MaxValue { ignore_nulls } => {
+                build_max_field_aggr_expr_with_ignore_nulls(ignore_nulls).unwrap()
+            }
+        }
+    }
+}
+
+fn execution_test_schema() -> datatypes::schema::SchemaRef {
+    Arc::new(GreptimeSchema::new(vec![
+        ColumnSchema::new(
+            "ts",
+            ConcreteDataType::timestamp_millisecond_datatype(),
+            false,
+        )
+        .with_time_index(true),
+        
ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true),
+    ]))
+}
+
+fn execution_test_stats() -> RegionScanInputStats {
+    RegionScanInputStats {
+        files: vec![
+            RegionScanFileInputStats {
+                file_ordinal: 0,
+                file_id: test_file_id(0),
+                exact_num_rows: Some(3),
+                time_range: Some((test_timestamp(10), test_timestamp(12))),
+                field_stats: field_stats(Some(2), Some(Value::Int64(1)), Some(Value::Int64(3))),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 1,
+                file_id: test_file_id(1),
+                exact_num_rows: Some(4),
+                time_range: Some((test_timestamp(30), test_timestamp(33))),
+                field_stats: HashMap::new(),
+                partition_expr_matches_region: true,
+            },
+            RegionScanFileInputStats {
+                file_ordinal: 2,
+                file_id: test_file_id(2),
+                exact_num_rows: Some(5),
+                time_range: Some((test_timestamp(50), test_timestamp(54))),
+                field_stats: field_stats(Some(4), Some(Value::Int64(7)), Some(Value::Int64(10))),
+                partition_expr_matches_region: true,
+            },
+        ],
+    }
+}
+
+fn execution_test_file_batches(
+    schema: datatypes::schema::SchemaRef,
+) -> Vec<(usize, CommonRecordBatch)> {
+    vec![
+        (
+            0,
+            scan_batch(
+                schema.clone(),
+                vec![10, 11, 12],
+                vec![Some(1), None, Some(3)],
+            ),
+        ),
+        (
+            1,
+            scan_batch(
+                schema.clone(),
+                vec![30, 31, 32, 33],
+                vec![Some(4), Some(5), None, Some(6)],
+            ),
+        ),
+        (
+            2,
+            scan_batch(
+                schema,
+                vec![50, 51, 52, 53, 54],
+                vec![Some(7), Some(8), Some(9), Some(10), None],
+            ),
+        ),
+    ]
+}
+
+fn build_execution_test_plan(
+    schema: datatypes::schema::SchemaRef,
+    metadata: RegionMetadataRef,
+    base_stats: RegionScanInputStats,
+    file_batches: Vec<(usize, CommonRecordBatch)>,
+    aggr_expr: AggregateFunctionExpr,
+    excluded_count: Arc<AtomicUsize>,
+) -> Arc<dyn ExecutionPlan> {
+    let scanner = Box::new(
+        StatsRecordingScanner::new(schema.clone(), metadata, base_stats, excluded_count)
+            .with_file_batches(file_batches),
+    );
+    let scan = Arc::new(RegionScanExec::new(scanner, ScanRequest::default(), None).unwrap());
+    let aggr_expr 
= Arc::new(aggr_expr);
+    let partial: Arc<dyn ExecutionPlan> = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Partial,
+            PhysicalGroupBy::new_single(vec![]),
+            vec![aggr_expr.clone()],
+            vec![None],
+            scan,
+            schema.arrow_schema().clone(),
+        )
+        .unwrap(),
+    );
+
+    Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Final,
+            PhysicalGroupBy::new_single(vec![]),
+            vec![aggr_expr],
+            vec![None],
+            Arc::new(CoalescePartitionsExec::new(partial.clone())),
+            partial.schema(),
+        )
+        .unwrap(),
+    )
+}
+
+fn optimized_plan_uses_stats_union(plan: &Arc<dyn ExecutionPlan>) -> bool {
+    let Some(final_agg) = plan.as_any().downcast_ref::<AggregateExec>() else {
+        return false;
+    };
+
+    let input = final_agg.input();
+    if let Some(coalesce) = input.as_any().downcast_ref::<CoalescePartitionsExec>() {
+        return coalesce
+            .input()
+            .as_any()
+            .downcast_ref::<UnionExec>()
+            .is_some();
+    }
+
+    input.as_any().downcast_ref::<UnionExec>().is_some()
+}
+
+#[tokio::test]
+async fn test_optimizer_execution_matrix() {
+    struct Case {
+        name: &'static str,
+        expr: ExecutionAggExprCase,
+        expect_rewrite: bool,
+        expected_excluded_count: usize,
+        expected: String,
+    }
+
+    let cases = [
+        Case {
+            name: "count value",
+            expr: ExecutionAggExprCase::CountValue {
+                ignore_nulls: false,
+            },
+            expect_rewrite: true,
+            expected_excluded_count: 2,
+            expected: [
+                "+--------------+",
+                "| count(value) |",
+                "+--------------+",
+                "| 9            |",
+                "+--------------+",
+            ]
+            .join("\n"),
+        },
+        Case {
+            name: "count value ignore nulls",
+            expr: ExecutionAggExprCase::CountValue { ignore_nulls: true },
+            expect_rewrite: true,
+            expected_excluded_count: 2,
+            expected: [
+                "+--------------+",
+                "| count(value) |",
+                "+--------------+",
+                "| 9            |",
+                "+--------------+",
+            ]
+            .join("\n"),
+        },
+        Case {
+            name: "count star",
+            expr: ExecutionAggExprCase::CountStar,
+            expect_rewrite: true,
+            expected_excluded_count: 3,
+            expected: [
+                "+----------+",
+                "| count(*) |",
+                "+----------+",
+                "| 12       |",
+                "+----------+",
+            ]
+            .join("\n"),
+        },
+        Case {
+            name: "min value",
+            expr: ExecutionAggExprCase::MinValue {
+ ignore_nulls: false, + }, + expect_rewrite: true, + expected_excluded_count: 2, + expected: [ + "+------------+", + "| min(value) |", + "+------------+", + "| 1 |", + "+------------+", + ] + .join("\n"), + }, + Case { + name: "min value ignore nulls", + expr: ExecutionAggExprCase::MinValue { ignore_nulls: true }, + expect_rewrite: true, + expected_excluded_count: 2, + expected: [ + "+------------+", + "| min(value) |", + "+------------+", + "| 1 |", + "+------------+", + ] + .join("\n"), + }, + Case { + name: "max value", + expr: ExecutionAggExprCase::MaxValue { + ignore_nulls: false, + }, + expect_rewrite: true, + expected_excluded_count: 2, + expected: [ + "+------------+", + "| max(value) |", + "+------------+", + "| 10 |", + "+------------+", + ] + .join("\n"), + }, + Case { + name: "max value ignore nulls", + expr: ExecutionAggExprCase::MaxValue { ignore_nulls: true }, + expect_rewrite: true, + expected_excluded_count: 2, + expected: [ + "+------------+", + "| max(value) |", + "+------------+", + "| 10 |", + "+------------+", + ] + .join("\n"), + }, + ]; + + let schema = execution_test_schema(); + let metadata = test_region_metadata(); + let base_stats = execution_test_stats(); + let file_batches = execution_test_file_batches(schema.clone()); + + for case in cases { + let unoptimized = build_execution_test_plan( + schema.clone(), + metadata.clone(), + base_stats.clone(), + file_batches.clone(), + case.expr.build(), + Arc::new(AtomicUsize::new(0)), + ); + let unoptimized_result = + datafusion::physical_plan::collect(unoptimized, SessionContext::default().task_ctx()) + .await + .unwrap(); + + let optimized_excluded_count = Arc::new(AtomicUsize::new(0)); + let optimized = AggregateStats::do_optimize(build_execution_test_plan( + schema.clone(), + metadata.clone(), + base_stats.clone(), + file_batches.clone(), + case.expr.build(), + optimized_excluded_count.clone(), + )) + .unwrap(); + let optimized_result = datafusion::physical_plan::collect( + 
optimized.clone(), + SessionContext::default().task_ctx(), + ) + .await + .unwrap(); + + let unoptimized_pretty = arrow::util::pretty::pretty_format_batches(&unoptimized_result) + .unwrap() + .to_string(); + let optimized_pretty = arrow::util::pretty::pretty_format_batches(&optimized_result) + .unwrap() + .to_string(); + + assert_eq!( + unoptimized_pretty, + case.expected.as_str(), + "case: {}", + case.name + ); + assert_eq!( + optimized_pretty, + case.expected.as_str(), + "case: {}", + case.name + ); + assert_eq!( + optimized_plan_uses_stats_union(&optimized), + case.expect_rewrite, + "case: {}", + case.name + ); + assert_eq!( + optimized_excluded_count.load(Ordering::Relaxed), + case.expected_excluded_count, + "case: {}", + case.name + ); + } +} + +#[test] +fn test_optimizer_observability_distinguishes_rewrite_hit_and_miss() { + let schema = execution_test_schema(); + let metadata = test_region_metadata(); + let base_stats = execution_test_stats(); + let file_batches = execution_test_file_batches(schema.clone()); + let hit_excluded_count = Arc::new(AtomicUsize::new(0)); + + let hit_plan = build_execution_test_plan( + schema.clone(), + metadata.clone(), + base_stats.clone(), + file_batches.clone(), + ExecutionAggExprCase::CountValue { + ignore_nulls: false, + } + .build(), + hit_excluded_count.clone(), + ); + let optimized_hit = AggregateStats::do_optimize(hit_plan).unwrap(); + assert!(optimized_plan_uses_stats_union(&optimized_hit)); + assert_eq!(hit_excluded_count.load(Ordering::Relaxed), 2); + + let hit_explain = displayable(optimized_hit.as_ref()).indent(true).to_string(); + assert!(hit_explain.contains("aggregate_stats: rewritten=true")); + assert!(hit_explain.contains("stats_files=2")); + assert!(hit_explain.contains(&test_file_id(0).to_string())); + assert!(hit_explain.contains(&test_file_id(2).to_string())); + + let miss_excluded_count = Arc::new(AtomicUsize::new(0)); + let miss_scanner = Box::new( + StatsRecordingScanner::new( + schema.clone(), + 
+            metadata,
+            base_stats,
+            miss_excluded_count.clone(),
+        )
+        .with_file_batches(file_batches),
+    );
+    let miss_scan =
+        Arc::new(RegionScanExec::new(miss_scanner, ScanRequest::default(), None).unwrap());
+    let miss_aggr_expr = Arc::new(build_test_aggr_expr(true, false, false).unwrap());
+    let miss_plan: Arc<dyn ExecutionPlan> = Arc::new(
+        AggregateExec::try_new(
+            AggregateMode::Single,
+            PhysicalGroupBy::new_single(vec![]),
+            vec![miss_aggr_expr],
+            vec![None],
+            miss_scan,
+            schema.arrow_schema().clone(),
+        )
+        .unwrap(),
+    );
+
+    let optimized_miss = AggregateStats::do_optimize(miss_plan).unwrap();
+    assert!(!optimized_plan_uses_stats_union(&optimized_miss));
+    assert_eq!(miss_excluded_count.load(Ordering::Relaxed), 0);
+
+    let miss_explain = displayable(optimized_miss.as_ref()).indent(true).to_string();
+    assert!(!miss_explain.contains("aggregate_stats: rewritten=true"));
+}
diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs
index a45fc4c8966b..a680a79ad550 100644
--- a/src/query/src/query_engine/state.rs
+++ b/src/query/src/query_engine/state.rs
@@ -59,6 +59,7 @@ use crate::dist_plan::{
 };
 use crate::metrics::{QUERY_MEMORY_POOL_REJECTED_TOTAL, QUERY_MEMORY_POOL_USAGE_BYTES};
 use crate::optimizer::ExtensionAnalyzerRule;
+use crate::optimizer::aggr_stats::AggregateStats;
 use crate::optimizer::constant_term::MatchesConstantTermOptimizer;
 use crate::optimizer::count_wildcard::CountWildcardToTimeIndexRule;
 use crate::optimizer::parallelize_scan::ParallelizeScan;
@@ -175,17 +176,18 @@ impl QueryEngineState {
 
         // add physical optimizer
         let mut physical_optimizer = PhysicalOptimizer::new();
+        physical_optimizer.rules.insert(5, Arc::new(AggregateStats));
         // Change TableScan's partition right before enforcing distribution
         physical_optimizer
             .rules
-            .insert(5, Arc::new(ParallelizeScan));
+            .insert(6, Arc::new(ParallelizeScan));
         // Pass distribution requirement to MergeScanExec to avoid unnecessary shuffling
         physical_optimizer
             .rules
-            .insert(6, Arc::new(PassDistribution));
+            .insert(7, Arc::new(PassDistribution));
         // Enforce sorting AFTER custom rules that modify the plan structure
         physical_optimizer.rules.insert(
-            7,
+            8,
             Arc::new(datafusion::physical_optimizer::enforce_sorting::EnforceSorting {}),
         );
         // Add rule for windowed sort
diff --git a/src/store-api/src/lib.rs b/src/store-api/src/lib.rs
index 4df594fc6774..ccb3abc8f0be 100644
--- a/src/store-api/src/lib.rs
+++ b/src/store-api/src/lib.rs
@@ -26,6 +26,7 @@ pub mod mito_engine_options;
 pub mod path_utils;
 pub mod region_engine;
 pub mod region_request;
+pub mod scan_stats;
 pub mod sst_entry;
 pub mod storage;
diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs
index b3f460d01d49..3a79eba033da 100644
--- a/src/store-api/src/region_engine.rs
+++ b/src/store-api/src/region_engine.rs
@@ -22,7 +22,8 @@ use std::sync::{Arc, Mutex};
 use api::greptime_proto::v1::meta::{GrantedRegion as PbGrantedRegion, RegionRole as PbRegionRole};
 use api::region::RegionResponse;
 use async_trait::async_trait;
-use common_error::ext::BoxedError;
+use common_error::ext::{BoxedError, PlainError};
+use common_error::status_code::StatusCode;
 use common_recordbatch::{EmptyRecordBatchStream, MemoryPermit, SendableRecordBatchStream};
 use common_time::Timestamp;
 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -37,6 +38,7 @@ use crate::metadata::RegionMetadataRef;
 use crate::region_request::{
     BatchRegionDdlRequest, RegionCatchupRequest, RegionOpenRequest, RegionRequest,
 };
+use crate::scan_stats::RegionScanStats;
 use crate::storage::{FileId, RegionId, ScanRequest, SequenceNumber};
 
 /// The settable region role state.
@@ -387,6 +389,13 @@ pub struct PrepareRequest {
     pub distinguish_partition_range: Option<bool>,
     /// The expected number of target partitions.
     pub target_partitions: Option<usize>,
+    /// SST file ordinals to exclude from the current scan input.
+    ///
+    /// IMPORTANT: this is not a lightweight property toggle. Scanner implementations
+    /// may need to rebuild their `ScanInput`-derived runtime state (`StreamContext`,
+    /// `Pruner`, partition ranges, and similar caches) before applying other prepare
+    /// settings.
+    pub excluded_file_ordinals: Option<Vec<usize>>,
 }
 
 impl PrepareRequest {
@@ -407,6 +416,24 @@ impl PrepareRequest {
         self.target_partitions = Some(target_partitions);
         self
     }
+
+    /// Sets SST file ordinals to exclude from the current scan input.
+    pub fn with_excluded_file_ordinals(mut self, excluded_file_ordinals: Vec<usize>) -> Self {
+        self.excluded_file_ordinals = Some(excluded_file_ordinals);
+        self
+    }
+
+    pub fn validate(&self) -> Result<(), BoxedError> {
+        if self.ranges.is_some() && self.excluded_file_ordinals.is_some() {
+            return Err(BoxedError::new(PlainError::new(
+                "PrepareRequest does not allow mixing ranges with excluded_file_ordinals"
+                    .to_string(),
+                StatusCode::InvalidArguments,
+            )));
+        }
+
+        Ok(())
+    }
 }
 
 /// Necessary context of the query for the scanner.
@@ -432,9 +459,11 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
     /// Returns the metadata of the region.
     fn metadata(&self) -> RegionMetadataRef;
 
-    /// Prepares the scanner with the given partition ranges.
+    /// Prepares the scanner with planner-side overrides.
     ///
-    /// This method is for the planner to adjust the scanner's behavior based on the partition ranges.
+    /// IMPORTANT: some requests only tweak properties, but others (such as
+    /// `excluded_file_ordinals`) require the scanner to rebuild any runtime state derived
+    /// from the current `ScanInput` before applying the rest of the request.
     fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError>;
 
     /// Scans the partition and returns a stream of record batches.
@@ -451,6 +480,11 @@ pub trait RegionScanner: Debug + DisplayAs + Send {
     /// Check if there is any predicate exclude region partition exprs that may be executed in this scanner.
     fn has_predicate_without_region(&self) -> bool;
 
+    /// Returns file-level scan statistics for the current scanner input when available.
+    fn scan_input_stats(&self) -> Result<Option<RegionScanStats>, BoxedError> {
+        Ok(None)
+    }
+
     /// Add the given dynamic filter expressions to the predicate of the scanner.
     /// Returns a vector of booleans indicating which filter expressions were applied.
     /// true indicates the filter expression was applied(will be use by scanner to prune by stat for row group),
@@ -468,6 +502,32 @@ pub type RegionScannerRef = Box<dyn RegionScanner>;
 
 pub type BatchResponses = Vec<(RegionId, Result<RegionScannerRef, BoxedError>)>;
 
+#[cfg(test)]
+mod tests {
+    use common_time::Timestamp;
+
+    use super::{PartitionRange, PrepareRequest};
+
+    #[test]
+    fn test_prepare_request_rejects_ranges_and_excluded_file_ordinals_together() {
+        let err = PrepareRequest::default()
+            .with_ranges(vec![vec![PartitionRange {
+                start: Timestamp::new_millisecond(0),
+                end: Timestamp::new_millisecond(1),
+                num_rows: 1,
+                identifier: 0,
+            }]])
+            .with_excluded_file_ordinals(vec![1])
+            .validate()
+            .unwrap_err();
+
+        assert!(
+            err.to_string()
+                .contains("does not allow mixing ranges with excluded_file_ordinals")
+        );
+    }
+}
+
 /// Represents the statistics of a region.
 #[derive(Debug, Deserialize, Serialize, Default)]
 pub struct RegionStatistic {
diff --git a/src/store-api/src/scan_stats.rs b/src/store-api/src/scan_stats.rs
new file mode 100644
index 000000000000..51e62f418c94
--- /dev/null
+++ b/src/store-api/src/scan_stats.rs
@@ -0,0 +1,49 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+//! Trusted, scan-derived statistics exposed by region scanners for higher-level optimizations.
+//!
+//! These are not raw storage-format statistics. They are conservative summaries that scanner
+//! implementations can safely expose for optimizer consumption.
+
+use std::collections::HashMap;
+
+use common_time::Timestamp;
+use datatypes::value::Value;
+
+use crate::storage::FileId;
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RegionScanColumnStats {
+    pub min_value: Option<Value>,
+    pub max_value: Option<Value>,
+    pub exact_non_null_rows: Option<u64>,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RegionScanFileStats {
+    /// Stable file id for explainability and cross-plan correlation.
+    pub file_id: FileId,
+    /// Stable file ordinal within one `RegionScanStats` snapshot.
+    pub file_ordinal: usize,
+    pub exact_num_rows: Option<u64>,
+    pub time_range: Option<(Timestamp, Timestamp)>,
+    pub field_stats: HashMap<String, RegionScanColumnStats>,
+    pub partition_expr_matches_region: bool,
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct RegionScanStats {
+    pub files: Vec<RegionScanFileStats>,
+}
diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs
index e2d8f794da34..44ef5c1867a4 100644
--- a/src/table/src/table/scan.rs
+++ b/src/table/src/table/scan.rs
@@ -48,7 +48,8 @@ use store_api::metric_engine_consts::DATA_SCHEMA_TSID_COLUMN_NAME;
 use store_api::region_engine::{
     PartitionRange, PrepareRequest, QueryScanContext, RegionScannerRef,
 };
-use store_api::storage::{ScanRequest, TimeSeriesDistribution};
+use store_api::scan_stats::RegionScanStats;
+use store_api::storage::{FileId, ScanRequest, TimeSeriesDistribution};
 
 use crate::table::metrics::StreamMetrics;
 
@@ -66,10 +67,16 @@ pub struct RegionScanExec {
     is_partition_set: bool,
     // TODO(ruihang): handle TimeWindowed dist via this parameter
     distribution: Option<TimeSeriesDistribution>,
+    aggr_stats_explain: Option<AggregateStatsExplain>,
     explain_verbose: bool,
     query_memory_permit: Option<Arc<MemoryPermit>>,
 }
 
+#[derive(Debug, Clone)]
+pub struct AggregateStatsExplain {
+    pub stats_file_ids: Vec<FileId>,
+}
+
 impl std::fmt::Debug for RegionScanExec {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("RegionScanExec")
@@ -82,6 +89,7 @@ impl std::fmt::Debug for RegionScanExec {
             .field("total_rows", &self.total_rows)
             .field("is_partition_set", &self.is_partition_set)
             .field("distribution", &self.distribution)
+            .field("aggr_stats_explain", &self.aggr_stats_explain)
             .field("explain_verbose", &self.explain_verbose)
             .finish()
     }
@@ -225,6 +233,7 @@ impl RegionScanExec {
             total_rows,
             is_partition_set: false,
             distribution: request.distribution,
+            aggr_stats_explain: None,
             explain_verbose: false,
             query_memory_permit,
         })
@@ -298,15 +307,80 @@ impl RegionScanExec {
             total_rows: self.total_rows,
             is_partition_set: true,
             distribution: self.distribution,
+            aggr_stats_explain: self.aggr_stats_explain.clone(),
             explain_verbose: self.explain_verbose,
             query_memory_permit: self.query_memory_permit.clone(),
         })
     }
 
+    pub fn with_excluded_file_ordinals(
+        &self,
+        excluded_file_ordinals: Vec<usize>,
+    ) -> Result<Self, BoxedError> {
+        {
+            let mut scanner = self.scanner.lock().unwrap();
+            scanner.prepare(
+                PrepareRequest::default().with_excluded_file_ordinals(excluded_file_ordinals),
+            )?;
+        }
+
+        Ok(Self {
+            scanner: self.scanner.clone(),
+            arrow_schema: self.arrow_schema.clone(),
+            output_ordering: self.output_ordering.clone(),
+            metric: self.metric.clone(),
+            properties: self.properties.clone(),
+            append_mode: self.append_mode,
+            total_rows: self.total_rows,
+            is_partition_set: self.is_partition_set,
+            distribution: self.distribution,
+            aggr_stats_explain: self.aggr_stats_explain.clone(),
+            explain_verbose: self.explain_verbose,
+            query_memory_permit: self.query_memory_permit.clone(),
+        })
+    }
+
+    pub fn with_aggregate_stats_explain(&self, explain: AggregateStatsExplain) -> Self {
+        Self {
+            scanner: self.scanner.clone(),
+            arrow_schema: self.arrow_schema.clone(),
+            output_ordering: self.output_ordering.clone(),
+            metric: self.metric.clone(),
+            properties: self.properties.clone(),
+            append_mode: self.append_mode,
+            total_rows: self.total_rows,
+            is_partition_set: self.is_partition_set,
+            distribution: self.distribution,
+            aggr_stats_explain: Some(explain),
+            explain_verbose: self.explain_verbose,
+            query_memory_permit: self.query_memory_permit.clone(),
+        }
+    }
+
     pub fn distribution(&self) -> Option<TimeSeriesDistribution> {
         self.distribution
     }
 
+    pub fn append_mode(&self) -> bool {
+        self.append_mode
+    }
+
+    pub fn total_rows(&self) -> usize {
+        self.total_rows
+    }
+
+    pub fn has_predicate_without_region(&self) -> bool {
+        self.scanner.lock().unwrap().has_predicate_without_region()
+    }
+
+    pub fn scan_input_stats(&self) -> DfResult<Option<RegionScanStats>> {
+        self.scanner
+            .lock()
+            .unwrap()
+            .scan_input_stats()
+            .map_err(|err| DataFusionError::External(err.into()))
+    }
+
     pub fn with_distinguish_partition_range(&self, distinguish_partition_range: bool) {
         let mut scanner = self.scanner.lock().unwrap();
         // set distinguish_partition_range won't fail
@@ -473,7 +547,27 @@ impl DisplayAs for RegionScanExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         // The scanner contains all information needed to display the plan.
         match self.scanner.try_lock() {
-            Ok(scanner) => scanner.fmt_as(t, f),
+            Ok(scanner) => {
+                scanner.fmt_as(t, f)?;
+                if let Some(explain) = self.aggr_stats_explain.as_ref() {
+                    write!(
+                        f,
+                        ", aggregate_stats: rewritten=true, stats_files={}",
+                        explain.stats_file_ids.len(),
+                    )?;
+
+                    if matches!(t, DisplayFormatType::Verbose) {
+                        let stats_file_ids = explain
+                            .stats_file_ids
+                            .iter()
+                            .map(ToString::to_string)
+                            .collect::<Vec<_>>()
+                            .join(", ");
+                        write!(f, ", stats_file_ids=[{}]", stats_file_ids)?;
+                    }
+                }
+                Ok(())
+            }
             Err(_) => write!(f, "RegionScanExec <locked>"),
         }
     }