Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/how-to-guides/query-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,24 @@ The Logfire API supports various response formats and query parameters to give y
- **`row_oriented`**: Only affects JSON responses. If set to `true`, the JSON response will be row-oriented; otherwise, it will be column-oriented.

All query parameters besides `sql` are optional and can be used in any combination to tailor the API response to your needs.

### Pagination

The Logfire API limits query results to 10,000 rows per request. To download larger datasets, use the built-in pagination support via `iter_paginated_records()`:

```python skip-run="true" skip-reason="external-connection"
from logfire.query_client import LogfireQueryClient

with LogfireQueryClient(read_token='<your_read_token>') as client:
all_rows = []
for rows, next_cursor in client.iter_paginated_records(
select='*',
where="level >= 'error'",
page_size=1000,
):
all_rows.extend(rows)
        if next_cursor is None:
            break  # no more pages — the iterator also stops on its own; save `next_cursor` to resume later
```

The method uses cursor-based pagination with `(start_timestamp, trace_id, span_id)` by default. When paginating over recent data where new rows may be inserted during the process, set `use_created_at=True` to use `(created_at, trace_id, span_id, kind)` instead, which avoids missing or duplicating rows.
13 changes: 13 additions & 0 deletions logfire-api/logfire_api/experimental/query_client.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,17 @@ class RowQueryResults(TypedDict):
"""The row-oriented results of a JSON-format query."""
columns: list[ColumnDetails]
rows: list[dict[str, Any]]

class PaginationCursor(TypedDict, total=False):
    """Cursor for pagination through query results.

    All keys are optional (``total=False``): the default cursor uses
    (start_timestamp, trace_id, span_id); with ``use_created_at=True`` the
    cursor uses (created_at, trace_id, span_id, kind) instead.
    """
    start_timestamp: str  # ISO-8601 timestamp of the last row seen (default cursor)
    trace_id: str  # trace id tie-breaker, used by both cursor variants
    span_id: str  # span id tie-breaker, used by both cursor variants
    created_at: str  # insertion timestamp of the last row seen (use_created_at cursor)
    kind: str  # record kind tie-breaker (use_created_at cursor only)

MAX_QUERY_LIMIT: int

T = TypeVar('T', bound=BaseClient)

class _BaseLogfireQueryClient(Generic[T]):
Expand Down Expand Up @@ -74,6 +85,7 @@ class LogfireQueryClient(_BaseLogfireQueryClient[Client]):

Use `polars.read_csv(StringIO(result))` to convert the returned CSV to a polars DataFrame.
"""
def iter_paginated_records(self, select: str = '*', where: str | None = None, page_size: int = ..., cursor: PaginationCursor | None = None, use_created_at: bool = False) -> Incomplete: ...

class AsyncLogfireQueryClient(_BaseLogfireQueryClient[AsyncClient]):
"""An asynchronous client for querying Logfire data."""
Expand All @@ -98,3 +110,4 @@ class AsyncLogfireQueryClient(_BaseLogfireQueryClient[AsyncClient]):

Use `polars.read_csv(StringIO(result))` to convert the returned CSV to a polars DataFrame.
"""
async def iter_paginated_records(self, select: str = '*', where: str | None = None, page_size: int = ..., cursor: PaginationCursor | None = None, use_created_at: bool = False) -> Incomplete: ...
167 changes: 166 additions & 1 deletion logfire/experimental/query_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

from collections.abc import AsyncIterator, Iterator
from datetime import datetime
from types import TracebackType
from typing import TYPE_CHECKING, Any, Generic, Literal, TypedDict, TypeVar
from typing import TYPE_CHECKING, Any, Generic, Literal, TypedDict, TypeVar, cast

from typing_extensions import Self

Expand All @@ -19,6 +20,8 @@

DEFAULT_TIMEOUT = Timeout(30.0) # queries might typically be slower than the 5s default from AsyncClient

MAX_QUERY_LIMIT = 10_000


class QueryExecutionError(RuntimeError):
"""Raised when the query execution fails on the server."""
Expand Down Expand Up @@ -72,6 +75,20 @@ class RowQueryResults(TypedDict):
rows: list[dict[str, Any]]


