Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions apps/worker/services/processing/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,5 +134,13 @@ def get_uploads_for_merging(self) -> set[int]:
)
}

def get_all_processed_uploads(self) -> set[int]:
    """Return ALL upload IDs in the 'processed' set (no batch limit).

    Used by the cooperative finisher to merge every pending upload in one
    lock acquisition instead of processing them in MERGE_BATCH_SIZE chunks.

    Returns:
        Every member stored under this instance's "processed" Redis key,
        each converted with ``int()``. The result is an empty set when
        ``smembers`` yields no members.
    """
    processed_key = self._redis_key("processed")
    # The loop variable is deliberately not called ``id`` so the builtin of
    # that name is not shadowed.
    return {int(upload_id) for upload_id in self._redis.smembers(processed_key)}

def _redis_key(self, state: str) -> str:
    """Build the Redis key for one state of this repo/commit pair.

    The key is the fixed ``upload-processing-state`` prefix followed by
    ``self.repoid``, ``self.commitsha`` and ``state``, joined with ``/``.
    """
    key = f"upload-processing-state/{self.repoid}/{self.commitsha}/{state}"
    return key
15 changes: 15 additions & 0 deletions apps/worker/services/tests/test_processing_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ def test_batch_merging_many_uploads():
assert should_trigger_postprocessing(state.get_upload_numbers())


def test_get_all_processed_uploads_returns_full_set():
    """get_all_processed_uploads returns every upload without the MERGE_BATCH_SIZE cap."""
    upload_ids = list(range(1, 25))
    state = ProcessingState(1234, uuid4().hex)

    # Move all 24 uploads through "processing" and into "processed".
    state.mark_uploads_as_processing(upload_ids)
    for upload_id in upload_ids:
        state.mark_upload_as_processed(upload_id)

    # The batched accessor is capped at MERGE_BATCH_SIZE (10), so it must
    # not hand back more than that many of the 24 uploads.
    merge_batch = state.get_uploads_for_merging()
    assert len(merge_batch) <= 10

    # The uncapped accessor hands back every processed upload at once.
    all_processed = state.get_all_processed_uploads()
    assert all_processed == set(upload_ids)


class TestProcessingStateEmptyListGuards:
"""Tests for empty list guards in ProcessingState methods."""

Expand Down
Loading
Loading