feat(worker): schedule one upload_finisher via redis gate#757
Conversation
bebfe63 to
c07692a
Compare
c07692a to
fea2836
Compare
73b8ff6 to
44de224
Compare
44de224 to
bf5da1c
Compare
5651fd6 to
b508cb4
Compare
7e12f77 to
c1c5f19
Compare
c1c5f19 to
e9dbaec
Compare
Use a commit-level SET NX gate to enqueue at most one upload_finisher task from processors and remove chord callback fanout in coverage scheduling. Made-with: Cursor
Only attempt finisher enqueue when merge conditions are met, and normalize schedule_task return handling so non-coverage task paths no longer call as_tuple on AsyncResult. Made-with: Cursor
Restore processor-side gate acquisition behavior and use explicit kwargs dispatch for coverage processor apply_async to keep task scheduling and tests consistent. Made-with: Cursor
Return processor task IDs as tuples directly and simplify task-id normalization logic to avoid a single-use wrapper class. Made-with: Cursor
…ad task" This reverts commit 215847c.
…oad task" This reverts commit e9cecc7.
Restore the cleanup branch state and normalize scheduled task IDs through an as_tuple path when available before applying generic fallbacks. Made-with: Cursor
Remove the helper wrapper for scheduled task ID normalization and use direct as_tuple/id handling at call sites for clearer control flow. Made-with: Cursor
Keep scheduled_tasks_list and scheduled_tasks handling in the original call-site shape while preserving direct as_tuple/id normalization without the removed helper. Made-with: Cursor
Use the same direct as_tuple handling as main for chunked and non-chunked scheduled tasks in UploadTask.run_impl. Made-with: Cursor
Use a Celery group result in coverage scheduling so run_impl's as_tuple handling matches main and avoids tuple-as_tuple runtime failures. Made-with: Cursor
Keep direct processor apply_async dispatch while returning a Celery ResultSet so existing as_tuple callsites and tests both work. Made-with: Cursor
9bce158 to
2d657c4
Compare
Keep direct per-upload apply_async calls and return an as_tuple-compatible object of task IDs to match existing run_impl behavior. Made-with: Cursor
Use group(...).apply_async().results to collect processor task IDs while preserving as_tuple-compatible return shape and avoiding per-task direct apply_async dispatch in run path. Made-with: Cursor
Pass finisher kwargs through UploadFlow.save_to_kwargs when processor-side gate enqueues the finisher so downstream flow checkpoint timing can be preserved. Made-with: Cursor
c7a3851 to
0f81b4d
Compare
Propagate checkpoint logger kwargs from processor arguments to finisher without calling UploadFlow.save_to_kwargs in processor context. Made-with: Cursor
…uling Only attempt finisher gate acquisition once all uploads for the commit are eligible to merge, and restore per-task upload processor apply_async kwargs scheduling so existing call-site/test expectations remain intact. Made-with: Cursor
Use _kwargs_key(UploadFlow) for checkpoint passthrough from processor arguments so finisher receives the expected serialized flow payload. Made-with: Cursor
Mock finisher enqueue in upload processing task tests and align gate-exists test expectation with should_perform_merge gating so processor-unit coverage does not depend on task router DB tables. Made-with: Cursor
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
When coverage uploads are still pending, upload_finisher now schedules a delayed sweep instead of exiting, and it clears the commit gate key once finishing work is complete. Made-with: Cursor
Reuse the canonical finisher gate key helper, cap sweep reschedules, preserve flow breadcrumbs during deferred sweeps, and clear gate keys on terminal failures. Made-with: Cursor
Delete the gate and return an accurate sweep_scheduled flag when max sweep attempts are exhausted, so commits are not blocked behind stale gate keys. Made-with: Cursor
Only clear the gate after successful finisher-lock completion so terminal lock failures do not silently drop pending commit post-processing. Made-with: Cursor
Extend the commit gate expiration whenever a sweep is scheduled so the gate remains valid throughout long-running sweep recovery loops. Made-with: Cursor
…ustion Delete the finisher gate even when _handle_finisher_lock returns None (terminal lock exhaustion) and add coverage to prevent gate stalling regressions. Made-with: Cursor
Update process_upload gate tests so redis set assertions match should_perform_merge behavior for both merge-eligible and non-eligible upload counts. Made-with: Cursor
Codecov Report❌ Patch coverage is
❌ Your patch check has failed because the patch coverage (85.24%) is below the target coverage (90.00%). You can increase the patch coverage or adjust the target coverage. Additional details and impacted files@@ Coverage Diff @@
## tomhu/finisher-source-of-truth #757 +/- ##
==================================================================
- Coverage 92.26% 92.25% -0.01%
==================================================================
Files 1304 1304
Lines 47923 47973 +50
Branches 1628 1628
==================================================================
+ Hits 44214 44259 +45
- Misses 3400 3405 +5
Partials 309 309
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
| for task_result in scheduled_tasks | ||
| if task_result and task_result.id | ||
| ] | ||
| return SimpleNamespace(as_tuple=lambda: tuple(scheduled_task_ids)) |
There was a problem hiding this comment.
Can this just be celery group?
| FINISHER_SWEEP_COUNTDOWN_SECONDS = 30 | ||
| FINISHER_MAX_SWEEP_ATTEMPTS = 20 |
There was a problem hiding this comment.
alphabetize
| "commitid": commitid, | ||
| "commit_yaml": commit_yaml.to_dict(), | ||
| "trigger": "sweep", | ||
| "sweep_attempt": sweep_attempt + 1, |
There was a problem hiding this comment.
Sweep attempt shouldn't be necessary. If sweep fails, we have watchdog
|
|
||
| repoid = int(repoid) | ||
| commit_yaml = UserYaml(commit_yaml) | ||
| sweep_attempt = int(kwargs.get("sweep_attempt", 0)) |
There was a problem hiding this comment.
not necessary
| # During rolling deploys, older workers can still persist "processed". | ||
| # Treat it as terminal here to avoid duplicate merges. | ||
| all_already_finalized = all( | ||
| upload.state in ("processed", "merged", "error") |
There was a problem hiding this comment.
let's remove processed here, it won't make sense in the new code paths
| checkpoint_kwargs_key = _kwargs_key(UploadFlow) | ||
| if checkpoint_kwargs_key in arguments: | ||
| finisher_kwargs[checkpoint_kwargs_key] = arguments[ | ||
| checkpoint_kwargs_key | ||
| ] |
There was a problem hiding this comment.
why is this necessary?
| for task_result in scheduled_tasks | ||
| if task_result and task_result.id | ||
| ] | ||
| return SimpleNamespace(as_tuple=lambda: tuple(scheduled_task_ids)) |
There was a problem hiding this comment.
does this have to exist? why not use celery group?
9add94b
into
tomhu/finisher-source-of-truth
Summary
SET NX EX) in processor completion so only one finisher task is enqueued per commitTest plan
ruff check --fix apps/worker/services/processing/processing.py apps/worker/services/tests/test_processing.py apps/worker/tasks/upload.py apps/worker/tasks/tests/unit/test_upload_task.pyruff format apps/worker/services/processing/processing.py apps/worker/services/tests/test_processing.py apps/worker/tasks/upload.py apps/worker/tasks/tests/unit/test_upload_task.pyStacked on #756.
Made with Cursor
Note
Medium Risk
Changes core upload processing/finishing orchestration (Redis gating, task scheduling, and finisher retry behavior), which can affect merge/notification timing and failure modes if the gate or sweep logic is incorrect.
Overview
Adds a commit-level Redis
SET NX EXgate inprocess_uploadso only the first worker that finishes the last needed upload enqueuesupload_finisher, passing through checkpoint kwargs when present.Refactors coverage upload scheduling in
tasks/upload.pyto stop using a Celerychordcallback and insteadapply_asynceachupload_processortask directly, relying on the new gated finisher trigger.Enhances
UploadFinisherTaskto manage the gate lifecycle and handle stragglers: it can reschedule itself as a timed “sweep” when coverage uploads are still pending (up toFINISHER_MAX_SWEEP_ATTEMPTS), and ensures the gate key is deleted on completion, lock exhaustion, timeout, or unexpected errors; tests are updated accordingly.Written by Cursor Bugbot for commit 9add94b. This will update automatically on new commits. Configure here.