From 5dc49384415745ec7790dd33c3bcbaae762653fe Mon Sep 17 00:00:00 2001 From: VTRiot <105142614+VTRiot@users.noreply.github.com> Date: Tue, 21 Apr 2026 22:06:10 +0900 Subject: [PATCH] fix(gateway): prevent duplicate final send when only cosmetic edit failed MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the stream consumer's got_done handler successfully delivers the final response content via _send_or_edit but the subsequent edit (e.g. cursor removal) fails, final_response_sent remains False even though the user has already received the final answer. The gateway's fallback send path then re-delivers the same content, causing the user to see the response twice on Telegram. Introduce a new _final_content_delivered flag on the stream consumer (exposed read-only as the final_content_delivered property), set by the got_done handler when the final content has reached the user. The _run_agent suppression logic now treats this flag as an additional signal (alongside final_response_sent and response_previewed) that final delivery is already complete. This preserves the existing behavior for intermediate-text-only streams (where already_sent=True but no final content has been delivered) — those still receive the gateway's fallback send, matching the test expectation in test_partial_stream_output_does_not_set_already_sent. Add TestFinalContentDeliveredSuppression with two cases covering both the suppression (content delivered + edit failed) and the non-suppression (intermediate text only) branches. 
--- gateway/run.py | 12 ++-- gateway/stream_consumer.py | 17 ++++++ .../test_duplicate_reply_suppression.py | 56 +++++++++++++++++++ 3 files changed, 81 insertions(+), 4 deletions(-) diff --git a/gateway/run.py b/gateway/run.py index c19303e61b1..0f3fb84407f 100644 --- a/gateway/run.py +++ b/gateway/run.py @@ -10536,6 +10536,7 @@ async def _notify_long_running(): _already_streamed = bool( (_sc and getattr(_sc, "final_response_sent", False)) or _previewed + or (_sc and getattr(_sc, "final_content_delivered", False)) ) first_response = result.get("final_response", "") if first_response and not _already_streamed: @@ -10679,12 +10680,15 @@ async def _notify_long_running(): # response_previewed means the interim_assistant_callback already # sent the final text via the adapter (non-streaming path). _previewed = bool(response.get("response_previewed")) - if not _is_empty_sentinel and (_streamed or _previewed): + _content_delivered = bool( + _sc and getattr(_sc, "final_content_delivered", False) + ) + if not _is_empty_sentinel and (_streamed or _previewed or _content_delivered): logger.info( - "Suppressing normal final send for session %s: final delivery already confirmed (streamed=%s previewed=%s).", + "Suppressing normal final send for session %s: " + "final delivery already confirmed (streamed=%s previewed=%s content_delivered=%s).", session_key[:20] if session_key else "?", - _streamed, - _previewed, + _streamed, _previewed, _content_delivered, ) response["already_sent"] = True diff --git a/gateway/stream_consumer.py b/gateway/stream_consumer.py index 78e365712d9..e271b5a5cd8 100644 --- a/gateway/stream_consumer.py +++ b/gateway/stream_consumer.py @@ -100,6 +100,10 @@ def __init__( self._flood_strikes = 0 # Consecutive flood-control edit failures self._current_edit_interval = self.cfg.edit_interval # Adaptive backoff self._final_response_sent = False + # Set when the final response content was sent to the user via + # streaming, even if the final edit (cursor removal 
etc.) + # subsequently failed. + self._final_content_delivered = False # Cache adapter lifecycle capability: only platforms that need an # explicit finalize call (e.g. DingTalk AI Cards) force us to make # a redundant final edit. Everyone else keeps the fast path. @@ -123,6 +127,12 @@ def final_response_sent(self) -> bool: """True when the stream consumer delivered the final assistant reply.""" return self._final_response_sent + @property + def final_content_delivered(self) -> bool: + """True when the final response content reached the user, even if + the subsequent cosmetic edit (cursor removal) failed.""" + return self._final_content_delivered + def on_segment_break(self) -> None: """Finalize the current stream segment and start a fresh message.""" self._queue.put(_NEW_SEGMENT) @@ -335,6 +345,8 @@ async def run(self) -> None: self._last_edit_time = time.monotonic() if got_done: self._final_response_sent = self._already_sent + if self._already_sent: + self._final_content_delivered = True return if got_segment_break: self._message_id = None @@ -382,6 +394,11 @@ async def run(self) -> None: self._last_edit_time = time.monotonic() if got_done: + # Record that the final content reached the user even + # if the cosmetic final edit below fails. + if current_update_visible and self._accumulated: + self._final_content_delivered = True + # Final edit without cursor. 
If progressive editing failed # mid-stream, send a single continuation/fallback message # here instead of letting the base gateway path send the diff --git a/tests/gateway/test_duplicate_reply_suppression.py b/tests/gateway/test_duplicate_reply_suppression.py index c275a12c07c..eec21f7709b 100644 --- a/tests/gateway/test_duplicate_reply_suppression.py +++ b/tests/gateway/test_duplicate_reply_suppression.py @@ -458,3 +458,59 @@ def test_old_behavior_would_have_promoted_partial(self): final_response_sent = True assert final_response_sent is True # the bug: partial promoted to final + + +class TestFinalContentDeliveredSuppression: + """When stream consumer delivered the final content but the cosmetic + final edit (cursor removal) failed, the gateway must suppress the + fallback send to prevent duplicate messages. + + Covers the scenario not handled by final_response_sent alone: + content reached the user via _send_or_edit, but the subsequent edit + that clears a typing cursor or streaming marker failed, leaving + final_response_sent=False even though the user already saw the text. 
+ """ + + def test_content_delivered_but_final_edit_failed_suppresses(self): + """final_content_delivered=True + final_response_sent=False + must suppress (content already visible to user).""" + sc = SimpleNamespace( + already_sent=True, + final_response_sent=False, + final_content_delivered=True, + ) + response = {"final_response": "Hello!", "response_previewed": False} + + _streamed = bool(getattr(sc, "final_response_sent", False)) + _previewed = bool(response.get("response_previewed")) + _content_delivered = bool(getattr(sc, "final_content_delivered", False)) + _is_empty_sentinel = ( + not response.get("final_response") + or response.get("final_response") == "(empty)" + ) + if not _is_empty_sentinel and (_streamed or _previewed or _content_delivered): + response["already_sent"] = True + + assert response.get("already_sent") is True + + def test_intermediate_text_only_does_not_suppress(self): + """already_sent=True from intermediate text + final_content_delivered=False + must NOT suppress (user still needs the real final answer).""" + sc = SimpleNamespace( + already_sent=True, + final_response_sent=False, + final_content_delivered=False, + ) + response = {"final_response": "Real answer", "response_previewed": False} + + _streamed = bool(getattr(sc, "final_response_sent", False)) + _previewed = bool(response.get("response_previewed")) + _content_delivered = bool(getattr(sc, "final_content_delivered", False)) + _is_empty_sentinel = ( + not response.get("final_response") + or response.get("final_response") == "(empty)" + ) + if not _is_empty_sentinel and (_streamed or _previewed or _content_delivered): + response["already_sent"] = True + + assert "already_sent" not in response