Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
2cea686
feat(worker): make upload_finisher process full commit state
thomasrockhu-codecov Mar 11, 2026
6cab9cc
fix(worker): mark successfully merged uploads as merged
thomasrockhu-codecov Mar 11, 2026
95a8a79
test(worker): remove stale finisher concurrency gate tests
thomasrockhu-codecov Mar 11, 2026
126a0b6
fix(worker): prefer callback payload over reconstructed finisher data
thomasrockhu-codecov Mar 11, 2026
6e3b939
fix(worker): reconstruct all processed uploads for finisher merge
thomasrockhu-codecov Mar 11, 2026
8852b60
refactor(worker): tighten finisher idempotency to merged/error states
thomasrockhu-codecov Mar 11, 2026
b75054e
fix(worker): keep processed state terminal during finisher rollout
thomasrockhu-codecov Mar 11, 2026
4322546
fix(worker): handle unsuccessful results without error payload
thomasrockhu-codecov Mar 11, 2026
b4a0fcd
test(worker): cover update_uploads fallback for missing error payload
thomasrockhu-codecov Mar 11, 2026
c3ff0f9
refactor(worker): simplify finisher scheduling and gate/state respons…
thomasrockhu-codecov Mar 11, 2026
6306ed5
refactor(worker): simplify followup scheduling and bound merge batches
thomasrockhu-codecov Mar 11, 2026
2a0e36e
fix(worker): address PR #756 review thread issues
thomasrockhu-codecov Mar 11, 2026
7dcdac6
fix(worker): restore processing-state task wiring for worker CI
thomasrockhu-codecov Mar 11, 2026
6d43a16
fix: make updates
thomasrockhu-codecov Mar 11, 2026
93f3146
style(worker): apply repo pre-commit formatting
thomasrockhu-codecov Mar 11, 2026
53bc3ab
fix(worker): preserve merged state updates in upload_merger loop
thomasrockhu-codecov Mar 11, 2026
a0cbf7e
fix: make edits
thomasrockhu-codecov Mar 11, 2026
ceb7614
fix: more updates
thomasrockhu-codecov Mar 11, 2026
c48148b
fix(worker): restore missing finisher pipeline changes
thomasrockhu-codecov Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions apps/worker/services/processing/merging.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ def update_uploads(

if result["successful"]:
update = {
"state_id": UploadState.PROCESSED.db_id,
"state": "processed",
"state_id": UploadState.MERGED.db_id,
"state": "merged",
}
report = reports.get(upload_id)
if report is not None:
Expand Down
25 changes: 1 addition & 24 deletions apps/worker/services/processing/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@
from celery.exceptions import CeleryError
from sqlalchemy.orm import Session as DbSession

from app import celery_app
from database.models.core import Commit
from database.models.reports import Upload
from helpers.reports import delete_archive_setting
from services.report import ProcessingError, RawReportInfo, ReportService
from services.report.parser.types import VersionOneParsedRawReport
from shared.api_archive.archive import ArchiveService
from shared.celery_config import upload_finisher_task_name
from shared.yaml import UserYaml

from .intermediate import save_intermediate_report
from .state import ProcessingState, should_trigger_postprocessing
from .state import ProcessingState
from .types import ProcessingResult, UploadArguments

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -71,27 +69,6 @@ def process_upload(
save_intermediate_report(upload_id, processing_result.report)
state.mark_upload_as_processed(upload_id)

# Check if all uploads are now processed and trigger finisher if needed
# This handles the case where a processor task retries outside of a chord
# (e.g., from visibility timeout or task_reject_on_worker_lost)
upload_numbers = state.get_upload_numbers()
if should_trigger_postprocessing(upload_numbers):
log.info(
"All uploads processed, triggering finisher",
extra={
"repo_id": repo_id,
"commit_sha": commit_sha,
"upload_id": upload_id,
},
)
celery_app.tasks[upload_finisher_task_name].apply_async(
kwargs={
"repoid": repo_id,
"commitid": commit_sha,
"commit_yaml": commit_yaml.to_dict(),
}
)

rewrite_or_delete_upload(archive_service, commit_yaml, report_info)

except CeleryError:
Expand Down
7 changes: 1 addition & 6 deletions apps/worker/services/processing/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,7 @@ def mark_uploads_as_merged(self, upload_ids: list[int]):
self._redis.srem(self._redis_key("processed"), *upload_ids)

def get_uploads_for_merging(self) -> set[int]:
return {
int(id)
for id in self._redis.srandmember(
self._redis_key("processed"), MERGE_BATCH_SIZE
)
}
return {int(id) for id in self._redis.smembers(self._redis_key("processed"))}

def _redis_key(self, state: str) -> str:
return f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}"
125 changes: 3 additions & 122 deletions apps/worker/services/tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,8 @@


@pytest.mark.django_db(databases={"default"})
class TestProcessUploadOrphanedTaskRecovery:
"""
Tests for the orphaned upload recovery mechanism.

When a processor task retries outside of a chord (e.g., from visibility timeout
or task_reject_on_worker_lost), it should trigger the finisher if all uploads
are now processed.
"""

def test_triggers_finisher_when_last_upload_completes(
self, dbsession, mocker, mock_storage
):
"""
Test that finisher is triggered when the last upload completes processing.

This simulates the scenario where:
1. A processor task died before ACK, missing the chord callback
2. Task retries standalone (visibility timeout or worker rejection)
3. Completes successfully and notices all uploads are done
4. Triggers finisher to complete the upload flow
"""
class TestProcessUpload:
def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_storage):
# Setup
repository = RepositoryFactory.create()
commit = CommitFactory.create(repository=repository)
Expand Down Expand Up @@ -73,105 +54,6 @@ def test_triggers_finisher_when_last_upload_completes(
return_value=mock_state_instance,
)

