diff --git a/apps/worker/services/processing/processing.py b/apps/worker/services/processing/processing.py
index 4d79c1b48..d80c819b0 100644
--- a/apps/worker/services/processing/processing.py
+++ b/apps/worker/services/processing/processing.py
@@ -5,20 +5,32 @@
 from celery.exceptions import CeleryError
 from sqlalchemy.orm import Session as DbSession
 
+from app import celery_app
 from database.models.core import Commit
 from database.models.reports import Upload
+from helpers.checkpoint_logger import _kwargs_key
+from helpers.checkpoint_logger.flows import UploadFlow
 from helpers.reports import delete_archive_setting
 from services.report import ProcessingError, RawReportInfo, ReportService
 from services.report.parser.types import VersionOneParsedRawReport
 from shared.api_archive.archive import ArchiveService
+from shared.celery_config import upload_finisher_task_name
+from shared.helpers.redis import get_redis_connection
 from shared.yaml import UserYaml
 
 from .intermediate import save_intermediate_report
-from .state import ProcessingState
+from .state import ProcessingState, should_perform_merge
 from .types import ProcessingResult, UploadArguments
 
 log = logging.getLogger(__name__)
 
+FINISHER_GATE_KEY_PREFIX = "upload_merger_lock"
+FINISHER_GATE_TTL_SECONDS = 900
+
+
+def finisher_gate_key(repo_id: int, commit_sha: str) -> str:
+    return f"{FINISHER_GATE_KEY_PREFIX}_{repo_id}_{commit_sha}"
+
 
 @sentry_sdk.trace
 def process_upload(
@@ -69,6 +81,34 @@ def process_upload(
         save_intermediate_report(upload_id, processing_result.report)
         state.mark_upload_as_processed(upload_id)
 
+        upload_numbers = state.get_upload_numbers()
+        if should_perform_merge(upload_numbers):
+            gate_key = finisher_gate_key(repo_id, commit_sha)
+            redis = get_redis_connection()
+            if redis.set(gate_key, "1", nx=True, ex=FINISHER_GATE_TTL_SECONDS):
+                log.info(
+                    "Enqueuing upload finisher via gate",
+                    extra={
+                        "repo_id": repo_id,
+                        "commit_sha": commit_sha,
+                        "upload_id": upload_id,
+                        "gate_key": gate_key,
+                    },
+                )
+                finisher_kwargs = {
+                    "repoid": repo_id,
+                    "commitid": commit_sha,
+                    "commit_yaml": commit_yaml.to_dict(),
+                }
+                checkpoint_kwargs_key = _kwargs_key(UploadFlow)
+                if checkpoint_kwargs_key in arguments:
+                    finisher_kwargs[checkpoint_kwargs_key] = arguments[
+                        checkpoint_kwargs_key
+                    ]
+                celery_app.tasks[upload_finisher_task_name].apply_async(
+                    kwargs=finisher_kwargs
+                )
+
         rewrite_or_delete_upload(archive_service, commit_yaml, report_info)
 
     except CeleryError:
diff --git a/apps/worker/services/tests/test_processing.py b/apps/worker/services/tests/test_processing.py
index 6ba1bf444..04b5b4996 100644
--- a/apps/worker/services/tests/test_processing.py
+++ b/apps/worker/services/tests/test_processing.py
@@ -13,8 +13,10 @@
 
 
 @pytest.mark.django_db(databases={"default"})
-class TestProcessUpload:
-    def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_storage):
+class TestProcessUploadFinisherGate:
+    def test_triggers_finisher_when_gate_is_acquired(
+        self, dbsession, mocker, mock_storage
+    ):
         # Setup
         repository = RepositoryFactory.create()
         commit = CommitFactory.create(repository=repository)
@@ -54,6 +56,92 @@ def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_stora
             return_value=mock_state_instance,
         )
 
