diff --git a/src/promql/benches/bench_histogram_fold.rs b/src/promql/benches/bench_histogram_fold.rs new file mode 100644 index 000000000000..584c990804dd --- /dev/null +++ b/src/promql/benches/bench_histogram_fold.rs @@ -0,0 +1,184 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Benchmarks for HistogramFold plan. + +use std::sync::Arc; + +use criterion::{BenchmarkId, Criterion, criterion_group}; +use datafusion::arrow::array::Float64Array; +use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::common::ToDFSchema; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::source::DataSourceExec; +use datafusion::logical_expr::{EmptyRelation, LogicalPlan}; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use datatypes::arrow::array::TimestampMillisecondArray; +use datatypes::arrow_array::StringArray; +use promql::extension_plan::HistogramFold; + +/// Standard Prometheus histogram bucket bounds. +const STANDARD_BUCKETS: &[&str] = &[ + "0.005", "0.01", "0.025", "0.05", "0.1", "0.25", "0.5", "1", "2.5", "5", "10", "+Inf", +]; + +/// Build histogram data: `num_series` series, `num_timestamps` timestamps each, +/// each having `bucket_bounds.len()` buckets. 
+fn build_histogram_input( + num_series: usize, + num_timestamps: usize, + bucket_bounds: &[&str], +) -> Arc<dyn ExecutionPlan> { + let schema = Arc::new(Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new("le", DataType::Utf8, true), + Field::new("val", DataType::Float64, true), + Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true), + ])); + + let bucket_count = bucket_bounds.len(); + let total_rows = num_series * num_timestamps * bucket_count; + + let mut hosts = Vec::with_capacity(total_rows); + let mut les = Vec::with_capacity(total_rows); + let mut vals = Vec::with_capacity(total_rows); + let mut timestamps = Vec::with_capacity(total_rows); + + for s in 0..num_series { + let host = format!("host_{}", s); + for t in 0..num_timestamps { + let ts = (s * num_timestamps + t) as i64 * 15_000; // 15s intervals + let mut cumulative = 0.0; + for (b, _le_str) in bucket_bounds.iter().enumerate() { + hosts.push(host.clone()); + les.push(bucket_bounds[b].to_string()); + timestamps.push(ts); + // Monotonically increasing counters per bucket + cumulative += (b + 1) as f64 * 10.0 + (t % 5) as f64; + vals.push(cumulative); + } + } + } + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(StringArray::from(hosts)) as _, + Arc::new(StringArray::from(les)) as _, + Arc::new(Float64Array::from(vals)) as _, + Arc::new(TimestampMillisecondArray::from(timestamps)) as _, + ], + ) + .unwrap(); + + Arc::new(DataSourceExec::new(Arc::new( + MemorySourceConfig::try_new(&[vec![batch]], schema, None).unwrap(), + ))) +} + +fn build_exec(input: Arc<dyn ExecutionPlan>, quantile: f64) -> Arc<dyn ExecutionPlan> { + let logical_input = LogicalPlan::EmptyRelation(EmptyRelation { + produce_one_row: false, + schema: Arc::new(input.schema().as_ref().clone().to_dfschema().unwrap()), + }); + + HistogramFold::new( + "le".to_string(), + "val".to_string(), + "ts".to_string(), + quantile, + logical_input, + ) + .unwrap() + .to_execution_plan(input) +} + +fn run(exec: Arc<dyn ExecutionPlan>, rt: 
&tokio::runtime::Runtime) { + let ctx = SessionContext::default(); + rt.block_on(async { + let result = datafusion::physical_plan::collect(exec, ctx.task_ctx()) + .await + .unwrap(); + std::hint::black_box(result); + }); +} + +fn bench_histogram_fold(c: &mut Criterion) { + let mut group = c.benchmark_group("histogram_fold"); + let rt = tokio::runtime::Runtime::new().unwrap(); + + // Small: 10 series × 10 timestamps × 12 buckets = 1,200 rows → 100 output + { + let input = build_histogram_input(10, 10, STANDARD_BUCKETS); + let exec = build_exec(input, 0.9); + group.bench_with_input(BenchmarkId::new("small", "10s_10t_12b"), &(), |b, _| { + b.iter(|| run(exec.clone(), &rt)) + }); + } + + // Medium: 50 series × 100 timestamps × 12 buckets = 60,000 rows → 5,000 output + { + let input = build_histogram_input(50, 100, STANDARD_BUCKETS); + let exec = build_exec(input, 0.9); + group.bench_with_input(BenchmarkId::new("medium", "50s_100t_12b"), &(), |b, _| { + b.iter(|| run(exec.clone(), &rt)) + }); + } + + // Large: 100 series × 500 timestamps × 12 buckets = 600,000 rows → 50,000 output + { + let input = build_histogram_input(100, 500, STANDARD_BUCKETS); + let exec = build_exec(input, 0.9); + group.bench_with_input(BenchmarkId::new("large", "100s_500t_12b"), &(), |b, _| { + b.iter(|| run(exec.clone(), &rt)) + }); + } + + // Many buckets: 10 series × 100 timestamps × 50 buckets = 50,000 rows + { + let mut many_buckets: Vec<&str> = Vec::with_capacity(50); + let bucket_strs: Vec<String> = (0..49) + .map(|i| format!("{}", (i + 1) as f64 * 0.5)) + .collect(); + for s in &bucket_strs { + many_buckets.push(s.as_str()); + } + many_buckets.push("+Inf"); + + let input = build_histogram_input(10, 100, &many_buckets); + let exec = build_exec(input, 0.5); + group.bench_with_input( + BenchmarkId::new("many_buckets", "10s_100t_50b"), + &(), + |b, _| b.iter(|| run(exec.clone(), &rt)), + ); + } + + // Different quantiles on medium data + for &q in &[0.1, 0.5, 0.9, 0.99] { + let input = 
build_histogram_input(50, 100, STANDARD_BUCKETS); + let exec = build_exec(input, q); + group.bench_with_input( + BenchmarkId::new("quantile", format!("q{:.2}", q)), + &(), + |b, _| b.iter(|| run(exec.clone(), &rt)), + ); + } + + group.finish(); +} + +criterion_group!(benches, bench_histogram_fold); diff --git a/src/promql/benches/bench_main.rs b/src/promql/benches/bench_main.rs index 2d93887041d6..c359d26100eb 100644 --- a/src/promql/benches/bench_main.rs +++ b/src/promql/benches/bench_main.rs @@ -14,8 +14,10 @@ use criterion::criterion_main; +mod bench_histogram_fold; mod bench_range_fn; criterion_main! { - bench_range_fn::benches + bench_range_fn::benches, + bench_histogram_fold::benches, } diff --git a/src/promql/src/extension_plan/histogram_fold.rs b/src/promql/src/extension_plan/histogram_fold.rs index 15dd5f7c8c66..88b13a8dd176 100644 --- a/src/promql/src/extension_plan/histogram_fold.rs +++ b/src/promql/src/extension_plan/histogram_fold.rs @@ -20,8 +20,8 @@ use std::task::Poll; use std::time::Instant; use common_telemetry::warn; -use datafusion::arrow::array::{Array, AsArray, StringArray}; -use datafusion::arrow::compute::{SortOptions, concat_batches}; +use datafusion::arrow::array::{Array, AsArray, Float64Array, StringArray, UInt32Array}; +use datafusion::arrow::compute::{self as arrow_compute, SortOptions, concat_batches}; use datafusion::arrow::datatypes::{DataType, Float64Type, SchemaRef}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::common::stats::Precision; @@ -543,6 +543,10 @@ impl ExecutionPlan for HistogramFoldExec { self.le_column_index, )?, output_buffered_rows: 0, + cached_le_parsed: None, + counters_buf: Vec::new(), + optimistic_output_batches: Vec::new(), + optimistic_output_rows: 0, })) } @@ -606,6 +610,11 @@ pub struct HistogramFoldStream { output_buffer: Vec<Box<dyn MutableVector>>, output_buffered_rows: usize, + cached_le_parsed: Option<Vec<f64>>, + counters_buf: Vec<f64>, + optimistic_output_batches: Vec<RecordBatch>, + optimistic_output_rows: usize, + // runtime 
things input: SendableRecordBatchStream, metric: BaselineMetrics, @@ -750,64 +759,93 @@ impl HistogramFoldStream { Ok(None) } - /// Fold record batches from input buffer and put to output buffer fn fold_buf(&mut self, bucket_num: usize) -> DataFusionResult<()> { let batch = concat_batches(&self.input_schema, self.input_buffer.drain(..).as_ref())?; let mut remaining_rows = self.input_buffered_rows; let mut cursor = 0; - // TODO(LFC): Try to get rid of the Arrow array to vector conversion here. - let vectors = Helper::try_into_vectors(batch.columns()) - .map_err(|e| DataFusionError::Execution(e.to_string()))?; - let le_array = batch.column(self.le_column_index); - let le_array = le_array.as_string::(); - let field_array = batch.column(self.field_column_index); - let field_array = field_array.as_primitive::(); - let mut tag_values_buf = Vec::with_capacity(self.normal_indices.len()); + let le_array = batch.column(self.le_column_index).as_string::(); + let field_array = batch + .column(self.field_column_index) + .as_primitive::(); + let field_values = field_array.values(); + let field_no_nulls = field_array.null_count() == 0; + + if self.cached_le_parsed.is_none() && remaining_rows >= bucket_num { + let parsed = Self::parse_le_values(le_array, cursor, bucket_num); + self.cached_le_parsed = Some(parsed); + self.counters_buf.resize(bucket_num, 0.0); + } + let cached_bucket = self + .cached_le_parsed + .as_ref() + .expect("cached bucket must be initialized before optimistic folding"); + + let est_groups = remaining_rows / bucket_num; + let mut take_indices: Vec = Vec::with_capacity(est_groups); + let mut field_results: Vec = Vec::with_capacity(est_groups); while remaining_rows >= bucket_num && self.mode == FoldMode::Optimistic { - self.collect_tag_values(&vectors, cursor, &mut tag_values_buf); - if !self.validate_optimistic_group( - &vectors, - le_array, - cursor, - bucket_num, - &tag_values_buf, - ) { - let remaining_input_batch = batch.slice(cursor, remaining_rows); - 
self.switch_to_safe_mode(remaining_input_batch)?; - return Ok(()); + if !Self::is_positive_infinity(le_array, cursor + bucket_num - 1) { + break; } - // "sample" normal columns - for (idx, value) in self.normal_indices.iter().zip(tag_values_buf.iter()) { - self.output_buffer[*idx].push_value_ref(value); - } - // "fold" `le` and field columns - let mut bucket = Vec::with_capacity(bucket_num); - let mut counters = Vec::with_capacity(bucket_num); - for bias in 0..bucket_num { - let position = cursor + bias; - let le = if le_array.is_valid(position) { - le_array.value(position).parse::().unwrap_or(f64::NAN) - } else { - f64::NAN - }; - bucket.push(le); + take_indices.push(cursor as u32); - let counter = if field_array.is_valid(position) { - field_array.value(position) - } else { - f64::NAN - }; - counters.push(counter); + let counters = &mut self.counters_buf; + if field_no_nulls { + counters[..bucket_num].copy_from_slice(&field_values[cursor..cursor + bucket_num]); + } else { + for (bias, counter) in counters.iter_mut().enumerate().take(bucket_num) { + let position = cursor + bias; + *counter = if field_array.is_valid(position) { + field_values[position] + } else { + f64::NAN + }; + } } - // ignore invalid data - let result = Self::evaluate_row(self.quantile, &bucket, &counters).unwrap_or(f64::NAN); - self.output_buffer[self.field_column_index].push_value_ref(&ValueRef::from(result)); + + let result = Self::evaluate_row_fast(self.quantile, cached_bucket, counters); + field_results.push(result); + cursor += bucket_num; remaining_rows -= bucket_num; - self.output_buffered_rows += 1; + } + + if !take_indices.is_empty() { + let num_groups = take_indices.len(); + let indices = UInt32Array::from(take_indices); + let field_col: Arc = Arc::new(Float64Array::from(field_results)); + let mut columns: Vec> = + Vec::with_capacity(self.output_schema.fields().len()); + + let input_field_count = batch.num_columns(); + for input_idx in 0..input_field_count { + if input_idx == 
self.le_column_index { + continue; // le is removed from output schema + } + if input_idx == self.field_column_index { + columns.push(field_col.clone()); + } else { + columns.push(arrow_compute::take( + batch.column(input_idx), + &indices, + None, + )?); + } + } + + let output_batch = RecordBatch::try_new(self.output_schema.clone(), columns) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + self.optimistic_output_rows += num_groups; + self.optimistic_output_batches.push(output_batch); + } + + if remaining_rows >= bucket_num && self.mode == FoldMode::Optimistic { + let remaining_input_batch = batch.slice(cursor, remaining_rows); + self.switch_to_safe_mode(remaining_input_batch)?; + return Ok(()); } let remaining_input_batch = batch.slice(cursor, remaining_rows); @@ -825,7 +863,7 @@ impl HistogramFoldStream { } fn maybe_take_output(&mut self) -> DataFusionResult>> { - if self.output_buffered_rows >= self.batch_size { + if self.output_buffered_rows + self.optimistic_output_rows >= self.batch_size { return Ok(self.take_output_buf()?.map(Ok)); } Ok(None) @@ -857,28 +895,17 @@ impl HistogramFoldStream { } } - fn validate_optimistic_group( - &self, - vectors: &[VectorRef], - le_array: &StringArray, - cursor: usize, - bucket_num: usize, - tag_values: &[ValueRef<'_>], - ) -> bool { - let inf_index = cursor + bucket_num - 1; - if !Self::is_positive_infinity(le_array, inf_index) { - return false; - } - - for offset in 1..bucket_num { - let row = cursor + offset; - for (idx, expected) in self.normal_indices.iter().zip(tag_values.iter()) { - if vectors[*idx].get_ref(row) != *expected { - return false; - } - } + fn parse_le_values(le_array: &StringArray, start: usize, bucket_num: usize) -> Vec { + let mut parsed = Vec::with_capacity(bucket_num); + for bias in 0..bucket_num { + let position = start + bias; + parsed.push(if le_array.is_valid(position) { + le_array.value(position).parse::().unwrap_or(f64::NAN) + } else { + f64::NAN + }); } - true + parsed } /// 
Checks whether a row belongs to the current group (same series). @@ -988,9 +1015,9 @@ impl HistogramFoldStream { .all(|(group, now)| group.as_value_ref() == *now) } - /// Compute result from output buffer fn take_output_buf(&mut self) -> DataFusionResult> { - if self.output_buffered_rows == 0 { + let total_rows = self.output_buffered_rows + self.optimistic_output_rows; + if total_rows == 0 { if self.input_buffered_rows != 0 { warn!( "input buffer is not empty, {} rows remaining", @@ -1000,19 +1027,35 @@ impl HistogramFoldStream { return Ok(None); } - let mut output_buf = Self::empty_output_buffer(&self.output_schema, self.le_column_index)?; - std::mem::swap(&mut self.output_buffer, &mut output_buf); - let mut columns = Vec::with_capacity(output_buf.len()); - for builder in output_buf.iter_mut() { - columns.push(builder.to_vector().to_arrow_array()); + let mut all_batches: Vec = Vec::new(); + + if !self.optimistic_output_batches.is_empty() { + all_batches.append(&mut self.optimistic_output_batches); + self.optimistic_output_rows = 0; + } + + if self.output_buffered_rows > 0 { + let mut output_buf = + Self::empty_output_buffer(&self.output_schema, self.le_column_index)?; + std::mem::swap(&mut self.output_buffer, &mut output_buf); + let mut columns = Vec::with_capacity(output_buf.len()); + for builder in output_buf.iter_mut() { + columns.push(builder.to_vector().to_arrow_array()); + } + columns.remove(self.le_column_index); + self.output_buffered_rows = 0; + let batch = RecordBatch::try_new(self.output_schema.clone(), columns) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?; + all_batches.push(batch); } - // remove the placeholder column for `le` - columns.remove(self.le_column_index); - self.output_buffered_rows = 0; - RecordBatch::try_new(self.output_schema.clone(), columns) - .map(Some) - .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + if all_batches.len() == 1 { + Ok(Some(all_batches.into_iter().next().unwrap())) + } else { + 
concat_batches(&self.output_schema, &all_batches) + .map(Some) + .map_err(Into::into) + } } fn flush_remaining(&mut self) -> DataFusionResult<()> { @@ -1034,12 +1077,62 @@ impl HistogramFoldStream { Ok(()) } + #[inline] fn is_positive_infinity(le_array: &StringArray, index: usize) -> bool { - le_array.is_valid(index) - && matches!( - le_array.value(index).parse::(), - Ok(value) if value.is_infinite() && value.is_sign_positive() - ) + le_array.is_valid(index) && { + let value = le_array.value(index); + value.eq_ignore_ascii_case("+inf") + || value.eq_ignore_ascii_case("inf") + || value.eq_ignore_ascii_case("+infinity") + || value.eq_ignore_ascii_case("infinity") + } + } + + #[inline] + fn evaluate_row_fast(quantile: f64, bucket: &[f64], counter: &mut [f64]) -> f64 { + if bucket.len() <= 1 || bucket.len() != counter.len() { + return f64::NAN; + } + if quantile < 0.0 { + return f64::NEG_INFINITY; + } + if quantile > 1.0 { + return f64::INFINITY; + } + if quantile.is_nan() { + return f64::NAN; + } + + let mut prev = 0.0f64; + for v in counter.iter_mut() { + if !v.is_finite() || *v < prev { + *v = prev; + } + prev = *v; + } + + let total = *counter.last().unwrap(); + let expected_pos = total * quantile; + let fit_bucket_pos = counter.partition_point(|&c| c < expected_pos); + if fit_bucket_pos >= bucket.len() - 1 { + bucket[bucket.len() - 2] + } else { + let upper_bound = bucket[fit_bucket_pos]; + let upper_count = counter[fit_bucket_pos]; + let mut lower_bound = bucket[0].min(0.0); + let mut lower_count = 0.0; + if fit_bucket_pos > 0 { + lower_bound = bucket[fit_bucket_pos - 1]; + lower_count = counter[fit_bucket_pos - 1]; + } + if (upper_count - lower_count).abs() < 1e-10 { + f64::NAN + } else { + lower_bound + + (upper_bound - lower_bound) / (upper_count - lower_count) + * (expected_pos - lower_count) + } + } } /// Evaluate the field column and return the result @@ -1093,10 +1186,8 @@ impl HistogramFoldStream { let total = *counter.last().unwrap(); let expected_pos = 
total * quantile; - let mut fit_bucket_pos = 0; - while fit_bucket_pos < bucket.len() && counter[fit_bucket_pos] < expected_pos { - fit_bucket_pos += 1; - } + // Binary search for the bucket whose counter >= expected_pos + let fit_bucket_pos = counter.partition_point(|&c| c < expected_pos); if fit_bucket_pos >= bucket.len() - 1 { Ok(bucket[bucket.len() - 2]) } else { @@ -1210,6 +1301,8 @@ mod test { schema: SchemaRef, quantile: f64, ts_column_index: usize, + le_column_index: usize, + field_column_index: usize, ) -> Arc { let input: Arc = Arc::new(DataSourceExec::new(Arc::new( MemorySourceConfig::try_new(&[batches], schema.clone(), None).unwrap(), @@ -1221,12 +1314,17 @@ mod test { .clone(), ); - let (tag_columns, partition_exprs, properties) = - build_test_plan_properties(&input, output_schema.clone(), ts_column_index); + let (tag_columns, partition_exprs, properties) = build_test_plan_properties( + &input, + output_schema.clone(), + ts_column_index, + le_column_index, + field_column_index, + ); Arc::new(HistogramFoldExec { - le_column_index: 1, - field_column_index: 2, + le_column_index, + field_column_index, quantile, ts_column_index, input, @@ -1248,6 +1346,8 @@ mod test { input: &Arc, output_schema: SchemaRef, ts_column_index: usize, + le_column_index: usize, + field_column_index: usize, ) -> PlanPropsResult { let tag_columns = input .schema() @@ -1255,7 +1355,7 @@ mod test { .iter() .enumerate() .filter_map(|(idx, field)| { - if idx == 1 || idx == 2 || idx == ts_column_index { + if idx == le_column_index || idx == field_column_index || idx == ts_column_index { None } else { Some(Arc::new(PhyColumn::new(field.name(), idx)) as _) @@ -1298,7 +1398,7 @@ mod test { .clone(), ); let (tag_columns, partition_exprs, properties) = - build_test_plan_properties(&memory_exec, output_schema.clone(), 0); + build_test_plan_properties(&memory_exec, output_schema.clone(), 0, 1, 2); let fold_exec = Arc::new(HistogramFoldExec { le_column_index: 1, field_column_index: 2, @@ 
-1462,7 +1562,7 @@ mod test { let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 3.0, 1.0, 5.0])) as _; let batch = RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap(); - let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0); + let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0, 1, 2); let session_context = SessionContext::default(); let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx()) .await @@ -1495,7 +1595,7 @@ mod test { let val_column = Arc::new(Float64Array::from(vec![1.0, 2.0])) as _; let batch = RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap(); - let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.9, 0); + let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.9, 0, 1, 2); let session_context = SessionContext::default(); let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx()) .await @@ -1514,6 +1614,38 @@ mod test { assert_eq!(result_literal, expected); } + #[tokio::test] + async fn optimistic_single_inf_bucket_emits_nan() { + let schema = Arc::new(Schema::new(vec![ + Field::new("host", DataType::Utf8, true), + Field::new("le", DataType::Utf8, true), + Field::new("val", DataType::Float64, true), + ])); + let host_column = Arc::new(StringArray::from(vec!["a", "b"])) as _; + let le_column = Arc::new(StringArray::from(vec!["+Inf", "+Inf"])) as _; + let val_column = Arc::new(Float64Array::from(vec![2.0, 5.0])) as _; + let batch = + RecordBatch::try_new(schema.clone(), vec![host_column, le_column, val_column]).unwrap(); + let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0, 1, 2); + let session_context = SessionContext::default(); + let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx()) + .await + .unwrap(); + let result_literal = 
datatypes::arrow::util::pretty::pretty_format_batches(&result) + .unwrap() + .to_string(); + + let expected = String::from( + "+------+-----+\n\ +| host | val |\n\ ++------+-----+\n\ +| a | NaN |\n\ +| b | NaN |\n\ ++------+-----+", + ); + assert_eq!(result_literal, expected); + } + #[tokio::test] async fn safe_mode_handles_misaligned_groups() { let schema = Arc::new(Schema::new(vec![ @@ -1534,7 +1666,7 @@ mod test { ])) as _; let batch = RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap(); - let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0); + let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0, 1, 2); let session_context = SessionContext::default(); let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx()) .await @@ -1574,7 +1706,7 @@ mod test { let batch = RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap(); - let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0); + let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0, 1, 2); let session_context = SessionContext::default(); let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx()) .await @@ -1612,7 +1744,7 @@ mod test { ])) as _; let batch = RecordBatch::try_new(schema.clone(), vec![ts_column, le_column, val_column]).unwrap(); - let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0); + let fold_exec = build_fold_exec_from_batches(vec![batch], schema, 0.5, 0, 1, 2); let session_context = SessionContext::default(); let result = datafusion::physical_plan::collect(fold_exec, session_context.task_ctx()) .await