Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 87 additions & 4 deletions src/query/src/optimizer/windowed_sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,16 @@ impl WindowedSortPhysicalRule {
let Some(scanner_info) = fetch_partition_range(sort_input.clone())? else {
return Ok(Transformed::no(plan));
};
let input_schema = sort_input.schema();

if let Some(first_sort_expr) = sort_exec.expr().first()
&& let Some(column_expr) = first_sort_expr
.expr
.as_any()
.downcast_ref::<PhysicalColumn>()
&& scanner_info.time_index.contains(column_expr.name())
&& scanner_info
.time_index
.contains(input_schema.field(column_expr.index()).name())
{
} else {
return Ok(Transformed::no(plan));
Expand Down Expand Up @@ -157,6 +160,7 @@ struct ScannerInfo {
fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Option<ScannerInfo>> {
let mut partition_ranges = None;
let mut time_index = HashSet::new();
let mut alias_map = Vec::new();
let mut tag_columns = None;
let mut is_batch_coalesced = false;
Comment thread
fengys1996 marked this conversation as resolved.

Expand All @@ -174,15 +178,16 @@ fn fetch_partition_range(input: Arc<dyn ExecutionPlan>) -> DataFusionResult<Opti
is_batch_coalesced = true;
}

// TODO(discord9): do this in the logical plan instead, as it is less bug-prone there
// Collects alias of the time index column.
if let Some(projection) = plan.as_any().downcast_ref::<ProjectionExec>() {
for (expr, output_name) in projection.expr() {
if let Some(column_expr) = expr.as_any().downcast_ref::<PhysicalColumn>() {
if time_index.contains(column_expr.name()) {
time_index.insert(output_name.clone());
}
alias_map.push((column_expr.name().to_string(), output_name.clone()));
}
}
// resolve alias properly
time_index = resolve_alias(&alias_map, &time_index);
}
Comment thread
fengys1996 marked this conversation as resolved.

if let Some(region_scan_exec) = plan.as_any().downcast_ref::<RegionScanExec>() {
Expand Down Expand Up @@ -231,3 +236,81 @@ fn remove_repartition(
Ok(Transformed::no(plan))
})
}

/// Computes the column names that denote the time index once `alias_map` is applied.
///
/// `alias_map` holds `(input name, output name)` pairs and `time_index` is the set of
/// names that refer to the time index before those pairs are applied.
///
/// * An output name whose input name is in `time_index` becomes a time index name.
/// * A name in `time_index` that is produced as the output of an input that is *not*
///   in `time_index` is shadowed and therefore dropped.
/// * Every other name in `time_index` is carried over unchanged, even when no pair
///   mentions it.
///
/// Aliases are not followed transitively: with `time_index = {a}` and pairs
/// `[(a, b), (b, c)]` the result is `{a, b}`, not `{a, c}`. With pairs `[(b, a)]`
/// the result is empty, because `a` now names a different column.
fn resolve_alias(alias_map: &[(String, String)], time_index: &HashSet<String>) -> HashSet<String> {
    // Output names that inherit the time index from their input column.
    let mut renamed = HashSet::new();
    // Time index names that are overwritten by a column which is not the time index.
    let mut shadowed = HashSet::new();

    for (source, output) in alias_map {
        if time_index.contains(source) {
            renamed.insert(output.clone());
        } else if time_index.contains(output) {
            // `source` is not in `time_index` here while `output` is, so the two
            // names necessarily differ; no extra inequality check is required.
            shadowed.insert(output);
        }
    }

    // Keep the surviving original names and add the freshly aliased ones.
    time_index
        .iter()
        .filter(|name| !shadowed.contains(name))
        .cloned()
        .chain(renamed)
        .collect()
}
Comment thread
fengys1996 marked this conversation as resolved.

#[cfg(test)]
mod test {
    use super::*;

    /// Builds an owned set of column names from string literals.
    fn names(items: Vec<&str>) -> HashSet<String> {
        items.into_iter().map(|name| name.to_string()).collect()
    }

    /// Checks `resolve_alias` against a table of
    /// `(alias pairs, time index before, expected time index after)` cases.
    #[test]
    fn test_alias() {
        let cases = [
            // the old name stays in the result and aliases are not chained
            (vec![("a", "b"), ("b", "c")], vec!["a"], vec!["a", "b"]),
            // alias swap
            (vec![("b", "a"), ("a", "b")], vec!["a"], vec!["b"]),
            // the time index name is taken over by another column
            (vec![("b", "a"), ("b", "c")], vec!["a"], vec![]),
            // time index is not mentioned in the alias map
            (vec![("c", "d"), ("d", "c")], vec!["a"], vec!["a"]),
            // no alias
            (vec![], vec!["a"], vec!["a"]),
            // empty time index
            (vec![], vec![], vec![]),
        ];

        for (pairs, before, after) in cases {
            let alias_map: Vec<(String, String)> = pairs
                .into_iter()
                .map(|(old, new)| (old.to_string(), new.to_string()))
                .collect();
            let time_index = names(before);
            let expected = names(after);

            assert_eq!(
                expected,
                resolve_alias(&alias_map, &time_index),
                "alias_map={:?}, time_index={:?}",
                alias_map,
                time_index
            );
        }
    }
}
232 changes: 232 additions & 0 deletions tests/cases/distributed/optimizer/windowed_sort.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
-- test if handle aliased sort expr correctly
CREATE TABLE IF NOT EXISTS lightning (
collect_time TIMESTAMP(9) NOT NULL,
collect_time_utc TIMESTAMP(9) NULL,
peak_current FLOAT NULL,
TIME INDEX (collect_time)
)
ENGINE=mito
WITH(
'compaction.twcs.time_window' = '7d',
'compaction.type' = 'twcs'
);

Affected Rows: 0

-- insert some data, with collect_time = collect_time_utc + 8 hour
INSERT INTO lightning VALUES
('2025-03-01 16:00:00', '2025-03-01 08:00:00', 1.0),
('2025-03-01 17:00:00', '2025-03-01 09:00:00', 1.0),
('2025-03-01 18:00:00', '2025-03-01 10:00:00', 1.0),
('2025-03-01 19:00:00', '2025-03-01 11:00:00', 1.0),
('2025-03-01 20:00:00', '2025-03-01 12:00:00', 1.0),
('2025-03-01 21:00:00', '2025-03-01 13:00:00', 1.0),
('2025-03-01 22:00:00', '2025-03-01 14:00:00', 1.0),
('2025-03-01 23:00:00', '2025-03-01 15:00:00', 1.0)
;

Affected Rows: 8

-- notice the alias make order by not applicable for window sort
-- note due to alias there is a tiny difference in the output between standalone/distributed
-- which is acceptable
SELECT
collect_time_utc AS collect_time,
peak_current,
FROM
lightning
ORDER BY
collect_time ASC;

+---------------------+--------------+
| collect_time | peak_current |
+---------------------+--------------+
| 2025-03-01T08:00:00 | 1.0 |
| 2025-03-01T09:00:00 | 1.0 |
| 2025-03-01T10:00:00 | 1.0 |
| 2025-03-01T11:00:00 | 1.0 |
| 2025-03-01T12:00:00 | 1.0 |
| 2025-03-01T13:00:00 | 1.0 |
| 2025-03-01T14:00:00 | 1.0 |
| 2025-03-01T15:00:00 | 1.0 |
+---------------------+--------------+

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT
collect_time_utc AS collect_time,
peak_current,
FROM
lightning
ORDER BY
collect_time ASC;

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[collect_time_utc@0 as collect_time, peak_current@1 as peak_current] REDACTED
|_|_|_SortExec: expr=[collect_time_utc@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 8_|
+-+-+-+

-- also try alias with different name with time index
SELECT
collect_time_utc AS collect_time_0,
peak_current,
FROM
lightning
ORDER BY
collect_time_0 ASC;

+---------------------+--------------+
| collect_time_0 | peak_current |
+---------------------+--------------+
| 2025-03-01T08:00:00 | 1.0 |
| 2025-03-01T09:00:00 | 1.0 |
| 2025-03-01T10:00:00 | 1.0 |
| 2025-03-01T11:00:00 | 1.0 |
| 2025-03-01T12:00:00 | 1.0 |
| 2025-03-01T13:00:00 | 1.0 |
| 2025-03-01T14:00:00 | 1.0 |
| 2025-03-01T15:00:00 | 1.0 |
+---------------------+--------------+

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT
collect_time_utc AS collect_time_0,
peak_current,
FROM
lightning
ORDER BY
collect_time_0 ASC;

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[collect_time_utc@0 as collect_time_0, peak_current@1 as peak_current] REDACTED
|_|_|_SortExec: expr=[collect_time_utc@0 ASC NULLS LAST], preserve_partitioning=[false] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 8_|
+-+-+-+

-- try more complex alias with time index
SELECT
collect_time AS true_collect_time,
collect_time_utc AS collect_time,
peak_current,
FROM
lightning
ORDER BY
true_collect_time DESC;

+---------------------+---------------------+--------------+
| true_collect_time | collect_time | peak_current |
+---------------------+---------------------+--------------+
| 2025-03-01T23:00:00 | 2025-03-01T15:00:00 | 1.0 |
| 2025-03-01T22:00:00 | 2025-03-01T14:00:00 | 1.0 |
| 2025-03-01T21:00:00 | 2025-03-01T13:00:00 | 1.0 |
| 2025-03-01T20:00:00 | 2025-03-01T12:00:00 | 1.0 |
| 2025-03-01T19:00:00 | 2025-03-01T11:00:00 | 1.0 |
| 2025-03-01T18:00:00 | 2025-03-01T10:00:00 | 1.0 |
| 2025-03-01T17:00:00 | 2025-03-01T09:00:00 | 1.0 |
| 2025-03-01T16:00:00 | 2025-03-01T08:00:00 | 1.0 |
+---------------------+---------------------+--------------+

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT
collect_time AS true_collect_time,
collect_time_utc AS collect_time,
peak_current,
FROM
lightning
ORDER BY
true_collect_time DESC;

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[collect_time@0 as true_collect_time, collect_time_utc@1 as collect_time, peak_current@2 as peak_current] REDACTED
|_|_|_SortPreservingMergeExec: [collect_time@0 DESC] REDACTED
|_|_|_WindowedSortExec: expr=collect_time@0 DESC num_ranges=1 REDACTED
|_|_|_PartSortExec: expr=collect_time@0 DESC num_ranges=1 REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 8_|
+-+-+-+

-- this should also do windowed sort
SELECT
collect_time_utc AS collect_time,
collect_time AS true_collect_time,
peak_current,
FROM
lightning
ORDER BY
true_collect_time DESC;

+---------------------+---------------------+--------------+
| collect_time | true_collect_time | peak_current |
+---------------------+---------------------+--------------+
| 2025-03-01T15:00:00 | 2025-03-01T23:00:00 | 1.0 |
| 2025-03-01T14:00:00 | 2025-03-01T22:00:00 | 1.0 |
| 2025-03-01T13:00:00 | 2025-03-01T21:00:00 | 1.0 |
| 2025-03-01T12:00:00 | 2025-03-01T20:00:00 | 1.0 |
| 2025-03-01T11:00:00 | 2025-03-01T19:00:00 | 1.0 |
| 2025-03-01T10:00:00 | 2025-03-01T18:00:00 | 1.0 |
| 2025-03-01T09:00:00 | 2025-03-01T17:00:00 | 1.0 |
| 2025-03-01T08:00:00 | 2025-03-01T16:00:00 | 1.0 |
+---------------------+---------------------+--------------+

-- SQLNESS REPLACE (-+) -
-- SQLNESS REPLACE (\s\s+) _
-- SQLNESS REPLACE (peers.*) REDACTED
-- SQLNESS REPLACE (metrics.*) REDACTED
-- SQLNESS REPLACE region=\d+\(\d+,\s+\d+\) region=REDACTED
EXPLAIN ANALYZE SELECT
collect_time_utc AS collect_time,
collect_time AS true_collect_time,
peak_current,
FROM
lightning
ORDER BY
true_collect_time DESC;

+-+-+-+
| stage | node | plan_|
+-+-+-+
| 0_| 0_|_MergeScanExec: REDACTED
|_|_|_|
| 1_| 0_|_ProjectionExec: expr=[collect_time_utc@0 as collect_time, collect_time@1 as true_collect_time, peak_current@2 as peak_current] REDACTED
|_|_|_SortPreservingMergeExec: [collect_time@1 DESC] REDACTED
|_|_|_WindowedSortExec: expr=collect_time@1 DESC num_ranges=1 REDACTED
|_|_|_PartSortExec: expr=collect_time@1 DESC num_ranges=1 REDACTED
|_|_|_ProjectionExec: expr=[collect_time_utc@1 as collect_time_utc, collect_time@0 as collect_time, peak_current@2 as peak_current] REDACTED
|_|_|_SeqScan: region=REDACTED, partition_count=1 (1 memtable ranges, 0 file 0 ranges) REDACTED
|_|_|_|
|_|_| Total rows: 8_|
+-+-+-+

DROP TABLE lightning;

Affected Rows: 0

Loading
Loading