Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
099193f
feat(worker): dedupe upload_finisher scheduling with redis gate
thomasrockhu-codecov Mar 11, 2026
c138e02
fix(worker): gate finisher scheduling and normalize task ids
thomasrockhu-codecov Mar 11, 2026
038fc25
fix(worker): stabilize gate scheduling and coverage task dispatch
thomasrockhu-codecov Mar 11, 2026
406ec7b
refactor(worker): remove ScheduledTasksResult wrapper in upload task
thomasrockhu-codecov Mar 11, 2026
e8d849d
Revert "refactor(worker): remove ScheduledTasksResult wrapper in uplo…
thomasrockhu-codecov Mar 11, 2026
4684337
Reapply "refactor(worker): remove ScheduledTasksResult wrapper in upl…
thomasrockhu-codecov Mar 11, 2026
4560ded
refactor(worker): prefer as_tuple normalization for scheduled task ids
thomasrockhu-codecov Mar 11, 2026
ceb058f
refactor(worker): inline scheduled task id extraction in upload task
thomasrockhu-codecov Mar 11, 2026
c324fd4
refactor(worker): align scheduled task variable flow with original style
thomasrockhu-codecov Mar 11, 2026
b3692bc
refactor(worker): restore upload task scheduling to main as_tuple flow
thomasrockhu-codecov Mar 11, 2026
2e960ac
fix(worker): return celery group result for coverage scheduling
thomasrockhu-codecov Mar 11, 2026
2d657c4
fix(worker): return ResultSet from coverage scheduling
thomasrockhu-codecov Mar 11, 2026
0d75c1e
fix(worker): return lightweight as_tuple-compatible scheduler result
thomasrockhu-codecov Mar 11, 2026
3b740bd
fix(worker): schedule coverage processors via celery group result
thomasrockhu-codecov Mar 11, 2026
0f81b4d
fix(worker): propagate UploadFlow checkpoint context to finisher enqueue
thomasrockhu-codecov Mar 11, 2026
5816216
fix(worker): pass existing checkpoint kwargs through finisher enqueue
thomasrockhu-codecov Mar 11, 2026
7e91055
fix(worker): gate finisher enqueue and restore processor kwargs sched…
thomasrockhu-codecov Mar 11, 2026
84dae18
fix(worker): use UploadFlow kwargs key when forwarding checkpoints
thomasrockhu-codecov Mar 11, 2026
9410786
test(worker): isolate processor tests from finisher routing side effects
thomasrockhu-codecov Mar 11, 2026
8ce6a6c
feat(worker): add finisher sweep and gate lifecycle cleanup
thomasrockhu-codecov Mar 11, 2026
a65328c
fix(worker): harden finisher sweep gate behavior
thomasrockhu-codecov Mar 11, 2026
de94365
fix(worker): clear finisher gate when sweep limit is reached
thomasrockhu-codecov Mar 11, 2026
d2062f2
fix(worker): keep finisher gate when finisher lock exhausts retries
thomasrockhu-codecov Mar 11, 2026
1bcbda2
fix(worker): refresh finisher gate TTL on sweep reschedule
thomasrockhu-codecov Mar 11, 2026
eea9a7c
fix(worker): clear finisher gate on terminal postprocessing lock exha…
thomasrockhu-codecov Mar 11, 2026
9add94b
test(worker): align finisher-gate processing tests with merge gating
thomasrockhu-codecov Mar 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion apps/worker/services/processing/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,32 @@
from celery.exceptions import CeleryError
from sqlalchemy.orm import Session as DbSession

from app import celery_app
from database.models.core import Commit
from database.models.reports import Upload
from helpers.checkpoint_logger import _kwargs_key
from helpers.checkpoint_logger.flows import UploadFlow
from helpers.reports import delete_archive_setting
from services.report import ProcessingError, RawReportInfo, ReportService
from services.report.parser.types import VersionOneParsedRawReport
from shared.api_archive.archive import ArchiveService
from shared.celery_config import upload_finisher_task_name
from shared.helpers.redis import get_redis_connection
from shared.yaml import UserYaml

from .intermediate import save_intermediate_report
from .state import ProcessingState
from .state import ProcessingState, should_perform_merge
from .types import ProcessingResult, UploadArguments

log = logging.getLogger(__name__)

FINISHER_GATE_KEY_PREFIX = "upload_merger_lock"
FINISHER_GATE_TTL_SECONDS = 900


def finisher_gate_key(repo_id: int, commit_sha: str) -> str:
    """Build the redis key that gates duplicate finisher scheduling.

    The key joins the shared prefix with the repository id and commit SHA so
    that at most one finisher enqueue can win per (repo, commit) pair.
    """
    parts = (FINISHER_GATE_KEY_PREFIX, str(repo_id), commit_sha)
    return "_".join(parts)


