diff --git a/apps/worker/services/processing/processing.py b/apps/worker/services/processing/processing.py index d80c819b0..ed5865f96 100644 --- a/apps/worker/services/processing/processing.py +++ b/apps/worker/services/processing/processing.py @@ -53,8 +53,8 @@ def process_upload( upload = db_session.query(Upload).filter_by(id_=upload_id).first() assert upload - state = ProcessingState(repo_id, commit_sha) - # this in a noop in normal cases, but relevant for task retries: + state = ProcessingState(repo_id, commit_sha, db_session=db_session) + # this is a noop in normal cases, but relevant for task retries: state.mark_uploads_as_processing([upload_id]) report_service = ReportService(commit_yaml) @@ -108,7 +108,6 @@ def process_upload( celery_app.tasks[upload_finisher_task_name].apply_async( kwargs=finisher_kwargs ) - rewrite_or_delete_upload(archive_service, commit_yaml, report_info) except CeleryError: diff --git a/apps/worker/services/processing/state.py b/apps/worker/services/processing/state.py index ca433f7f2..8380c16fa 100644 --- a/apps/worker/services/processing/state.py +++ b/apps/worker/services/processing/state.py @@ -21,17 +21,21 @@ "intermediate report". """ +import logging from dataclasses import dataclass -from shared.helpers.redis import get_redis_connection +from sqlalchemy import case, func +from sqlalchemy.orm import Session + +from database.enums import ReportType +from database.models.core import Commit +from database.models.reports import CommitReport, Upload from shared.metrics import Counter +from shared.reports.enums import UploadState -MERGE_BATCH_SIZE = 10 +log = logging.getLogger(__name__) -# TTL for processing state keys in Redis (24 hours, matches intermediate report TTL) -# This prevents state keys from accumulating indefinitely and ensures consistency -# with intermediate report expiration -PROCESSING_STATE_TTL = 24 * 60 * 60 +MERGE_BATCH_SIZE = 10 CLEARED_UPLOADS = Counter( "worker_processing_cleared_uploads", @@ -75,59 +79,118 @@ def should_trigger_postprocessing(uploads: UploadNumbers) -> bool: class ProcessingState: - def __init__(self, repoid: int, commitsha: str) -> None: - self._redis = get_redis_connection() + def __init__(self, repoid: int, commitsha: str, db_session: Session) -> None: self.repoid = repoid self.commitsha = commitsha + self._db_session = db_session def get_upload_numbers(self): - processing = self._redis.scard(self._redis_key("processing")) - processed = self._redis.scard(self._redis_key("processed")) - return UploadNumbers(processing, processed) + row = ( + self._db_session.query( + func.count( + case( + ( + Upload.state_id == UploadState.UPLOADED.db_id, + Upload.id_, + ), + ) + ), + func.count( + case( + ( + Upload.state_id == UploadState.PROCESSED.db_id, + Upload.id_, + ), + ) + ), + ) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .join(Commit, CommitReport.commit_id == Commit.id_) + .filter( + Commit.repoid == self.repoid, + Commit.commitid == self.commitsha, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + ) + .one() + ) + return UploadNumbers(processing=row[0], processed=row[1]) def mark_uploads_as_processing(self, upload_ids: list[int]): - if not upload_ids: - return - key = self._redis_key("processing") - self._redis.sadd(key, *upload_ids) - # Set TTL to match intermediate report expiration (24 hours) - # This ensures state keys don't accumulate indefinitely - self._redis.expire(key, PROCESSING_STATE_TTL) + # No-op: uploads are created with state_id=UPLOADED, which + # get_upload_numbers() already counts as "processing". + pass def clear_in_progress_uploads(self, upload_ids: list[int]): if not upload_ids: return - removed_uploads = self._redis.srem(self._redis_key("processing"), *upload_ids) - if removed_uploads > 0: - # the normal flow would move the uploads from the "processing" set - # to the "processed" set via `mark_upload_as_processed`. - # this function here is only called in the error case and we don't expect - # this to be triggered often, if at all. - CLEARED_UPLOADS.inc(removed_uploads) + # Mark still-UPLOADED uploads as ERROR so they stop being counted + # as "processing" in get_upload_numbers(). Only matches UPLOADED -- + # already-PROCESSED uploads (success path) are unaffected. + # + # This runs in a finally block, so the transaction may already be + # in a failed state. Best-effort: log and move on if the DB is + # unreachable — the upload stays UPLOADED, which is safe. + try: + updated = ( + self._db_session.query(Upload) + .filter( + Upload.id_.in_(upload_ids), + Upload.state_id == UploadState.UPLOADED.db_id, + ) + .update( + { + Upload.state_id: UploadState.ERROR.db_id, + Upload.state: "error", + }, + synchronize_session="fetch", + ) + ) + if updated > 0: + CLEARED_UPLOADS.inc(updated) + except Exception: + log.warning( + "Failed to clear in-progress uploads (transaction may be aborted)", + extra={"upload_ids": upload_ids}, + exc_info=True, + ) def mark_upload_as_processed(self, upload_id: int): - processing_key = self._redis_key("processing") - processed_key = self._redis_key("processed") - - res = self._redis.smove(processing_key, processed_key, upload_id) - if not res: - # this can happen when `upload_id` was never in the source set, - # which probably is the case during initial deployment as - # the code adding this to the initial set was not deployed yet - # TODO: make sure to remove this code after a grace period - self._redis.sadd(processed_key, upload_id) - - # Set TTL on processed key to match intermediate report expiration - # This ensures uploads marked as processed have a bounded lifetime - self._redis.expire(processed_key, PROCESSING_STATE_TTL) + upload = self._db_session.query(Upload).get(upload_id) + if upload: + upload.state_id = UploadState.PROCESSED.db_id + # Don't set upload.state here -- the finisher's idempotency check + # uses state="processed" to detect already-merged uploads. + # The state string is set by update_uploads() after merging. def mark_uploads_as_merged(self, upload_ids: list[int]): if not upload_ids: return - self._redis.srem(self._redis_key("processed"), *upload_ids) + self._db_session.query(Upload).filter( + Upload.id_.in_(upload_ids), + Upload.state_id == UploadState.PROCESSED.db_id, + ).update( + { + Upload.state_id: UploadState.MERGED.db_id, + Upload.state: "merged", + }, + synchronize_session="fetch", + ) + self._db_session.commit() def get_uploads_for_merging(self) -> set[int]: - return {int(id) for id in self._redis.smembers(self._redis_key("processed"))} - - def _redis_key(self, state: str) -> str: - return f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}" + rows = ( + self._db_session.query(Upload.id_) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .join(Commit, CommitReport.commit_id == Commit.id_) + .filter( + Commit.repoid == self.repoid, + Commit.commitid == self.commitsha, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + Upload.state_id == UploadState.PROCESSED.db_id, + ) + .limit(MERGE_BATCH_SIZE) + .all() + ) + return {row[0] for row in rows} diff --git a/apps/worker/services/tests/test_merging.py b/apps/worker/services/tests/test_merging.py new file mode 100644 index 000000000..eb8f99d33 --- /dev/null +++ b/apps/worker/services/tests/test_merging.py @@ -0,0 +1,68 @@ +import pytest + +from database.tests.factories.core import ( + CommitFactory, + ReportFactory, + RepositoryFactory, + UploadFactory, +) +from services.processing.merging import update_uploads +from services.processing.types import MergeResult, ProcessingResult +from shared.reports.enums import UploadState +from shared.yaml import UserYaml + + +@pytest.mark.django_db(databases={"default"}) +class TestUpdateUploadsState: + def test_successful_uploads_set_to_merged(self, dbsession): + repository = RepositoryFactory.create() + commit = CommitFactory.create(repository=repository) + report = ReportFactory.create(commit=commit) + upload = UploadFactory.create( + report=report, + state="started", + state_id=UploadState.UPLOADED.db_id, + ) + dbsession.add_all([repository, commit, report, upload]) + dbsession.flush() + + processing_results: list[ProcessingResult] = [ + {"upload_id": upload.id_, "successful": True, "arguments": {}}, + ] + merge_result = MergeResult( + session_mapping={upload.id_: 0}, deleted_sessions=set() + ) + + update_uploads(dbsession, UserYaml({}), processing_results, [], merge_result) + + dbsession.refresh(upload) + assert upload.state_id == UploadState.MERGED.db_id + assert upload.state == "merged" + + def test_failed_uploads_set_to_error(self, dbsession): + repository = RepositoryFactory.create() + commit = CommitFactory.create(repository=repository) + report = ReportFactory.create(commit=commit) + upload = UploadFactory.create( + report=report, + state="started", + state_id=UploadState.UPLOADED.db_id, + ) + dbsession.add_all([repository, commit, report, upload]) + dbsession.flush() + + processing_results: list[ProcessingResult] = [ + { + "upload_id": upload.id_, + "successful": False, + "arguments": {}, + "error": {"code": "report_empty", "params": {}}, + }, + ] + merge_result = MergeResult(session_mapping={}, deleted_sessions=set()) + + update_uploads(dbsession, UserYaml({}), processing_results, [], merge_result) + + dbsession.refresh(upload) + assert upload.state_id == UploadState.ERROR.db_id + assert upload.state == "error" diff --git a/apps/worker/services/tests/test_processing.py b/apps/worker/services/tests/test_processing.py index 04b5b4996..63ba3509b 100644 --- a/apps/worker/services/tests/test_processing.py +++ b/apps/worker/services/tests/test_processing.py @@ -9,6 +9,7 @@ ) from services.processing.processing import process_upload from services.processing.types import UploadArguments +from shared.reports.enums import UploadState from shared.yaml import UserYaml @@ -23,6 +24,7 @@ def test_triggers_finisher_when_gate_is_acquired( upload = UploadFactory.create( report__commit=commit, state="started", + state_id=UploadState.UPLOADED.db_id, ) dbsession.add_all([repository, commit, upload]) dbsession.flush() @@ -34,7 +36,6 @@ def test_triggers_finisher_when_gate_is_acquired( "reportid": str(upload.report.external_id), } - # Mock dependencies mock_report_service = mocker.patch( "services.processing.processing.ReportService" ) @@ -63,13 +64,10 @@ def test_triggers_finisher_when_gate_is_acquired( mock_redis = mocker.patch("services.processing.processing.get_redis_connection") mock_redis.return_value.set.return_value = True - # Mock other dependencies mocker.patch("services.processing.processing.save_intermediate_report") mocker.patch("services.processing.processing.rewrite_or_delete_upload") - commit_yaml = UserYaml({}) - # Execute result = process_upload( on_processing_error=lambda error: None, db_session=dbsession, @@ -79,10 +77,8 @@ def test_triggers_finisher_when_gate_is_acquired( arguments=arguments, ) - # Verify assert result["successful"] is True assert result["upload_id"] == upload.id_ - # Verify finisher was triggered mock_finisher_task.apply_async.assert_called_once_with( kwargs={ @@ -92,6 +88,11 @@ def test_triggers_finisher_when_gate_is_acquired( } ) mock_redis.return_value.set.assert_called_once() + dbsession.refresh(upload) + assert upload.state_id == UploadState.PROCESSED.db_id + # state string is not updated by the processor -- the finisher sets it + # after merging (to avoid triggering the finisher's idempotency check early) + assert upload.state == "started" def test_does_not_trigger_finisher_when_gate_exists( self, dbsession, mocker, mock_storage diff --git a/apps/worker/services/tests/test_processing_state.py b/apps/worker/services/tests/test_processing_state.py index 0382a6f6e..90ae3699f 100644 --- a/apps/worker/services/tests/test_processing_state.py +++ b/apps/worker/services/tests/test_processing_state.py @@ -1,191 +1,191 @@ -from unittest.mock import MagicMock, patch -from uuid import uuid4 - import pytest +from database.tests.factories.core import ( + CommitFactory, + ReportFactory, + RepositoryFactory, + UploadFactory, +) from services.processing.state import ( ProcessingState, should_perform_merge, should_trigger_postprocessing, ) +from shared.reports.enums import UploadState +@pytest.mark.django_db(databases={"default"}) +class TestProcessingState: + @pytest.fixture + def setup_commit(self, dbsession): + repository = RepositoryFactory.create() + dbsession.add(repository) + dbsession.flush() + commit = CommitFactory.create(repository=repository) + dbsession.add(commit) + dbsession.flush() + report = ReportFactory.create(commit=commit) + dbsession.add(report) + dbsession.flush() + return repository, commit, report + + def _create_upload(self, dbsession, report, state_id): + upload = UploadFactory.create( + report=report, + state="uploaded", + state_id=state_id, + ) + dbsession.add(upload) + dbsession.flush() + return upload + + def test_get_upload_numbers_empty(self, dbsession, setup_commit): + _, commit, _ = setup_commit + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + numbers = state.get_upload_numbers() + assert numbers.processing == 0 + assert numbers.processed == 0 + + def test_get_upload_numbers_with_uploads(self, dbsession, setup_commit): + _, commit, report = setup_commit + self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) + self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) + self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) + + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + numbers = state.get_upload_numbers() + assert numbers.processing == 2 + assert numbers.processed == 1 + + def test_mark_upload_as_processed(self, dbsession, setup_commit): + _, commit, report = setup_commit + upload = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) + + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + state.mark_upload_as_processed(upload.id_) + dbsession.flush() + + dbsession.refresh(upload) + assert upload.state_id == UploadState.PROCESSED.db_id + # state string is intentionally NOT set here -- the finisher's + # idempotency check uses state="processed" to detect already-merged uploads + assert upload.state == "uploaded" + + def test_mark_uploads_as_merged(self, dbsession, setup_commit): + _, commit, report = setup_commit + u1 = self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) + u2 = self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) + + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + state.mark_uploads_as_merged([u1.id_, u2.id_]) + dbsession.flush() + + dbsession.refresh(u1) + dbsession.refresh(u2) + assert u1.state_id == UploadState.MERGED.db_id + assert u1.state == "merged" + assert u2.state_id == UploadState.MERGED.db_id + assert u2.state == "merged" + + def test_mark_uploads_as_merged_empty_list(self, dbsession, setup_commit): + _, commit, _ = setup_commit + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + state.mark_uploads_as_merged([]) -def test_single_upload(): - state = ProcessingState(1234, uuid4().hex) - state.mark_uploads_as_processing([1]) - - state.mark_upload_as_processed(1) - - # this is the only in-progress upload, nothing more to expect - assert should_perform_merge(state.get_upload_numbers()) - - assert state.get_uploads_for_merging() == {1} - state.mark_uploads_as_merged([1]) - - assert should_trigger_postprocessing(state.get_upload_numbers()) - - -def test_concurrent_uploads(): - state = ProcessingState(1234, uuid4().hex) - state.mark_uploads_as_processing([1]) - - state.mark_upload_as_processed(1) - # meanwhile, another upload comes in: - state.mark_uploads_as_processing([2]) - - # not merging/postprocessing yet, as that will be debounced with the second upload - assert not should_perform_merge(state.get_upload_numbers()) - - state.mark_upload_as_processed(2) - - assert should_perform_merge(state.get_upload_numbers()) - - assert state.get_uploads_for_merging() == {1, 2} - state.mark_uploads_as_merged([1, 2]) - - assert should_trigger_postprocessing(state.get_upload_numbers()) - - -def test_batch_merging_many_uploads(): - state = ProcessingState(1234, uuid4().hex) - - state.mark_uploads_as_processing([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]) - - for id in range(1, 12): - state.mark_upload_as_processed(id) - - # We now reconstruct all processed uploads in one pass. - assert should_perform_merge(state.get_upload_numbers()) - merging = state.get_uploads_for_merging() - assert len(merging) == 11 - state.mark_uploads_as_merged(merging) - - # but no notifications yet - assert not should_trigger_postprocessing(state.get_upload_numbers()) - - state.mark_upload_as_processed(12) + def test_get_uploads_for_merging(self, dbsession, setup_commit): + _, commit, report = setup_commit + u1 = self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) + u2 = self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) + self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) + self._create_upload(dbsession, report, UploadState.MERGED.db_id) - # with the last upload being processed, we do another merge, and then trigger notifications - assert should_perform_merge(state.get_upload_numbers()) - merging = state.get_uploads_for_merging() - assert len(merging) == 1 - state.mark_uploads_as_merged(merging) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + merging = state.get_uploads_for_merging() + assert merging == {u1.id_, u2.id_} - assert should_trigger_postprocessing(state.get_upload_numbers()) + def test_get_uploads_for_merging_respects_batch_size(self, dbsession, setup_commit): + _, commit, report = setup_commit + for _ in range(15): + self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + merging = state.get_uploads_for_merging() + assert len(merging) == 10 -class TestProcessingStateEmptyListGuards: - """Tests for empty list guards in ProcessingState methods.""" + def test_mark_uploads_as_processing_is_noop(self, dbsession, setup_commit): + _, commit, report = setup_commit + upload = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) - @pytest.fixture - def mock_redis(self): - """Create a mock Redis connection.""" - with patch("services.processing.state.get_redis_connection") as mock_get_redis: - mock_redis = MagicMock() - mock_get_redis.return_value = mock_redis - yield mock_redis + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + state.mark_uploads_as_processing([upload.id_]) - def test_mark_uploads_as_processing_empty_list(self, mock_redis): - """Test that mark_uploads_as_processing handles empty list gracefully.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis + dbsession.refresh(upload) + assert upload.state_id == UploadState.UPLOADED.db_id - # Should not call Redis when empty list is passed - state.mark_uploads_as_processing([]) + def test_clear_in_progress_uploads_sets_error_on_uploaded( + self, dbsession, setup_commit + ): + """Uploaded uploads are set to ERROR so they stop counting as 'processing'.""" + _, commit, report = setup_commit + upload = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) - mock_redis.sadd.assert_not_called() - mock_redis.expire.assert_not_called() + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + state.clear_in_progress_uploads([upload.id_]) - def test_mark_uploads_as_processing_non_empty_list(self, mock_redis): - """Test that mark_uploads_as_processing works with non-empty list.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis + dbsession.refresh(upload) + assert upload.state_id == UploadState.ERROR.db_id + assert upload.state == "error" - state.mark_uploads_as_processing([1, 2, 3]) + def test_clear_in_progress_uploads_skips_processed(self, dbsession, setup_commit): + """Already-processed uploads are not affected (success path in finally block).""" + _, commit, report = setup_commit + upload = self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) - mock_redis.sadd.assert_called_once() - mock_redis.expire.assert_called() + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + state.clear_in_progress_uploads([upload.id_]) - def test_clear_in_progress_uploads_empty_list(self, mock_redis): - """Test that clear_in_progress_uploads handles empty list gracefully.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis + dbsession.refresh(upload) + assert upload.state_id == UploadState.PROCESSED.db_id - # Should not call Redis when empty list is passed + def test_clear_in_progress_uploads_empty_list(self, dbsession, setup_commit): + _, commit, _ = setup_commit + state = ProcessingState(commit.repoid, commit.commitid, dbsession) state.clear_in_progress_uploads([]) - mock_redis.srem.assert_not_called() + def test_full_lifecycle(self, dbsession, setup_commit): + """End-to-end: UPLOADED -> PROCESSED -> MERGED with DB state.""" + _, commit, report = setup_commit + u1 = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) + u2 = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) - def test_clear_in_progress_uploads_non_empty_list(self, mock_redis): - """Test that clear_in_progress_uploads works with non-empty list.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis - mock_redis.srem.return_value = 3 + state = ProcessingState(commit.repoid, commit.commitid, dbsession) - state.clear_in_progress_uploads([1, 2, 3]) + numbers = state.get_upload_numbers() + assert numbers.processing == 2 + assert numbers.processed == 0 - mock_redis.srem.assert_called_once() + state.mark_upload_as_processed(u1.id_) + dbsession.flush() - def test_mark_uploads_as_merged_empty_list(self, mock_redis): - """Test that mark_uploads_as_merged handles empty list gracefully.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis + numbers = state.get_upload_numbers() + assert numbers.processing == 1 + assert numbers.processed == 1 + assert not should_perform_merge(numbers) - # Should not call Redis when empty list is passed - state.mark_uploads_as_merged([]) + state.mark_upload_as_processed(u2.id_) + dbsession.flush() - mock_redis.srem.assert_not_called() + numbers = state.get_upload_numbers() + assert numbers.processing == 0 + assert numbers.processed == 2 + assert should_perform_merge(numbers) - def test_mark_uploads_as_merged_non_empty_list(self, mock_redis): - """Test that mark_uploads_as_merged works with non-empty list.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis + merging = state.get_uploads_for_merging() + assert merging == {u1.id_, u2.id_} - state.mark_uploads_as_merged([1, 2, 3]) + state.mark_uploads_as_merged(list(merging)) + dbsession.flush() - mock_redis.srem.assert_called_once() - - -@pytest.mark.parametrize( - "method_name,upload_ids,should_call_redis", - [ - ("mark_uploads_as_processing", [], False), - ("mark_uploads_as_processing", [1], True), - ("mark_uploads_as_processing", [1, 2, 3], True), - ("clear_in_progress_uploads", [], False), - ("clear_in_progress_uploads", [1], True), - ("clear_in_progress_uploads", [1, 2, 3], True), - ("mark_uploads_as_merged", [], False), - ("mark_uploads_as_merged", [1], True), - ("mark_uploads_as_merged", [1, 2, 3], True), - ], -) -def test_empty_list_guards_parametrized(method_name, upload_ids, should_call_redis): - """Parametrized test for empty list guards across all methods.""" - with patch("services.processing.state.get_redis_connection") as mock_get_redis: - mock_redis = MagicMock() - mock_get_redis.return_value = mock_redis - - # For clear_in_progress_uploads, srem needs to return a value - mock_redis.srem.return_value = len(upload_ids) if upload_ids else 0 - - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis - - # Call the method - method = getattr(state, method_name) - method(upload_ids) - - # Check if Redis was called based on expected behavior - if should_call_redis: - assert mock_redis.method_calls, f"{method_name} should call Redis" - else: - # For empty lists, only get_redis_connection is called (in __init__), - # but no actual operations should happen - assert mock_redis.sadd.call_count == 0, ( - f"{method_name} should not call sadd" - ) - if method_name != "mark_uploads_as_processing": - assert mock_redis.srem.call_count == 0, ( - f"{method_name} should not call srem" - ) + numbers = state.get_upload_numbers() + assert numbers.processing == 0 + assert numbers.processed == 0 + assert should_trigger_postprocessing(numbers) diff --git a/apps/worker/tasks/tests/unit/test_upload_task.py b/apps/worker/tasks/tests/unit/test_upload_task.py index a87e7757b..a5c6a1ac7 100644 --- a/apps/worker/tasks/tests/unit/test_upload_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_task.py @@ -1214,6 +1214,7 @@ def test_upload_task_no_bot( assert commit.message == "" assert commit.parent_commit_id is None mocked_schedule_task.assert_called_with( + dbsession, commit, {"codecov": {"max_report_age": "764y ago"}}, [ @@ -1307,6 +1308,7 @@ def test_upload_task_bot_no_permissions( assert commit.message == "" assert commit.parent_commit_id is None mocked_schedule_task.assert_called_with( + dbsession, commit, {"codecov": {"max_report_age": "764y ago"}}, [ @@ -1418,6 +1420,7 @@ def test_upload_task_bot_unauthorized( .first() ) mocked_schedule_task.assert_called_with( + dbsession, commit, {"codecov": {"max_report_age": "764y ago"}}, [ @@ -1514,6 +1517,7 @@ def fail_if_try_to_create_upload(*args, **kwargs): assert commit.parent_commit_id is None assert commit.report is not None mocked_schedule_task.assert_called_with( + dbsession, commit, {"codecov": {"max_report_age": "764y ago"}}, [ @@ -1572,6 +1576,7 @@ def test_schedule_task_with_one_task( redis_connection=mock_redis, ) result = UploadTask().schedule_task( + dbsession, commit, commit_yaml, [{"upload_id": 1, "upload_pk": 1}], diff --git a/apps/worker/tasks/upload.py b/apps/worker/tasks/upload.py index fa7460916..6cb34e47f 100644 --- a/apps/worker/tasks/upload.py +++ b/apps/worker/tasks/upload.py @@ -585,6 +585,7 @@ def run_impl_within_lock( scheduled_tasks_list = [] for chunk in chunks: chunk_scheduled_tasks = self.schedule_task( + db_session, commit, commit_yaml.to_dict(), chunk, @@ -596,6 +597,7 @@ def run_impl_within_lock( scheduled_tasks = scheduled_tasks_list else: scheduled_tasks = self.schedule_task( + db_session, commit, commit_yaml.to_dict(), upload_argument_list, @@ -781,6 +783,7 @@ def _fetch_all_repo_flags( def schedule_task( self, + db_session: Session, commit: Commit, commit_yaml: dict, argument_list: list[UploadArguments], @@ -808,6 +811,7 @@ def schedule_task( or commit_report.report_type == ReportType.COVERAGE.value ) return self._schedule_coverage_processing_task( + db_session, commit, commit_yaml, argument_list, @@ -828,6 +832,7 @@ def schedule_task( def _schedule_coverage_processing_task( self, + db_session: Session, commit: Commit, commit_yaml: dict, argument_list: list[UploadArguments], @@ -835,7 +840,7 @@ def _schedule_coverage_processing_task( ): self.maybe_log_upload_checkpoint(UploadFlow.INITIAL_PROCESSING_COMPLETE) - state = ProcessingState(commit.repoid, commit.commitid) + state = ProcessingState(commit.repoid, commit.commitid, db_session) state.mark_uploads_as_processing( [int(upload["upload_id"]) for upload in argument_list] ) diff --git a/apps/worker/tasks/upload_finisher.py b/apps/worker/tasks/upload_finisher.py index fc3eab531..a075d4619 100644 --- a/apps/worker/tasks/upload_finisher.py +++ b/apps/worker/tasks/upload_finisher.py @@ -300,7 +300,7 @@ def run_impl( log.info("run_impl: Got commit") - state = ProcessingState(repoid, commitid) + state = ProcessingState(repoid, commitid, db_session=db_session) # Always reconstruct from state so the finisher covers all uploads for the commit, # not only uploads present in callback payload.