Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions crates/toasty-core/src/driver/operation/query_pk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ pub struct QueryPk {
/// Additional filtering done on the result before returning it to the
/// caller.
pub filter: Option<stmt::Expr>,

/// Maximum number of items to return. `None` means no limit.
pub limit: Option<i64>,

/// Sort key ordering direction for queries on a table with a composite
/// primary key. `None` uses the driver's default ordering.
pub order: Option<stmt::Direction>,

/// Cursor for resuming a paginated query. Contains the serialized key of
/// the last item from a previous page of results.
pub cursor: Option<stmt::Value>,
}

impl From<QueryPk> for Operation {
Expand Down
39 changes: 33 additions & 6 deletions crates/toasty-driver-dynamodb/src/op/query_pk.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{
ddb_expression, item_to_record, operation, stmt, Connection, ExprAttrs, Result, Schema,
ddb_expression, ddb_key, item_to_record, operation, stmt, Connection, ExprAttrs, Result, Schema,
};
use std::sync::Arc;
use toasty_core::{driver::Response, stmt::ExprContext};
Expand Down Expand Up @@ -29,7 +29,6 @@ impl Connection {
let index = schema.db.index(index_id);

if index.unique {
// assert!(!index.unique, "Index needs all fields");
use toasty_core::Error;
let err = Error::from_args(format_args!(
"Unique index {} doesn't have fields.",
Expand All @@ -38,27 +37,55 @@ impl Connection {
Err(err)
} else {
tracing::trace!(table_name = %table.name, index_name = %index.name, "querying secondary index");
self.client
let mut query = self
.client
.query()
.table_name(&table.name)
.index_name(&index.name)
.key_condition_expression(key_expression)
.set_filter_expression(filter_expression)
.set_expression_attribute_names(Some(expr_attrs.attr_names))
.set_expression_attribute_values(Some(expr_attrs.attr_values))
.set_expression_attribute_values(Some(expr_attrs.attr_values));

// Apply pagination parameters when present.
if let Some(limit) = op.limit {
query = query.limit(limit as i32);
}
if let Some(ref direction) = op.order {
query = query.scan_index_forward(*direction == stmt::Direction::Asc);
}
if let Some(ref start_key) = op.cursor {
query = query.set_exclusive_start_key(Some(ddb_key(table, start_key)));
}

query
.send()
.await
.map_err(toasty_core::Error::driver_operation_failed)
}
} else {
tracing::trace!(table_name = %table.name, "querying primary key");
self.client
let mut query = self
.client
.query()
.table_name(&table.name)
.key_condition_expression(key_expression)
.set_filter_expression(filter_expression)
.set_expression_attribute_names(Some(expr_attrs.attr_names))
.set_expression_attribute_values(Some(expr_attrs.attr_values))
.set_expression_attribute_values(Some(expr_attrs.attr_values));

// Apply pagination parameters when present.
if let Some(limit) = op.limit {
query = query.limit(limit as i32);
}
if let Some(ref direction) = op.order {
query = query.scan_index_forward(*direction == stmt::Direction::Asc);
}
if let Some(ref start_key) = op.cursor {
query = query.set_exclusive_start_key(Some(ddb_key(table, start_key)));
}

query
.send()
.await
.map_err(toasty_core::Error::driver_operation_failed)
Expand Down
1 change: 1 addition & 0 deletions crates/toasty-driver-integration-suite/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ pub mod belongs_to_configured;
pub mod belongs_to_one_way;
pub mod belongs_to_self_referential;
pub mod bigdecimal;
pub mod composite_key_pagination;
pub mod connection_per_clone;
pub mod create_macro;
pub mod decimal;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
//! Test pagination on composite-key models.
//!
//! These tests exercise `paginate()`, `limit()`, and `order_by()` on models
//! with a partition + local key, which is the pattern DynamoDB uses for
//! `QueryPk`. They intentionally have **no** `requires(sql)` gate so they run
//! on every driver, including DynamoDB.

use crate::prelude::*;
use toasty::Page;
use toasty_core::driver::{Operation, Rows};

#[driver_test]
pub async fn paginate_composite_key(test: &mut Test) -> Result<()> {
#[derive(Debug, toasty::Model)]
#[key(partition = kind, local = seq)]
struct Event {
kind: String,
seq: i64,
}

let mut db = test.setup_db(models!(Event)).await;

// Seed 20 events under the same partition key so we can paginate over them.
for i in 0..20 {
Event::create().kind("info").seq(i).exec(&mut db).await?;
}

test.log().clear();

// First page (descending): should return seq 19..10
let page: Page<_> = Event::filter_by_kind("info")
.order_by(Event::fields().seq().desc())
.paginate(10)
.collect(&mut db)
.await?;

assert_eq!(page.len(), 10);
for (i, expected) in (10..20).rev().enumerate() {
assert_eq!(page[i].seq, expected);
}

// Verify the driver operation type
let (op, resp) = test.log().pop();
if test.capability().sql {
assert_struct!(op, Operation::QuerySql(_));
} else {
assert_struct!(op, Operation::QueryPk(_));
}
assert_struct!(resp.rows, Rows::Stream(_));

// Second page via .next()
let page: Page<_> = page.next(&mut db).await?.unwrap();
assert_eq!(page.len(), 10);
for (i, expected) in (0..10).rev().enumerate() {
assert_eq!(page[i].seq, expected);
}

// Go back to the first page via .prev()
let page: Page<_> = page.prev(&mut db).await?.unwrap();
assert_eq!(page.len(), 10);
for (i, expected) in (10..20).rev().enumerate() {
assert_eq!(page[i].seq, expected);
}

Ok(())
}

#[driver_test]
pub async fn paginate_composite_key_asc(test: &mut Test) -> Result<()> {
#[derive(Debug, toasty::Model)]
#[key(partition = kind, local = seq)]
struct Event {
kind: String,
seq: i64,
}

let mut db = test.setup_db(models!(Event)).await;

for i in 0..20 {
Event::create().kind("info").seq(i).exec(&mut db).await?;
}

test.log().clear();

// First page (ascending): should return seq 0..9
let page: Page<_> = Event::filter_by_kind("info")
.order_by(Event::fields().seq().asc())
.paginate(5)
.collect(&mut db)
.await?;

assert_eq!(page.len(), 5);
for (i, expected) in (0..5).enumerate() {
assert_eq!(page[i].seq, expected);
}

let (op, _) = test.log().pop();
if test.capability().sql {
assert_struct!(op, Operation::QuerySql(_));
} else {
assert_struct!(op, Operation::QueryPk(_));
}

// Walk forward through all pages and collect every seq value.
let mut all_seqs: Vec<i64> = page.iter().map(|e| e.seq).collect();
let mut current = page;
while let Some(next) = current.next(&mut db).await? {
all_seqs.extend(next.iter().map(|e| e.seq));
current = next;
}

assert_eq!(all_seqs, (0..20).collect::<Vec<_>>());

Ok(())
}

#[driver_test]
pub async fn limit_composite_key(test: &mut Test) -> Result<()> {
#[derive(Debug, toasty::Model)]
#[key(partition = kind, local = seq)]
struct Event {
kind: String,
seq: i64,
}

let mut db = test.setup_db(models!(Event)).await;

for i in 0..20 {
Event::create().kind("info").seq(i).exec(&mut db).await?;
}

test.log().clear();

// Limit without explicit ordering
let events: Vec<_> = Event::filter_by_kind("info")
.limit(7)
.collect(&mut db)
.await?;
assert_eq!(events.len(), 7);

let (op, _) = test.log().pop();
if test.capability().sql {
assert_struct!(op, Operation::QuerySql(_));
} else {
assert_struct!(op, Operation::QueryPk(_));
}

test.log().clear();

// Limit combined with descending order
let events: Vec<_> = Event::filter_by_kind("info")
.order_by(Event::fields().seq().desc())
.limit(5)
.collect(&mut db)
.await?;
assert_eq!(events.len(), 5);
for i in 0..4 {
assert!(events[i].seq > events[i + 1].seq);
}
// The first item should be the highest seq
assert_eq!(events[0].seq, 19);

test.log().clear();

// Limit larger than result set returns all results
let events: Vec<_> = Event::filter_by_kind("info")
.limit(100)
.collect(&mut db)
.await?;
assert_eq!(events.len(), 20);

Ok(())
}

#[driver_test]
pub async fn sort_composite_key(test: &mut Test) -> Result<()> {
#[derive(Debug, toasty::Model)]
#[key(partition = kind, local = seq)]
struct Event {
kind: String,
seq: i64,
}

let mut db = test.setup_db(models!(Event)).await;

for i in 0..20 {
Event::create().kind("info").seq(i).exec(&mut db).await?;
}

test.log().clear();

// Ascending sort
let events: Vec<_> = Event::filter_by_kind("info")
.order_by(Event::fields().seq().asc())
.collect(&mut db)
.await?;

assert_eq!(events.len(), 20);
for i in 0..19 {
assert!(events[i].seq < events[i + 1].seq);
}

let (op, resp) = test.log().pop();
if test.capability().sql {
assert_struct!(op, Operation::QuerySql(_));
} else {
assert_struct!(op, Operation::QueryPk(_));
}
assert_struct!(resp.rows, Rows::Stream(_));

test.log().clear();

// Descending sort
let events: Vec<_> = Event::filter_by_kind("info")
.order_by(Event::fields().seq().desc())
.collect(&mut db)
.await?;

assert_eq!(events.len(), 20);
for i in 0..19 {
assert!(events[i].seq > events[i + 1].seq);
}

let (op, resp) = test.log().pop();
if test.capability().sql {
assert_struct!(op, Operation::QuerySql(_));
} else {
assert_struct!(op, Operation::QueryPk(_));
}
assert_struct!(resp.rows, Rows::Stream(_));

Ok(())
}
Loading
Loading