@sentry_sdk.trace
def process_upload(
Expand Down Expand Up @@ -69,6 +81,34 @@ def process_upload(
save_intermediate_report(upload_id, processing_result.report)
state.mark_upload_as_processed(upload_id)

upload_numbers = state.get_upload_numbers()
if should_perform_merge(upload_numbers):
gate_key = finisher_gate_key(repo_id, commit_sha)
redis = get_redis_connection()
if redis.set(gate_key, "1", nx=True, ex=FINISHER_GATE_TTL_SECONDS):
log.info(
"Enqueuing upload finisher via gate",
extra={
"repo_id": repo_id,
"commit_sha": commit_sha,
"upload_id": upload_id,
"gate_key": gate_key,
},
)
finisher_kwargs = {
"repoid": repo_id,
"commitid": commit_sha,
"commit_yaml": commit_yaml.to_dict(),
}
checkpoint_kwargs_key = _kwargs_key(UploadFlow)
if checkpoint_kwargs_key in arguments:
finisher_kwargs[checkpoint_kwargs_key] = arguments[
checkpoint_kwargs_key
]
Comment on lines +103 to +107
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this necessary?

celery_app.tasks[upload_finisher_task_name].apply_async(
kwargs=finisher_kwargs
)

rewrite_or_delete_upload(archive_service, commit_yaml, report_info)

except CeleryError:
Expand Down
96 changes: 93 additions & 3 deletions apps/worker/services/tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@


@pytest.mark.django_db(databases={"default"})
class TestProcessUpload:
def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_storage):
class TestProcessUploadFinisherGate:
def test_triggers_finisher_when_gate_is_acquired(
self, dbsession, mocker, mock_storage
):
# Setup
repository = RepositoryFactory.create()
commit = CommitFactory.create(repository=repository)
Expand Down Expand Up @@ -54,6 +56,92 @@ def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_stora
return_value=mock_state_instance,
)

# Mock celery app to capture finisher task scheduling
mock_celery_app = mocker.patch("services.processing.processing.celery_app")
mock_finisher_task = MagicMock()
mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}
mock_redis = mocker.patch("services.processing.processing.get_redis_connection")
mock_redis.return_value.set.return_value = True

# Mock other dependencies
mocker.patch("services.processing.processing.save_intermediate_report")
mocker.patch("services.processing.processing.rewrite_or_delete_upload")

commit_yaml = UserYaml({})

# Execute
result = process_upload(
on_processing_error=lambda error: None,
db_session=dbsession,
repo_id=repository.repoid,
commit_sha=commit.commitid,
commit_yaml=commit_yaml,
arguments=arguments,
)

# Verify
assert result["successful"] is True
assert result["upload_id"] == upload.id_

# Verify finisher was triggered
mock_finisher_task.apply_async.assert_called_once_with(
kwargs={
"repoid": repository.repoid,
"commitid": commit.commitid,
"commit_yaml": commit_yaml.to_dict(),
}
)
mock_redis.return_value.set.assert_called_once()

def test_does_not_trigger_finisher_when_gate_exists(
self, dbsession, mocker, mock_storage
):
# Setup
repository = RepositoryFactory.create()
commit = CommitFactory.create(repository=repository)
upload = UploadFactory.create(
report__commit=commit,
state="started",
)
dbsession.add_all([repository, commit, upload])
dbsession.flush()

arguments: UploadArguments = {
"commit": commit.commitid,
"upload_id": upload.id_,
"version": "v4",
"reportid": str(upload.report.external_id),
}

# Mock dependencies
mock_report_service = mocker.patch(
"services.processing.processing.ReportService"
)
mock_processing_result = MagicMock()
mock_processing_result.error = None
mock_processing_result.report = None
mock_report_service.return_value.build_report_from_raw_content.return_value = (
mock_processing_result
)

# Mock ProcessingState to indicate other uploads still processing
mock_state_instance = MagicMock()
mock_state_instance.get_upload_numbers.return_value = MagicMock(
processing=2,
processed=1, # Other uploads still in progress
)
mocker.patch(
"services.processing.processing.ProcessingState",
return_value=mock_state_instance,
)

# Mock celery app to capture finisher task scheduling
mock_celery_app = mocker.patch("services.processing.processing.celery_app")
mock_finisher_task = MagicMock()
mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}
mock_redis = mocker.patch("services.processing.processing.get_redis_connection")
mock_redis.return_value.set.return_value = False

