diff --git a/docs/how-to-guides/query-api.md b/docs/how-to-guides/query-api.md index c00ac7750..10e81e39a 100644 --- a/docs/how-to-guides/query-api.md +++ b/docs/how-to-guides/query-api.md @@ -232,3 +232,24 @@ The Logfire API supports various response formats and query parameters to give y - **`row_oriented`**: Only affects JSON responses. If set to `true`, the JSON response will be row-oriented; otherwise, it will be column-oriented. All query parameters besides `sql` are optional and can be used in any combination to tailor the API response to your needs. + +### Pagination + +The Logfire API limits query results to 10,000 rows per request. To download larger datasets, use the built-in pagination support via `iter_paginated_records()`: + +```python skip-run="true" skip-reason="external-connection" +from logfire.query_client import LogfireQueryClient + +with LogfireQueryClient(read_token='') as client: + all_rows = [] + for rows, next_cursor in client.iter_paginated_records( + select='*', + where="level >= 'error'", + page_size=1000, + ): + all_rows.extend(rows) + if next_cursor is None: + break +``` + +The method uses cursor-based pagination with `(start_timestamp, trace_id, span_id)` by default. When paginating over recent data where new rows may be inserted during the process, set `use_created_at=True` to use `(created_at, trace_id, span_id, kind)` instead, which avoids missing or duplicating rows. diff --git a/logfire-api/logfire_api/experimental/query_client.pyi b/logfire-api/logfire_api/experimental/query_client.pyi index a5ccb57ea..1170ceb18 100644 --- a/logfire-api/logfire_api/experimental/query_client.pyi +++ b/logfire-api/logfire_api/experimental/query_client.pyi @@ -40,6 +40,17 @@ class RowQueryResults(TypedDict): """The row-oriented results of a JSON-format query.""" columns: list[ColumnDetails] rows: list[dict[str, Any]] + +class PaginationCursor(TypedDict, total=False): + """Cursor for pagination through query results.""" + start_timestamp: str + trace_id: str + span_id: str + created_at: str + kind: str + +MAX_QUERY_LIMIT: int + T = TypeVar('T', bound=BaseClient) class _BaseLogfireQueryClient(Generic[T]): @@ -74,6 +85,7 @@ class LogfireQueryClient(_BaseLogfireQueryClient[Client]): Use `polars.read_csv(StringIO(result))` to convert the returned CSV to a polars DataFrame. """ + def iter_paginated_records(self, select: str = '*', where: str | None = None, page_size: int = ..., cursor: PaginationCursor | None = None, use_created_at: bool = False) -> Incomplete: ... class AsyncLogfireQueryClient(_BaseLogfireQueryClient[AsyncClient]): """An asynchronous client for querying Logfire data.""" @@ -98,3 +110,4 @@ class AsyncLogfireQueryClient(_BaseLogfireQueryClient[AsyncClient]): Use `polars.read_csv(StringIO(result))` to convert the returned CSV to a polars DataFrame. """ + async def iter_paginated_records(self, select: str = '*', where: str | None = None, page_size: int = ..., cursor: PaginationCursor | None = None, use_created_at: bool = False) -> Incomplete: ... diff --git a/logfire/experimental/query_client.py b/logfire/experimental/query_client.py index 90eb06ee4..25d6f40de 100644 --- a/logfire/experimental/query_client.py +++ b/logfire/experimental/query_client.py @@ -1,8 +1,9 @@ from __future__ import annotations +from collections.abc import AsyncIterator, Iterator from datetime import datetime from types import TracebackType -from typing import TYPE_CHECKING, Any, Generic, Literal, TypedDict, TypeVar +from typing import TYPE_CHECKING, Any, Generic, Literal, TypedDict, TypeVar, cast from typing_extensions import Self @@ -19,6 +20,8 @@ DEFAULT_TIMEOUT = Timeout(30.0) # queries might typically be slower than the 5s default from AsyncClient +MAX_QUERY_LIMIT = 10_000 + class QueryExecutionError(RuntimeError): """Raised when the query execution fails on the server.""" @@ -72,6 +75,20 @@ class RowQueryResults(TypedDict): rows: list[dict[str, Any]] +class PaginationCursor(TypedDict, total=False): + """Cursor for pagination through query results. + + For records with use_created_at=False: start_timestamp, trace_id, span_id. + For records with use_created_at=True: created_at, trace_id, span_id, kind. + """ + + start_timestamp: str + trace_id: str + span_id: str + created_at: str + kind: str + + T = TypeVar('T', bound=BaseClient) @@ -111,6 +128,56 @@ def handle_response_errors(self, response: Response) -> None: assert response.status_code == 200, response.content +def _build_paginated_records_sql( + select: str = '*', + where: str | None = None, + page_size: int = MAX_QUERY_LIMIT, + cursor: PaginationCursor | None = None, + use_created_at: bool = False, + table: str = 'records', +) -> str: + """Build SQL for paginated records query.""" + if use_created_at: + table = 'records_all' + order_cols = 'created_at, trace_id, span_id, kind' + cursor_cols = ['created_at', 'trace_id', 'span_id', 'kind'] + cursor_keys = ['created_at', 'trace_id', 'span_id', 'kind'] + else: + order_cols = 'start_timestamp, trace_id, span_id' + cursor_cols = ['start_timestamp', 'trace_id', 'span_id'] + cursor_keys = ['start_timestamp', 'trace_id', 'span_id'] + + parts = [f'SELECT {select} FROM {table}'] + + if where: + parts.append(f'WHERE {where}') + if cursor: + cursor_vals = [cursor.get(k) for k in cursor_keys] + if all(v is not None for v in cursor_vals): + placeholders = ', '.join(f"'{str(v).replace(chr(39), chr(39) + chr(39))}'" for v in cursor_vals) + parts.append(f'AND ({", ".join(cursor_cols)}) > ({placeholders})') + elif cursor: + cursor_vals = [cursor.get(k) for k in cursor_keys] + if all(v is not None for v in cursor_vals): + placeholders = ', '.join(f"'{str(v).replace(chr(39), chr(39) + chr(39))}'" for v in cursor_vals) + parts.append(f'WHERE ({", ".join(cursor_cols)}) > ({placeholders})') + + parts.append(f'ORDER BY {order_cols}') + parts.append(f'LIMIT {page_size}') + return ' '.join(parts) + + +def _extract_cursor_from_row(row: dict[str, Any], use_created_at: bool = False) -> PaginationCursor | None: + """Extract pagination cursor from the last row.""" + if use_created_at: + keys = ['created_at', 'trace_id', 'span_id', 'kind'] + else: + keys = ['start_timestamp', 'trace_id', 'span_id'] + if all(k in row and row[k] is not None for k in keys): + return cast(PaginationCursor, {k: str(row[k]) for k in keys}) + return None + + class LogfireQueryClient(_BaseLogfireQueryClient[Client]): """A synchronous client for querying Logfire data.""" @@ -237,6 +304,55 @@ def query_csv( ) return response.text + def iter_paginated_records( + self, + select: str = '*', + where: str | None = None, + page_size: int = MAX_QUERY_LIMIT, + cursor: PaginationCursor | None = None, + use_created_at: bool = False, + ) -> Iterator[tuple[list[dict[str, Any]], PaginationCursor | None]]: + """Iterate over records in pages, yielding (rows, next_cursor) for each page. + + Uses cursor-based pagination to retrieve more than the 10,000 row API limit. + The cursor is derived from (start_timestamp, trace_id, span_id) or, when + use_created_at=True, from (created_at, trace_id, span_id, kind). + + Use use_created_at=True when paginating over recent data where new rows may + be inserted during pagination. Otherwise use start_timestamp-based pagination. + + Args: + select: SQL columns to select. Must include cursor columns for pagination + to continue: start_timestamp, trace_id, span_id (or created_at, kind + when use_created_at=True). Use '*' to select all. + where: Optional WHERE clause (without the leading WHERE keyword). + page_size: Number of rows per page (max 10,000). + cursor: Cursor from previous page to continue from. + use_created_at: Use created_at for cursor when new data may be inserted. + + Yields: + Tuples of (rows, next_cursor). next_cursor is None when no more pages. + """ + page_size = min(page_size, MAX_QUERY_LIMIT) + while True: + sql = _build_paginated_records_sql( + select=select, + where=where, + page_size=page_size, + cursor=cursor, + use_created_at=use_created_at, + ) + result = self.query_json_rows(sql=sql) + rows = result.get('rows', []) + if not rows: + yield rows, None + return + next_cursor = _extract_cursor_from_row(rows[-1], use_created_at=use_created_at) + yield rows, next_cursor + if next_cursor is None or len(rows) < page_size: + return + cursor = next_cursor + def _query( self, accept: Literal['application/json', 'application/vnd.apache.arrow.stream', 'text/csv'], @@ -378,6 +494,55 @@ async def query_csv( ) return response.text + async def iter_paginated_records( + self, + select: str = '*', + where: str | None = None, + page_size: int = MAX_QUERY_LIMIT, + cursor: PaginationCursor | None = None, + use_created_at: bool = False, + ) -> AsyncIterator[tuple[list[dict[str, Any]], PaginationCursor | None]]: + """Iterate over records in pages, yielding (rows, next_cursor) for each page. + + Uses cursor-based pagination to retrieve more than the 10,000 row API limit. + The cursor is derived from (start_timestamp, trace_id, span_id) or, when + use_created_at=True, from (created_at, trace_id, span_id, kind). + + Use use_created_at=True when paginating over recent data where new rows may + be inserted during pagination. Otherwise use start_timestamp-based pagination. + + Args: + select: SQL columns to select. Must include cursor columns for pagination + to continue: start_timestamp, trace_id, span_id (or created_at, kind + when use_created_at=True). Use '*' to select all. + where: Optional WHERE clause (without the leading WHERE keyword). + page_size: Number of rows per page (max 10,000). + cursor: Cursor from previous page to continue from. + use_created_at: Use created_at for cursor when new data may be inserted. + + Yields: + Tuples of (rows, next_cursor). next_cursor is None when no more pages. + """ + page_size = min(page_size, MAX_QUERY_LIMIT) + while True: + sql = _build_paginated_records_sql( + select=select, + where=where, + page_size=page_size, + cursor=cursor, + use_created_at=use_created_at, + ) + result = await self.query_json_rows(sql=sql) + rows = result.get('rows', []) + if not rows: + yield rows, None + return + next_cursor = _extract_cursor_from_row(rows[-1], use_created_at=use_created_at) + yield rows, next_cursor + if next_cursor is None or len(rows) < page_size: + return + cursor = next_cursor + async def _query( self, accept: Literal['application/json', 'application/vnd.apache.arrow.stream', 'text/csv'], diff --git a/tests/cassettes/test_query_client/test_iter_paginated_records_async.yaml b/tests/cassettes/test_query_client/test_iter_paginated_records_async.yaml new file mode 100644 index 000000000..8ddb68151 --- /dev/null +++ b/tests/cassettes/test_query_client/test_iter_paginated_records_async.yaml @@ -0,0 +1,78 @@ +interactions: +- request: + body: '' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + host: + - localhost:8000 + user-agent: + - python-httpx/0.27.2 + method: GET + uri: http://localhost:8000/v1/query?sql=SELECT+kind%2C+message%2C+start_timestamp%2C+trace_id%2C+span_id+FROM+records+ORDER+BY+start_timestamp%2C+trace_id%2C+span_id+LIMIT+5&json_rows=true + response: + body: + string: '{"columns":[{"name":"kind","nullable":false,"datatype":"Utf8"},{"name":"message","nullable":true,"datatype":"Utf8"},{"name":"start_timestamp","nullable":true,"datatype":"Utf8"},{"name":"trace_id","nullable":true,"datatype":"Utf8"},{"name":"span_id","nullable":true,"datatype":"Utf8"}],"rows":[{"kind":"log","message":"row1","start_timestamp":"2024-09-27T15:19:42.000000Z","trace_id":"e1742e366c5ed4c2917b4034042b3550","span_id":"53a7a41b2c077626"},{"kind":"log","message":"row2","start_timestamp":"2024-09-27T15:19:42.100000Z","trace_id":"18164ae14fd72cdba5125727713231b5","span_id":"b84f2ad111165ca"},{"kind":"log","message":"row3","start_timestamp":"2024-09-27T15:19:42.200000Z","trace_id":"18164ae14fd72cdba5125727713231b5","span_id":"c94f2ad111165cb"},{"kind":"log","message":"row4","start_timestamp":"2024-09-27T15:19:42.300000Z","trace_id":"28164ae14fd72cdba5125727713231b5","span_id":"d94f2ad111165cc"},{"kind":"log","message":"row5","start_timestamp":"2024-09-27T15:19:42.400000Z","trace_id":"28164ae14fd72cdba5125727713231b5","span_id":"e94f2ad111165cd"}]}' + headers: + access-control-expose-headers: + - traceresponse + content-length: + - '450' + content-type: + - application/json + date: + - Fri, 27 Sep 2024 15:19:43 GMT + server: + - uvicorn + traceresponse: + - 00-e1742e366c5ed4c2917b4034042b3550-53a7a41b2c077626-01 + x-api-version: + - 1flTOwUHOSSJqmjknvZl4Cphg4j7R1eYX7kz5RICs/4= + x-logfire-context: + - e2e-test/test-e2e-project + status: + code: 200 + message: OK +- request: + body: '' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + host: + - localhost:8000 + user-agent: + - python-httpx/0.27.2 + method: GET + uri: http://localhost:8000/v1/query?sql=SELECT+kind%2C+message%2C+start_timestamp%2C+trace_id%2C+span_id+FROM+records+WHERE+%28start_timestamp%2C+trace_id%2C+span_id%29+%3E+%28%272024-09-27T15%3A19%3A42.400000Z%27%2C+%2728164ae14fd72cdba5125727713231b5%27%2C+%27e94f2ad111165cd%27%29+ORDER+BY+start_timestamp%2C+trace_id%2C+span_id+LIMIT+5&json_rows=true + response: + body: + string: '{"columns":[{"name":"kind","nullable":false,"datatype":"Utf8"},{"name":"message","nullable":true,"datatype":"Utf8"},{"name":"start_timestamp","nullable":true,"datatype":"Utf8"},{"name":"trace_id","nullable":true,"datatype":"Utf8"},{"name":"span_id","nullable":true,"datatype":"Utf8"}],"rows":[{"kind":"log","message":"row6","start_timestamp":"2024-09-27T15:19:42.500000Z","trace_id":"38164ae14fd72cdba5125727713231b5","span_id":"f94f2ad111165ce"}]}' + headers: + access-control-expose-headers: + - traceresponse + content-length: + - '200' + content-type: + - application/json + date: + - Fri, 27 Sep 2024 15:19:43 GMT + server: + - uvicorn + traceresponse: + - 00-e1742e366c5ed4c2917b4034042b3550-53a7a41b2c077626-01 + x-api-version: + - 1flTOwUHOSSJqmjknvZl4Cphg4j7R1eYX7kz5RICs/4= + x-logfire-context: + - e2e-test/test-e2e-project + status: + code: 200 + message: OK +version: 1 diff --git a/tests/cassettes/test_query_client/test_iter_paginated_records_empty_response_async.yaml b/tests/cassettes/test_query_client/test_iter_paginated_records_empty_response_async.yaml new file mode 100644 index 000000000..38b550d2d --- /dev/null +++ b/tests/cassettes/test_query_client/test_iter_paginated_records_empty_response_async.yaml @@ -0,0 +1,32 @@ +interactions: +- request: + body: '' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + host: + - localhost:8000 + user-agent: + - python-httpx/0.27.2 + method: GET + uri: http://localhost:8000/v1/query?sql=SELECT+*+FROM+records+WHERE+message+%3D+%27nonexistent%27+ORDER+BY+start_timestamp%2C+trace_id%2C+span_id+LIMIT+5&json_rows=true + response: + body: + string: '{"columns":[{"name":"kind","nullable":false,"datatype":"Utf8"},{"name":"message","nullable":true,"datatype":"Utf8"}],"rows":[]}' + headers: + access-control-expose-headers: + - traceresponse + content-type: + - application/json + server: + - uvicorn + x-logfire-context: + - e2e-test/test-e2e-project + status: + code: 200 + message: OK +version: 1 diff --git a/tests/cassettes/test_query_client/test_iter_paginated_records_empty_response_sync.yaml b/tests/cassettes/test_query_client/test_iter_paginated_records_empty_response_sync.yaml new file mode 100644 index 000000000..38b550d2d --- /dev/null +++ b/tests/cassettes/test_query_client/test_iter_paginated_records_empty_response_sync.yaml @@ -0,0 +1,32 @@ +interactions: +- request: + body: '' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + host: + - localhost:8000 + user-agent: + - python-httpx/0.27.2 + method: GET + uri: http://localhost:8000/v1/query?sql=SELECT+*+FROM+records+WHERE+message+%3D+%27nonexistent%27+ORDER+BY+start_timestamp%2C+trace_id%2C+span_id+LIMIT+5&json_rows=true + response: + body: + string: '{"columns":[{"name":"kind","nullable":false,"datatype":"Utf8"},{"name":"message","nullable":true,"datatype":"Utf8"}],"rows":[]}' + headers: + access-control-expose-headers: + - traceresponse + content-type: + - application/json + server: + - uvicorn + x-logfire-context: + - e2e-test/test-e2e-project + status: + code: 200 + message: OK +version: 1 diff --git a/tests/cassettes/test_query_client/test_iter_paginated_records_sync.yaml b/tests/cassettes/test_query_client/test_iter_paginated_records_sync.yaml new file mode 100644 index 000000000..8ddb68151 --- /dev/null +++ b/tests/cassettes/test_query_client/test_iter_paginated_records_sync.yaml @@ -0,0 +1,78 @@ +interactions: +- request: + body: '' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + host: + - localhost:8000 + user-agent: + - python-httpx/0.27.2 + method: GET + uri: http://localhost:8000/v1/query?sql=SELECT+kind%2C+message%2C+start_timestamp%2C+trace_id%2C+span_id+FROM+records+ORDER+BY+start_timestamp%2C+trace_id%2C+span_id+LIMIT+5&json_rows=true + response: + body: + string: '{"columns":[{"name":"kind","nullable":false,"datatype":"Utf8"},{"name":"message","nullable":true,"datatype":"Utf8"},{"name":"start_timestamp","nullable":true,"datatype":"Utf8"},{"name":"trace_id","nullable":true,"datatype":"Utf8"},{"name":"span_id","nullable":true,"datatype":"Utf8"}],"rows":[{"kind":"log","message":"row1","start_timestamp":"2024-09-27T15:19:42.000000Z","trace_id":"e1742e366c5ed4c2917b4034042b3550","span_id":"53a7a41b2c077626"},{"kind":"log","message":"row2","start_timestamp":"2024-09-27T15:19:42.100000Z","trace_id":"18164ae14fd72cdba5125727713231b5","span_id":"b84f2ad111165ca"},{"kind":"log","message":"row3","start_timestamp":"2024-09-27T15:19:42.200000Z","trace_id":"18164ae14fd72cdba5125727713231b5","span_id":"c94f2ad111165cb"},{"kind":"log","message":"row4","start_timestamp":"2024-09-27T15:19:42.300000Z","trace_id":"28164ae14fd72cdba5125727713231b5","span_id":"d94f2ad111165cc"},{"kind":"log","message":"row5","start_timestamp":"2024-09-27T15:19:42.400000Z","trace_id":"28164ae14fd72cdba5125727713231b5","span_id":"e94f2ad111165cd"}]}' + headers: + access-control-expose-headers: + - traceresponse + content-length: + - '450' + content-type: + - application/json + date: + - Fri, 27 Sep 2024 15:19:43 GMT + server: + - uvicorn + traceresponse: + - 00-e1742e366c5ed4c2917b4034042b3550-53a7a41b2c077626-01 + x-api-version: + - 1flTOwUHOSSJqmjknvZl4Cphg4j7R1eYX7kz5RICs/4= + x-logfire-context: + - e2e-test/test-e2e-project + status: + code: 200 + message: OK +- request: + body: '' + headers: + accept: + - application/json + accept-encoding: + - gzip, deflate + connection: + - keep-alive + host: + - localhost:8000 + user-agent: + - python-httpx/0.27.2 + method: GET + uri: http://localhost:8000/v1/query?sql=SELECT+kind%2C+message%2C+start_timestamp%2C+trace_id%2C+span_id+FROM+records+WHERE+%28start_timestamp%2C+trace_id%2C+span_id%29+%3E+%28%272024-09-27T15%3A19%3A42.400000Z%27%2C+%2728164ae14fd72cdba5125727713231b5%27%2C+%27e94f2ad111165cd%27%29+ORDER+BY+start_timestamp%2C+trace_id%2C+span_id+LIMIT+5&json_rows=true + response: + body: + string: '{"columns":[{"name":"kind","nullable":false,"datatype":"Utf8"},{"name":"message","nullable":true,"datatype":"Utf8"},{"name":"start_timestamp","nullable":true,"datatype":"Utf8"},{"name":"trace_id","nullable":true,"datatype":"Utf8"},{"name":"span_id","nullable":true,"datatype":"Utf8"}],"rows":[{"kind":"log","message":"row6","start_timestamp":"2024-09-27T15:19:42.500000Z","trace_id":"38164ae14fd72cdba5125727713231b5","span_id":"f94f2ad111165ce"}]}' + headers: + access-control-expose-headers: + - traceresponse + content-length: + - '200' + content-type: + - application/json + date: + - Fri, 27 Sep 2024 15:19:43 GMT + server: + - uvicorn + traceresponse: + - 00-e1742e366c5ed4c2917b4034042b3550-53a7a41b2c077626-01 + x-api-version: + - 1flTOwUHOSSJqmjknvZl4Cphg4j7R1eYX7kz5RICs/4= + x-logfire-context: + - e2e-test/test-e2e-project + status: + code: 200 + message: OK +version: 1 diff --git a/tests/test_query_client.py b/tests/test_query_client.py index 204fc87fb..7ddd8ae7d 100644 --- a/tests/test_query_client.py +++ b/tests/test_query_client.py @@ -2,11 +2,20 @@ import sys from datetime import datetime, timezone +from typing import Any import pytest from inline_snapshot import snapshot -from logfire.query_client import AsyncLogfireQueryClient, LogfireQueryClient +from logfire.experimental.query_client import ( + _build_paginated_records_sql, + _extract_cursor_from_row, +) +from logfire.query_client import ( + AsyncLogfireQueryClient, + LogfireQueryClient, + PaginationCursor, +) # This file is intended to be updated by the Logfire developers, with the development platform running locally. # To update, set the `CLIENT_BASE_URL` and `CLIENT_READ_TOKEN` values to match the local development environment, @@ -312,3 +321,100 @@ async def test_query_params_async(): is_exception,count(*) false,37 """) + + +def test_iter_paginated_records_sync(): + """Test that iter_paginated_records yields pages of records.""" + with LogfireQueryClient(read_token=CLIENT_READ_TOKEN, base_url=CLIENT_BASE_URL) as client: + all_rows: list[dict[str, Any]] = [] + cursor: PaginationCursor | None = None + for rows, next_cursor in client.iter_paginated_records( + select='kind, message, start_timestamp, trace_id, span_id', + page_size=5, + ): + all_rows.extend(rows) + cursor = next_cursor + if cursor is None or len(rows) < 5: + break + assert len(all_rows) >= 5 + + +def test_build_paginated_records_sql_use_created_at(): + """Test _build_paginated_records_sql with use_created_at=True uses records_all and created_at.""" + sql = _build_paginated_records_sql(use_created_at=True) + assert 'FROM records_all' in sql + assert 'ORDER BY created_at, trace_id, span_id, kind' in sql + + +def test_build_paginated_records_sql_where_with_cursor(): + """Test _build_paginated_records_sql with where clause and cursor.""" + sql = _build_paginated_records_sql( + where="level >= 'error'", + cursor={'start_timestamp': '2024-01-01T00:00:00Z', 'trace_id': 'abc', 'span_id': 'def'}, + ) + assert 'WHERE level >= ' in sql + assert "AND (start_timestamp, trace_id, span_id) > ('2024-01-01T00:00:00Z'" in sql + + +def test_build_paginated_records_sql_cursor_only(): + """Test _build_paginated_records_sql with cursor and no where clause.""" + sql = _build_paginated_records_sql( + cursor={'start_timestamp': '2024-01-01T00:00:00Z', 'trace_id': 'abc', 'span_id': 'def'}, + ) + assert "WHERE (start_timestamp, trace_id, span_id) > ('2024-01-01T00:00:00Z'" in sql + + +def test_extract_cursor_from_row_use_created_at(): + """Test _extract_cursor_from_row with use_created_at=True.""" + row = { + 'created_at': '2024-01-01T00:00:00Z', + 'trace_id': 'abc', + 'span_id': 'def', + 'kind': 'span', + } + cursor = _extract_cursor_from_row(row, use_created_at=True) + assert cursor == {'created_at': '2024-01-01T00:00:00Z', 'trace_id': 'abc', 'span_id': 'def', 'kind': 'span'} + + +def test_extract_cursor_from_row_returns_none_when_keys_missing(): + """Test _extract_cursor_from_row returns None when required keys are missing.""" + assert _extract_cursor_from_row({'message': 'foo'}) is None + assert _extract_cursor_from_row({'start_timestamp': 'x', 'trace_id': 'y'}) is None + + +def test_iter_paginated_records_empty_response_sync(): + """Test iter_paginated_records handles empty response and yields once.""" + with LogfireQueryClient(read_token=CLIENT_READ_TOKEN, base_url=CLIENT_BASE_URL) as client: + pages = list(client.iter_paginated_records(where="message = 'nonexistent'", page_size=5)) + assert len(pages) == 1 + assert pages[0][0] == [] + assert pages[0][1] is None + + +@pytest.mark.anyio +async def test_iter_paginated_records_empty_response_async(): + """Test async iter_paginated_records handles empty response and yields once.""" + async with AsyncLogfireQueryClient(read_token=CLIENT_READ_TOKEN, base_url=CLIENT_BASE_URL) as client: + pages = [] + async for rows, next_cursor in client.iter_paginated_records(where="message = 'nonexistent'", page_size=5): + pages.append((rows, next_cursor)) + assert len(pages) == 1 + assert pages[0][0] == [] + assert pages[0][1] is None + + +@pytest.mark.anyio +async def test_iter_paginated_records_async(): + """Test that async iter_paginated_records yields pages of records.""" + async with AsyncLogfireQueryClient(read_token=CLIENT_READ_TOKEN, base_url=CLIENT_BASE_URL) as client: + all_rows: list[dict[str, Any]] = [] + cursor: PaginationCursor | None = None + async for rows, next_cursor in client.iter_paginated_records( + select='kind, message, start_timestamp, trace_id, span_id', + page_size=5, + ): + all_rows.extend(rows) + cursor = next_cursor + if cursor is None or len(rows) < 5: + break + assert len(all_rows) >= 5