class PaginationCursor(TypedDict, total=False):
    """Cursor for pagination through query results.

    For records with use_created_at=False: start_timestamp, trace_id, span_id.
    For records with use_created_at=True: created_at, trace_id, span_id, kind.

    All keys are declared optional (``total=False``) because only one of the
    two column sets above is populated for a given pagination run.
    """

    start_timestamp: str  # ISO-8601 start timestamp of the last row seen (default cursor)
    trace_id: str  # trace id tie-breaker, present in both cursor variants
    span_id: str  # span id tie-breaker, present in both cursor variants
    created_at: str  # insertion timestamp of the last row seen (use_created_at cursor)
    kind: str  # record kind tie-breaker (use_created_at cursor only)


T = TypeVar('T', bound=BaseClient)


Expand Down Expand Up @@ -111,6 +128,56 @@ def handle_response_errors(self, response: Response) -> None:
assert response.status_code == 200, response.content


def _build_paginated_records_sql(
    select: str = '*',
    where: str | None = None,
    page_size: int = MAX_QUERY_LIMIT,
    cursor: PaginationCursor | None = None,
    use_created_at: bool = False,
    table: str = 'records',
) -> str:
    """Build the SQL for one page of a cursor-paginated records query.

    Args:
        select: Column list for the SELECT clause.
        where: Optional user filter (without the leading WHERE keyword).
        page_size: LIMIT for the page.
        cursor: Cursor from the previous page; a keyset predicate is added only
            when every cursor column is present and non-None.
        use_created_at: When True, paginate `records_all` on
            (created_at, trace_id, span_id, kind) instead of
            (start_timestamp, trace_id, span_id) on `table`.
        table: Table to query; ignored (forced to 'records_all') when
            use_created_at is True.

    Returns:
        The assembled SQL string.
    """
    if use_created_at:
        table = 'records_all'  # created_at-ordered pagination reads from records_all
        cursor_cols = ['created_at', 'trace_id', 'span_id', 'kind']
    else:
        cursor_cols = ['start_timestamp', 'trace_id', 'span_id']

    conditions: list[str] = []
    if where:
        conditions.append(where)
    if cursor:
        cursor_vals = [cursor.get(col) for col in cursor_cols]
        # Only add the keyset predicate when the cursor is complete for this mode.
        if all(v is not None for v in cursor_vals):
            # Escape embedded single quotes by doubling them (standard SQL escaping).
            quoted = ', '.join("'{}'".format(str(v).replace("'", "''")) for v in cursor_vals)
            conditions.append(f'({", ".join(cursor_cols)}) > ({quoted})')

    parts = [f'SELECT {select} FROM {table}']
    if len(conditions) == 1:
        parts.append(f'WHERE {conditions[0]}')
    elif conditions:
        # Parenthesize each condition so a user `where` containing OR cannot
        # bind incorrectly against the cursor predicate.
        parts.append('WHERE ' + ' AND '.join(f'({c})' for c in conditions))
    parts.append(f'ORDER BY {", ".join(cursor_cols)}')
    parts.append(f'LIMIT {page_size}')
    return ' '.join(parts)


def _extract_cursor_from_row(row: dict[str, Any], use_created_at: bool = False) -> PaginationCursor | None:
"""Extract pagination cursor from the last row."""
if use_created_at:
keys = ['created_at', 'trace_id', 'span_id', 'kind']
else:
keys = ['start_timestamp', 'trace_id', 'span_id']
if all(k in row and row[k] is not None for k in keys):
return cast(PaginationCursor, {k: str(row[k]) for k in keys})
return None


class LogfireQueryClient(_BaseLogfireQueryClient[Client]):
"""A synchronous client for querying Logfire data."""

Expand Down Expand Up @@ -237,6 +304,55 @@ def query_csv(
)
return response.text

