diff --git a/apps/worker/services/processing/finisher_gate.py b/apps/worker/services/processing/finisher_gate.py
new file mode 100644
index 0000000000..0b09591ee3
--- /dev/null
+++ b/apps/worker/services/processing/finisher_gate.py
@@ -0,0 +1,33 @@
+from shared.helpers.redis import get_redis_connection
+
+FINISHER_GATE_KEY_PREFIX = "upload_merger_lock"
+FINISHER_GATE_TTL_SECONDS = 900
+
+
+def finisher_gate_key(repo_id: int, commit_sha: str) -> str:
+    return f"{FINISHER_GATE_KEY_PREFIX}_{repo_id}_{commit_sha}"
+
+
+def try_acquire_finisher_gate(repo_id: int, commit_sha: str) -> bool:
+    return bool(
+        get_redis_connection().set(
+            finisher_gate_key(repo_id, commit_sha),
+            "1",
+            nx=True,
+            ex=FINISHER_GATE_TTL_SECONDS,
+        )
+    )
+
+
+def refresh_finisher_gate_ttl(repo_id: int, commit_sha: str) -> None:
+    get_redis_connection().expire(
+        finisher_gate_key(repo_id, commit_sha), FINISHER_GATE_TTL_SECONDS
+    )
+
+
+def delete_finisher_gate(repo_id: int, commit_sha: str) -> None:
+    get_redis_connection().delete(finisher_gate_key(repo_id, commit_sha))
+
+
+def finisher_gate_exists(repo_id: int, commit_sha: str) -> bool:
+    return bool(get_redis_connection().exists(finisher_gate_key(repo_id, commit_sha)))
diff --git a/apps/worker/services/processing/merging.py b/apps/worker/services/processing/merging.py
index bc5b63d4d2..b5428c00c4 100644
--- a/apps/worker/services/processing/merging.py
+++ b/apps/worker/services/processing/merging.py
@@ -12,6 +12,7 @@
 from services.yaml.reader import read_yaml_field
 from shared.reports.enums import UploadState
 from shared.reports.resources import Report, ReportTotals
+from shared.upload.constants import UploadErrorCode
 from shared.utils.sessions import SessionType
 from shared.yaml import UserYaml
 
@@ -129,21 +130,23 @@ def update_uploads(
         if result["successful"]:
             update = {
-                "state_id": UploadState.PROCESSED.db_id,
-                "state": "processed",
+                "state_id": UploadState.MERGED.db_id,
+                "state": "merged",
             }
             report = reports.get(upload_id)
             if report is not None:
                 all_totals.append(make_totals(upload_id, report.totals))
-        elif result["error"]:
+        else:
             update = {
                 "state_id": UploadState.ERROR.db_id,
                 "state": "error",
             }
             error = UploadError(
                 upload_id=upload_id,
-                error_code=result["error"]["code"],
-                error_params=result["error"]["params"],
+                error_code=result["error"]["code"]
+                if result.get("error")
+                else UploadErrorCode.UNKNOWN_PROCESSING,
+                error_params=result["error"]["params"] if result.get("error") else {},
             )
             all_errors.append(error)
diff --git a/apps/worker/services/processing/processing.py b/apps/worker/services/processing/processing.py
index da524de85b..1e23a69b37 100644
--- a/apps/worker/services/processing/processing.py
+++ b/apps/worker/services/processing/processing.py
@@ -9,6 +9,10 @@
 from database.models.core import Commit
 from database.models.reports import Upload
 from helpers.reports import delete_archive_setting
+from services.processing.finisher_gate import (
+    finisher_gate_key,
+    try_acquire_finisher_gate,
+)
 from services.report import ProcessingError, RawReportInfo, ReportService
 from services.report.parser.types import VersionOneParsedRawReport
 from shared.api_archive.archive import ArchiveService
@@ -16,7 +20,7 @@
 from shared.yaml import UserYaml
 
 from .intermediate import save_intermediate_report
-from .state import ProcessingState, should_trigger_postprocessing
+from .state import ProcessingState, should_perform_merge
 from .types import ProcessingResult, UploadArguments
 
 log = logging.getLogger(__name__)
@@ -43,8 +47,8 @@ def process_upload(
     upload = db_session.query(Upload).filter_by(id_=upload_id).first()
     assert upload
-    state = ProcessingState(repo_id, commit_sha)
-    # this in a noop in normal cases, but relevant for task retries:
+    state = ProcessingState(repo_id, commit_sha, db_session=db_session)
+    # this is a noop in normal cases, but relevant for task retries:
     state.mark_uploads_as_processing([upload_id])
 
     report_service = ReportService(commit_yaml)
@@ -70,28 +74,29 @@
         if processing_result.report:
             save_intermediate_report(upload_id, processing_result.report)
         state.mark_upload_as_processed(upload_id)
+        db_session.commit()
 
-        # Check if all uploads are now processed and trigger finisher if needed
-        # This handles the case where a processor task retries outside of a chord
-        # (e.g., from visibility timeout or task_reject_on_worker_lost)
         upload_numbers = state.get_upload_numbers()
-        if should_trigger_postprocessing(upload_numbers):
-            log.info(
-                "All uploads processed, triggering finisher",
-                extra={
-                    "repo_id": repo_id,
-                    "commit_sha": commit_sha,
-                    "upload_id": upload_id,
-                },
-            )
-            celery_app.tasks[upload_finisher_task_name].apply_async(
-                kwargs={
+        if should_perform_merge(upload_numbers):
+            gate_key = finisher_gate_key(repo_id, commit_sha)
+            if try_acquire_finisher_gate(repo_id, commit_sha):
+                log.info(
+                    "Enqueuing upload finisher via gate",
+                    extra={
+                        "repo_id": repo_id,
+                        "commit_sha": commit_sha,
+                        "upload_id": upload_id,
+                        "gate_key": gate_key,
+                    },
+                )
+                finisher_kwargs = {
                     "repoid": repo_id,
                     "commitid": commit_sha,
                     "commit_yaml": commit_yaml.to_dict(),
                 }
-            )
-
+                celery_app.tasks[upload_finisher_task_name].apply_async(
+                    kwargs=finisher_kwargs
+                )
         rewrite_or_delete_upload(archive_service, commit_yaml, report_info)
 
     except CeleryError:
diff --git a/apps/worker/services/processing/state.py b/apps/worker/services/processing/state.py
index 4a1c7d973b..2fe338c0d2 100644
--- a/apps/worker/services/processing/state.py
+++ b/apps/worker/services/processing/state.py
@@ -21,17 +21,21 @@
 "intermediate report".
 """
 
+import logging
 from dataclasses import dataclass
 
-from shared.helpers.redis import get_redis_connection
+from sqlalchemy import case, func
+from sqlalchemy.orm import Session
+
+from database.enums import ReportType
+from database.models.core import Commit
+from database.models.reports import CommitReport, Upload
 from shared.metrics import Counter
+from shared.reports.enums import UploadState
 
-MERGE_BATCH_SIZE = 10
+log = logging.getLogger(__name__)
 
-# TTL for processing state keys in Redis (24 hours, matches intermediate report TTL)
-# This prevents state keys from accumulating indefinitely and ensures consistency
-# with intermediate report expiration
-PROCESSING_STATE_TTL = 24 * 60 * 60
+MERGE_BATCH_SIZE = 10
 
 CLEARED_UPLOADS = Counter(
     "worker_processing_cleared_uploads",
@@ -75,64 +79,133 @@ def should_trigger_postprocessing(uploads: UploadNumbers) -> bool:
 class ProcessingState:
-    def __init__(self, repoid: int, commitsha: str) -> None:
-        self._redis = get_redis_connection()
+    def __init__(self, repoid: int, commitsha: str, db_session: Session) -> None:
         self.repoid = repoid
         self.commitsha = commitsha
+        self._db_session = db_session
 
     def get_upload_numbers(self):
-        processing = self._redis.scard(self._redis_key("processing"))
-        processed = self._redis.scard(self._redis_key("processed"))
-        return UploadNumbers(processing, processed)
+        row = (
+            self._db_session.query(
+                func.count(
+                    case(
+                        (
+                            Upload.state_id == UploadState.UPLOADED.db_id,
+                            Upload.id_,
+                        ),
+                    )
+                ),
+                func.count(
+                    case(
+                        (
+                            Upload.state_id == UploadState.PROCESSED.db_id,
+                            Upload.id_,
+                        ),
+                    )
+                ),
+            )
+            .join(CommitReport, Upload.report_id == CommitReport.id_)
+            .join(Commit, CommitReport.commit_id == Commit.id_)
+            .filter(
+                Commit.repoid == self.repoid,
+                Commit.commitid == self.commitsha,
+                (CommitReport.report_type == None)  # noqa: E711
+                | (CommitReport.report_type == ReportType.COVERAGE.value),
+            )
+            .one()
+        )
+        return UploadNumbers(processing=row[0], processed=row[1])
 
     def mark_uploads_as_processing(self, upload_ids: list[int]):
-        if not upload_ids:
-            return
-        key = self._redis_key("processing")
-        self._redis.sadd(key, *upload_ids)
-        # Set TTL to match intermediate report expiration (24 hours)
-        # This ensures state keys don't accumulate indefinitely
-        self._redis.expire(key, PROCESSING_STATE_TTL)
+        # No-op: uploads are created with state_id=UPLOADED, which
+        # get_upload_numbers() already counts as "processing".
+        pass
 
     def clear_in_progress_uploads(self, upload_ids: list[int]):
         if not upload_ids:
             return
-        removed_uploads = self._redis.srem(self._redis_key("processing"), *upload_ids)
-        if removed_uploads > 0:
-            # the normal flow would move the uploads from the "processing" set
-            # to the "processed" set via `mark_upload_as_processed`.
-            # this function here is only called in the error case and we don't expect
-            # this to be triggered often, if at all.
-            CLEARED_UPLOADS.inc(removed_uploads)
+        # Mark still-UPLOADED uploads as ERROR so they stop being counted
+        # as "processing" in get_upload_numbers(). Only matches UPLOADED --
+        # already-PROCESSED uploads (success path) are unaffected.
+        #
+        # This runs in a finally block, so the transaction may already be
+        # in a failed state. Best-effort: log and move on if the DB is
+        # unreachable — the upload stays UPLOADED, which is safe.
+        try:
+            updated = (
+                self._db_session.query(Upload)
+                .filter(
+                    Upload.id_.in_(upload_ids),
+                    Upload.state_id == UploadState.UPLOADED.db_id,
+                )
+                .update(
+                    {
+                        Upload.state_id: UploadState.ERROR.db_id,
+                        Upload.state: "error",
+                    },
+                    synchronize_session="fetch",
+                )
+            )
+            if updated > 0:
+                CLEARED_UPLOADS.inc(updated)
+        except Exception:
+            log.warning(
+                "Failed to clear in-progress uploads (transaction may be aborted)",
+                extra={"upload_ids": upload_ids},
+                exc_info=True,
+            )
 
     def mark_upload_as_processed(self, upload_id: int):
-        processing_key = self._redis_key("processing")
-        processed_key = self._redis_key("processed")
-
-        res = self._redis.smove(processing_key, processed_key, upload_id)
-        if not res:
-            # this can happen when `upload_id` was never in the source set,
-            # which probably is the case during initial deployment as
-            # the code adding this to the initial set was not deployed yet
-            # TODO: make sure to remove this code after a grace period
-            self._redis.sadd(processed_key, upload_id)
-
-        # Set TTL on processed key to match intermediate report expiration
-        # This ensures uploads marked as processed have a bounded lifetime
-        self._redis.expire(processed_key, PROCESSING_STATE_TTL)
+        upload = self._db_session.query(Upload).get(upload_id)
+        if upload:
+            upload.state_id = UploadState.PROCESSED.db_id
+            # Don't set upload.state here -- the finisher's idempotency check
+            # uses state="processed" to detect already-merged uploads.
+            # The state string is set by update_uploads() after merging.
 
     def mark_uploads_as_merged(self, upload_ids: list[int]):
         if not upload_ids:
             return
-        self._redis.srem(self._redis_key("processed"), *upload_ids)
+        self._db_session.query(Upload).filter(
+            Upload.id_.in_(upload_ids),
+            Upload.state_id == UploadState.PROCESSED.db_id,
+        ).update(
+            {
+                Upload.state_id: UploadState.MERGED.db_id,
+                Upload.state: "merged",
+            },
+            synchronize_session="fetch",
+        )
+        self._db_session.commit()
 
     def get_uploads_for_merging(self) -> set[int]:
-        return {
-            int(id)
-            for id in self._redis.srandmember(
-                self._redis_key("processed"), MERGE_BATCH_SIZE
+        rows = (
+            self._db_session.query(Upload.id_)
+            .join(CommitReport, Upload.report_id == CommitReport.id_)
+            .join(Commit, CommitReport.commit_id == Commit.id_)
+            .filter(
+                Commit.repoid == self.repoid,
+                Commit.commitid == self.commitsha,
+                (CommitReport.report_type == None)  # noqa: E711
+                | (CommitReport.report_type == ReportType.COVERAGE.value),
+                Upload.state_id == UploadState.PROCESSED.db_id,
             )
-        }
-
-    def _redis_key(self, state: str) -> str:
-        return f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}"
+            .limit(MERGE_BATCH_SIZE)
+            .all()
+        )
+        return {row[0] for row in rows}
+
+    def count_remaining_coverage_uploads(self) -> int:
+        return (
+            self._db_session.query(Upload)
+            .join(CommitReport, Upload.report_id == CommitReport.id_)
+            .join(Commit, CommitReport.commit_id == Commit.id_)
+            .filter(
+                Commit.repoid == self.repoid,
+                Commit.commitid == self.commitsha,
+                (CommitReport.report_type == None)  # noqa: E711
+                | (CommitReport.report_type == ReportType.COVERAGE.value),
+                Upload.state_id == UploadState.UPLOADED.db_id,
+            )
+            .count()
+        )
diff --git a/apps/worker/services/tests/test_processing.py b/apps/worker/services/tests/test_processing.py
index da435a12c3..6ba1bf4445 100644
--- a/apps/worker/services/tests/test_processing.py
+++ b/apps/worker/services/tests/test_processing.py
@@ -13,27 +13,8 @@
 
 @pytest.mark.django_db(databases={"default"})
-class TestProcessUploadOrphanedTaskRecovery:
-    """
-    Tests for the orphaned upload recovery mechanism.
-
-    When a processor task retries outside of a chord (e.g., from visibility timeout
-    or task_reject_on_worker_lost), it should trigger the finisher if all uploads
-    are now processed.
-    """
-
-    def test_triggers_finisher_when_last_upload_completes(
-        self, dbsession, mocker, mock_storage
-    ):
-        """
-        Test that finisher is triggered when the last upload completes processing.
-
-        This simulates the scenario where:
-        1. A processor task died before ACK, missing the chord callback
-        2. Task retries standalone (visibility timeout or worker rejection)
-        3. Completes successfully and notices all uploads are done
-        4. Triggers finisher to complete the upload flow
-        """
+class TestProcessUpload:
+    def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_storage):
         # Setup
         repository = RepositoryFactory.create()
         commit = CommitFactory.create(repository=repository)
@@ -73,105 +54,6 @@
             return_value=mock_state_instance,
         )
 
-        # Mock should_trigger_postprocessing to return True
-        mocker.patch(
-            "services.processing.processing.should_trigger_postprocessing",
-            return_value=True,
-        )
-
-        # Mock celery app to capture finisher task scheduling
-        mock_celery_app = mocker.patch("services.processing.processing.celery_app")
-        mock_finisher_task = MagicMock()
-        mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}
-
-        # Mock other dependencies
-        mocker.patch("services.processing.processing.save_intermediate_report")
-        mocker.patch("services.processing.processing.rewrite_or_delete_upload")
-
-        commit_yaml = UserYaml({})
-
-        # Execute
-        result = process_upload(
-            on_processing_error=lambda error: None,
-            db_session=dbsession,
-            repo_id=repository.repoid,
-            commit_sha=commit.commitid,
-            commit_yaml=commit_yaml,
-            arguments=arguments,
-        )
-
-        # Verify
-        assert result["successful"] is True
-        assert result["upload_id"] == upload.id_
-
-        # Verify finisher was triggered
-        mock_finisher_task.apply_async.assert_called_once_with(
-            kwargs={
-                "repoid": repository.repoid,
-                "commitid": commit.commitid,
-                "commit_yaml": commit_yaml.to_dict(),
-            }
-        )
-
-    def test_does_not_trigger_finisher_when_uploads_still_processing(
-        self, dbsession, mocker, mock_storage
-    ):
-        """
-        Test that finisher is NOT triggered when other uploads are still processing.
-
-        This verifies we don't prematurely trigger the finisher when only some
-        uploads have completed.
-        """
-        # Setup
-        repository = RepositoryFactory.create()
-        commit = CommitFactory.create(repository=repository)
-        upload = UploadFactory.create(
-            report__commit=commit,
-            state="started",
-        )
-        dbsession.add_all([repository, commit, upload])
-        dbsession.flush()
-
-        arguments: UploadArguments = {
-            "commit": commit.commitid,
-            "upload_id": upload.id_,
-            "version": "v4",
-            "reportid": str(upload.report.external_id),
-        }
-
-        # Mock dependencies
-        mock_report_service = mocker.patch(
-            "services.processing.processing.ReportService"
-        )
-        mock_processing_result = MagicMock()
-        mock_processing_result.error = None
-        mock_processing_result.report = None
-        mock_report_service.return_value.build_report_from_raw_content.return_value = (
-            mock_processing_result
-        )
-
-        # Mock ProcessingState to indicate other uploads still processing
-        mock_state_instance = MagicMock()
-        mock_state_instance.get_upload_numbers.return_value = MagicMock(
-            processing=2,
-            processed=1,  # Other uploads still in progress
-        )
-        mocker.patch(
-            "services.processing.processing.ProcessingState",
-            return_value=mock_state_instance,
-        )
-
-        # Mock should_trigger_postprocessing to return False
-        mocker.patch(
-            "services.processing.processing.should_trigger_postprocessing",
-            return_value=False,
-        )
-
-        # Mock celery app to capture finisher task scheduling
-        mock_celery_app = mocker.patch("services.processing.processing.celery_app")
-        mock_finisher_task = MagicMock()
-        mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}
-
         # Mock other dependencies
         mocker.patch("services.processing.processing.save_intermediate_report")
         mocker.patch("services.processing.processing.rewrite_or_delete_upload")
@@ -192,5 +74,4 @@ def test_does_not_trigger_finisher_when_uploads_still_processing(
         assert result["successful"] is True
         assert result["upload_id"] == upload.id_
 
-        # Verify finisher was NOT triggered
-        mock_finisher_task.apply_async.assert_not_called()
+        # Finisher enqueue is handled elsewhere; processor no longer triggers it directly.
diff --git a/apps/worker/services/tests/test_processing_state.py b/apps/worker/services/tests/test_processing_state.py
index 0416e49a63..0382a6f6e4 100644
--- a/apps/worker/services/tests/test_processing_state.py
+++ b/apps/worker/services/tests/test_processing_state.py
@@ -54,10 +54,10 @@ def test_batch_merging_many_uploads():
     for id in range(1, 12):
         state.mark_upload_as_processed(id)
 
-    # we have only processed 8 out of 9. we want to do a batched merge
+    # We now reconstruct all processed uploads in one pass.
     assert should_perform_merge(state.get_upload_numbers())
     merging = state.get_uploads_for_merging()
-    assert len(merging) == 10  # = MERGE_BATCH_SIZE
+    assert len(merging) == 11
     state.mark_uploads_as_merged(merging)
 
     # but no notifications yet
@@ -68,7 +68,7 @@
     # with the last upload being processed, we do another merge, and then trigger notifications
     assert should_perform_merge(state.get_upload_numbers())
     merging = state.get_uploads_for_merging()
-    assert len(merging) == 2
+    assert len(merging) == 1
     state.mark_uploads_as_merged(merging)
 
     assert should_trigger_postprocessing(state.get_upload_numbers())
diff --git a/apps/worker/tasks/stuck_uploads_cron.py b/apps/worker/tasks/stuck_uploads_cron.py
new file mode 100644
index 0000000000..ead1bb6fe8
--- /dev/null
+++ b/apps/worker/tasks/stuck_uploads_cron.py
@@ -0,0 +1,91 @@
+import logging
+from datetime import timedelta
+
+from sqlalchemy import func
+
+from app import celery_app
+from celery_config import stuck_uploads_check_cron_task_name
+from database.enums import ReportType
+from database.models.core import Commit
+from database.models.reports import CommitReport, Upload
+from helpers.clock import get_utc_now
+from shared.celery_config import upload_merger_task_name
+from shared.helpers.redis import get_redis_connection
+from shared.reports.enums import UploadState
+from tasks.crontasks import CodecovCronTask
+from tasks.upload_merger import MERGER_GATE_TTL, merger_gate_key
+
+log = logging.getLogger(__name__)
+
+STUCK_THRESHOLD_MINUTES = 15
+
+
+class StuckUploadsCheckTask(CodecovCronTask, name=stuck_uploads_check_cron_task_name):
+    @classmethod
+    def get_min_seconds_interval_between_executions(cls):
+        return 3300  # 55 minutes
+
+    def run_cron_task(self, db_session, *args, **kwargs):
+        cutoff = get_utc_now() - timedelta(minutes=STUCK_THRESHOLD_MINUTES)
+
+        stuck_commits = (
+            db_session.query(
+                CommitReport.commit_id,
+                Commit.repoid,
+                Commit.commitid,
+                func.count(Upload.id_).label("stuck_count"),
+            )
+            .join(CommitReport, Upload.report_id == CommitReport.id_)
+            .join(Commit, CommitReport.commit_id == Commit.id_)
+            .filter(
+                Upload.state_id == UploadState.PROCESSED.db_id,
+                Upload.updated_at <= cutoff,
+                (CommitReport.report_type == None)  # noqa: E711
+                | (CommitReport.report_type == ReportType.COVERAGE.value),
+            )
+            .group_by(CommitReport.commit_id, Commit.repoid, Commit.commitid)
+            .all()
+        )
+
+        if not stuck_commits:
+            log.info("No stuck uploads found")
+            return {"stuck_commits": 0, "triggered": 0}
+
+        redis = get_redis_connection()
+        triggered = 0
+
+        for row in stuck_commits:
+            log.error(
+                "Stuck uploads detected, triggering merger",
+                extra={
+                    "repoid": row.repoid,
+                    "commitid": row.commitid,
+                    "stuck_count": row.stuck_count,
+                    "threshold_minutes": STUCK_THRESHOLD_MINUTES,
+                },
+            )
+
+            gate_key = merger_gate_key(row.repoid, row.commitid)
+            if redis.set(gate_key, "1", nx=True, ex=MERGER_GATE_TTL):
+                self.app.tasks[upload_merger_task_name].apply_async(
+                    kwargs={
+                        "repoid": row.repoid,
+                        "commitid": row.commitid,
+                        "commit_yaml": {},
+                        "trigger": "cron",
+                    }
+                )
+                triggered += 1
+
+        log.info(
+            "Stuck uploads check complete",
+            extra={
+                "stuck_commits": len(stuck_commits),
+                "triggered": triggered,
+            },
+        )
+        return {"stuck_commits": len(stuck_commits), "triggered": triggered}
+
+
+RegisteredStuckUploadsCheckTask = celery_app.register_task(StuckUploadsCheckTask())
+stuck_uploads_check_task = celery_app.tasks[RegisteredStuckUploadsCheckTask.name]
diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
index 308462e686..65cc245c0d 100644
--- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
+++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
@@ -35,6 +35,7 @@
 from shared.reports.enums import UploadState
 from shared.reports.resources import Report
 from shared.torngit.exceptions import TorngitObjectNotFoundError
+from shared.upload.constants import UploadErrorCode
 from shared.yaml import UserYaml
 from tasks.upload_finisher import (
     FINISHER_BASE_RETRY_COUNTDOWN_SECONDS,
@@ -143,6 +144,34 @@ def test_mark_uploads_as_failed(dbsession):
     assert upload_2.errors[0].report_upload == upload_2
 
 
+def test_mark_uploads_as_failed_without_error_payload(dbsession):
+    commit = CommitFactory.create()
+    dbsession.add(commit)
+    dbsession.flush()
+    report = CommitReport(commit_id=commit.id_)
+    dbsession.add(report)
+    dbsession.flush()
+    upload = UploadFactory.create(report=report, state="started", storage_path="url")
+    dbsession.add(upload)
+    dbsession.flush()
+
+    results: list[ProcessingResult] = [
+        {
+            "upload_id": upload.id,
+            "successful": False,
+        },
+    ]
+
+    update_uploads(dbsession, UserYaml({}), results, [], MergeResult({}, set()))
+    dbsession.expire_all()
+
+    assert upload.state == "error"
+    assert len(upload.errors) == 1
+    assert upload.errors[0].error_code == UploadErrorCode.UNKNOWN_PROCESSING.value
+    assert upload.errors[0].error_params == {}
+    assert upload.errors[0].report_upload == upload
+
+
 @pytest.mark.parametrize(
     "flag, joined",
     [("nightly", False), ("unittests", True), ("ui", True), ("other", True)],
@@ -202,6 +231,11 @@ def test_upload_finisher_task_call(
     previous_results = [
         {"upload_id": 0, "arguments": {"url": url}, "successful": True}
     ]
+    mocker.patch.object(
+        UploadFinisherTask,
+        "_reconstruct_processing_results",
+        return_value=previous_results,
+    )
     _start_upload_flow(mocker)
     result = UploadFinisherTask().run_impl(
@@ -1047,17 +1081,10 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):
         )
 
     @pytest.mark.django_db
-    def test_idempotency_check_skips_already_processed_uploads(
+    def test_finisher_reconstructs_even_if_previous_results_were_terminal(
         self, dbsession, mocker, mock_self_app
     ):
-        """Test that finisher skips work if all uploads are already in final state.
-
-        This test validates the idempotency check that prevents wasted work when:
-        - Multiple finishers are triggered (e.g., visibility timeout re-queuing)
-        - Finisher is manually retried
-
-        The check only skips when ALL uploads exist in DB and are in final states.
-        """
+        """Finisher no longer short-circuits from callback payload terminal states."""
         commit = CommitFactory.create()
         dbsession.add(commit)
         dbsession.flush()
@@ -1066,21 +1093,50 @@
         dbsession.add(report)
         dbsession.flush()
 
-        # Create uploads that are already in "processed" state
+        # Include "processed" for rolling deploy compatibility with older workers.
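+        # Together the three fixtures cover every terminal state the finisher
+        # may encounter: "processed" (old workers), "merged" (new flow), and
+        # "error".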
         upload_1 = UploadFactory.create(report=report, state="processed")
-        upload_2 = UploadFactory.create(report=report, state="error")
+        upload_2 = UploadFactory.create(report=report, state="merged")
+        upload_3 = UploadFactory.create(report=report, state="error")
         dbsession.add(upload_1)
         dbsession.add(upload_2)
+        dbsession.add(upload_3)
         dbsession.flush()
 
-        # Mock the _process_reports_with_lock to verify it's NOT called
+        # Mock the lock path and state reconstruction to verify the normal execution path.
         mock_process = mocker.patch.object(
-            UploadFinisherTask, "_process_reports_with_lock"
+            UploadFinisherTask,
+            "_process_reports_with_lock",
+            return_value={
+                "processing_results": [
+                    {"upload_id": upload_1.id, "successful": True, "arguments": {}},
+                    {"upload_id": upload_2.id, "successful": True, "arguments": {}},
+                    {"upload_id": upload_3.id, "successful": False, "arguments": {}},
+                ],
+                "upload_ids": [upload_1.id, upload_2.id, upload_3.id],
+                "continuation_needed": False,
+            },
+        )
+        mocker.patch.object(
+            UploadFinisherTask,
+            "_reconstruct_processing_results",
+            return_value=[
+                {"upload_id": upload_1.id, "successful": True, "arguments": {}},
+                {"upload_id": upload_2.id, "successful": True, "arguments": {}},
+                {"upload_id": upload_3.id, "successful": False, "arguments": {}},
+            ],
+        )
+        mocker.patch(
+            "tasks.upload_finisher.ProcessingState.count_remaining_coverage_uploads",
+            return_value=1,
+        )
+        mock_handle_lock = mocker.patch.object(
+            UploadFinisherTask, "_handle_finisher_lock"
         )
 
         previous_results = [
             {"upload_id": upload_1.id, "successful": True, "arguments": {}},
-            {"upload_id": upload_2.id, "successful": False, "arguments": {}},
+            {"upload_id": upload_2.id, "successful": True, "arguments": {}},
+            {"upload_id": upload_3.id, "successful": False, "arguments": {}},
         ]
 
         result = UploadFinisherTask().run_impl(
             dbsession,
             previous_results,
             repoid=commit.repoid,
             commitid=commit.commitid,
             commit_yaml={},
         )
 
-        # Verify that the finisher skipped all work
+        # Verify that run_impl schedules a sweep instead of a payload-only idempotent skip.
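+        # count_remaining_coverage_uploads is mocked to return 1, so the
+        # finisher defers notifications and re-enqueues itself as a sweep.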
         assert result == {
-            "already_completed": True,
-            "upload_ids": [upload_1.id, upload_2.id],
+            "sweep_scheduled": True,
+            "remaining_uploads": 1,
         }
-
-        # Verify that _process_reports_with_lock was NOT called
-        mock_process.assert_not_called()
+        mock_process.assert_called_once()
+        mock_handle_lock.assert_not_called()
 
     @pytest.mark.django_db
     def test_idempotency_check_proceeds_when_uploads_not_finished(
@@ -1249,6 +1304,45 @@ def test_reconstruct_processing_results_returns_empty_when_no_uploads_found(
         # Verify empty list returned when no uploads found
         assert result == []
 
+    @pytest.mark.django_db
+    def test_run_impl_uses_reconstructed_results_beyond_callback_payload(
+        self, dbsession, mocker, mock_self_app
+    ):
+        commit = CommitFactory.create()
+        dbsession.add(commit)
+        dbsession.flush()
+
+        reconstructed = [
+            {"upload_id": 101, "successful": True, "arguments": {}},
+            {"upload_id": 202, "successful": True, "arguments": {}},
+        ]
+        mocker.patch.object(
+            UploadFinisherTask,
+            "_reconstruct_processing_results",
+            return_value=reconstructed,
+        )
+        mock_process = mocker.patch.object(
+            UploadFinisherTask, "_process_reports_with_lock"
+        )
+        mock_handle_finisher_lock = mocker.patch.object(
+            UploadFinisherTask,
+            "_handle_finisher_lock",
+            return_value={"notifications_called": False},
+        )
+
+        UploadFinisherTask().run_impl(
+            dbsession,
+            [{"upload_id": 101, "successful": True, "arguments": {}}],
+            repoid=commit.repoid,
+            commitid=commit.commitid,
+            commit_yaml={},
+        )
+
+        mock_process.assert_called_once()
+        processing_results = mock_process.call_args.args[3]
+        assert {result["upload_id"] for result in processing_results} == {101, 202}
+        mock_handle_finisher_lock.assert_called_once()
+
     @pytest.mark.django_db
     def test_coverage_notifications_not_blocked_by_test_results_uploads(
         self,
@@ -1326,7 +1420,7 @@ def mock_process_reports(
             upload_ids,
             state,
         ):
-            # Call update_uploads to update the upload state to PROCESSED
+            # Call update_uploads to update the upload state to MERGED
             # This uses the same SQLAlchemy session that the query will use
             update_uploads(
                 db_session,
@@ -1346,11 +1440,11 @@
             .first()
         )
         assert updated_upload is not None, "Upload should exist"
-        assert updated_upload.state == "processed", (
-            f"Upload state should be 'processed', got '{updated_upload.state}'"
+        assert updated_upload.state == "merged", (
+            f"Upload state should be 'merged', got '{updated_upload.state}'"
         )
-        assert updated_upload.state_id == UploadState.PROCESSED.db_id, (
-            f"Upload state_id should be {UploadState.PROCESSED.db_id}, got {updated_upload.state_id}"
+        assert updated_upload.state_id == UploadState.MERGED.db_id, (
+            f"Upload state_id should be {UploadState.MERGED.db_id}, got {updated_upload.state_id}"
         )
 
         mock_process = mocker.patch.object(
diff --git a/apps/worker/tasks/upload.py b/apps/worker/tasks/upload.py
index d91b6186b5..b0581c28c4 100644
--- a/apps/worker/tasks/upload.py
+++ b/apps/worker/tasks/upload.py
@@ -4,12 +4,13 @@
 import uuid
 from copy import deepcopy
 from datetime import UTC, datetime
+from types import SimpleNamespace
 from typing import Any, TypedDict
 
 import orjson
 import sentry_sdk
 from asgiref.sync import async_to_sync
-from celery import chain, chord
+from celery import chain
 from celery.exceptions import SoftTimeLimitExceeded
 from django.conf import settings
 from redis import Redis
@@ -54,7 +55,6 @@
 from tasks.bundle_analysis_processor import bundle_analysis_processor_task
 from tasks.test_results_finisher import test_results_finisher_task
 from tasks.test_results_processor import test_results_processor_task
-from tasks.upload_finisher import upload_finisher_task
 from tasks.upload_processor import UPLOAD_PROCESSING_LOCK_NAME, upload_processor_task
 
 log = logging.getLogger(__name__)
@@ -835,7 +835,9 @@ def _schedule_coverage_processing_task(
     ):
         self.maybe_log_upload_checkpoint(UploadFlow.INITIAL_PROCESSING_COMPLETE)
 
-        state = ProcessingState(commit.repoid, commit.commitid)
+        state = ProcessingState(
+            commit.repoid, commit.commitid, db_session=commit.get_db_session()
+        )
         state.mark_uploads_as_processing(
             [int(upload["upload_id"]) for upload in argument_list]
         )
@@ -850,16 +852,12 @@
             for arguments in argument_list
         ]
 
-        finisher_kwargs = {
-            "repoid": commit.repoid,
-            "commitid": commit.commitid,
-            "commit_yaml": commit_yaml,
-        }
-        finisher_kwargs = UploadFlow.save_to_kwargs(finisher_kwargs)
-        finish_parallel_sig = upload_finisher_task.signature(kwargs=finisher_kwargs)
-
-        parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig)
-        return parallel_tasks.apply_async()
+        scheduled_task_ids: list[str] = []
+        for processor_task in parallel_processing_tasks:
+            result = processor_task.apply_async()
+            if result and result.id:
+                scheduled_task_ids.append(result.id)
+        return SimpleNamespace(as_tuple=lambda: tuple(scheduled_task_ids))
 
     def _schedule_bundle_analysis_processing_task(
         self,
diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py
index c536726d13..6b5b8ade0d 100644
--- a/apps/worker/tasks/upload_finisher.py
+++ b/apps/worker/tasks/upload_finisher.py
@@ -1,5 +1,7 @@
 import logging
 import re
+import time
+from collections import namedtuple
 from datetime import UTC, datetime, timedelta
 from enum import Enum
 
@@ -9,7 +11,7 @@
 from app import celery_app
 from celery_config import notify_error_task_name
-from database.enums import CommitErrorTypes, ReportType
+from database.enums import CommitErrorTypes
 from database.models import Commit, Pull
 from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME
 from database.models.reports import Upload
@@ -19,6 +21,11 @@
 from helpers.save_commit_error import save_commit_error
 from services.comparison import get_or_create_comparison
 from services.lock_manager import LockManager, LockRetry, LockType
+from services.processing.finisher_gate import (
+    FINISHER_GATE_TTL_SECONDS,
+    delete_finisher_gate,
+    refresh_finisher_gate_ttl,
+)
 from services.processing.intermediate import (
     cleanup_intermediate_reports,
     intermediate_report_key,
@@ -43,8 +50,6 @@
 from shared.django_apps.upload_breadcrumbs.models import Errors, Milestones
 from shared.helpers.cache import cache
 from shared.helpers.redis import get_redis_connection
-from shared.metrics import Counter, inc_counter
-from shared.reports.enums import UploadState
 from shared.reports.resources import Report
 from shared.timeseries.helpers import is_timeseries_enabled
 from shared.torngit.exceptions import TorngitError
@@ -53,23 +58,55 @@
 
 log = logging.getLogger(__name__)
 
+RemainingUploads = namedtuple("RemainingUploads", ["uploaded", "processed"])
+
 FINISHER_BLOCKING_TIMEOUT_SECONDS = 30
 FINISHER_BASE_RETRY_COUNTDOWN_SECONDS = 10
-
-UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER = Counter(
-    "upload_finisher_already_completed",
-    "Number of times finisher skipped work because uploads were already in final state",
-)
+FINISHER_SWEEP_COUNTDOWN_SECONDS = 30
+FINISHER_WATCHDOG_DELAY_OFFSET_SECONDS = 30
+FINISHER_MERGE_TIME_BUDGET_SECONDS = 200
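+# The merge loop checks elapsed wall-clock time on every iteration and hands
+# off to a continuation task once the budget (minus the buffer below) is
+# spent, keeping a single run clear of the soft time limit handled in
+# run_impl.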
+FINISHER_CONTINUATION_BUFFER_SECONDS = 10
+FINISHER_UPLOAD_MERGE_BATCH_SIZE = 10
 
 regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]")
 
 
+def count_unfinished_uploads(db_session, commit: Commit) -> RemainingUploads:
+    state = ProcessingState(commit.repoid, commit.commitid, db_session=db_session)
+    upload_numbers = state.get_upload_numbers()
+    return RemainingUploads(
+        uploaded=state.count_remaining_coverage_uploads(),
+        processed=upload_numbers.processed,
+    )
+
+
+def save_and_commit(
+    db_session,
+    report_service: ReportService,
+    commit: Commit,
+    report: Report,
+    merged_upload_ids: list[int],
+    diff: dict | None,
+):
+    if diff:
+        report.apply_diff(diff)
+    report_service.save_report(commit, report)
+    db_session.commit()
+    cleanup_intermediate_reports(merged_upload_ids)
+
+
 class ShouldCallNotifyResult(Enum):
     DO_NOT_NOTIFY = "do_not_notify"
     NOTIFY_ERROR = "notify_error"
     NOTIFY = "notify"
 
 
+class UploadFinisherFollowUpTaskType(Enum):
+    SWEEP = "sweep"
+    WATCHDOG = "watchdog"
+    CONTINUATION = "continuation"
+
+
 class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name):
     """This is the third task of the series of tasks designed to process an `upload` made
     by the user
@@ -86,94 +123,48 @@ class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name):
 
     max_retries = UPLOAD_PROCESSOR_MAX_RETRIES
 
-    def _find_started_uploads_with_reports(
-        self, db_session, commit: Commit
-    ) -> set[int]:
-        """Find uploads in "started" state that have intermediate reports in Redis.
-
-        This is the fallback when Redis ProcessingState has expired (TTL: PROCESSING_STATE_TTL).
-        We check the database for uploads that were processed but never finalized,
-        and verify they have intermediate reports before including them.
-        """
-        # Query for uploads in "started" state for this commit
-        started_uploads = (
-            db_session.query(Upload)
-            .join(Upload.report)
-            .filter(
-                Upload.report.has(commit=commit),
-                Upload.state == "started",
-                Upload.state_id == UploadState.UPLOADED.db_id,
-            )
-            .all()
-        )
-
-        if not started_uploads:
-            return set()
-
-        log.info(
-            "Found uploads in started state, checking for intermediate reports",
-            extra={
-                "upload_ids": [u.id_ for u in started_uploads],
-                "count": len(started_uploads),
-            },
-        )
-
-        # Check which uploads have intermediate reports (confirms they were processed)
-        redis_connection = get_redis_connection()
-        upload_ids_with_reports = set()
-
-        for upload in started_uploads:
-            report_key = intermediate_report_key(upload.id_)
-            if redis_connection.exists(report_key):
-                upload_ids_with_reports.add(upload.id_)
-            else:
-                log.warning(
-                    "Upload in started state but no intermediate report found (may have expired)",
-                    extra={"upload_id": upload.id_},
-                )
-
-        return upload_ids_with_reports
+    def _schedule_followup(
+        self,
+        repoid: int,
+        commitid: str,
+        commit_yaml: UserYaml,
+        followup_type: UploadFinisherFollowUpTaskType,
+    ):
+        refresh_finisher_gate_ttl(repoid, commitid)
+        default_countdowns = {
+            UploadFinisherFollowUpTaskType.SWEEP: FINISHER_SWEEP_COUNTDOWN_SECONDS,
+            UploadFinisherFollowUpTaskType.CONTINUATION: 0,
+            UploadFinisherFollowUpTaskType.WATCHDOG: min(
+                FINISHER_GATE_TTL_SECONDS - 60,
+                self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS)
+                + FINISHER_WATCHDOG_DELAY_OFFSET_SECONDS
+                + 150,
+            ),
+        }
+        self.app.tasks[upload_finisher_task_name].apply_async(
+            kwargs={
+                "commit_yaml": commit_yaml.to_dict(),
+                "commitid": commitid,
+                "repoid": repoid,
+                "trigger": followup_type.value,
+            },
+            countdown=default_countdowns[followup_type],
+        )
 
     def _reconstruct_processing_results(
         self, db_session, state: ProcessingState, commit: Commit
     ) -> list[ProcessingResult]:
         """Reconstruct processing_results from ProcessingState when finisher
         is triggered outside of a chord (e.g., from orphaned upload recovery).
 
-        This ensures ALL uploads that were marked as processed in Redis are included
+        This ensures all uploads marked as processed in state tracking are included
         in the final merged report, even if they completed via retry/recovery.
-
-        If Redis state has expired (TTL: PROCESSING_STATE_TTL), falls back to database
-        to find uploads in "started" state that have intermediate reports, preventing data loss.
         """
-        # Get all upload IDs that are ready to be merged (in "processed" set)
+        # Get all upload IDs that are ready to be merged, up to MERGE_BATCH_SIZE
         upload_ids = state.get_uploads_for_merging()
-
         if not upload_ids:
-            log.warning(
-                "No uploads found in Redis processed set, checking database for started uploads",
-                extra={"repoid": commit.repoid, "commitid": commit.commitid},
-            )
-            # Fallback: Redis state expired (TTL: PROCESSING_STATE_TTL), check DB for uploads
-            # in "started" state that might have been processed but never finalized
-            upload_ids = self._find_started_uploads_with_reports(db_session, commit)
-
-            if not upload_ids:
-                log.warning(
-                    "No started uploads with intermediate reports found in database",
-                    extra={"repoid": commit.repoid, "commitid": commit.commitid},
-                )
-                return []
-
-            log.info(
-                "Found started uploads with intermediate reports (Redis state expired)",
-                extra={
-                    "upload_ids": list(upload_ids),
-                    "count": len(upload_ids),
-                },
-            )
-
+            return []
         log.info(
             "Reconstructing processing results from ProcessingState",
             extra={"upload_ids": list(upload_ids), "count": len(upload_ids)},
@@ -184,7 +175,6 @@
 
         # Check which uploads have intermediate reports in Redis
         redis_connection = get_redis_connection()
-
         processing_results = []
 
         for upload in uploads:
             # Check if intermediate report exists (indicates successful processing)
@@ -192,16 +182,15 @@
             has_report = redis_connection.exists(report_key)
 
             processing_result: ProcessingResult = {
-                "upload_id": upload.id_,
                 "arguments": {
                     "commit": commit.commitid,
+                    "reportid": str(upload.report.external_id),
                     "upload_id": upload.id_,
                     "version": "v4",  # Assume v4 for recovered uploads
-                    "reportid": str(upload.report.external_id),
                 },
                 "successful": bool(has_report),
+                "upload_id": upload.id_,
             }
-
             if not has_report:
                 log.warning(
                     "Upload in processed set but no intermediate report found",
@@ -211,7 +200,6 @@
                     "code": "missing_intermediate_report",
                     "params": {},
                 }
-
             processing_results.append(processing_result)
 
         log.info(
@@ -235,85 +223,38 @@ def run_impl(
         commit_yaml,
         **kwargs,
     ):
-        try:
-            UploadFlow.log(UploadFlow.BATCH_PROCESSING_COMPLETE)
-        except ValueError as e:
-            log.warning("CheckpointLogger failed to log/submit", extra={"error": e})
-
         milestone = Milestones.UPLOAD_COMPLETE
-
         log.info(
             "Received upload_finisher task",
-            extra={
-                "processing_results": processing_results,
-            },
+            extra={"processing_results": processing_results},
         )
 
         repoid = int(repoid)
         commit_yaml = UserYaml(commit_yaml)
-
-        log.info("run_impl: Getting commit")
-
         commit = (
             db_session.query(Commit)
             .filter(Commit.repoid == repoid, Commit.commitid == commitid)
             .first()
         )
         assert commit, "Commit not found in database."
-        log.info("run_impl: Got commit")
 
-        state = ProcessingState(repoid, commitid)
-
-        # If processing_results not provided (e.g., from orphaned upload recovery),
-        # reconstruct it from ProcessingState to ensure ALL uploads are included
-        if processing_results is None:
-            log.info(
-                "run_impl: processing_results not provided, reconstructing from ProcessingState"
-            )
-            processing_results = self._reconstruct_processing_results(
-                db_session, state, commit
-            )
-            log.info(
-                "run_impl: Reconstructed processing results",
-                extra={
-                    "upload_count": len(processing_results),
-                    "upload_ids": [r["upload_id"] for r in processing_results],
-                },
-            )
-
+        # TODO - tom - you should just check to see if any uploads are in existence, have the lock handle pulling uploads
+        state = ProcessingState(repoid, commitid, db_session=db_session)
+        processing_results = self._reconstruct_processing_results(
+            db_session, state, commit
+        )
         upload_ids = [upload["upload_id"] for upload in processing_results]
-
-        # Idempotency check: Skip if all uploads are already processed
-        # This prevents wasted work if multiple finishers are triggered (e.g., from
-        # visibility timeout re-queuing) or if finisher is manually retried
-        if upload_ids:
-            uploads_in_db = (
-                db_session.query(Upload).filter(Upload.id_.in_(upload_ids)).all()
-            )
-            # Only skip if ALL uploads exist in DB and ALL are in final states
-            if len(uploads_in_db) == len(upload_ids):
-                all_already_processed = all(
-                    upload.state in ("processed", "error") for upload in uploads_in_db
-                )
-                if all_already_processed:
-                    log.info(
-                        "All uploads already in final state, skipping finisher work",
-                        extra={
-                            "upload_ids": upload_ids,
-                            "states": [u.state for u in uploads_in_db],
-                        },
-                    )
-                    inc_counter(UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER)
-                    return {
-                        "already_completed": True,
-                        "upload_ids": upload_ids,
-                    }
+        log.info(
+            "run_impl: Reconstructed processing results",
+            extra={
+                "upload_count": len(upload_ids),
+                "upload_ids": upload_ids,
+            },
+        )
 
         try:
-            log.info("run_impl: Processing reports with lock")
-
-            self._process_reports_with_lock(
+            merge_result = self._process_reports_with_lock(
                 db_session,
                 commit,
                 commit_yaml,
@@ -322,26 +263,39 @@
                 upload_ids,
                 state,
             )
-
-            # Check if there are still unprocessed coverage uploads in the database
-            # Use DB as source of truth - if any coverage uploads are still in UPLOADED state,
-            # another finisher will process them and we shouldn't send notifications yet
-            remaining_uploads = (
-                db_session.query(Upload)
-                .join(Upload.report)
-                .filter(
-                    Upload.report.has(commit=commit),
-                    Upload.report.has(report_type=ReportType.COVERAGE.value),
-                    Upload.state_id == UploadState.UPLOADED.db_id,
+            if merge_result:
+                processing_results = merge_result.get(
+                    "processing_results", processing_results
                 )
-                .count()
+                upload_ids = merge_result.get("upload_ids", upload_ids)
+            continuation_needed = bool(
+                merge_result and merge_result.get("continuation_needed")
             )
+            if continuation_needed:
+                self._schedule_followup(
+                    repoid,
+                    commitid,
+                    commit_yaml,
+                    UploadFinisherFollowUpTaskType.CONTINUATION,
+                )
+                return {
+                    "continuation_scheduled": True,
+                    "upload_ids": upload_ids,
+                }
+
+            remaining_uploads = count_unfinished_uploads(db_session, commit).uploaded
             if remaining_uploads > 0:
                 log.info(
-                    "run_impl: Postprocessing should not be triggered - uploads still pending",
+                    "run_impl: Scheduling sweep because uploads are still pending",
                     extra={"remaining_uploads": remaining_uploads},
                 )
+                self._schedule_followup(
+                    repoid,
+                    commitid,
+                    commit_yaml,
+                    UploadFinisherFollowUpTaskType.SWEEP,
+                )
                 UploadFlow.log(UploadFlow.PROCESSING_COMPLETE)
                 UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION)
                 self._call_upload_breadcrumb_task(
@@ -350,11 +304,14 @@
                     milestone=milestone,
                     upload_ids=upload_ids,
                 )
-                return
+                return {
+                    "sweep_scheduled": True,
+                    "remaining_uploads": remaining_uploads,
+                }
 
             log.info("run_impl: Handling finisher lock")
 
-            return self._handle_finisher_lock(
+            result = self._handle_finisher_lock(
                 db_session,
                 commit,
                 commit_yaml,
@@ -362,12 +319,17 @@
                 milestone,
                 upload_ids,
             )
+            # `_handle_finisher_lock` only returns `None` on terminal lock exhaustion.
+            # Clear gate in either terminal outcome to avoid stalling this commit.
+            delete_finisher_gate(repoid, commitid)
+            return result
 
         except Retry:
             raise
 
         except SoftTimeLimitExceeded:
             log.warning("run_impl: soft time limit exceeded")
+            delete_finisher_gate(repoid, commitid)
             self._call_upload_breadcrumb_task(
                 commit_sha=commitid,
                 repo_id=repoid,
@@ -382,6 +344,7 @@
 
         except Exception as e:
             log.exception("run_impl: unexpected error in upload finisher")
+            delete_finisher_gate(repoid, commitid)
            sentry_sdk.capture_exception(e)
            log.exception(
                "Unexpected error in upload finisher",
@@ -411,10 +374,9 @@ def _process_reports_with_lock(
         state: ProcessingState,
     ):
         """Process reports with a lock to prevent concurrent modifications."""
-        diff = load_commit_diff(commit, self.name)
         repoid = commit.repoid
         commitid = commit.commitid
-
+        diff = load_commit_diff(commit, self.name)
         log.info("run_impl: Loaded commit diff")
 
         lock_manager = LockManager(
@@ -432,34 +394,62 @@
             ):
                 db_session.refresh(commit)
                 report_service = ReportService(commit_yaml)
+                merge_start_time = time.monotonic()
+                merged_processing_results: list[ProcessingResult] = []
+                merged_upload_ids: list[int] = []
+
+                while True:
+                    elapsed = time.monotonic() - merge_start_time
+                    if (
+                        elapsed
+                        >= FINISHER_MERGE_TIME_BUDGET_SECONDS
+                        - FINISHER_CONTINUATION_BUFFER_SECONDS
+                    ):
+                        remaining = count_unfinished_uploads(db_session, commit)
+                        return {
+                            "continuation_needed": remaining.processed > 0,
+                            "processing_results": merged_processing_results,
+                            "upload_ids": merged_upload_ids,
+                        }
+
+                    processing_results = self._reconstruct_processing_results(
+                        db_session, state, commit
+                    )
+                    if not processing_results:
+                        break
+                    current_batch = processing_results[
+                        :FINISHER_UPLOAD_MERGE_BATCH_SIZE
+                    ]
+                    pending_upload_ids = [
+                        upload["upload_id"] for upload in current_batch
+                    ]
 
-                log.info("run_impl: Performing report merging")
-
-                report = perform_report_merging(
-                    report_service, commit_yaml, commit, processing_results
-                )
-
-                log.info(
-                    "run_impl: Saving combined report",
-                    extra={"processing_results": processing_results},
-                )
-
-                if diff:
-                    log.info("run_impl: Applying diff to report")
-                    report.apply_diff(diff)
-
-                log.info("run_impl: Saving report")
-                report_service.save_report(commit, report)
-
-                db_session.commit()
-
-                log.info("run_impl: Marking uploads as merged")
-                state.mark_uploads_as_merged(upload_ids)
+                    log.info("run_impl: Performing report merging")
+                    report = perform_report_merging(
+                        report_service, commit_yaml, commit, current_batch
+                    )
+                    log.info(
+                        "run_impl: Saving combined report",
+                        extra={"processing_results": current_batch},
+                    )
+                    save_and_commit(
+                        db_session,
+                        report_service,
+                        commit,
+                        report,
+                        pending_upload_ids,
+                        diff,
+                    )
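+                    # Each batch is persisted (report saved, session committed,
+                    # intermediate reports cleaned up) before the next iteration
+                    # begins, so a crash loses at most the batch in flight.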
merged_processing_results.extend(current_batch) + merged_upload_ids.extend(pending_upload_ids) - log.info("run_impl: Finished upload_finisher task") + remaining = count_unfinished_uploads(db_session, commit) + return { + "processing_results": merged_processing_results, + "upload_ids": merged_upload_ids, + "continuation_needed": remaining.processed > 0, + } except LockRetry as retry: self._call_upload_breadcrumb_task( @@ -469,6 +459,7 @@ def _process_reports_with_lock( upload_ids=upload_ids, error=Errors.INTERNAL_LOCK_ERROR, ) + UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) if retry.max_retries_exceeded or self._has_exceeded_max_attempts( UPLOAD_PROCESSOR_MAX_RETRIES ): @@ -498,7 +489,7 @@ def _process_reports_with_lock( upload_ids=upload_ids, error=Errors.INTERNAL_RETRYING, ) - self.retry( + return self.retry( max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, countdown=retry.countdown ) @@ -589,7 +580,6 @@ def _handle_finisher_lock( upload_ids=upload_ids, error=Errors.INTERNAL_LOCK_ERROR, ) - UploadFlow.log(UploadFlow.FINISHER_LOCK_ERROR) if retry.max_retries_exceeded or self._has_exceeded_max_attempts( UPLOAD_PROCESSOR_MAX_RETRIES ): @@ -619,7 +609,7 @@ def _handle_finisher_lock( upload_ids=upload_ids, error=Errors.INTERNAL_RETRYING, ) - self.retry( + return self.retry( max_retries=UPLOAD_PROCESSOR_MAX_RETRIES, countdown=retry.countdown ) @@ -762,16 +752,10 @@ def should_call_notifications( # Check if there are still pending uploads in the database # Use DB as source of truth for upload completion status if db_session: - remaining_uploads = ( - db_session.query(Upload) - .join(Upload.report) - .filter( - Upload.report.has(commit=commit), - Upload.report.has(report_type=ReportType.COVERAGE.value), - Upload.state_id == UploadState.UPLOADED.db_id, - ) - .count() + state = ProcessingState( + commit.repoid, commit.commitid, db_session=db_session ) + remaining_uploads = state.count_remaining_coverage_uploads() if remaining_uploads > 0: log.info( "Not scheduling notify because there are still pending uploads", diff --git a/apps/worker/tasks/upload_merger.py b/apps/worker/tasks/upload_merger.py new file mode 100644 index 0000000000..da93eee2f5 --- /dev/null +++ b/apps/worker/tasks/upload_merger.py @@ -0,0 +1,690 @@ +import logging +import re +import time +from collections import namedtuple +from datetime import UTC, datetime, timedelta +from enum import Enum + +import sentry_sdk +from asgiref.sync import async_to_sync +from celery.exceptions import SoftTimeLimitExceeded +from sqlalchemy import func + +from app import celery_app +from celery_config import notify_error_task_name +from database.enums import CommitErrorTypes, ReportType +from database.models import Commit, Pull +from database.models.core import GITHUB_APP_INSTALLATION_DEFAULT_NAME +from database.models.reports import CommitReport, Upload +from helpers.checkpoint_logger.flows import UploadFlow +from helpers.exceptions import RepositoryWithoutValidBotError +from helpers.github_installation import get_installation_name_for_owner_for_task +from helpers.save_commit_error import save_commit_error +from services.comparison import get_or_create_comparison +from services.lock_manager import LockManager, LockRetry, LockType +from services.processing.intermediate import ( + cleanup_intermediate_reports, + load_intermediate_reports, +) +from services.processing.merging import merge_reports, update_uploads +from services.processing.types import ProcessingResult +from services.report import ReportService +from services.repository import 
get_repo_provider_service +from services.timeseries import repository_datasets_query +from services.yaml import read_yaml_field +from shared.celery_config import ( + DEFAULT_LOCK_TIMEOUT_SECONDS, + compute_comparison_task_name, + notify_task_name, + pulls_task_name, + timeseries_save_commit_measurements_task_name, + upload_merger_task_name, +) +from shared.helpers.cache import cache +from shared.helpers.redis import get_redis_connection +from shared.metrics import Counter, Histogram +from shared.reports.enums import UploadState +from shared.reports.resources import Report +from shared.timeseries.helpers import is_timeseries_enabled +from shared.torngit.exceptions import TorngitError +from shared.yaml import UserYaml +from tasks.base import BaseCodecovTask + +log = logging.getLogger(__name__) + +# --- Constants --- + +MERGE_BATCH_SIZE = 10 +MERGE_TIME_BUDGET_SECONDS = 200 +MERGER_GATE_KEY_PREFIX = "upload_merger_lock" +MERGER_GATE_TTL = 900 + +regexp_ci_skip = re.compile(r"\[(ci|skip| |-){3,}\]") + +# --- Prometheus Metrics --- + +UPLOAD_E2E_DURATION = Histogram( + "upload_pipeline_e2e_duration_seconds", + "Duration from oldest upload created_at to notify triggered.", + ["path", "trigger"], + buckets=[5, 15, 30, 60, 120, 300, 600, 1200, 3600], +) + +UPLOAD_MERGE_DURATION = Histogram( + "upload_merge_duration_seconds", + "Wall clock time of the merge phase.", + ["path", "trigger"], + buckets=[1, 5, 10, 30, 60, 120, 300, 600], +) + +UPLOAD_MERGE_COUNT = Histogram( + "upload_merge_uploads_total", + "Number of uploads merged in one merger run.", + ["path", "trigger"], + buckets=[1, 5, 10, 25, 50, 100, 200, 500], +) + +UPLOAD_MERGE_RESULT = Counter( + "upload_merge_result_total", + "Outcome of merge task.", + ["path", "outcome", "trigger"], +) + +# --- Helper types --- + +RemainingUploads = namedtuple("RemainingUploads", ["uploaded", "processed"]) + + +class ShouldCallNotifyResult(Enum): + DO_NOT_NOTIFY = "do_not_notify" + NOTIFY_ERROR = "notify_error" + NOTIFY = "notify" + + +# --- Key helpers --- + + +def merger_gate_key(repoid: int, commitid: str) -> str: + return f"{MERGER_GATE_KEY_PREFIX}_{repoid}_{commitid}" + + +def delete_merger_gate(redis, repoid: int, commitid: str): + redis.delete(merger_gate_key(repoid, commitid)) + + +# --- Scheduling helpers --- + + +def schedule_continuation(task, repoid, commitid, commit_yaml): + task.app.tasks[upload_merger_task_name].apply_async( + kwargs={ + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "trigger": "continuation", + } + ) + + +def schedule_sweep(task, repoid, commitid, commit_yaml, countdown=30): + task.app.tasks[upload_merger_task_name].apply_async( + kwargs={ + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "trigger": "sweep", + }, + countdown=countdown, + ) + + +def schedule_watchdog(task, repoid, commitid, commit_yaml, countdown): + task.app.tasks[upload_merger_task_name].apply_async( + kwargs={ + "repoid": repoid, + "commitid": commitid, + "commit_yaml": commit_yaml.to_dict(), + "trigger": "watchdog", + }, + countdown=countdown, + ) + + +# --- DB query helpers --- + + +def get_processed_uploads(db_session, commit: Commit, limit: int) -> list[Upload]: + return ( + db_session.query(Upload) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .filter( + CommitReport.commit_id == commit.id_, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + Upload.state_id == UploadState.PROCESSED.db_id, + ) + 
.limit(limit) + .all() + ) + + +def count_unfinished_uploads(db_session, commit: Commit) -> RemainingUploads: + row = ( + db_session.query( + func.count(Upload.id_).filter( + Upload.state_id == UploadState.UPLOADED.db_id + ), + func.count(Upload.id_).filter( + Upload.state_id == UploadState.PROCESSED.db_id + ), + ) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .filter( + CommitReport.commit_id == commit.id_, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + ) + .one() + ) + return RemainingUploads(uploaded=row[0], processed=row[1]) + + +def get_oldest_upload_created_at(db_session, commit: Commit) -> datetime | None: + result = ( + db_session.query(func.min(Upload.created_at)) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .filter( + CommitReport.commit_id == commit.id_, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + ) + .scalar() + ) + return result + + +# --- Merge helpers --- + + +def merge_batch( + db_session, + report_service: ReportService, + master_report: Report, + commit: Commit, + commit_yaml: UserYaml, + batch: list[Upload], + diff: dict | None, +) -> tuple[Report, list[ProcessingResult]]: + """Load intermediate reports for a batch, merge into master report, + and mark uploads as MERGED in the session (not yet committed).""" + upload_ids = [u.id_ for u in batch] + intermediate_reports = load_intermediate_reports(upload_ids) + + processing_results: list[ProcessingResult] = [] + for upload in batch: + has_report = any( + ir.upload_id == upload.id_ and not ir.report.is_empty() + for ir in intermediate_reports + ) + processing_results.append( + { + "upload_id": upload.id_, + "arguments": { + "commit": commit.commitid, + "upload_id": upload.id_, + "version": "v4", + "reportid": str(upload.report.external_id), + }, + "successful": has_report, + } + ) + + master_report, merge_result = merge_reports( + commit_yaml, master_report, intermediate_reports + ) + + update_uploads( + db_session, + commit_yaml, + processing_results, + intermediate_reports, + merge_result, + ) + + return master_report, processing_results + + +def save_and_commit( + db_session, + report_service: ReportService, + commit: Commit, + master_report: Report, + merged_upload_ids: list[int], + diff: dict | None, +): + """Persist the master report and commit staged upload updates.""" + if diff: + master_report.apply_diff(diff) + + report_service.save_report(commit, master_report) + db_session.commit() + cleanup_intermediate_reports(merged_upload_ids) + + +# --- Cache invalidation --- + + +def invalidate_caches(redis_connection, commit: Commit): + redis_connection.delete(f"cache/{commit.repoid}/tree/{commit.branch}") + redis_connection.delete(f"cache/{commit.repoid}/tree/{commit.commitid}") + repository = commit.repository + key = ":".join((repository.service, repository.author.username, repository.name)) + if commit.branch: + redis_connection.hdel("badge", (f"{key}:{commit.branch}").lower()) + if commit.branch == repository.branch: + redis_connection.hdel("badge", (f"{key}:").lower()) + + +# --- Post-processing --- + + +def should_call_notifications( + commit: Commit, + commit_yaml: UserYaml, + all_processing_results: list[ProcessingResult], + db_session, +) -> ShouldCallNotifyResult: + remaining_uploads = ( + db_session.query(Upload) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .filter( + CommitReport.commit_id == commit.id_, + 
+
+
+# --- Cache invalidation ---
+
+
+def invalidate_caches(redis_connection, commit: Commit):
+    redis_connection.delete(f"cache/{commit.repoid}/tree/{commit.branch}")
+    redis_connection.delete(f"cache/{commit.repoid}/tree/{commit.commitid}")
+    repository = commit.repository
+    key = ":".join((repository.service, repository.author.username, repository.name))
+    if commit.branch:
+        redis_connection.hdel("badge", (f"{key}:{commit.branch}").lower())
+        if commit.branch == repository.branch:
+            redis_connection.hdel("badge", (f"{key}:").lower())
+
+
+# --- Notification gating ---
+
+
+def should_call_notifications(
+    commit: Commit,
+    commit_yaml: UserYaml,
+    all_processing_results: list[ProcessingResult],
+    db_session,
+) -> ShouldCallNotifyResult:
+    remaining_uploads = (
+        db_session.query(Upload)
+        .join(CommitReport, Upload.report_id == CommitReport.id_)
+        .filter(
+            CommitReport.commit_id == commit.id_,
+            (CommitReport.report_type == None)  # noqa: E711
+            | (CommitReport.report_type == ReportType.COVERAGE.value),
+            Upload.state_id == UploadState.UPLOADED.db_id,
+        )
+        .count()
+    )
+    if remaining_uploads > 0:
+        return ShouldCallNotifyResult.DO_NOT_NOTIFY
+
+    manual_trigger = read_yaml_field(
+        commit_yaml, ("codecov", "notify", "manual_trigger")
+    )
+    if manual_trigger:
+        return ShouldCallNotifyResult.DO_NOT_NOTIFY
+
+    after_n_builds = (
+        read_yaml_field(commit_yaml, ("codecov", "notify", "after_n_builds")) or 0
+    )
+    if after_n_builds > 0:
+        report = ReportService(commit_yaml).get_existing_report_for_commit(commit)
+        number_sessions = len(report.sessions) if report is not None else 0
+        if after_n_builds > number_sessions:
+            return ShouldCallNotifyResult.DO_NOT_NOTIFY
+
+    processing_successes = [x["successful"] for x in all_processing_results]
+
+    if read_yaml_field(
+        commit_yaml,
+        ("codecov", "notify", "notify_error"),
+        _else=False,
+    ):
+        if len(processing_successes) == 0 or not all(processing_successes):
+            return ShouldCallNotifyResult.NOTIFY_ERROR
+    else:
+        if not any(processing_successes):
+            return ShouldCallNotifyResult.DO_NOT_NOTIFY
+
+    return ShouldCallNotifyResult.NOTIFY
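+
+# Example of the YAML knobs consulted above (values illustrative):
+#
+#     codecov:
+#       notify:
+#         manual_trigger: false
+#         after_n_builds: 3    # hold notifications until >= 3 sessions exist
+#         notify_error: true   # send an error notification if any upload failed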
+
+
+# --- Post-processing ---
+
+
+def post_process(
+    db_session,
+    commit: Commit,
+    commit_yaml: UserYaml,
+    task,
+    all_processing_results: list[ProcessingResult],
+) -> dict:
+    """Run notifications, pull/repo updates, timeseries, and PR comparison."""
+    repoid = commit.repoid
+    commitid = commit.commitid
+    repository = commit.repository
+
+    notifications_called = False
+    if not regexp_ci_skip.search(commit.message or ""):
+        notify_result = should_call_notifications(
+            commit, commit_yaml, all_processing_results, db_session
+        )
+        match notify_result:
+            case ShouldCallNotifyResult.NOTIFY:
+                notifications_called = True
+                notify_kwargs = {
+                    "repoid": repoid,
+                    "commitid": commitid,
+                    "current_yaml": commit_yaml.to_dict(),
+                }
+                notify_kwargs = UploadFlow.save_to_kwargs(notify_kwargs)
+                task.app.tasks[notify_task_name].apply_async(kwargs=notify_kwargs)
+
+                if commit.pullid:
+                    pull = (
+                        db_session.query(Pull)
+                        .filter_by(repoid=commit.repoid, pullid=commit.pullid)
+                        .first()
+                    )
+                    if pull:
+                        head = pull.get_head_commit()
+                        if head is None or head.timestamp <= commit.timestamp:
+                            pull.head = commit.commitid
+                        if pull.head == commit.commitid:
+                            db_session.commit()
+                            task.app.tasks[pulls_task_name].apply_async(
+                                kwargs={
+                                    "repoid": repoid,
+                                    "pullid": pull.pullid,
+                                    "should_send_notifications": False,
+                                }
+                            )
+                            compared_to = pull.get_comparedto_commit()
+                            if compared_to:
+                                comparison = get_or_create_comparison(
+                                    db_session, compared_to, commit
+                                )
+                                db_session.commit()
+                                task.app.tasks[
+                                    compute_comparison_task_name
+                                ].apply_async(kwargs={"comparison_id": comparison.id})
+            case ShouldCallNotifyResult.NOTIFY_ERROR:
+                notify_error_kwargs = {
+                    "repoid": repoid,
+                    "commitid": commitid,
+                    "current_yaml": commit_yaml.to_dict(),
+                }
+                notify_error_kwargs = UploadFlow.save_to_kwargs(notify_error_kwargs)
+                task.app.tasks[notify_error_task_name].apply_async(
+                    kwargs=notify_error_kwargs
+                )
+            case ShouldCallNotifyResult.DO_NOT_NOTIFY:
+                pass
+    else:
+        commit.state = "skipped"
+
+    if is_timeseries_enabled():
+        dataset_names = [
+            dataset.name for dataset in repository_datasets_query(repository)
+        ]
+        if dataset_names:
+            task.app.tasks[timeseries_save_commit_measurements_task_name].apply_async(
+                kwargs={
+                    "commitid": commitid,
+                    "repoid": repoid,
+                    "dataset_names": dataset_names,
+                }
+            )
+
+    now = datetime.now(tz=UTC)
+    threshold = now - timedelta(minutes=60)
+    if not repository.updatestamp or repository.updatestamp < threshold:
+        repository.updatestamp = now
+        db_session.commit()
+
+    UploadFlow.log(UploadFlow.PROCESSING_COMPLETE)
+    if not notifications_called:
+        UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION)
+
+    return {"notifications_called": notifications_called}
+
+
+# --- Diff loading (reused from the finisher) ---
+
+
+@sentry_sdk.trace
+@cache.cache_function(ttl=60 * 60)
+def load_commit_diff(commit: Commit, task_name: str | None = None) -> dict | None:
+    repository = commit.repository
+    commitid = commit.commitid
+    try:
+        installation_name_to_use = (
+            get_installation_name_for_owner_for_task(task_name, repository.author)
+            if task_name
+            else GITHUB_APP_INSTALLATION_DEFAULT_NAME
+        )
+        repository_service = get_repo_provider_service(
+            repository, installation_name_to_use=installation_name_to_use
+        )
+        return async_to_sync(repository_service.get_commit_diff)(commitid)
+    except TorngitError:
+        log.warning(
+            "Could not apply diff to report because there was an error fetching the diff from the provider",
+            exc_info=True,
+        )
+    except RepositoryWithoutValidBotError:
+        save_commit_error(
+            commit,
+            error_code=CommitErrorTypes.REPO_BOT_INVALID.value,
+        )
+        log.warning(
+            "Could not apply diff to report because no valid bot was found for the repo",
+            exc_info=True,
+        )
+    return None
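+
+# Sketch of how one run reschedules itself via the trigger label (timings come
+# from the constants and countdowns in this file; this is a summary, not an
+# exhaustive state machine):
+#
+#     processor run (gate acquired)
+#       -> "continuation"  immediately, when the time budget expired or
+#                          PROCESSED uploads remain
+#       -> "sweep"         after 30s, when UPLOADED uploads remain
+#       -> "watchdog"      after lock_timeout + 30s, scheduled by every run
+#                          as a crash safety net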
+
+
+# === Main Task ===
+
+
+class UploadMergerTask(BaseCodecovTask, name=upload_merger_task_name):
+    """Merge processed uploads into the master report.
+
+    Replaces the chord-based upload_finisher with a single-task-per-commit
+    model: time-boxed batch merging, self-scheduling continuations, a sweep
+    for late-arriving uploads, and watchdog-first crash recovery.
+    """
+
+    def run_impl(
+        self,
+        db_session,
+        *,
+        repoid: int,
+        commitid: str,
+        commit_yaml,
+        trigger: str = "processor",
+        **kwargs,
+    ):
+        repoid = int(repoid)
+        commit_yaml = UserYaml(commit_yaml)
+        redis = get_redis_connection()
+
+        # --- Gate key check: terminate the watchdog chain if work is done ---
+        if not redis.exists(merger_gate_key(repoid, commitid)):
+            UPLOAD_MERGE_RESULT.labels(
+                path="merger", outcome="nothing_to_do", trigger=trigger
+            ).inc()
+            return {"nothing_to_do": True}
+
+        commit = (
+            db_session.query(Commit)
+            .filter(Commit.repoid == repoid, Commit.commitid == commitid)
+            .first()
+        )
+        assert commit, "Commit not found in database."
+
+        lock_manager = LockManager(
+            repoid=repoid,
+            commitid=commitid,
+            lock_timeout=self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS),
+            blocking_timeout=5,
+        )
+
+        # --- Watchdog-first: schedule the safety net BEFORE crash-prone work ---
+        watchdog_countdown = self.get_lock_timeout(DEFAULT_LOCK_TIMEOUT_SECONDS) + 30
+        schedule_watchdog(
+            self, repoid, commitid, commit_yaml, countdown=watchdog_countdown
+        )
+
+        try:
+            with lock_manager.locked(LockType.UPLOAD_PROCESSING):
+                return self._merge_under_lock(
+                    db_session, commit, commit_yaml, trigger, lock_manager
+                )
+        except LockRetry:
+            # Another merger holds the lock; the watchdog scheduled above
+            # retries after the lock times out.
+            UPLOAD_MERGE_RESULT.labels(
+                path="merger", outcome="lock_retry", trigger=trigger
+            ).inc()
+            return {"lock_acquired": False}
+        except SoftTimeLimitExceeded:
+            log.warning(
+                "Merger soft time limit exceeded",
+                extra={"repoid": repoid, "commitid": commitid, "trigger": trigger},
+            )
+            UPLOAD_MERGE_RESULT.labels(
+                path="merger", outcome="error", trigger=trigger
+            ).inc()
+            return {"error": "Soft time limit exceeded"}
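+
+    # Worked timing example for the watchdog (numbers illustrative): with a
+    # 300s lock timeout, every run first enqueues a watchdog at t+330s. If
+    # the commit finishes merging, that watchdog wakes up, finds the gate key
+    # deleted, and exits as nothing_to_do; if the worker died mid-merge, the
+    # watchdog inherits the remaining work instead.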
+
+    def _merge_under_lock(
+        self,
+        db_session,
+        commit: Commit,
+        commit_yaml: UserYaml,
+        trigger: str,
+        lock_manager: LockManager,
+    ):
+        repoid = commit.repoid
+        commitid = commit.commitid
+
+        db_session.refresh(commit)
+        report_service = ReportService(commit_yaml)
+        master_report = report_service.get_existing_report_for_commit(commit)
+        if master_report is None:
+            master_report = Report()
+        diff = load_commit_diff(commit, self.name)
+
+        merge_start = time.monotonic()
+        uploads_merged = 0
+        merged_upload_ids: list[int] = []
+        all_processing_results: list[ProcessingResult] = []
+
+        # --- Time-boxed merge loop ---
+        # The `else` branch runs only when the time budget expires, i.e. the
+        # loop condition fails without a `break`.
+        while time.monotonic() - merge_start < MERGE_TIME_BUDGET_SECONDS:
+            batch = get_processed_uploads(db_session, commit, MERGE_BATCH_SIZE)
+            if not batch:
+                break
+            master_report, batch_results = merge_batch(
+                db_session,
+                master_report,
+                commit,
+                commit_yaml,
+                batch,
+            )
+            all_processing_results.extend(batch_results)
+            merged_upload_ids.extend(u.id_ for u in batch)
+            uploads_merged += len(batch)
+        else:
+            # Budget exhausted; more work may remain.
+            save_and_commit(
+                db_session,
+                report_service,
+                commit,
+                master_report,
+                merged_upload_ids,
+                diff,
+            )
+            UPLOAD_MERGE_DURATION.labels(path="merger", trigger=trigger).observe(
+                time.monotonic() - merge_start
+            )
+            UPLOAD_MERGE_COUNT.labels(path="merger", trigger=trigger).observe(
+                uploads_merged
+            )
+            UPLOAD_MERGE_RESULT.labels(
+                path="merger", outcome="continuation", trigger=trigger
+            ).inc()
+            schedule_continuation(self, repoid, commitid, commit_yaml)
+            return {"continuation_scheduled": True, "uploads_merged": uploads_merged}
+
+        merge_elapsed = time.monotonic() - merge_start
+        UPLOAD_MERGE_DURATION.labels(path="merger", trigger=trigger).observe(
+            merge_elapsed
+        )
+        UPLOAD_MERGE_COUNT.labels(path="merger", trigger=trigger).observe(
+            uploads_merged
+        )
+
+        # --- Early exit: nothing was merged ---
+        if uploads_merged == 0:
+            remaining = count_unfinished_uploads(db_session, commit)
+            if remaining.uploaded > 0:
+                UPLOAD_MERGE_RESULT.labels(
+                    path="merger", outcome="sweep", trigger=trigger
+                ).inc()
+                schedule_sweep(self, repoid, commitid, commit_yaml, countdown=30)
+                return {"sweep_scheduled": True}
+            if remaining.processed > 0:
+                # A processor may have committed PROCESSED state after our
+                # empty batch query; re-run instead of deleting the gate and
+                # stranding those uploads.
+                UPLOAD_MERGE_RESULT.labels(
+                    path="merger", outcome="continuation", trigger=trigger
+                ).inc()
+                schedule_continuation(self, repoid, commitid, commit_yaml)
+                return {"continuation_scheduled": True}
+            delete_merger_gate(lock_manager.redis_connection, repoid, commitid)
+            UPLOAD_MERGE_RESULT.labels(
+                path="merger", outcome="nothing_to_do", trigger=trigger
+            ).inc()
+            return {"nothing_to_do": True}
+
+        # --- Save report + commit DB + cleanup intermediates ---
+        save_and_commit(
+            db_session,
+            report_service,
+            commit,
+            master_report,
+            merged_upload_ids,
+            diff,
+        )
+
+        # --- Check remaining work ---
+        remaining = count_unfinished_uploads(db_session, commit)
+
+        if remaining.uploaded > 0:
+            UPLOAD_MERGE_RESULT.labels(
+                path="merger", outcome="sweep", trigger=trigger
+            ).inc()
+            schedule_sweep(self, repoid, commitid, commit_yaml, countdown=30)
+            return {"sweep_scheduled": True, "uploads_merged": uploads_merged}
+
+        if remaining.processed > 0:
+            UPLOAD_MERGE_RESULT.labels(
+                path="merger", outcome="continuation", trigger=trigger
+            ).inc()
+            schedule_continuation(self, repoid, commitid, commit_yaml)
+            return {"continuation_scheduled": True, "uploads_merged": uploads_merged}
+
+        # --- All done: post-process ---
+        report_totals = master_report.totals.asdict() if master_report.totals else {}
+        log.info(
+            "Merge complete",
+            extra={
+                "repoid": repoid,
+                "commitid": commitid,
+                "report_totals": report_totals,
+                "uploads_merged": uploads_merged,
+            },
+        )
+
+        result = post_process(
+            db_session, commit, commit_yaml, self, all_processing_results
+        )
+
+        oldest_upload = get_oldest_upload_created_at(db_session, commit)
+        if oldest_upload:
+            e2e = (
+                datetime.now(tz=UTC) - oldest_upload.replace(tzinfo=UTC)
+            ).total_seconds()
+            UPLOAD_E2E_DURATION.labels(path="merger", trigger=trigger).observe(e2e)
+
+        invalidate_caches(lock_manager.redis_connection, commit)
+        delete_merger_gate(lock_manager.redis_connection, repoid, commitid)
+        UPLOAD_MERGE_RESULT.labels(
+            path="merger", outcome="completed", trigger=trigger
+        ).inc()
+        return {**result, "uploads_merged": uploads_merged}
+
+
+RegisteredUploadMergerTask = celery_app.register_task(UploadMergerTask())
+upload_merger_task = celery_app.tasks[RegisteredUploadMergerTask.name]
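+
+
+# Example PromQL over the metrics above (assumes the default Prometheus naming
+# for histogram series; queries are illustrative):
+#
+#     # p95 merge wall time per trigger
+#     histogram_quantile(0.95,
+#         sum by (le, trigger) (rate(upload_merge_duration_seconds_bucket[5m])))
+#
+#     # share of merger runs that found nothing to do
+#     sum(rate(upload_merge_result_total{outcome="nothing_to_do"}[5m]))
+#       / sum(rate(upload_merge_result_total[5m]))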