Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions apps/worker/services/processing/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ def process_upload(
upload = db_session.query(Upload).filter_by(id_=upload_id).first()
assert upload

state = ProcessingState(repo_id, commit_sha)
# this in a noop in normal cases, but relevant for task retries:
state = ProcessingState(repo_id, commit_sha, db_session=db_session)
# this is a noop in normal cases, but relevant for task retries:
state.mark_uploads_as_processing([upload_id])

report_service = ReportService(commit_yaml)
Expand All @@ -71,10 +71,12 @@ def process_upload(
save_intermediate_report(upload_id, processing_result.report)
state.mark_upload_as_processed(upload_id)

# Check if all uploads are now processed and trigger finisher if needed
# This handles the case where a processor task retries outside of a chord
# (e.g., from visibility timeout or task_reject_on_worker_lost)
upload_numbers = state.get_upload_numbers()
# Safety net: trigger finisher if all uploads are done.
# Handles retries outside a chord (visibility timeout, task_reject_on_worker_lost).
# Will be replaced by gate key mechanism in a future PR.
# Use Redis-only state for counting (dual-write keeps Redis in sync).
redis_state = ProcessingState(repo_id, commit_sha)
upload_numbers = redis_state.get_upload_numbers()
if should_trigger_postprocessing(upload_numbers):
log.info(
"All uploads processed, triggering finisher",
Expand Down
16 changes: 2 additions & 14 deletions apps/worker/services/processing/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,12 +128,10 @@ def get_upload_numbers(self):
return UploadNumbers(processing, processed)

def mark_uploads_as_processing(self, upload_ids: list[int]):
if not upload_ids or self._db_session:
if not upload_ids:
return
Comment on lines 130 to 132
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Removing the `self._db_session` guard in `mark_uploads_as_processing` causes task retries to corrupt Redis state, which prevents the finisher task from being triggered.
Severity: CRITICAL

Suggested Fix

Restore the `self._db_session` check in the guard condition within `mark_uploads_as_processing`. The guard should read `if not upload_ids or self._db_session: return`. This ensures the function is a no-op for Redis when a database session is provided, preventing state corruption during retries.

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: apps/worker/services/processing/state.py#L130-L132

Potential issue: In a task retry scenario where a task fails after an upload is marked
as processed, the `mark_uploads_as_processing` function is called again. Due to the
removal of a check for `self._db_session`, this function re-adds the already processed
upload ID to the Redis "processing" set. This results in the upload ID existing in both
the "processing" and "processed" Redis sets. Consequently, a subsequent safety check,
`should_trigger_postprocessing`, incorrectly determines that processing is not complete,
which prevents the finisher task from running and leaves the commit in a stuck state.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pre-existing behavior, identical to the main branch: on main, `mark_uploads_as_processing` always wrote to Redis (no `db_session` guard existed). This PR restores that same behavior for dual-write, so it is not a new regression.

key = self._redis_key("processing")
self._redis.sadd(key, *upload_ids)
# Set TTL to match intermediate report expiration (24 hours)
# This ensures state keys don't accumulate indefinitely
self._redis.expire(key, PROCESSING_STATE_TTL)

def clear_in_progress_uploads(self, upload_ids: list[int]):
Expand All @@ -159,13 +157,10 @@ def clear_in_progress_uploads(self, upload_ids: list[int]):
)
if updated > 0:
CLEARED_UPLOADS.inc(updated)
self._redis.srem(self._redis_key("processing"), *upload_ids)
return
removed_uploads = self._redis.srem(self._redis_key("processing"), *upload_ids)
if removed_uploads > 0:
# the normal flow would move the uploads from the "processing" set
# to the "processed" set via `mark_upload_as_processed`.
# this function here is only called in the error case and we don't expect
# this to be triggered often, if at all.
CLEARED_UPLOADS.inc(removed_uploads)

def mark_upload_as_processed(self, upload_id: int):
Expand All @@ -176,21 +171,14 @@ def mark_upload_as_processed(self, upload_id: int):
# Don't set upload.state here -- the finisher's idempotency check
# uses state="processed" to detect already-merged uploads.
# The state string is set by update_uploads() after merging.
return

processing_key = self._redis_key("processing")
processed_key = self._redis_key("processed")

res = self._redis.smove(processing_key, processed_key, upload_id)
if not res:
# this can happen when `upload_id` was never in the source set,
# which probably is the case during initial deployment as
# the code adding this to the initial set was not deployed yet
# TODO: make sure to remove this code after a grace period
self._redis.sadd(processed_key, upload_id)

# Set TTL on processed key to match intermediate report expiration
# This ensures uploads marked as processed have a bounded lifetime
self._redis.expire(processed_key, PROCESSING_STATE_TTL)

def mark_uploads_as_merged(self, upload_ids: list[int]):
Expand Down
149 changes: 10 additions & 139 deletions apps/worker/services/tests/test_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,125 +9,21 @@
)
from services.processing.processing import process_upload
from services.processing.types import UploadArguments
from shared.reports.enums import UploadState
from shared.yaml import UserYaml


@pytest.mark.django_db(databases={"default"})
class TestProcessUploadOrphanedTaskRecovery:
"""
Tests for the orphaned upload recovery mechanism.

When a processor task retries outside of a chord (e.g., from visibility timeout
or task_reject_on_worker_lost), it should trigger the finisher if all uploads
are now processed.
"""

def test_triggers_finisher_when_last_upload_completes(
self, dbsession, mocker, mock_storage
):
"""
Test that finisher is triggered when the last upload completes processing.

This simulates the scenario where:
1. A processor task died before ACK, missing the chord callback
2. Task retries standalone (visibility timeout or worker rejection)
3. Completes successfully and notices all uploads are done
4. Triggers finisher to complete the upload flow
"""
# Setup
repository = RepositoryFactory.create()
commit = CommitFactory.create(repository=repository)
upload = UploadFactory.create(
report__commit=commit,
state="started",
)
dbsession.add_all([repository, commit, upload])
dbsession.flush()

arguments: UploadArguments = {
"commit": commit.commitid,
"upload_id": upload.id_,
"version": "v4",
"reportid": str(upload.report.external_id),
}

# Mock dependencies
mock_report_service = mocker.patch(
"services.processing.processing.ReportService"
)
mock_processing_result = MagicMock()
mock_processing_result.error = None
mock_processing_result.report = None
mock_report_service.return_value.build_report_from_raw_content.return_value = (
mock_processing_result
)

# Mock ProcessingState to indicate all uploads are processed
mock_state_instance = MagicMock()
mock_state_instance.get_upload_numbers.return_value = MagicMock(
processing=0,
processed=0, # All uploads processed and merged
)
mocker.patch(
"services.processing.processing.ProcessingState",
return_value=mock_state_instance,
)

# Mock should_trigger_postprocessing to return True
mocker.patch(
"services.processing.processing.should_trigger_postprocessing",
return_value=True,
)

# Mock celery app to capture finisher task scheduling
mock_celery_app = mocker.patch("services.processing.processing.celery_app")
mock_finisher_task = MagicMock()
mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}

