Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 147 additions & 35 deletions src/query/src/range_select/plan_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use datafusion_expr::{
};
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::TIME_INDEX_KEY;
use promql_parser::util::parse_duration;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt, ensure};
Expand Down Expand Up @@ -376,21 +377,24 @@ impl RangePlanRewriter {
}
.fail();
};
let (time_index, default_by) = self.get_index_by(input.schema()).await?;
let query_ctx = self.query_ctx.clone();
let mut range_rewriter = RangeExprRewriter {
input_plan: &input,
align: Duration::default(),
align_to: 0,
by: vec![],
range_fn: BTreeSet::new(),
sub_aggr: aggr_plan,
query_ctx: &self.query_ctx,
query_ctx: &query_ctx,
};
let new_expr = expr
.iter()
.map(|expr| expr.clone().rewrite(&mut range_rewriter).map(|x| x.data))
.collect::<DFResult<Vec<_>>>()?;
if range_rewriter.by.is_empty() {
let need_default_by = range_rewriter.by.is_empty();
let (time_index, default_by) =
self.get_index_by(input.schema(), need_default_by).await?;
if need_default_by {
range_rewriter.by = default_by;
}
let range_select = RangeSelect::try_new(
Expand Down Expand Up @@ -485,21 +489,49 @@ impl RangePlanRewriter {
/// return `(time_index, [row_columns])` to the rewriter.
/// If the user does not explicitly use the `by` keyword to indicate time series,
/// `[row_columns]` will be use as default time series
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to change the comment too?

async fn get_index_by(&mut self, schema: &Arc<DFSchema>) -> Result<(Expr, Vec<Expr>)> {
async fn get_index_by(
&mut self,
schema: &Arc<DFSchema>,
need_default_by: bool,
) -> Result<(Expr, Vec<Expr>)> {
#[allow(deprecated)]
let mut time_index_expr = Expr::Wildcard {
qualifier: None,
options: Box::new(WildcardOptions::default()),
};
let mut default_by = vec![];
let metadata_time_index_expr = (0..schema.fields().len()).find_map(|i| {
let (qualifier, field) = schema.qualified_field(i);
if field.metadata().contains_key(TIME_INDEX_KEY)
&& matches!(field.data_type(), DataType::Timestamp(_, _))
{
Some(Expr::Column(Column::new(
qualifier.cloned(),
field.name().clone(),
)))
} else {
None
}
});
for i in 0..schema.fields().len() {
let (qualifier, _) = schema.qualified_field(i);
if let Some(table_ref) = qualifier {
let table = self
.table_provider
.resolve_table(table_ref.clone())
.await
.context(CatalogSnafu)?
let table_source = match self.table_provider.resolve_table(table_ref.clone()).await
{
Ok(table_source) => table_source,
Err(error) => {
// TableNotExist may infer this table is a derived table (like from JOIN or set op),
// in this case we can still continue with time index column identified from column
// metadata.
if matches!(&error, catalog::error::Error::TableNotExist { .. })
&& metadata_time_index_expr.is_some()
{
continue;
}
Comment on lines +522 to +530
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add some comments to document why do we need this check?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add a debug log.

return Err(error).context(CatalogSnafu);
}
};
let table = table_source
.as_any()
.downcast_ref::<DefaultTableSource>()
.context(UnknownTableSnafu)?
Expand Down Expand Up @@ -537,6 +569,18 @@ impl RangePlanRewriter {
}
}
#[allow(deprecated)]
if matches!(time_index_expr, Expr::Wildcard { .. })
&& let Some(expr) = metadata_time_index_expr
{
ensure!(
!need_default_by,
RangeQuerySnafu {
msg: "Cannot infer default BY columns from derived range query input"
}
);
time_index_expr = expr;
}
#[allow(deprecated)]
if matches!(time_index_expr, Expr::Wildcard { .. }) {
TimeIndexNotFoundSnafu {
table: schema.to_string(),
Expand Down Expand Up @@ -614,6 +658,7 @@ mod test {
use datatypes::schema::{ColumnSchema, Schema};
use session::context::QueryContext;
use table::metadata::{TableInfoBuilder, TableMetaBuilder};
use table::table::TableRef;
use table::test_util::EmptyTable;

use super::*;
Expand All @@ -622,7 +667,45 @@ mod test {
use crate::{QueryEngineFactory, QueryEngineRef};

async fn create_test_engine() -> QueryEngineRef {
let table_name = "test".to_string();
create_test_engine_with_tables(&["test"], false).await
}

async fn create_union_test_engine() -> QueryEngineRef {
create_test_engine_with_tables(&["test_0", "test_1"], true).await
}

async fn create_test_engine_with_tables(
table_names: &[&str],
with_extra_timestamp: bool,
) -> QueryEngineRef {
let catalog_list = MemoryCatalogManager::with_default_setup();
for (i, table_name) in table_names.iter().enumerate() {
let table = create_test_table(table_name, with_extra_timestamp);
assert!(
catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: (*table_name).to_string(),
table_id: 1024 + i as u32,
table,
})
.is_ok()
);
}
QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine()
}

fn create_test_table(table_name: &str, with_extra_timestamp: bool) -> TableRef {
let mut columns = vec![];
for i in 0..5 {
columns.push(ColumnSchema::new(
Expand All @@ -639,6 +722,13 @@ mod test {
)
.with_time_index(true),
);
if with_extra_timestamp {
columns.push(ColumnSchema::new(
"timestamp_2".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
true,
));
}
for i in 0..5 {
columns.push(ColumnSchema::new(
format!("field_{i}"),
Expand All @@ -650,38 +740,20 @@ mod test {
let table_meta = TableMetaBuilder::empty()
.schema(schema)
.primary_key_indices((0..5).collect())
.value_indices((6..11).collect())
.value_indices(if with_extra_timestamp {
(6..12).collect()
} else {
(6..11).collect()
})
.next_column_id(1024)
.build()
.unwrap();
let table_info = TableInfoBuilder::default()
.name(&table_name)
.name(table_name)
.meta(table_meta)
.build()
.unwrap();
let table = EmptyTable::from_table_info(&table_info);
let catalog_list = MemoryCatalogManager::with_default_setup();
assert!(
catalog_list
.register_table_sync(RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name,
table_id: 1024,
table,
})
.is_ok()
);
QueryEngineFactory::new(
catalog_list,
None,
None,
None,
None,
false,
QueryOptions::default(),
)
.query_engine()
EmptyTable::from_table_info(&table_info)
}

async fn do_query(sql: &str) -> Result<LogicalPlan> {
Expand All @@ -690,6 +762,12 @@ mod test {
engine.planner().plan(&stmt, QueryContext::arc()).await
}

async fn do_union_query(sql: &str) -> Result<LogicalPlan> {
let stmt = QueryLanguageParser::parse_sql(sql, &QueryContext::arc()).unwrap();
let engine = create_union_test_engine().await;
engine.planner().plan(&stmt, QueryContext::arc()).await
}

async fn query_plan_compare(sql: &str, expected: String) {
let plan = do_query(sql).await.unwrap();
assert_eq!(plan.display_indent_schema().to_string(), expected);
Expand Down Expand Up @@ -765,6 +843,40 @@ mod test {
query_plan_compare(query, expected).await;
}

#[tokio::test]
async fn range_from_union_query() {
let queries = [
r#"SELECT timestamp, tag_0, avg(field_0) RANGE '5m'
FROM (
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
UNION ALL
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
)
WHERE timestamp >= '1970-01-01 00:00:00'
ALIGN '1h' by (tag_0)"#,
r#"SELECT tmp.timestamp, tmp.tag_0, avg(tmp.field_0) RANGE '5m'
FROM (
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_0
UNION ALL
SELECT timestamp, tag_0, field_0, timestamp_2 FROM test_1
) AS tmp
WHERE tmp.timestamp >= '1970-01-01 00:00:00'
ALIGN '1h' by (tmp.tag_0)"#,
];

for query in queries {
let plan = do_union_query(query)
.await
.unwrap()
.display_indent_schema()
.to_string();

assert!(plan.contains("RangeSelect"));
assert!(plan.contains("Union"));
assert!(plan.contains("time_index=timestamp"));
}
}

#[tokio::test]
async fn range_in_expr() {
let query = r#"SELECT sin(avg(field_0 + field_1) RANGE '5m' + 1) FROM test ALIGN '1h' by (tag_0,tag_1);"#;
Expand Down
76 changes: 76 additions & 0 deletions tests/cases/standalone/common/range/nest.result
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,82 @@ SELECT ts, b, min(c) RANGE '5s' FROM (SELECT ts, host AS b, val AS c FROM host W
| 1970-01-01T00:00:20 | host1 | 2 |
+---------------------+-------+-----------------+

CREATE TABLE host_union_0 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);

Affected Rows: 0

CREATE TABLE host_union_1 (
ts timestamp(3) time index,
host STRING PRIMARY KEY,
val BIGINT,
ts2 timestamp(3),
);

Affected Rows: 0

INSERT INTO TABLE host_union_0 VALUES
(0, 'host1', 3, 0),
(5000, 'host1', 2, 5000),
(10000, 'host1', 1, 10000);

Affected Rows: 3

INSERT INTO TABLE host_union_1 VALUES
(0, 'host1', 6, 0),
(5000, 'host1', 5, 5000),
(10000, 'host1', 4, 10000);

Affected Rows: 3

SELECT ts, host, min(val ORDER BY ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
)
WHERE ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (host)
ORDER BY host, ts;

+---------------------+-------+------------------------------------------------+
| ts | host | min(val) ORDER BY [ts ASC NULLS LAST] RANGE 5s |
+---------------------+-------+------------------------------------------------+
| 1970-01-01T00:00:00 | host1 | 3 |
| 1970-01-01T00:00:05 | host1 | 2 |
| 1970-01-01T00:00:10 | host1 | 1 |
+---------------------+-------+------------------------------------------------+

SELECT tmp.ts, tmp.host, min(tmp.val ORDER BY tmp.ts ASC) RANGE '5s'
FROM (
SELECT ts, host, val, ts2 FROM host_union_0
UNION ALL
SELECT ts, host, val, ts2 FROM host_union_1
) AS tmp
WHERE tmp.ts >= '1970-01-01 00:00:00'
ALIGN '5s' BY (tmp.host)
ORDER BY tmp.host, tmp.ts;

+---------------------+-------+--------------------------------------------------------+
| ts | host | min(tmp.val) ORDER BY [tmp.ts ASC NULLS LAST] RANGE 5s |
+---------------------+-------+--------------------------------------------------------+
| 1970-01-01T00:00:00 | host1 | 3 |
| 1970-01-01T00:00:05 | host1 | 2 |
| 1970-01-01T00:00:10 | host1 | 1 |
+---------------------+-------+--------------------------------------------------------+

DROP TABLE host_union_0;

Affected Rows: 0

DROP TABLE host_union_1;

Affected Rows: 0

-- Test EXPLAIN and ANALYZE
-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
Expand Down
Loading
Loading