diff --git a/apps/worker/services/lock_manager.py b/apps/worker/services/lock_manager.py index d2be045760..9c1e6a2d98 100644 --- a/apps/worker/services/lock_manager.py +++ b/apps/worker/services/lock_manager.py @@ -14,13 +14,12 @@ from shared.celery_config import ( DEFAULT_BLOCKING_TIMEOUT_SECONDS, DEFAULT_LOCK_TIMEOUT_SECONDS, + TASK_RETRY_COUNTDOWN_MAX_SECONDS, ) from shared.config import get_config from shared.helpers.redis import get_redis_connection # type: ignore log = logging.getLogger(__name__) - -MAX_RETRY_COUNTDOWN_SECONDS = 60 * 60 * 5 BASE_RETRY_COUNTDOWN_SECONDS = 200 RETRY_BACKOFF_MULTIPLIER = 3 RETRY_COUNTDOWN_RANGE_DIVISOR = 2 @@ -183,8 +182,8 @@ def locked( max_retry_unbounded = self.base_retry_countdown * ( RETRY_BACKOFF_MULTIPLIER**retry_num ) - if max_retry_unbounded >= MAX_RETRY_COUNTDOWN_SECONDS: - countdown = MAX_RETRY_COUNTDOWN_SECONDS + if max_retry_unbounded >= TASK_RETRY_COUNTDOWN_MAX_SECONDS: + countdown = TASK_RETRY_COUNTDOWN_MAX_SECONDS else: countdown_unbounded = random.randint( max_retry_unbounded // RETRY_COUNTDOWN_RANGE_DIVISOR, diff --git a/apps/worker/services/tests/test_lock_manager.py b/apps/worker/services/tests/test_lock_manager.py index 673e30ae6b..972287e79e 100644 --- a/apps/worker/services/tests/test_lock_manager.py +++ b/apps/worker/services/tests/test_lock_manager.py @@ -7,7 +7,12 @@ from redis.exceptions import LockError from database.enums import ReportType -from services.lock_manager import LockManager, LockRetry, LockType +from services.lock_manager import ( + LockManager, + LockRetry, + LockType, +) +from shared.celery_config import TASK_RETRY_COUNTDOWN_MAX_SECONDS from tasks.base import BaseCodecovTask @@ -191,11 +196,11 @@ def test_locked_exponential_backoff_retry_2(self, mock_redis): with manager.locked(LockType.UPLOAD, retry_num=2): pass - # retry_num=2: 200 * 3^2 = 1800, so countdown should be between 900-1800 - assert 900 <= exc_info.value.countdown <= 1800 + # retry_num=2: 200 * 3^2 = 1800, which exceeds TASK_RETRY_COUNTDOWN_MAX_SECONDS + assert exc_info.value.countdown == TASK_RETRY_COUNTDOWN_MAX_SECONDS def test_locked_exponential_backoff_cap(self, mock_redis): - """Test that exponential backoff is capped at 5 hours""" + """Test that exponential backoff is capped at TASK_RETRY_COUNTDOWN_MAX_SECONDS""" mock_redis.lock.side_effect = LockError() mock_redis.incr.return_value = 1 @@ -205,8 +210,7 @@ def test_locked_exponential_backoff_cap(self, mock_redis): with manager.locked(LockType.UPLOAD, retry_num=10): pass - # Cap is 60 * 60 * 5 = 18000 seconds (5 hours) - assert exc_info.value.countdown <= 18000 + assert exc_info.value.countdown == TASK_RETRY_COUNTDOWN_MAX_SECONDS def test_locked_max_retries_not_provided(self, mock_redis, caplog): """Test that max_retries=None doesn't log error""" diff --git a/apps/worker/tasks/base.py b/apps/worker/tasks/base.py index a79f9f9b21..4e2b5ff8a4 100644 --- a/apps/worker/tasks/base.py +++ b/apps/worker/tasks/base.py @@ -33,6 +33,9 @@ from services.repository import get_repo_provider_service from shared.celery_config import ( TASK_RETRY_BACKOFF_BASE_SECONDS, + TASK_RETRY_COUNTDOWN_MAX_SECONDS, + TASK_RETRY_MIN_SAFE_WINDOW_SECONDS, + TASK_VISIBILITY_TIMEOUT_SECONDS, upload_breadcrumb_task_name, ) from shared.celery_router import route_tasks_based_on_user_plan @@ -202,12 +205,52 @@ def __init_subclass__(cls, name=None): cls.task_core_runtime = TASK_CORE_RUNTIME.labels(task=name) @property - def hard_time_limit_task(self): + def hard_time_limit_task(self) -> int: + """Return the hard time limit for this task execution, in seconds. + + Checked in priority order: + 1. request.timelimit[0] — set per-execution when a task is dispatched + with a custom time limit (e.g. apply_async(time_limit=N)). + 2. self.time_limit — the class-level hard limit defined on the task. + 3. app.conf.task_time_limit — the Celery application default. + + Returns 0 if no limit is configured at any level. + """ if self.request.timelimit is not None and self.request.timelimit[0] is not None: - return self.request.timelimit[0] + return int(self.request.timelimit[0]) if self.time_limit is not None: - return self.time_limit - return self.app.conf.task_time_limit or 0 + return int(self.time_limit) + return int(self.app.conf.task_time_limit or 0) + + def clamp_retry_countdown(self, countdown: int) -> int: + """Cap a retry countdown to prevent Redis redelivery before the retry fires. + + A task calling retry() may have been running for up to hard_time_limit_task + seconds, so the safe ceiling is: + TASK_VISIBILITY_TIMEOUT_SECONDS - hard_time_limit_task + + Tasks with no hard limit fall back to TASK_RETRY_COUNTDOWN_MAX_SECONDS. + + When the safe window is smaller than TASK_RETRY_MIN_SAFE_WINDOW_SECONDS + (e.g. enterprise tasks with hard_timelimit=2450s >> visibility_timeout=900s), + also falls back to TASK_RETRY_COUNTDOWN_MAX_SECONDS. + """ + hard_limit = self.hard_time_limit_task + if hard_limit: + safe_window = TASK_VISIBILITY_TIMEOUT_SECONDS - hard_limit + if safe_window < TASK_RETRY_MIN_SAFE_WINDOW_SECONDS: + log.warning( + "Task hard time limit leaves insufficient visibility window for " + "retry clamping — falling back to global max", + extra={ + "hard_time_limit_task": hard_limit, + "min_safe_window": TASK_RETRY_MIN_SAFE_WINDOW_SECONDS, + "visibility_timeout": TASK_VISIBILITY_TIMEOUT_SECONDS, + }, + ) + return min(countdown, TASK_RETRY_COUNTDOWN_MAX_SECONDS) + return min(countdown, safe_window) + return min(countdown, TASK_RETRY_COUNTDOWN_MAX_SECONDS) def get_lock_timeout(self, default_timeout: int) -> int: """ @@ -261,7 +304,7 @@ def apply_async(self, args=None, kwargs=None, **options): return super().apply_async(args=args, kwargs=kwargs, headers=headers, **options) def retry(self, max_retries=None, countdown=None, exc=None, **kwargs): - """Override Celery's retry to always update attempts header.""" + """Override Celery's retry to always update attempts header and clamp countdown.""" request = getattr(self, "request", None) current_attempts = self.attempts kwargs["headers"] = { @@ -270,6 +313,9 @@ def retry(self, max_retries=None, countdown=None, exc=None, **kwargs): "attempts": current_attempts + 1, } + if countdown is not None: + countdown = self.clamp_retry_countdown(countdown) + return super().retry( max_retries=max_retries, countdown=countdown, exc=exc, **kwargs ) diff --git a/apps/worker/tasks/bundle_analysis_notify.py b/apps/worker/tasks/bundle_analysis_notify.py index cc08558d49..ea82e56d70 100644 --- a/apps/worker/tasks/bundle_analysis_notify.py +++ b/apps/worker/tasks/bundle_analysis_notify.py @@ -84,7 +84,10 @@ def run_impl( "notify_attempted": False, "notify_succeeded": None, } - self.retry(max_retries=self.max_retries, countdown=retry.countdown) + self.retry( + max_retries=self.max_retries, + countdown=retry.countdown, + ) @sentry_sdk.trace def process_impl_within_lock( diff --git a/apps/worker/tasks/bundle_analysis_processor.py b/apps/worker/tasks/bundle_analysis_processor.py index 4e580ed03e..8afe77ec65 100644 --- a/apps/worker/tasks/bundle_analysis_processor.py +++ b/apps/worker/tasks/bundle_analysis_processor.py @@ -153,7 +153,10 @@ def run_impl( retry_num=self.request.retries, ) return previous_result - self.retry(max_retries=self.max_retries, countdown=retry.countdown) + self.retry( + max_retries=self.max_retries, + countdown=retry.countdown, + ) @staticmethod def _ba_report_already_exists(db_session, repoid: int, commitid: str) -> bool: diff --git a/apps/worker/tasks/http_request.py b/apps/worker/tasks/http_request.py index 02fbc81c13..71f4f215df 100644 --- a/apps/worker/tasks/http_request.py +++ b/apps/worker/tasks/http_request.py @@ -66,7 +66,10 @@ def run_impl( def _retry_task(self): # retry w/ exponential backoff - self.retry(max_retries=5, countdown=20 * (2**self.request.retries)) + self.retry( + max_retries=5, + countdown=20 * (2**self.request.retries), + ) RegisteredHTTPRequestTask = celery_app.register_task(HTTPRequestTask()) diff --git a/apps/worker/tasks/manual_trigger.py b/apps/worker/tasks/manual_trigger.py index e8b783f7fd..ab9bab75b7 100644 --- a/apps/worker/tasks/manual_trigger.py +++ b/apps/worker/tasks/manual_trigger.py @@ -65,7 +65,10 @@ def run_impl( "notifications_called": False, "message": "Unable to acquire lock", } - self.retry(max_retries=TASK_MAX_RETRIES_DEFAULT, countdown=retry.countdown) + self.retry( + max_retries=TASK_MAX_RETRIES_DEFAULT, + countdown=retry.countdown, + ) def process_impl_within_lock( self, diff --git a/apps/worker/tasks/preprocess_upload.py b/apps/worker/tasks/preprocess_upload.py index 087480edd3..8437d776cc 100644 --- a/apps/worker/tasks/preprocess_upload.py +++ b/apps/worker/tasks/preprocess_upload.py @@ -92,7 +92,8 @@ def run_impl(self, db_session, *, repoid: int, commitid: str, **kwargs): error=Errors.INTERNAL_RETRYING, ) self.retry( - max_retries=PREPROCESS_UPLOAD_MAX_RETRIES, countdown=retry.countdown + max_retries=PREPROCESS_UPLOAD_MAX_RETRIES, + countdown=retry.countdown, ) def process_impl_within_lock(self, db_session, repoid, commitid): diff --git a/apps/worker/tasks/tests/unit/test_base.py b/apps/worker/tasks/tests/unit/test_base.py index 3bd0fa6d2c..9b6b1499b5 100644 --- a/apps/worker/tasks/tests/unit/test_base.py +++ b/apps/worker/tasks/tests/unit/test_base.py @@ -22,6 +22,9 @@ from database.tests.factories.core import OwnerFactory, RepositoryFactory from helpers.exceptions import NoConfiguredAppsAvailable, RepositoryWithoutValidBotError from shared.celery_config import ( + TASK_RETRY_COUNTDOWN_MAX_SECONDS, + TASK_RETRY_MIN_SAFE_WINDOW_SECONDS, + TASK_VISIBILITY_TIMEOUT_SECONDS, sync_repos_task_name, upload_breadcrumb_task_name, upload_task_name, @@ -29,13 +32,206 @@ from shared.django_apps.upload_breadcrumbs.models import BreadcrumbData, Errors from shared.plan.constants import PlanName from shared.torngit.exceptions import TorngitClientError -from tasks.base import BaseCodecovRequest, BaseCodecovTask +from tasks.base import ( + BaseCodecovRequest, + BaseCodecovTask, +) from tasks.base import celery_app as base_celery_app from tests.helpers import mock_all_plans_and_tiers here = Path(__file__) +class TestClampRetryCountdown: + # A representative task hard time limit — arbitrary fraction of the visibility + # timeout, chosen so the cap math produces non-trivial values in both + # the test environment (TASK_VISIBILITY_TIMEOUT_SECONDS=20) and production (900). + _HARD_LIMIT = TASK_VISIBILITY_TIMEOUT_SECONDS // 4 + + # The safe ceiling: a task may have been running for up to _HARD_LIMIT seconds, + # so the countdown must fit within the remaining visibility window. + _SAFE_CAP = TASK_VISIBILITY_TIMEOUT_SECONDS - _HARD_LIMIT + + # A countdown larger than any conceivable real cap, to exercise clamping. + _COUNTDOWN_EXCEEDING_ANY_CAP = 999_999 + + @pytest.mark.parametrize( + "hard_limit,countdown,expected", + [ + # below cap → returned as-is + (_HARD_LIMIT, 0, 0), + (_HARD_LIMIT, _SAFE_CAP, _SAFE_CAP), + # 1 above cap → clamped to cap + (_HARD_LIMIT, _SAFE_CAP + 1, _SAFE_CAP), + # way above cap → clamped to cap + (_HARD_LIMIT, _COUNTDOWN_EXCEEDING_ANY_CAP, _SAFE_CAP), + # no hard limit, way above max → clamped to max + (0, _COUNTDOWN_EXCEEDING_ANY_CAP, TASK_RETRY_COUNTDOWN_MAX_SECONDS), + # no hard limit, below max → returned as-is + (0, 0, 0), + ], + ) + def test_clamp(self, mocker, hard_limit, countdown, expected): + mocker.patch.object( + BaseCodecovTask, + "hard_time_limit_task", + new_callable=PropertyMock, + return_value=hard_limit, + ) + task = BaseCodecovTask() + assert task.clamp_retry_countdown(countdown) == expected + + @pytest.mark.parametrize( + "hard_limit", + [ + # safe window is 1 second below the minimum — smallest misconfigured case + TASK_VISIBILITY_TIMEOUT_SECONDS - TASK_RETRY_MIN_SAFE_WINDOW_SECONDS + 1, + # safe window is 0 + TASK_VISIBILITY_TIMEOUT_SECONDS, + # hard_limit exceeds visibility timeout (e.g. enterprise tasks with hard_timelimit=2450s) + TASK_VISIBILITY_TIMEOUT_SECONDS + 10, + ], + ) + def test_clamp_falls_back_when_misconfigured(self, mocker, hard_limit): + """Falls back to TASK_RETRY_COUNTDOWN_MAX_SECONDS when the safe window is too small.""" + mocker.patch.object( + BaseCodecovTask, + "hard_time_limit_task", + new_callable=PropertyMock, + return_value=hard_limit, + ) + task = BaseCodecovTask() + assert task.clamp_retry_countdown(999) == TASK_RETRY_COUNTDOWN_MAX_SECONDS + + def test_hard_time_limit_task_float_is_cast_to_int(self, mocker): + """Floats from self.request.timelimit[0] must be cast so math stays integer.""" + task = BaseCodecovTask() + # Mock request so timelimit[0] is a float — exercises the first branch. + mock_request = MagicMock() + mock_request.timelimit = [600.9, None] + mocker.patch.object( + BaseCodecovTask, + "request", + new_callable=PropertyMock, + return_value=mock_request, + ) + result = task.hard_time_limit_task + assert result == 600 + assert isinstance(result, int) + + def test_retry_applies_clamping(self, mocker): + """retry() must pass a clamped countdown to Celery so we never exceed visibility timeout.""" + task = BaseCodecovTask() + hard_limit = TASK_VISIBILITY_TIMEOUT_SECONDS // 4 + mocker.patch.object( + BaseCodecovTask, + "hard_time_limit_task", + new_callable=PropertyMock, + return_value=hard_limit, + ) + captured = {} + + def fake_super_retry(max_retries=None, countdown=None, exc=None, **kwargs): + captured["countdown"] = countdown + raise Retry() + + mocker.patch.object( + BaseCodecovTask, + "attempts", + new_callable=PropertyMock, + return_value=0, + ) + mock_request = MagicMock() + mock_request.headers = {} + mocker.patch.object( + BaseCodecovTask, + "request", + new_callable=PropertyMock, + return_value=mock_request, + ) + mocker.patch("celery.app.task.Task.retry", side_effect=fake_super_retry) + + with pytest.raises(Retry): + task.retry(max_retries=5, countdown=99999) + + assert captured["countdown"] == TASK_VISIBILITY_TIMEOUT_SECONDS - hard_limit + + def test_retry_does_not_clamp_when_countdown_is_none(self, mocker): + """retry() with countdown=None must pass None through untouched (Celery default).""" + task = BaseCodecovTask() + mocker.patch.object( + BaseCodecovTask, + "hard_time_limit_task", + new_callable=PropertyMock, + return_value=TASK_VISIBILITY_TIMEOUT_SECONDS // 4, + ) + captured = {} + + def fake_super_retry(max_retries=None, countdown=None, exc=None, **kwargs): + captured["countdown"] = countdown + raise Retry() + + mocker.patch.object( + BaseCodecovTask, + "attempts", + new_callable=PropertyMock, + return_value=0, + ) + mock_request = MagicMock() + mock_request.headers = {} + mocker.patch.object( + BaseCodecovTask, + "request", + new_callable=PropertyMock, + return_value=mock_request, + ) + mocker.patch("celery.app.task.Task.retry", side_effect=fake_super_retry) + + with pytest.raises(Retry): + task.retry(max_retries=5, countdown=None) + + assert captured["countdown"] is None + + def _patch_misconfigured_task(self, mocker): + """Patch a task so its hard limit leaves no safe retry window.""" + mocker.patch.object( + BaseCodecovTask, + "hard_time_limit_task", + new_callable=PropertyMock, + return_value=TASK_VISIBILITY_TIMEOUT_SECONDS, + ) + mocker.patch.object( + BaseCodecovTask, + "attempts", + new_callable=PropertyMock, + return_value=0, + ) + mock_request = MagicMock() + mock_request.headers = {} + mocker.patch.object( + BaseCodecovTask, + "request", + new_callable=PropertyMock, + return_value=mock_request, + ) + + def test_retry_clamps_to_max_when_misconfigured(self, mocker): + """retry() uses global max fallback when misconfigured — does not raise.""" + self._patch_misconfigured_task(mocker) + captured = {} + + def fake_super_retry(max_retries=None, countdown=None, exc=None, **kwargs): + captured["countdown"] = countdown + raise Retry() + + mocker.patch("celery.app.task.Task.retry", side_effect=fake_super_retry) + + with pytest.raises(Retry): + BaseCodecovTask().retry(max_retries=5, countdown=999) + + assert captured["countdown"] == TASK_RETRY_COUNTDOWN_MAX_SECONDS + + @pytest.fixture def mock_self_app(mocker, celery_app): mock_app = celery_app diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 498b6f104b..895326abc8 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -496,7 +496,8 @@ def _process_reports_with_lock( error=Errors.INTERNAL_RETRYING, ) self.retry( - max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, countdown=retry.countdown + max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, + countdown=retry.countdown, ) def _handle_finisher_lock( @@ -617,7 +618,8 @@ def _handle_finisher_lock( error=Errors.INTERNAL_RETRYING, ) self.retry( - max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, countdown=retry.countdown + max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, + countdown=retry.countdown, ) def finish_reports_processing( diff --git a/apps/worker/tasks/upload_processor.py b/apps/worker/tasks/upload_processor.py index 54ff5fc104..dc7dd7b969 100644 --- a/apps/worker/tasks/upload_processor.py +++ b/apps/worker/tasks/upload_processor.py @@ -117,7 +117,10 @@ def on_processing_error(error: ProcessingError): upload_ids=[arguments["upload_id"]], error=Errors.INTERNAL_RETRYING, ) - self.retry(max_retries=error.max_retries, countdown=countdown) + self.retry( + max_retries=error.max_retries, + countdown=countdown, + ) elif error.is_retryable and self.request.retries >= error.max_retries: sentry_sdk.capture_exception( error.error_class(error.error_text), diff --git a/libs/shared/shared/celery_config.py b/libs/shared/shared/celery_config.py index 0f63bd07ed..7b26fe9ca5 100644 --- a/libs/shared/shared/celery_config.py +++ b/libs/shared/shared/celery_config.py @@ -263,6 +263,16 @@ def get_task_group(task_name: str) -> str | None: get_config("setup", "tasks", "celery", "retry_backoff_base", default=20) ) +# Fallback cap for clamp_retry_countdown on tasks with no hard time limit +# configured. As of 2026-02-25, TASK_VISIBILITY_TIMEOUT_SECONDS defaults to +# 900 s in production. The 30 s margin is arbitrary but sufficient. +TASK_RETRY_COUNTDOWN_MAX_SECONDS = TASK_VISIBILITY_TIMEOUT_SECONDS - 30 + +# Minimum safe window for clamp_retry_countdown. If the remaining visibility +# window (TASK_VISIBILITY_TIMEOUT_SECONDS - hard_time_limit_task) is smaller +# than this, clamp_retry_countdown falls back to TASK_RETRY_COUNTDOWN_MAX_SECONDS. +TASK_RETRY_MIN_SAFE_WINDOW_SECONDS = 30 + # Fixed retry delay for specific conditions (seconds) # Used for predictable retry intervals (e.g., waiting for processing lock) # Default: 60 seconds