Skip to content
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4cf7e5a
ref(anthropic): Factor out streamed result handling
alexander-alderman-webb Feb 27, 2026
61a4cd0
ref(anthropic): Skip accumulation logic for unexpected types in strea…
alexander-alderman-webb Feb 27, 2026
6cdab16
.
alexander-alderman-webb Feb 27, 2026
db5dbb9
.
alexander-alderman-webb Feb 27, 2026
091506c
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Feb 27, 2026
d47b64d
.
alexander-alderman-webb Feb 27, 2026
384351f
remove duplicate import
alexander-alderman-webb Feb 27, 2026
55d5c27
.
alexander-alderman-webb Feb 27, 2026
fd84837
simplify
alexander-alderman-webb Feb 27, 2026
53859e4
.
alexander-alderman-webb Mar 10, 2026
e2d6d78
remove unused import
alexander-alderman-webb Mar 10, 2026
a01f7c1
add docstring
alexander-alderman-webb Mar 10, 2026
7837439
add return
alexander-alderman-webb Mar 10, 2026
0e06f49
remove return statement
alexander-alderman-webb Mar 10, 2026
c396a32
merge
alexander-alderman-webb Mar 10, 2026
d4022c5
Merge branch 'master' into webb/anthropic/only-raw-message-stream-events
alexander-alderman-webb Mar 10, 2026
4e3d1db
Merge branch 'master' into webb/anthropic/separate-output-handling
alexander-alderman-webb Mar 10, 2026
531c6a8
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Mar 10, 2026
917b8d4
.
alexander-alderman-webb Mar 11, 2026
aa6e58a
.
alexander-alderman-webb Mar 11, 2026
bd00e4e
.
alexander-alderman-webb Mar 11, 2026
a6da66e
merge
alexander-alderman-webb Mar 11, 2026
c988c26
fix type
alexander-alderman-webb Mar 11, 2026
3009ba6
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Mar 11, 2026
9eff33e
fix(anthropic): Close span on GeneratorExit
alexander-alderman-webb Mar 12, 2026
5e3cb5f
revert conftest
alexander-alderman-webb Mar 12, 2026
ab5e3bb
Merge branch 'master' into webb/anthropic/separate-output-handling
alexander-alderman-webb Mar 12, 2026
571db4e
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Mar 12, 2026
804db65
Merge branch 'webb/anthropic/only-raw-message-stream-events' into web…
alexander-alderman-webb Mar 12, 2026
2a0e5e9
merge master
alexander-alderman-webb Mar 13, 2026
f36dd5d
keep comment
alexander-alderman-webb Mar 13, 2026
0da2362
wrap finally in capture internal exceptions
alexander-alderman-webb Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 149 additions & 86 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,18 @@
except ImportError:
Omit = None

from anthropic import Stream, AsyncStream
from anthropic.resources import AsyncMessages, Messages

from anthropic.types import (
MessageStartEvent,
MessageDeltaEvent,
MessageStopEvent,
ContentBlockStartEvent,
ContentBlockDeltaEvent,
ContentBlockStopEvent,
)

if TYPE_CHECKING:
from anthropic.types import MessageStreamEvent, TextBlockParam
except ImportError:
Expand All @@ -49,6 +59,8 @@
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic.types import RawMessageStreamEvent


class _RecordedUsage:
output_tokens: int = 0
Expand Down Expand Up @@ -338,6 +350,131 @@
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))


