Skip to content
Merged
Changes from 29 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4cf7e5a
ref(anthropic): Factor out streamed result handling
alexander-alderman-webb Feb 27, 2026
61a4cd0
ref(anthropic): Skip accumulation logic for unexpected types in strea…
alexander-alderman-webb Feb 27, 2026
6cdab16
.
alexander-alderman-webb Feb 27, 2026
db5dbb9
.
alexander-alderman-webb Feb 27, 2026
091506c
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Feb 27, 2026
d47b64d
.
alexander-alderman-webb Feb 27, 2026
384351f
remove duplicate import
alexander-alderman-webb Feb 27, 2026
55d5c27
.
alexander-alderman-webb Feb 27, 2026
fd84837
simplify
alexander-alderman-webb Feb 27, 2026
53859e4
.
alexander-alderman-webb Mar 10, 2026
e2d6d78
remove unused import
alexander-alderman-webb Mar 10, 2026
a01f7c1
add docstring
alexander-alderman-webb Mar 10, 2026
7837439
add return
alexander-alderman-webb Mar 10, 2026
0e06f49
remove return statement
alexander-alderman-webb Mar 10, 2026
c396a32
merge
alexander-alderman-webb Mar 10, 2026
d4022c5
Merge branch 'master' into webb/anthropic/only-raw-message-stream-events
alexander-alderman-webb Mar 10, 2026
4e3d1db
Merge branch 'master' into webb/anthropic/separate-output-handling
alexander-alderman-webb Mar 10, 2026
531c6a8
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Mar 10, 2026
917b8d4
.
alexander-alderman-webb Mar 11, 2026
aa6e58a
.
alexander-alderman-webb Mar 11, 2026
bd00e4e
.
alexander-alderman-webb Mar 11, 2026
a6da66e
merge
alexander-alderman-webb Mar 11, 2026
c988c26
fix type
alexander-alderman-webb Mar 11, 2026
3009ba6
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Mar 11, 2026
9eff33e
fix(anthropic): Close span on GeneratorExit
alexander-alderman-webb Mar 12, 2026
5e3cb5f
revert conftest
alexander-alderman-webb Mar 12, 2026
ab5e3bb
Merge branch 'master' into webb/anthropic/separate-output-handling
alexander-alderman-webb Mar 12, 2026
571db4e
Merge branch 'webb/anthropic/separate-output-handling' into webb/anth…
alexander-alderman-webb Mar 12, 2026
804db65
Merge branch 'webb/anthropic/only-raw-message-stream-events' into web…
alexander-alderman-webb Mar 12, 2026
2a0e5e9
merge master
alexander-alderman-webb Mar 13, 2026
f36dd5d
keep comment
alexander-alderman-webb Mar 13, 2026
0da2362
wrap finally in capture internal exceptions
alexander-alderman-webb Mar 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 149 additions & 86 deletions sentry_sdk/integrations/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,18 @@
except ImportError:
Omit = None

from anthropic import Stream, AsyncStream
from anthropic.resources import AsyncMessages, Messages

from anthropic.types import (
MessageStartEvent,
MessageDeltaEvent,
MessageStopEvent,
ContentBlockStartEvent,
ContentBlockDeltaEvent,
ContentBlockStopEvent,
)

if TYPE_CHECKING:
from anthropic.types import MessageStreamEvent, TextBlockParam
except ImportError:
Expand All @@ -49,6 +59,8 @@
from sentry_sdk.tracing import Span
from sentry_sdk._types import TextPart

from anthropic.types import RawMessageStreamEvent