# Mock other dependencies
mocker.patch("services.processing.processing.save_intermediate_report")
mocker.patch("services.processing.processing.rewrite_or_delete_upload")

commit_yaml = UserYaml({})

# Execute
result = process_upload(
on_processing_error=lambda error: None,
db_session=dbsession,
repo_id=repository.repoid,
commit_sha=commit.commitid,
commit_yaml=commit_yaml,
arguments=arguments,
)

# Verify
assert result["successful"] is True
assert result["upload_id"] == upload.id_

# Verify finisher was triggered
mock_finisher_task.apply_async.assert_called_once_with(
kwargs={
"repoid": repository.repoid,
"commitid": commit.commitid,
"commit_yaml": commit_yaml.to_dict(),
}
)

def test_does_not_trigger_finisher_when_uploads_still_processing(
class TestProcessUpload:
def test_successful_processing_marks_upload_as_processed(
self, dbsession, mocker, mock_storage
):
"""
Test that finisher is NOT triggered when other uploads are still processing.

This verifies we don't prematurely trigger the finisher when only some
uploads have completed.
"""
# Setup
repository = RepositoryFactory.create()
commit = CommitFactory.create(repository=repository)
upload = UploadFactory.create(
report__commit=commit,
state="started",
state_id=UploadState.UPLOADED.db_id,
)
dbsession.add_all([repository, commit, upload])
dbsession.flush()
Expand All @@ -139,7 +35,6 @@ def test_does_not_trigger_finisher_when_uploads_still_processing(
"reportid": str(upload.report.external_id),
}

# Mock dependencies
mock_report_service = mocker.patch(
"services.processing.processing.ReportService"
)
Expand All @@ -150,47 +45,23 @@ def test_does_not_trigger_finisher_when_uploads_still_processing(
mock_processing_result
)

# Mock ProcessingState to indicate other uploads still processing
mock_state_instance = MagicMock()
mock_state_instance.get_upload_numbers.return_value = MagicMock(
processing=2,
processed=1, # Other uploads still in progress
)
mocker.patch(
"services.processing.processing.ProcessingState",
return_value=mock_state_instance,
)

# Mock should_trigger_postprocessing to return False
mocker.patch(
"services.processing.processing.should_trigger_postprocessing",
return_value=False,
)

# Mock celery app to capture finisher task scheduling
mock_celery_app = mocker.patch("services.processing.processing.celery_app")
mock_finisher_task = MagicMock()
mock_celery_app.tasks = {"app.tasks.upload.UploadFinisher": mock_finisher_task}

# Mock other dependencies
mocker.patch("services.processing.processing.save_intermediate_report")
mocker.patch("services.processing.processing.rewrite_or_delete_upload")

commit_yaml = UserYaml({})

# Execute
result = process_upload(
on_processing_error=lambda error: None,
db_session=dbsession,
repo_id=repository.repoid,
commit_sha=commit.commitid,
commit_yaml=commit_yaml,
commit_yaml=UserYaml({}),
arguments=arguments,
)

# Verify
assert result["successful"] is True
assert result["upload_id"] == upload.id_

# Verify finisher was NOT triggered
mock_finisher_task.apply_async.assert_not_called()
dbsession.refresh(upload)
assert upload.state_id == UploadState.PROCESSED.db_id
# state string is not updated by the processor -- the finisher sets it
# after merging (to avoid triggering the finisher's idempotency check early)
assert upload.state == "started"
Loading