Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions apps/worker/tasks/tests/unit/test_upload_finisher_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from services.processing.types import MergeResult, ProcessingResult
from services.timeseries import MeasurementName
from shared.celery_config import (
DEFAULT_BLOCKING_TIMEOUT_SECONDS,
compute_comparison_task_name,
notify_task_name,
pulls_task_name,
Expand Down Expand Up @@ -827,6 +828,44 @@ def test_upload_finisher_task_calls_save_commit_measurements_task(
]
)

@pytest.mark.django_db
def test_lock_manager_uses_finite_blocking_timeout(
    self, dbsession, mocker, mock_redis, mock_self_app
):
    """Verify the finisher task hands LockManager a finite blocking_timeout.

    An indefinite (None) timeout would leave worker processes parked
    forever whenever another task already holds the lock.
    """
    commit = CommitFactory.create()
    dbsession.add(commit)
    dbsession.flush()

    # Patch LockManager so that entering the lock context raises LockRetry;
    # the task is expected to translate that into a Celery Retry.
    mock_lock_manager = mocker.patch("tasks.upload_finisher.LockManager")
    locked_ctx = mock_lock_manager.return_value.locked.return_value
    locked_ctx.__enter__.side_effect = LockRetry(60)

    task = UploadFinisherTask()
    task.request.retries = 0
    task.request.headers = {}

    with pytest.raises(Retry):
        task.run_impl(
            dbsession,
            [{"upload_id": 0, "successful": True, "arguments": {}}],
            repoid=commit.repoid,
            commitid=commit.commitid,
            commit_yaml={},
        )

    # The lock manager must have been constructed with the finite default
    # timeout rather than None on its very first instantiation.
    assert mock_lock_manager.call_count >= 1
    kwargs = mock_lock_manager.call_args_list[0].kwargs
    assert kwargs["blocking_timeout"] == DEFAULT_BLOCKING_TIMEOUT_SECONDS, (
        f"Expected blocking_timeout={DEFAULT_BLOCKING_TIMEOUT_SECONDS}, "
        f"got {kwargs['blocking_timeout']}. "
        "blocking_timeout=None causes worker pool exhaustion."
    )

@pytest.mark.django_db
def test_retry_on_report_lock(self, dbsession, mocker, mock_redis, mock_self_app):
commit = CommitFactory.create()
Expand Down
5 changes: 3 additions & 2 deletions apps/worker/tasks/upload_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from services.timeseries import repository_datasets_query
from services.yaml import read_yaml_field
from shared.celery_config import (
DEFAULT_BLOCKING_TIMEOUT_SECONDS,
DEFAULT_LOCK_TIMEOUT_SECONDS,
UPLOAD_PROCESSOR_MAX_RETRIES,
compute_comparison_task_name,
Expand Down Expand Up @@ -418,7 +419,7 @@ def _process_reports_with_lock(
repoid=repoid,
commitid=commitid,
lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS),
blocking_timeout=None,
blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS,
)

try:
Comment on lines 419 to 425
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: A shared Redis lock counter is incremented by all concurrent tasks, causing them to prematurely exceed the max_retries limit and terminate, even on their first attempt.
Severity: CRITICAL

Suggested Fix

The max retry check should be based on the individual task's attempt count (retry_num) rather than the shared Redis counter. The shared counter logic should be removed or redesigned to avoid causing cascading failures across independent tasks. The implementation should be updated to match the docstring's intent, which states that retry_num is used for max retry checking.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: apps/worker/tasks/upload_finisher.py#L419-L425

Potential issue: In high-concurrency scenarios, the lock manager uses a shared Redis
counter to track failed lock acquisition attempts. When many tasks compete for the same
lock, each task that times out after 5 seconds increments this shared counter. Once the
counter reaches the `max_retries` limit (e.g., 5), all subsequent tasks attempting to
acquire the lock will immediately fail with a `max_retries_exceeded` error. This causes
tasks to terminate prematurely, even on their first attempt, because the failure count
is shared across all concurrent tasks rather than being tracked on a per-task basis.
This issue is exacerbated by the change from an indefinite blocking timeout to a
5-second timeout.

Did we get this right? 👍 / 👎 to inform future reviews.

Expand Down Expand Up @@ -517,7 +518,7 @@ def _handle_finisher_lock(
repoid=repoid,
commitid=commitid,
lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS),
blocking_timeout=None,
blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS,
)

try:
Expand Down
Loading