class _RecordedUsage:
output_tokens: int = 0
Expand Down Expand Up @@ -338,6 +350,131 @@ def _set_input_data(
span.set_data(SPANDATA.GEN_AI_REQUEST_AVAILABLE_TOOLS, safe_serialize(tools))


def _wrap_synchronous_message_iterator(
    iterator: "Iterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "Iterator[RawMessageStreamEvent]":
    """
    Wrap a synchronous message-event iterator, recording model, token usage
    and text content on the AI Client Span as events pass through.

    Every event is re-yielded unchanged; only recognized message/content
    events feed the accumulation. The span is always closed in the
    ``finally`` block, so early generator shutdown (GeneratorExit) still
    finalizes it.
    """
    # Event types that carry data we accumulate; anything else is passed
    # through untouched.
    tracked_types = (
        MessageStartEvent,
        MessageDeltaEvent,
        MessageStopEvent,
        ContentBlockStartEvent,
        ContentBlockDeltaEvent,
        ContentBlockStopEvent,
    )

    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    try:
        for event in iterator:
            if isinstance(event, tracked_types):
                model, usage, content_blocks = _collect_ai_data(
                    event,
                    model,
                    usage,
                    content_blocks,
                )
            yield event
    finally:
        # Anthropic's input_tokens excludes cached/cache_write tokens.
        # Normalize to total input tokens for correct cost calculations.
        total_input = (
            usage.input_tokens
            + (usage.cache_read_input_tokens or 0)
            + (usage.cache_write_input_tokens or 0)
        )

        _set_output_data(
            span=span,
            integration=integration,
            model=model,
            input_tokens=total_input,
            output_tokens=usage.output_tokens,
            cache_read_input_tokens=usage.cache_read_input_tokens,
            cache_write_input_tokens=usage.cache_write_input_tokens,
            content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
            finish_span=True,
        )


async def _wrap_asynchronous_message_iterator(
    iterator: "AsyncIterator[RawMessageStreamEvent]",
    span: "Span",
    integration: "AnthropicIntegration",
) -> "AsyncIterator[RawMessageStreamEvent]":
    """
    Wrap an asynchronous message-event iterator, recording model, token
    usage and text content on the AI Client Span as events pass through.

    Every event is re-yielded unchanged; only recognized message/content
    events feed the accumulation. The span is always closed in the
    ``finally`` block, so early generator shutdown (GeneratorExit) still
    finalizes it.
    """
    # Event types that carry data we accumulate; anything else is passed
    # through untouched.
    tracked_types = (
        MessageStartEvent,
        MessageDeltaEvent,
        MessageStopEvent,
        ContentBlockStartEvent,
        ContentBlockDeltaEvent,
        ContentBlockStopEvent,
    )

    model = None
    usage = _RecordedUsage()
    content_blocks: "list[str]" = []

    try:
        async for event in iterator:
            if isinstance(event, tracked_types):
                model, usage, content_blocks = _collect_ai_data(
                    event,
                    model,
                    usage,
                    content_blocks,
                )
            yield event
    finally:
        # Anthropic's input_tokens excludes cached/cache_write tokens.
        # Normalize to total input tokens for correct cost calculations.
        total_input = (
            usage.input_tokens
            + (usage.cache_read_input_tokens or 0)
            + (usage.cache_write_input_tokens or 0)
        )

        _set_output_data(
            span=span,
            integration=integration,
            model=model,
            input_tokens=total_input,
            output_tokens=usage.output_tokens,
            cache_read_input_tokens=usage.cache_read_input_tokens,
            cache_write_input_tokens=usage.cache_write_input_tokens,
            content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
            finish_span=True,
        )


def _set_output_data(
span: "Span",
integration: "AnthropicIntegration",
Expand Down Expand Up @@ -415,6 +552,18 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A

result = yield f, args, kwargs

if isinstance(result, Stream):
result._iterator = _wrap_synchronous_message_iterator(
result._iterator, span, integration
)
return result

if isinstance(result, AsyncStream):
result._iterator = _wrap_asynchronous_message_iterator(
result._iterator, span, integration
)
return result

with capture_internal_exceptions():
if hasattr(result, "content"):
(
Expand Down Expand Up @@ -444,92 +593,6 @@ def _sentry_patched_create_common(f: "Any", *args: "Any", **kwargs: "Any") -> "A
content_blocks=content_blocks,
finish_span=True,
)

# Streaming response
elif hasattr(result, "_iterator"):
old_iterator = result._iterator

def new_iterator() -> "Iterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

async def new_iterator_async() -> "AsyncIterator[MessageStreamEvent]":
model = None
usage = _RecordedUsage()
content_blocks: "list[str]" = []

async for event in old_iterator:
(
model,
usage,
content_blocks,
) = _collect_ai_data(
event,
model,
usage,
content_blocks,
)
yield event

# Anthropic's input_tokens excludes cached/cache_write tokens.
# Normalize to total input tokens for correct cost calculations.
total_input = (
usage.input_tokens
+ (usage.cache_read_input_tokens or 0)
+ (usage.cache_write_input_tokens or 0)
)

_set_output_data(
span=span,
integration=integration,
model=model,
input_tokens=total_input,
output_tokens=usage.output_tokens,
cache_read_input_tokens=usage.cache_read_input_tokens,
cache_write_input_tokens=usage.cache_write_input_tokens,
content_blocks=[{"text": "".join(content_blocks), "type": "text"}],
finish_span=True,
)

if str(type(result._iterator)) == "<class 'async_generator'>":
result._iterator = new_iterator_async()
else:
result._iterator = new_iterator()

else:
span.set_data("unknown_response", True)
span.__exit__(None, None, None)
Expand Down
Loading