# Mock should_trigger_postprocessing to return True
mocker.patch(
"services.processing.processing.should_trigger_postprocessing",
return_value=True,
)

# Mock celery app to capture finisher task scheduling
mock_celery_app = mocker.patch("services.processing.processing.celery_app")
mock_finisher_task = MagicMock()
mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}

# Mock other dependencies
mocker.patch("services.processing.processing.save_intermediate_report")
mocker.patch("services.processing.processing.rewrite_or_delete_upload")

commit_yaml = UserYaml({})

# Execute
result = process_upload(
on_processing_error=lambda error: None,
db_session=dbsession,
repo_id=repository.repoid,
commit_sha=commit.commitid,
commit_yaml=commit_yaml,
arguments=arguments,
)

# Verify
assert result["successful"] is True
assert result["upload_id"] == upload.id_

# Verify finisher was triggered
mock_finisher_task.apply_async.assert_called_once_with(
kwargs={
"repoid": repository.repoid,
"commitid": commit.commitid,
"commit_yaml": commit_yaml.to_dict(),
}
)

def test_does_not_trigger_finisher_when_uploads_still_processing(
self, dbsession, mocker, mock_storage
):
"""
Test that finisher is NOT triggered when other uploads are still processing.

This verifies we don't prematurely trigger the finisher when only some
uploads have completed.
"""
# Setup
repository = RepositoryFactory.create()
commit = CommitFactory.create(repository=repository)
upload = UploadFactory.create(
report__commit=commit,
state="started",
)
dbsession.add_all([repository, commit, upload])
dbsession.flush()

arguments: UploadArguments = {
"commit": commit.commitid,
"upload_id": upload.id_,
"version": "v4",
"reportid": str(upload.report.external_id),
}

# Mock dependencies
mock_report_service = mocker.patch(
"services.processing.processing.ReportService"
)
mock_processing_result = MagicMock()
mock_processing_result.error = None
mock_processing_result.report = None
mock_report_service.return_value.build_report_from_raw_content.return_value = (
mock_processing_result
)

# Mock ProcessingState to indicate other uploads still processing
mock_state_instance = MagicMock()
mock_state_instance.get_upload_numbers.return_value = MagicMock(
processing=2,
processed=1, # Other uploads still in progress
)
mocker.patch(
"services.processing.processing.ProcessingState",
return_value=mock_state_instance,
)

