Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions apps/worker/tasks/tests/unit/test_upload_finisher_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1047,16 +1047,18 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):
)

@pytest.mark.django_db
def test_idempotency_check_skips_already_processed_uploads(
def test_idempotency_check_skips_already_merged_uploads(
self, dbsession, mocker, mock_self_app
):
"""Test that finisher skips work if all uploads are already in final state.
"""Regression: 'merged' must be treated as a final state in the idempotency check.

This test validates the idempotency check that prevents wasted work when:
- Multiple finishers are triggered (e.g., visibility timeout re-queuing)
- Finisher is manually retried
After #766 changed successful uploads from 'processed' to 'merged', the
idempotency check only recognised ('processed', 'error') as final states.
Re-triggered finishers bypassed the check, ran finish_reports_processing
again, and enqueued duplicate save_commit_measurements tasks — flooding
the timeseries queue to 200K+.

The check only skips when ALL uploads exist in DB and are in final states.
The fix drops 'processed' (now a dead state) and uses ('merged', 'error').
"""
commit = CommitFactory.create()
dbsession.add(commit)
Expand All @@ -1066,14 +1068,12 @@ def test_idempotency_check_skips_already_processed_uploads(
dbsession.add(report)
dbsession.flush()

# Create uploads that are already in "processed" state
upload_1 = UploadFactory.create(report=report, state="processed")
upload_1 = UploadFactory.create(report=report, state="merged")
upload_2 = UploadFactory.create(report=report, state="error")
dbsession.add(upload_1)
dbsession.add(upload_2)
dbsession.flush()

# Mock the _process_reports_with_lock to verify it's NOT called
mock_process = mocker.patch.object(
UploadFinisherTask, "_process_reports_with_lock"
)
Expand All @@ -1091,13 +1091,10 @@ def test_idempotency_check_skips_already_processed_uploads(
commit_yaml={},
)

# Verify that the finisher skipped all work
assert result == {
"already_completed": True,
"upload_ids": [upload_1.id, upload_2.id],
}

# Verify that _process_reports_with_lock was NOT called
mock_process.assert_not_called()

@pytest.mark.django_db
Expand Down
6 changes: 3 additions & 3 deletions apps/worker/tasks/upload_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,10 @@ def run_impl(
)
# Only skip if ALL uploads exist in DB and ALL are in final states
if len(uploads_in_db) == len(upload_ids):
all_already_processed = all(
upload.state in ("processed", "error") for upload in uploads_in_db
all_already_merged = all(
upload.state in ("merged", "error") for upload in uploads_in_db
)
if all_already_processed:
if all_already_merged:
log.info(
"All uploads already in final state, skipping finisher work",
extra={
Expand Down
Loading