+        # Mock celery app to capture finisher task scheduling
+        mock_celery_app = mocker.patch("services.processing.processing.celery_app")
+        mock_finisher_task = MagicMock()
+        mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}
+        mock_redis = mocker.patch("services.processing.processing.get_redis_connection")
+        mock_redis.return_value.set.return_value = True
+
+        # Mock other dependencies
+        mocker.patch("services.processing.processing.save_intermediate_report")
+        mocker.patch("services.processing.processing.rewrite_or_delete_upload")
+
+        commit_yaml = UserYaml({})
+
+        # Execute
+        result = process_upload(
+            on_processing_error=lambda error: None,
+            db_session=dbsession,
+            repo_id=repository.repoid,
+            commit_sha=commit.commitid,
+            commit_yaml=commit_yaml,
+            arguments=arguments,
+        )
+
+        # Verify
+        assert result["successful"] is True
+        assert result["upload_id"] == upload.id_
+
+        # Verify finisher was triggered
+        mock_finisher_task.apply_async.assert_called_once_with(
+            kwargs={
+                "repoid": repository.repoid,
+                "commitid": commit.commitid,
+                "commit_yaml": commit_yaml.to_dict(),
+            }
+        )
+        mock_redis.return_value.set.assert_called_once()
+
+    def test_does_not_trigger_finisher_when_gate_exists(
+        self, dbsession, mocker, mock_storage
+    ):
+        # Setup
+        repository = RepositoryFactory.create()
+        commit = CommitFactory.create(repository=repository)
+        upload = UploadFactory.create(
+            report__commit=commit,
+            state="started",
+        )
+        dbsession.add_all([repository, commit, upload])
+        dbsession.flush()
+
+        arguments: UploadArguments = {
+            "commit": commit.commitid,
+            "upload_id": upload.id_,
+            "version": "v4",
+            "reportid": str(upload.report.external_id),
+        }
+
+        # Mock dependencies
+        mock_report_service = mocker.patch(
+            "services.processing.processing.ReportService"
+        )
+        mock_processing_result = MagicMock()
+        mock_processing_result.error = None
+        mock_processing_result.report = None
+        mock_report_service.return_value.build_report_from_raw_content.return_value = (
+            mock_processing_result
+        )
+
+        # Mock ProcessingState to indicate other uploads still processing
+        mock_state_instance = MagicMock()
+        mock_state_instance.get_upload_numbers.return_value = MagicMock(
+            processing=2,
+            processed=1,  # Other uploads still in progress
+        )
+        mocker.patch(
+            "services.processing.processing.ProcessingState",
+            return_value=mock_state_instance,
+        )
+
+        # Mock celery app to capture finisher task scheduling
+        mock_celery_app = mocker.patch("services.processing.processing.celery_app")
+        mock_finisher_task = MagicMock()
+        mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}
+        mock_redis = mocker.patch("services.processing.processing.get_redis_connection")
+        mock_redis.return_value.set.return_value = False
+
         # Mock other dependencies
         mocker.patch("services.processing.processing.save_intermediate_report")
         mocker.patch("services.processing.processing.rewrite_or_delete_upload")
@@ -74,4 +162,6 @@ def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_stora
         assert result["successful"] is True
         assert result["upload_id"] == upload.id_
 
-        # Finisher enqueue is handled elsewhere; processor no longer triggers it directly.
+        # Verify finisher was NOT triggered.
+        mock_finisher_task.apply_async.assert_not_called()
+        mock_redis.return_value.set.assert_not_called()
diff --git a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
index 0a096440c..ede69a5cd 100644
--- a/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
+++ b/apps/worker/tasks/tests/unit/test_upload_finisher_task.py
@@ -40,6 +40,7 @@
 from tasks.upload_finisher import (
     FINISHER_BASE_RETRY_COUNTDOWN_SECONDS,
     FINISHER_BLOCKING_TIMEOUT_SECONDS,
+    FINISHER_MAX_SWEEP_ATTEMPTS,
     ReportService,
     ShouldCallNotifyResult,
     UploadFinisherTask,
@@ -991,6 +992,9 @@ def test_soft_time_limit_handling(self, dbsession, mocker, mock_self_app):
         mocker.patch(
             "tasks.upload_finisher.load_commit_diff", side_effect=SoftTimeLimitExceeded
         )
+        mock_delete_gate = mocker.patch.object(
+            UploadFinisherTask, "_delete_finisher_gate"
+        )
 
         commit = CommitFactory.create()
         dbsession.add(commit)
@@ -1020,6 +1024,7 @@ def test_soft_time_limit_handling(self, dbsession, mocker, mock_self_app):
                 "sentry_trace_id": None,
             }
         )
+        mock_delete_gate.assert_called_once_with(commit.repoid, commit.commitid)
 
     @pytest.mark.django_db
     def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):