# Mock should_trigger_postprocessing to return False
mocker.patch(
"services.processing.processing.should_trigger_postprocessing",
return_value=False,
)

# Mock celery app to capture finisher task scheduling
mock_celery_app = mocker.patch("services.processing.processing.celery_app")
mock_finisher_task = MagicMock()
mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}

# Mock other dependencies
mocker.patch("services.processing.processing.save_intermediate_report")
mocker.patch("services.processing.processing.rewrite_or_delete_upload")
Expand All @@ -192,5 +74,4 @@ def test_does_not_trigger_finisher_when_uploads_still_processing(
assert result["successful"] is True
assert result["upload_id"] == upload.id_

# Verify finisher was NOT triggered
mock_finisher_task.apply_async.assert_not_called()
# Finisher enqueue is handled elsewhere; processor no longer triggers it directly.
6 changes: 3 additions & 3 deletions apps/worker/services/tests/test_processing_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ def test_batch_merging_many_uploads():
for id in range(1, 12):
state.mark_upload_as_processed(id)

# we have only processed 8 out of 9. we want to do a batched merge
# We now reconstruct all processed uploads in one pass.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ProcessingState test suite broken by API change

Medium Severity

ProcessingState.__init__ was changed in this PR to require db_session as a mandatory third argument, but test_processing_state.py still instantiates ProcessingState(1234, uuid4().hex) everywhere — including in the test_batch_merging_many_uploads test that was modified in this very diff. Every test in the file will raise TypeError at instantiation. Additionally, the TestProcessingStateEmptyListGuards class and the parametrized tests still assert Redis calls (mock_redis.sadd, mock_redis.srem) that are no longer made by the DB-backed implementation.

Fix in Cursor Fix in Web

assert should_perform_merge(state.get_upload_numbers())
merging = state.get_uploads_for_merging()
assert len(merging) == 10 # = MERGE_BATCH_SIZE
assert len(merging) == 11
state.mark_uploads_as_merged(merging)

# but no notifications yet
Expand All @@ -68,7 +68,7 @@ def test_batch_merging_many_uploads():
# with the last upload being processed, we do another merge, and then trigger notifications
assert should_perform_merge(state.get_upload_numbers())
merging = state.get_uploads_for_merging()
assert len(merging) == 2
assert len(merging) == 1
state.mark_uploads_as_merged(merging)

assert should_trigger_postprocessing(state.get_upload_numbers())
Expand Down
58 changes: 50 additions & 8 deletions apps/worker/tasks/tests/unit/test_upload_finisher_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1068,9 +1068,11 @@ def test_idempotency_check_skips_already_processed_uploads(

# Create uploads that are already in final states ("processed", "merged", "error")
upload_1 = UploadFactory.create(report=report, state="processed")
upload_2 = UploadFactory.create(report=report, state="error")
upload_2 = UploadFactory.create(report=report, state="merged")
upload_3 = UploadFactory.create(report=report, state="error")
dbsession.add(upload_1)
dbsession.add(upload_2)
dbsession.add(upload_3)
dbsession.flush()

# Mock the _process_reports_with_lock to verify it's NOT called
Expand All @@ -1080,7 +1082,8 @@ def test_idempotency_check_skips_already_processed_uploads(

previous_results = [
{"upload_id": upload_1.id, "successful": True, "arguments": {}},
{"upload_id": upload_2.id, "successful": False, "arguments": {}},
{"upload_id": upload_2.id, "successful": True, "arguments": {}},
{"upload_id": upload_3.id, "successful": False, "arguments": {}},
]

result = UploadFinisherTask().run_impl(
Expand All @@ -1094,7 +1097,7 @@ def test_idempotency_check_skips_already_processed_uploads(
# Verify that the finisher skipped all work
assert result == {
"already_completed": True,
"upload_ids": [upload_1.id, upload_2.id],
"upload_ids": [upload_1.id, upload_2.id, upload_3.id],
}

# Verify that _process_reports_with_lock was NOT called
Expand Down Expand Up @@ -1249,6 +1252,45 @@ def test_reconstruct_processing_results_returns_empty_when_no_uploads_found(
# Verify empty list returned when no uploads found
assert result == []

@pytest.mark.django_db
def test_run_impl_uses_reconstructed_results_beyond_callback_payload(
self, dbsession, mocker, mock_self_app
):
commit = CommitFactory.create()
dbsession.add(commit)
dbsession.flush()

reconstructed = [
{"upload_id": 101, "successful": True, "arguments": {}},
{"upload_id": 202, "successful": True, "arguments": {}},
]
mocker.patch.object(
UploadFinisherTask,
"_reconstruct_processing_results",
return_value=reconstructed,
)
mock_process = mocker.patch.object(
UploadFinisherTask, "_process_reports_with_lock"
)
mock_handle_finisher_lock = mocker.patch.object(
UploadFinisherTask,
"_handle_finisher_lock",
return_value={"notifications_called": False},
)

UploadFinisherTask().run_impl(
dbsession,
[{"upload_id": 101, "successful": True, "arguments": {}}],
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml={},
)

mock_process.assert_called_once()
processing_results = mock_process.call_args.args[3]
assert {result["upload_id"] for result in processing_results} == {101, 202}
mock_handle_finisher_lock.assert_called_once()

@pytest.mark.django_db
def test_coverage_notifications_not_blocked_by_test_results_uploads(
self,
Expand Down Expand Up @@ -1326,7 +1368,7 @@ def mock_process_reports(
upload_ids,
state,
):
# Call update_uploads to update the upload state to PROCESSED
# Call update_uploads to update the upload state to MERGED
# This uses the same SQLAlchemy session that the query will use
update_uploads(
db_session,
Expand All @@ -1346,11 +1388,11 @@ def mock_process_reports(
.first()
)
assert updated_upload is not None, "Upload should exist"
assert updated_upload.state == "processed", (
f"Upload state should be 'processed', got '{updated_upload.state}'"
assert updated_upload.state == "merged", (
f"Upload state should be 'merged', got '{updated_upload.state}'"
)
assert updated_upload.state_id == UploadState.PROCESSED.db_id, (
f"Upload state_id should be {UploadState.PROCESSED.db_id}, got {updated_upload.state_id}"
assert updated_upload.state_id == UploadState.MERGED.db_id, (
f"Upload state_id should be {UploadState.MERGED.db_id}, got {updated_upload.state_id}"
)

mock_process = mocker.patch.object(
Expand Down
40 changes: 23 additions & 17 deletions apps/worker/tasks/upload_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,22 +265,27 @@ def run_impl(

state = ProcessingState(repoid, commitid)

# If processing_results not provided (e.g., from orphaned upload recovery),
# reconstruct it from ProcessingState to ensure ALL uploads are included
if processing_results is None:
log.info(
"run_impl: processing_results not provided, reconstructing from ProcessingState"
)
processing_results = self._reconstruct_processing_results(
db_session, state, commit
)
log.info(
"run_impl: Reconstructed processing results",
extra={
"upload_count": len(processing_results),
"upload_ids": [r["upload_id"] for r in processing_results],
},
)
# Always reconstruct from state so the finisher covers all uploads for the commit,
# not only uploads present in callback payload.
reconstructed_results = self._reconstruct_processing_results(
db_session, state, commit
)
processing_results_by_id = {
result["upload_id"]: result for result in reconstructed_results
}
# Prefer callback payload when both contain the same upload_id because
# callback data is produced directly by the processor and can be newer.
processing_results_by_id.update(
{result["upload_id"]: result for result in (processing_results or [])}
)
processing_results = list(processing_results_by_id.values())
log.info(
"run_impl: Reconstructed processing results",
extra={
"upload_count": len(processing_results),
"upload_ids": [r["upload_id"] for r in processing_results],
},
)

upload_ids = [upload["upload_id"] for upload in processing_results]

Expand All @@ -294,7 +299,8 @@ def run_impl(
# Only skip if ALL uploads exist in DB and ALL are in final states
if len(uploads_in_db) == len(upload_ids):
all_already_processed = all(
upload.state in ("processed", "error") for upload in uploads_in_db
upload.state in ("processed", "merged", "error")
for upload in uploads_in_db
)
if all_already_processed:
log.info(
Expand Down
Loading