From 2cea6864fa58947f0e5387c15ca8bb3eba56530f Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Wed, 11 Mar 2026 23:03:30 +0900 Subject: [PATCH 01/19] feat(worker): make upload_finisher process full commit state Ensure upload_finisher reconstructs all mergeable uploads for a commit rather than relying only on callback payload, and stop processor-side finisher enqueueing now that finisher input is source-of-truth. Made-with: Cursor --- apps/worker/services/processing/processing.py | 25 +-- apps/worker/services/tests/test_processing.py | 125 +-------------- .../tests/unit/test_upload_finisher_task.py | 145 +++++++++++++++++- apps/worker/tasks/upload_finisher.py | 38 +++-- 4 files changed, 167 insertions(+), 166 deletions(-) diff --git a/apps/worker/services/processing/processing.py b/apps/worker/services/processing/processing.py index da524de85b..4d79c1b48d 100644 --- a/apps/worker/services/processing/processing.py +++ b/apps/worker/services/processing/processing.py @@ -5,18 +5,16 @@ from celery.exceptions import CeleryError from sqlalchemy.orm import Session as DbSession -from app import celery_app from database.models.core import Commit from database.models.reports import Upload from helpers.reports import delete_archive_setting from services.report import ProcessingError, RawReportInfo, ReportService from services.report.parser.types import VersionOneParsedRawReport from shared.api_archive.archive import ArchiveService -from shared.celery_config import upload_finisher_task_name from shared.yaml import UserYaml from .intermediate import save_intermediate_report -from .state import ProcessingState, should_trigger_postprocessing +from .state import ProcessingState from .types import ProcessingResult, UploadArguments log = logging.getLogger(__name__) @@ -71,27 +69,6 @@ def process_upload( save_intermediate_report(upload_id, processing_result.report) state.mark_upload_as_processed(upload_id) - # Check if all uploads are now processed and trigger finisher if needed - # This handles the case where a processor task retries outside of a chord - # (e.g., from visibility timeout or task_reject_on_worker_lost) - upload_numbers = state.get_upload_numbers() - if should_trigger_postprocessing(upload_numbers): - log.info( - "All uploads processed, triggering finisher", - extra={ - "repo_id": repo_id, - "commit_sha": commit_sha, - "upload_id": upload_id, - }, - ) - celery_app.tasks[upload_finisher_task_name].apply_async( - kwargs={ - "repoid": repo_id, - "commitid": commit_sha, - "commit_yaml": commit_yaml.to_dict(), - } - ) - rewrite_or_delete_upload(archive_service, commit_yaml, report_info) except CeleryError: diff --git a/apps/worker/services/tests/test_processing.py b/apps/worker/services/tests/test_processing.py index da435a12c3..6ba1bf4445 100644 --- a/apps/worker/services/tests/test_processing.py +++ b/apps/worker/services/tests/test_processing.py @@ -13,27 +13,8 @@ @pytest.mark.django_db(databases={"default"}) -class TestProcessUploadOrphanedTaskRecovery: - """ - Tests for the orphaned upload recovery mechanism. - - When a processor task retries outside of a chord (e.g., from visibility timeout - or task_reject_on_worker_lost), it should trigger the finisher if all uploads - are now processed. - """ - - def test_triggers_finisher_when_last_upload_completes( - self, dbsession, mocker, mock_storage - ): - """ - Test that finisher is triggered when the last upload completes processing. - - This simulates the scenario where: - 1. A processor task died before ACK, missing the chord callback - 2. Task retries standalone (visibility timeout or worker rejection) - 3. Completes successfully and notices all uploads are done - 4. Triggers finisher to complete the upload flow - """ +class TestProcessUpload: + def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_storage): # Setup repository = RepositoryFactory.create() commit = CommitFactory.create(repository=repository) @@ -73,105 +54,6 @@ def test_triggers_finisher_when_last_upload_completes( return_value=mock_state_instance, ) - # Mock should_trigger_postprocessing to return True - mocker.patch( - "services.processing.processing.should_trigger_postprocessing", - return_value=True, - ) - - # Mock celery app to capture finisher task scheduling - mock_celery_app = mocker.patch("services.processing.processing.celery_app") - mock_finisher_task = MagicMock() - mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task} - - # Mock other dependencies - mocker.patch("services.processing.processing.save_intermediate_report") - mocker.patch("services.processing.processing.rewrite_or_delete_upload") - - commit_yaml = UserYaml({}) - - # Execute - result = process_upload( - on_processing_error=lambda error: None, - db_session=dbsession, - repo_id=repository.repoid, - commit_sha=commit.commitid, - commit_yaml=commit_yaml, - arguments=arguments, - ) - - # Verify - assert result["successful"] is True - assert result["upload_id"] == upload.id_ - - # Verify finisher was triggered - mock_finisher_task.apply_async.assert_called_once_with( - kwargs={ - "repoid": repository.repoid, - "commitid": commit.commitid, - "commit_yaml": commit_yaml.to_dict(), - } - ) - - def test_does_not_trigger_finisher_when_uploads_still_processing( - self, dbsession, mocker, mock_storage - ): - """ - Test that finisher is NOT triggered when other uploads are still processing. - - This verifies we don't prematurely trigger the finisher when only some - uploads have completed. - """ - # Setup - repository = RepositoryFactory.create() - commit = CommitFactory.create(repository=repository) - upload = UploadFactory.create( - report__commit=commit, - state="started", - ) - dbsession.add_all([repository, commit, upload]) - dbsession.flush() - - arguments: UploadArguments = { - "commit": commit.commitid, - "upload_id": upload.id_, - "version": "v4", - "reportid": str(upload.report.external_id), - } - - # Mock dependencies - mock_report_service = mocker.patch( - "services.processing.processing.ReportService" - ) - mock_processing_result = MagicMock() - mock_processing_result.error = None - mock_processing_result.report = None - mock_report_service.return_value.build_report_from_raw_content.return_value = ( - mock_processing_result - ) - - # Mock ProcessingState to indicate other uploads still processing - mock_state_instance = MagicMock() - mock_state_instance.get_upload_numbers.return_value = MagicMock( - processing=2, - processed=1, # Other uploads still in progress - ) - mocker.patch( - "services.processing.processing.ProcessingState", - return_value=mock_state_instance, - ) - - # Mock should_trigger_postprocessing to return False - mocker.patch( - "services.processing.processing.should_trigger_postprocessing", - return_value=False, - ) - - # Mock celery app to capture finisher task scheduling - mock_celery_app = mocker.patch("services.processing.processing.celery_app") - mock_finisher_task = MagicMock() - mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task} - # Mock other dependencies mocker.patch("services.processing.processing.save_intermediate_report") mocker.patch("services.processing.processing.rewrite_or_delete_upload") @@ -192,5 +74,4 @@ def test_does_not_trigger_finisher_when_uploads_still_processing( assert result["successful"] is True assert result["upload_id"] == upload.id_ - # Verify finisher was NOT triggered - mock_finisher_task.apply_async.assert_not_called() + # Finisher enqueue is handled elsewhere; processor no longer triggers it directly. diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 308462e686..0f2fb3fc23 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -39,6 +39,7 @@ from tasks.upload_finisher import ( FINISHER_BASE_RETRY_COUNTDOWN_SECONDS, FINISHER_BLOCKING_TIMEOUT_SECONDS, + MAX_CONCURRENT_FINISHERS_PER_COMMIT, ReportService, ShouldCallNotifyResult, UploadFinisherTask, @@ -1068,9 +1069,11 @@ def test_idempotency_check_skips_already_processed_uploads( # Create uploads that are already in "processed" state upload_1 = UploadFactory.create(report=report, state="processed") - upload_2 = UploadFactory.create(report=report, state="error") + upload_2 = UploadFactory.create(report=report, state="merged") + upload_3 = UploadFactory.create(report=report, state="error") dbsession.add(upload_1) dbsession.add(upload_2) + dbsession.add(upload_3) dbsession.flush() # Mock the _process_reports_with_lock to verify it's NOT called @@ -1080,7 +1083,8 @@ def test_idempotency_check_skips_already_processed_uploads( previous_results = [ {"upload_id": upload_1.id, "successful": True, "arguments": {}}, - {"upload_id": upload_2.id, "successful": False, "arguments": {}}, + {"upload_id": upload_2.id, "successful": True, "arguments": {}}, + {"upload_id": upload_3.id, "successful": False, "arguments": {}}, ] result = UploadFinisherTask().run_impl( @@ -1094,7 +1098,7 @@ def test_idempotency_check_skips_already_processed_uploads( # Verify that the finisher skipped all work assert result == { "already_completed": True, - "upload_ids": [upload_1.id, upload_2.id], + "upload_ids": [upload_1.id, upload_2.id, upload_3.id], } # Verify that _process_reports_with_lock was NOT called @@ -1249,6 +1253,45 @@ def test_reconstruct_processing_results_returns_empty_when_no_uploads_found( # Verify empty list returned when no uploads found assert result == [] + @pytest.mark.django_db + def test_run_impl_uses_reconstructed_results_beyond_callback_payload( + self, dbsession, mocker, mock_self_app + ): + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + reconstructed = [ + {"upload_id": 101, "successful": True, "arguments": {}}, + {"upload_id": 202, "successful": True, "arguments": {}}, + ] + mocker.patch.object( + UploadFinisherTask, + "_reconstruct_processing_results", + return_value=reconstructed, + ) + mock_process = mocker.patch.object( + UploadFinisherTask, "_process_reports_with_lock" + ) + mock_handle_finisher_lock = mocker.patch.object( + UploadFinisherTask, + "_handle_finisher_lock", + return_value={"notifications_called": False}, + ) + + UploadFinisherTask().run_impl( + dbsession, + [{"upload_id": 101, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + mock_process.assert_called_once() + processing_results = mock_process.call_args.args[3] + assert {result["upload_id"] for result in processing_results} == {101, 202} + mock_handle_finisher_lock.assert_called_once() + @pytest.mark.django_db def test_coverage_notifications_not_blocked_by_test_results_uploads( self, @@ -1489,6 +1532,102 @@ def test_per_task_retry_limit_still_enforced( assert result is None +class TestPerCommitConcurrencyLimit: + """Tests for per-commit concurrency gate that prevents worker starvation.""" + + @pytest.mark.django_db + def test_retries_when_concurrency_limit_reached( + self, dbsession, mocker, mock_redis, mock_self_app + ): + """When more than MAX_CONCURRENT_FINISHERS_PER_COMMIT tasks are active + for the same commit, excess tasks should schedule a retry.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + mock_redis.incr.return_value = MAX_CONCURRENT_FINISHERS_PER_COMMIT + 1 + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + with pytest.raises(Retry): + task.run_impl( + dbsession, + [{"upload_id": 0, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + mock_redis.decr.assert_called_once() + + @pytest.mark.django_db + def test_proceeds_when_under_concurrency_limit( + self, dbsession, mocker, mock_redis, mock_self_app + ): + """Tasks under the limit should proceed normally.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + mock_redis.incr.return_value = 1 + mock_redis.scard.return_value = 0 + mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) + mocker.patch("tasks.upload_finisher.update_uploads") + mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") + mocker.patch.object( + UploadFinisherTask, "_handle_finisher_lock", return_value={} + ) + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + result = task.run_impl( + dbsession, + [{"upload_id": 0, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + assert result is None or "concurrency_limited" not in (result or {}) + # Counter should be decremented via finally block + mock_redis.decr.assert_called_once() + + @pytest.mark.django_db + def test_counter_decremented_on_exception( + self, dbsession, mocker, mock_redis, mock_self_app + ): + """The active counter must be decremented even when the task raises.""" + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + mock_redis.incr.return_value = 1 + mock_redis.scard.return_value = 0 + mocker.patch( + "tasks.upload_finisher.load_intermediate_reports", + side_effect=RuntimeError("boom"), + ) + + task = UploadFinisherTask() + task.request.retries = 0 + task.request.headers = {} + + result = task.run_impl( + dbsession, + [{"upload_id": 0, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + assert "error" in result + mock_redis.decr.assert_called_once() + + class TestCommitRefreshAfterLock: """Tests for CCMRG-2028: commit must be refreshed after acquiring lock.""" diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index c536726d13..d391387332 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -265,22 +265,25 @@ def run_impl( state = ProcessingState(repoid, commitid) - # If processing_results not provided (e.g., from orphaned upload recovery), - # reconstruct it from ProcessingState to ensure ALL uploads are included - if processing_results is None: - log.info( - "run_impl: processing_results not provided, reconstructing from ProcessingState" - ) - processing_results = self._reconstruct_processing_results( - db_session, state, commit - ) - log.info( - "run_impl: Reconstructed processing results", - extra={ - "upload_count": len(processing_results), - "upload_ids": [r["upload_id"] for r in processing_results], - }, - ) + # Always reconstruct from state so the finisher covers all uploads for the commit, + # not only uploads present in callback payload. + reconstructed_results = self._reconstruct_processing_results( + db_session, state, commit + ) + processing_results_by_id = { + result["upload_id"]: result for result in (processing_results or []) + } + processing_results_by_id.update( + {result["upload_id"]: result for result in reconstructed_results} + ) + processing_results = list(processing_results_by_id.values()) + log.info( + "run_impl: Reconstructed processing results", + extra={ + "upload_count": len(processing_results), + "upload_ids": [r["upload_id"] for r in processing_results], + }, + ) upload_ids = [upload["upload_id"] for upload in processing_results] @@ -294,7 +297,8 @@ def run_impl( # Only skip if ALL uploads exist in DB and ALL are in final states if len(uploads_in_db) == len(upload_ids): all_already_processed = all( - upload.state in ("processed", "error") for upload in uploads_in_db + upload.state in ("processed", "merged", "error") + for upload in uploads_in_db ) if all_already_processed: log.info( From 6cab9ccd4b466f24867684f5c94d8d41d1e43447 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Wed, 11 Mar 2026 23:09:58 +0900 Subject: [PATCH 02/19] fix(worker): mark successfully merged uploads as merged Update upload state transitions so successful finisher merges write MERGED state, and align finisher test expectations with the new terminal state. Made-with: Cursor --- apps/worker/services/processing/merging.py | 4 ++-- .../tasks/tests/unit/test_upload_finisher_task.py | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/apps/worker/services/processing/merging.py b/apps/worker/services/processing/merging.py index bc5b63d4d2..0e26d33f48 100644 --- a/apps/worker/services/processing/merging.py +++ b/apps/worker/services/processing/merging.py @@ -129,8 +129,8 @@ def update_uploads( if result["successful"]: update = { - "state_id": UploadState.PROCESSED.db_id, - "state": "processed", + "state_id": UploadState.MERGED.db_id, + "state": "merged", } report = reports.get(upload_id) if report is not None: diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 0f2fb3fc23..850f86781d 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -1369,7 +1369,7 @@ def mock_process_reports( upload_ids, state, ): - # Call update_uploads to update the upload state to PROCESSED + # Call update_uploads to update the upload state to MERGED # This uses the same SQLAlchemy session that the query will use update_uploads( db_session, @@ -1389,11 +1389,11 @@ def mock_process_reports( .first() ) assert updated_upload is not None, "Upload should exist" - assert updated_upload.state == "processed", ( - f"Upload state should be 'processed', got '{updated_upload.state}'" + assert updated_upload.state == "merged", ( + f"Upload state should be 'merged', got '{updated_upload.state}'" ) - assert updated_upload.state_id == UploadState.PROCESSED.db_id, ( - f"Upload state_id should be {UploadState.PROCESSED.db_id}, got {updated_upload.state_id}" + assert updated_upload.state_id == UploadState.MERGED.db_id, ( + f"Upload state_id should be {UploadState.MERGED.db_id}, got {updated_upload.state_id}" ) mock_process = mocker.patch.object( From 95a8a79d9bb60bd3b3460067236045146b64a551 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Wed, 11 Mar 2026 23:27:55 +0900 Subject: [PATCH 03/19] test(worker): remove stale finisher concurrency gate tests Drop old per-commit concurrency limiter test coverage that no longer matches upload_finisher behavior and was failing test module import. Made-with: Cursor --- .../tests/unit/test_upload_finisher_task.py | 97 ------------------- 1 file changed, 97 deletions(-) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 850f86781d..7a655a7e33 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -39,7 +39,6 @@ from tasks.upload_finisher import ( FINISHER_BASE_RETRY_COUNTDOWN_SECONDS, FINISHER_BLOCKING_TIMEOUT_SECONDS, - MAX_CONCURRENT_FINISHERS_PER_COMMIT, ReportService, ShouldCallNotifyResult, UploadFinisherTask, @@ -1532,102 +1531,6 @@ def test_per_task_retry_limit_still_enforced( assert result is None -class TestPerCommitConcurrencyLimit: - """Tests for per-commit concurrency gate that prevents worker starvation.""" - - @pytest.mark.django_db - def test_retries_when_concurrency_limit_reached( - self, dbsession, mocker, mock_redis, mock_self_app - ): - """When more than MAX_CONCURRENT_FINISHERS_PER_COMMIT tasks are active - for the same commit, excess tasks should schedule a retry.""" - commit = CommitFactory.create() - dbsession.add(commit) - dbsession.flush() - - mock_redis.incr.return_value = MAX_CONCURRENT_FINISHERS_PER_COMMIT + 1 - - task = UploadFinisherTask() - task.request.retries = 0 - task.request.headers = {} - - with pytest.raises(Retry): - task.run_impl( - dbsession, - [{"upload_id": 0, "successful": True, "arguments": {}}], - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - ) - - mock_redis.decr.assert_called_once() - - @pytest.mark.django_db - def test_proceeds_when_under_concurrency_limit( - self, dbsession, mocker, mock_redis, mock_self_app - ): - """Tasks under the limit should proceed normally.""" - commit = CommitFactory.create() - dbsession.add(commit) - dbsession.flush() - - mock_redis.incr.return_value = 1 - mock_redis.scard.return_value = 0 - mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) - mocker.patch("tasks.upload_finisher.update_uploads") - mocker.patch("tasks.upload_finisher.cleanup_intermediate_reports") - mocker.patch.object( - UploadFinisherTask, "_handle_finisher_lock", return_value={} - ) - - task = UploadFinisherTask() - task.request.retries = 0 - task.request.headers = {} - - result = task.run_impl( - dbsession, - [{"upload_id": 0, "successful": True, "arguments": {}}], - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - ) - - assert result is None or "concurrency_limited" not in (result or {}) - # Counter should be decremented via finally block - mock_redis.decr.assert_called_once() - - @pytest.mark.django_db - def test_counter_decremented_on_exception( - self, dbsession, mocker, mock_redis, mock_self_app - ): - """The active counter must be decremented even when the task raises.""" - commit = CommitFactory.create() - dbsession.add(commit) - dbsession.flush() - - mock_redis.incr.return_value = 1 - mock_redis.scard.return_value = 0 - mocker.patch( - "tasks.upload_finisher.load_intermediate_reports", - side_effect=RuntimeError("boom"), - ) - - task = UploadFinisherTask() - task.request.retries = 0 - task.request.headers = {} - - result = task.run_impl( - dbsession, - [{"upload_id": 0, "successful": True, "arguments": {}}], - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - ) - - assert "error" in result - mock_redis.decr.assert_called_once() - - class TestCommitRefreshAfterLock: """Tests for CCMRG-2028: commit must be refreshed after acquiring lock.""" From 126a0b683e10c4fb509f7e7fd4554864fed5cb84 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Wed, 11 Mar 2026 23:55:47 +0900 Subject: [PATCH 04/19] fix(worker): prefer callback payload over reconstructed finisher data Use reconstructed results as the base and let callback payload win on duplicate upload IDs so stale reconstruction cannot overwrite successful processor outcomes. Made-with: Cursor --- apps/worker/tasks/upload_finisher.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index d391387332..e5d614a102 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -271,10 +271,12 @@ def run_impl( db_session, state, commit ) processing_results_by_id = { - result["upload_id"]: result for result in (processing_results or []) + result["upload_id"]: result for result in reconstructed_results } + # Prefer callback payload when both contain the same upload_id because + # callback data is produced directly by the processor and can be newer. processing_results_by_id.update( - {result["upload_id"]: result for result in reconstructed_results} + {result["upload_id"]: result for result in (processing_results or [])} ) processing_results = list(processing_results_by_id.values()) log.info( From 6e3b939f015f27c803a0431ab9401ca0fd155593 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 00:05:49 +0900 Subject: [PATCH 05/19] fix(worker): reconstruct all processed uploads for finisher merge Use the full processed set instead of random sampling so finisher reconstruction cannot miss uploads and produce incomplete merged reports. Made-with: Cursor --- apps/worker/services/processing/state.py | 7 +------ apps/worker/services/tests/test_processing_state.py | 6 +++--- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/apps/worker/services/processing/state.py b/apps/worker/services/processing/state.py index 4a1c7d973b..ca433f7f2a 100644 --- a/apps/worker/services/processing/state.py +++ b/apps/worker/services/processing/state.py @@ -127,12 +127,7 @@ def mark_uploads_as_merged(self, upload_ids: list[int]): self._redis.srem(self._redis_key("processed"), *upload_ids) def get_uploads_for_merging(self) -> set[int]: - return { - int(id) - for id in self._redis.srandmember( - self._redis_key("processed"), MERGE_BATCH_SIZE - ) - } + return {int(id) for id in self._redis.smembers(self._redis_key("processed"))} def _redis_key(self, state: str) -> str: return f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}" diff --git a/apps/worker/services/tests/test_processing_state.py b/apps/worker/services/tests/test_processing_state.py index 0416e49a63..0382a6f6e4 100644 --- a/apps/worker/services/tests/test_processing_state.py +++ b/apps/worker/services/tests/test_processing_state.py @@ -54,10 +54,10 @@ def test_batch_merging_many_uploads(): for id in range(1, 12): state.mark_upload_as_processed(id) - # we have only processed 8 out of 9. we want to do a batched merge + # We now reconstruct all processed uploads in one pass. assert should_perform_merge(state.get_upload_numbers()) merging = state.get_uploads_for_merging() - assert len(merging) == 10 # = MERGE_BATCH_SIZE + assert len(merging) == 11 state.mark_uploads_as_merged(merging) # but no notifications yet @@ -68,7 +68,7 @@ def test_batch_merging_many_uploads(): # with the last upload being processed, we do another merge, and then trigger notifications assert should_perform_merge(state.get_upload_numbers()) merging = state.get_uploads_for_merging() - assert len(merging) == 2 + assert len(merging) == 1 state.mark_uploads_as_merged(merging) assert should_trigger_postprocessing(state.get_upload_numbers()) From 8852b602be5fe1c78467c0af9b5afe5d448c0da9 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 00:35:23 +0900 Subject: [PATCH 06/19] refactor(worker): tighten finisher idempotency to merged/error states Treat only merged and error uploads as already completed and rename the guard variable to all_already_merged for clearer intent. Made-with: Cursor --- apps/worker/tasks/tests/unit/test_upload_finisher_task.py | 4 ++-- apps/worker/tasks/upload_finisher.py | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 7a655a7e33..7cc7e8b924 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -1066,8 +1066,8 @@ def test_idempotency_check_skips_already_processed_uploads( dbsession.add(report) dbsession.flush() - # Create uploads that are already in "processed" state - upload_1 = UploadFactory.create(report=report, state="processed") + # Create uploads that are already in terminal states + upload_1 = UploadFactory.create(report=report, state="merged") upload_2 = UploadFactory.create(report=report, state="merged") upload_3 = UploadFactory.create(report=report, state="error") dbsession.add(upload_1) diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index e5d614a102..e655447b70 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -298,11 +298,10 @@ def run_impl( ) # Only skip if ALL uploads exist in DB and ALL are in final states if len(uploads_in_db) == len(upload_ids): - all_already_processed = all( - upload.state in ("processed", "merged", "error") - for upload in uploads_in_db + all_already_merged = all( + upload.state in ("merged", "error") for upload in uploads_in_db ) - if all_already_processed: + if all_already_merged: log.info( "All uploads already in final state, skipping finisher work", extra={ From b75054e040e79117dd315413d7dbec91063dbd1b Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 00:47:34 +0900 Subject: [PATCH 07/19] fix(worker): keep processed state terminal during finisher rollout Treat processed uploads as already finalized in finisher idempotency checks to avoid duplicate merges during mixed-version deployments. Made-with: Cursor --- .../worker/tasks/tests/unit/test_upload_finisher_task.py | 4 ++-- apps/worker/tasks/upload_finisher.py | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 7cc7e8b924..72f751cb8e 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -1066,8 +1066,8 @@ def test_idempotency_check_skips_already_processed_uploads( dbsession.add(report) dbsession.flush() - # Create uploads that are already in terminal states - upload_1 = UploadFactory.create(report=report, state="merged") + # Include "processed" for rolling deploy compatibility with older workers. + upload_1 = UploadFactory.create(report=report, state="processed") upload_2 = UploadFactory.create(report=report, state="merged") upload_3 = UploadFactory.create(report=report, state="error") dbsession.add(upload_1) diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index e655447b70..a11823dfe7 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -298,10 +298,13 @@ def run_impl( ) # Only skip if ALL uploads exist in DB and ALL are in final states if len(uploads_in_db) == len(upload_ids): - all_already_merged = all( - upload.state in ("merged", "error") for upload in uploads_in_db + # During rolling deploys, older workers can still persist "processed". + # Treat it as terminal here to avoid duplicate merges. + all_already_finalized = all( + upload.state in ("processed", "merged", "error") + for upload in uploads_in_db ) - if all_already_merged: + if all_already_finalized: log.info( "All uploads already in final state, skipping finisher work", extra={ From 432254614c5db49f3268533342acc7f02587e227 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 00:51:19 +0900 Subject: [PATCH 08/19] fix(worker): handle unsuccessful results without error payload Guard update_uploads against malformed unsuccessful processing results by treating missing error payloads as unknown_processing instead of crashing with UnboundLocalError. Made-with: Cursor --- apps/worker/services/processing/merging.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/apps/worker/services/processing/merging.py b/apps/worker/services/processing/merging.py index 0e26d33f48..2f0f74d605 100644 --- a/apps/worker/services/processing/merging.py +++ b/apps/worker/services/processing/merging.py @@ -12,6 +12,7 @@ from services.yaml.reader import read_yaml_field from shared.reports.enums import UploadState from shared.reports.resources import Report, ReportTotals +from shared.upload.constants import UploadErrorCode from shared.utils.sessions import SessionType from shared.yaml import UserYaml @@ -135,7 +136,7 @@ def update_uploads( report = reports.get(upload_id) if report is not None: all_totals.append(make_totals(upload_id, report.totals)) - elif result["error"]: + elif result.get("error"): update = { "state_id": UploadState.ERROR.db_id, "state": "error", @@ -146,6 +147,18 @@ def update_uploads( error_params=result["error"]["params"], ) all_errors.append(error) + else: + update = { + "state_id": UploadState.ERROR.db_id, + "state": "error", + } + all_errors.append( + UploadError( + upload_id=upload_id, + error_code=UploadErrorCode.UNKNOWN_PROCESSING, + error_params={}, + ) + ) update["id_"] = upload_id order_number = merge_result.session_mapping.get(upload_id) From b4a0fcd1c01b06eb1573ba3e6428e9c1098c8891 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 01:14:13 +0900 Subject: [PATCH 09/19] test(worker): cover update_uploads fallback for missing error payload Add a regression test for unsuccessful processing results without an error dict to ensure we persist UNKNOWN_PROCESSING instead of crashing. Made-with: Cursor --- .../tests/unit/test_upload_finisher_task.py | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 72f751cb8e..0a096440cc 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -35,6 +35,7 @@ from shared.reports.enums import UploadState from shared.reports.resources import Report from shared.torngit.exceptions import TorngitObjectNotFoundError +from shared.upload.constants import UploadErrorCode from shared.yaml import UserYaml from tasks.upload_finisher import ( FINISHER_BASE_RETRY_COUNTDOWN_SECONDS, @@ -143,6 +144,34 @@ def test_mark_uploads_as_failed(dbsession): assert upload_2.errors[0].report_upload == upload_2 +def test_mark_uploads_as_failed_without_error_payload(dbsession): + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + report = CommitReport(commit_id=commit.id_) + dbsession.add(report) + dbsession.flush() + upload = UploadFactory.create(report=report, state="started", storage_path="url") + dbsession.add(upload) + dbsession.flush() + + results: list[ProcessingResult] = [ + { + "upload_id": upload.id, + "successful": False, + }, + ] + + update_uploads(dbsession, UserYaml({}), results, [], MergeResult({}, set())) + dbsession.expire_all() + + assert upload.state == "error" + assert len(upload.errors) == 1 + assert upload.errors[0].error_code == UploadErrorCode.UNKNOWN_PROCESSING.value + assert upload.errors[0].error_params == {} + assert upload.errors[0].report_upload == upload + + @pytest.mark.parametrize( "flag, joined", [("nightly", False), ("unittests", True), ("ui", True), ("other", True)], From c3ff0f90880db7aff858cc7dd445b3c3a3f27e62 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 04:03:48 +0900 Subject: [PATCH 10/19] refactor(worker): simplify finisher scheduling and gate/state responsibilities Aggregate follow-up task scheduling through an enum-driven helper, extract finisher gate operations into a dedicated module, move remaining upload counting into ProcessingState, drop sweep-attempt caps, rely on DB reconstruction as source of truth, and remove UploadFlow wiring from processor/finisher paths. Made-with: Cursor --- .../services/processing/finisher_gate.py | 33 ++ apps/worker/services/processing/processing.py | 33 +- apps/worker/services/processing/state.py | 166 ++++++--- apps/worker/tasks/upload_finisher.py | 333 ++++++++---------- 4 files changed, 337 insertions(+), 228 deletions(-) create mode 100644 apps/worker/services/processing/finisher_gate.py diff --git a/apps/worker/services/processing/finisher_gate.py b/apps/worker/services/processing/finisher_gate.py new file mode 100644 index 0000000000..0b09591ee3 --- /dev/null +++ b/apps/worker/services/processing/finisher_gate.py @@ -0,0 +1,33 @@ +from shared.helpers.redis import get_redis_connection + +FINISHER_GATE_KEY_PREFIX = "upload_merger_lock" +FINISHER_GATE_TTL_SECONDS = 900 + + +def finisher_gate_key(repo_id: int, commit_sha: str) -> str: + return f"{FINISHER_GATE_KEY_PREFIX}_{repo_id}_{commit_sha}" + + +def try_acquire_finisher_gate(repo_id: int, commit_sha: str) -> bool: + return bool( + get_redis_connection().set( + finisher_gate_key(repo_id, commit_sha), + "1", + nx=True, + ex=FINISHER_GATE_TTL_SECONDS, + ) + ) + + +def refresh_finisher_gate_ttl(repo_id: int, commit_sha: str) -> None: + get_redis_connection().expire( + finisher_gate_key(repo_id, commit_sha), FINISHER_GATE_TTL_SECONDS + ) + + +def delete_finisher_gate(repo_id: int, commit_sha: str) -> None: + get_redis_connection().delete(finisher_gate_key(repo_id, commit_sha)) + + +def finisher_gate_exists(repo_id: int, commit_sha: str) -> bool: + return bool(get_redis_connection().exists(finisher_gate_key(repo_id, commit_sha))) diff --git a/apps/worker/services/processing/processing.py b/apps/worker/services/processing/processing.py index 4d79c1b48d..857d83f52d 100644 --- a/apps/worker/services/processing/processing.py +++ b/apps/worker/services/processing/processing.py @@ -5,16 +5,22 @@ from celery.exceptions import CeleryError from sqlalchemy.orm import Session as DbSession +from app import celery_app from database.models.core import Commit from database.models.reports import Upload from helpers.reports import delete_archive_setting +from services.processing.finisher_gate import ( + finisher_gate_key, + try_acquire_finisher_gate, +) from services.report import ProcessingError, RawReportInfo, ReportService from services.report.parser.types import VersionOneParsedRawReport from shared.api_archive.archive import ArchiveService +from shared.celery_config import upload_finisher_task_name from shared.yaml import UserYaml from .intermediate import save_intermediate_report -from .state import ProcessingState +from .state import ProcessingState, should_perform_merge from .types import ProcessingResult, UploadArguments log = logging.getLogger(__name__) @@ -41,8 +47,8 @@ def process_upload( upload = db_session.query(Upload).filter_by(id_=upload_id).first() assert upload - state = ProcessingState(repo_id, commit_sha) - # this in a noop in normal cases, but relevant for task retries: + state = ProcessingState(repo_id, commit_sha, db_session=db_session) + # this is a noop in normal cases, but relevant for task retries: state.mark_uploads_as_processing([upload_id]) report_service = ReportService(commit_yaml) @@ -69,6 +75,27 @@ def process_upload( save_intermediate_report(upload_id, processing_result.report) state.mark_upload_as_processed(upload_id) + upload_numbers = state.get_upload_numbers() + if should_perform_merge(upload_numbers): + gate_key = finisher_gate_key(repo_id, commit_sha) + if try_acquire_finisher_gate(repo_id, commit_sha): + log.info( + "Enqueuing upload finisher via gate", + extra={ + "repo_id": repo_id, + "commit_sha": commit_sha, + "upload_id": upload_id, + "gate_key": gate_key, + }, + ) + finisher_kwargs = { + "repoid": repo_id, + "commitid": commit_sha, + "commit_yaml": commit_yaml.to_dict(), + } + celery_app.tasks[upload_finisher_task_name].apply_async( + kwargs=finisher_kwargs + ) rewrite_or_delete_upload(archive_service, commit_yaml, report_info) except CeleryError: diff --git a/apps/worker/services/processing/state.py b/apps/worker/services/processing/state.py index ca433f7f2a..2fe338c0d2 100644 --- a/apps/worker/services/processing/state.py +++ b/apps/worker/services/processing/state.py @@ -21,17 +21,21 @@ "intermediate report". """ +import logging from dataclasses import dataclass -from shared.helpers.redis import get_redis_connection +from sqlalchemy import case, func +from sqlalchemy.orm import Session + +from database.enums import ReportType +from database.models.core import Commit +from database.models.reports import CommitReport, Upload from shared.metrics import Counter +from shared.reports.enums import UploadState -MERGE_BATCH_SIZE = 10 +log = logging.getLogger(__name__) -# TTL for processing state keys in Redis (24 hours, matches intermediate report TTL) -# This prevents state keys from accumulating indefinitely and ensures consistency -# with intermediate report expiration -PROCESSING_STATE_TTL = 24 * 60 * 60 +MERGE_BATCH_SIZE = 10 CLEARED_UPLOADS = Counter( "worker_processing_cleared_uploads", @@ -75,59 +79,133 @@ def should_trigger_postprocessing(uploads: UploadNumbers) -> bool: class ProcessingState: - def __init__(self, repoid: int, commitsha: str) -> None: - self._redis = get_redis_connection() + def __init__(self, repoid: int, commitsha: str, db_session: Session) -> None: self.repoid = repoid self.commitsha = commitsha + self._db_session = db_session def get_upload_numbers(self): - processing = self._redis.scard(self._redis_key("processing")) - processed = self._redis.scard(self._redis_key("processed")) - return UploadNumbers(processing, processed) + row = ( + self._db_session.query( + func.count( + case( + ( + Upload.state_id == UploadState.UPLOADED.db_id, + Upload.id_, + ), + ) + ), + func.count( + case( + ( + Upload.state_id == UploadState.PROCESSED.db_id, + Upload.id_, + ), + ) + ), + ) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .join(Commit, CommitReport.commit_id == Commit.id_) + .filter( + Commit.repoid == self.repoid, + Commit.commitid == self.commitsha, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + ) + .one() + ) + return UploadNumbers(processing=row[0], processed=row[1]) def mark_uploads_as_processing(self, upload_ids: list[int]): - if not upload_ids: - return - key = self._redis_key("processing") - self._redis.sadd(key, *upload_ids) - # Set TTL to match intermediate report expiration (24 hours) - # This ensures state keys don't accumulate indefinitely - self._redis.expire(key, PROCESSING_STATE_TTL) + # No-op: uploads are created with state_id=UPLOADED, which + # get_upload_numbers() already counts as "processing". + pass def clear_in_progress_uploads(self, upload_ids: list[int]): if not upload_ids: return - removed_uploads = self._redis.srem(self._redis_key("processing"), *upload_ids) - if removed_uploads > 0: - # the normal flow would move the uploads from the "processing" set - # to the "processed" set via `mark_upload_as_processed`. - # this function here is only called in the error case and we don't expect - # this to be triggered often, if at all. - CLEARED_UPLOADS.inc(removed_uploads) + # Mark still-UPLOADED uploads as ERROR so they stop being counted + # as "processing" in get_upload_numbers(). Only matches UPLOADED -- + # already-PROCESSED uploads (success path) are unaffected. + # + # This runs in a finally block, so the transaction may already be + # in a failed state. Best-effort: log and move on if the DB is + # unreachable — the upload stays UPLOADED, which is safe. + try: + updated = ( + self._db_session.query(Upload) + .filter( + Upload.id_.in_(upload_ids), + Upload.state_id == UploadState.UPLOADED.db_id, + ) + .update( + { + Upload.state_id: UploadState.ERROR.db_id, + Upload.state: "error", + }, + synchronize_session="fetch", + ) + ) + if updated > 0: + CLEARED_UPLOADS.inc(updated) + except Exception: + log.warning( + "Failed to clear in-progress uploads (transaction may be aborted)", + extra={"upload_ids": upload_ids}, + exc_info=True, + ) def mark_upload_as_processed(self, upload_id: int): - processing_key = self._redis_key("processing") - processed_key = self._redis_key("processed") - - res = self._redis.smove(processing_key, processed_key, upload_id) - if not res: - # this can happen when `upload_id` was never in the source set, - # which probably is the case during initial deployment as - # the code adding this to the initial set was not deployed yet - # TODO: make sure to remove this code after a grace period - self._redis.sadd(processed_key, upload_id) - - # Set TTL on processed key to match intermediate report expiration - # This ensures uploads marked as processed have a bounded lifetime - self._redis.expire(processed_key, PROCESSING_STATE_TTL) + upload = self._db_session.query(Upload).get(upload_id) + if upload: + upload.state_id = UploadState.PROCESSED.db_id + # Don't set upload.state here -- the finisher's idempotency check + # uses state="processed" to detect already-merged uploads. + # The state string is set by update_uploads() after merging. def mark_uploads_as_merged(self, upload_ids: list[int]): if not upload_ids: return - self._redis.srem(self._redis_key("processed"), *upload_ids) + self._db_session.query(Upload).filter( + Upload.id_.in_(upload_ids), + Upload.state_id == UploadState.PROCESSED.db_id, + ).update( + { + Upload.state_id: UploadState.MERGED.db_id, + Upload.state: "merged", + }, + synchronize_session="fetch", + ) + self._db_session.commit() def get_uploads_for_merging(self) -> set[int]: - return {int(id) for id in self._redis.smembers(self._redis_key("processed"))} - - def _redis_key(self, state: str) -> str: - return f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}" + rows = ( + self._db_session.query(Upload.id_) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .join(Commit, CommitReport.commit_id == Commit.id_) + .filter( + Commit.repoid == self.repoid, + Commit.commitid == self.commitsha, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + Upload.state_id == UploadState.PROCESSED.db_id, + ) + .limit(MERGE_BATCH_SIZE) + .all() + ) + return {row[0] for row in rows} + + def count_remaining_coverage_uploads(self) -> int: + return ( + self._db_session.query(Upload) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .join(Commit, CommitReport.commit_id == Commit.id_) + .filter( + Commit.repoid == self.repoid, + Commit.commitid == self.commitsha, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + Upload.state_id == UploadState.UPLOADED.db_id, + ) + .count() + ) diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index a11823dfe7..3f9864a750 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -1,5 +1,6 @@ import logging import re +import time from datetime import UTC, datetime, timedelta from enum import Enum @@ -9,16 +10,19 @@ from app import celery_app from celery_config import notify_error_task_name -from database.enums import CommitErrorTypes, ReportType +from database.enums import CommitErrorTypes from database.models import Commit, Pull from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME from database.models.reports import Upload -from helpers.checkpoint_logger.flows import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task from helpers.save_commit_error import save_commit_error from services.comparison import get_or_create_comparison from services.lock_manager import LockManager, LockRetry, LockType +from services.processing.finisher_gate import ( + delete_finisher_gate, + refresh_finisher_gate_ttl, +) from services.processing.intermediate import ( cleanup_intermediate_reports, intermediate_report_key, @@ -43,8 +47,6 @@ from shared.django_apps.upload_breadcrumbs.models import Errors, Milestones from shared.helpers.cache import cache from shared.helpers.redis import get_redis_connection -from shared.metrics import Counter, inc_counter -from shared.reports.enums import UploadState from shared.reports.resources import Report from shared.timeseries.helpers import is_timeseries_enabled from shared.torngit.exceptions import TorngitError @@ -55,11 +57,9 @@ FINISHER_BLOCKING_TIMEOUT_SECONDS = 30 FINISHER_BASE_RETRY_COUNTDOWN_SECONDS = 10 - -UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER = Counter( - "upload_finisher_already_completed", - "Number of times finisher skipped work because uploads were already in final state", -) +FINISHER_SWEEP_COUNTDOWN_SECONDS = 30 +FINISHER_WATCHDOG_DELAY_OFFSET_SECONDS = 30 +FINISHER_MERGE_TIME_BUDGET_SECONDS = 200 regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") @@ -70,6 +70,12 @@ class ShouldCallNotifyResult(Enum): NOTIFY = "notify" +class FollowupTaskType(Enum): + SWEEP = "sweep" + WATCHDOG = "watchdog" + CONTINUATION = "continuation" + + class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name): """This is the third task of the series of tasks designed to process an `upload` made by the user @@ -86,53 +92,55 @@ class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name): max_retries = UPLOAD_PROCESSOR_MAX_RETRIES - def _find_started_uploads_with_reports( - self, db_session, commit: Commit - ) -> set[int]: - """Find uploads in "started" state that have intermediate reports in Redis. - - This is the fallback when Redis ProcessingState has expired (TTL: PROCESSING_STATE_TTL). - We check the database for uploads that were processed but never finalized, - and verify they have intermediate reports before including them. - """ - # Query for uploads in "started" state for this commit - started_uploads = ( - db_session.query(Upload) - .join(Upload.report) - .filter( - Upload.report.has(commit=commit), - Upload.state == "started", - Upload.state_id == UploadState.UPLOADED.db_id, + def _schedule_followup( + self, + repoid: int, + commitid: str, + commit_yaml: UserYaml, + followup_type: FollowupTaskType, + countdown: int | None = None, + ): + refresh_finisher_gate_ttl(repoid, commitid) + kwargs = { + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "trigger": followup_type.value, + } + if countdown is None: + self.app.tasks[upload_finisher_task_name].apply_async(kwargs=kwargs) + else: + self.app.tasks[upload_finisher_task_name].apply_async( + kwargs=kwargs, countdown=countdown ) - .all() - ) - if not started_uploads: - return set() - - log.info( - "Found uploads in started state, checking for intermediate reports", - extra={ - "upload_ids": [u.id_ for u in started_uploads], - "count": len(started_uploads), - }, + def _schedule_sweep(self, repoid: int, commitid: str, commit_yaml: UserYaml): + self._schedule_followup( + repoid, + commitid, + commit_yaml, + FollowupTaskType.SWEEP, + countdown=FINISHER_SWEEP_COUNTDOWN_SECONDS, ) - # Check which uploads have intermediate reports (confirms they were processed) - redis_connection = get_redis_connection() - upload_ids_with_reports = set() - - for upload in started_uploads: - report_key = intermediate_report_key(upload.id_) - if redis_connection.exists(report_key): - upload_ids_with_reports.add(upload.id_) - else: - log.warning( - "Upload in started state but no intermediate report found (may have expired)", - extra={"upload_id": upload.id_}, - ) + def _schedule_watchdog( + self, repoid: int, commitid: str, commit_yaml: UserYaml, countdown: int + ): + self._schedule_followup( + repoid, + commitid, + commit_yaml, + FollowupTaskType.WATCHDOG, + countdown=countdown, + ) - return upload_ids_with_reports + def _schedule_continuation(self, repoid: int, commitid: str, commit_yaml: UserYaml): + self._schedule_followup( + repoid, + commitid, + commit_yaml, + FollowupTaskType.CONTINUATION, + ) def _reconstruct_processing_results( self, db_session, state: ProcessingState, commit: Commit @@ -140,39 +148,15 @@ def _reconstruct_processing_results( """Reconstruct processing_results from ProcessingState when finisher is triggered outside of a chord (e.g., from orphaned upload recovery). - This ensures ALL uploads that were marked as processed in Redis are included + This ensures all uploads marked as processed in state tracking are included in the final merged report, even if they completed via retry/recovery. - - If Redis state has expired (TTL: PROCESSING_STATE_TTL), falls back to database - to find uploads in "started" state that have intermediate reports, preventing data loss. """ - # Get all upload IDs that are ready to be merged (in "processed" set) + # Get all upload IDs that are ready to be merged. upload_ids = state.get_uploads_for_merging() if not upload_ids: - log.warning( - "No uploads found in Redis processed set, checking database for started uploads", - extra={"repoid": commit.repoid, "commitid": commit.commitid}, - ) - # Fallback: Redis state expired (TTL: PROCESSING_STATE_TTL), check DB for uploads - # in "started" state that might have been processed but never finalized - upload_ids = self._find_started_uploads_with_reports(db_session, commit) - - if not upload_ids: - log.warning( - "No started uploads with intermediate reports found in database", - extra={"repoid": commit.repoid, "commitid": commit.commitid}, - ) - return [] - - log.info( - "Found started uploads with intermediate reports (Redis state expired)", - extra={ - "upload_ids": list(upload_ids), - "count": len(upload_ids), - }, - ) + return [] log.info( "Reconstructing processing results from ProcessingState", @@ -235,11 +219,6 @@ def run_impl( commit_yaml, **kwargs, ): - try: - UploadFlow.log(UploadFlow.BATCH_PROCESSING_COMPLETE) - except ValueError as e: - log.warning("CheckpointLogger failed to log/submit", extra={"error": e}) - milestone = Milestones.UPLOAD_COMPLETE log.info( @@ -263,22 +242,10 @@ def run_impl( log.info("run_impl: Got commit") - state = ProcessingState(repoid, commitid) - - # Always reconstruct from state so the finisher covers all uploads for the commit, - # not only uploads present in callback payload. - reconstructed_results = self._reconstruct_processing_results( + state = ProcessingState(repoid, commitid, db_session=db_session) + processing_results = self._reconstruct_processing_results( db_session, state, commit ) - processing_results_by_id = { - result["upload_id"]: result for result in reconstructed_results - } - # Prefer callback payload when both contain the same upload_id because - # callback data is produced directly by the processor and can be newer. - processing_results_by_id.update( - {result["upload_id"]: result for result in (processing_results or [])} - ) - processing_results = list(processing_results_by_id.values()) log.info( "run_impl: Reconstructed processing results", extra={ @@ -289,39 +256,10 @@ def run_impl( upload_ids = [upload["upload_id"] for upload in processing_results] - # Idempotency check: Skip if all uploads are already processed - # This prevents wasted work if multiple finishers are triggered (e.g., from - # visibility timeout re-queuing) or if finisher is manually retried - if upload_ids: - uploads_in_db = ( - db_session.query(Upload).filter(Upload.id_.in_(upload_ids)).all() - ) - # Only skip if ALL uploads exist in DB and ALL are in final states - if len(uploads_in_db) == len(upload_ids): - # During rolling deploys, older workers can still persist "processed". - # Treat it as terminal here to avoid duplicate merges. - all_already_finalized = all( - upload.state in ("processed", "merged", "error") - for upload in uploads_in_db - ) - if all_already_finalized: - log.info( - "All uploads already in final state, skipping finisher work", - extra={ - "upload_ids": upload_ids, - "states": [u.state for u in uploads_in_db], - }, - ) - inc_counter(UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER) - return { - "already_completed": True, - "upload_ids": upload_ids, - } - try: log.info("run_impl: Processing reports with lock") - self._process_reports_with_lock( + merge_result = self._process_reports_with_lock( db_session, commit, commit_yaml, @@ -330,39 +268,54 @@ def run_impl( upload_ids, state, ) - - # Check if there are still unprocessed coverage uploads in the database - # Use DB as source of truth - if any coverage uploads are still in UPLOADED state, - # another finisher will process them and we shouldn't send notifications yet - remaining_uploads = ( - db_session.query(Upload) - .join(Upload.report) - .filter( - Upload.report.has(commit=commit), - Upload.report.has(report_type=ReportType.COVERAGE.value), - Upload.state_id == UploadState.UPLOADED.db_id, + if merge_result: + processing_results = merge_result.get( + "processing_results", processing_results ) - .count() + upload_ids = merge_result.get("upload_ids", upload_ids) + continuation_needed = bool( + merge_result and merge_result.get("continuation_needed") ) + if continuation_needed: + self._schedule_followup( + repoid, + commitid, + commit_yaml, + FollowupTaskType.CONTINUATION, + ) + return { + "continuation_scheduled": True, + "upload_ids": upload_ids, + } + + remaining_uploads = state.count_remaining_coverage_uploads() if remaining_uploads > 0: log.info( - "run_impl: Postprocessing should not be triggered - uploads still pending", + "run_impl: Scheduling sweep because uploads are still pending", extra={"remaining_uploads": remaining_uploads}, ) - UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) - UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) + self._schedule_followup( + repoid, + commitid, + commit_yaml, + FollowupTaskType.SWEEP, + countdown=FINISHER_SWEEP_COUNTDOWN_SECONDS, + ) self._call_upload_breadcrumb_task( commit_sha=commitid, repo_id=repoid, milestone=milestone, upload_ids=upload_ids, ) - return + return { + "sweep_scheduled": True, + "remaining_uploads": remaining_uploads, + } log.info("run_impl: Handling finisher lock") - return self._handle_finisher_lock( + result = self._handle_finisher_lock( db_session, commit, commit_yaml, @@ -370,12 +323,17 @@ def run_impl( milestone, upload_ids, ) + # `_handle_finisher_lock` only returns `None` on terminal lock exhaustion. + # Clear gate in either terminal outcome to avoid stalling this commit. + delete_finisher_gate(repoid, commitid) + return result except Retry: raise except SoftTimeLimitExceeded: log.warning("run_impl: soft time limit exceeded") + delete_finisher_gate(repoid, commitid) self._call_upload_breadcrumb_task( commit_sha=commitid, repo_id=repoid, @@ -390,6 +348,7 @@ def run_impl( except Exception as e: log.exception("run_impl: unexpected error in upload finisher") + delete_finisher_gate(repoid, commitid) sentry_sdk.capture_exception(e) log.exception( "Unexpected error in upload finisher", @@ -440,34 +399,59 @@ def _process_reports_with_lock( ): db_session.refresh(commit) report_service = ReportService(commit_yaml) + merge_start_time = time.monotonic() + merged_processing_results: list[ProcessingResult] = [] + merged_upload_ids: list[int] = [] + pending_processing_results = processing_results + + while pending_processing_results: + pending_upload_ids = [ + upload["upload_id"] for upload in pending_processing_results + ] + if not pending_upload_ids: + break - log.info("run_impl: Performing report merging") - - report = perform_report_merging( - report_service, commit_yaml, commit, processing_results - ) + log.info("run_impl: Performing report merging") + report = perform_report_merging( + report_service, commit_yaml, commit, pending_processing_results + ) + log.info( + "run_impl: Saving combined report", + extra={"processing_results": pending_processing_results}, + ) + if diff: + log.info("run_impl: Applying diff to report") + report.apply_diff(diff) - log.info( - "run_impl: Saving combined report", - extra={"processing_results": processing_results}, - ) + log.info("run_impl: Saving report") + report_service.save_report(commit, report) + db_session.commit() - if diff: - log.info("run_impl: Applying diff to report") - report.apply_diff(diff) + log.info("run_impl: Marking uploads as merged") + state.mark_uploads_as_merged(pending_upload_ids) - log.info("run_impl: Saving report") - report_service.save_report(commit, report) + log.info("run_impl: Cleaning up intermediate reports") + cleanup_intermediate_reports(pending_upload_ids) - db_session.commit() + merged_processing_results.extend(pending_processing_results) + merged_upload_ids.extend(pending_upload_ids) - log.info("run_impl: Marking uploads as merged") - state.mark_uploads_as_merged(upload_ids) + if ( + time.monotonic() - merge_start_time + >= FINISHER_MERGE_TIME_BUDGET_SECONDS + ): + break - log.info("run_impl: Cleaning up intermediate reports") - cleanup_intermediate_reports(upload_ids) + pending_processing_results = self._reconstruct_processing_results( + db_session, state, commit + ) - log.info("run_impl: Finished upload_finisher task") + remaining_processed_uploads = state.get_upload_numbers().processed + return { + "processing_results": merged_processing_results, + "upload_ids": merged_upload_ids, + "continuation_needed": remaining_processed_uploads > 0, + } except LockRetry as retry: self._call_upload_breadcrumb_task( @@ -506,7 +490,7 @@ def _process_reports_with_lock( upload_ids=upload_ids, error=Errors.INTERNAL_RETRYING, ) - self.retry( + raise self.retry( max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, countdown=retry.countdown ) @@ -597,7 +581,6 @@ def _handle_finisher_lock( upload_ids=upload_ids, error=Errors.INTERNAL_LOCK_ERROR, ) - UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) if retry.max_retries_exceeded or self._has_exceeded_max_attempts( UPLOAD_PROCESSOR_MAX_RETRIES ): @@ -660,7 +643,6 @@ def finish_reports_processing( "commitid": commitid, "current_yaml": commit_yaml.to_dict(), } - notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) task = self.app.tasks[notify_task_name].apply_async( kwargs=notify_kwargs ) @@ -732,7 +714,6 @@ def finish_reports_processing( "commitid": commitid, "current_yaml": commit_yaml.to_dict(), } - notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs) task = self.app.tasks[notify_error_task_name].apply_async( kwargs=notify_error_kwargs ) @@ -746,10 +727,6 @@ def finish_reports_processing( ) commit.state = "skipped" - UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) - if not notifications_called: - UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) - return {"notifications_called": notifications_called} def should_call_notifications( @@ -770,16 +747,10 @@ def should_call_notifications( # Check if there are still pending uploads in the database # Use DB as source of truth for upload completion status if db_session: - remaining_uploads = ( - db_session.query(Upload) - .join(Upload.report) - .filter( - Upload.report.has(commit=commit), - Upload.report.has(report_type=ReportType.COVERAGE.value), - Upload.state_id == UploadState.UPLOADED.db_id, - ) - .count() + state = ProcessingState( + commit.repoid, commit.commitid, db_session=db_session ) + remaining_uploads = state.count_remaining_coverage_uploads() if remaining_uploads > 0: log.info( "Not scheduling notify because there are still pending uploads", From 6306ed50509784ffd5d9f9763494d550674bd33f Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 04:39:24 +0900 Subject: [PATCH 11/19] refactor(worker): simplify followup scheduling and bound merge batches Use followup-type countdown mapping in a single scheduler and process uploads in batches of 10 with early continuation when time budget is near exhaustion, then re-read merge-ready uploads from ProcessingState. Made-with: Cursor --- apps/worker/tasks/upload_finisher.py | 93 ++++++++++++++-------------- 1 file changed, 47 insertions(+), 46 deletions(-) diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 3f9864a750..5225bbcefb 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -60,6 +60,8 @@ FINISHER_SWEEP_COUNTDOWN_SECONDS = 30 FINISHER_WATCHDOG_DELAY_OFFSET_SECONDS = 30 FINISHER_MERGE_TIME_BUDGET_SECONDS = 200 +FINISHER_CONTINUATION_BUFFER_SECONDS = 10 +FINISHER_UPLOAD_MERGE_BATCH_SIZE = 10 regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") @@ -98,50 +100,30 @@ def _schedule_followup( commitid: str, commit_yaml: UserYaml, followup_type: FollowupTaskType, - countdown: int | None = None, ): refresh_finisher_gate_ttl(repoid, commitid) + default_countdowns = { + FollowupTaskType.SWEEP: FINISHER_SWEEP_COUNTDOWN_SECONDS, + FollowupTaskType.CONTINUATION: 0, + FollowupTaskType.WATCHDOG: self.get_lock_timeout( + DEFAULT_LOCK_TIMEOUT_SECONDS + ) + + FINISHER_WATCHDOG_DELAY_OFFSET_SECONDS, + } + countdown = default_countdowns[followup_type] kwargs = { "repoid": repoid, "commitid": commitid, "commit_yaml": commit_yaml.to_dict(), "trigger": followup_type.value, } - if countdown is None: + if countdown <= 0: self.app.tasks[upload_finisher_task_name].apply_async(kwargs=kwargs) else: self.app.tasks[upload_finisher_task_name].apply_async( kwargs=kwargs, countdown=countdown ) - def _schedule_sweep(self, repoid: int, commitid: str, commit_yaml: UserYaml): - self._schedule_followup( - repoid, - commitid, - commit_yaml, - FollowupTaskType.SWEEP, - countdown=FINISHER_SWEEP_COUNTDOWN_SECONDS, - ) - - def _schedule_watchdog( - self, repoid: int, commitid: str, commit_yaml: UserYaml, countdown: int - ): - self._schedule_followup( - repoid, - commitid, - commit_yaml, - FollowupTaskType.WATCHDOG, - countdown=countdown, - ) - - def _schedule_continuation(self, repoid: int, commitid: str, commit_yaml: UserYaml): - self._schedule_followup( - repoid, - commitid, - commit_yaml, - FollowupTaskType.CONTINUATION, - ) - def _reconstruct_processing_results( self, db_session, state: ProcessingState, commit: Commit ) -> list[ProcessingResult]: @@ -300,7 +282,6 @@ def run_impl( commitid, commit_yaml, FollowupTaskType.SWEEP, - countdown=FINISHER_SWEEP_COUNTDOWN_SECONDS, ) self._call_upload_breadcrumb_task( commit_sha=commitid, @@ -400,24 +381,54 @@ def _process_reports_with_lock( db_session.refresh(commit) report_service = ReportService(commit_yaml) merge_start_time = time.monotonic() + merge_deadline = merge_start_time + FINISHER_MERGE_TIME_BUDGET_SECONDS merged_processing_results: list[ProcessingResult] = [] merged_upload_ids: list[int] = [] pending_processing_results = processing_results - while pending_processing_results: + while True: + if ( + merge_deadline - time.monotonic() + <= FINISHER_CONTINUATION_BUFFER_SECONDS + ): + remaining_processed_uploads = ( + state.get_upload_numbers().processed + ) + return { + "processing_results": merged_processing_results, + "upload_ids": merged_upload_ids, + "continuation_needed": bool(pending_processing_results) + or remaining_processed_uploads > 0, + } + + if not pending_processing_results: + pending_processing_results = ( + self._reconstruct_processing_results( + db_session, state, commit + ) + ) + if not pending_processing_results: + break + + current_batch = pending_processing_results[ + :FINISHER_UPLOAD_MERGE_BATCH_SIZE + ] + pending_processing_results = pending_processing_results[ + FINISHER_UPLOAD_MERGE_BATCH_SIZE: + ] pending_upload_ids = [ - upload["upload_id"] for upload in pending_processing_results + upload["upload_id"] for upload in current_batch ] if not pending_upload_ids: break log.info("run_impl: Performing report merging") report = perform_report_merging( - report_service, commit_yaml, commit, pending_processing_results + report_service, commit_yaml, commit, current_batch ) log.info( "run_impl: Saving combined report", - extra={"processing_results": pending_processing_results}, + extra={"processing_results": current_batch}, ) if diff: log.info("run_impl: Applying diff to report") @@ -433,19 +444,9 @@ def _process_reports_with_lock( log.info("run_impl: Cleaning up intermediate reports") cleanup_intermediate_reports(pending_upload_ids) - merged_processing_results.extend(pending_processing_results) + merged_processing_results.extend(current_batch) merged_upload_ids.extend(pending_upload_ids) - if ( - time.monotonic() - merge_start_time - >= FINISHER_MERGE_TIME_BUDGET_SECONDS - ): - break - - pending_processing_results = self._reconstruct_processing_results( - db_session, state, commit - ) - remaining_processed_uploads = state.get_upload_numbers().processed return { "processing_results": merged_processing_results, From 2a0e36e1641e7e9818de81184b7deddb30496fba Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 05:11:58 +0900 Subject: [PATCH 12/19] fix(worker): address PR #756 review thread issues Persist processed-state visibility before finisher enqueue, remove redundant merged transition in finisher, and update idempotency-era test expectations to match state-driven reconstruction behavior. Made-with: Cursor --- apps/worker/services/processing/processing.py | 1 + .../tests/unit/test_upload_finisher_task.py | 52 ++++++++----- apps/worker/tasks/upload_finisher.py | 73 ++++++++++--------- 3 files changed, 74 insertions(+), 52 deletions(-) diff --git a/apps/worker/services/processing/processing.py b/apps/worker/services/processing/processing.py index 857d83f52d..1e23a69b37 100644 --- a/apps/worker/services/processing/processing.py +++ b/apps/worker/services/processing/processing.py @@ -74,6 +74,7 @@ def process_upload( if processing_result.report: save_intermediate_report(upload_id, processing_result.report) state.mark_upload_as_processed(upload_id) + db_session.commit() upload_numbers = state.get_upload_numbers() if should_perform_merge(upload_numbers): diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 0a096440cc..bc9208fba2 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -1076,17 +1076,10 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app): ) @pytest.mark.django_db - def test_idempotency_check_skips_already_processed_uploads( + def test_finisher_reconstructs_even_if_previous_results_were_terminal( self, dbsession, mocker, mock_self_app ): - """Test that finisher skips work if all uploads are already in final state. - - This test validates the idempotency check that prevents wasted work when: - - Multiple finishers are triggered (e.g., visibility timeout re-queuing) - - Finisher is manually retried - - The check only skips when ALL uploads exist in DB and are in final states. - """ + """Finisher no longer short-circuits from callback payload terminal states.""" commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() @@ -1104,9 +1097,35 @@ def test_idempotency_check_skips_already_processed_uploads( dbsession.add(upload_3) dbsession.flush() - # Mock the _process_reports_with_lock to verify it's NOT called + # Mock the lock path and state reconstruction to verify normal execution path. mock_process = mocker.patch.object( - UploadFinisherTask, "_process_reports_with_lock" + UploadFinisherTask, + "_process_reports_with_lock", + return_value={ + "processing_results": [ + {"upload_id": upload_1.id, "successful": True, "arguments": {}}, + {"upload_id": upload_2.id, "successful": True, "arguments": {}}, + {"upload_id": upload_3.id, "successful": False, "arguments": {}}, + ], + "upload_ids": [upload_1.id, upload_2.id, upload_3.id], + "continuation_needed": False, + }, + ) + mocker.patch.object( + UploadFinisherTask, + "_reconstruct_processing_results", + return_value=[ + {"upload_id": upload_1.id, "successful": True, "arguments": {}}, + {"upload_id": upload_2.id, "successful": True, "arguments": {}}, + {"upload_id": upload_3.id, "successful": False, "arguments": {}}, + ], + ) + mocker.patch( + "tasks.upload_finisher.ProcessingState.count_remaining_coverage_uploads", + return_value=1, + ) + mock_handle_lock = mocker.patch.object( + UploadFinisherTask, "_handle_finisher_lock" ) previous_results = [ @@ -1123,14 +1142,13 @@ def test_idempotency_check_skips_already_processed_uploads( commit_yaml={}, ) - # Verify that the finisher skipped all work + # Verify that run_impl schedules sweep, instead of a payload-only idempotent skip. assert result == { - "already_completed": True, - "upload_ids": [upload_1.id, upload_2.id, upload_3.id], + "sweep_scheduled": True, + "remaining_uploads": 1, } - - # Verify that _process_reports_with_lock was NOT called - mock_process.assert_not_called() + mock_process.assert_called_once() + mock_handle_lock.assert_not_called() @pytest.mark.django_db def test_idempotency_check_proceeds_when_uploads_not_finished( diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 5225bbcefb..c5e9163856 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -14,12 +14,14 @@ from database.models import Commit, Pull from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME from database.models.reports import Upload +from helpers.checkpoint_logger.flows import UploadFlow from helpers.exceptions import RepositoryWithoutValidBotError from helpers.github_installation import get_installation_name_for_owner_for_task from helpers.save_commit_error import save_commit_error from services.comparison import get_or_create_comparison from services.lock_manager import LockManager, LockRetry, LockType from services.processing.finisher_gate import ( + FINISHER_GATE_TTL_SECONDS, delete_finisher_gate, refresh_finisher_gate_ttl, ) @@ -72,7 +74,7 @@ class ShouldCallNotifyResult(Enum): NOTIFY = "notify" -class FollowupTaskType(Enum): +class UploadFinisherFollowUpTaskType(Enum): SWEEP = "sweep" WATCHDOG = "watchdog" CONTINUATION = "continuation" @@ -99,16 +101,18 @@ def _schedule_followup( repoid: int, commitid: str, commit_yaml: UserYaml, - followup_type: FollowupTaskType, + followup_type: UploadFinisherFollowUpTaskType, ): refresh_finisher_gate_ttl(repoid, commitid) default_countdowns = { - FollowupTaskType.SWEEP: FINISHER_SWEEP_COUNTDOWN_SECONDS, - FollowupTaskType.CONTINUATION: 0, - FollowupTaskType.WATCHDOG: self.get_lock_timeout( - DEFAULT_LOCK_TIMEOUT_SECONDS - ) - + FINISHER_WATCHDOG_DELAY_OFFSET_SECONDS, + UploadFinisherFollowUpTaskType.SWEEP: FINISHER_SWEEP_COUNTDOWN_SECONDS, + UploadFinisherFollowUpTaskType.CONTINUATION: 0, + UploadFinisherFollowUpTaskType.WATCHDOG: min( + FINISHER_GATE_TTL_SECONDS - 60, + self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS) + + FINISHER_WATCHDOG_DELAY_OFFSET_SECONDS + + 150, + ), } countdown = default_countdowns[followup_type] kwargs = { @@ -117,7 +121,7 @@ def _schedule_followup( "commit_yaml": commit_yaml.to_dict(), "trigger": followup_type.value, } - if countdown <= 0: + if countdown == 0: self.app.tasks[upload_finisher_task_name].apply_async(kwargs=kwargs) else: self.app.tasks[upload_finisher_task_name].apply_async( @@ -201,6 +205,11 @@ def run_impl( commit_yaml, **kwargs, ): + try: + UploadFlow.log(UploadFlow.BATCH_PROCESSING_COMPLETE) + except ValueError as e: + log.warning("CheckpointLogger failed to log/submit", extra={"error": e}) + milestone = Milestones.UPLOAD_COMPLETE log.info( @@ -263,7 +272,7 @@ def run_impl( repoid, commitid, commit_yaml, - FollowupTaskType.CONTINUATION, + UploadFinisherFollowUpTaskType.CONTINUATION, ) return { "continuation_scheduled": True, @@ -281,8 +290,10 @@ def run_impl( repoid, commitid, commit_yaml, - FollowupTaskType.SWEEP, + UploadFinisherFollowUpTaskType.SWEEP, ) + UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) + UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) self._call_upload_breadcrumb_task( commit_sha=commitid, repo_id=repoid, @@ -384,7 +395,6 @@ def _process_reports_with_lock( merge_deadline = merge_start_time + FINISHER_MERGE_TIME_BUDGET_SECONDS merged_processing_results: list[ProcessingResult] = [] merged_upload_ids: list[int] = [] - pending_processing_results = processing_results while True: if ( @@ -397,30 +407,19 @@ def _process_reports_with_lock( return { "processing_results": merged_processing_results, "upload_ids": merged_upload_ids, - "continuation_needed": bool(pending_processing_results) - or remaining_processed_uploads > 0, + "continuation_needed": remaining_processed_uploads > 0, } - - if not pending_processing_results: - pending_processing_results = ( - self._reconstruct_processing_results( - db_session, state, commit - ) - ) - if not pending_processing_results: - break - - current_batch = pending_processing_results[ + processing_results = self._reconstruct_processing_results( + db_session, state, commit + ) + if not processing_results: + break + current_batch = processing_results[ :FINISHER_UPLOAD_MERGE_BATCH_SIZE ] - pending_processing_results = pending_processing_results[ - FINISHER_UPLOAD_MERGE_BATCH_SIZE: - ] pending_upload_ids = [ upload["upload_id"] for upload in current_batch ] - if not pending_upload_ids: - break log.info("run_impl: Performing report merging") report = perform_report_merging( @@ -438,9 +437,6 @@ def _process_reports_with_lock( report_service.save_report(commit, report) db_session.commit() - log.info("run_impl: Marking uploads as merged") - state.mark_uploads_as_merged(pending_upload_ids) - log.info("run_impl: Cleaning up intermediate reports") cleanup_intermediate_reports(pending_upload_ids) @@ -462,6 +458,7 @@ def _process_reports_with_lock( upload_ids=upload_ids, error=Errors.INTERNAL_LOCK_ERROR, ) + UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) if retry.max_retries_exceeded or self._has_exceeded_max_attempts( UPLOAD_PROCESSOR_MAX_RETRIES ): @@ -491,7 +488,7 @@ def _process_reports_with_lock( upload_ids=upload_ids, error=Errors.INTERNAL_RETRYING, ) - raise self.retry( + return self.retry( max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, countdown=retry.countdown ) @@ -611,7 +608,7 @@ def _handle_finisher_lock( upload_ids=upload_ids, error=Errors.INTERNAL_RETRYING, ) - self.retry( + return self.retry( max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, countdown=retry.countdown ) @@ -644,6 +641,7 @@ def finish_reports_processing( "commitid": commitid, "current_yaml": commit_yaml.to_dict(), } + notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) task = self.app.tasks[notify_task_name].apply_async( kwargs=notify_kwargs ) @@ -715,6 +713,7 @@ def finish_reports_processing( "commitid": commitid, "current_yaml": commit_yaml.to_dict(), } + notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs) task = self.app.tasks[notify_error_task_name].apply_async( kwargs=notify_error_kwargs ) @@ -728,6 +727,10 @@ def finish_reports_processing( ) commit.state = "skipped" + UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) + if not notifications_called: + UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) + return {"notifications_called": notifications_called} def should_call_notifications( From 7dcdac6955a5f1eb3b4ba03df803a2e3a9a2979e Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 05:20:56 +0900 Subject: [PATCH 13/19] fix(worker): restore processing-state task wiring for worker CI Pass db_session into ProcessingState usage in upload scheduling and align finisher unit setup with state-reconstruction behavior so worker CI paths continue to run under DB-backed processing state. Made-with: Cursor --- apps/worker/tasks/tests/unit/test_upload_finisher_task.py | 5 +++++ apps/worker/tasks/upload.py | 4 +++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index bc9208fba2..65cc245c0d 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -231,6 +231,11 @@ def test_upload_finisher_task_call( previous_results = [ {"upload_id": 0, "arguments": {"url": url}, "successful": True} ] + mocker.patch.object( + UploadFinisherTask, + "_reconstruct_processing_results", + return_value=previous_results, + ) _start_upload_flow(mocker) result = UploadFinisherTask().run_impl( diff --git a/apps/worker/tasks/upload.py b/apps/worker/tasks/upload.py index d91b6186b5..4076b616ba 100644 --- a/apps/worker/tasks/upload.py +++ b/apps/worker/tasks/upload.py @@ -835,7 +835,9 @@ def _schedule_coverage_processing_task( ): self.maybe_log_upload_checkpoint(UploadFlow.INITIAL_PROCESSING_COMPLETE) - state = ProcessingState(commit.repoid, commit.commitid) + state = ProcessingState( + commit.repoid, commit.commitid, db_session=commit.get_db_session() + ) state.mark_uploads_as_processing( [int(upload["upload_id"]) for upload in argument_list] ) From 6d43a1614076e9a1a6fd650e95f5b55b03130d7a Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 05:25:53 +0900 Subject: [PATCH 14/19] fix: make updates --- .../services/processing/finisher_gate.py | 4 - apps/worker/services/processing/merging.py | 18 +- apps/worker/services/processing/state.py | 21 +- apps/worker/tasks/stuck_uploads_cron.py | 91 +++ apps/worker/tasks/upload_merger.py | 689 ++++++++++++++++++ 5 files changed, 791 insertions(+), 32 deletions(-) create mode 100644 apps/worker/tasks/stuck_uploads_cron.py create mode 100644 apps/worker/tasks/upload_merger.py diff --git a/apps/worker/services/processing/finisher_gate.py b/apps/worker/services/processing/finisher_gate.py index 0b09591ee3..4e3e5cc0a0 100644 --- a/apps/worker/services/processing/finisher_gate.py +++ b/apps/worker/services/processing/finisher_gate.py @@ -27,7 +27,3 @@ def refresh_finisher_gate_ttl(repo_id: int, commit_sha: str) -> None: def delete_finisher_gate(repo_id: int, commit_sha: str) -> None: get_redis_connection().delete(finisher_gate_key(repo_id, commit_sha)) - - -def finisher_gate_exists(repo_id: int, commit_sha: str) -> bool: - return bool(get_redis_connection().exists(finisher_gate_key(repo_id, commit_sha))) diff --git a/apps/worker/services/processing/merging.py b/apps/worker/services/processing/merging.py index 2f0f74d605..1b930f5207 100644 --- a/apps/worker/services/processing/merging.py +++ b/apps/worker/services/processing/merging.py @@ -136,29 +136,17 @@ def update_uploads( report = reports.get(upload_id) if report is not None: all_totals.append(make_totals(upload_id, report.totals)) - elif result.get("error"): + else: update = { "state_id": UploadState.ERROR.db_id, "state": "error", } error = UploadError( upload_id=upload_id, - error_code=result["error"]["code"], - error_params=result["error"]["params"], + error_code=result["error"]["code"] if result.get("error") else UploadErrorCode.UNKNOWN_PROCESSING, + error_params=result["error"]["params"] if result.get("error") else {}, ) all_errors.append(error) - else: - update = { - "state_id": UploadState.ERROR.db_id, - "state": "error", - } - all_errors.append( - UploadError( - upload_id=upload_id, - error_code=UploadErrorCode.UNKNOWN_PROCESSING, - error_params={}, - ) - ) update["id_"] = upload_id order_number = merge_result.session_mapping.get(upload_id) diff --git a/apps/worker/services/processing/state.py b/apps/worker/services/processing/state.py index 2fe338c0d2..6dee8defdc 100644 --- a/apps/worker/services/processing/state.py +++ b/apps/worker/services/processing/state.py @@ -45,9 +45,9 @@ @dataclass class UploadNumbers: - processing: int + uploaded: int """ - The number of uploads currently being processed. + The number of uploads that have finished being uploaded. """ processed: int @@ -64,18 +64,18 @@ def should_perform_merge(uploads: UploadNumbers) -> bool: This is the case when no more uploads are expected, or we reached the desired batch size for merging. """ - return uploads.processing == 0 or uploads.processed >= MERGE_BATCH_SIZE + return uploads.processed > 0 -def should_trigger_postprocessing(uploads: UploadNumbers) -> bool: +def should_trigger_postuploaded(uploads: UploadNumbers) -> bool: """ - Determines whether post-processing steps, such as notifications, etc, + Determines whether post-uploaded steps, such as notifications, etc, should be performed. This is the case when no more uploads are expected, and all the processed uploads have been merged into the "master report". """ - return uploads.processing == 0 and uploads.processed == 0 + return uploads.uploaded == 0 and uploads.processed == 0 class ProcessingState: @@ -114,18 +114,13 @@ def get_upload_numbers(self): ) .one() ) - return UploadNumbers(processing=row[0], processed=row[1]) - - def mark_uploads_as_processing(self, upload_ids: list[int]): - # No-op: uploads are created with state_id=UPLOADED, which - # get_upload_numbers() already counts as "processing". - pass + return UploadNumbers(uploaded=row[0], processed=row[1]) def clear_in_progress_uploads(self, upload_ids: list[int]): if not upload_ids: return # Mark still-UPLOADED uploads as ERROR so they stop being counted - # as "processing" in get_upload_numbers(). Only matches UPLOADED -- + # as "uploaded" in get_upload_numbers(). Only matches UPLOADED -- # already-PROCESSED uploads (success path) are unaffected. # # This runs in a finally block, so the transaction may already be diff --git a/apps/worker/tasks/stuck_uploads_cron.py b/apps/worker/tasks/stuck_uploads_cron.py new file mode 100644 index 0000000000..ead1bb6fe8 --- /dev/null +++ b/apps/worker/tasks/stuck_uploads_cron.py @@ -0,0 +1,91 @@ +import logging +from datetime import timedelta + +from sqlalchemy import func + +from app import celery_app +from celery_config import stuck_uploads_check_cron_task_name +from database.enums import ReportType +from database.models.core import Commit +from database.models.reports import CommitReport, Upload +from helpers.clock import get_utc_now +from shared.celery_config import upload_merger_task_name +from shared.helpers.redis import get_redis_connection +from shared.reports.enums import UploadState +from tasks.crontasks import CodecovCronTask +from tasks.upload_merger import MERGER_GATE_TTL, merger_gate_key + +log = logging.getLogger(__name__) + +STUCK_THRESHOLD_MINUTES = 15 + + +class StuckUploadsCheckTask(CodecovCronTask, name=stuck_uploads_check_cron_task_name): + @classmethod + def get_min_seconds_interval_between_executions(cls): + return 3300 # 55 minutes + + def run_cron_task(self, db_session, *args, **kwargs): + cutoff = get_utc_now() - timedelta(minutes=STUCK_THRESHOLD_MINUTES) + + stuck_commits = ( + db_session.query( + CommitReport.commit_id, + Commit.repoid, + Commit.commitid, + func.count(Upload.id_).label("stuck_count"), + ) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .join(Commit, CommitReport.commit_id == Commit.id_) + .filter( + Upload.state_id == UploadState.PROCESSED.db_id, + Upload.updated_at <= cutoff, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + ) + .group_by(CommitReport.commit_id, Commit.repoid, Commit.commitid) + .all() + ) + + if not stuck_commits: + log.info("No stuck uploads found") + return {"stuck_commits": 0, "triggered": 0} + + redis = get_redis_connection() + triggered = 0 + + for row in stuck_commits: + log.error( + "Stuck uploads detected, triggering merger", + extra={ + "repoid": row.repoid, + "commitid": row.commitid, + "stuck_count": row.stuck_count, + "threshold_minutes": STUCK_THRESHOLD_MINUTES, + }, + ) + + gate_key = merger_gate_key(row.repoid, row.commitid) + if redis.set(gate_key, "1", nx=True, ex=MERGER_GATE_TTL): + self.app.tasks[upload_merger_task_name].apply_async( + kwargs={ + "repoid": row.repoid, + "commitid": row.commitid, + "commit_yaml": {}, + "trigger": "cron", + } + ) + triggered += 1 + + log.info( + "Stuck uploads check complete", + extra={ + "stuck_commits": len(stuck_commits), + "triggered": triggered, + }, + ) + return {"stuck_commits": len(stuck_commits), "triggered": triggered} + + +RegisteredStuckUploadsCheckTask = celery_app.register_task(StuckUploadsCheckTask()) +stuck_uploads_check_task = celery_app.tasks[RegisteredStuckUploadsCheckTask.name] diff --git a/apps/worker/tasks/upload_merger.py b/apps/worker/tasks/upload_merger.py new file mode 100644 index 0000000000..aa6c9f85ea --- /dev/null +++ b/apps/worker/tasks/upload_merger.py @@ -0,0 +1,689 @@ +import logging +import re +import time +from collections import namedtuple +from datetime import UTC, datetime, timedelta +from enum import Enum + +import sentry_sdk +from asgiref.sync import async_to_sync +from celery.exceptions import SoftTimeLimitExceeded +from sqlalchemy import func + +from app import celery_app +from celery_config import notify_error_task_name +from database.enums import CommitErrorTypes, ReportType +from database.models import Commit, Pull +from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME +from database.models.reports import CommitReport, Upload +from helpers.checkpoint_logger.flows import UploadFlow +from helpers.exceptions import RepositoryWithoutValidBotError +from helpers.github_installation import get_installation_name_for_owner_for_task +from helpers.save_commit_error import save_commit_error +from services.comparison import get_or_create_comparison +from services.lock_manager import LockManager, LockRetry, LockType +from services.processing.intermediate import ( + cleanup_intermediate_reports, + load_intermediate_reports, +) +from services.processing.merging import merge_reports, update_uploads +from services.processing.types import ProcessingResult +from services.report import ReportService +from services.repository import get_repo_provider_service +from services.timeseries import repository_datasets_query +from services.yaml import read_yaml_field +from shared.celery_config import ( + DEFAULT_LOCK_TIMEOUT_SECONDS, + compute_comparison_task_name, + notify_task_name, + pulls_task_name, + timeseries_save_commit_measurements_task_name, + upload_merger_task_name, +) +from shared.helpers.cache import cache +from shared.helpers.redis import get_redis_connection +from shared.metrics import Counter, Histogram +from shared.reports.enums import UploadState +from shared.reports.resources import Report +from shared.timeseries.helpers import is_timeseries_enabled +from shared.torngit.exceptions import TorngitError +from shared.yaml import UserYaml +from tasks.base import BaseCodecovTask + +log = logging.getLogger(__name__) + +# --- Constants --- + +MERGE_BATCH_SIZE = 10 +MERGE_TIME_BUDGET_SECONDS = 200 +MERGER_GATE_KEY_PREFIX = "upload_merger_lock" +MERGER_GATE_TTL = 900 + +regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") + +# --- Prometheus Metrics --- + +UPLOAD_E2E_DURATION = Histogram( + "upload_pipeline_e2e_duration_seconds", + "Duration from oldest upload created_at to notify triggered.", + ["path", "trigger"], + buckets=[5, 15, 30, 60, 120, 300, 600, 1200, 3600], +) + +UPLOAD_MERGE_DURATION = Histogram( + "upload_merge_duration_seconds", + "Wall clock time of the merge phase.", + ["path", "trigger"], + buckets=[1, 5, 10, 30, 60, 120, 300, 600], +) + +UPLOAD_MERGE_COUNT = Histogram( + "upload_merge_uploads_total", + "Number of uploads merged in one merger run.", + ["path", "trigger"], + buckets=[1, 5, 10, 25, 50, 100, 200, 500], +) + +UPLOAD_MERGE_RESULT = Counter( + "upload_merge_result_total", + "Outcome of merge task.", + ["path", "outcome", "trigger"], +) + +# --- Helper types --- + +RemainingUploads = namedtuple("RemainingUploads", ["uploaded", "processed"]) + + +class ShouldCallNotifyResult(Enum): + DO_NOT_NOTIFY = "do_not_notify" + NOTIFY_ERROR = "notify_error" + NOTIFY = "notify" + + +# --- Key helpers --- + + +def merger_gate_key(repoid: int, commitid: str) -> str: + return f"{MERGER_GATE_KEY_PREFIX}_{repoid}_{commitid}" + + +def delete_merger_gate(redis, repoid: int, commitid: str): + redis.delete(merger_gate_key(repoid, commitid)) + + +# --- Scheduling helpers --- + + +def schedule_continuation(task, repoid, commitid, commit_yaml): + task.app.tasks[upload_merger_task_name].apply_async( + kwargs={ + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "trigger": "continuation", + } + ) + + +def schedule_sweep(task, repoid, commitid, commit_yaml, countdown=30): + task.app.tasks[upload_merger_task_name].apply_async( + kwargs={ + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "trigger": "sweep", + }, + countdown=countdown, + ) + + +def schedule_watchdog(task, repoid, commitid, commit_yaml, countdown): + task.app.tasks[upload_merger_task_name].apply_async( + kwargs={ + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "trigger": "watchdog", + }, + countdown=countdown, + ) + + +# --- DB query helpers --- + + +def get_processed_uploads(db_session, commit: Commit, limit: int) -> list[Upload]: + return ( + db_session.query(Upload) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .filter( + CommitReport.commit_id == commit.id_, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + Upload.state_id == UploadState.PROCESSED.db_id, + ) + .limit(limit) + .all() + ) + + +def count_unfinished_uploads(db_session, commit: Commit) -> RemainingUploads: + row = ( + db_session.query( + func.count(Upload.id_).filter( + Upload.state_id == UploadState.UPLOADED.db_id + ), + func.count(Upload.id_).filter( + Upload.state_id == UploadState.PROCESSED.db_id + ), + ) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .filter( + CommitReport.commit_id == commit.id_, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + ) + .one() + ) + return RemainingUploads(uploaded=row[0], processed=row[1]) + + +def get_oldest_upload_created_at(db_session, commit: Commit) -> datetime | None: + result = ( + db_session.query(func.min(Upload.created_at)) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .filter( + CommitReport.commit_id == commit.id_, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + ) + .scalar() + ) + return result + + +# --- Merge helpers --- + + +def merge_batch( + report_service: ReportService, + master_report: Report, + commit: Commit, + commit_yaml: UserYaml, + batch: list[Upload], + diff: dict | None, +) -> list[ProcessingResult]: + """Load intermediate reports for a batch, merge into master report, + and mark uploads as MERGED in the session (not yet committed).""" + upload_ids = [u.id_ for u in batch] + intermediate_reports = load_intermediate_reports(upload_ids) + + processing_results: list[ProcessingResult] = [] + for upload in batch: + has_report = any( + ir.upload_id == upload.id_ and not ir.report.is_empty() + for ir in intermediate_reports + ) + processing_results.append( + { + "upload_id": upload.id_, + "arguments": { + "commit": commit.commitid, + "upload_id": upload.id_, + "version": "v4", + "reportid": str(upload.report.external_id), + }, + "successful": has_report, + } + ) + + master_report, merge_result = merge_reports( + commit_yaml, master_report, intermediate_reports + ) + + db_session = commit.get_db_session() + update_uploads( + db_session, + commit_yaml, + processing_results, + intermediate_reports, + merge_result, + ) + + return processing_results + + +def save_and_commit( + db_session, + report_service: ReportService, + commit: Commit, + master_report: Report, + merged_upload_ids: list[int], + diff: dict | None, +): + """Persist the master report and clean up intermediate data. + + Calls rollback() before writing to release a potentially stale DB + connection (same pattern as the existing finisher). + """ + if diff: + master_report.apply_diff(diff) + + db_session.rollback() + report_service.save_report(commit, master_report) + db_session.commit() + cleanup_intermediate_reports(merged_upload_ids) + + +# --- Cache invalidation --- + + +def invalidate_caches(redis_connection, commit: Commit): + redis_connection.delete(f"cache/{commit.repoid}/tree/{commit.branch}") + redis_connection.delete(f"cache/{commit.repoid}/tree/{commit.commitid}") + repository = commit.repository + key = ":".join((repository.service, repository.author.username, repository.name)) + if commit.branch: + redis_connection.hdel("badge", (f"{key}:{commit.branch}").lower()) + if commit.branch == repository.branch: + redis_connection.hdel("badge", (f"{key}:").lower()) + + +# --- Post-processing --- + + +def should_call_notifications( + commit: Commit, + commit_yaml: UserYaml, + all_processing_results: list[ProcessingResult], + db_session, +) -> ShouldCallNotifyResult: + remaining_uploads = ( + db_session.query(Upload) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .filter( + CommitReport.commit_id == commit.id_, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + Upload.state_id == UploadState.UPLOADED.db_id, + ) + .count() + ) + if remaining_uploads > 0: + return ShouldCallNotifyResult.DO_NOT_NOTIFY + + manual_trigger = read_yaml_field( + commit_yaml, ("codecov", "notify", "manual_trigger") + ) + if manual_trigger: + return ShouldCallNotifyResult.DO_NOT_NOTIFY + + after_n_builds = ( + read_yaml_field(commit_yaml, ("codecov", "notify", "after_n_builds")) or 0 + ) + if after_n_builds > 0: + report = ReportService(commit_yaml).get_existing_report_for_commit(commit) + number_sessions = len(report.sessions) if report is not None else 0 + if after_n_builds > number_sessions: + return ShouldCallNotifyResult.DO_NOT_NOTIFY + + processing_successes = [x["successful"] for x in all_processing_results] + + if read_yaml_field( + commit_yaml, + ("codecov", "notify", "notify_error"), + _else=False, + ): + if len(processing_successes) == 0 or not all(processing_successes): + return ShouldCallNotifyResult.NOTIFY_ERROR + else: + if not any(processing_successes): + return ShouldCallNotifyResult.DO_NOT_NOTIFY + + return ShouldCallNotifyResult.NOTIFY + + +def post_process( + db_session, + commit: Commit, + commit_yaml: UserYaml, + task, + all_processing_results: list[ProcessingResult], +) -> dict: + """Run notifications, repo update, timeseries, and PR comparison.""" + repoid = commit.repoid + commitid = commit.commitid + repository = commit.repository + + notifications_called = False + if not regexp_ci_skip.search(commit.message or ""): + notify_result = should_call_notifications( + commit, commit_yaml, all_processing_results, db_session + ) + match notify_result: + case ShouldCallNotifyResult.NOTIFY: + notifications_called = True + notify_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + } + notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs) + task.app.tasks[notify_task_name].apply_async(kwargs=notify_kwargs) + + if commit.pullid: + pull = ( + db_session.query(Pull) + .filter_by(repoid=commit.repoid, pullid=commit.pullid) + .first() + ) + if pull: + head = pull.get_head_commit() + if head is None or head.timestamp <= commit.timestamp: + pull.head = commit.commitid + if pull.head == commit.commitid: + db_session.commit() + task.app.tasks[pulls_task_name].apply_async( + kwargs={ + "repoid": repoid, + "pullid": pull.pullid, + "should_send_notifications": False, + } + ) + compared_to = pull.get_comparedto_commit() + if compared_to: + comparison = get_or_create_comparison( + db_session, compared_to, commit + ) + db_session.commit() + task.app.tasks[ + compute_comparison_task_name + ].apply_async(kwargs={"comparison_id": comparison.id}) + case ShouldCallNotifyResult.NOTIFY_ERROR: + notify_error_kwargs = { + "repoid": repoid, + "commitid": commitid, + "current_yaml": commit_yaml.to_dict(), + } + notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs) + task.app.tasks[notify_error_task_name].apply_async( + kwargs=notify_error_kwargs + ) + case ShouldCallNotifyResult.DO_NOT_NOTIFY: + pass + else: + commit.state = "skipped" + + if is_timeseries_enabled(): + dataset_names = [ + dataset.name for dataset in repository_datasets_query(repository) + ] + if dataset_names: + task.app.tasks[timeseries_save_commit_measurements_task_name].apply_async( + kwargs={ + "commitid": commitid, + "repoid": repoid, + "dataset_names": dataset_names, + } + ) + + now = datetime.now(tz=UTC) + threshold = now - timedelta(minutes=60) + if not repository.updatestamp or repository.updatestamp < threshold: + repository.updatestamp = now + db_session.commit() + + UploadFlow.log(UploadFlow.PROCESSING_COMPLETE) + if not notifications_called: + UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION) + + return {"notifications_called": notifications_called} + + +# --- Diff loading (reused from finisher) --- + + +@sentry_sdk.trace +@cache.cache_function(ttl=60 * 60) +def load_commit_diff(commit: Commit, task_name: str | None = None) -> dict | None: + repository = commit.repository + commitid = commit.commitid + try: + installation_name_to_use = ( + get_installation_name_for_owner_for_task(task_name, repository.author) + if task_name + else GITHUB_APP_INSTALLATION_DEFAULT_NAME + ) + repository_service = get_repo_provider_service( + repository, installation_name_to_use=installation_name_to_use + ) + return async_to_sync(repository_service.get_commit_diff)(commitid) + except TorngitError: + log.warning( + "Could not apply diff to report because there was an error fetching diff from provider", + exc_info=True, + ) + except RepositoryWithoutValidBotError: + save_commit_error( + commit, + error_code=CommitErrorTypes.REPO_BOT_INVALID.value, + ) + log.warning( + "Could not apply diff to report because there is no valid bot found for that repo", + exc_info=True, + ) + return None + + +# === Main Task === + + +class UploadMergerTask(BaseCodecovTask, name=upload_merger_task_name): + """Merge processed uploads into the master report. + + Replaces the chord-based upload_finisher with a single-task-per-commit + model. Features: time-boxed batch merging, self-scheduling continuations, + sweep for late-arriving uploads, watchdog-first crash recovery. + """ + + def run_impl( + self, + db_session, + *, + repoid: int, + commitid: str, + commit_yaml, + trigger: str = "processor", + **kwargs, + ): + repoid = int(repoid) + commit_yaml = UserYaml(commit_yaml) + redis = get_redis_connection() + + # --- Gate key check: terminate watchdog chain if work is done --- + if not redis.exists(merger_gate_key(repoid, commitid)): + UPLOAD_MERGE_RESULT.labels( + path="merger", outcome="nothing_to_do", trigger=trigger + ).inc() + return {"nothing_to_do": True} + + commit = ( + db_session.query(Commit) + .filter(Commit.repoid == repoid, Commit.commitid == commitid) + .first() + ) + assert commit, "Commit not found in database." + + lock_manager = LockManager( + repoid=repoid, + commitid=commitid, + lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS), + blocking_timeout=5, + ) + + # --- Watchdog-first: schedule safety net BEFORE crash-prone work --- + watchdog_countdown = self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS) + 30 + schedule_watchdog( + self, repoid, commitid, commit_yaml, countdown=watchdog_countdown + ) + + try: + with lock_manager.locked(LockType.UPLOAD_PROCESSING): + return self._merge_under_lock( + db_session, commit, commit_yaml, trigger, lock_manager + ) + except LockRetry: + UPLOAD_MERGE_RESULT.labels( + path="merger", outcome="lock_retry", trigger=trigger + ).inc() + return {"lock_acquired": False} + except SoftTimeLimitExceeded: + log.warning( + "Merger soft time limit exceeded", + extra={"repoid": repoid, "commitid": commitid, "trigger": trigger}, + ) + UPLOAD_MERGE_RESULT.labels( + path="merger", outcome="error", trigger=trigger + ).inc() + return {"error": "Soft time limit exceeded"} + + def _merge_under_lock( + self, + db_session, + commit: Commit, + commit_yaml: UserYaml, + trigger: str, + lock_manager: LockManager, + ): + repoid = commit.repoid + commitid = commit.commitid + + db_session.refresh(commit) + report_service = ReportService(commit_yaml) + master_report = report_service.get_existing_report_for_commit(commit) + if master_report is None: + master_report = Report() + diff = load_commit_diff(commit, self.name) + + merge_start = time.monotonic() + uploads_merged = 0 + merged_upload_ids: list[int] = [] + all_processing_results: list[ProcessingResult] = [] + + # --- Time-boxed merge loop --- + while time.monotonic() - merge_start < MERGE_TIME_BUDGET_SECONDS: + batch = get_processed_uploads(db_session, commit, MERGE_BATCH_SIZE) + if not batch: + break + batch_results = merge_batch( + report_service, master_report, commit, commit_yaml, batch, diff + ) + all_processing_results.extend(batch_results) + merged_upload_ids.extend(u.id_ for u in batch) + uploads_merged += len(batch) + else: + # Budget exhausted, more work may remain + save_and_commit( + db_session, + report_service, + commit, + master_report, + merged_upload_ids, + diff, + ) + UPLOAD_MERGE_DURATION.labels(path="merger", trigger=trigger).observe( + time.monotonic() - merge_start + ) + UPLOAD_MERGE_COUNT.labels(path="merger", trigger=trigger).observe( + uploads_merged + ) + UPLOAD_MERGE_RESULT.labels( + path="merger", outcome="continuation", trigger=trigger + ).inc() + schedule_continuation(self, repoid, commitid, commit_yaml) + return {"continuation_scheduled": True, "uploads_merged": uploads_merged} + + merge_elapsed = time.monotonic() - merge_start + UPLOAD_MERGE_DURATION.labels(path="merger", trigger=trigger).observe( + merge_elapsed + ) + UPLOAD_MERGE_COUNT.labels(path="merger", trigger=trigger).observe( + uploads_merged + ) + + # --- Early exit: nothing was merged --- + if uploads_merged == 0: + remaining = count_unfinished_uploads(db_session, commit) + if remaining.uploaded > 0: + UPLOAD_MERGE_RESULT.labels( + path="merger", outcome="sweep", trigger=trigger + ).inc() + schedule_sweep(self, repoid, commitid, commit_yaml, countdown=30) + return {"sweep_scheduled": True} + delete_merger_gate(lock_manager.redis_connection, repoid, commitid) + UPLOAD_MERGE_RESULT.labels( + path="merger", outcome="nothing_to_do", trigger=trigger + ).inc() + return {"nothing_to_do": True} + + # --- Save report + commit DB + cleanup intermediates --- + save_and_commit( + db_session, + report_service, + commit, + master_report, + merged_upload_ids, + diff, + ) + + # --- Check remaining work --- + remaining = count_unfinished_uploads(db_session, commit) + + if remaining.uploaded > 0: + UPLOAD_MERGE_RESULT.labels( + path="merger", outcome="sweep", trigger=trigger + ).inc() + schedule_sweep(self, repoid, commitid, commit_yaml, countdown=30) + return {"sweep_scheduled": True, "uploads_merged": uploads_merged} + + if remaining.processed > 0: + UPLOAD_MERGE_RESULT.labels( + path="merger", outcome="continuation", trigger=trigger + ).inc() + schedule_continuation(self, repoid, commitid, commit_yaml) + return {"continuation_scheduled": True, "uploads_merged": uploads_merged} + + # --- All done: post-process --- + report_totals = master_report.totals.asdict() if master_report.totals else {} + log.info( + "Merge complete", + extra={ + "repoid": repoid, + "commitid": commitid, + "report_totals": report_totals, + "uploads_merged": uploads_merged, + }, + ) + + result = post_process( + db_session, commit, commit_yaml, self, all_processing_results + ) + + oldest_upload = get_oldest_upload_created_at(db_session, commit) + if oldest_upload: + e2e = ( + datetime.now(tz=UTC) - oldest_upload.replace(tzinfo=UTC) + ).total_seconds() + UPLOAD_E2E_DURATION.labels(path="merger", trigger=trigger).observe(e2e) + + invalidate_caches(lock_manager.redis_connection, commit) + delete_merger_gate(lock_manager.redis_connection, repoid, commitid) + UPLOAD_MERGE_RESULT.labels( + path="merger", outcome="completed", trigger=trigger + ).inc() + return {**result, "uploads_merged": uploads_merged} + + +RegisteredUploadMergerTask = celery_app.register_task(UploadMergerTask()) +upload_merger_task = celery_app.tasks[RegisteredUploadMergerTask.name] From 93f3146aa75272a5a2b635e2c02093fb04e4fa42 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 05:35:00 +0900 Subject: [PATCH 15/19] style(worker): apply repo pre-commit formatting Apply ruff formatting output required by the CI Run Lint hook for touched worker files. Made-with: Cursor --- apps/worker/services/processing/merging.py | 4 +++- apps/worker/tasks/upload_finisher.py | 12 +++--------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/apps/worker/services/processing/merging.py b/apps/worker/services/processing/merging.py index 1b930f5207..b5428c00c4 100644 --- a/apps/worker/services/processing/merging.py +++ b/apps/worker/services/processing/merging.py @@ -143,7 +143,9 @@ def update_uploads( } error = UploadError( upload_id=upload_id, - error_code=result["error"]["code"] if result.get("error") else UploadErrorCode.UNKNOWN_PROCESSING, + error_code=result["error"]["code"] + if result.get("error") + else UploadErrorCode.UNKNOWN_PROCESSING, error_params=result["error"]["params"] if result.get("error") else {}, ) all_errors.append(error) diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index c5e9163856..9e0e411e51 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -154,7 +154,6 @@ def _reconstruct_processing_results( # Check which uploads have intermediate reports in Redis redis_connection = get_redis_connection() - processing_results = [] for upload in uploads: # Check if intermediate report exists (indicates successful processing) @@ -162,16 +161,15 @@ def _reconstruct_processing_results( has_report = redis_connection.exists(report_key) processing_result: ProcessingResult = { - "upload_id": upload.id_, "arguments": { "commit": commit.commitid, + "reportid": str(upload.report.external_id), "upload_id": upload.id_, "version": "v4", # Assume v4 for recovered uploads - "reportid": str(upload.report.external_id), }, "successful": bool(has_report), + "upload_id": upload.id_, } - if not has_report: log.warning( "Upload in processed set but no intermediate report found", @@ -181,7 +179,6 @@ def _reconstruct_processing_results( "code": "missing_intermediate_report", "params": {}, } - processing_results.append(processing_result) log.info( @@ -211,12 +208,9 @@ def run_impl( log.warning("CheckpointLogger failed to log/submit", extra={"error": e}) milestone = Milestones.UPLOAD_COMPLETE - log.info( "Received upload_finisher task", - extra={ - "processing_results": processing_results, - }, + extra={"processing_results": processing_results}, ) repoid = int(repoid) From 53bc3abb6b45a1146b1519b2ecc560503b3a07dc Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 05:36:10 +0900 Subject: [PATCH 16/19] fix(worker): preserve merged state updates in upload_merger loop Return the updated master report from batch merges and stop rolling back before final save so staged upload state transitions are retained through commit. Made-with: Cursor --- apps/worker/tasks/upload_merger.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/apps/worker/tasks/upload_merger.py b/apps/worker/tasks/upload_merger.py index aa6c9f85ea..da93eee2f5 100644 --- a/apps/worker/tasks/upload_merger.py +++ b/apps/worker/tasks/upload_merger.py @@ -207,13 +207,14 @@ def get_oldest_upload_created_at(db_session, commit: Commit) -> datetime | None: def merge_batch( + db_session, report_service: ReportService, master_report: Report, commit: Commit, commit_yaml: UserYaml, batch: list[Upload], diff: dict | None, -) -> list[ProcessingResult]: +) -> tuple[Report, list[ProcessingResult]]: """Load intermediate reports for a batch, merge into master report, and mark uploads as MERGED in the session (not yet committed).""" upload_ids = [u.id_ for u in batch] @@ -242,7 +243,6 @@ def merge_batch( commit_yaml, master_report, intermediate_reports ) - db_session = commit.get_db_session() update_uploads( db_session, commit_yaml, @@ -251,7 +251,7 @@ def merge_batch( merge_result, ) - return processing_results + return master_report, processing_results def save_and_commit( @@ -262,15 +262,10 @@ def save_and_commit( merged_upload_ids: list[int], diff: dict | None, ): - """Persist the master report and clean up intermediate data. - - Calls rollback() before writing to release a potentially stale DB - connection (same pattern as the existing finisher). - """ + """Persist the master report and commit staged upload updates.""" if diff: master_report.apply_diff(diff) - db_session.rollback() report_service.save_report(commit, master_report) db_session.commit() cleanup_intermediate_reports(merged_upload_ids) @@ -576,8 +571,14 @@ def _merge_under_lock( batch = get_processed_uploads(db_session, commit, MERGE_BATCH_SIZE) if not batch: break - batch_results = merge_batch( - report_service, master_report, commit, commit_yaml, batch, diff + master_report, batch_results = merge_batch( + db_session, + report_service, + master_report, + commit, + commit_yaml, + batch, + diff, ) all_processing_results.extend(batch_results) merged_upload_ids.extend(u.id_ for u in batch) From a0cbf7eae07938c90567635a02514df3da16e876 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 05:38:38 +0900 Subject: [PATCH 17/19] fix: make edits --- apps/worker/tasks/upload_finisher.py | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 9e0e411e51..8ff954b528 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -114,19 +114,15 @@ def _schedule_followup( + 150, ), } - countdown = default_countdowns[followup_type] - kwargs = { - "repoid": repoid, - "commitid": commitid, - "commit_yaml": commit_yaml.to_dict(), - "trigger": followup_type.value, - } - if countdown == 0: - self.app.tasks[upload_finisher_task_name].apply_async(kwargs=kwargs) - else: - self.app.tasks[upload_finisher_task_name].apply_async( - kwargs=kwargs, countdown=countdown - ) + self.app.tasks[upload_finisher_task_name].apply_async( + kwargs={ + "commit_yaml": commit_yaml.to_dict(), + "commitid": commitid, + "repoid": repoid, + "trigger": followup_type.value, + }, + countdown=default_countdowns[followup_type], + ) def _reconstruct_processing_results( self, db_session, state: ProcessingState, commit: Commit @@ -138,12 +134,10 @@ def _reconstruct_processing_results( in the final merged report, even if they completed via retry/recovery. """ - # Get all upload IDs that are ready to be merged. + # Get all upload IDs that are ready to be merged up to MERGE_BATCH_SIZE upload_ids = state.get_uploads_for_merging() - if not upload_ids: return [] - log.info( "Reconstructing processing results from ProcessingState", extra={"upload_ids": list(upload_ids), "count": len(upload_ids)}, From ceb76144396f86a5e7680edfa1d27486ec60b1a2 Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 06:19:03 +0900 Subject: [PATCH 18/19] fix: more updates --- apps/worker/tasks/upload_finisher.py | 36 ++++++++-------------------- 1 file changed, 10 insertions(+), 26 deletions(-) diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 8ff954b528..310077b97f 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -196,11 +196,6 @@ def run_impl( commit_yaml, **kwargs, ): - try: - UploadFlow.log(UploadFlow.BATCH_PROCESSING_COMPLETE) - except ValueError as e: - log.warning("CheckpointLogger failed to log/submit", extra={"error": e}) - milestone = Milestones.UPLOAD_COMPLETE log.info( "Received upload_finisher task", @@ -209,35 +204,29 @@ def run_impl( repoid = int(repoid) commit_yaml = UserYaml(commit_yaml) - - log.info("run_impl: Getting commit") - commit = ( db_session.query(Commit) .filter(Commit.repoid == repoid, Commit.commitid == commitid) .first() ) assert commit, "Commit not found in database." - log.info("run_impl: Got commit") + # TODO - tom - you should just check to see if any uploads are in existence, have the lock handle pulling uploads state = ProcessingState(repoid, commitid, db_session=db_session) processing_results = self._reconstruct_processing_results( db_session, state, commit ) + upload_ids = [upload["upload_id"] for upload in processing_results] log.info( "run_impl: Reconstructed processing results", extra={ - "upload_count": len(processing_results), - "upload_ids": [r["upload_id"] for r in processing_results], + "upload_count": len(upload_ids), + "upload_ids": upload_ids, }, ) - upload_ids = [upload["upload_id"] for upload in processing_results] - try: - log.info("run_impl: Processing reports with lock") - merge_result = self._process_reports_with_lock( db_session, commit, @@ -358,10 +347,9 @@ def _process_reports_with_lock( state: ProcessingState, ): """Process reports with a lock to prevent concurrent modifications.""" - diff = load_commit_diff(commit, self.name) repoid = commit.repoid commitid = commit.commitid - + diff = load_commit_diff(commit, self.name) log.info("run_impl: Loaded commit diff") lock_manager = LockManager( @@ -380,23 +368,19 @@ def _process_reports_with_lock( db_session.refresh(commit) report_service = ReportService(commit_yaml) merge_start_time = time.monotonic() - merge_deadline = merge_start_time + FINISHER_MERGE_TIME_BUDGET_SECONDS merged_processing_results: list[ProcessingResult] = [] merged_upload_ids: list[int] = [] while True: - if ( - merge_deadline - time.monotonic() - <= FINISHER_CONTINUATION_BUFFER_SECONDS - ): - remaining_processed_uploads = ( - state.get_upload_numbers().processed - ) + # Check if we've run out of time to continue merging + if time.monotonic() - merge_start_time <= FINISHER_CONTINUATION_BUFFER_SECONDS: + remaining_processed_uploads = state.get_upload_numbers().processed return { + "continuation_needed": remaining_processed_uploads > 0, "processing_results": merged_processing_results, "upload_ids": merged_upload_ids, - "continuation_needed": remaining_processed_uploads > 0, } + processing_results = self._reconstruct_processing_results( db_session, state, commit ) From c48148b3b02dc0c95bbb562f572ac2d84e41479a Mon Sep 17 00:00:00 2001 From: Tom Hu Date: Thu, 12 Mar 2026 06:30:07 +0900 Subject: [PATCH 19/19] fix(worker): restore missing finisher pipeline changes Reapply no-chord coverage scheduling in UploadTask, restore ProcessingState APIs used by callers/tests, bring back finisher gate existence helper, and keep finisher merge-loop helper improvements with corrected continuation timing. Made-with: Cursor --- .../services/processing/finisher_gate.py | 4 ++ apps/worker/services/processing/state.py | 21 ++++--- apps/worker/tasks/upload.py | 20 +++--- apps/worker/tasks/upload_finisher.py | 63 ++++++++++++++----- 4 files changed, 71 insertions(+), 37 deletions(-) diff --git a/apps/worker/services/processing/finisher_gate.py b/apps/worker/services/processing/finisher_gate.py index 4e3e5cc0a0..0b09591ee3 100644 --- a/apps/worker/services/processing/finisher_gate.py +++ b/apps/worker/services/processing/finisher_gate.py @@ -27,3 +27,7 @@ def refresh_finisher_gate_ttl(repo_id: int, commit_sha: str) -> None: def delete_finisher_gate(repo_id: int, commit_sha: str) -> None: get_redis_connection().delete(finisher_gate_key(repo_id, commit_sha)) + + +def finisher_gate_exists(repo_id: int, commit_sha: str) -> bool: + return bool(get_redis_connection().exists(finisher_gate_key(repo_id, commit_sha))) diff --git a/apps/worker/services/processing/state.py b/apps/worker/services/processing/state.py index 6dee8defdc..2fe338c0d2 100644 --- a/apps/worker/services/processing/state.py +++ b/apps/worker/services/processing/state.py @@ -45,9 +45,9 @@ @dataclass class UploadNumbers: - uploaded: int + processing: int """ - The number of uploads that have finished being uploaded. + The number of uploads currently being processed. """ processed: int @@ -64,18 +64,18 @@ def should_perform_merge(uploads: UploadNumbers) -> bool: This is the case when no more uploads are expected, or we reached the desired batch size for merging. """ - return uploads.processed > 0 + return uploads.processing == 0 or uploads.processed >= MERGE_BATCH_SIZE -def should_trigger_postuploaded(uploads: UploadNumbers) -> bool: +def should_trigger_postprocessing(uploads: UploadNumbers) -> bool: """ - Determines whether post-uploaded steps, such as notifications, etc, + Determines whether post-processing steps, such as notifications, etc, should be performed. This is the case when no more uploads are expected, and all the processed uploads have been merged into the "master report". """ - return uploads.uploaded == 0 and uploads.processed == 0 + return uploads.processing == 0 and uploads.processed == 0 class ProcessingState: @@ -114,13 +114,18 @@ def get_upload_numbers(self): ) .one() ) - return UploadNumbers(uploaded=row[0], processed=row[1]) + return UploadNumbers(processing=row[0], processed=row[1]) + + def mark_uploads_as_processing(self, upload_ids: list[int]): + # No-op: uploads are created with state_id=UPLOADED, which + # get_upload_numbers() already counts as "processing". + pass def clear_in_progress_uploads(self, upload_ids: list[int]): if not upload_ids: return # Mark still-UPLOADED uploads as ERROR so they stop being counted - # as "uploaded" in get_upload_numbers(). Only matches UPLOADED -- + # as "processing" in get_upload_numbers(). Only matches UPLOADED -- # already-PROCESSED uploads (success path) are unaffected. # # This runs in a finally block, so the transaction may already be diff --git a/apps/worker/tasks/upload.py b/apps/worker/tasks/upload.py index 4076b616ba..b0581c28c4 100644 --- a/apps/worker/tasks/upload.py +++ b/apps/worker/tasks/upload.py @@ -4,12 +4,13 @@ import uuid from copy import deepcopy from datetime import UTC, datetime +from types import SimpleNamespace from typing import Any, TypedDict import orjson import sentry_sdk from asgiref.sync import async_to_sync -from celery import chain, chord +from celery import chain from celery.exceptions import SoftTimeLimitExceeded from django.conf import settings from redis import Redis @@ -54,7 +55,6 @@ from tasks.bundle_analysis_processor import bundle_analysis_processor_task from tasks.test_results_finisher import test_results_finisher_task from tasks.test_results_processor import test_results_processor_task -from tasks.upload_finisher import upload_finisher_task from tasks.upload_processor import UPLOAD_PROCESSING_LOCK_NAME, upload_processor_task log = logging.getLogger(__name__) @@ -852,16 +852,12 @@ def _schedule_coverage_processing_task( for arguments in argument_list ] - finisher_kwargs = { - "repoid": commit.repoid, - "commitid": commit.commitid, - "commit_yaml": commit_yaml, - } - finisher_kwargs = UploadFlow.save_to_kwargs(finisher_kwargs) - finish_parallel_sig = upload_finisher_task.signature(kwargs=finisher_kwargs) - - parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig) - return parallel_tasks.apply_async() + scheduled_task_ids: list[str] = [] + for processor_task in parallel_processing_tasks: + result = processor_task.apply_async() + if result and result.id: + scheduled_task_ids.append(result.id) + return SimpleNamespace(as_tuple=lambda: tuple(scheduled_task_ids)) def _schedule_bundle_analysis_processing_task( self, diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 310077b97f..6b5b8ade0d 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -1,6 +1,7 @@ import logging import re import time +from collections import namedtuple from datetime import UTC, datetime, timedelta from enum import Enum @@ -57,6 +58,8 @@ log = logging.getLogger(__name__) +RemainingUploads = namedtuple("RemainingUploads", ["uploaded", "processed"]) + FINISHER_BLOCKING_TIMEOUT_SECONDS = 30 FINISHER_BASE_RETRY_COUNTDOWN_SECONDS = 10 FINISHER_SWEEP_COUNTDOWN_SECONDS = 30 @@ -68,6 +71,30 @@ regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") +def count_unfinished_uploads(db_session, commit: Commit) -> RemainingUploads: + state = ProcessingState(commit.repoid, commit.commitid, db_session=db_session) + upload_numbers = state.get_upload_numbers() + return RemainingUploads( + uploaded=state.count_remaining_coverage_uploads(), + processed=upload_numbers.processed, + ) + + +def save_and_commit( + db_session, + report_service: ReportService, + commit: Commit, + report: Report, + merged_upload_ids: list[int], + diff: dict | None, +): + if diff: + report.apply_diff(diff) + report_service.save_report(commit, report) + db_session.commit() + cleanup_intermediate_reports(merged_upload_ids) + + class ShouldCallNotifyResult(Enum): DO_NOT_NOTIFY = "do_not_notify" NOTIFY_ERROR = "notify_error" @@ -256,7 +283,7 @@ def run_impl( "upload_ids": upload_ids, } - remaining_uploads = state.count_remaining_coverage_uploads() + remaining_uploads = count_unfinished_uploads(db_session, commit).uploaded if remaining_uploads > 0: log.info( @@ -372,11 +399,15 @@ def _process_reports_with_lock( merged_upload_ids: list[int] = [] while True: - # Check if we've run out of time to continue merging - if time.monotonic() - merge_start_time <= FINISHER_CONTINUATION_BUFFER_SECONDS: - remaining_processed_uploads = state.get_upload_numbers().processed + elapsed = time.monotonic() - merge_start_time + if ( + elapsed + >= FINISHER_MERGE_TIME_BUDGET_SECONDS + - FINISHER_CONTINUATION_BUFFER_SECONDS + ): + remaining = count_unfinished_uploads(db_session, commit) return { - "continuation_needed": remaining_processed_uploads > 0, + "continuation_needed": remaining.processed > 0, "processing_results": merged_processing_results, "upload_ids": merged_upload_ids, } @@ -401,25 +432,23 @@ def _process_reports_with_lock( "run_impl: Saving combined report", extra={"processing_results": current_batch}, ) - if diff: - log.info("run_impl: Applying diff to report") - report.apply_diff(diff) - - log.info("run_impl: Saving report") - report_service.save_report(commit, report) - db_session.commit() - - log.info("run_impl: Cleaning up intermediate reports") - cleanup_intermediate_reports(pending_upload_ids) + save_and_commit( + db_session, + report_service, + commit, + report, + pending_upload_ids, + diff, + ) merged_processing_results.extend(current_batch) merged_upload_ids.extend(pending_upload_ids) - remaining_processed_uploads = state.get_upload_numbers().processed + remaining = count_unfinished_uploads(db_session, commit) return { "processing_results": merged_processing_results, "upload_ids": merged_upload_ids, - "continuation_needed": remaining_processed_uploads > 0, + "continuation_needed": remaining.processed > 0, } except LockRetry as retry: