refactor: carve out ARF & Storage into first-class Code Blue domains#1596
refactor: carve out ARF & Storage into first-class Code Blue domains#1596felixschmetz wants to merge 8 commits intomainfrom
Conversation
There was a problem hiding this comment.
CodeQL found more than 20 potential problems in the proposed changes. Check the Files changed tab for more details.
Move ArfService, ArfReader, and ArfReplaySource from platform/sync/arf/ and platform/storage/ into domains/arf/ with proper constructor injection, protocols, fakes, and table-driven tests. Wire through container, factory, conftest, and temporal worker wiring. Remove all singletons and factory indirection (get_admin_sync_service). Add AdminSyncService to the container. Backward-compat shims with [code blue] markers left in old locations.
…pat shims Keep AdminSyncService as a module-level singleton with lazy container.arf_service access (code blue todo for future DI). Delete ARF backward-compat re-exports from platform/sync/ and platform/storage/ since all imports now use domains/arf/ directly.
d2b5827 to
675d8ca
Compare
Move platform/storage/ to domains/storage/ with full import updates across ~50 files. Wire StorageBackend through Container, consolidate 3 inline FakeStorageBackend copies into domains/storage/fakes/, rename protocol.py to protocols.py to match domain convention.
Container owns the singleton; builders/source.py now injects storage_backend into FileService and ArfReplaySource. Lazy fallbacks in FileService and SyncFileManager resolve from container instead of factory. Factory is now a pure creator (no caching).
Add 16 tests for ArfService (file entities, get_entity, iter_entities, cleanup_stale_entities, list_syncs) and 2 tests for ArfReader file restoration. Remove dead __getattr__ lazy imports from backends/__init__.
| _container = container_mod.container | ||
| arf_service = _container.arf_service if _container else None | ||
| dispatcher = EntityDispatcherBuilder.build( | ||
| destinations=runtime.destinations, | ||
| arf_service=arf_service, | ||
| execution_config=resolved_config, |
| file_downloader = FileService( | ||
| sync_job_id=sync_job.id, | ||
| storage_backend=app_container.storage_backend, | ||
| ) |
…up lazy imports - ArfService/ArfReader now explicitly implement their protocols - Fix broken `from airweave.domains.storage import storage_backend` in snapshot.py (symbol never existed in __init__) → use container - Hoist sync_file_manager to top-level import in web_fetcher.py, remove 6 scattered lazy imports - Document long-term DI plan on SyncFileManager
…ct() Module-level singleton removed; SyncFileManager now takes required StorageBackend via constructor, lives in Container. Callers (web_fetcher, file_retrieval) pull from container. New SyncFileManagerProtocol in protocols.py; file_retrieval.py uses Inject(SyncFileManagerProtocol).
…NG guards FileService no longer has a lazy storage fallback — storage_backend is a required constructor arg (only caller already injects it). ContextualLogger in protocols.py is now a regular import, not behind TYPE_CHECKING.
| # [code blue] todo: inject arf_service once admin domain is extracted | ||
| from airweave.core import container as container_mod | ||
|
|
||
| arf_service = ArfService() | ||
| arf_service = container_mod.container.arf_service |
There was a problem hiding this comment.
temporary - let's defer restructuring the admin stuff
| """Get storage backend from container.""" | ||
| if self._storage is None: | ||
| from airweave.platform.storage import storage_backend | ||
| # [code blue] todo: inject via constructor once sources receive container | ||
| from airweave.core import container as container_mod | ||
|
|
||
| self._storage = storage_backend | ||
| self._storage = container_mod.container.storage_backend |
There was a problem hiding this comment.
Hmm, are we ok with this in the short term?
There was a problem hiding this comment.
5 issues found across 82 files
Note: This PR contains a large number of files. cubic only reviews up to 75 files per PR, so some files may not have been reviewed.
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/airweave/domains/arf/service.py">
<violation number="1" location="backend/airweave/domains/arf/service.py:47">
P2: Reuse `StoragePaths` here instead of duplicating the ARF path helpers and `_safe_filename` logic.</violation>
</file>
<file name="backend/airweave/domains/storage/fakes/backend.py">
<violation number="1" location="backend/airweave/domains/storage/fakes/backend.py:70">
P1: Include `write_file` entries in `list_files`; otherwise binary fixtures disappear from `list_files` and `count_files`.</violation>
</file>
<file name="backend/list_syncs_for_migration.py">
<violation number="1" location="backend/list_syncs_for_migration.py:108">
P2: This now creates a new storage backend for every sync. In large migration runs that means repeated backend/client initialization and, for cloud backends, leaked async clients because the new instances are never closed.</violation>
</file>
<file name="backend/airweave/platform/sync/actions/entity/builder.py">
<violation number="1" location="backend/airweave/platform/sync/actions/entity/builder.py:120">
P1: Custom agent: **Check for Cursor Rules Drift**
Update the ARF/sync Cursor rules for this file-structure and API-pattern change. This builder now injects `ArfServiceProtocol` into `ArfHandler`, but `.cursor/rules/arf.mdc` and `.cursor/rules/sync-architecture.mdc` still document deleted `platform/storage/*` / `platform/sync/raw_data.py` paths and obsolete `RawDataHandler` wiring.</violation>
</file>
<file name="backend/airweave/domains/arf/fakes/service.py">
<violation number="1" location="backend/airweave/domains/arf/fakes/service.py:11">
P1: Custom agent: **Explicit Protocol Implementation**
Explicit Protocol Implementation: this fake service implements `ArfServiceProtocol` but does not inherit it. Declare `FakeArfService(ArfServiceProtocol)` so the ARF write-path contract stays explicit and statically enforced.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
| return len(json_keys) > 0 or len(file_keys) > 0 | ||
|
|
||
| async def list_files(self, prefix: str = "") -> List[str]: | ||
| return [k for k in self._json_store if k.startswith(prefix)] |
There was a problem hiding this comment.
P1: Include write_file entries in list_files; otherwise binary fixtures disappear from list_files and count_files.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/domains/storage/fakes/backend.py, line 70:
<comment>Include `write_file` entries in `list_files`; otherwise binary fixtures disappear from `list_files` and `count_files`.</comment>
<file context>
@@ -0,0 +1,86 @@
+ return len(json_keys) > 0 or len(file_keys) > 0
+
+ async def list_files(self, prefix: str = "") -> List[str]:
+ return [k for k in self._json_store if k.startswith(prefix)]
+
+ async def list_dirs(self, prefix: str = "") -> List[str]:
</file context>
| logger.warning("Skipping ArfHandler (no arf_service provided)") | ||
| return | ||
|
|
||
| handlers.append(ArfHandler(arf_service=arf_service)) |
There was a problem hiding this comment.
P1: Custom agent: Check for Cursor Rules Drift
Update the ARF/sync Cursor rules for this file-structure and API-pattern change. This builder now injects ArfServiceProtocol into ArfHandler, but .cursor/rules/arf.mdc and .cursor/rules/sync-architecture.mdc still document deleted platform/storage/* / platform/sync/raw_data.py paths and obsolete RawDataHandler wiring.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/platform/sync/actions/entity/builder.py, line 120:
<comment>Update the ARF/sync Cursor rules for this file-structure and API-pattern change. This builder now injects `ArfServiceProtocol` into `ArfHandler`, but `.cursor/rules/arf.mdc` and `.cursor/rules/sync-architecture.mdc` still document deleted `platform/storage/*` / `platform/sync/raw_data.py` paths and obsolete `RawDataHandler` wiring.</comment>
<file context>
@@ -106,16 +102,24 @@ def _add_destination_handler(
+ logger.warning("Skipping ArfHandler (no arf_service provided)")
+ return
+
+ handlers.append(ArfHandler(arf_service=arf_service))
+ if logger:
+ logger.debug("Added ArfHandler")
</file context>
| from airweave.platform.entities._base import BaseEntity | ||
|
|
||
|
|
||
| class FakeArfService: |
There was a problem hiding this comment.
P1: Custom agent: Explicit Protocol Implementation
Explicit Protocol Implementation: this fake service implements ArfServiceProtocol but does not inherit it. Declare FakeArfService(ArfServiceProtocol) so the ARF write-path contract stays explicit and statically enforced.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/domains/arf/fakes/service.py, line 11:
<comment>Explicit Protocol Implementation: this fake service implements `ArfServiceProtocol` but does not inherit it. Declare `FakeArfService(ArfServiceProtocol)` so the ARF write-path contract stays explicit and statically enforced.</comment>
<file context>
@@ -0,0 +1,132 @@
+from airweave.platform.entities._base import BaseEntity
+
+
+class FakeArfService:
+ """In-memory fake for ArfServiceProtocol.
+
</file context>
| @@ -0,0 +1,330 @@ | |||
| """ARF service for entity capture and retrieval. | |||
There was a problem hiding this comment.
P2: Reuse StoragePaths here instead of duplicating the ARF path helpers and _safe_filename logic.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/domains/arf/service.py, line 47:
<comment>Reuse `StoragePaths` here instead of duplicating the ARF path helpers and `_safe_filename` logic.</comment>
<file context>
@@ -0,0 +1,330 @@
+ # =========================================================================
+
+ @staticmethod
+ def _sync_path(sync_id: str) -> str:
+ return f"raw/{sync_id}"
+
</file context>
| from airweave.domains.storage.factory import get_storage_backend | ||
|
|
||
| arf_service = ArfService(storage=storage_backend) | ||
| arf_service = ArfService(storage=get_storage_backend()) |
There was a problem hiding this comment.
P2: This now creates a new storage backend for every sync. In large migration runs that means repeated backend/client initialization and, for cloud backends, leaked async clients because the new instances are never closed.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/list_syncs_for_migration.py, line 108:
<comment>This now creates a new storage backend for every sync. In large migration runs that means repeated backend/client initialization and, for cloud backends, leaked async clients because the new instances are never closed.</comment>
<file context>
@@ -102,10 +102,10 @@ async def get_postgres_entity_count(conn, sync_id: UUID) -> int:
+ from airweave.domains.storage.factory import get_storage_backend
- arf_service = ArfService(storage=storage_backend)
+ arf_service = ArfService(storage=get_storage_backend())
count = await arf_service.get_entity_count(str(sync_id))
return count
</file context>
Summary
Extracts ARF (raw entity capture/replay) and Storage (file/object backend) out of the
monolithic
platform/package into self-contained domain modules underdomains/,establishing the "Code Blue" domain architecture pattern for infrastructure-level concerns.
What changed
domains/arf/— new domain owning the full ARF lifecycle: capture, replay, reader,manifest management. Protocols (
ArfServiceProtocol,ArfReaderProtocol), concreteservice, types, fakes, and co-located unit tests.
domains/storage/— new domain owning all persistent storage: backend protocol(
StorageBackend), four backend implementations (filesystem, S3, Azure Blob, GCS),SyncFileManager, factory, paths, exceptions, fakes, and co-located unit tests.StorageBackend,SyncFileManager, andArfServiceare nowfirst-class Container slots, injected via
Inject()across the codebase.platform/storage/andplatform/sync/arf/(all code migrated, not duplicated).domains.storageinstead ofplatform.storage.Why
The
platform/package had grown into a grab-bag mixing orchestration logic withinfrastructure concerns. Pulling storage and ARF into
domains/gives each module:Migration shape
platform/storage/→domains/storage/platform/storage/arf_reader→domains/arf/readerplatform/sync/arf/→domains/arf/