# Mock other dependencies
mocker.patch("services.processing.processing.save_intermediate_report")
mocker.patch("services.processing.processing.rewrite_or_delete_upload")
Expand All @@ -74,4 +162,6 @@ def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_stora
assert result["successful"] is True
assert result["upload_id"] == upload.id_

# Finisher enqueue is handled elsewhere; processor no longer triggers it directly.
# Verify finisher was NOT triggered.
mock_finisher_task.apply_async.assert_not_called()
mock_redis.return_value.set.assert_not_called()
148 changes: 147 additions & 1 deletion apps/worker/tasks/tests/unit/test_upload_finisher_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from tasks.upload_finisher import (
FINISHER_BASE_RETRY_COUNTDOWN_SECONDS,
FINISHER_BLOCKING_TIMEOUT_SECONDS,
FINISHER_MAX_SWEEP_ATTEMPTS,
ReportService,
ShouldCallNotifyResult,
UploadFinisherTask,
Expand Down Expand Up @@ -991,6 +992,9 @@ def test_soft_time_limit_handling(self, dbsession, mocker, mock_self_app):
mocker.patch(
"tasks.upload_finisher.load_commit_diff", side_effect=SoftTimeLimitExceeded
)
mock_delete_gate = mocker.patch.object(
UploadFinisherTask, "_delete_finisher_gate"
)

commit = CommitFactory.create()
dbsession.add(commit)
Expand Down Expand Up @@ -1020,6 +1024,7 @@ def test_soft_time_limit_handling(self, dbsession, mocker, mock_self_app):
"sentry_trace_id": None,
}
)
mock_delete_gate.assert_called_once_with(commit.repoid, commit.commitid)

@pytest.mark.django_db
def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):
Expand All @@ -1038,6 +1043,9 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):

previous_results = [{"upload_id": 0, "successful": True, "arguments": {}}]

mock_delete_gate = mocker.patch.object(
UploadFinisherTask, "_delete_finisher_gate"
)
result = UploadFinisherTask().run_impl(
dbsession,
previous_results,
Expand Down Expand Up @@ -1074,6 +1082,7 @@ def test_generic_exception_handling(self, dbsession, mocker, mock_self_app):
"sentry_trace_id": None,
}
)
mock_delete_gate.assert_called_once_with(commit.repoid, commit.commitid)