def iter_paginated_records(
    self,
    select: str = '*',
    where: str | None = None,
    page_size: int = MAX_QUERY_LIMIT,
    cursor: PaginationCursor | None = None,
    use_created_at: bool = False,
) -> Iterator[tuple[list[dict[str, Any]], PaginationCursor | None]]:
    """Yield pages of records as (rows, next_cursor) tuples.

    Cursor-based pagination lets callers retrieve datasets larger than the
    10,000-row API limit. The default cursor orders on
    (start_timestamp, trace_id, span_id); with use_created_at=True it orders
    on (created_at, trace_id, span_id, kind), which is safe when new rows may
    be inserted while paginating.

    Args:
        select: SQL columns to select; must include the cursor columns for
            pagination to continue (use '*' to select all).
        where: Optional WHERE clause (without the leading WHERE keyword).
        page_size: Rows per page, capped at 10,000.
        cursor: Cursor from a previous page to resume from.
        use_created_at: Paginate on created_at when data may arrive mid-run.

    Yields:
        (rows, next_cursor) per page; next_cursor is None on the final page.
    """
    effective_page_size = min(page_size, MAX_QUERY_LIMIT)  # enforce API cap
    current_cursor = cursor
    while True:
        query = _build_paginated_records_sql(
            select=select,
            where=where,
            page_size=effective_page_size,
            cursor=current_cursor,
            use_created_at=use_created_at,
        )
        page = self.query_json_rows(sql=query).get('rows', [])
        if not page:
            yield page, None
            return
        current_cursor = _extract_cursor_from_row(page[-1], use_created_at=use_created_at)
        yield page, current_cursor
        # A short page or an unextractable cursor means there is nothing left.
        if current_cursor is None or len(page) < effective_page_size:
            return

