Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions apps/worker/services/lock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@
from shared.celery_config import (
DEFAULT_BLOCKING_TIMEOUT_SECONDS,
DEFAULT_LOCK_TIMEOUT_SECONDS,
TASK_RETRY_COUNTDOWN_MAX_SECONDS,
)
from shared.config import get_config
from shared.helpers.redis import get_redis_connection # type: ignore

log = logging.getLogger(__name__)

MAX_RETRY_COUNTDOWN_SECONDS = 60 * 60 * 5
BASE_RETRY_COUNTDOWN_SECONDS = 200
RETRY_BACKOFF_MULTIPLIER = 3
RETRY_COUNTDOWN_RANGE_DIVISOR = 2
Expand Down Expand Up @@ -183,8 +182,8 @@ def locked(
max_retry_unbounded = self.base_retry_countdown * (
RETRY_BACKOFF_MULTIPLIER**retry_num
)
if max_retry_unbounded >= MAX_RETRY_COUNTDOWN_SECONDS:
countdown = MAX_RETRY_COUNTDOWN_SECONDS
if max_retry_unbounded >= TASK_RETRY_COUNTDOWN_MAX_SECONDS:
countdown = TASK_RETRY_COUNTDOWN_MAX_SECONDS
else:
countdown_unbounded = random.randint(
max_retry_unbounded // RETRY_COUNTDOWN_RANGE_DIVISOR,
Expand Down
16 changes: 10 additions & 6 deletions apps/worker/services/tests/test_lock_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
from redis.exceptions import LockError

from database.enums import ReportType
from services.lock_manager import LockManager, LockRetry, LockType
from services.lock_manager import (
LockManager,
LockRetry,
LockType,
)
from shared.celery_config import TASK_RETRY_COUNTDOWN_MAX_SECONDS
from tasks.base import BaseCodecovTask


Expand Down Expand Up @@ -191,11 +196,11 @@ def test_locked_exponential_backoff_retry_2(self, mock_redis):
with manager.locked(LockType.UPLOAD, retry_num=2):
pass

# retry_num=2: 200 * 3^2 = 1800, so countdown should be between 900-1800
assert 900 <= exc_info.value.countdown <= 1800
# retry_num=2: 200 * 3^2 = 1800, which exceeds TASK_RETRY_COUNTDOWN_MAX_SECONDS
assert exc_info.value.countdown == TASK_RETRY_COUNTDOWN_MAX_SECONDS

def test_locked_exponential_backoff_cap(self, mock_redis):
"""Test that exponential backoff is capped at 5 hours"""
"""Test that exponential backoff is capped at TASK_RETRY_COUNTDOWN_MAX_SECONDS"""
mock_redis.lock.side_effect = LockError()
mock_redis.incr.return_value = 1

Expand All @@ -205,8 +210,7 @@ def test_locked_exponential_backoff_cap(self, mock_redis):
with manager.locked(LockType.UPLOAD, retry_num=10):
pass

# Cap is 60 * 60 * 5 = 18000 seconds (5 hours)
assert exc_info.value.countdown <= 18000
assert exc_info.value.countdown == TASK_RETRY_COUNTDOWN_MAX_SECONDS

def test_locked_max_retries_not_provided(self, mock_redis, caplog):
"""Test that max_retries=None doesn't log error"""
Expand Down
56 changes: 51 additions & 5 deletions apps/worker/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
from services.repository import get_repo_provider_service
from shared.celery_config import (
TASK_RETRY_BACKOFF_BASE_SECONDS,
TASK_RETRY_COUNTDOWN_MAX_SECONDS,
TASK_RETRY_MIN_SAFE_WINDOW_SECONDS,
TASK_VISIBILITY_TIMEOUT_SECONDS,
upload_breadcrumb_task_name,
)
from shared.celery_router import route_tasks_based_on_user_plan
Expand Down Expand Up @@ -202,12 +205,52 @@ def __init_subclass__(cls, name=None):
cls.task_core_runtime = TASK_CORE_RUNTIME.labels(task=name)

@property
def hard_time_limit_task(self):
def hard_time_limit_task(self) -> int:
"""Return the hard time limit for this task execution, in seconds.

Checked in priority order:
1. request.timelimit[0] — set per-execution when a task is dispatched
with a custom time limit (e.g. apply_async(time_limit=N)).
2. self.time_limit — the class-level hard limit defined on the task.
3. app.conf.task_time_limit — the Celery application default.

Returns 0 if no limit is configured at any level.
"""
if self.request.timelimit is not None and self.request.timelimit[0] is not None:
return self.request.timelimit[0]
return int(self.request.timelimit[0])
if self.time_limit is not None:
return self.time_limit
return self.app.conf.task_time_limit or 0
return int(self.time_limit)
return int(self.app.conf.task_time_limit or 0)

def clamp_retry_countdown(self, countdown: int) -> int:
"""Cap a retry countdown to prevent Redis redelivery before the retry fires.

A task calling retry() may have been running for up to hard_time_limit_task
seconds, so the safe ceiling is:
TASK_VISIBILITY_TIMEOUT_SECONDS - hard_time_limit_task

Tasks with no hard limit fall back to TASK_RETRY_COUNTDOWN_MAX_SECONDS.

When the safe window is smaller than TASK_RETRY_MIN_SAFE_WINDOW_SECONDS
(e.g. enterprise tasks with hard_timelimit=2450s >> visibility_timeout=900s),
also falls back to TASK_RETRY_COUNTDOWN_MAX_SECONDS.
"""
hard_limit = self.hard_time_limit_task
if hard_limit:
safe_window = TASK_VISIBILITY_TIMEOUT_SECONDS - hard_limit
if safe_window < TASK_RETRY_MIN_SAFE_WINDOW_SECONDS:
log.warning(
"Task hard time limit leaves insufficient visibility window for "
"retry clamping — falling back to global max",
extra={
"hard_time_limit_task": hard_limit,
"min_safe_window": TASK_RETRY_MIN_SAFE_WINDOW_SECONDS,
"visibility_timeout": TASK_VISIBILITY_TIMEOUT_SECONDS,
},
)
return min(countdown, TASK_RETRY_COUNTDOWN_MAX_SECONDS)
return min(countdown, safe_window)
return min(countdown, TASK_RETRY_COUNTDOWN_MAX_SECONDS)

def get_lock_timeout(self, default_timeout: int) -> int:
"""
Expand Down Expand Up @@ -261,7 +304,7 @@ def apply_async(self, args=None, kwargs=None, **options):
return super().apply_async(args=args, kwargs=kwargs, headers=headers, **options)

def retry(self, max_retries=None, countdown=None, exc=None, **kwargs):
"""Override Celery's retry to always update attempts header."""
"""Override Celery's retry to always update attempts header and clamp countdown."""
request = getattr(self, "request", None)
current_attempts = self.attempts
kwargs["headers"] = {
Expand All @@ -270,6 +313,9 @@ def retry(self, max_retries=None, countdown=None, exc=None, **kwargs):
"attempts": current_attempts + 1,
}

if countdown is not None:
countdown = self.clamp_retry_countdown(countdown)

return super().retry(
max_retries=max_retries, countdown=countdown, exc=exc, **kwargs
)
Expand Down
5 changes: 4 additions & 1 deletion apps/worker/tasks/bundle_analysis_notify.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ def run_impl(
"notify_attempted": False,
"notify_succeeded": None,
}
self.retry(max_retries=self.max_retries, countdown=retry.countdown)
self.retry(
max_retries=self.max_retries,
countdown=retry.countdown,
)

@sentry_sdk.trace
def process_impl_within_lock(
Expand Down
5 changes: 4 additions & 1 deletion apps/worker/tasks/bundle_analysis_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ def run_impl(
retry_num=self.request.retries,
)
return previous_result
self.retry(max_retries=self.max_retries, countdown=retry.countdown)
self.retry(
max_retries=self.max_retries,
countdown=retry.countdown,
)

@staticmethod
def _ba_report_already_exists(db_session, repoid: int, commitid: str) -> bool:
Expand Down
5 changes: 4 additions & 1 deletion apps/worker/tasks/http_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,10 @@ def run_impl(

def _retry_task(self):
# retry w/ exponential backoff
self.retry(max_retries=5, countdown=20 * (2**self.request.retries))
self.retry(
max_retries=5,
countdown=20 * (2**self.request.retries),
)


RegisteredHTTPRequestTask = celery_app.register_task(HTTPRequestTask())
Expand Down
5 changes: 4 additions & 1 deletion apps/worker/tasks/manual_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ def run_impl(
"notifications_called": False,
"message": "Unable to acquire lock",
}
self.retry(max_retries=TASK_MAX_RETRIES_DEFAULT, countdown=retry.countdown)
self.retry(
max_retries=TASK_MAX_RETRIES_DEFAULT,
countdown=retry.countdown,
)

def process_impl_within_lock(
self,
Expand Down
3 changes: 2 additions & 1 deletion apps/worker/tasks/preprocess_upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ def run_impl(self, db_session, *, repoid: int, commitid: str, **kwargs):
error=Errors.INTERNAL_RETRYING,
)
self.retry(
max_retries=PREPROCESS_UPLOAD_MAX_RETRIES, countdown=retry.countdown
max_retries=PREPROCESS_UPLOAD_MAX_RETRIES,
countdown=retry.countdown,
)

def process_impl_within_lock(self, db_session, repoid, commitid):
Expand Down
Loading
Loading