@pytest.mark.django_db
def test_idempotency_check_skips_already_processed_uploads(
Expand Down Expand Up @@ -1115,7 +1124,12 @@ def test_idempotency_check_skips_already_processed_uploads(
{"upload_id": upload_3.id, "successful": False, "arguments": {}},
]

result = UploadFinisherTask().run_impl(
task = UploadFinisherTask()
mock_count_pending = mocker.patch.object(
task, "_count_remaining_coverage_uploads", return_value=0
)
mock_delete_gate = mocker.patch.object(task, "_delete_finisher_gate")
result = task.run_impl(
dbsession,
previous_results,
repoid=commit.repoid,
Expand All @@ -1131,6 +1145,78 @@ def test_idempotency_check_skips_already_processed_uploads(

# Verify that _process_reports_with_lock was NOT called
mock_process.assert_not_called()
mock_count_pending.assert_called_once()
mock_delete_gate.assert_called_once()

@pytest.mark.django_db
def test_idempotency_schedules_sweep_when_pending_uploads_remain(
    self, dbsession, mocker, mock_self_app
):
    """When coverage uploads are still pending, the finisher should schedule
    a sweep and must NOT clear the redis gate."""
    # Minimal commit -> report -> upload graph for the finisher to inspect.
    commit = CommitFactory.create()
    dbsession.add(commit)
    dbsession.flush()

    report = CommitReport(commit_id=commit.id_)
    dbsession.add(report)
    dbsession.flush()

    merged_upload = UploadFactory.create(report=report, state="merged")
    dbsession.add(merged_upload)
    dbsession.flush()

    finisher = UploadFinisherTask()
    # Pretend two coverage uploads are still outstanding.
    count_pending = mocker.patch.object(
        finisher, "_count_remaining_coverage_uploads", return_value=2
    )
    schedule_sweep = mocker.patch.object(finisher, "_schedule_sweep")
    delete_gate = mocker.patch.object(finisher, "_delete_finisher_gate")

    previous_results = [
        {"upload_id": merged_upload.id, "successful": True, "arguments": {}}
    ]
    outcome = finisher.run_impl(
        dbsession,
        previous_results,
        repoid=commit.repoid,
        commitid=commit.commitid,
        commit_yaml={},
    )

    # The task reports the sweep and leaves the gate in place for the
    # rescheduled run to consume.
    assert outcome == {"sweep_scheduled": True, "remaining_uploads": 2}
    count_pending.assert_called_once()
    schedule_sweep.assert_called_once()
    delete_gate.assert_not_called()

@pytest.mark.django_db
def test_idempotency_does_not_reschedule_sweep_after_max_attempts(
    self, dbsession, mocker, mock_self_app
):
    """Once the sweep budget is exhausted the finisher gives up: no new
    sweep is scheduled and the redis gate is released."""
    # Minimal commit -> report -> upload graph for the finisher to inspect.
    commit = CommitFactory.create()
    dbsession.add(commit)
    dbsession.flush()

    report = CommitReport(commit_id=commit.id_)
    dbsession.add(report)
    dbsession.flush()

    merged_upload = UploadFactory.create(report=report, state="merged")
    dbsession.add(merged_upload)
    dbsession.flush()

    finisher = UploadFinisherTask()
    # Uploads remain pending, but we arrive at the attempt ceiling below.
    mocker.patch.object(
        finisher, "_count_remaining_coverage_uploads", return_value=2
    )
    schedule_sweep = mocker.patch.object(finisher, "_schedule_sweep")
    delete_gate = mocker.patch.object(finisher, "_delete_finisher_gate")

    previous_results = [
        {"upload_id": merged_upload.id, "successful": True, "arguments": {}}
    ]
    outcome = finisher.run_impl(
        dbsession,
        previous_results,
        repoid=commit.repoid,
        commitid=commit.commitid,
        commit_yaml={},
        sweep_attempt=FINISHER_MAX_SWEEP_ATTEMPTS,
    )

    assert outcome == {"sweep_scheduled": False, "remaining_uploads": 2}
    schedule_sweep.assert_not_called()
    delete_gate.assert_called_once()

@pytest.mark.django_db
def test_idempotency_check_proceeds_when_uploads_not_finished(
Expand Down Expand Up @@ -1448,6 +1534,66 @@ def mock_process_reports(
# This is the key assertion - notifications should NOT be blocked by test_results uploads
mock_handle_finisher_lock.assert_called_once()

@pytest.mark.django_db
def test_run_impl_deletes_gate_after_successful_completion(
    self, dbsession, mocker, mock_self_app
):
    """A clean end-to-end finisher run must release the redis gate."""
    commit = CommitFactory.create()
    dbsession.add(commit)
    dbsession.flush()

    finisher = UploadFinisherTask()
    # Stub out each pipeline stage: one reconstructed upload, nothing
    # pending, and a finisher lock that completes with a result payload.
    mocker.patch.object(
        finisher,
        "_reconstruct_processing_results",
        return_value=[{"upload_id": 1, "successful": True, "arguments": {}}],
    )
    mocker.patch.object(finisher, "_process_reports_with_lock")
    mocker.patch.object(
        finisher, "_count_remaining_coverage_uploads", return_value=0
    )
    mocker.patch.object(
        finisher, "_handle_finisher_lock", return_value={"done": True}
    )
    delete_gate = mocker.patch.object(finisher, "_delete_finisher_gate")

    outcome = finisher.run_impl(
        dbsession,
        processing_results=[],
        repoid=commit.repoid,
        commitid=commit.commitid,
        commit_yaml={},
    )

    assert outcome == {"done": True}
    delete_gate.assert_called_once()

@pytest.mark.django_db
def test_run_impl_deletes_gate_when_finisher_lock_is_exhausted(
    self, dbsession, mocker, mock_self_app
):
    """Even when the finisher lock exhausts (returns None), the redis gate
    must still be released so future uploads can schedule a finisher."""
    commit = CommitFactory.create()
    dbsession.add(commit)
    dbsession.flush()

    finisher = UploadFinisherTask()
    # Stub out each pipeline stage; the lock handler yields no result,
    # simulating lock exhaustion.
    mocker.patch.object(
        finisher,
        "_reconstruct_processing_results",
        return_value=[{"upload_id": 1, "successful": True, "arguments": {}}],
    )
    mocker.patch.object(finisher, "_process_reports_with_lock")
    mocker.patch.object(
        finisher, "_count_remaining_coverage_uploads", return_value=0
    )
    mocker.patch.object(finisher, "_handle_finisher_lock", return_value=None)
    delete_gate = mocker.patch.object(finisher, "_delete_finisher_gate")

    outcome = finisher.run_impl(
        dbsession,
        processing_results=[],
        repoid=commit.repoid,
        commitid=commit.commitid,
        commit_yaml={},
    )

    assert outcome is None
    delete_gate.assert_called_once()


class TestLockManagerConfiguration:
"""Tests for lock manager configuration: finite blocking_timeout
Expand Down
Loading
Loading