diff --git a/apps/worker/services/processing/processing.py b/apps/worker/services/processing/processing.py index c8283a265..97832ccb0 100644 --- a/apps/worker/services/processing/processing.py +++ b/apps/worker/services/processing/processing.py @@ -74,24 +74,29 @@ def process_upload( # Safety net: trigger finisher if all uploads are done. # Handles retries outside a chord (visibility timeout, task_reject_on_worker_lost). # Will be replaced by gate key mechanism in a future PR. - # Use Redis-only state for counting (dual-write keeps Redis in sync). - redis_state = ProcessingState(repo_id, commit_sha) - upload_numbers = redis_state.get_upload_numbers() - if should_trigger_postprocessing(upload_numbers): - log.info( - "All uploads processed, triggering finisher", - extra={ - "repo_id": repo_id, - "commit_sha": commit_sha, - "upload_id": upload_id, - }, - ) - celery_app.tasks[upload_finisher_task_name].apply_async( - kwargs={ - "repoid": repo_id, - "commitid": commit_sha, - "commit_yaml": commit_yaml.to_dict(), - } + try: + upload_numbers = state.get_upload_numbers() + if should_trigger_postprocessing(upload_numbers): + log.info( + "All uploads processed, triggering finisher", + extra={ + "repo_id": repo_id, + "commit_sha": commit_sha, + "upload_id": upload_id, + }, + ) + celery_app.tasks[upload_finisher_task_name].apply_async( + kwargs={ + "repoid": repo_id, + "commitid": commit_sha, + "commit_yaml": commit_yaml.to_dict(), + } + ) + except Exception: + log.warning( + "Safety-net finisher trigger failed (non-fatal)", + extra={"repo_id": repo_id, "commit_sha": commit_sha}, + exc_info=True, ) rewrite_or_delete_upload(archive_service, commit_yaml, report_info) diff --git a/apps/worker/services/processing/state.py b/apps/worker/services/processing/state.py index 2acc7a21d..397bb445f 100644 --- a/apps/worker/services/processing/state.py +++ b/apps/worker/services/processing/state.py @@ -21,6 +21,7 @@ "intermediate report". """ +import logging from dataclasses import dataclass from sqlalchemy import case, func @@ -29,16 +30,12 @@ from database.enums import ReportType from database.models.core import Commit from database.models.reports import CommitReport, Upload -from shared.helpers.redis import get_redis_connection from shared.metrics import Counter from shared.reports.enums import UploadState -MERGE_BATCH_SIZE = 10 +log = logging.getLogger(__name__) -# TTL for processing state keys in Redis (24 hours, matches intermediate report TTL) -# This prevents state keys from accumulating indefinitely and ensures consistency -# with intermediate report expiration -PROCESSING_STATE_TTL = 24 * 60 * 60 +MERGE_BATCH_SIZE = 10 CLEARED_UPLOADS = Counter( "worker_processing_cleared_uploads", @@ -82,65 +79,59 @@ def should_trigger_postprocessing(uploads: UploadNumbers) -> bool: class ProcessingState: - def __init__( - self, repoid: int, commitsha: str, db_session: Session | None = None - ) -> None: - self._redis = get_redis_connection() + def __init__(self, repoid: int, commitsha: str, db_session: Session) -> None: self.repoid = repoid self.commitsha = commitsha self._db_session = db_session def get_upload_numbers(self): - if self._db_session: - row = ( - self._db_session.query( - func.count( - case( - ( - Upload.state_id == UploadState.UPLOADED.db_id, - Upload.id_, - ), - ) - ), - func.count( - case( - ( - Upload.state_id == UploadState.PROCESSED.db_id, - Upload.id_, - ), - ) - ), - ) - .join(CommitReport, Upload.report_id == CommitReport.id_) - .join(Commit, CommitReport.commit_id == Commit.id_) - .filter( - Commit.repoid == self.repoid, - Commit.commitid == self.commitsha, - (CommitReport.report_type == None) # noqa: E711 - | (CommitReport.report_type == ReportType.COVERAGE.value), - ) - .one() + row = ( + self._db_session.query( + func.count( + case( + ( + Upload.state_id == UploadState.UPLOADED.db_id, + Upload.id_, + ), + ) + ), + func.count( + case( + ( + Upload.state_id == UploadState.PROCESSED.db_id, + Upload.id_, + ), + ) + ), ) - return UploadNumbers(processing=row[0], processed=row[1]) - - processing = self._redis.scard(self._redis_key("processing")) - processed = self._redis.scard(self._redis_key("processed")) - return UploadNumbers(processing, processed) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .join(Commit, CommitReport.commit_id == Commit.id_) + .filter( + Commit.repoid == self.repoid, + Commit.commitid == self.commitsha, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + ) + .one() + ) + return UploadNumbers(processing=row[0], processed=row[1]) def mark_uploads_as_processing(self, upload_ids: list[int]): - if not upload_ids: - return - key = self._redis_key("processing") - self._redis.sadd(key, *upload_ids) - self._redis.expire(key, PROCESSING_STATE_TTL) + # No-op: uploads are created with state_id=UPLOADED, which + # get_upload_numbers() already counts as "processing". + pass def clear_in_progress_uploads(self, upload_ids: list[int]): if not upload_ids: return - if self._db_session: - # Mark still-UPLOADED uploads as ERROR so they stop being counted - # as "processing" in get_upload_numbers(). Only matches UPLOADED -- - # already-PROCESSED uploads (success path) are unaffected. + # Mark still-UPLOADED uploads as ERROR so they stop being counted + # as "processing" in get_upload_numbers(). Only matches UPLOADED -- + # already-PROCESSED uploads (success path) are unaffected. + # + # This runs in a finally block, so the transaction may already be + # in a failed state. Best-effort: log and move on if the DB is + # unreachable — the upload stays UPLOADED, which is safe. + try: updated = ( self._db_session.query(Upload) .filter( @@ -157,72 +148,48 @@ def clear_in_progress_uploads(self, upload_ids: list[int]): ) if updated > 0: CLEARED_UPLOADS.inc(updated) - self._redis.srem(self._redis_key("processing"), *upload_ids) - return - removed_uploads = self._redis.srem(self._redis_key("processing"), *upload_ids) - if removed_uploads > 0: - CLEARED_UPLOADS.inc(removed_uploads) + except Exception: + log.warning( + "Failed to clear in-progress uploads (transaction may be aborted)", + extra={"upload_ids": upload_ids}, + exc_info=True, + ) def mark_upload_as_processed(self, upload_id: int): - if self._db_session: - upload = self._db_session.query(Upload).get(upload_id) - if upload: - upload.state_id = UploadState.PROCESSED.db_id - # Don't set upload.state here -- the finisher's idempotency check - # uses state="processed" to detect already-merged uploads. - # The state string is set by update_uploads() after merging. - - processing_key = self._redis_key("processing") - processed_key = self._redis_key("processed") - - res = self._redis.smove(processing_key, processed_key, upload_id) - if not res: - self._redis.sadd(processed_key, upload_id) - - self._redis.expire(processed_key, PROCESSING_STATE_TTL) + upload = self._db_session.query(Upload).get(upload_id) + if upload: + upload.state_id = UploadState.PROCESSED.db_id + # Don't set upload.state here -- the finisher's idempotency check + # uses state="processed" to detect already-merged uploads. + # The state string is set by update_uploads() after merging. def mark_uploads_as_merged(self, upload_ids: list[int]): if not upload_ids: return - if self._db_session: - self._db_session.query(Upload).filter( - Upload.id_.in_(upload_ids), - Upload.state_id == UploadState.PROCESSED.db_id, - ).update( - { - Upload.state_id: UploadState.MERGED.db_id, - Upload.state: "merged", - }, - synchronize_session="fetch", - ) - self._redis.srem(self._redis_key("processed"), *upload_ids) - return - self._redis.srem(self._redis_key("processed"), *upload_ids) + self._db_session.query(Upload).filter( + Upload.id_.in_(upload_ids), + Upload.state_id == UploadState.PROCESSED.db_id, + ).update( + { + Upload.state_id: UploadState.MERGED.db_id, + Upload.state: "merged", + }, + synchronize_session="fetch", + ) def get_uploads_for_merging(self) -> set[int]: - if self._db_session: - rows = ( - self._db_session.query(Upload.id_) - .join(CommitReport, Upload.report_id == CommitReport.id_) - .join(Commit, CommitReport.commit_id == Commit.id_) - .filter( - Commit.repoid == self.repoid, - Commit.commitid == self.commitsha, - (CommitReport.report_type == None) # noqa: E711 - | (CommitReport.report_type == ReportType.COVERAGE.value), - Upload.state_id == UploadState.PROCESSED.db_id, - ) - .limit(MERGE_BATCH_SIZE) - .all() - ) - return {row[0] for row in rows} - - return { - int(id) - for id in self._redis.srandmember( - self._redis_key("processed"), MERGE_BATCH_SIZE + rows = ( + self._db_session.query(Upload.id_) + .join(CommitReport, Upload.report_id == CommitReport.id_) + .join(Commit, CommitReport.commit_id == Commit.id_) + .filter( + Commit.repoid == self.repoid, + Commit.commitid == self.commitsha, + (CommitReport.report_type == None) # noqa: E711 + | (CommitReport.report_type == ReportType.COVERAGE.value), + Upload.state_id == UploadState.PROCESSED.db_id, ) - } - - def _redis_key(self, state: str) -> str: - return f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}" + .limit(MERGE_BATCH_SIZE) + .all() + ) + return {row[0] for row in rows} diff --git a/apps/worker/services/tests/test_processing_state.py b/apps/worker/services/tests/test_processing_state.py index 3433c534d..d8f96f29b 100644 --- a/apps/worker/services/tests/test_processing_state.py +++ b/apps/worker/services/tests/test_processing_state.py @@ -1,6 +1,3 @@ -from unittest.mock import MagicMock, patch -from uuid import uuid4 - import pytest from database.tests.factories.core import ( @@ -17,191 +14,8 @@ from shared.reports.enums import UploadState -def test_single_upload(): - state = ProcessingState(1234, uuid4().hex) - state.mark_uploads_as_processing([1]) - - state.mark_upload_as_processed(1) - - # this is the only in-progress upload, nothing more to expect - assert should_perform_merge(state.get_upload_numbers()) - - assert state.get_uploads_for_merging() == {1} - state.mark_uploads_as_merged([1]) - - assert should_trigger_postprocessing(state.get_upload_numbers()) - - -def test_concurrent_uploads(): - state = ProcessingState(1234, uuid4().hex) - state.mark_uploads_as_processing([1]) - - state.mark_upload_as_processed(1) - # meanwhile, another upload comes in: - state.mark_uploads_as_processing([2]) - - # not merging/postprocessing yet, as that will be debounced with the second upload - assert not should_perform_merge(state.get_upload_numbers()) - - state.mark_upload_as_processed(2) - - assert should_perform_merge(state.get_upload_numbers()) - - assert state.get_uploads_for_merging() == {1, 2} - state.mark_uploads_as_merged([1, 2]) - - assert should_trigger_postprocessing(state.get_upload_numbers()) - - -def test_batch_merging_many_uploads(): - state = ProcessingState(1234, uuid4().hex) - - state.mark_uploads_as_processing([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]) - - for id in range(1, 12): - state.mark_upload_as_processed(id) - - # we have only processed 8 out of 9. we want to do a batched merge - assert should_perform_merge(state.get_upload_numbers()) - merging = state.get_uploads_for_merging() - assert len(merging) == 10 # = MERGE_BATCH_SIZE - state.mark_uploads_as_merged(merging) - - # but no notifications yet - assert not should_trigger_postprocessing(state.get_upload_numbers()) - - state.mark_upload_as_processed(12) - - # with the last upload being processed, we do another merge, and then trigger notifications - assert should_perform_merge(state.get_upload_numbers()) - merging = state.get_uploads_for_merging() - assert len(merging) == 2 - state.mark_uploads_as_merged(merging) - - assert should_trigger_postprocessing(state.get_upload_numbers()) - - -class TestProcessingStateEmptyListGuards: - """Tests for empty list guards in ProcessingState methods.""" - - @pytest.fixture - def mock_redis(self): - """Create a mock Redis connection.""" - with patch("services.processing.state.get_redis_connection") as mock_get_redis: - mock_redis = MagicMock() - mock_get_redis.return_value = mock_redis - yield mock_redis - - def test_mark_uploads_as_processing_empty_list(self, mock_redis): - """Test that mark_uploads_as_processing handles empty list gracefully.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis - - # Should not call Redis when empty list is passed - state.mark_uploads_as_processing([]) - - mock_redis.sadd.assert_not_called() - mock_redis.expire.assert_not_called() - - def test_mark_uploads_as_processing_non_empty_list(self, mock_redis): - """Test that mark_uploads_as_processing works with non-empty list.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis - - state.mark_uploads_as_processing([1, 2, 3]) - - mock_redis.sadd.assert_called_once() - mock_redis.expire.assert_called() - - def test_clear_in_progress_uploads_empty_list(self, mock_redis): - """Test that clear_in_progress_uploads handles empty list gracefully.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis - - # Should not call Redis when empty list is passed - state.clear_in_progress_uploads([]) - - mock_redis.srem.assert_not_called() - - def test_clear_in_progress_uploads_non_empty_list(self, mock_redis): - """Test that clear_in_progress_uploads works with non-empty list.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis - mock_redis.srem.return_value = 3 - - state.clear_in_progress_uploads([1, 2, 3]) - - mock_redis.srem.assert_called_once() - - def test_mark_uploads_as_merged_empty_list(self, mock_redis): - """Test that mark_uploads_as_merged handles empty list gracefully.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis - - # Should not call Redis when empty list is passed - state.mark_uploads_as_merged([]) - - mock_redis.srem.assert_not_called() - - def test_mark_uploads_as_merged_non_empty_list(self, mock_redis): - """Test that mark_uploads_as_merged works with non-empty list.""" - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis - - state.mark_uploads_as_merged([1, 2, 3]) - - mock_redis.srem.assert_called_once() - - -@pytest.mark.parametrize( - "method_name,upload_ids,should_call_redis", - [ - ("mark_uploads_as_processing", [], False), - ("mark_uploads_as_processing", [1], True), - ("mark_uploads_as_processing", [1, 2, 3], True), - ("clear_in_progress_uploads", [], False), - ("clear_in_progress_uploads", [1], True), - ("clear_in_progress_uploads", [1, 2, 3], True), - ("mark_uploads_as_merged", [], False), - ("mark_uploads_as_merged", [1], True), - ("mark_uploads_as_merged", [1, 2, 3], True), - ], -) -def test_empty_list_guards_parametrized(method_name, upload_ids, should_call_redis): - """Parametrized test for empty list guards across all methods.""" - with patch("services.processing.state.get_redis_connection") as mock_get_redis: - mock_redis = MagicMock() - mock_get_redis.return_value = mock_redis - - # For clear_in_progress_uploads, srem needs to return a value - mock_redis.srem.return_value = len(upload_ids) if upload_ids else 0 - - state = ProcessingState(1234, uuid4().hex) - state._redis = mock_redis - - # Call the method - method = getattr(state, method_name) - method(upload_ids) - - # Check if Redis was called based on expected behavior - if should_call_redis: - assert mock_redis.method_calls, f"{method_name} should call Redis" - else: - # For empty lists, only get_redis_connection is called (in __init__), - # but no actual operations should happen - assert mock_redis.sadd.call_count == 0, ( - f"{method_name} should not call sadd" - ) - if method_name != "mark_uploads_as_processing": - assert mock_redis.srem.call_count == 0, ( - f"{method_name} should not call srem" - ) - - @pytest.mark.django_db(databases={"default"}) -class TestProcessingStateDBPath: - """Tests for the DB-backed path of ProcessingState (when db_session is provided).""" - +class TestProcessingState: @pytest.fixture def setup_commit(self, dbsession): repository = RepositoryFactory.create() @@ -227,7 +41,7 @@ def _create_upload(self, dbsession, report, state_id): def test_get_upload_numbers_empty(self, dbsession, setup_commit): _, commit, _ = setup_commit - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) numbers = state.get_upload_numbers() assert numbers.processing == 0 assert numbers.processed == 0 @@ -238,7 +52,7 @@ def test_get_upload_numbers_with_uploads(self, dbsession, setup_commit): self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) numbers = state.get_upload_numbers() assert numbers.processing == 2 assert numbers.processed == 1 @@ -247,7 +61,7 @@ def test_mark_upload_as_processed(self, dbsession, setup_commit): _, commit, report = setup_commit upload = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) state.mark_upload_as_processed(upload.id_) dbsession.flush() @@ -262,7 +76,7 @@ def test_mark_uploads_as_merged(self, dbsession, setup_commit): u1 = self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) u2 = self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) state.mark_uploads_as_merged([u1.id_, u2.id_]) dbsession.flush() @@ -275,7 +89,7 @@ def test_mark_uploads_as_merged(self, dbsession, setup_commit): def test_mark_uploads_as_merged_empty_list(self, dbsession, setup_commit): _, commit, _ = setup_commit - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) state.mark_uploads_as_merged([]) def test_get_uploads_for_merging(self, dbsession, setup_commit): @@ -285,7 +99,7 @@ def test_get_uploads_for_merging(self, dbsession, setup_commit): self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) self._create_upload(dbsession, report, UploadState.MERGED.db_id) - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) merging = state.get_uploads_for_merging() assert merging == {u1.id_, u2.id_} @@ -294,7 +108,7 @@ def test_get_uploads_for_merging_respects_batch_size(self, dbsession, setup_comm for _ in range(15): self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) merging = state.get_uploads_for_merging() assert len(merging) == 10 @@ -302,7 +116,7 @@ def test_mark_uploads_as_processing_is_noop(self, dbsession, setup_commit): _, commit, report = setup_commit upload = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) state.mark_uploads_as_processing([upload.id_]) dbsession.refresh(upload) @@ -315,7 +129,7 @@ def test_clear_in_progress_uploads_sets_error_on_uploaded( _, commit, report = setup_commit upload = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) state.clear_in_progress_uploads([upload.id_]) dbsession.refresh(upload) @@ -327,19 +141,24 @@ def test_clear_in_progress_uploads_skips_processed(self, dbsession, setup_commit _, commit, report = setup_commit upload = self._create_upload(dbsession, report, UploadState.PROCESSED.db_id) - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) state.clear_in_progress_uploads([upload.id_]) dbsession.refresh(upload) assert upload.state_id == UploadState.PROCESSED.db_id + def test_clear_in_progress_uploads_empty_list(self, dbsession, setup_commit): + _, commit, _ = setup_commit + state = ProcessingState(commit.repoid, commit.commitid, dbsession) + state.clear_in_progress_uploads([]) + def test_full_lifecycle(self, dbsession, setup_commit): """End-to-end: UPLOADED -> PROCESSED -> MERGED with DB state.""" _, commit, report = setup_commit u1 = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) u2 = self._create_upload(dbsession, report, UploadState.UPLOADED.db_id) - state = ProcessingState(commit.repoid, commit.commitid, db_session=dbsession) + state = ProcessingState(commit.repoid, commit.commitid, dbsession) numbers = state.get_upload_numbers() assert numbers.processing == 2 diff --git a/apps/worker/tasks/tests/unit/test_upload_task.py b/apps/worker/tasks/tests/unit/test_upload_task.py index 6be8e6f5a..aafe2afbb 100644 --- a/apps/worker/tasks/tests/unit/test_upload_task.py +++ b/apps/worker/tasks/tests/unit/test_upload_task.py @@ -1245,6 +1245,7 @@ def test_upload_task_no_bot( assert commit.message == "" assert commit.parent_commit_id is None mocked_schedule_task.assert_called_with( + dbsession, commit, {"codecov": {"max_report_age": "764y ago"}}, [ @@ -1338,6 +1339,7 @@ def test_upload_task_bot_no_permissions( assert commit.message == "" assert commit.parent_commit_id is None mocked_schedule_task.assert_called_with( + dbsession, commit, {"codecov": {"max_report_age": "764y ago"}}, [ @@ -1449,6 +1451,7 @@ def test_upload_task_bot_unauthorized( .first() ) mocked_schedule_task.assert_called_with( + dbsession, commit, {"codecov": {"max_report_age": "764y ago"}}, [ @@ -1545,6 +1548,7 @@ def fail_if_try_to_create_upload(*args, **kwargs): assert commit.parent_commit_id is None assert commit.report is not None mocked_schedule_task.assert_called_with( + dbsession, commit, {"codecov": {"max_report_age": "764y ago"}}, [ @@ -1601,6 +1605,7 @@ def test_schedule_task_with_one_task( redis_connection=mock_redis, ) result = UploadTask().schedule_task( + dbsession, commit, commit_yaml, [{"upload_id": 1, "upload_pk": 1}], diff --git a/apps/worker/tasks/upload.py b/apps/worker/tasks/upload.py index d91b6186b..729d92098 100644 --- a/apps/worker/tasks/upload.py +++ b/apps/worker/tasks/upload.py @@ -585,6 +585,7 @@ def run_impl_within_lock( scheduled_tasks_list = [] for chunk in chunks: chunk_scheduled_tasks = self.schedule_task( + db_session, commit, commit_yaml.to_dict(), chunk, @@ -596,6 +597,7 @@ def run_impl_within_lock( scheduled_tasks = scheduled_tasks_list else: scheduled_tasks = self.schedule_task( + db_session, commit, commit_yaml.to_dict(), upload_argument_list, @@ -781,6 +783,7 @@ def _fetch_all_repo_flags( def schedule_task( self, + db_session: Session, commit: Commit, commit_yaml: dict, argument_list: list[UploadArguments], @@ -808,6 +811,7 @@ def schedule_task( or commit_report.report_type == ReportType.COVERAGE.value ) return self._schedule_coverage_processing_task( + db_session, commit, commit_yaml, argument_list, @@ -828,6 +832,7 @@ def schedule_task( def _schedule_coverage_processing_task( self, + db_session: Session, commit: Commit, commit_yaml: dict, argument_list: list[UploadArguments], @@ -835,7 +840,7 @@ def _schedule_coverage_processing_task( ): self.maybe_log_upload_checkpoint(UploadFlow.INITIAL_PROCESSING_COMPLETE) - state = ProcessingState(commit.repoid, commit.commitid) + state = ProcessingState(commit.repoid, commit.commitid, db_session) state.mark_uploads_as_processing( [int(upload["upload_id"]) for upload in argument_list] )