Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions gateway/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -10536,6 +10536,7 @@ async def _notify_long_running():
_already_streamed = bool(
(_sc and getattr(_sc, "final_response_sent", False))
or _previewed
or (_sc and getattr(_sc, "final_content_delivered", False))
)
first_response = result.get("final_response", "")
if first_response and not _already_streamed:
Expand Down Expand Up @@ -10679,12 +10680,15 @@ async def _notify_long_running():
# response_previewed means the interim_assistant_callback already
# sent the final text via the adapter (non-streaming path).
_previewed = bool(response.get("response_previewed"))
if not _is_empty_sentinel and (_streamed or _previewed):
_content_delivered = bool(
_sc and getattr(_sc, "final_content_delivered", False)
)
if not _is_empty_sentinel and (_streamed or _previewed or _content_delivered):
logger.info(
"Suppressing normal final send for session %s: final delivery already confirmed (streamed=%s previewed=%s).",
"Suppressing normal final send for session %s: "
"final delivery already confirmed (streamed=%s previewed=%s content_delivered=%s).",
session_key[:20] if session_key else "?",
_streamed,
_previewed,
_streamed, _previewed, _content_delivered,
)
response["already_sent"] = True

Expand Down
17 changes: 17 additions & 0 deletions gateway/stream_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ def __init__(
self._flood_strikes = 0 # Consecutive flood-control edit failures
self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff
self._final_response_sent = False
# Set when the final response content was sent to the user via
# streaming, even if the final edit (cursor removal etc.)
# subsequently failed.
self._final_content_delivered = False
# Cache adapter lifecycle capability: only platforms that need an
# explicit finalize call (e.g. DingTalk AI Cards) force us to make
# a redundant final edit. Everyone else keeps the fast path.
Expand All @@ -123,6 +127,12 @@ def final_response_sent(self) -> bool:
"""True when the stream consumer delivered the final assistant reply."""
return self._final_response_sent

@property
def final_content_delivered(self) -> bool:
"""True when the final response content reached the user, even if
the subsequent cosmetic edit (cursor removal) failed."""
return self._final_content_delivered

def on_segment_break(self) -> None:
"""Finalize the current stream segment and start a fresh message."""
self._queue.put(_NEW_SEGMENT)
Expand Down Expand Up @@ -335,6 +345,8 @@ async def run(self) -> None:
self._last_edit_time = time.monotonic()
if got_done:
self._final_response_sent = self._already_sent
if self._already_sent:
self._final_content_delivered = True
return
if got_segment_break:
self._message_id = None
Expand Down Expand Up @@ -382,6 +394,11 @@ async def run(self) -> None:
self._last_edit_time = time.monotonic()

if got_done:
# Record that the final content reached the user even
# if the cosmetic final edit below fails.
if current_update_visible and self._accumulated:
self._final_content_delivered = True

# Final edit without cursor. If progressive editing failed
# mid-stream, send a single continuation/fallback message
# here instead of letting the base gateway path send the
Expand Down
56 changes: 56 additions & 0 deletions tests/gateway/test_duplicate_reply_suppression.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,59 @@ def test_old_behavior_would_have_promoted_partial(self):
final_response_sent = True

assert final_response_sent is True # the bug: partial promoted to final


class TestFinalContentDeliveredSuppression:
"""When stream consumer delivered the final content but the cosmetic
final edit (cursor removal) failed, the gateway must suppress the
fallback send to prevent duplicate messages.

Covers the scenario not handled by final_response_sent alone:
content reached the user via _send_or_edit, but the subsequent edit
that clears a typing cursor or streaming marker failed, leaving
final_response_sent=False even though the user already saw the text.
"""

def test_content_delivered_but_final_edit_failed_suppresses(self):
"""final_content_delivered=True + final_response_sent=False
must suppress (content already visible to user)."""
sc = SimpleNamespace(
already_sent=True,
final_response_sent=False,
final_content_delivered=True,
)
response = {"final_response": "Hello!", "response_previewed": False}

_streamed = bool(getattr(sc, "final_response_sent", False))
_previewed = bool(response.get("response_previewed"))
_content_delivered = bool(getattr(sc, "final_content_delivered", False))
_is_empty_sentinel = (
not response.get("final_response")
or response.get("final_response") == "(empty)"
)
if not _is_empty_sentinel and (_streamed or _previewed or _content_delivered):
response["already_sent"] = True

assert response.get("already_sent") is True

def test_intermediate_text_only_does_not_suppress(self):
"""already_sent=True from intermediate text + final_content_delivered=False
must NOT suppress (user still needs the real final answer)."""
sc = SimpleNamespace(
already_sent=True,
final_response_sent=False,
final_content_delivered=False,
)
response = {"final_response": "Real answer", "response_previewed": False}

_streamed = bool(getattr(sc, "final_response_sent", False))
_previewed = bool(response.get("response_previewed"))
_content_delivered = bool(getattr(sc, "final_content_delivered", False))
_is_empty_sentinel = (
not response.get("final_response")
or response.get("final_response") == "(empty)"
)
if not _is_empty_sentinel and (_streamed or _previewed or _content_delivered):
response["already_sent"] = True

assert "already_sent" not in response
Loading