@@ -1038,6 +1043,9 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):
 
         previous_results = [{"upload_id": 0, "successful": True, "arguments": {}}]
 
+        mock_delete_gate = mocker.patch.object(
+            UploadFinisherTask, "_delete_finisher_gate"
+        )
         result = UploadFinisherTask().run_impl(
             dbsession,
             previous_results,
@@ -1074,6 +1082,7 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):
                 "sentry_trace_id": None,
             }
         )
+        mock_delete_gate.assert_called_once_with(commit.repoid, commit.commitid)
 
     @pytest.mark.django_db
     def test_idempotency_check_skips_already_processed_uploads(
@@ -1115,7 +1124,12 @@ def test_idempotency_check_skips_already_processed_uploads(
             {"upload_id": upload_3.id, "successful": False, "arguments": {}},
         ]
 
-        result = UploadFinisherTask().run_impl(
+        task = UploadFinisherTask()
+        mock_count_pending = mocker.patch.object(
+            task, "_count_remaining_coverage_uploads", return_value=0
+        )
+        mock_delete_gate = mocker.patch.object(task, "_delete_finisher_gate")
+        result = task.run_impl(
             dbsession,
             previous_results,
             repoid=commit.repoid,
@@ -1131,6 +1145,78 @@ def test_idempotency_check_skips_already_processed_uploads(
 
         # Verify that _process_reports_with_lock was NOT called
         mock_process.assert_not_called()
+        mock_count_pending.assert_called_once()
+        mock_delete_gate.assert_called_once()
+
+    @pytest.mark.django_db
+    def test_idempotency_schedules_sweep_when_pending_uploads_remain(
+        self, dbsession, mocker, mock_self_app
+    ):
+        commit = CommitFactory.create()
+        dbsession.add(commit)
+        dbsession.flush()
+
+        report = CommitReport(commit_id=commit.id_)
+        dbsession.add(report)
+        dbsession.flush()
+
+        upload_1 = UploadFactory.create(report=report, state="merged")
+        dbsession.add(upload_1)
+        dbsession.flush()
+
+        task = UploadFinisherTask()
+        mock_count_pending = mocker.patch.object(
+            task, "_count_remaining_coverage_uploads", return_value=2
+        )
+        mock_schedule_sweep = mocker.patch.object(task, "_schedule_sweep")
+        mock_delete_gate = mocker.patch.object(task, "_delete_finisher_gate")
+
+        result = task.run_impl(
+            dbsession,
+            [{"upload_id": upload_1.id, "successful": True, "arguments": {}}],
+            repoid=commit.repoid,
+            commitid=commit.commitid,
+            commit_yaml={},
+        )
+
+        assert result == {"sweep_scheduled": True, "remaining_uploads": 2}
"remaining_uploads": 2} + mock_count_pending.assert_called_once() + mock_schedule_sweep.assert_called_once() + mock_delete_gate.assert_not_called() + + @pytest.mark.django_db + def test_idempotency_does_not_reschedule_sweep_after_max_attempts( + self, dbsession, mocker, mock_self_app + ): + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + report = CommitReport(commit_id=commit.id_) + dbsession.add(report) + dbsession.flush() + + upload_1 = UploadFactory.create(report=report, state="merged") + dbsession.add(upload_1) + dbsession.flush() + + task = UploadFinisherTask() + mocker.patch.object(task, "_count_remaining_coverage_uploads", return_value=2) + mock_schedule_sweep = mocker.patch.object(task, "_schedule_sweep") + mock_delete_gate = mocker.patch.object(task, "_delete_finisher_gate") + + result = task.run_impl( + dbsession, + [{"upload_id": upload_1.id, "successful": True, "arguments": {}}], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + sweep_attempt=FINISHER_MAX_SWEEP_ATTEMPTS, + ) + + assert result == {"sweep_scheduled": False, "remaining_uploads": 2} + mock_schedule_sweep.assert_not_called() + mock_delete_gate.assert_called_once() @pytest.mark.django_db def test_idempotency_check_proceeds_when_uploads_not_finished( @@ -1448,6 +1534,66 @@ def mock_process_reports( # This is the key assertion - notifications should NOT be blocked by test_results uploads mock_handle_finisher_lock.assert_called_once() + @pytest.mark.django_db + def test_run_impl_deletes_gate_after_successful_completion( + self, dbsession, mocker, mock_self_app + ): + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + task = UploadFinisherTask() + mocker.patch.object( + task, + "_reconstruct_processing_results", + return_value=[{"upload_id": 1, "successful": True, "arguments": {}}], + ) + mocker.patch.object(task, "_process_reports_with_lock") + mocker.patch.object(task, "_count_remaining_coverage_uploads", return_value=0) + mocker.patch.object(task, "_handle_finisher_lock", return_value={"done": True}) + mock_delete_gate = mocker.patch.object(task, "_delete_finisher_gate") + + result = task.run_impl( + dbsession, + processing_results=[], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + assert result == {"done": True} + mock_delete_gate.assert_called_once() + + @pytest.mark.django_db + def test_run_impl_deletes_gate_when_finisher_lock_is_exhausted( + self, dbsession, mocker, mock_self_app + ): + commit = CommitFactory.create() + dbsession.add(commit) + dbsession.flush() + + task = UploadFinisherTask() + mocker.patch.object( + task, + "_reconstruct_processing_results", + return_value=[{"upload_id": 1, "successful": True, "arguments": {}}], + ) + mocker.patch.object(task, "_process_reports_with_lock") + mocker.patch.object(task, "_count_remaining_coverage_uploads", return_value=0) + mocker.patch.object(task, "_handle_finisher_lock", return_value=None) + mock_delete_gate = mocker.patch.object(task, "_delete_finisher_gate") + + result = task.run_impl( + dbsession, + processing_results=[], + repoid=commit.repoid, + commitid=commit.commitid, + commit_yaml={}, + ) + + assert result is None + mock_delete_gate.assert_called_once() + class TestLockManagerConfiguration: """Tests for lock manager configuration: finite blocking_timeout diff --git a/apps/worker/tasks/tests/unit/test_upload_processing_task.py b/apps/worker/tasks/tests/unit/test_upload_processing_task.py index 77738f732..7ebd3dfef 100644 --- 
+++ b/apps/worker/tasks/tests/unit/test_upload_processing_task.py
@@ -57,6 +57,13 @@ def mock_self_app(mocker, celery_app):
     )
 
 
+@pytest.fixture(autouse=True)
+def mock_finisher_task_enqueue(mocker):
+    mock_celery_app = mocker.patch("services.processing.processing.celery_app")
+    mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mocker.MagicMock()}
+    return mock_celery_app
+
+
 def test_default_acks_late() -> None:
     task = UploadProcessorTask()
     # task.acks_late is defined at import time, so it's difficult to test
diff --git a/apps/worker/tasks/tests/unit/test_upload_task.py b/apps/worker/tasks/tests/unit/test_upload_task.py
index 6be8e6f5a..a87e7757b 100644
--- a/apps/worker/tasks/tests/unit/test_upload_task.py
+++ b/apps/worker/tasks/tests/unit/test_upload_task.py
@@ -38,8 +38,6 @@
 from tasks.test_results_processor import test_results_processor_task
 from tasks.tests.utils import ensure_hard_time_limit_task_is_numeric
 from tasks.upload import UploadContext, UploadTask
-from tasks.upload_finisher import upload_finisher_task
-from tasks.upload_processor import upload_processor_task
 
 
 here = Path(__file__)
@@ -190,7 +188,9 @@ def test_upload_task_call(
         mock_self_app,
     ):
         _start_upload_flow(mocker)
-        mocked_chord = mocker.patch("tasks.upload.chord")
+        mocked_apply_async = mocker.patch(
+            "tasks.upload.upload_processor_task.apply_async"
+        )
 
         url = "v4/raw/2019-05-22/C3C4715CA57C910D11D5EB899FC86A7E/4c4e4654ac25037ae869caeb3619d485970b6304/a84d445c-9c1e-434f-8275-f18f1f320f81.txt"
         commit = CommitFactory.create(
@@ -233,26 +233,11 @@ def test_upload_task_call(
             .filter_by(report_id=commit.report.id, build_code="some_random_build")
             .first()
         )
-        processor = upload_processor_task.s(
-            repoid=commit.repoid,
-            commitid="abf6d4df662c47e32460020ab14abf9303581429",
-            commit_yaml={"codecov": {"max_report_age": "1y ago"}},
-            arguments={
-                "url": url,
-                "flags": [],
-                "build": "some_random_build",
-                "upload_id": first_session.id,
-                "upload_pk": first_session.id,
-            },
-        )
-        kwargs = {
-            "repoid": commit.repoid,
-            "commitid": "abf6d4df662c47e32460020ab14abf9303581429",
-            "commit_yaml": {"codecov": {"max_report_age": "1y ago"}},
-        }
-        kwargs[_kwargs_key(UploadFlow)] = mocker.ANY
-        finisher = upload_finisher_task.signature(kwargs=kwargs)
-        mocked_chord.assert_called_with([processor], finisher)
+        mocked_apply_async.assert_called_once()
+        scheduled_kwargs = mocked_apply_async.call_args.kwargs["kwargs"]
+        assert scheduled_kwargs["repoid"] == commit.repoid
+        assert scheduled_kwargs["commitid"] == commit.commitid
+        assert scheduled_kwargs["arguments"]["upload_id"] == first_session.id
         calls = [
             call(
                 "time_before_processing",
@@ -807,7 +792,9 @@ def test_upload_task_upload_processing_delay_enough_delay(
         )
         mock_configuration.set_params({"setup": {"upload_processing_delay": 1000}})
         mocker.patch.object(UploadTask, "possibly_setup_webhooks", return_value=True)
-        mocked_chord = mocker.patch("tasks.upload.chord")
+        mocked_apply_async = mocker.patch(
+            "tasks.upload.upload_processor_task.apply_async"
+        )
 
         dbsession.add(commit)
         dbsession.flush()
@@ -834,7 +821,7 @@ def test_upload_task_upload_processing_delay_enough_delay(
         commit_report = commit.commit_report(report_type=ReportType.COVERAGE)
         assert commit_report is not None
         assert len(commit_report.uploads) == 2
-        mocked_chord.assert_called_with([mocker.ANY, mocker.ANY], mocker.ANY)
+        assert mocked_apply_async.call_count == 2
         mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls(
             [
                 call(
@@ -879,7 +866,9 @@ def test_upload_task_upload_processing_delay_upload_is_none(
             "tasks.upload.possibly_update_commit_from_provider_info", return_value=True
         )
         mocker.patch.object(UploadTask, "possibly_setup_webhooks", return_value=True)
-        mocked_chord = mocker.patch("tasks.upload.chord")
+        mocked_apply_async = mocker.patch(
+            "tasks.upload.upload_processor_task.apply_async"
+        )
         commit = CommitFactory.create(
             parent_commit_id=None,
             message="",
@@ -912,7 +901,7 @@ def test_upload_task_upload_processing_delay_upload_is_none(
         commit_report = commit.commit_report(report_type=ReportType.COVERAGE)
         assert commit_report is not None
         assert len(commit_report.uploads) == 2
-        mocked_chord.assert_called_with([mocker.ANY, mocker.ANY], mocker.ANY)
+        assert mocked_apply_async.call_count == 2
         mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls(
             [
                 call(
@@ -952,7 +941,9 @@ def test_upload_task_call_multiple_processors(
         mock_storage,
         mock_self_app,
     ):
-        mocked_chord = mocker.patch("tasks.upload.chord")
+        mocked_apply_async = mocker.patch(
+            "tasks.upload.upload_processor_task.apply_async"
+        )
 
         commit = CommitFactory.create(
             message="",
@@ -994,29 +985,7 @@ def test_upload_task_call_multiple_processors(
         commit_report = commit.commit_report(report_type=ReportType.COVERAGE)
         assert commit_report is not None
         assert len(commit_report.uploads) == 8
-        processors = [
-            upload_processor_task.s(
-                repoid=commit.repoid,
-                commitid="abf6d4df662c47e32460020ab14abf9303581429",
-                commit_yaml={"codecov": {"max_report_age": "1y ago"}},
-                arguments={
-                    **arguments,
-                    "flags": [],
-                    "upload_id": mocker.ANY,
-                    "upload_pk": mocker.ANY,
-                },
-            )
-            for arguments in redis_queue
-        ]
-        processors.reverse()  # whatever the reason
-        kwargs = {
-            "repoid": commit.repoid,
-            "commitid": "abf6d4df662c47e32460020ab14abf9303581429",
-            "commit_yaml": {"codecov": {"max_report_age": "1y ago"}},
-        }
-        kwargs[_kwargs_key(UploadFlow)] = mocker.ANY
-        t_final = upload_finisher_task.signature(kwargs=kwargs)
-        mocked_chord.assert_called_with(processors, t_final)
+        assert mocked_apply_async.call_count == 8
         mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls(
             [
                 call(
@@ -1590,7 +1559,9 @@ def test_schedule_task_with_one_task(
         mocker.patch(
             "tasks.upload.UploadTask.possibly_setup_webhooks", return_value=True
         )
-        mocked_chord = mocker.patch("tasks.upload.chord")
+        mocked_apply_async = mocker.patch(
+            "tasks.upload.upload_processor_task.apply_async"
+        )
         commit = CommitFactory.create()
         commit_yaml = {"codecov": {"max_report_age": "100y ago"}}
         dbsession.add(commit)
@@ -1607,22 +1578,8 @@ def test_schedule_task_with_one_task(
             ReportFactory.create(),
             upload_args,
         )
-        assert result == mocked_chord.return_value.apply_async.return_value
-        processor = upload_processor_task.s(
-            repoid=commit.repoid,
-            commitid=commit.commitid,
-            commit_yaml=commit_yaml,
-            arguments={"upload_id": 1, "upload_pk": 1},
-        )
-        finisher = upload_finisher_task.signature(
-            kwargs={
-                "repoid": commit.repoid,
-                "commitid": commit.commitid,
-                "commit_yaml": commit_yaml,
-                _kwargs_key(UploadFlow): mocker.ANY,
-            }
-        )
-        mocked_chord.assert_called_with([processor], finisher)
+        assert result is not None
+        mocked_apply_async.assert_called_once()
         mock_self_app.tasks[
             upload_breadcrumb_task_name
         ].apply_async.assert_called_once_with(
@@ -2205,7 +2162,9 @@ def test_upload_debounce_limit(
         self, dbsession, mocker, mock_config, mock_storage, mock_self_app
     ):
         mock_config(10, "setup", "upload_debounce_limit")
-        mocked_chord = mocker.patch("tasks.upload.chord")
mocker.patch("tasks.upload.chord") + mocked_apply_async = mocker.patch( + "tasks.upload.upload_processor_task.apply_async" + ) mocked_fetch_yaml = mocker.patch( "tasks.upload.fetch_commit_yaml_and_possibly_store" ) @@ -2256,21 +2215,14 @@ def test_upload_debounce_limit( commit.commitid, ) - # every chord instance gets called 5 times - assert len(mocked_chord.mock_calls) / 5 == 3 - - chord_calls = mocked_chord.call_args_list - all_processor_tasks = [] - for chord_call in chord_calls: - processor_tasks = chord_call[0][0] - all_processor_tasks.extend(processor_tasks) + assert mocked_apply_async.call_count == 25 actual_storage_paths = [] - for task in all_processor_tasks: - if hasattr(task, "kwargs") and "arguments" in task.kwargs: - arguments = task.kwargs["arguments"] - if "url" in arguments: - actual_storage_paths.append(arguments["url"]) + for call_args in mocked_apply_async.call_args_list: + kwargs = call_args.kwargs.get("kwargs", {}) + arguments = kwargs.get("arguments", {}) + if "url" in arguments: + actual_storage_paths.append(arguments["url"]) expected_storage_paths = [item["url"] for item in redis_queue] diff --git a/apps/worker/tasks/upload.py b/apps/worker/tasks/upload.py index d91b6186b..fa7460916 100644 --- a/apps/worker/tasks/upload.py +++ b/apps/worker/tasks/upload.py @@ -4,12 +4,13 @@ import uuid from copy import deepcopy from datetime import UTC, datetime +from types import SimpleNamespace from typing import Any, TypedDict import orjson import sentry_sdk from asgiref.sync import async_to_sync -from celery import chain, chord +from celery import chain from celery.exceptions import SoftTimeLimitExceeded from django.conf import settings from redis import Redis @@ -54,7 +55,6 @@ from tasks.bundle_analysis_processor import bundle_analysis_processor_task from tasks.test_results_finisher import test_results_finisher_task from tasks.test_results_processor import test_results_processor_task -from tasks.upload_finisher import upload_finisher_task from tasks.upload_processor import UPLOAD_PROCESSING_LOCK_NAME, upload_processor_task log = logging.getLogger(__name__) @@ -841,25 +841,25 @@ def _schedule_coverage_processing_task( ) parallel_processing_tasks = [ - upload_processor_task.s( - repoid=commit.repoid, - commitid=commit.commitid, - commit_yaml=commit_yaml, - arguments=arguments, - ) + { + "repoid": commit.repoid, + "commitid": commit.commitid, + "commit_yaml": commit_yaml, + "arguments": arguments, + } for arguments in argument_list ] - finisher_kwargs = { - "repoid": commit.repoid, - "commitid": commit.commitid, - "commit_yaml": commit_yaml, - } - finisher_kwargs = UploadFlow.save_to_kwargs(finisher_kwargs) - finish_parallel_sig = upload_finisher_task.signature(kwargs=finisher_kwargs) - - parallel_tasks = chord(parallel_processing_tasks, finish_parallel_sig) - return parallel_tasks.apply_async() + scheduled_tasks = [ + upload_processor_task.apply_async(kwargs=processor_task_kwargs) + for processor_task_kwargs in parallel_processing_tasks + ] + scheduled_task_ids = [ + task_result.id + for task_result in scheduled_tasks + if task_result and task_result.id + ] + return SimpleNamespace(as_tuple=lambda: tuple(scheduled_task_ids)) def _schedule_bundle_analysis_processing_task( self, diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index a11823dfe..fc3eab531 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -25,6 +25,7 @@ load_intermediate_reports, ) from services.processing.merging import 
+from services.processing.processing import FINISHER_GATE_TTL_SECONDS, finisher_gate_key
 from services.processing.state import ProcessingState
 from services.processing.types import ProcessingResult
 from services.report import ReportService
@@ -55,6 +56,8 @@
 
 FINISHER_BLOCKING_TIMEOUT_SECONDS = 30
 FINISHER_BASE_RETRY_COUNTDOWN_SECONDS = 10
+FINISHER_SWEEP_COUNTDOWN_SECONDS = 30
+FINISHER_MAX_SWEEP_ATTEMPTS = 20
 
 UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER = Counter(
     "upload_finisher_already_completed",
@@ -86,6 +89,39 @@ class UploadFinisherTask(BaseCodecovTask, name=upload_finisher_task_name):
 
     max_retries = UPLOAD_PROCESSOR_MAX_RETRIES
 
+    def _schedule_sweep(
+        self, repoid: int, commitid: str, commit_yaml: UserYaml, sweep_attempt: int
+    ):
+        # Keep the commit-level finisher gate alive while sweep retries are ongoing.
+        get_redis_connection().expire(
+            finisher_gate_key(repoid, commitid), FINISHER_GATE_TTL_SECONDS
+        )
+        self.app.tasks[upload_finisher_task_name].apply_async(
+            kwargs={
+                "repoid": repoid,
+                "commitid": commitid,
+                "commit_yaml": commit_yaml.to_dict(),
+                "trigger": "sweep",
+                "sweep_attempt": sweep_attempt + 1,
+            },
+            countdown=FINISHER_SWEEP_COUNTDOWN_SECONDS,
+        )
+
+    def _delete_finisher_gate(self, repoid: int, commitid: str):
+        get_redis_connection().delete(finisher_gate_key(repoid, commitid))
+
+    def _count_remaining_coverage_uploads(self, db_session, commit: Commit) -> int:
+        return (
+            db_session.query(Upload)
+            .join(Upload.report)
+            .filter(
+                Upload.report.has(commit=commit),
+                Upload.report.has(report_type=ReportType.COVERAGE.value),
+                Upload.state_id == UploadState.UPLOADED.db_id,
+            )
+            .count()
+        )
+
     def _find_started_uploads_with_reports(
         self, db_session, commit: Commit
     ) -> set[int]:
@@ -251,6 +287,7 @@ def run_impl(
 
         repoid = int(repoid)
         commit_yaml = UserYaml(commit_yaml)
+        sweep_attempt = int(kwargs.get("sweep_attempt", 0))
 
         log.info("run_impl: Getting commit")
 
@@ -305,6 +342,30 @@ def run_impl(
                 for upload in uploads_in_db
             )
             if all_already_finalized:
+                remaining_uploads = self._count_remaining_coverage_uploads(
+                    db_session, commit
+                )
+                if remaining_uploads > 0:
+                    sweep_scheduled = False
+                    if sweep_attempt < FINISHER_MAX_SWEEP_ATTEMPTS:
+                        self._schedule_sweep(
+                            repoid, commitid, commit_yaml, sweep_attempt
+                        )
+                        sweep_scheduled = True
+                    else:
+                        self._delete_finisher_gate(repoid, commitid)
+                    UploadFlow.log(UploadFlow.PROCESSING_COMPLETE)
+                    UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION)
+                    self._call_upload_breadcrumb_task(
+                        commit_sha=commitid,
+                        repo_id=repoid,
+                        milestone=milestone,
+                        upload_ids=upload_ids,
+                    )
+                    return {
+                        "sweep_scheduled": sweep_scheduled,
+                        "remaining_uploads": remaining_uploads,
+                    }
                 log.info(
                     "All uploads already in final state, skipping finisher work",
                     extra={
@@ -313,6 +374,7 @@ def run_impl(
                     },
                 )
                 inc_counter(UPLOAD_FINISHER_ALREADY_COMPLETED_COUNTER)
+                self._delete_finisher_gate(repoid, commitid)
                 return {
                     "already_completed": True,
                     "upload_ids": upload_ids,
@@ -334,22 +396,21 @@ def run_impl(
             # Check if there are still unprocessed coverage uploads in the database
             # Use DB as source of truth - if any coverage uploads are still in UPLOADED state,
             # another finisher will process them and we shouldn't send notifications yet
-            remaining_uploads = (
-                db_session.query(Upload)
-                .join(Upload.report)
-                .filter(
-                    Upload.report.has(commit=commit),
-                    Upload.report.has(report_type=ReportType.COVERAGE.value),
-                    Upload.state_id == UploadState.UPLOADED.db_id,
-                )
-                .count()
+            remaining_uploads = self._count_remaining_coverage_uploads(
+                db_session, commit
             )
 
             if remaining_uploads > 0:
                 log.info(
-                    "run_impl: Postprocessing should not be triggered - uploads still pending",
+                    "run_impl: Scheduling sweep because uploads are still pending",
                     extra={"remaining_uploads": remaining_uploads},
                 )
+                sweep_scheduled = False
+                if sweep_attempt < FINISHER_MAX_SWEEP_ATTEMPTS:
+                    self._schedule_sweep(repoid, commitid, commit_yaml, sweep_attempt)
+                    sweep_scheduled = True
+                else:
+                    self._delete_finisher_gate(repoid, commitid)
                 UploadFlow.log(UploadFlow.PROCESSING_COMPLETE)
                 UploadFlow.log(UploadFlow.SKIPPING_NOTIFICATION)
                 self._call_upload_breadcrumb_task(
@@ -358,11 +419,14 @@ def run_impl(
                     milestone=milestone,
                     upload_ids=upload_ids,
                 )
-                return
+                return {
+                    "sweep_scheduled": sweep_scheduled,
+                    "remaining_uploads": remaining_uploads,
+                }
 
             log.info("run_impl: Handling finisher lock")
 
-            return self._handle_finisher_lock(
+            result = self._handle_finisher_lock(
                 db_session,
                 commit,
                 commit_yaml,
@@ -370,12 +434,17 @@ def run_impl(
                 milestone,
                 upload_ids,
             )
+            # `_handle_finisher_lock` only returns `None` on terminal lock exhaustion.
+            # Clear gate in either terminal outcome to avoid stalling this commit.
+            self._delete_finisher_gate(repoid, commitid)
+            return result
 
         except Retry:
             raise
 
         except SoftTimeLimitExceeded:
             log.warning("run_impl: soft time limit exceeded")
+            self._delete_finisher_gate(repoid, commitid)
             self._call_upload_breadcrumb_task(
                 commit_sha=commitid,
                 repo_id=repoid,
@@ -390,6 +459,7 @@
 
         except Exception as e:
             log.exception("run_impl: unexpected error in upload finisher")
+            self._delete_finisher_gate(repoid, commitid)
             sentry_sdk.capture_exception(e)
             log.exception(
                 "Unexpected error in upload finisher",