From 0d60ee910805fc073815bb76f29cc5710a75c93a Mon Sep 17 00:00:00 2001
From: Tom Hu
Date: Fri, 27 Feb 2026 08:51:41 +0900
Subject: [PATCH] fix(worker): mark uploads as error on permanent finisher failure

When the upload_finisher failed permanently (unrecoverable exception,
soft time limit, or max lock retries exceeded), uploads remained in
"started" state. The next upload to the same commit would re-discover
these stuck uploads and spawn another finisher that fails again,
creating a retry loop that inflated the queue.

Add _mark_uploads_as_error() helper that best-effort transitions
uploads to "error" state, breaking the cycle. Called from all four
permanent failure paths: SoftTimeLimitExceeded, generic Exception, and
both LockRetry max-retries-exceeded handlers.

The helper rolls back the (possibly broken) DB session first and is
wrapped in try/except so a dead DB connection does not cause a
secondary failure. If the update or commit itself fails, the helper
rolls the session back once more (also best-effort) so the caller is
not handed a session stuck in an aborted transaction.

Made-with: Cursor
---
 .../tests/unit/test_upload_finisher_task.py   | 96 +++++++++++++++++++
 apps/worker/tasks/upload_finisher.py          | 51 ++++++++++
 2 files changed, 147 insertions(+)

diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
index 55bd762ae3..0baa3773aa 100644
--- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
+++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
@@ -1074,6 +1074,102 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):
             }
         )
 
+    @pytest.mark.django_db
+    def test_generic_exception_marks_uploads_as_error(
+        self, dbsession, mocker, mock_self_app
+    ):
+        """Uploads stuck in 'started' state must transition to 'error' when the
+        finisher hits an unrecoverable exception, otherwise new finisher tasks
+        for the same commit keep re-discovering them and failing in a loop."""
+        mocker.patch("tasks.upload_finisher.sentry_sdk.capture_exception")
+        mocker.patch(
+            "tasks.upload_finisher.UploadFinisherTask._process_reports_with_lock",
+            side_effect=ValueError("boom"),
+        )
+
+        commit = CommitFactory.create()
+        dbsession.add(commit)
+        dbsession.flush()
+
+        report = CommitReport(commit_id=commit.id_)
+        dbsession.add(report)
+        dbsession.flush()
+
+        upload = UploadFactory.create(
+            report=report,
+            state="started",
+            state_id=UploadState.UPLOADED.db_id,
+            storage_path="url",
+        )
+        dbsession.add(upload)
+        dbsession.flush()
+
+        previous_results = [
+            {"upload_id": upload.id, "successful": True, "arguments": {}}
+        ]
+
+        result = UploadFinisherTask().run_impl(
+            dbsession,
+            previous_results,
+            repoid=commit.repoid,
+            commitid=commit.commitid,
+            commit_yaml={},
+        )
+
+        assert result["error"] == "boom"
+
+        dbsession.expire_all()
+        dbsession.refresh(upload)
+        assert upload.state == "error"
+        assert upload.state_id == UploadState.ERROR.db_id
+
+    @pytest.mark.django_db
+    def test_soft_time_limit_marks_uploads_as_error(
+        self, dbsession, mocker, mock_self_app
+    ):
+        """Uploads must be marked as error when finisher hits a soft time limit,
+        preventing the same uploads from being re-discovered by the next finisher."""
+        mocker.patch(
+            "tasks.upload_finisher.UploadFinisherTask._process_reports_with_lock",
+            side_effect=SoftTimeLimitExceeded,
+        )
+
+        commit = CommitFactory.create()
+        dbsession.add(commit)
+        dbsession.flush()
+
+        report = CommitReport(commit_id=commit.id_)
+        dbsession.add(report)
+        dbsession.flush()
+
+        upload = UploadFactory.create(
+            report=report,
+            state="started",
+            state_id=UploadState.UPLOADED.db_id,
+            storage_path="url",
+        )
+        dbsession.add(upload)
+        dbsession.flush()
+
+        previous_results = [
+            {"upload_id": upload.id, "successful": True, "arguments": {}}
+        ]
+
+        result = UploadFinisherTask().run_impl(
+            dbsession,
+            previous_results,
+            repoid=commit.repoid,
+            commitid=commit.commitid,
+            commit_yaml={},
+        )
+
+        assert result["error"] == "Soft time limit exceeded"
+
+        dbsession.expire_all()
+        dbsession.refresh(upload)
+        assert upload.state == "error"
+        assert upload.state_id == UploadState.ERROR.db_id
+
     @pytest.mark.django_db
     def test_idempotency_check_skips_already_processed_uploads(
         self, dbsession, mocker, mock_self_app
diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py
index 619e16b7e1..bb7ae45764 100644
--- a/apps/worker/tasks/upload_finisher.py
+++ b/apps/worker/tasks/upload_finisher.py
@@ -87,6 +87,53 @@ class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name):
 
     max_retries = UPLOAD_PROCESSOR_MAX_RETRIES
 