def _wrap_synchronous_message_iterator(
    iterator: "Iterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "Iterator[RawMessageStreamEvent]":
    """
    Sets information received while iterating the response stream on the AI Client Span.
    Responsible for closing the AI Client Span.
    """

    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    # Only these event types carry data we accumulate onto the span;
    # anything else is passed through untouched.
    tracked_event_types = (
        MessageStartEvent,
        MessageDeltaEvent,
        MessageStopEvent,
        ContentBlockStartEvent,
        ContentBlockDeltaEvent,
        ContentBlockStopEvent,
    )

    try:
        for event in iterator:
            if isinstance(event, tracked_event_types):
                model, usage, content_blocks = _collect_ai_data(
                    event, model, usage, content_blocks
                )
            yield event
    finally:
        # Runs even on early consumer exit (GeneratorExit) so the span
        # is always closed with whatever data was gathered so far.
        # Anthropic's input_tokens excludes cached/cache_write tokens.
        # Normalize to total input tokens for correct cost calculations.
        total_input = (
            usage.input_tokens
            + (usage.cache_read_input_tokens or 0)
            + (usage.cache_write_input_tokens or 0)
        )

        _set_output_data(
            span=span,
            integration=integration,
            model=model,
            input_tokens=total_input,
            output_tokens=usage.output_tokens,
            cache_read_input_tokens=usage.cache_read_input_tokens,
            cache_write_input_tokens=usage.cache_write_input_tokens,
            content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
            finish_span=True,
        )

Check warning on line 413 in sentry_sdk/integrations/anthropic.py

View workflow job for this annotation

GitHub Actions / warden: code-review

Missing test coverage for GeneratorExit scenario

The PR adds try/finally handling to close spans when GeneratorExit is raised (e.g., when consumers break early from a streaming iterator), but no test verifies this behavior. Without a test that simulates early iterator termination, there's no regression protection for this fix.


async def _wrap_asynchronous_message_iterator(
    iterator: "AsyncIterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "AsyncIterator[RawMessageStreamEvent]":
    """
    Sets information received while iterating the response stream on the AI Client Span.
    Responsible for closing the AI Client Span.
    """
    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    # Only these event types carry data we accumulate onto the span;
    # anything else is passed through untouched.
    tracked_event_types = (
        MessageStartEvent,
        MessageDeltaEvent,
        MessageStopEvent,
        ContentBlockStartEvent,
        ContentBlockDeltaEvent,
        ContentBlockStopEvent,
    )

    try:
        async for event in iterator:
            if isinstance(event, tracked_event_types):
                model, usage, content_blocks = _collect_ai_data(
                    event, model, usage, content_blocks
                )
            yield event
    finally:
        # Runs even on early consumer exit (GeneratorExit) so the span
        # is always closed with whatever data was gathered so far.
        # Anthropic's input_tokens excludes cached/cache_write tokens.
        # Normalize to total input tokens for correct cost calculations.
        total_input = (
            usage.input_tokens
            + (usage.cache_read_input_tokens or 0)
            + (usage.cache_write_input_tokens or 0)
        )

        _set_output_data(
            span=span,
            integration=integration,
            model=model,
            input_tokens=total_input,
            output_tokens=usage.output_tokens,
            cache_read_input_tokens=usage.cache_read_input_tokens,
            cache_write_input_tokens=usage.cache_write_input_tokens,
            content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
            finish_span=True,
        )


def _set_output_data(
span: "Span",
integration: "AnthropicIntegration",
Expand Down Expand Up @@ -415,6 +552,18 @@

result = yield f, args, kwargs

if isinstance(result, Stream):
result._iterator = _wrap_synchronous_message_iterator(
result._iterator, span, integration
)
return result

if isinstance(result, AsyncStream):
result._iterator = _wrap_asynchronous_message_iterator(
result._iterator, span, integration
)
return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
Expand Down Expand Up @@ -444,92 +593,6 @@
content_blocks=content_blocks,
finish_span=True,
)

# Streaming response
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()

else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down
62 changes: 49 additions & 13 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,29 @@
from collections.abc import Iterator

try:
from anyio import create_memory_object_stream, create_task_group, EndOfStream
from mcp.types import (
JSONRPCMessage,
JSONRPCNotification,
JSONRPCRequest,
)
from mcp.shared.message import SessionMessage
from httpx import (
ASGITransport,
Request as HttpxRequest,
Response as HttpxResponse,
AsyncByteStream,
AsyncClient,
)
except ImportError:
ASGITransport = None
HttpxRequest = None
HttpxResponse = None
AsyncByteStream = None
AsyncClient = None


try:
from anyio import create_memory_object_stream, create_task_group, EndOfStream
from mcp.types import (
JSONRPCMessage,
JSONRPCNotification,
JSONRPCRequest,
)
from mcp.shared.message import SessionMessage
except ImportError:
create_memory_object_stream = None
create_task_group = None
Expand All @@ -81,12 +90,6 @@
JSONRPCRequest = None
SessionMessage = None

ASGITransport = None
HttpxRequest = None
HttpxResponse = None
AsyncByteStream = None
AsyncClient = None


SENTRY_EVENT_SCHEMA = "./checkouts/data-schemas/relay/event.schema.json"

Expand Down Expand Up @@ -1013,6 +1016,39 @@ async def inner(values):
return inner


@pytest.fixture
def server_side_event_chunks():
    """Return a helper that renders pydantic events as SSE byte chunks."""

    def inner(events):
        for ev in events:
            data = ev.model_dump()
            # One server-sent-event frame per event: type line, data line, blank line.
            frame = "event: {}\ndata: {}\n\n".format(data["type"], json.dumps(data))
            yield frame.encode("utf-8")

    return inner


@pytest.fixture
def get_model_response():
    """Return a helper that wraps *response_content* in a 200 httpx response
    attached to a fake POST /responses request."""

    def inner(response_content, serialize_pydantic=False):
        body = response_content
        if serialize_pydantic:
            # Serialize a pydantic model to its JSON byte representation.
            body = json.dumps(response_content.model_dump()).encode("utf-8")

        request = HttpxRequest(
            "POST",
            "/responses",
        )
        return HttpxResponse(
            200,
            request=request,
            content=body,
        )

    return inner


class MockServerRequestHandler(BaseHTTPRequestHandler):
def do_GET(self): # noqa: N802
# Process an HTTP GET request and return a response.
Expand Down
Loading