feat(worker): DB-backed ProcessingState (replaces Redis) #749
Conversation
Codecov Report

❌ Patch coverage is unavailable. Please upload reports for the commit eb9c82e to get more accurate results.

Additional details and impacted files

```
@@                  Coverage Diff                   @@
##   tomhu/finisher-source-of-truth    #749    +/-  ##
======================================================
-   Coverage       92.25%    92.21%   -0.05%
======================================================
    Files            1304      1304
    Lines           47973     47909      -64
    Branches         1628      1628
======================================================
-   Hits            44259     44179      -80
-   Misses           3405      3421      +16
    Partials          309       309
```
Add an optional db_session parameter to ProcessingState. When provided, all methods use DB queries (Upload.state_id) instead of Redis sets. When omitted, behavior is unchanged (Redis path).

DB-backed methods:
- get_upload_numbers: COUNT by state_id (UPLOADED=processing, PROCESSED=processed)
- mark_upload_as_processed: UPDATE state_id to PROCESSED
- mark_uploads_as_merged: UPDATE state_id to MERGED
- get_uploads_for_merging: SELECT WHERE state_id=PROCESSED LIMIT batch_size
- mark_uploads_as_processing / clear_in_progress_uploads: no-op (DB path)

No callers change in this PR -- this is a pure capability addition.

Made-with: Cursor
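The optional-`db_session` dispatch described above can be sketched with a toy stand-in (no SQLAlchemy here: `db_session` is just a list of state ids standing in for `Upload` rows, and all names and ids are illustrative, not the real worker API):

```python
# Illustrative state ids; only MERGED's db_id=6 comes from this PR.
UPLOADED, PROCESSED, MERGED = 1, 2, 6

class ProcessingState:
    def __init__(self, repoid, commitsha, db_session=None):
        self.repoid = repoid
        self.commitsha = commitsha
        self.db_session = db_session  # None -> legacy Redis path (unchanged)

    def get_upload_numbers(self):
        if self.db_session is not None:
            # DB path: COUNT uploads by state_id. In this sketch the "session"
            # is a plain list of state ids standing in for query results.
            rows = self.db_session
            processing = sum(1 for s in rows if s == UPLOADED)
            processed = sum(1 for s in rows if s == PROCESSED)
            return processing, processed
        raise NotImplementedError("Redis path elided in this sketch")

state = ProcessingState(1, "abc123", db_session=[UPLOADED, UPLOADED, PROCESSED])
print(state.get_upload_numbers())  # -> (2, 1)
```

The key property is that callers constructed without `db_session` are untouched, which is what makes this a pure capability addition.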
Activate the DB-backed state path in process_upload() by passing db_session to ProcessingState. The processor now writes PROCESSED state to the database instead of Redis. Also removes the should_trigger_postprocessing check and direct finisher triggering from the processor -- this orphaned-task recovery will be replaced by the gate key mechanism in a later PR. Made-with: Cursor
The DB-backed path was skipping Redis writes, but the finisher still reads from Redis. Keep writing to both until the finisher migrates to DB-backed state in a later PR. Made-with: Cursor
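The dual-write described here can be sketched with plain stand-ins (a dict for the `Upload` row and an in-memory set for the Redis "processed" set; the function name and ids are illustrative, not the actual worker code):

```python
PROCESSED = 2  # illustrative state_id

def mark_upload_as_processed(upload, redis_sets, key):
    upload["state_id"] = PROCESSED                       # DB write: the new source of truth
    redis_sets.setdefault(key, set()).add(upload["id"])  # Redis mirror: finisher still reads this

upload = {"id": 7, "state_id": 1}
redis_sets = {}
mark_upload_as_processed(upload, redis_sets, "processed")
print(upload["state_id"], redis_sets)  # -> 2 {'processed': {7}}
```

Writing both stores keeps the Redis-reading finisher correct until it migrates to DB-backed state in a later PR.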
When db_session is present, both the DB path and the Redis fall-through path were incrementing CLEARED_UPLOADS. Move the Redis srem inside the DB block and return early so the metric is only counted once. Made-with: Cursor
1. Add Redis srem inside the DB block so stale entries in the Redis "processed" set are cleaned up during dual-write.
2. Add a PROCESSED state filter to prevent accidentally overwriting ERROR-state uploads.

Made-with: Cursor
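The guard described in point 2 can be illustrated with a toy version of the merge marker (MERGED's db_id=6 comes from this PR; the ERROR id and the function shape are hypothetical):

```python
PROCESSED, MERGED, ERROR = 2, 6, 5  # ERROR id is illustrative

def mark_uploads_as_merged(uploads):
    for u in uploads:
        if u["state_id"] == PROCESSED:  # guard: never clobber ERROR (or any other) state
            u["state_id"] = MERGED

uploads = [{"id": 1, "state_id": PROCESSED}, {"id": 2, "state_id": ERROR}]
mark_uploads_as_merged(uploads)
print([u["state_id"] for u in uploads])  # -> [6, 5]
```

Without the filter, a blanket UPDATE would silently promote failed uploads to MERGED.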
Change update_uploads() to write state_id=MERGED, state="merged" for successful uploads instead of PROCESSED. This completes the semantic distinction: PROCESSED means "processor done, waiting for merge" while MERGED means "incorporated into the master report." Safe because the finisher's idempotency check already recognizes the "merged" state (done in the previous PR). Made-with: Cursor
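A minimal sketch of this change, using a dict as a stand-in for the `Upload` row (only the MERGED db_id of 6 is taken from this PR; everything else is illustrative):

```python
MERGED_DB_ID = 6  # new enum value introduced in this PR

def update_uploads(successful_uploads):
    for u in successful_uploads:
        u["state"] = "merged"         # legacy string; the finisher's idempotency check recognizes it
        u["state_id"] = MERGED_DB_ID  # enum id: "incorporated into the master report"

u = {"state": "started", "state_id": 2}
update_uploads([u])
print(u)  # -> {'state': 'merged', 'state_id': 6}
```

Setting the legacy string only at this point (and not at PROCESSED time) is what preserves the processor/finisher ordering semantics.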
- Make db_session a required parameter on ProcessingState
- Remove all Redis operations (sadd, srem, smove, scard, srandmember)
- Remove PROCESSING_STATE_TTL, _redis_key(), and the get_redis_connection import
- mark_uploads_as_processing is now an explicit no-op (uploads already exist with state_id=UPLOADED, which get_upload_numbers counts)
- Pass db_session through the upload.py schedule_task chain
- Remove the redis_state workaround in processing.py's safety-net trigger; reuse the existing DB-backed state instance
- Remove all Redis-only and Redis-mock unit tests

Made-with: Cursor
This runs in a finally block where the DB transaction may already be in a failed state. Wrap in try/except so it doesn't mask the original error. The upload stays UPLOADED, which is safe. Made-with: Cursor
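The defensive pattern described here, sketched with a hypothetical wrapper (`safe_clear_in_progress` is an illustrative name, not the actual function):

```python
import logging

def safe_clear_in_progress(clear_fn):
    # Runs from a finally block: a failure here must not mask the original error.
    try:
        clear_fn()
    except Exception:
        # The transaction may already be aborted; the upload stays UPLOADED, which is safe.
        logging.exception("clear_in_progress_uploads failed; leaving upload state as-is")

def boom():
    raise RuntimeError("transaction already aborted")

safe_clear_in_progress(boom)  # logs the error instead of re-raising
```

Swallowing (and logging) the cleanup failure is acceptable here precisely because the fallback state is the safe one.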
Don't rely on the task framework's finally cleanup to persist the MERGED state — commit immediately after the update. Made-with: Cursor
Force-pushed from 9726587 to 2d7ec3f
```diff
 class ProcessingState:
-    def __init__(self, repoid: int, commitsha: str) -> None:
-        self._redis = get_redis_connection()
+    def __init__(self, repoid: int, commitsha: str, db_session: Session) -> None:
```
Bug: The call to ProcessingState() in upload_finisher.py is missing the required db_session argument, which will cause a TypeError at runtime.
Severity: CRITICAL
Suggested Fix
Update the ProcessingState instantiation in upload_finisher.py to pass the db_session argument, which is available in the run_impl method's scope. The call should be ProcessingState(repoid, commitid, db_session).
Location: apps/worker/services/processing/state.py#L82
Potential issue: The `ProcessingState.__init__` method was updated to require a
`db_session` argument. However, the instantiation of `ProcessingState` in
`upload_finisher.py` (line 303) was not updated to pass this required argument. The
`db_session` is available in the scope of the calling `run_impl` method. This omission
will cause a `TypeError` every time the `UploadFinisherTask` is executed, which will
crash the task and block the entire coverage report finalization pipeline.
Update upload_finisher to construct ProcessingState with db_session after DB-only state migration so reconstruction and merge readiness checks use the new required interface. Made-with: Cursor
Cursor Bugbot has reviewed your changes and found 2 potential issues.
```python
assert upload.state_id == UploadState.PROCESSED.db_id
# state string is not updated by the processor -- the finisher sets it
# after merging (to avoid triggering the finisher's idempotency check early)
assert upload.state == "started"
```
Test assertion contradicts mock making test always fail
High Severity
The new assertions at lines 91–95 expect upload.state_id == UploadState.PROCESSED.db_id, but ProcessingState is fully mocked at line 56, making mark_upload_as_processed a no-op MagicMock. The upload is created with state_id=UploadState.UPLOADED.db_id (value 1), and nothing changes it, so the assertion comparing against PROCESSED.db_id (value 2) will always raise AssertionError.
```python
celery_app.tasks[upload_finisher_task_name].apply_async(
    kwargs=finisher_kwargs
)
```
No DB commit before dispatching async finisher task
Medium Severity
mark_upload_as_processed sets state_id=PROCESSED on the ORM object but the transaction is never committed before the finisher is dispatched via apply_async(). The finisher runs in a separate DB session and cannot see uncommitted changes. With the previous Redis-based approach, smove was immediately visible cross-process. The finisher's get_uploads_for_merging() may find zero PROCESSED uploads, falling through to a legacy fallback path.
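The visibility problem this review comment describes can be illustrated with a toy session that separates pending from committed writes (`FakeSession` and `finisher_view` are hypothetical stand-ins, not worker or SQLAlchemy code):

```python
class FakeSession:
    """Toy stand-in for a DB session: pending writes vs. committed rows."""
    def __init__(self):
        self.pending, self.committed = {}, {}
    def update_state(self, upload_id, state_id):
        self.pending[upload_id] = state_id
    def commit(self):
        self.committed.update(self.pending)
        self.pending.clear()

def finisher_view(session):
    # The finisher runs in a separate session, so it only sees committed rows.
    return dict(session.committed)

PROCESSED = 2  # illustrative state_id
s = FakeSession()
s.update_state(7, PROCESSED)   # mark_upload_as_processed
assert finisher_view(s) == {}  # without a commit, the finisher sees zero PROCESSED uploads
s.commit()                     # commit BEFORE apply_async dispatches the finisher
print(finisher_view(s))        # -> {7: 2}
```

This is also why the earlier Redis `smove` did not have the problem: Redis writes are immediately visible cross-process, with no transaction boundary in between.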
Merged eb9c82e into tomhu/finisher-source-of-truth


Summary
Replaces Redis-based upload processing state tracking with the database as the single source of truth.
`ProcessingState` now reads and writes `Upload.state_id` directly, eliminating Redis sets for processing/processed/merged state.

Changes by layer

- `UploadState.MERGED` enum: new state (db_id=6) representing uploads fully merged into the master report
- `ProcessingState` DB-backed queries: `get_upload_numbers` and `get_uploads_for_merging` use `Upload.state_id` counts/filters, scoped to coverage reports (`report_type IS NULL OR report_type = 'coverage'`)
- Processor: `process_upload` passes `db_session` to `ProcessingState`. Only `state_id` is set to `PROCESSED` (not the legacy `state` string) to avoid tripping the finisher's idempotency check
- Finisher: passes `db_session` and reads merge candidates from the DB. `mark_uploads_as_merged` includes a `PROCESSED` state guard. `update_uploads` sets `state="merged"` and `state_id=MERGED` after a successful merge
- `db_session` is now required on `ProcessingState`, and all Redis operations are removed. `mark_uploads_as_processing` is a no-op (uploads start as `UPLOADED`). The safety-net finisher trigger and `clear_in_progress_uploads` are resilient to transaction failures

Key design decisions

- `state` vs `state_id` separation: the processor only sets `state_id`. The legacy `state` string is set after merging, preserving the finisher's idempotency check
- Coverage-only scoping: queries filter on `report_type IS NULL OR report_type = 'coverage'` to avoid interfering with the bundle analysis / test results pipelines
- `clear_in_progress_uploads` and the safety-net finisher trigger are wrapped in try/except, since they run in error-recovery paths where the transaction may be aborted

Test plan

- `ProcessingState` DB paths
- `schedule_task` signature
- `process_upload`

Note
Medium Risk
Replaces Redis-based processing/merging state with database queries and state transitions, affecting core upload/merge orchestration and finisher triggering. Risk is mitigated by scoping DB queries to coverage reports and adding extensive unit tests, but regressions could block merges or notifications if state transitions are wrong.
Overview
Upload processing state is now DB-backed instead of Redis-backed.
`ProcessingState` now requires a SQLAlchemy `db_session` and derives processing/processed counts and merge candidates from `Upload.state_id` (coverage-only: `report_type IS NULL OR report_type = 'coverage'`).

The processor and finisher were updated to pass `db_session` and to transition uploads via `state_id` (UPLOADED → PROCESSED → MERGED), with `mark_uploads_as_processing` becoming a no-op and `clear_in_progress_uploads` best-effort marking stuck `UPLOADED` uploads as `ERROR`. `UploadTask.schedule_task` now takes `db_session` so coverage scheduling can initialize `ProcessingState`, and new/updated tests assert the DB-driven lifecycle and merge/error outcomes.

Written by Cursor Bugbot for commit eb9c82e.
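The lifecycle summarized above can be sketched as a minimal state machine (the transition table and string names are illustrative; the real code stores integer db_ids on `Upload.state_id`):

```python
# Transitions described in this PR's overview; anything else is rejected.
ALLOWED = {
    ("UPLOADED", "PROCESSED"),   # processor: mark_upload_as_processed
    ("PROCESSED", "MERGED"),     # finisher: update_uploads / mark_uploads_as_merged
    ("UPLOADED", "ERROR"),       # best-effort recovery in clear_in_progress_uploads
}

def transition(state, new_state):
    if (state, new_state) not in ALLOWED:
        raise ValueError(f"illegal transition {state} -> {new_state}")
    return new_state

s = "UPLOADED"
s = transition(s, "PROCESSED")
s = transition(s, "MERGED")
print(s)  # -> MERGED
```

Note that there is deliberately no transition into PROCESSING: uploads sit in UPLOADED until the processor finishes, which is why `mark_uploads_as_processing` could become a no-op.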