Merged
26 commits
099193f
feat(worker): dedupe upload_finisher scheduling with redis gate
thomasrockhu-codecov Mar 11, 2026
c138e02
fix(worker): gate finisher scheduling and normalize task ids
thomasrockhu-codecov Mar 11, 2026
038fc25
fix(worker): stabilize gate scheduling and coverage task dispatch
thomasrockhu-codecov Mar 11, 2026
406ec7b
refactor(worker): remove ScheduledTasksResult wrapper in upload task
thomasrockhu-codecov Mar 11, 2026
e8d849d
Revert "refactor(worker): remove ScheduledTasksResult wrapper in uplo…
thomasrockhu-codecov Mar 11, 2026
4684337
Reapply "refactor(worker): remove ScheduledTasksResult wrapper in upl…
thomasrockhu-codecov Mar 11, 2026
4560ded
refactor(worker): prefer as_tuple normalization for scheduled task ids
thomasrockhu-codecov Mar 11, 2026
ceb058f
refactor(worker): inline scheduled task id extraction in upload task
thomasrockhu-codecov Mar 11, 2026
c324fd4
refactor(worker): align scheduled task variable flow with original style
thomasrockhu-codecov Mar 11, 2026
b3692bc
refactor(worker): restore upload task scheduling to main as_tuple flow
thomasrockhu-codecov Mar 11, 2026
2e960ac
fix(worker): return celery group result for coverage scheduling
thomasrockhu-codecov Mar 11, 2026
2d657c4
fix(worker): return ResultSet from coverage scheduling
thomasrockhu-codecov Mar 11, 2026
0d75c1e
fix(worker): return lightweight as_tuple-compatible scheduler result
thomasrockhu-codecov Mar 11, 2026
3b740bd
fix(worker): schedule coverage processors via celery group result
thomasrockhu-codecov Mar 11, 2026
0f81b4d
fix(worker): propagate UploadFlow checkpoint context to finisher enqueue
thomasrockhu-codecov Mar 11, 2026
5816216
fix(worker): pass existing checkpoint kwargs through finisher enqueue
thomasrockhu-codecov Mar 11, 2026
7e91055
fix(worker): gate finisher enqueue and restore processor kwargs sched…
thomasrockhu-codecov Mar 11, 2026
84dae18
fix(worker): use UploadFlow kwargs key when forwarding checkpoints
thomasrockhu-codecov Mar 11, 2026
9410786
test(worker): isolate processor tests from finisher routing side effects
thomasrockhu-codecov Mar 11, 2026
8ce6a6c
feat(worker): add finisher sweep and gate lifecycle cleanup
thomasrockhu-codecov Mar 11, 2026
a65328c
fix(worker): harden finisher sweep gate behavior
thomasrockhu-codecov Mar 11, 2026
de94365
fix(worker): clear finisher gate when sweep limit is reached
thomasrockhu-codecov Mar 11, 2026
d2062f2
fix(worker): keep finisher gate when finisher lock exhausts retries
thomasrockhu-codecov Mar 11, 2026
1bcbda2
fix(worker): refresh finisher gate TTL on sweep reschedule
thomasrockhu-codecov Mar 11, 2026
eea9a7c
fix(worker): clear finisher gate on terminal postprocessing lock exha…
thomasrockhu-codecov Mar 11, 2026
9add94b
test(worker): align finisher-gate processing tests with merge gating
thomasrockhu-codecov Mar 11, 2026
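
Taken together, these commits replace chord-based finisher scheduling with a per-commit Redis gate: the first processor to finish acquires a short-lived SET NX EX key and enqueues the upload finisher, and later processors see the key and skip the enqueue. A minimal sketch of that gating pattern, assuming a redis-py client; the function and parameter names here are illustrative, not copied from the worker code:

```python
from redis import Redis

GATE_TTL_SECONDS = 900  # mirrors FINISHER_GATE_TTL_SECONDS in the diff below


def maybe_enqueue_finisher(redis: Redis, repo_id: int, commit_sha: str, enqueue) -> bool:
    """Enqueue the finisher only if no other processor has done so for this commit."""
    gate_key = f"upload_merger_lock_{repo_id}_{commit_sha}"
    # SET NX EX is atomic: exactly one caller creates the key, and it expires
    # on its own if nothing ever clears it.
    if redis.set(gate_key, "1", nx=True, ex=GATE_TTL_SECONDS):
        enqueue(repo_id, commit_sha)
        return True
    return False
```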
30 changes: 30 additions & 0 deletions apps/worker/services/processing/processing.py
@@ -5,12 +5,15 @@
from celery.exceptions import CeleryError
from sqlalchemy.orm import Session as DbSession

from app import celery_app
from database.models.core import Commit
from database.models.reports import Upload
from helpers.reports import delete_archive_setting
from services.report import ProcessingError, RawReportInfo, ReportService
from services.report.parser.types import VersionOneParsedRawReport
from shared.api_archive.archive import ArchiveService
from shared.celery_config import upload_finisher_task_name
from shared.helpers.redis import get_redis_connection
from shared.yaml import UserYaml

from .intermediate import save_intermediate_report
@@ -19,6 +22,13 @@

log = logging.getLogger(__name__)

FINISHER_GATE_KEY_PREFIX = "upload_merger_lock"
FINISHER_GATE_TTL_SECONDS = 900


def finisher_gate_key(repo_id: int, commit_sha: str) -> str:
return f"{FINISHER_GATE_KEY_PREFIX}_{repo_id}_{commit_sha}"


@sentry_sdk.trace
def process_upload(
@@ -69,6 +79,26 @@ def process_upload(
save_intermediate_report(upload_id, processing_result.report)
state.mark_upload_as_processed(upload_id)

gate_key = finisher_gate_key(repo_id, commit_sha)
redis = get_redis_connection()
if redis.set(gate_key, "1", nx=True, ex=FINISHER_GATE_TTL_SECONDS):
log.info(
"Enqueuing upload finisher via gate",
extra={
"repo_id": repo_id,
"commit_sha": commit_sha,
"upload_id": upload_id,
"gate_key": gate_key,
},
)
celery_app.tasks[upload_finisher_task_name].apply_async(
kwargs={
"repoid": repo_id,
"commitid": commit_sha,
"commit_yaml": commit_yaml.to_dict(),
}
)

rewrite_or_delete_upload(archive_service, commit_yaml, report_info)

except CeleryError:
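
The finisher-side half of the gate lifecycle is not part of this diff. Judging from the later commit messages (finisher sweep, TTL refresh on reschedule, clearing the gate on terminal outcomes), it presumably refreshes or releases the same key. A hypothetical sketch of that handling; every name below is an assumption, not code from this PR:

```python
def finish_or_reschedule(redis, repo_id: int, commit_sha: str, needs_resweep: bool) -> None:
    """Hypothetical finisher-side gate handling, inferred from the commit messages."""
    gate_key = f"upload_merger_lock_{repo_id}_{commit_sha}"
    if needs_resweep:
        # Sweep will run again later: keep the gate alive so processors do not
        # enqueue a duplicate finisher in the meantime.
        redis.expire(gate_key, 900)
    else:
        # Terminal outcome: drop the gate so the next upload on this commit
        # can schedule a fresh finisher.
        redis.delete(gate_key)
```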
96 changes: 93 additions & 3 deletions apps/worker/services/tests/test_processing.py
@@ -13,8 +13,10 @@


@pytest.mark.django_db(databases={"default"})
class TestProcessUpload:
def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_storage):
class TestProcessUploadFinisherGate:
def test_triggers_finisher_when_gate_is_acquired(
self, dbsession, mocker, mock_storage
):
# Setup
repository = RepositoryFactory.create()
commit = CommitFactory.create(repository=repository)
@@ -54,6 +56,92 @@ def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_storage
return_value=mock_state_instance,
)

# Mock celery app to capture finisher task scheduling
mock_celery_app = mocker.patch("services.processing.processing.celery_app")
mock_finisher_task = MagicMock()
mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}
mock_redis = mocker.patch("services.processing.processing.get_redis_connection")
mock_redis.return_value.set.return_value = True

# Mock other dependencies
mocker.patch("services.processing.processing.save_intermediate_report")
mocker.patch("services.processing.processing.rewrite_or_delete_upload")

commit_yaml = UserYaml({})

# Execute
result = process_upload(
on_processing_error=lambda error: None,
db_session=dbsession,
repo_id=repository.repoid,
commit_sha=commit.commitid,
commit_yaml=commit_yaml,
arguments=arguments,
)

# Verify
assert result["successful"] is True
assert result["upload_id"] == upload.id_

# Verify finisher was triggered
mock_finisher_task.apply_async.assert_called_once_with(
kwargs={
"repoid": repository.repoid,
"commitid": commit.commitid,
"commit_yaml": commit_yaml.to_dict(),
}
)
mock_redis.return_value.set.assert_called_once()

def test_does_not_trigger_finisher_when_gate_exists(
self, dbsession, mocker, mock_storage
):
# Setup
repository = RepositoryFactory.create()
commit = CommitFactory.create(repository=repository)
upload = UploadFactory.create(
report__commit=commit,
state="started",
)
dbsession.add_all([repository, commit, upload])
dbsession.flush()

arguments: UploadArguments = {
"commit": commit.commitid,
"upload_id": upload.id_,
"version": "v4",
"reportid": str(upload.report.external_id),
}

# Mock dependencies
mock_report_service = mocker.patch(
"services.processing.processing.ReportService"
)
mock_processing_result = MagicMock()
mock_processing_result.error = None
mock_processing_result.report = None
mock_report_service.return_value.build_report_from_raw_content.return_value = (
mock_processing_result
)

# Mock ProcessingState to indicate other uploads still processing
mock_state_instance = MagicMock()
mock_state_instance.get_upload_numbers.return_value = MagicMock(
processing=2,
processed=1, # Other uploads still in progress
)
mocker.patch(
"services.processing.processing.ProcessingState",
return_value=mock_state_instance,
)

# Mock celery app to capture finisher task scheduling
mock_celery_app = mocker.patch("services.processing.processing.celery_app")
mock_finisher_task = MagicMock()
mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}
mock_redis = mocker.patch("services.processing.processing.get_redis_connection")
mock_redis.return_value.set.return_value = False

# Mock other dependencies
mocker.patch("services.processing.processing.save_intermediate_report")
mocker.patch("services.processing.processing.rewrite_or_delete_upload")
@@ -74,4 +162,6 @@ def test_processor_does_not_enqueue_finisher(self, dbsession, mocker, mock_storage
assert result["successful"] is True
assert result["upload_id"] == upload.id_

# Finisher enqueue is handled elsewhere; processor no longer triggers it directly.
# Verify finisher was NOT triggered.
mock_finisher_task.apply_async.assert_not_called()
mock_redis.return_value.set.assert_called_once()
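
These tests stub get_redis_connection and drive both branches by forcing set() to return True or False. With a real redis-py client, set(..., nx=True) returns True when the key is created and None when it already exists; both the mocked False and the real None are falsy, so the gated branch behaves the same either way. A small standalone illustration, assuming a Redis server on the default localhost port:

```python
from redis import Redis

r = Redis()  # assumes a Redis server on localhost:6379
key = "upload_merger_lock_123_deadbeef"
r.delete(key)

first = r.set(key, "1", nx=True, ex=900)   # True -> key created, gate acquired
second = r.set(key, "1", nx=True, ex=900)  # None -> key already exists, gate held
print(first, second)  # True None
```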
116 changes: 34 additions & 82 deletions apps/worker/tasks/tests/unit/test_upload_task.py
@@ -38,8 +38,6 @@
from tasks.test_results_processor import test_results_processor_task
from tasks.tests.utils import ensure_hard_time_limit_task_is_numeric
from tasks.upload import UploadContext, UploadTask
from tasks.upload_finisher import upload_finisher_task
from tasks.upload_processor import upload_processor_task

here = Path(__file__)

@@ -190,7 +188,9 @@ def test_upload_task_call(
mock_self_app,
):
_start_upload_flow(mocker)
mocked_chord = mocker.patch("tasks.upload.chord")
mocked_apply_async = mocker.patch(
"tasks.upload.upload_processor_task.apply_async"
)
url = "v4/raw/2019-05-22/C3C4715CA57C910D11D5EB899FC86A7E/4c4e4654ac25037ae869caeb3619d485970b6304/a84d445c-9c1e-434f-8275-f18f1f320f81.txt"

commit = CommitFactory.create(
@@ -233,26 +233,11 @@ def test_upload_task_call(
.filter_by(report_id=commit.report.id, build_code="some_random_build")
.first()
)
processor = upload_processor_task.s(
repoid=commit.repoid,
commitid="abf6d4df662c47e32460020ab14abf9303581429",
commit_yaml={"codecov": {"max_report_age": "1y ago"}},
arguments={
"url": url,
"flags": [],
"build": "some_random_build",
"upload_id": first_session.id,
"upload_pk": first_session.id,
},
)
kwargs = {
"repoid": commit.repoid,
"commitid": "abf6d4df662c47e32460020ab14abf9303581429",
"commit_yaml": {"codecov": {"max_report_age": "1y ago"}},
}
kwargs[_kwargs_key(UploadFlow)] = mocker.ANY
finisher = upload_finisher_task.signature(kwargs=kwargs)
mocked_chord.assert_called_with([processor], finisher)
mocked_apply_async.assert_called_once()
scheduled_kwargs = mocked_apply_async.call_args.kwargs["kwargs"]
assert scheduled_kwargs["repoid"] == commit.repoid
assert scheduled_kwargs["commitid"] == commit.commitid
assert scheduled_kwargs["arguments"]["upload_id"] == first_session.id
calls = [
call(
"time_before_processing",
@@ -807,7 +792,9 @@ def test_upload_task_upload_processing_delay_enough_delay(
)
mock_configuration.set_params({"setup": {"upload_processing_delay": 1000}})
mocker.patch.object(UploadTask, "possibly_setup_webhooks", return_value=True)
mocked_chord = mocker.patch("tasks.upload.chord")
mocked_apply_async = mocker.patch(
"tasks.upload.upload_processor_task.apply_async"
)
dbsession.add(commit)
dbsession.flush()

@@ -834,7 +821,7 @@ def test_upload_task_upload_processing_delay_enough_delay(
commit_report = commit.commit_report(report_type=ReportType.COVERAGE)
assert commit_report is not None
assert len(commit_report.uploads) == 2
mocked_chord.assert_called_with([mocker.ANY, mocker.ANY], mocker.ANY)
assert mocked_apply_async.call_count == 2
mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls(
[
call(
@@ -879,7 +866,9 @@ def test_upload_task_upload_processing_delay_upload_is_none(
"tasks.upload.possibly_update_commit_from_provider_info", return_value=True
)
mocker.patch.object(UploadTask, "possibly_setup_webhooks", return_value=True)
mocked_chord = mocker.patch("tasks.upload.chord")
mocked_apply_async = mocker.patch(
"tasks.upload.upload_processor_task.apply_async"
)
commit = CommitFactory.create(
parent_commit_id=None,
message="",
@@ -912,7 +901,7 @@ def test_upload_task_upload_processing_delay_upload_is_none(
commit_report = commit.commit_report(report_type=ReportType.COVERAGE)
assert commit_report is not None
assert len(commit_report.uploads) == 2
mocked_chord.assert_called_with([mocker.ANY, mocker.ANY], mocker.ANY)
assert mocked_apply_async.call_count == 2
mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls(
[
call(
@@ -952,7 +941,9 @@ def test_upload_task_call_multiple_processors(
mock_storage,
mock_self_app,
):
mocked_chord = mocker.patch("tasks.upload.chord")
mocked_apply_async = mocker.patch(
"tasks.upload.upload_processor_task.apply_async"
)

commit = CommitFactory.create(
message="",
@@ -994,29 +985,7 @@ def test_upload_task_call_multiple_processors(
commit_report = commit.commit_report(report_type=ReportType.COVERAGE)
assert commit_report is not None
assert len(commit_report.uploads) == 8
processors = [
upload_processor_task.s(
repoid=commit.repoid,
commitid="abf6d4df662c47e32460020ab14abf9303581429",
commit_yaml={"codecov": {"max_report_age": "1y ago"}},
arguments={
**arguments,
"flags": [],
"upload_id": mocker.ANY,
"upload_pk": mocker.ANY,
},
)
for arguments in redis_queue
]
processors.reverse() # whatever the reason
kwargs = {
"repoid": commit.repoid,
"commitid": "abf6d4df662c47e32460020ab14abf9303581429",
"commit_yaml": {"codecov": {"max_report_age": "1y ago"}},
}
kwargs[_kwargs_key(UploadFlow)] = mocker.ANY
t_final = upload_finisher_task.signature(kwargs=kwargs)
mocked_chord.assert_called_with(processors, t_final)
assert mocked_apply_async.call_count == 8
mock_self_app.tasks[upload_breadcrumb_task_name].apply_async.assert_has_calls(
[
call(
@@ -1590,7 +1559,9 @@ def test_schedule_task_with_one_task(
mocker.patch(
"tasks.upload.UploadTask.possibly_setup_webhooks", return_value=True
)
mocked_chord = mocker.patch("tasks.upload.chord")
mocked_apply_async = mocker.patch(
"tasks.upload.upload_processor_task.apply_async"
)
commit = CommitFactory.create()
commit_yaml = {"codecov": {"max_report_age": "100y ago"}}
dbsession.add(commit)
@@ -1607,22 +1578,8 @@ def test_schedule_task_with_one_task(
ReportFactory.create(),
upload_args,
)
assert result == mocked_chord.return_value.apply_async.return_value
processor = upload_processor_task.s(
repoid=commit.repoid,
commitid=commit.commitid,
commit_yaml=commit_yaml,
arguments={"upload_id": 1, "upload_pk": 1},
)
finisher = upload_finisher_task.signature(
kwargs={
"repoid": commit.repoid,
"commitid": commit.commitid,
"commit_yaml": commit_yaml,
_kwargs_key(UploadFlow): mocker.ANY,
}
)
mocked_chord.assert_called_with([processor], finisher)
assert result is not None
mocked_apply_async.assert_called_once()
mock_self_app.tasks[
upload_breadcrumb_task_name
].apply_async.assert_called_once_with(
@@ -2205,7 +2162,9 @@ def test_upload_debounce_limit(
self, dbsession, mocker, mock_config, mock_storage, mock_self_app
):
mock_config(10, "setup", "upload_debounce_limit")
mocked_chord = mocker.patch("tasks.upload.chord")
mocked_apply_async = mocker.patch(
"tasks.upload.upload_processor_task.apply_async"
)
mocked_fetch_yaml = mocker.patch(
"tasks.upload.fetch_commit_yaml_and_possibly_store"
)
@@ -2256,21 +2215,14 @@ def test_upload_debounce_limit(
commit.commitid,
)

# every chord instance gets called 5 times
assert len(mocked_chord.mock_calls) / 5 == 3

chord_calls = mocked_chord.call_args_list
all_processor_tasks = []
for chord_call in chord_calls:
processor_tasks = chord_call[0][0]
all_processor_tasks.extend(processor_tasks)
assert mocked_apply_async.call_count == 25

actual_storage_paths = []
for task in all_processor_tasks:
if hasattr(task, "kwargs") and "arguments" in task.kwargs:
arguments = task.kwargs["arguments"]
if "url" in arguments:
actual_storage_paths.append(arguments["url"])
for call_args in mocked_apply_async.call_args_list:
kwargs = call_args.kwargs.get("kwargs", {})
arguments = kwargs.get("arguments", {})
if "url" in arguments:
actual_storage_paths.append(arguments["url"])

expected_storage_paths = [item["url"] for item in redis_queue]

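
The rewritten assertions reach into call_args.kwargs["kwargs"] because apply_async receives the task's keyword arguments bundled under a single kwargs= parameter, so the mock records them one level deeper than a plain call would. A minimal illustration with a bare MagicMock, independent of Celery:

```python
from unittest.mock import MagicMock

mocked_apply_async = MagicMock()
mocked_apply_async(
    kwargs={"repoid": 42, "commitid": "abc123", "arguments": {"upload_id": 7}}
)

# call_args.kwargs are the keyword arguments of the recorded call itself;
# the task's own keyword arguments sit one level deeper, under "kwargs".
scheduled = mocked_apply_async.call_args.kwargs["kwargs"]
assert scheduled["repoid"] == 42
assert scheduled["arguments"]["upload_id"] == 7
```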