Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions gateway/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -1510,6 +1510,10 @@ def _queue_or_replace_pending_event(self, session_key: str, event: MessageEvent)
return
merge_pending_message_event(adapter._pending_messages, session_key, event)

def busy_input_mode(self) -> str:
    """Expose the busy-input mode stored on this gateway runner.

    Returns:
        The value of ``self._busy_input_mode``, unchanged. The handler
        below compares it against ``"queue"``; other accepted values are
        not visible from here.
    """
    current_mode = self._busy_input_mode
    return current_mode

async def _handle_active_session_busy_message(self, event: MessageEvent, session_key: str) -> bool:
# --- Draining case (gateway restarting/stopping) ---
if self._draining:
Expand All @@ -1533,17 +1537,35 @@ async def _handle_active_session_busy_message(self, event: MessageEvent, session
return True

# --- Normal busy case (agent actively running a task) ---
# The user sent a message while the agent is working. Interrupt the
# agent immediately so it stops the current tool-calling loop and
# processes the new message. The pending message is stored in the
# adapter so the base adapter picks it up once the interrupted run
# returns. A brief ack tells the user what's happening (debounced
# to avoid spam when they fire multiple messages quickly).
# Queue mode should preserve the current run and deliver the follow-up
# only after completion. Interrupt mode should stop the current run.

adapter = self.adapters.get(event.source.platform)
if not adapter:
return False # let default path handle it

if self._busy_input_mode == "queue":
from gateway.platforms.base import merge_pending_message_event

merge_pending_message_event(adapter._pending_messages, session_key, event)

_BUSY_ACK_COOLDOWN = 30
now = time.time()
last_ack = self._busy_ack_ts.get(session_key, 0)
if now - last_ack >= _BUSY_ACK_COOLDOWN:
self._busy_ack_ts[session_key] = now
thread_meta = {"thread_id": event.source.thread_id} if event.source.thread_id else None
try:
await adapter._send_with_retry(
chat_id=event.source.chat_id,
content="⏳ Current task kept running. Your message is queued for the next turn.",
reply_to=event.message_id,
metadata=thread_meta,
)
except Exception as e:
logger.debug("Failed to send busy-queue ack: %s", e)
return True

# Store the message so it's processed as the next turn after the
# interrupt causes the current run to exit.
from gateway.platforms.base import merge_pending_message_event
Expand Down
63 changes: 63 additions & 0 deletions tests/test_issue_13403_queue_wait.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import time
from unittest.mock import patch

from tools.process_registry import ProcessRegistry, ProcessSession


def test_wait_does_not_interrupt_in_queue_mode():
    """In queue mode, a pending interrupt must not cut ``wait`` short.

    The fake session reports "still running" on the first refresh and
    "exited" from the second refresh onwards, while the interrupt flag is
    raised on the first check only. ``wait`` is therefore expected to
    ignore the interrupt, keep polling, and return the exited result.
    """
    registry = ProcessRegistry()
    session = ProcessSession(
        id="proc_test_13403",
        command="sleep 1",
        started_at=time.time(),
        exited=False,
        exit_code=None,
        output_buffer="still running",
    )

    refresh_state = {"calls": 0}

    def fake_refresh(s):
        # The process "finishes" on the second poll, so the first poll is
        # the only chance for the interrupt check to fire.
        refresh_state["calls"] += 1
        if refresh_state["calls"] >= 2:
            s.exited = True
            s.exit_code = 0
            s.output_buffer = "done"
        return s

    interrupt_state = {"calls": 0}

    def fake_interrupt():
        # Report an interrupt exactly once (on the first check).
        interrupt_state["calls"] += 1
        return interrupt_state["calls"] == 1

    with patch.object(registry, "get", return_value=session), \
            patch.object(registry, "_refresh_detached_session", side_effect=fake_refresh), \
            patch("tools.process_registry._gateway_busy_input_mode", return_value="queue"), \
            patch("tools.interrupt.is_interrupted", side_effect=fake_interrupt), \
            patch("tools.process_registry.time.sleep", return_value=None):
        result = registry.wait("proc_test_13403", timeout=2)

    assert result["status"] == "exited"
    assert result["exit_code"] == 0
    # Without this, the test would also pass if ``wait`` never consulted the
    # interrupt flag at all (e.g. wrong patch target), proving nothing about
    # queue mode.
    assert interrupt_state["calls"] >= 1


def test_wait_interrupts_when_not_queue_mode():
    """In "interrupt" mode, a raised interrupt flag ends ``wait`` early.

    The refresh hook hands the session back untouched (so it never reports
    as exited) and the interrupt check always answers ``True``; the result
    status must then be ``"interrupted"``.
    """
    proc_id = "proc_test_interrupt"
    reg = ProcessRegistry()
    running = ProcessSession(
        id=proc_id,
        command="sleep 1",
        started_at=time.time(),
        exited=False,
        exit_code=None,
        output_buffer="still running",
    )

    def passthrough(s):
        # Refresh is a no-op: the session stays in its "running" state.
        return s

    with patch.object(reg, "get", return_value=running), \
            patch.object(reg, "_refresh_detached_session", side_effect=passthrough), \
            patch("tools.process_registry._gateway_busy_input_mode", return_value="interrupt"), \
            patch("tools.interrupt.is_interrupted", return_value=True), \
            patch("tools.process_registry.time.sleep", return_value=None):
        outcome = reg.wait(proc_id, timeout=2)

    assert outcome["status"] == "interrupted"
15 changes: 15 additions & 0 deletions tools/process_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@
logger = logging.getLogger(__name__)


def _gateway_busy_input_mode() -> str:
"""Best-effort read of the active gateway busy-input mode."""
try:
from gateway.run import GatewayRunner as _GatewayRunner
return _GatewayRunner._load_busy_input_mode()
except Exception:
return "interrupt"


# Checkpoint file for crash recovery (gateway only)
CHECKPOINT_PATH = get_hermes_home() / "processes.json"

Expand Down Expand Up @@ -760,6 +769,12 @@ def wait(self, session_id: str, timeout: int = None) -> dict:
return result

if _is_interrupted():
_queue_mode = _gateway_busy_input_mode() == "queue"

if _queue_mode:
time.sleep(1)
continue

result = {
"status": "interrupted",
"output": strip_ansi(session.output_buffer[-1000:]),
Expand Down