Skip to content
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
2cea686
feat(worker): make upload_finisher process full commit state
thomasrockhu-codecov Mar 11, 2026
6cab9cc
fix(worker): mark successfully merged uploads as merged
thomasrockhu-codecov Mar 11, 2026
95a8a79
test(worker): remove stale finisher concurrency gate tests
thomasrockhu-codecov Mar 11, 2026
126a0b6
fix(worker): prefer callback payload over reconstructed finisher data
thomasrockhu-codecov Mar 11, 2026
6e3b939
fix(worker): reconstruct all processed uploads for finisher merge
thomasrockhu-codecov Mar 11, 2026
8852b60
refactor(worker): tighten finisher idempotency to merged/error states
thomasrockhu-codecov Mar 11, 2026
b75054e
fix(worker): keep processed state terminal during finisher rollout
thomasrockhu-codecov Mar 11, 2026
4322546
fix(worker): handle unsuccessful results without error payload
thomasrockhu-codecov Mar 11, 2026
b4a0fcd
test(worker): cover update_uploads fallback for missing error payload
thomasrockhu-codecov Mar 11, 2026
c3ff0f9
refactor(worker): simplify finisher scheduling and gate/state respons…
thomasrockhu-codecov Mar 11, 2026
6306ed5
refactor(worker): simplify followup scheduling and bound merge batches
thomasrockhu-codecov Mar 11, 2026
2a0e36e
fix(worker): address PR #756 review thread issues
thomasrockhu-codecov Mar 11, 2026
7dcdac6
fix(worker): restore processing-state task wiring for worker CI
thomasrockhu-codecov Mar 11, 2026
6d43a16
fix: make updates
thomasrockhu-codecov Mar 11, 2026
93f3146
style(worker): apply repo pre-commit formatting
thomasrockhu-codecov Mar 11, 2026
53bc3ab
fix(worker): preserve merged state updates in upload_merger loop
thomasrockhu-codecov Mar 11, 2026
a0cbf7e
fix: make edits
thomasrockhu-codecov Mar 11, 2026
ceb7614
fix: more updates
thomasrockhu-codecov Mar 11, 2026
c48148b
fix(worker): restore missing finisher pipeline changes
thomasrockhu-codecov Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions apps/worker/services/processing/finisher_gate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from shared.helpers.redis import get_redis_connection

# Prefix of the per-(repo, commit) Redis key that serializes finisher runs.
FINISHER_GATE_KEY_PREFIX = "upload_merger_lock"
# Gate lifetime in seconds; a crashed holder frees the gate once this elapses.
FINISHER_GATE_TTL_SECONDS = 900


def finisher_gate_key(repo_id: int, commit_sha: str) -> str:
    """Build the Redis key that guards the finisher for one repo/commit pair."""
    parts = (FINISHER_GATE_KEY_PREFIX, str(repo_id), commit_sha)
    return "_".join(parts)


def try_acquire_finisher_gate(repo_id: int, commit_sha: str) -> bool:
    """Atomically claim the finisher gate; returns True iff this caller won.

    Uses Redis ``SET NX`` with an expiry so that a holder which crashes
    before releasing cannot wedge the gate forever.
    """
    key = finisher_gate_key(repo_id, commit_sha)
    claimed = get_redis_connection().set(
        key,
        "1",
        nx=True,
        ex=FINISHER_GATE_TTL_SECONDS,
    )
    # redis-py returns True on success and None when the key already exists.
    return bool(claimed)


def refresh_finisher_gate_ttl(repo_id: int, commit_sha: str) -> None:
    """Extend the gate's expiry so a long-running finisher keeps its claim."""
    key = finisher_gate_key(repo_id, commit_sha)
    conn = get_redis_connection()
    conn.expire(key, FINISHER_GATE_TTL_SECONDS)


def delete_finisher_gate(repo_id: int, commit_sha: str) -> None:
    """Release the gate so the next scheduling attempt can acquire it."""
    conn = get_redis_connection()
    conn.delete(finisher_gate_key(repo_id, commit_sha))
11 changes: 6 additions & 5 deletions apps/worker/services/processing/merging.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from services.yaml.reader import read_yaml_field
from shared.reports.enums import UploadState
from shared.reports.resources import Report, ReportTotals
from shared.upload.constants import UploadErrorCode
from shared.utils.sessions import SessionType
from shared.yaml import UserYaml

