diff --git a/src/query/src/range_select/plan_rewrite.rs b/src/query/src/range_select/plan_rewrite.rs index 7cf8ef98b576..edb75d77d4e9 100644 --- a/src/query/src/range_select/plan_rewrite.rs +++ b/src/query/src/range_select/plan_rewrite.rs @@ -35,6 +35,7 @@ use datafusion_expr::{ }; use datafusion_optimizer::simplify_expressions::ExprSimplifier; use datatypes::prelude::ConcreteDataType; +use datatypes::schema::TIME_INDEX_KEY; use promql_parser::util::parse_duration; use session::context::QueryContextRef; use snafu::{OptionExt, ResultExt, ensure}; @@ -376,7 +377,7 @@ impl RangePlanRewriter { } .fail(); }; - let (time_index, default_by) = self.get_index_by(input.schema()).await?; + let query_ctx = self.query_ctx.clone(); let mut range_rewriter = RangeExprRewriter { input_plan: &input, align: Duration::default(), @@ -384,13 +385,16 @@ impl RangePlanRewriter { by: vec![], range_fn: BTreeSet::new(), sub_aggr: aggr_plan, - query_ctx: &self.query_ctx, + query_ctx: &query_ctx, }; let new_expr = expr .iter() .map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data)) .collect::>>()?; - if range_rewriter.by.is_empty() { + let need_default_by = range_rewriter.by.is_empty(); + let (time_index, default_by) = + self.get_index_by(input.schema(), need_default_by).await?; + if need_default_by { range_rewriter.by = default_by; } let range_select = RangeSelect::try_new( @@ -485,21 +489,49 @@ impl RangePlanRewriter { /// return `(time_index, [row_columns])` to the rewriter. /// If the user does not explicitly use the `by` keyword to indicate time series, /// `[row_columns]` will be use as default time series - async fn get_index_by(&mut self, schema: &Arc) -> Result<(Expr, Vec)> { + async fn get_index_by( + &mut self, + schema: &Arc, + need_default_by: bool, + ) -> Result<(Expr, Vec)> { #[allow(deprecated)] let mut time_index_expr = Expr::Wildcard { qualifier: None, options: Box::new(WildcardOptions::default()), }; let mut default_by = vec![]; + let metadata_time_index_expr = (0..schema.fields().len()).find_map(|i| { + let (qualifier, field) = schema.qualified_field(i); + if field.metadata().contains_key(TIME_INDEX_KEY) + && matches!(field.data_type(), DataType::Timestamp(_, _)) + { + Some(Expr::Column(Column::new( + qualifier.cloned(), + field.name().clone(), + ))) + } else { + None + } + }); for i in 0..schema.fields().len() { let (qualifier, _) = schema.qualified_field(i); if let Some(table_ref) = qualifier { - let table = self - .table_provider - .resolve_table(table_ref.clone()) - .await - .context(CatalogSnafu)? + let table_source = match self.table_provider.resolve_table(table_ref.clone()).await + { + Ok(table_source) => table_source, + Err(error) => { + // TableNotExist may infer this table is a derived table (like from JOIN or set op), + // in this case we can still continue with time index column identified from column + // metadata. + if matches!(&error, catalog::error::Error::TableNotExist { .. }) + && metadata_time_index_expr.is_some() + { + continue; + } + return Err(error).context(CatalogSnafu); + } + }; + let table = table_source .as_any() .downcast_ref::() .context(UnknownTableSnafu)? @@ -537,6 +569,18 @@ impl RangePlanRewriter { } } #[allow(deprecated)] + if matches!(time_index_expr, Expr::Wildcard { .. }) + && let Some(expr) = metadata_time_index_expr + { + ensure!( + !need_default_by, + RangeQuerySnafu { + msg: "Cannot infer default BY columns from derived range query input" + } + ); + time_index_expr = expr; + } + #[allow(deprecated)] if matches!(time_index_expr, Expr::Wildcard { .. }) { TimeIndexNotFoundSnafu { table: schema.to_string(), @@ -614,6 +658,7 @@ mod test { use datatypes::schema::{ColumnSchema, Schema}; use session::context::QueryContext; use table::metadata::{TableInfoBuilder, TableMetaBuilder}; + use table::table::TableRef; use table::test_util::EmptyTable; use super::*; @@ -622,7 +667,45 @@ mod test { use crate::{QueryEngineFactory, QueryEngineRef}; async fn create_test_engine() -> QueryEngineRef { - let table_name = "test".to_string(); + create_test_engine_with_tables(&["test"], false).await + } + + async fn create_union_test_engine() -> QueryEngineRef { + create_test_engine_with_tables(&["test_0", "test_1"], true).await + } + + async fn create_test_engine_with_tables( + table_names: &[&str], + with_extra_timestamp: bool, + ) -> QueryEngineRef { + let catalog_list = MemoryCatalogManager::with_default_setup(); + for (i, table_name) in table_names.iter().enumerate() { + let table = create_test_table(table_name, with_extra_timestamp); + assert!( + catalog_list + .register_table_sync(RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: (*table_name).to_string(), + table_id: 1024 + i as u32, + table, + }) + .is_ok() + ); + } + QueryEngineFactory::new( + catalog_list, + None, + None, + None, + None, + false, + QueryOptions::default(), + ) + .query_engine() + } + + fn create_test_table(table_name: &str, with_extra_timestamp: bool) -> TableRef { let mut columns = vec![]; for i in 0..5 { columns.push(ColumnSchema::new( @@ -639,6 +722,13 @@ mod test { ) .with_time_index(true), ); + if with_extra_timestamp { + columns.push(ColumnSchema::new( + "timestamp_2".to_string(), + ConcreteDataType::timestamp_millisecond_datatype(), + true, + )); + } for i in 0..5 { columns.push(ColumnSchema::new( format!("field_{i}"), @@ -650,38 +740,20 @@ mod test { let table_meta = TableMetaBuilder::empty() .schema(schema) .primary_key_indices((0..5).collect()) - .value_indices((6..11).collect()) + .value_indices(if with_extra_timestamp { + (6..12).collect() + } else { + (6..11).collect() + }) .next_column_id(1024) .build() .unwrap(); let table_info = TableInfoBuilder::default() - .name(&table_name) + .name(table_name) .meta(table_meta) .build() .unwrap(); - let table = EmptyTable::from_table_info(&table_info); - let catalog_list = MemoryCatalogManager::with_default_setup(); - assert!( - catalog_list - .register_table_sync(RegisterTableRequest { - catalog: DEFAULT_CATALOG_NAME.to_string(), - schema: DEFAULT_SCHEMA_NAME.to_string(), - table_name, - table_id: 1024, - table, - }) - .is_ok() - ); - QueryEngineFactory::new( - catalog_list, - None, - None, - None, - None, - false, - QueryOptions::default(), - ) - .query_engine() + EmptyTable::from_table_info(&table_info) } async fn do_query(sql: &str) -> Result { @@ -690,6 +762,12 @@ mod test { engine.planner().plan(&stmt, QueryContext::arc()).await } + async fn do_union_query(sql: &str) -> Result { + let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap(); + let engine = create_union_test_engine().await; + engine.planner().plan(&stmt, QueryContext::arc()).await + } + async fn query_plan_compare(sql: &str, expected: String) { let plan = do_query(sql).await.unwrap(); assert_eq!(plan.display_indent_schema().to_string(), expected); @@ -765,6 +843,40 @@ mod test { query_plan_compare(query, expected).await; } + #[tokio::test] + async fn range_from_union_query() { + let queries = [ + r#"SELECT timestamp, tag_0, avg(field_0) RANGE '5m' + FROM ( + SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0 + UNION ALL + SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1 + ) + WHERE timestamp >= '1970-01-01 00:00:00' + ALIGN '1h' by (tag_0)"#, + r#"SELECT tmp.timestamp, tmp.tag_0, avg(tmp.field_0) RANGE '5m' + FROM ( + SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0 + UNION ALL + SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1 + ) AS tmp + WHERE tmp.timestamp >= '1970-01-01 00:00:00' + ALIGN '1h' by (tmp.tag_0)"#, + ]; + + for query in queries { + let plan = do_union_query(query) + .await + .unwrap() + .display_indent_schema() + .to_string(); + + assert!(plan.contains("RangeSelect")); + assert!(plan.contains("Union")); + assert!(plan.contains("time_index=timestamp")); + } + } + #[tokio::test] async fn range_in_expr() { let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#; diff --git a/tests/cases/standalone/common/range/nest.result b/tests/cases/standalone/common/range/nest.result index 0730a0b99774..7bc3e39a7bc3 100644 --- a/tests/cases/standalone/common/range/nest.result +++ b/tests/cases/standalone/common/range/nest.result @@ -45,6 +45,82 @@ SELECT ts, b, min(c) RANGE '5s' FROM (SELECT ts, host AS b, val AS c FROM host W | 1970-01-01T00:00:20 | host1 | 2 | +---------------------+-------+-----------------+ +CREATE TABLE host_union_0 ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, + ts2 timestamp(3), +); + +Affected Rows: 0 + +CREATE TABLE host_union_1 ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, + ts2 timestamp(3), +); + +Affected Rows: 0 + +INSERT INTO TABLE host_union_0 VALUES + (0, 'host1', 3, 0), + (5000, 'host1', 2, 5000), + (10000, 'host1', 1, 10000); + +Affected Rows: 3 + +INSERT INTO TABLE host_union_1 VALUES + (0, 'host1', 6, 0), + (5000, 'host1', 5, 5000), + (10000, 'host1', 4, 10000); + +Affected Rows: 3 + +SELECT ts, host, min(val ORDER BY ts ASC) RANGE '5s' +FROM ( + SELECT ts, host, val, ts2 FROM host_union_0 + UNION ALL + SELECT ts, host, val, ts2 FROM host_union_1 +) +WHERE ts >= '1970-01-01 00:00:00' +ALIGN '5s' BY (host) +ORDER BY host, ts; + ++---------------------+-------+------------------------------------------------+ +| ts | host | min(val) ORDER BY [ts ASC NULLS LAST] RANGE 5s | ++---------------------+-------+------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 3 | +| 1970-01-01T00:00:05 | host1 | 2 | +| 1970-01-01T00:00:10 | host1 | 1 | ++---------------------+-------+------------------------------------------------+ + +SELECT tmp.ts, tmp.host, min(tmp.val ORDER BY tmp.ts ASC) RANGE '5s' +FROM ( + SELECT ts, host, val, ts2 FROM host_union_0 + UNION ALL + SELECT ts, host, val, ts2 FROM host_union_1 +) AS tmp +WHERE tmp.ts >= '1970-01-01 00:00:00' +ALIGN '5s' BY (tmp.host) +ORDER BY tmp.host, tmp.ts; + ++---------------------+-------+--------------------------------------------------------+ +| ts | host | min(tmp.val) ORDER BY [tmp.ts ASC NULLS LAST] RANGE 5s | ++---------------------+-------+--------------------------------------------------------+ +| 1970-01-01T00:00:00 | host1 | 3 | +| 1970-01-01T00:00:05 | host1 | 2 | +| 1970-01-01T00:00:10 | host1 | 1 | ++---------------------+-------+--------------------------------------------------------+ + +DROP TABLE host_union_0; + +Affected Rows: 0 + +DROP TABLE host_union_1; + +Affected Rows: 0 + -- Test EXPLAIN and ANALYZE -- SQLNESS REPLACE (-+) - -- SQLNESS REPLACE (\s\s+) _ diff --git a/tests/cases/standalone/common/range/nest.sql b/tests/cases/standalone/common/range/nest.sql index 4ee447cfbdef..d92f9f1345fd 100644 --- a/tests/cases/standalone/common/range/nest.sql +++ b/tests/cases/standalone/common/range/nest.sql @@ -22,6 +22,54 @@ SELECT ts, host, foo FROM (SELECT ts, host, min(val) RANGE '5s' AS foo FROM host SELECT ts, b, min(c) RANGE '5s' FROM (SELECT ts, host AS b, val AS c FROM host WHERE host = 'host1') ALIGN '5s' BY (b) ORDER BY b, ts; +CREATE TABLE host_union_0 ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, + ts2 timestamp(3), +); + +CREATE TABLE host_union_1 ( + ts timestamp(3) time index, + host STRING PRIMARY KEY, + val BIGINT, + ts2 timestamp(3), +); + +INSERT INTO TABLE host_union_0 VALUES + (0, 'host1', 3, 0), + (5000, 'host1', 2, 5000), + (10000, 'host1', 1, 10000); + +INSERT INTO TABLE host_union_1 VALUES + (0, 'host1', 6, 0), + (5000, 'host1', 5, 5000), + (10000, 'host1', 4, 10000); + +SELECT ts, host, min(val ORDER BY ts ASC) RANGE '5s' +FROM ( + SELECT ts, host, val, ts2 FROM host_union_0 + UNION ALL + SELECT ts, host, val, ts2 FROM host_union_1 +) +WHERE ts >= '1970-01-01 00:00:00' +ALIGN '5s' BY (host) +ORDER BY host, ts; + +SELECT tmp.ts, tmp.host, min(tmp.val ORDER BY tmp.ts ASC) RANGE '5s' +FROM ( + SELECT ts, host, val, ts2 FROM host_union_0 + UNION ALL + SELECT ts, host, val, ts2 FROM host_union_1 +) AS tmp +WHERE tmp.ts >= '1970-01-01 00:00:00' +ALIGN '5s' BY (tmp.host) +ORDER BY tmp.host, tmp.ts; + +DROP TABLE host_union_0; + +DROP TABLE host_union_1; + -- Test EXPLAIN and ANALYZE