diff --git a/apps/worker/services/processing/processing.py b/apps/worker/services/processing/processing.py index da524de85..c8283a265 100644 --- a/apps/worker/services/processing/processing.py +++ b/apps/worker/services/processing/processing.py @@ -43,8 +43,8 @@ def process_upload( upload = db_session.query(Upload).filter_by(id_=upload_id).first() assert upload - state = ProcessingState(repo_id, commit_sha) - # this in a noop in normal cases, but relevant for task retries: + state = ProcessingState(repo_id, commit_sha, db_session=db_session) + # this is a noop in normal cases, but relevant for task retries: state.mark_uploads_as_processing([upload_id]) report_service = ReportService(commit_yaml) @@ -71,10 +71,12 @@ def process_upload( save_intermediate_report(upload_id, processing_result.report) state.mark_upload_as_processed(upload_id) - # Check if all uploads are now processed and trigger finisher if needed - # This handles the case where a processor task retries outside of a chord - # (e.g., from visibility timeout or task_reject_on_worker_lost) - upload_numbers = state.get_upload_numbers() + # Safety net: trigger finisher if all uploads are done. + # Handles retries outside a chord (visibility timeout, task_reject_on_worker_lost). + # Will be replaced by gate key mechanism in a future PR. + # Use Redis-only state for counting (dual-write keeps Redis in sync). + redis_state = ProcessingState(repo_id, commit_sha) + upload_numbers = redis_state.get_upload_numbers() if should_trigger_postprocessing(upload_numbers): log.info( "All uploads processed, triggering finisher", diff --git a/apps/worker/services/processing/state.py b/apps/worker/services/processing/state.py index 4f713a4e1..2acc7a21d 100644 --- a/apps/worker/services/processing/state.py +++ b/apps/worker/services/processing/state.py @@ -128,12 +128,10 @@ def get_upload_numbers(self): return UploadNumbers(processing, processed) def mark_uploads_as_processing(self, upload_ids: list[int]): - if not upload_ids or self._db_session: + if not upload_ids: return key = self._redis_key("processing") self._redis.sadd(key, *upload_ids) - # Set TTL to match intermediate report expiration (24 hours) - # This ensures state keys don't accumulate indefinitely self._redis.expire(key, PROCESSING_STATE_TTL) def clear_in_progress_uploads(self, upload_ids: list[int]): @@ -159,13 +157,10 @@ def clear_in_progress_uploads(self, upload_ids: list[int]): ) if updated > 0: CLEARED_UPLOADS.inc(updated) + self._redis.srem(self._redis_key("processing"), *upload_ids) return removed_uploads = self._redis.srem(self._redis_key("processing"), *upload_ids) if removed_uploads > 0: - # the normal flow would move the uploads from the "processing" set - # to the "processed" set via `mark_upload_as_processed`. - # this function here is only called in the error case and we don't expect - # this to be triggered often, if at all. CLEARED_UPLOADS.inc(removed_uploads) def mark_upload_as_processed(self, upload_id: int): @@ -176,36 +171,32 @@ def mark_upload_as_processed(self, upload_id: int): # Don't set upload.state here -- the finisher's idempotency check # uses state="processed" to detect already-merged uploads. # The state string is set by update_uploads() after merging. - return processing_key = self._redis_key("processing") processed_key = self._redis_key("processed") res = self._redis.smove(processing_key, processed_key, upload_id) if not res: - # this can happen when `upload_id` was never in the source set, - # which probably is the case during initial deployment as - # the code adding this to the initial set was not deployed yet - # TODO: make sure to remove this code after a grace period self._redis.sadd(processed_key, upload_id) - # Set TTL on processed key to match intermediate report expiration - # This ensures uploads marked as processed have a bounded lifetime self._redis.expire(processed_key, PROCESSING_STATE_TTL) def mark_uploads_as_merged(self, upload_ids: list[int]): if not upload_ids: return if self._db_session: - self._db_session.query(Upload).filter(Upload.id_.in_(upload_ids)).update( + self._db_session.query(Upload).filter( + Upload.id_.in_(upload_ids), + Upload.state_id == UploadState.PROCESSED.db_id, + ).update( { Upload.state_id: UploadState.MERGED.db_id, Upload.state: "merged", }, synchronize_session="fetch", ) + self._redis.srem(self._redis_key("processed"), *upload_ids) return - self._redis.srem(self._redis_key("processed"), *upload_ids) def get_uploads_for_merging(self) -> set[int]: diff --git a/apps/worker/services/tests/test_processing.py b/apps/worker/services/tests/test_processing.py index da435a12c..3ff24d029 100644 --- a/apps/worker/services/tests/test_processing.py +++ b/apps/worker/services/tests/test_processing.py @@ -9,125 +9,21 @@ ) from services.processing.processing import process_upload from services.processing.types import UploadArguments +from shared.reports.enums import UploadState from shared.yaml import UserYaml @pytest.mark.django_db(databases={"default"}) -class TestProcessUploadOrphanedTaskRecovery: - """ - Tests for the orphaned upload recovery mechanism. - - When a processor task retries outside of a chord (e.g., from visibility timeout - or task_reject_on_worker_lost), it should trigger the finisher if all uploads - are now processed. - """ - - def test_triggers_finisher_when_last_upload_completes( - self, dbsession, mocker, mock_storage - ): - """ - Test that finisher is triggered when the last upload completes processing. - - This simulates the scenario where: - 1. A processor task died before ACK, missing the chord callback - 2. Task retries standalone (visibility timeout or worker rejection) - 3. Completes successfully and notices all uploads are done - 4. Triggers finisher to complete the upload flow - """ - # Setup - repository = RepositoryFactory.create() - commit = CommitFactory.create(repository=repository) - upload = UploadFactory.create( - report__commit=commit, - state="started", - ) - dbsession.add_all([repository, commit, upload]) - dbsession.flush() - - arguments: UploadArguments = { - "commit": commit.commitid, - "upload_id": upload.id_, - "version": "v4", - "reportid": str(upload.report.external_id), - } - - # Mock dependencies - mock_report_service = mocker.patch( - "services.processing.processing.ReportService" - ) - mock_processing_result = MagicMock() - mock_processing_result.error = None - mock_processing_result.report = None - mock_report_service.return_value.build_report_from_raw_content.return_value = ( - mock_processing_result - ) - - # Mock ProcessingState to indicate all uploads are processed - mock_state_instance = MagicMock() - mock_state_instance.get_upload_numbers.return_value = MagicMock( - processing=0, - processed=0, # All uploads processed and merged - ) - mocker.patch( - "services.processing.processing.ProcessingState", - return_value=mock_state_instance, - ) - - # Mock should_trigger_postprocessing to return True - mocker.patch( - "services.processing.processing.should_trigger_postprocessing", - return_value=True, - ) - - # Mock celery app to capture finisher task scheduling - mock_celery_app = mocker.patch("services.processing.processing.celery_app") - mock_finisher_task = MagicMock() - mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task} - - # Mock other dependencies - mocker.patch("services.processing.processing.save_intermediate_report") - mocker.patch("services.processing.processing.rewrite_or_delete_upload") - - commit_yaml = UserYaml({}) - - # Execute - result = process_upload( - on_processing_error=lambda error: None, - db_session=dbsession, - repo_id=repository.repoid, - commit_sha=commit.commitid, - commit_yaml=commit_yaml, - arguments=arguments, - ) - - # Verify - assert result["successful"] is True - assert result["upload_id"] == upload.id_ - - # Verify finisher was triggered - mock_finisher_task.apply_async.assert_called_once_with( - kwargs={ - "repoid": repository.repoid, - "commitid": commit.commitid, - "commit_yaml": commit_yaml.to_dict(), - } - ) - - def test_does_not_trigger_finisher_when_uploads_still_processing( +class TestProcessUpload: + def test_successful_processing_marks_upload_as_processed( self, dbsession, mocker, mock_storage ): - """ - Test that finisher is NOT triggered when other uploads are still processing. - - This verifies we don't prematurely trigger the finisher when only some - uploads have completed. - """ - # Setup repository = RepositoryFactory.create() commit = CommitFactory.create(repository=repository) upload = UploadFactory.create( report__commit=commit, state="started", + state_id=UploadState.UPLOADED.db_id, ) dbsession.add_all([repository, commit, upload]) dbsession.flush() @@ -139,7 +35,6 @@ def test_does_not_trigger_finisher_when_uploads_still_processing( "reportid": str(upload.report.external_id), } - # Mock dependencies mock_report_service = mocker.patch( "services.processing.processing.ReportService" ) @@ -150,47 +45,23 @@ def test_does_not_trigger_finisher_when_uploads_still_processing( mock_processing_result ) - # Mock ProcessingState to indicate other uploads still processing - mock_state_instance = MagicMock() - mock_state_instance.get_upload_numbers.return_value = MagicMock( - processing=2, - processed=1, # Other uploads still in progress - ) - mocker.patch( - "services.processing.processing.ProcessingState", - return_value=mock_state_instance, - ) - - # Mock should_trigger_postprocessing to return False - mocker.patch( - "services.processing.processing.should_trigger_postprocessing", - return_value=False, - ) - - # Mock celery app to capture finisher task scheduling - mock_celery_app = mocker.patch("services.processing.processing.celery_app") - mock_finisher_task = MagicMock() - mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task} - - # Mock other dependencies mocker.patch("services.processing.processing.save_intermediate_report") mocker.patch("services.processing.processing.rewrite_or_delete_upload") - commit_yaml = UserYaml({}) - - # Execute result = process_upload( on_processing_error=lambda error: None, db_session=dbsession, repo_id=repository.repoid, commit_sha=commit.commitid, - commit_yaml=commit_yaml, + commit_yaml=UserYaml({}), arguments=arguments, ) - # Verify assert result["successful"] is True assert result["upload_id"] == upload.id_ - # Verify finisher was NOT triggered - mock_finisher_task.apply_async.assert_not_called() + dbsession.refresh(upload) + assert upload.state_id == UploadState.PROCESSED.db_id + # state string is not updated by the processor -- the finisher sets it + # after merging (to avoid triggering the finisher's idempotency check early) + assert upload.state == "started" diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py index 31f806f18..e325ef65b 100644 --- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py @@ -16,7 +16,6 @@ from helpers.exceptions import RepositoryWithoutValidBotError from helpers.log_context import LogContext, set_log_context from services.lock_manager import LockRetry -from services.processing.intermediate import intermediate_report_key from services.processing.merging import get_joined_flag, update_uploads from services.processing.types import MergeResult, ProcessingResult from services.timeseries import MeasurementName @@ -1064,11 +1063,12 @@ def test_idempotency_check_skips_already_processed_uploads( dbsession.add(report) dbsession.flush() - # Create uploads that are already in "processed" state upload_1 = UploadFactory.create(report=report, state="processed") upload_2 = UploadFactory.create(report=report, state="error") + upload_3 = UploadFactory.create(report=report, state="merged") dbsession.add(upload_1) dbsession.add(upload_2) + dbsession.add(upload_3) dbsession.flush() # Mock the _process_reports_with_lock to verify it's NOT called @@ -1079,6 +1079,7 @@ def test_idempotency_check_skips_already_processed_uploads( previous_results = [ {"upload_id": upload_1.id, "successful": True, "arguments": {}}, {"upload_id": upload_2.id, "successful": False, "arguments": {}}, + {"upload_id": upload_3.id, "successful": True, "arguments": {}}, ] result = UploadFinisherTask().run_impl( @@ -1089,13 +1090,11 @@ def test_idempotency_check_skips_already_processed_uploads( commit_yaml={}, ) - # Verify that the finisher skipped all work assert result == { "already_completed": True, - "upload_ids": [upload_1.id, upload_2.id], + "upload_ids": [upload_1.id, upload_2.id, upload_3.id], } - # Verify that _process_reports_with_lock was NOT called mock_process.assert_not_called() @pytest.mark.django_db @@ -1139,112 +1138,21 @@ def test_idempotency_check_proceeds_when_uploads_not_finished( # Verify that _process_reports_with_lock WAS called mock_process.assert_called_once() - @pytest.mark.django_db - def test_reconstruct_processing_results_falls_back_to_database_when_redis_expires( - self, - dbsession, - mocker, - mock_storage, - mock_repo_provider, - mock_redis, - mock_self_app, - ): - """Test that finisher falls back to database when Redis ProcessingState expires. - - This tests the edge case where Redis keys expire after 24h TTL, but uploads - were processed and have intermediate reports. The finisher should find them - via database query and include them in the final report. - """ - commit = CommitFactory.create() - dbsession.add(commit) - dbsession.flush() - - report = CommitReport(commit_id=commit.id_) - dbsession.add(report) - dbsession.flush() - - # Create uploads in "started" state (simulating Redis state expired) - upload_1 = UploadFactory.create( - report=report, state="started", state_id=UploadState.UPLOADED.db_id - ) - upload_2 = UploadFactory.create( - report=report, state="started", state_id=UploadState.UPLOADED.db_id - ) - dbsession.add(upload_1) - dbsession.add(upload_2) - dbsession.flush() - - # Mock Redis to simulate intermediate reports exist (confirms uploads were processed) - mock_redis.exists.side_effect = lambda key: ( - key == intermediate_report_key(upload_1.id) - or key == intermediate_report_key(upload_2.id) - ) - - # Mock ProcessingState to return empty (simulating Redis expiration) - mock_state = mocker.MagicMock() - mock_state.get_uploads_for_merging.return_value = set() # Redis expired - mock_state.get_upload_numbers.return_value = mocker.MagicMock( - processing=0, processed=0 - ) - mocker.patch("tasks.upload_finisher.ProcessingState", return_value=mock_state) - - # Mock the processing methods - mocker.patch("tasks.upload_finisher.load_intermediate_reports", return_value=[]) - mocker.patch("tasks.upload_finisher.update_uploads") - mock_process = mocker.patch.object( - UploadFinisherTask, "_process_reports_with_lock" - ) - - # Call run_impl without processing_results to trigger reconstruction - task = UploadFinisherTask() - task.run_impl( - dbsession, - processing_results=None, # Triggers reconstruction - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml={}, - ) - - # Verify that _find_started_uploads_with_reports was called (via reconstruction) - # This is verified by checking that _process_reports_with_lock was called - # with processing_results containing our uploads - mock_process.assert_called_once() - call_args = mock_process.call_args - # processing_results is the 4th positional argument (index 0 is args tuple) - processing_results = call_args[0][3] - - # Verify both uploads are included in processing_results - upload_ids_in_results = {r["upload_id"] for r in processing_results} - assert upload_1.id in upload_ids_in_results - assert upload_2.id in upload_ids_in_results - assert len(processing_results) == 2 - - # Verify both are marked as successful (have intermediate reports) - assert all(r["successful"] for r in processing_results) - @pytest.mark.django_db def test_reconstruct_processing_results_returns_empty_when_no_uploads_found( self, dbsession, mocker, mock_redis, mock_self_app ): - """Test that finisher returns empty list when no uploads found in Redis or DB. - - This tests the edge case where Redis expires AND no uploads exist in database - in "started" state with intermediate reports. - """ + """Test that reconstruction returns empty when no PROCESSED uploads exist.""" commit = CommitFactory.create() dbsession.add(commit) dbsession.flush() - # Mock ProcessingState to return empty (simulating Redis expiration) mock_state = mocker.MagicMock() - mock_state.get_uploads_for_merging.return_value = set() # Redis expired - mocker.patch("tasks.upload_finisher.ProcessingState", return_value=mock_state) + mock_state.get_uploads_for_merging.return_value = set() - # Call run_impl without processing_results to trigger reconstruction task = UploadFinisherTask() result = task._reconstruct_processing_results(dbsession, mock_state, commit) - # Verify empty list returned when no uploads found assert result == [] @pytest.mark.django_db diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index 498b6f104..102a458f1 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -83,108 +83,35 @@ class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name): max_retries = UPLOAD_PROCESSOR_MAX_RETRIES - def _find_started_uploads_with_reports( - self, db_session, commit: Commit - ) -> set[int]: - """Find uploads in "started" state that have intermediate reports in Redis. - - This is the fallback when Redis ProcessingState has expired (TTL: PROCESSING_STATE_TTL). - We check the database for uploads that were processed but never finalized, - and verify they have intermediate reports before including them. - """ - # Query for uploads in "started" state for this commit - started_uploads = ( - db_session.query(Upload) - .join(Upload.report) - .filter( - Upload.report.has(commit=commit), - Upload.state == "started", - Upload.state_id == UploadState.UPLOADED.db_id, - ) - .all() - ) - - if not started_uploads: - return set() - - log.info( - "Found uploads in started state, checking for intermediate reports", - extra={ - "upload_ids": [u.id_ for u in started_uploads], - "count": len(started_uploads), - }, - ) - - # Check which uploads have intermediate reports (confirms they were processed) - redis_connection = get_redis_connection() - upload_ids_with_reports = set() - - for upload in started_uploads: - report_key = intermediate_report_key(upload.id_) - if redis_connection.exists(report_key): - upload_ids_with_reports.add(upload.id_) - else: - log.warning( - "Upload in started state but no intermediate report found (may have expired)", - extra={"upload_id": upload.id_}, - ) - - return upload_ids_with_reports - def _reconstruct_processing_results( self, db_session, state: ProcessingState, commit: Commit ) -> list[ProcessingResult]: """Reconstruct processing_results from ProcessingState when finisher is triggered outside of a chord (e.g., from orphaned upload recovery). - This ensures ALL uploads that were marked as processed in Redis are included - in the final merged report, even if they completed via retry/recovery. - - If Redis state has expired (TTL: PROCESSING_STATE_TTL), falls back to database - to find uploads in "started" state that have intermediate reports, preventing data loss. + Uses DB state (Upload.state_id = PROCESSED) as the authoritative source of + which uploads are ready to merge. """ - - # Get all upload IDs that are ready to be merged (in "processed" set) upload_ids = state.get_uploads_for_merging() if not upload_ids: - log.warning( - "No uploads found in Redis processed set, checking database for started uploads", - extra={"repoid": commit.repoid, "commitid": commit.commitid}, - ) - # Fallback: Redis state expired (TTL: PROCESSING_STATE_TTL), check DB for uploads - # in "started" state that might have been processed but never finalized - upload_ids = self._find_started_uploads_with_reports(db_session, commit) - - if not upload_ids: - log.warning( - "No started uploads with intermediate reports found in database", - extra={"repoid": commit.repoid, "commitid": commit.commitid}, - ) - return [] - log.info( - "Found started uploads with intermediate reports (Redis state expired)", - extra={ - "upload_ids": list(upload_ids), - "count": len(upload_ids), - }, + "No uploads in PROCESSED state found for merging", + extra={"repoid": commit.repoid, "commitid": commit.commitid}, ) + return [] log.info( - "Reconstructing processing results from ProcessingState", + "Reconstructing processing results from DB state", extra={"upload_ids": list(upload_ids), "count": len(upload_ids)}, ) - # Load Upload records from database to get arguments uploads = db_session.query(Upload).filter(Upload.id_.in_(upload_ids)).all() - # Check which uploads have intermediate reports in Redis redis_connection = get_redis_connection() processing_results = [] for upload in uploads: - # Check if intermediate report exists (indicates successful processing) report_key = intermediate_report_key(upload.id_) has_report = redis_connection.exists(report_key) @@ -193,7 +120,7 @@ def _reconstruct_processing_results( "arguments": { "commit": commit.commitid, "upload_id": upload.id_, - "version": "v4", # Assume v4 for recovered uploads + "version": "v4", "reportid": str(upload.report.external_id), }, "successful": bool(has_report), @@ -201,7 +128,7 @@ def _reconstruct_processing_results( if not has_report: log.warning( - "Upload in processed set but no intermediate report found", + "Upload in PROCESSED state but no intermediate report found", extra={"upload_id": upload.id_}, ) processing_result["error"] = { @@ -211,15 +138,6 @@ def _reconstruct_processing_results( processing_results.append(processing_result) - log.info( - "Reconstructed processing results", - extra={ - "total_uploads": len(processing_results), - "successful": sum(1 for r in processing_results if r["successful"]), - "failed": sum(1 for r in processing_results if not r["successful"]), - }, - ) - return processing_results def run_impl( @@ -260,7 +178,7 @@ def run_impl( log.info("run_impl: Got commit") - state = ProcessingState(repoid, commitid) + state = ProcessingState(repoid, commitid, db_session=db_session) # If processing_results not provided (e.g., from orphaned upload recovery), # reconstruct it from ProcessingState to ensure ALL uploads are included @@ -291,7 +209,8 @@ def run_impl( # Only skip if ALL uploads exist in DB and ALL are in final states if len(uploads_in_db) == len(upload_ids): all_already_processed = all( - upload.state in ("processed", "error") for upload in uploads_in_db + upload.state in ("processed", "merged", "error") + for upload in uploads_in_db ) if all_already_processed: log.info(