def _query(
self,
accept: Literal['application/json', 'application/vnd.apache.arrow.stream', 'text/csv'],
Expand Down Expand Up @@ -378,6 +494,55 @@ async def query_csv(
)
return response.text

async def iter_paginated_records(
    self,
    select: str = '*',
    where: str | None = None,
    page_size: int = MAX_QUERY_LIMIT,
    cursor: PaginationCursor | None = None,
    use_created_at: bool = False,
) -> AsyncIterator[tuple[list[dict[str, Any]], PaginationCursor | None]]:
    """Yield pages of records as (rows, next_cursor) tuples.

    Cursor-based pagination lets callers retrieve datasets larger than the
    10,000-row API limit. The default cursor orders on
    (start_timestamp, trace_id, span_id); with use_created_at=True it orders
    on (created_at, trace_id, span_id, kind), which is safe when new rows may
    be inserted while paginating.

    Args:
        select: SQL columns to select; must include the cursor columns for
            pagination to continue (use '*' to select all).
        where: Optional WHERE clause (without the leading WHERE keyword).
        page_size: Rows per page, capped at 10,000.
        cursor: Cursor from a previous page to resume from.
        use_created_at: Paginate on created_at when data may arrive mid-run.

    Yields:
        (rows, next_cursor) per page; next_cursor is None on the final page.
    """
    effective_page_size = min(page_size, MAX_QUERY_LIMIT)  # enforce API cap
    current_cursor = cursor
    while True:
        query = _build_paginated_records_sql(
            select=select,
            where=where,
            page_size=effective_page_size,
            cursor=current_cursor,
            use_created_at=use_created_at,
        )
        result = await self.query_json_rows(sql=query)
        page = result.get('rows', [])
        if not page:
            yield page, None
            return
        current_cursor = _extract_cursor_from_row(page[-1], use_created_at=use_created_at)
        yield page, current_cursor
        # A short page or an unextractable cursor means there is nothing left.
        if current_cursor is None or len(page) < effective_page_size:
            return

async def _query(
self,
accept: Literal['application/json', 'application/vnd.apache.arrow.stream', 'text/csv'],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
interactions:
- request:
body: ''
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
host:
- localhost:8000
user-agent:
- python-httpx/0.27.2
method: GET
uri: http://localhost:8000/v1/query?sql=SELECT+kind%2C+message%2C+start_timestamp%2C+trace_id%2C+span_id+FROM+records+ORDER+BY+start_timestamp%2C+trace_id%2C+span_id+LIMIT+5&json_rows=true
response:
body:
string: '{"columns":[{"name":"kind","nullable":false,"datatype":"Utf8"},{"name":"message","nullable":true,"datatype":"Utf8"},{"name":"start_timestamp","nullable":true,"datatype":"Utf8"},{"name":"trace_id","nullable":true,"datatype":"Utf8"},{"name":"span_id","nullable":true,"datatype":"Utf8"}],"rows":[{"kind":"log","message":"row1","start_timestamp":"2024-09-27T15:19:42.000000Z","trace_id":"e1742e366c5ed4c2917b4034042b3550","span_id":"53a7a41b2c077626"},{"kind":"log","message":"row2","start_timestamp":"2024-09-27T15:19:42.100000Z","trace_id":"18164ae14fd72cdba5125727713231b5","span_id":"b84f2ad111165ca"},{"kind":"log","message":"row3","start_timestamp":"2024-09-27T15:19:42.200000Z","trace_id":"18164ae14fd72cdba5125727713231b5","span_id":"c94f2ad111165cb"},{"kind":"log","message":"row4","start_timestamp":"2024-09-27T15:19:42.300000Z","trace_id":"28164ae14fd72cdba5125727713231b5","span_id":"d94f2ad111165cc"},{"kind":"log","message":"row5","start_timestamp":"2024-09-27T15:19:42.400000Z","trace_id":"28164ae14fd72cdba5125727713231b5","span_id":"e94f2ad111165cd"}]}'
headers:
access-control-expose-headers:
- traceresponse
content-length:
- '450'
content-type:
- application/json
date:
- Fri, 27 Sep 2024 15:19:43 GMT
server:
- uvicorn
traceresponse:
- 00-e1742e366c5ed4c2917b4034042b3550-53a7a41b2c077626-01
x-api-version:
- 1flTOwUHOSSJqmjknvZl4Cphg4j7R1eYX7kz5RICs/4=
x-logfire-context:
- e2e-test/test-e2e-project
status:
code: 200
message: OK
- request:
body: ''
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
host:
- localhost:8000
user-agent:
- python-httpx/0.27.2
method: GET
uri: http://localhost:8000/v1/query?sql=SELECT+kind%2C+message%2C+start_timestamp%2C+trace_id%2C+span_id+FROM+records+WHERE+%28start_timestamp%2C+trace_id%2C+span_id%29+%3E+%28%272024-09-27T15%3A19%3A42.400000Z%27%2C+%2728164ae14fd72cdba5125727713231b5%27%2C+%27e94f2ad111165cd%27%29+ORDER+BY+start_timestamp%2C+trace_id%2C+span_id+LIMIT+5&json_rows=true
response:
body:
string: '{"columns":[{"name":"kind","nullable":false,"datatype":"Utf8"},{"name":"message","nullable":true,"datatype":"Utf8"},{"name":"start_timestamp","nullable":true,"datatype":"Utf8"},{"name":"trace_id","nullable":true,"datatype":"Utf8"},{"name":"span_id","nullable":true,"datatype":"Utf8"}],"rows":[{"kind":"log","message":"row6","start_timestamp":"2024-09-27T15:19:42.500000Z","trace_id":"38164ae14fd72cdba5125727713231b5","span_id":"f94f2ad111165ce"}]}'
headers:
access-control-expose-headers:
- traceresponse
content-length:
- '200'
content-type:
- application/json
date:
- Fri, 27 Sep 2024 15:19:43 GMT
server:
- uvicorn
traceresponse:
- 00-e1742e366c5ed4c2917b4034042b3550-53a7a41b2c077626-01
x-api-version:
- 1flTOwUHOSSJqmjknvZl4Cphg4j7R1eYX7kz5RICs/4=
x-logfire-context:
- e2e-test/test-e2e-project
status:
code: 200
message: OK
version: 1
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
interactions:
- request:
body: ''
headers:
accept:
- application/json
accept-encoding:
- gzip, deflate
connection:
- keep-alive
host:
- localhost:8000
user-agent:
- python-httpx/0.27.2
method: GET
uri: http://localhost:8000/v1/query?sql=SELECT+*+FROM+records+WHERE+message+%3D+%27nonexistent%27+ORDER+BY+start_timestamp%2C+trace_id%2C+span_id+LIMIT+5&json_rows=true
response:
body:
string: '{"columns":[{"name":"kind","nullable":false,"datatype":"Utf8"},{"name":"message","nullable":true,"datatype":"Utf8"}],"rows":[]}'
headers:
access-control-expose-headers:
- traceresponse
content-type:
- application/json
server:
- uvicorn
x-logfire-context:
- e2e-test/test-e2e-project
status:
code: 200
message: OK
version: 1
Loading
Loading