Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions cron/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,11 @@ def _send_media_via_adapter(adapter, chat_id: str, media_files: list, metadata:
coro = adapter.send_document(chat_id=chat_id, file_path=media_path, metadata=metadata)

future = asyncio.run_coroutine_threadsafe(coro, loop)
result = future.result(timeout=30)
try:
result = future.result(timeout=30)
except TimeoutError:
future.cancel()
raise
if result and not getattr(result, "success", True):
logger.warning(
"Job '%s': media send failed for %s: %s",
Expand Down Expand Up @@ -382,7 +386,11 @@ def _deliver_result(job: dict, content: str, adapters=None, loop=None) -> Option
runtime_adapter.send(chat_id, text_to_send, metadata=send_metadata),
loop,
)
send_result = future.result(timeout=60)
try:
send_result = future.result(timeout=60)
except TimeoutError:
future.cancel()
raise
if send_result and not getattr(send_result, "success", True):
err = getattr(send_result, "error", "unknown")
logger.warning(
Expand Down
1 change: 1 addition & 0 deletions scripts/release.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@
"zheng.jerilyn@gmail.com": "jerilynzheng",
"asslaenn5@gmail.com": "Aslaaen",
"shalompmc0505@naver.com": "pinion05",
"105142614+VTRiot@users.noreply.github.com": "VTRiot",
}


Expand Down
125 changes: 125 additions & 0 deletions tests/cron/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1580,3 +1580,128 @@ def mock_run_job(job):
end_s1 = [t for action, jid, t in call_times if action == "end" and jid == "s1"][0]
start_s2 = [t for action, jid, t in call_times if action == "start" and jid == "s2"][0]
assert start_s2 >= end_s1, "Jobs ran concurrently despite max_parallel=1"


class TestDeliverResultTimeoutCancelsFuture:
"""When future.result(timeout=60) raises TimeoutError in the live
adapter delivery path, _deliver_result must cancel the orphan
coroutine so it cannot duplicate-send after the standalone fallback.
"""

def test_live_adapter_timeout_cancels_future_and_falls_back(self):
"""End-to-end: live adapter hangs past the 60s budget, _deliver_result
patches the timeout down to a fast value, confirms future.cancel() fires,
and verifies the standalone fallback path still delivers."""
from gateway.config import Platform
from concurrent.futures import Future

# Live adapter whose send() coroutine never resolves within the budget
adapter = AsyncMock()
adapter.send.return_value = MagicMock(success=True)

pconfig = MagicMock()
pconfig.enabled = True
mock_cfg = MagicMock()
mock_cfg.platforms = {Platform.TELEGRAM: pconfig}

loop = MagicMock()
loop.is_running.return_value = True

# A real concurrent.futures.Future so .cancel() has real semantics,
# but we override .result() to raise TimeoutError exactly like the
# 60s wait firing in production.
captured_future = Future()
cancel_calls = []
original_cancel = captured_future.cancel

def tracking_cancel():
cancel_calls.append(True)
return original_cancel()

captured_future.cancel = tracking_cancel
captured_future.result = MagicMock(side_effect=TimeoutError("timed out"))

def fake_run_coro(coro, _loop):
coro.close()
return captured_future

job = {
"id": "timeout-job",
"deliver": "origin",
"origin": {"platform": "telegram", "chat_id": "123"},
}

standalone_send = AsyncMock(return_value={"success": True})

with patch("gateway.config.load_gateway_config", return_value=mock_cfg), \
patch("cron.scheduler.load_config", return_value={"cron": {"wrap_response": False}}), \
patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro), \
patch("tools.send_message_tool._send_to_platform", new=standalone_send):
result = _deliver_result(
job,
"Hello world",
adapters={Platform.TELEGRAM: adapter},
loop=loop,
)

# 1. The orphan future was cancelled on timeout (the bug fix)
assert cancel_calls == [True], "future.cancel() must fire on TimeoutError"
# 2. The standalone fallback delivered — no double send, no silent drop
assert result is None, f"expected successful delivery, got error: {result!r}"
standalone_send.assert_awaited_once()


class TestSendMediaTimeoutCancelsFuture:
"""Same orphan-coroutine guarantee for _send_media_via_adapter's
future.result(timeout=30) call. If this times out mid-batch, the
in-flight coroutine must be cancelled before the next file is tried.
"""

def test_media_send_timeout_cancels_future_and_continues(self):
"""End-to-end: _send_media_via_adapter with a future whose .result()
raises TimeoutError. Assert cancel() fires and the loop proceeds
to the next file rather than hanging or crashing."""
from concurrent.futures import Future

adapter = MagicMock()
adapter.send_image_file = AsyncMock()
adapter.send_video = AsyncMock()

# First file: future that times out. Second file: future that resolves OK.
timeout_future = Future()
timeout_cancel_calls = []
original_cancel = timeout_future.cancel

def tracking_cancel():
timeout_cancel_calls.append(True)
return original_cancel()

timeout_future.cancel = tracking_cancel
timeout_future.result = MagicMock(side_effect=TimeoutError("timed out"))

ok_future = Future()
ok_future.set_result(MagicMock(success=True))

futures_iter = iter([timeout_future, ok_future])

def fake_run_coro(coro, _loop):
coro.close()
return next(futures_iter)

media_files = [
("/tmp/slow.png", False), # times out
("/tmp/fast.mp4", False), # succeeds
]

loop = MagicMock()
job = {"id": "media-timeout"}

with patch("asyncio.run_coroutine_threadsafe", side_effect=fake_run_coro):
# Should not raise — the except Exception clause swallows the timeout
_send_media_via_adapter(adapter, "chat-1", media_files, None, loop, job)

# 1. The timed-out future was cancelled (the bug fix)
assert timeout_cancel_calls == [True], "future.cancel() must fire on TimeoutError"
# 2. Second file still got dispatched — one timeout doesn't abort the batch
adapter.send_video.assert_called_once()
assert adapter.send_video.call_args[1]["video_path"] == "/tmp/fast.mp4"
Loading