From e681079a4d17e271896091a0e994d86467d39340 Mon Sep 17 00:00:00 2001 From: Joe Becher Date: Sun, 15 Mar 2026 20:27:31 -0400 Subject: [PATCH] fix(worker): treat 'merged' as the only successful final state in finisher idempotency check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The idempotency check used ('processed', 'error') as terminal upload states. After #766 changed successful uploads to 'merged', 'processed' became a dead state that will never be set again. Re-triggered finishers saw uploads in 'merged' state, concluded they weren't finished, and ran finish_reports_processing again — enqueuing duplicate save_commit_measurements tasks and flooding the timeseries queue to 200K+. Drop 'processed' from the check and use ('merged', 'error') only. Co-Authored-By: Claude --- .../tests/unit/test_upload_finisher_task.py | 21 ++++++++----------- apps/worker/tasks/upload_finisher.py | 6 +++--- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index bd83143b3..40e6931b6 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -1047,16 +1047,18 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app): ) @pytest.mark.django_db - def test_idempotency_check_skips_already_processed_uploads( + def test_idempotency_check_skips_already_merged_uploads( self, dbsession, mocker, mock_self_app ): - """Test that finisher skips work if all uploads are already in final state. + """Regression: 'merged' must be treated as a final state in the idempotency check. - This test validates the idempotency check that prevents wasted work when: - - Multiple finishers are triggered (e.g., visibility timeout re-queuing) - - Finisher is manually retried + After #766 changed successful uploads from 'processed' to 'merged', the + idempotency check only recognised ('processed', 'error') as final states. + Re-triggered finishers bypassed the check, ran finish_reports_processing + again, and enqueued duplicate save_commit_measurements tasks — flooding + the timeseries queue to 200K+. - The check only skips when ALL uploads exist in DB and are in final states. + The fix drops 'processed' (now a dead state) and uses ('merged', 'error'). """ commit = CommitFactory.create() dbsession.add(commit) @@ -1066,14 +1068,12 @@ def test_idempotency_check_skips_already_processed_uploads( dbsession.add(report) dbsession.flush() - # Create uploads that are already in "processed" state - upload_1 = UploadFactory.create(report=report, state="processed") + upload_1 = UploadFactory.create(report=report, state="merged") upload_2 = UploadFactory.create(report=report, state="error") dbsession.add(upload_1) dbsession.add(upload_2) dbsession.flush() - # Mock the _process_reports_with_lock to verify it's NOT called mock_process = mocker.patch.object( UploadFinisherTask, "_process_reports_with_lock" ) @@ -1091,13 +1091,10 @@ def test_idempotency_check_skips_already_processed_uploads( commit_yaml={}, ) - # Verify that the finisher skipped all work assert result == { "already_completed": True, "upload_ids": [upload_1.id, upload_2.id], } - - # Verify that _process_reports_with_lock was NOT called mock_process.assert_not_called() @pytest.mark.django_db diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 7c9daed09..c5f2c992c 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -293,10 +293,10 @@ def run_impl( ) # Only skip if ALL uploads exist in DB and ALL are in final states if len(uploads_in_db) == len(upload_ids): - all_already_processed = all( - upload.state in ("processed", "error") for upload in uploads_in_db + all_already_merged = all( + upload.state in ("merged", "error") for upload in uploads_in_db ) - if all_already_processed: + if all_already_merged: log.info( "All uploads already in final state, skipping finisher work", extra={