Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions apps/worker/tasks/tests/unit/test_upload_finisher_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,6 +1074,102 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):
}
)

@pytest.mark.django_db
def test_generic_exception_marks_uploads_as_error(
    self, dbsession, mocker, mock_self_app
):
    """An unrecoverable exception in the finisher must move uploads out of
    the 'started' state and into 'error'; otherwise every new finisher task
    for the same commit re-discovers them and fails again in a loop."""
    mocker.patch("tasks.upload_finisher.sentry_sdk.capture_exception")
    mocker.patch(
        "tasks.upload_finisher.UploadFinisherTask._process_reports_with_lock",
        side_effect=ValueError("boom"),
    )

    # Arrange: one commit with a single upload stuck in "started".
    commit = CommitFactory.create()
    dbsession.add(commit)
    dbsession.flush()
    commit_report = CommitReport(commit_id=commit.id_)
    dbsession.add(commit_report)
    dbsession.flush()
    stuck_upload = UploadFactory.create(
        report=commit_report,
        state="started",
        state_id=UploadState.UPLOADED.db_id,
        storage_path="url",
    )
    dbsession.add(stuck_upload)
    dbsession.flush()

    # Act: the patched processing step raises, so the finisher's generic
    # exception handler runs.
    outcome = UploadFinisherTask().run_impl(
        dbsession,
        [{"upload_id": stuck_upload.id, "successful": True, "arguments": {}}],
        repoid=commit.repoid,
        commitid=commit.commitid,
        commit_yaml={},
    )
    assert outcome["error"] == "boom"

    # Assert: the upload was transitioned to "error" in the database.
    dbsession.expire_all()
    dbsession.refresh(stuck_upload)
    assert stuck_upload.state == "error"
    assert stuck_upload.state_id == UploadState.ERROR.db_id

@pytest.mark.django_db
def test_soft_time_limit_marks_uploads_as_error(
    self, dbsession, mocker, mock_self_app
):
    """Hitting Celery's soft time limit must also mark uploads as 'error',
    so the next finisher for this commit does not re-discover them."""
    mocker.patch(
        "tasks.upload_finisher.UploadFinisherTask._process_reports_with_lock",
        side_effect=SoftTimeLimitExceeded,
    )

    # Arrange: one commit with a single upload stuck in "started".
    commit = CommitFactory.create()
    dbsession.add(commit)
    dbsession.flush()
    commit_report = CommitReport(commit_id=commit.id_)
    dbsession.add(commit_report)
    dbsession.flush()
    stuck_upload = UploadFactory.create(
        report=commit_report,
        state="started",
        state_id=UploadState.UPLOADED.db_id,
        storage_path="url",
    )
    dbsession.add(stuck_upload)
    dbsession.flush()

    # Act: processing times out, so the SoftTimeLimitExceeded handler runs.
    outcome = UploadFinisherTask().run_impl(
        dbsession,
        [{"upload_id": stuck_upload.id, "successful": True, "arguments": {}}],
        repoid=commit.repoid,
        commitid=commit.commitid,
        commit_yaml={},
    )
    assert outcome["error"] == "Soft time limit exceeded"

    # Assert: the upload was transitioned to "error" in the database.
    dbsession.expire_all()
    dbsession.refresh(stuck_upload)
    assert stuck_upload.state == "error"
    assert stuck_upload.state_id == UploadState.ERROR.db_id

@pytest.mark.django_db
def test_idempotency_check_skips_already_processed_uploads(
self, dbsession, mocker, mock_self_app
Expand Down
39 changes: 39 additions & 0 deletions apps/worker/tasks/upload_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,41 @@ class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name):

max_retries = UPLOAD_PROCESSOR_MAX_RETRIES

def _mark_uploads_as_error(self, db_session, upload_ids: list) -> None:
    """Best-effort: transition stuck uploads to error state so they are not
    re-processed.

    When the finisher fails permanently (unrecoverable exception, soft time
    limit, or max retries exceeded), uploads stay in "started" state. The
    next upload to the same commit will re-discover them and spawn another
    finisher that fails again, creating a retry loop. Marking them as error
    breaks that cycle.

    Only uploads still in the "started" state are updated. A failure in a
    later phase (e.g. exhausting lock retries after report merging already
    committed uploads as "processed") must not revert successfully
    processed uploads back to "error".

    The whole operation is wrapped in try/except because the DB session may
    already be in a broken state (e.g. after an OperationalError).
    """
    if not upload_ids:
        return
    try:
        # The session may hold a failed transaction; reset it before
        # issuing the UPDATE.
        db_session.rollback()
        db_session.query(Upload).filter(
            Upload.id_.in_(upload_ids),
            # Guard: touch only genuinely stuck uploads, never ones that
            # already finished processing.
            Upload.state == "started",
        ).update(
            {
                Upload.state: "error",
                Upload.state_id: UploadState.ERROR.db_id,
            },
            synchronize_session="fetch",
        )
        db_session.commit()
        log.info(
            "Marked uploads as error after permanent failure",
            extra={"upload_ids": upload_ids},
        )
    except Exception:
        # Best-effort only: if the DB is unreachable, log and move on —
        # raising here would mask the original failure being handled.
        log.warning(
            "Failed to mark uploads as error (DB may be unreachable)",
            extra={"upload_ids": upload_ids},
            exc_info=True,
        )

def _find_started_uploads_with_reports(
self, db_session, commit: Commit
) -> set[int]:
Expand Down Expand Up @@ -369,6 +404,7 @@ def run_impl(

except SoftTimeLimitExceeded:
log.warning("run_impl: soft time limit exceeded")
self._mark_uploads_as_error(db_session, upload_ids)
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
Expand All @@ -388,6 +424,7 @@ def run_impl(
"Unexpected error in upload finisher",
extra={"upload_ids": upload_ids},
)
self._mark_uploads_as_error(db_session, upload_ids)
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
Expand Down Expand Up @@ -484,6 +521,7 @@ def _process_reports_with_lock(
"repoid": repoid,
},
)
self._mark_uploads_as_error(db_session, upload_ids)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Committed error state alters caller's control flow unintentionally

Medium Severity

When _process_reports_with_lock hits max lock retries, calling _mark_uploads_as_error commits the uploads to "error" state before returning. Back in run_impl, the remaining_uploads query filters on UploadState.UPLOADED — which now finds zero matches — so execution falls through to _handle_finisher_lock, triggering notifications and post-processing even though report merging never happened. Before this change, uploads stayed in UPLOADED state, remaining_uploads > 0 held true, and run_impl returned early.

Additional Locations (1)

Fix in Cursor Fix in Web

self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
Expand Down Expand Up @@ -605,6 +643,7 @@ def _handle_finisher_lock(
"repoid": repoid,
},
)
self._mark_uploads_as_error(db_session, upload_ids)
self._call_upload_breadcrumb_task(
commit_sha=commitid,
repo_id=repoid,
Expand Down
Loading