From 3e575fece2818315591914663445e4f105aad62e Mon Sep 17 00:00:00 2001
From: Tom Hu
Date: Fri, 27 Feb 2026 09:56:24 +0900
Subject: [PATCH] fix(worker): use finite blocking_timeout in upload finisher
 locks

The upload finisher was creating LockManager with blocking_timeout=None,
which blocks the worker thread indefinitely until the lock is available.
For high-concurrency commits (100+ parallel CI jobs), this causes all
finisher tasks to block on the same lock, exhausting the worker pool and
starving all other repos.

Change both UPLOAD_PROCESSING and UPLOAD_FINISHER lock acquisitions to
use DEFAULT_BLOCKING_TIMEOUT_SECONDS (5s). This enables the existing
LockRetry mechanism with exponential backoff, freeing worker threads
immediately instead of blocking for up to 600s (the soft time limit).

Made-with: Cursor
---
 .../tests/unit/test_upload_finisher_task.py   | 39 +++++++++++++++++++
 apps/worker/tasks/upload_finisher.py          |  5 ++-
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
index 31f806f182..c21c580b04 100644
--- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
+++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
@@ -21,6 +21,7 @@
 from services.processing.types import MergeResult, ProcessingResult
 from services.timeseries import MeasurementName
 from shared.celery_config import (
+    DEFAULT_BLOCKING_TIMEOUT_SECONDS,
     compute_comparison_task_name,
     notify_task_name,
     pulls_task_name,
@@ -827,6 +828,44 @@ def test_upload_finisher_task_calls_save_commit_measurements_task(
             ]
         )
 
+    @pytest.mark.django_db
+    def test_lock_manager_uses_finite_blocking_timeout(
+        self, dbsession, mocker, mock_redis, mock_self_app
+    ):
+        """LockManager must use a finite blocking_timeout so workers are not
+        blocked indefinitely when the lock is held by another task."""
+        commit = CommitFactory.create()
+        dbsession.add(commit)
+        dbsession.flush()
+
+        lock_manager_cls = mocker.patch("tasks.upload_finisher.LockManager")
+        lock_manager_cls.return_value.locked.return_value.__enter__.side_effect = (
+            LockRetry(60)
+        )
+
+        task = UploadFinisherTask()
+        task.request.retries = 0
+        task.request.headers = {}
+
+        with pytest.raises(Retry):
+            task.run_impl(
+                dbsession,
+                [{"upload_id": 0, "successful": True, "arguments": {}}],
+                repoid=commit.repoid,
+                commitid=commit.commitid,
+                commit_yaml={},
+            )
+
+        assert lock_manager_cls.call_count >= 1
+        first_call_kwargs = lock_manager_cls.call_args_list[0].kwargs
+        assert (
+            first_call_kwargs["blocking_timeout"] == DEFAULT_BLOCKING_TIMEOUT_SECONDS
+        ), (
+            f"Expected blocking_timeout={DEFAULT_BLOCKING_TIMEOUT_SECONDS}, "
+            f"got {first_call_kwargs['blocking_timeout']}. "
+            "blocking_timeout=None causes worker pool exhaustion."
+        )
+
     @pytest.mark.django_db
     def test_retry_on_report_lock(self, dbsession, mocker, mock_redis, mock_self_app):
         commit = CommitFactory.create()
diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py
index 498b6f104b..26ca55c29c 100644
--- a/apps/worker/tasks/upload_finisher.py
+++ b/apps/worker/tasks/upload_finisher.py
@@ -32,6 +32,7 @@
 from services.timeseries import repository_datasets_query
 from services.yaml import read_yaml_field
 from shared.celery_config import (
+    DEFAULT_BLOCKING_TIMEOUT_SECONDS,
     DEFAULT_LOCK_TIMEOUT_SECONDS,
     UPLOAD_PROCESSOR_MAX_RETRIES,
     compute_comparison_task_name,
@@ -418,7 +419,7 @@ def _process_reports_with_lock(
         repoid=repoid,
         commitid=commitid,
         lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS),
-        blocking_timeout=None,
+        blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS,
     )
 
     try:
@@ -517,7 +518,7 @@ def _handle_finisher_lock(
         repoid=repoid,
         commitid=commitid,
         lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS),
-        blocking_timeout=None,
+        blocking_timeout=DEFAULT_BLOCKING_TIMEOUT_SECONDS,
    )
 
     try: