diff --git a/crates/toasty-core/src/driver/operation/query_pk.rs b/crates/toasty-core/src/driver/operation/query_pk.rs index 3d6a2fcb8..cf34c8299 100644 --- a/crates/toasty-core/src/driver/operation/query_pk.rs +++ b/crates/toasty-core/src/driver/operation/query_pk.rs @@ -21,6 +21,17 @@ pub struct QueryPk { /// Additional filtering done on the result before returning it to the /// caller. pub filter: Option, + + /// Maximum number of items to return. `None` means no limit. + pub limit: Option, + + /// Sort key ordering direction for queries on a table with a composite + /// primary key. `None` uses the driver's default ordering. + pub order: Option, + + /// Cursor for resuming a paginated query. Contains the serialized key of + /// the last item from a previous page of results. + pub cursor: Option, } impl From for Operation { diff --git a/crates/toasty-driver-dynamodb/src/op/query_pk.rs b/crates/toasty-driver-dynamodb/src/op/query_pk.rs index 0cfae19f9..26ec16c3b 100644 --- a/crates/toasty-driver-dynamodb/src/op/query_pk.rs +++ b/crates/toasty-driver-dynamodb/src/op/query_pk.rs @@ -1,5 +1,5 @@ use super::{ - ddb_expression, item_to_record, operation, stmt, Connection, ExprAttrs, Result, Schema, + ddb_expression, ddb_key, item_to_record, operation, stmt, Connection, ExprAttrs, Result, Schema, }; use std::sync::Arc; use toasty_core::{driver::Response, stmt::ExprContext}; @@ -29,7 +29,6 @@ impl Connection { let index = schema.db.index(index_id); if index.unique { - // assert!(!index.unique, "Index needs all fields"); use toasty_core::Error; let err = Error::from_args(format_args!( "Unique index {} doesn't have fields.", @@ -38,27 +37,55 @@ impl Connection { Err(err) } else { tracing::trace!(table_name = %table.name, index_name = %index.name, "querying secondary index"); - self.client + let mut query = self + .client .query() .table_name(&table.name) .index_name(&index.name) .key_condition_expression(key_expression) .set_filter_expression(filter_expression) .set_expression_attribute_names(Some(expr_attrs.attr_names)) - .set_expression_attribute_values(Some(expr_attrs.attr_values)) + .set_expression_attribute_values(Some(expr_attrs.attr_values)); + + // Apply pagination parameters when present. + if let Some(limit) = op.limit { + query = query.limit(limit as i32); + } + if let Some(ref direction) = op.order { + query = query.scan_index_forward(*direction == stmt::Direction::Asc); + } + if let Some(ref start_key) = op.cursor { + query = query.set_exclusive_start_key(Some(ddb_key(table, start_key))); + } + + query .send() .await .map_err(toasty_core::Error::driver_operation_failed) } } else { tracing::trace!(table_name = %table.name, "querying primary key"); - self.client + let mut query = self + .client .query() .table_name(&table.name) .key_condition_expression(key_expression) .set_filter_expression(filter_expression) .set_expression_attribute_names(Some(expr_attrs.attr_names)) - .set_expression_attribute_values(Some(expr_attrs.attr_values)) + .set_expression_attribute_values(Some(expr_attrs.attr_values)); + + // Apply pagination parameters when present. + if let Some(limit) = op.limit { + query = query.limit(limit as i32); + } + if let Some(ref direction) = op.order { + query = query.scan_index_forward(*direction == stmt::Direction::Asc); + } + if let Some(ref start_key) = op.cursor { + query = query.set_exclusive_start_key(Some(ddb_key(table, start_key))); + } + + query .send() .await .map_err(toasty_core::Error::driver_operation_failed) diff --git a/crates/toasty-driver-integration-suite/src/tests.rs b/crates/toasty-driver-integration-suite/src/tests.rs index da18da563..56bd191ee 100644 --- a/crates/toasty-driver-integration-suite/src/tests.rs +++ b/crates/toasty-driver-integration-suite/src/tests.rs @@ -2,6 +2,7 @@ pub mod belongs_to_configured; pub mod belongs_to_one_way; pub mod belongs_to_self_referential; pub mod bigdecimal; +pub mod composite_key_pagination; pub mod connection_per_clone; pub mod create_macro; pub mod decimal; diff --git a/crates/toasty-driver-integration-suite/src/tests/composite_key_pagination.rs b/crates/toasty-driver-integration-suite/src/tests/composite_key_pagination.rs new file mode 100644 index 000000000..0a8114345 --- /dev/null +++ b/crates/toasty-driver-integration-suite/src/tests/composite_key_pagination.rs @@ -0,0 +1,233 @@ +//! Test pagination on composite-key models. +//! +//! These tests exercise `paginate()`, `limit()`, and `order_by()` on models +//! with a partition + local key, which is the pattern DynamoDB uses for +//! `QueryPk`. They intentionally have **no** `requires(sql)` gate so they run +//! on every driver, including DynamoDB. + +use crate::prelude::*; +use toasty::Page; +use toasty_core::driver::{Operation, Rows}; + +#[driver_test] +pub async fn paginate_composite_key(test: &mut Test) -> Result<()> { + #[derive(Debug, toasty::Model)] + #[key(partition = kind, local = seq)] + struct Event { + kind: String, + seq: i64, + } + + let mut db = test.setup_db(models!(Event)).await; + + // Seed 20 events under the same partition key so we can paginate over them. + for i in 0..20 { + Event::create().kind("info").seq(i).exec(&mut db).await?; + } + + test.log().clear(); + + // First page (descending): should return seq 19..10 + let page: Page<_> = Event::filter_by_kind("info") + .order_by(Event::fields().seq().desc()) + .paginate(10) + .collect(&mut db) + .await?; + + assert_eq!(page.len(), 10); + for (i, expected) in (10..20).rev().enumerate() { + assert_eq!(page[i].seq, expected); + } + + // Verify the driver operation type + let (op, resp) = test.log().pop(); + if test.capability().sql { + assert_struct!(op, Operation::QuerySql(_)); + } else { + assert_struct!(op, Operation::QueryPk(_)); + } + assert_struct!(resp.rows, Rows::Stream(_)); + + // Second page via .next() + let page: Page<_> = page.next(&mut db).await?.unwrap(); + assert_eq!(page.len(), 10); + for (i, expected) in (0..10).rev().enumerate() { + assert_eq!(page[i].seq, expected); + } + + // Go back to the first page via .prev() + let page: Page<_> = page.prev(&mut db).await?.unwrap(); + assert_eq!(page.len(), 10); + for (i, expected) in (10..20).rev().enumerate() { + assert_eq!(page[i].seq, expected); + } + + Ok(()) +} + +#[driver_test] +pub async fn paginate_composite_key_asc(test: &mut Test) -> Result<()> { + #[derive(Debug, toasty::Model)] + #[key(partition = kind, local = seq)] + struct Event { + kind: String, + seq: i64, + } + + let mut db = test.setup_db(models!(Event)).await; + + for i in 0..20 { + Event::create().kind("info").seq(i).exec(&mut db).await?; + } + + test.log().clear(); + + // First page (ascending): should return seq 0..9 + let page: Page<_> = Event::filter_by_kind("info") + .order_by(Event::fields().seq().asc()) + .paginate(5) + .collect(&mut db) + .await?; + + assert_eq!(page.len(), 5); + for (i, expected) in (0..5).enumerate() { + assert_eq!(page[i].seq, expected); + } + + let (op, _) = test.log().pop(); + if test.capability().sql { + assert_struct!(op, Operation::QuerySql(_)); + } else { + assert_struct!(op, Operation::QueryPk(_)); + } + + // Walk forward through all pages and collect every seq value. + let mut all_seqs: Vec = page.iter().map(|e| e.seq).collect(); + let mut current = page; + while let Some(next) = current.next(&mut db).await? { + all_seqs.extend(next.iter().map(|e| e.seq)); + current = next; + } + + assert_eq!(all_seqs, (0..20).collect::>()); + + Ok(()) +} + +#[driver_test] +pub async fn limit_composite_key(test: &mut Test) -> Result<()> { + #[derive(Debug, toasty::Model)] + #[key(partition = kind, local = seq)] + struct Event { + kind: String, + seq: i64, + } + + let mut db = test.setup_db(models!(Event)).await; + + for i in 0..20 { + Event::create().kind("info").seq(i).exec(&mut db).await?; + } + + test.log().clear(); + + // Limit without explicit ordering + let events: Vec<_> = Event::filter_by_kind("info") + .limit(7) + .collect(&mut db) + .await?; + assert_eq!(events.len(), 7); + + let (op, _) = test.log().pop(); + if test.capability().sql { + assert_struct!(op, Operation::QuerySql(_)); + } else { + assert_struct!(op, Operation::QueryPk(_)); + } + + test.log().clear(); + + // Limit combined with descending order + let events: Vec<_> = Event::filter_by_kind("info") + .order_by(Event::fields().seq().desc()) + .limit(5) + .collect(&mut db) + .await?; + assert_eq!(events.len(), 5); + for i in 0..4 { + assert!(events[i].seq > events[i + 1].seq); + } + // The first item should be the highest seq + assert_eq!(events[0].seq, 19); + + test.log().clear(); + + // Limit larger than result set returns all results + let events: Vec<_> = Event::filter_by_kind("info") + .limit(100) + .collect(&mut db) + .await?; + assert_eq!(events.len(), 20); + + Ok(()) +} + +#[driver_test] +pub async fn sort_composite_key(test: &mut Test) -> Result<()> { + #[derive(Debug, toasty::Model)] + #[key(partition = kind, local = seq)] + struct Event { + kind: String, + seq: i64, + } + + let mut db = test.setup_db(models!(Event)).await; + + for i in 0..20 { + Event::create().kind("info").seq(i).exec(&mut db).await?; + } + + test.log().clear(); + + // Ascending sort + let events: Vec<_> = Event::filter_by_kind("info") + .order_by(Event::fields().seq().asc()) + .collect(&mut db) + .await?; + + assert_eq!(events.len(), 20); + for i in 0..19 { + assert!(events[i].seq < events[i + 1].seq); + } + + let (op, resp) = test.log().pop(); + if test.capability().sql { + assert_struct!(op, Operation::QuerySql(_)); + } else { + assert_struct!(op, Operation::QueryPk(_)); + } + assert_struct!(resp.rows, Rows::Stream(_)); + + test.log().clear(); + + // Descending sort + let events: Vec<_> = Event::filter_by_kind("info") + .order_by(Event::fields().seq().desc()) + .collect(&mut db) + .await?; + + assert_eq!(events.len(), 20); + for i in 0..19 { + assert!(events[i].seq > events[i + 1].seq); + } + + let (op, resp) = test.log().pop(); + if test.capability().sql { + assert_struct!(op, Operation::QuerySql(_)); + } else { + assert_struct!(op, Operation::QueryPk(_)); + } + assert_struct!(resp.rows, Rows::Stream(_)); + + Ok(()) +} diff --git a/crates/toasty-driver-integration-suite/src/tests/one_model_sort_limit.rs b/crates/toasty-driver-integration-suite/src/tests/one_model_sort_limit.rs index e9e536d3c..b4e310bc5 100644 --- a/crates/toasty-driver-integration-suite/src/tests/one_model_sort_limit.rs +++ b/crates/toasty-driver-integration-suite/src/tests/one_model_sort_limit.rs @@ -2,6 +2,10 @@ use crate::prelude::*; use toasty::Page; +use toasty_core::{ + driver::{Operation, Rows}, + stmt::{ExprSet, Statement}, +}; #[driver_test(id(ID), requires(sql))] pub async fn sort_asc(test: &mut Test) -> Result<()> { @@ -21,6 +25,8 @@ pub async fn sort_asc(test: &mut Test) -> Result<()> { Foo::create().order(i).exec(&mut db).await?; } + test.log().clear(); + let foos_asc: Vec<_> = Foo::all() .order_by(Foo::fields().order().asc()) .collect(&mut db) @@ -32,6 +38,20 @@ pub async fn sort_asc(test: &mut Test) -> Result<()> { assert!(foos_asc[i].order < foos_asc[i + 1].order); } + // Verify the SQL query has an ORDER BY clause + let (op, resp) = test.log().pop(); + assert_struct!(op, Operation::QuerySql(_ { + stmt: Statement::Query(_ { + body: ExprSet::Select(_ { .. }), + order_by: Some(_), + .. + }), + .. + })); + assert_struct!(resp.rows, Rows::Stream(_)); + + test.log().clear(); + let foos_desc: Vec<_> = Foo::all() .order_by(Foo::fields().order().desc()) .collect(&mut db) @@ -42,6 +62,18 @@ pub async fn sort_asc(test: &mut Test) -> Result<()> { for i in 0..99 { assert!(foos_desc[i].order > foos_desc[i + 1].order); } + + let (op, resp) = test.log().pop(); + assert_struct!(op, Operation::QuerySql(_ { + stmt: Statement::Query(_ { + body: ExprSet::Select(_ { .. }), + order_by: Some(_), + .. + }), + .. + })); + assert_struct!(resp.rows, Rows::Stream(_)); + Ok(()) } @@ -63,6 +95,8 @@ pub async fn paginate(test: &mut Test) -> Result<()> { Foo::create().order(i).exec(&mut db).await?; } + test.log().clear(); + let foos: Page<_> = Foo::all() .order_by(Foo::fields().order().desc()) .paginate(10) @@ -74,6 +108,21 @@ pub async fn paginate(test: &mut Test) -> Result<()> { assert_eq!(foos[i].order, order); } + // First page: SQL query should have ORDER BY and LIMIT + let (op, resp) = test.log().pop(); + assert_struct!(op, Operation::QuerySql(_ { + stmt: Statement::Query(_ { + body: ExprSet::Select(_ { .. }), + order_by: Some(_), + limit: Some(_), + .. + }), + .. + })); + assert_struct!(resp.rows, Rows::Stream(_)); + + test.log().clear(); + let foos: Page<_> = Foo::all() .order_by(Foo::fields().order().desc()) .paginate(10) @@ -124,10 +173,24 @@ pub async fn limit(t: &mut Test) -> Result<()> { Foo::create().order(i).exec(&mut db).await?; } + t.log().clear(); + // Basic limit without ordering let foos: Vec<_> = Foo::all().limit(5).collect(&mut db).await?; assert_eq!(foos.len(), 5); + let (op, _) = t.log().pop(); + assert_struct!(op, Operation::QuerySql(_ { + stmt: Statement::Query(_ { + body: ExprSet::Select(_ { .. }), + limit: Some(_), + .. + }), + .. + })); + + t.log().clear(); + // Limit combined with ordering let foos: Vec<_> = Foo::all() .order_by(Foo::fields().order().desc()) @@ -139,6 +202,19 @@ pub async fn limit(t: &mut Test) -> Result<()> { assert!(foos[i].order > foos[i + 1].order); } + let (op, _) = t.log().pop(); + assert_struct!(op, Operation::QuerySql(_ { + stmt: Statement::Query(_ { + body: ExprSet::Select(_ { .. }), + order_by: Some(_), + limit: Some(_), + .. + }), + .. + })); + + t.log().clear(); + // Limit larger than the result set returns all results let foos: Vec<_> = Foo::all().limit(100).collect(&mut db).await?; assert_eq!(foos.len(), 20); diff --git a/crates/toasty/src/engine/exec/query_pk.rs b/crates/toasty/src/engine/exec/query_pk.rs index 89de87721..17ccd023e 100644 --- a/crates/toasty/src/engine/exec/query_pk.rs +++ b/crates/toasty/src/engine/exec/query_pk.rs @@ -33,6 +33,15 @@ pub(crate) struct QueryPk { /// Filter to pass to the database pub row_filter: Option, + + /// Maximum number of items to return. + pub limit: Option, + + /// Sort key ordering direction. + pub order: Option, + + /// Cursor for resuming a paginated query. + pub cursor: Option, } impl Exec<'_> { @@ -55,6 +64,9 @@ impl Exec<'_> { select: action.columns.clone(), pk_filter: per_call_filter, filter: action.row_filter.clone(), + limit: action.limit, + order: action.order, + cursor: action.cursor.clone(), } .into() }) @@ -94,6 +106,9 @@ impl Exec<'_> { select: action.columns.clone(), pk_filter, filter: action.row_filter.clone(), + limit: action.limit, + order: action.order, + cursor: action.cursor.clone(), } .into(), ) diff --git a/crates/toasty/src/engine/lower.rs b/crates/toasty/src/engine/lower.rs index 821b8a7fd..ee71c44f1 100644 --- a/crates/toasty/src/engine/lower.rs +++ b/crates/toasty/src/engine/lower.rs @@ -514,11 +514,6 @@ impl visit_mut::VisitMut for LowerStatement<'_, '_> { } fn visit_stmt_query_mut(&mut self, stmt: &mut stmt::Query) { - if !self.capability().sql { - assert!(stmt.order_by.is_none(), "TODO: implement ordering for KV"); - assert!(stmt.limit.is_none(), "TODO: implement limit for KV"); - } - let mut lower = self.scope_expr(&stmt.body); if let Some(with) = &mut stmt.with { diff --git a/crates/toasty/src/engine/mir/query_pk.rs b/crates/toasty/src/engine/mir/query_pk.rs index 31692f672..3a708efc2 100644 --- a/crates/toasty/src/engine/mir/query_pk.rs +++ b/crates/toasty/src/engine/mir/query_pk.rs @@ -35,6 +35,15 @@ pub(crate) struct QueryPk { /// The return type. pub(crate) ty: stmt::Type, + + /// Maximum number of items to return. + pub(crate) limit: Option, + + /// Sort key ordering direction. + pub(crate) order: Option, + + /// Cursor for resuming a paginated query. + pub(crate) cursor: Option, } impl QueryPk { @@ -78,6 +87,9 @@ impl QueryPk { columns, pk_filter: self.pk_filter.clone(), row_filter: self.row_filter.clone(), + limit: self.limit, + order: self.order, + cursor: self.cursor.clone(), } } } diff --git a/crates/toasty/src/engine/plan/statement.rs b/crates/toasty/src/engine/plan/statement.rs index fcb8ff1a1..7fcb117e5 100644 --- a/crates/toasty/src/engine/plan/statement.rs +++ b/crates/toasty/src/engine/plan/statement.rs @@ -1004,6 +1004,9 @@ impl<'a, 'b> PlanStatement<'a, 'b> { }; if stmt.is_query() { + // Extract pagination fields from the query statement. + let (limit, order, cursor) = Self::extract_query_pk_pagination(&stmt); + // For queries, stream all matching records with the requested columns. self.insert_mir_with_deps(mir::QueryPk { input, @@ -1013,6 +1016,9 @@ impl<'a, 'b> PlanStatement<'a, 'b> { pk_filter: index_plan.index_filter.take(), row_filter: index_plan.result_filter.take(), ty: ty.clone(), + limit, + order, + cursor, }) } else { // For mutations (UPDATE/DELETE) with a partial primary-key filter, @@ -1040,6 +1046,9 @@ impl<'a, 'b> PlanStatement<'a, 'b> { pk_filter: index_plan.index_filter.take(), row_filter: index_plan.result_filter.take(), ty: index_key_ty, + limit: None, + order: None, + cursor: None, }); self.build_key_operation(&stmt, index_plan, query_pk_node, ty) @@ -1047,6 +1056,40 @@ impl<'a, 'b> PlanStatement<'a, 'b> { } } + /// Extract pagination parameters (limit, sort direction, cursor) from a + /// query statement for use with `QueryPk` on NoSQL drivers. + fn extract_query_pk_pagination( + stmt: &stmt::Statement, + ) -> (Option, Option, Option) { + let query = match stmt.as_query() { + Some(q) => q, + None => return (None, None, None), + }; + + let limit = query.limit.as_ref().and_then(|l| match &l.limit { + stmt::Expr::Value(stmt::Value::I64(n)) => Some(*n), + _ => None, + }); + + let order = query.order_by.as_ref().and_then(|ob| { + ob.exprs.first().map(|e| match e.order { + Some(stmt::Direction::Desc) => stmt::Direction::Desc, + _ => stmt::Direction::Asc, + }) + }); + + let cursor = query + .limit + .as_ref() + .and_then(|l| l.offset.as_ref()) + .and_then(|offset| match offset { + stmt::Offset::After(stmt::Expr::Value(v)) => Some(v.clone()), + _ => None, + }); + + (limit, order, cursor) + } + fn plan_secondary_index_execution( &mut self, stmt: stmt::Statement, @@ -1081,6 +1124,9 @@ impl<'a, 'b> PlanStatement<'a, 'b> { pk_filter: index_plan.index_filter.take(), row_filter: index_plan.result_filter.take(), ty: ty.clone(), // Full record type, not just PKs + limit: None, + order: None, + cursor: None, }); }