Expand Down Expand Up @@ -129,21 +130,21 @@ def update_uploads(

if result["successful"]:
update = {
"state_id": UploadState.PROCESSED.db_id,
"state": "processed",
"state_id": UploadState.MERGED.db_id,
"state": "merged",
}
report = reports.get(upload_id)
if report is not None:
all_totals.append(make_totals(upload_id, report.totals))
elif result["error"]:
else:
update = {
"state_id": UploadState.ERROR.db_id,
"state": "error",
}
error = UploadError(
upload_id=upload_id,
error_code=result["error"]["code"],
error_params=result["error"]["params"],
error_code=result["error"]["code"] if result.get("error") else UploadErrorCode.UNKNOWN_PROCESSING,
error_params=result["error"]["params"] if result.get("error") else {},
)
all_errors.append(error)

Expand Down
43 changes: 24 additions & 19 deletions apps/worker/services/processing/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,18 @@
from database.models.core import Commit
from database.models.reports import Upload
from helpers.reports import delete_archive_setting
from services.processing.finisher_gate import (
finisher_gate_key,
try_acquire_finisher_gate,
)
from services.report import ProcessingError, RawReportInfo, ReportService
from services.report.parser.types import VersionOneParsedRawReport
from shared.api_archive.archive import ArchiveService
from shared.celery_config import upload_finisher_task_name
from shared.yaml import UserYaml

from .intermediate import save_intermediate_report
from .state import ProcessingState, should_trigger_postprocessing
from .state import ProcessingState, should_perform_merge
from .types import ProcessingResult, UploadArguments

log = logging.getLogger(__name__)
Expand All @@ -43,8 +47,8 @@ def process_upload(
upload = db_session.query(Upload).filter_by(id_=upload_id).first()
assert upload

state = ProcessingState(repo_id, commit_sha)
# this in a noop in normal cases, but relevant for task retries:
state = ProcessingState(repo_id, commit_sha, db_session=db_session)
# this is a noop in normal cases, but relevant for task retries:
state.mark_uploads_as_processing([upload_id])
Comment on lines +51 to 52
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: A call is made to state.mark_uploads_as_processing(), but this method was removed from the refactored ProcessingState class, which will cause a runtime AttributeError.
Severity: CRITICAL

Suggested Fix

Remove the call to the non-existent mark_uploads_as_processing method from processing.py and upload.py. The refactoring of ProcessingState to be database-backed made this state-tracking call obsolete, and its removal from the callers will align the code with the new design.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: apps/worker/services/processing/processing.py#L51-L52

Potential issue: Code in `processing.py` and `upload.py` calls the method
`state.mark_uploads_as_processing()`. However, this method was removed from the
refactored `ProcessingState` class, which is now database-backed instead of
Redis-backed. As a result, any attempt to process an upload will raise an
`AttributeError` because the method does not exist on the `ProcessingState` object. This
will cause the task to crash and completely block the upload processing pipeline.


report_service = ReportService(commit_yaml)
Expand All @@ -70,28 +74,29 @@ def process_upload(
if processing_result.report:
save_intermediate_report(upload_id, processing_result.report)
state.mark_upload_as_processed(upload_id)
db_session.commit()

# Check if all uploads are now processed and trigger finisher if needed
# This handles the case where a processor task retries outside of a chord
# (e.g., from visibility timeout or task_reject_on_worker_lost)
upload_numbers = state.get_upload_numbers()
if should_trigger_postprocessing(upload_numbers):
log.info(
"All uploads processed, triggering finisher",
extra={
"repo_id": repo_id,
"commit_sha": commit_sha,
"upload_id": upload_id,
},
)
celery_app.tasks[upload_finisher_task_name].apply_async(
kwargs={
if should_perform_merge(upload_numbers):
gate_key = finisher_gate_key(repo_id, commit_sha)
if try_acquire_finisher_gate(repo_id, commit_sha):
log.info(
"Enqueuing upload finisher via gate",
extra={
"repo_id": repo_id,
"commit_sha": commit_sha,
"upload_id": upload_id,
"gate_key": gate_key,
},
)
finisher_kwargs = {
"repoid": repo_id,
"commitid": commit_sha,
"commit_yaml": commit_yaml.to_dict(),
}
)

celery_app.tasks[upload_finisher_task_name].apply_async(
kwargs=finisher_kwargs
)
rewrite_or_delete_upload(archive_service, commit_yaml, report_info)

except CeleryError:
Expand Down
180 changes: 124 additions & 56 deletions apps/worker/services/processing/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,21 @@
"intermediate report".
"""

import logging
from dataclasses import dataclass

from shared.helpers.redis import get_redis_connection
from sqlalchemy import case, func
from sqlalchemy.orm import Session

from database.enums import ReportType
from database.models.core import Commit
from database.models.reports import CommitReport, Upload
from shared.metrics import Counter
from shared.reports.enums import UploadState

MERGE_BATCH_SIZE = 10
log = logging.getLogger(__name__)

# TTL for processing state keys in Redis (24 hours, matches intermediate report TTL)
# This prevents state keys from accumulating indefinitely and ensures consistency
# with intermediate report expiration
PROCESSING_STATE_TTL = 24 * 60 * 60
MERGE_BATCH_SIZE = 10

CLEARED_UPLOADS = Counter(
"worker_processing_cleared_uploads",
Expand All @@ -41,9 +45,9 @@

@dataclass
class UploadNumbers:
processing: int
uploaded: int
"""
The number of uploads currently being processed.
The number of uploads that have finished being uploaded.
"""

processed: int
Expand All @@ -60,79 +64,143 @@ def should_perform_merge(uploads: UploadNumbers) -> bool:
This is the case when no more uploads are expected,
or we reached the desired batch size for merging.
"""
return uploads.processing == 0 or uploads.processed >= MERGE_BATCH_SIZE
return uploads.processed > 0


def should_trigger_postprocessing(uploads: UploadNumbers) -> bool:
def should_trigger_postuploaded(uploads: UploadNumbers) -> bool:
"""
Determines whether post-processing steps, such as notifications, etc,
Determines whether post-uploaded steps, such as notifications, etc,
should be performed.

This is the case when no more uploads are expected,
and all the processed uploads have been merged into the "master report".
"""
return uploads.processing == 0 and uploads.processed == 0
return uploads.uploaded == 0 and uploads.processed == 0


class ProcessingState:
def __init__(self, repoid: int, commitsha: str) -> None:
self._redis = get_redis_connection()
def __init__(self, repoid: int, commitsha: str, db_session: Session) -> None:
self.repoid = repoid
self.commitsha = commitsha
self._db_session = db_session

def get_upload_numbers(self):
processing = self._redis.scard(self._redis_key("processing"))
processed = self._redis.scard(self._redis_key("processed"))
return UploadNumbers(processing, processed)

def mark_uploads_as_processing(self, upload_ids: list[int]):
if not upload_ids:
return
key = self._redis_key("processing")
self._redis.sadd(key, *upload_ids)
# Set TTL to match intermediate report expiration (24 hours)
# This ensures state keys don't accumulate indefinitely
self._redis.expire(key, PROCESSING_STATE_TTL)
row = (
self._db_session.query(
func.count(
case(
(
Upload.state_id == UploadState.UPLOADED.db_id,
Upload.id_,
),
)
),
func.count(
case(
(
Upload.state_id == UploadState.PROCESSED.db_id,
Upload.id_,
),
)
),
)
.join(CommitReport, Upload.report_id == CommitReport.id_)
.join(Commit, CommitReport.commit_id == Commit.id_)
.filter(
Commit.repoid == self.repoid,
Commit.commitid == self.commitsha,
(CommitReport.report_type == None) # noqa: E711
| (CommitReport.report_type == ReportType.COVERAGE.value),
)
.one()
)
return UploadNumbers(uploaded=row[0], processed=row[1])

def clear_in_progress_uploads(self, upload_ids: list[int]):
if not upload_ids:
return
removed_uploads = self._redis.srem(self._redis_key("processing"), *upload_ids)
if removed_uploads > 0:
# the normal flow would move the uploads from the "processing" set
# to the "processed" set via `mark_upload_as_processed`.
# this function here is only called in the error case and we don't expect
# this to be triggered often, if at all.
CLEARED_UPLOADS.inc(removed_uploads)
# Mark still-UPLOADED uploads as ERROR so they stop being counted
# as "uploaded" in get_upload_numbers(). Only matches UPLOADED --
# already-PROCESSED uploads (success path) are unaffected.
#
# This runs in a finally block, so the transaction may already be
# in a failed state. Best-effort: log and move on if the DB is
# unreachable — the upload stays UPLOADED, which is safe.
try:
updated = (
self._db_session.query(Upload)
.filter(
Upload.id_.in_(upload_ids),
Upload.state_id == UploadState.UPLOADED.db_id,
)
.update(
{
Upload.state_id: UploadState.ERROR.db_id,
Upload.state: "error",
},
synchronize_session="fetch",
)
)
if updated > 0:
CLEARED_UPLOADS.inc(updated)
except Exception:
log.warning(
"Failed to clear in-progress uploads (transaction may be aborted)",
extra={"upload_ids": upload_ids},
exc_info=True,
)

def mark_upload_as_processed(self, upload_id: int):
processing_key = self._redis_key("processing")
processed_key = self._redis_key("processed")

res = self._redis.smove(processing_key, processed_key, upload_id)
if not res:
# this can happen when `upload_id` was never in the source set,
# which probably is the case during initial deployment as
# the code adding this to the initial set was not deployed yet
# TODO: make sure to remove this code after a grace period
self._redis.sadd(processed_key, upload_id)

# Set TTL on processed key to match intermediate report expiration
# This ensures uploads marked as processed have a bounded lifetime
self._redis.expire(processed_key, PROCESSING_STATE_TTL)
upload = self._db_session.query(Upload).get(upload_id)
if upload:
upload.state_id = UploadState.PROCESSED.db_id
# Don't set upload.state here -- the finisher's idempotency check
# uses state="processed" to detect already-merged uploads.
# The state string is set by update_uploads() after merging.

def mark_uploads_as_merged(self, upload_ids: list[int]):
if not upload_ids:
return
self._redis.srem(self._redis_key("processed"), *upload_ids)
self._db_session.query(Upload).filter(
Upload.id_.in_(upload_ids),
Upload.state_id == UploadState.PROCESSED.db_id,
).update(
{
Upload.state_id: UploadState.MERGED.db_id,
Upload.state: "merged",
},
synchronize_session="fetch",
)
self._db_session.commit()

def get_uploads_for_merging(self) -> set[int]:
return {
int(id)
for id in self._redis.srandmember(
self._redis_key("processed"), MERGE_BATCH_SIZE
rows = (
self._db_session.query(Upload.id_)
.join(CommitReport, Upload.report_id == CommitReport.id_)
.join(Commit, CommitReport.commit_id == Commit.id_)
.filter(
Commit.repoid == self.repoid,
Commit.commitid == self.commitsha,
(CommitReport.report_type == None) # noqa: E711
| (CommitReport.report_type == ReportType.COVERAGE.value),
Upload.state_id == UploadState.PROCESSED.db_id,
)
}

def _redis_key(self, state: str) -> str:
return f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}"
.limit(MERGE_BATCH_SIZE)
.all()
)
return {row[0] for row in rows}

    def count_remaining_coverage_uploads(self) -> int:
        """Return how many uploads for this commit are still in the UPLOADED state.

        Joins Upload -> CommitReport -> Commit to scope the count to this
        instance's (repoid, commitsha) pair. Only coverage reports are
        counted; a NULL report_type is also matched — presumably legacy rows
        created before the report_type column existed count as coverage
        (TODO confirm; same filter appears in the other queries of this class).
        """
        return (
            self._db_session.query(Upload)
            .join(CommitReport, Upload.report_id == CommitReport.id_)
            .join(Commit, CommitReport.commit_id == Commit.id_)
            .filter(
                Commit.repoid == self.repoid,
                Commit.commitid == self.commitsha,
                # `== None` (not `is None`) is required for SQLAlchemy to emit
                # an SQL `IS NULL` comparison, hence the E711 suppression.
                (CommitReport.report_type == None)  # noqa: E711
                | (CommitReport.report_type == ReportType.COVERAGE.value),
                Upload.state_id == UploadState.UPLOADED.db_id,
            )
            .count()
        )
Loading
Loading