+    def _mark_uploads_as_error(self, db_session, upload_ids: list[int]) -> None:
+        """Best-effort: transition uploads to error state so they are not re-processed.
+
+        When the finisher fails permanently (unrecoverable exception, soft time
+        limit, or max retries exceeded), uploads stay in "started" state. The
+        next upload to the same commit will re-discover them and spawn another
+        finisher that fails again, creating a retry loop. Marking them as error
+        breaks that cycle.
+
+        The whole operation is wrapped in try/except because the DB session may
+        already be in a broken state (e.g. after an OperationalError). If the
+        update or commit itself fails, the session is rolled back once more
+        (also best-effort) so it is not left in an aborted transaction.
+        """
+        if not upload_ids:
+            return
+        try:
+            db_session.rollback()
+            db_session.query(Upload).filter(Upload.id_.in_(upload_ids)).update(
+                {
+                    Upload.state: "error",
+                    Upload.state_id: UploadState.ERROR.db_id,
+                },
+                synchronize_session="fetch",
+            )
+            db_session.commit()
+            log.info(
+                "Marked uploads as error after permanent failure",
+                extra={"upload_ids": upload_ids},
+            )
+        except Exception:
+            log.warning(
+                "Failed to mark uploads as error (DB may be unreachable)",
+                extra={"upload_ids": upload_ids},
+                exc_info=True,
+            )
+            # A failed UPDATE/COMMIT can leave the transaction aborted; roll
+            # back so the caller does not inherit an unusable session.
+            try:
+                db_session.rollback()
+            except Exception:
+                log.warning(
+                    "Failed to roll back session after mark-as-error failure",
+                    extra={"upload_ids": upload_ids},
+                    exc_info=True,
+                )
+
     def _find_started_uploads_with_reports(
         self, db_session, commit: Commit
     ) -> set[int]:
@@ -369,6 +416,7 @@ def run_impl(
 
         except SoftTimeLimitExceeded:
             log.warning("run_impl: soft time limit exceeded")
+            self._mark_uploads_as_error(db_session, upload_ids)
             self._call_upload_breadcrumb_task(
                 commit_sha=commitid,
                 repo_id=repoid,
@@ -388,6 +436,7 @@ def run_impl(
                 "Unexpected error in upload finisher",
                 extra={"upload_ids": upload_ids},
             )
+            self._mark_uploads_as_error(db_session, upload_ids)
             self._call_upload_breadcrumb_task(
                 commit_sha=commitid,
                 repo_id=repoid,
@@ -484,6 +533,7 @@ def _process_reports_with_lock(
                         "repoid": repoid,
                     },
                 )
+                self._mark_uploads_as_error(db_session, upload_ids)
                 self._call_upload_breadcrumb_task(
                     commit_sha=commitid,
                     repo_id=repoid,
@@ -605,6 +655,7 @@ def _handle_finisher_lock(
                         "repoid": repoid,
                     },
                 )
+                self._mark_uploads_as_error(db_session, upload_ids)
                 self._call_upload_breadcrumb_task(
                     commit_sha=commitid,
                     repo_id=repoid,