refactor: consolidate sync pipeline into self-contained domain with protocol-based DI #1614
felixschmetz wants to merge 13 commits into main
    """Get all entities for a specific sync."""
    ...

async def bulk_get_by_entity_sync_and_definition(
Code scanning / CodeQL notice: Statement has no effect
    sync_context: SyncContext,
) -> None:
    """Delete orphaned entities from all handlers."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    runtime: SyncRuntime,
) -> None:
    """Process a batch of entities through the full pipeline."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    self, sync_context: SyncContext, runtime: SyncRuntime
) -> None:
    """Remove entities no longer present in the source."""
    ...
Code scanning / CodeQL notice: Statement has no effect
async def cleanup_temp_files(self, sync_context: SyncContext, runtime: SyncRuntime) -> None:
    """Clean up temporary files created during the sync."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    execution_config: Optional[SyncConfig] = None,
) -> SyncOrchestrator:
    """Create and return a fully-wired SyncOrchestrator."""
    ...
Code scanning / CodeQL notice: Statement has no effect
5 issues found across 26 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/airweave/platform/sync/actions/entity/__init__.py">
<violation number="1" location="backend/airweave/platform/sync/actions/entity/__init__.py:6">
P1: Custom agent: **Check for Cursor Rules Drift**
Update the sync Cursor rule for this `domains/sync_pipeline` move. `.cursor/rules/sync-architecture.mdc` still only applies to `**/sync/**` and documents the old `platform/sync/...` layout / `SyncFactory.create_orchestrator()` API, so Cursor will keep giving stale guidance for these new imports.</violation>
</file>
<file name="backend/airweave/domains/entities/entity_repository.py">
<violation number="1" location="backend/airweave/domains/entities/entity_repository.py:12">
P1: Custom agent: **Explicit Protocol Implementation**
Make `EntityRepository` explicitly implement `EntityRepositoryProtocol`. The sync pipeline now consumes this dependency through the protocol in DI and constructor signatures, so the concrete repository should declare that contract directly.
(Based on your team's feedback about only requiring explicit protocol inheritance when the protocol has real polymorphic value.) [FEEDBACK_USED]</violation>
</file>
<file name="backend/airweave/domains/sync_pipeline/factory.py">
<violation number="1" location="backend/airweave/domains/sync_pipeline/factory.py:55">
P1: Custom agent: **Explicit Protocol Implementation**
`SyncFactory` has a matching `SyncFactoryProtocol` (in `protocols.py`) but does not explicitly inherit from it. The protocol is used in the DI container and service injection (`SyncFactoryProtocol` type hints in `container.py` and `syncs/service.py`), so explicit inheritance ensures type safety and catches signature mismatches early.
Change the class definition to:
```suggestion
class SyncFactory(SyncFactoryProtocol):
```
and add `from .protocols import SyncFactoryProtocol` to the imports.
(Based on your team's feedback about preserving behavior parity in refactors.) [FEEDBACK_USED]</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
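For readers unfamiliar with the rule the bot is enforcing, here is a minimal sketch of explicit protocol inheritance combined with constructor injection. The method signature, the string return value, and the `run_sync` helper are assumptions made for illustration; they are not taken from `protocols.py` or `factory.py`.

```python
from typing import Protocol


class SyncFactoryProtocol(Protocol):
    """Hypothetical, simplified stand-in for the protocol in protocols.py."""

    def create_orchestrator(self, sync_id: str) -> str:
        """Build an orchestrator for the given sync."""
        ...


class SyncFactory(SyncFactoryProtocol):
    """Concrete factory that names the contract it fulfils."""

    def __init__(self, sc_repo: object, event_bus: object) -> None:
        # Dependencies are injected rather than looked up globally.
        self._sc_repo = sc_repo
        self._event_bus = event_bus

    def create_orchestrator(self, sync_id: str) -> str:
        # The real factory returns a SyncOrchestrator; a string keeps this small.
        return f"orchestrator-for-{sync_id}"


def run_sync(factory: SyncFactoryProtocol, sync_id: str) -> str:
    """Consumers depend on the protocol only, never on the concrete class."""
    return factory.create_orchestrator(sync_id)


factory = SyncFactory(sc_repo=object(), event_bus=object())
result = run_sync(factory, "abc")
```

With the explicit base class, a type checker compares `SyncFactory` against the protocol at the class definition, so a drifting signature is reported there rather than at each call site.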
    """

    @classmethod
    def __init__(
P1: Custom agent: Explicit Protocol Implementation
`SyncFactory` has a matching `SyncFactoryProtocol` (in `protocols.py`) but does not explicitly inherit from it. The protocol is used in the DI container and service injection (`SyncFactoryProtocol` type hints in `container.py` and `syncs/service.py`), so explicit inheritance ensures type safety and catches signature mismatches early.
Change the class definition to:
-    def __init__(
+class SyncFactory(SyncFactoryProtocol):
and add `from .protocols import SyncFactoryProtocol` to the imports.
Prompt for AI agents
Check if this issue is valid; if so, understand the root cause and fix it. At backend/airweave/domains/sync_pipeline/factory.py, line 55:
<comment>`SyncFactory` has a matching `SyncFactoryProtocol` (in `protocols.py`) but does not explicitly inherit from it. The protocol is used in the DI container and service injection (`SyncFactoryProtocol` type hints in `container.py` and `syncs/service.py`), so explicit inheritance ensures type safety and catches signature mismatches early.
Change the class definition to:
```suggestion
class SyncFactory(SyncFactoryProtocol):
```
and add `from .protocols import SyncFactoryProtocol` to the imports.</comment>
<file context>
-    def __init__(
-        self,
-        sc_repo: SourceConnectionRepositoryProtocol,
</file context>
3 issues found across 6 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/airweave/platform/sync/actions/entity/__init__.py">
<violation number="1" location="backend/airweave/platform/sync/actions/entity/__init__.py:10">
P2: Add a return annotation to `__getattr__`; the current signature violates this repo's mypy settings and can fail static analysis.</violation>
<violation number="2" location="backend/airweave/platform/sync/actions/entity/__init__.py:14">
P1: Custom agent: **Check for Cursor Rules Drift**
Update `.cursor/rules/sync-architecture.mdc` for this sync-pipeline refactor. Its `**/sync/**` guidance still documents the old `SyncFactory.create_orchestrator()` setup, but this change now routes sync actions through `airweave.domains.sync_pipeline.*` and uses an injected `SyncFactory` instance, so Cursor guidance for sync files is out of date.</violation>
</file>
<file name="backend/airweave/domains/sync_pipeline/tests/test_entity_action_resolver.py">
<violation number="1" location="backend/airweave/domains/sync_pipeline/tests/test_entity_action_resolver.py:113">
P2: Patch `get_db_context` on the resolver module; patching `airweave.db.session` does not intercept the imported reference used by `EntityActionResolver`.</violation>
</file>
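The `get_db_context` finding comes down to where `unittest.mock.patch` replaces a name. The sketch below reproduces the effect with two throwaway in-memory modules; `fake_session` and `fake_resolver` are hypothetical stand-ins for `airweave.db.session` and the resolver module, not the real code.

```python
import sys
import types
from unittest.mock import patch

# Module that defines the function (stand-in for the session module).
session_mod = types.ModuleType("fake_session")
session_mod.get_db_context = lambda: "real-db"
sys.modules["fake_session"] = session_mod

# Module that uses it (stand-in for the resolver module). The assignment
# mirrors `from fake_session import get_db_context` at import time.
resolver_mod = types.ModuleType("fake_resolver")
resolver_mod.get_db_context = session_mod.get_db_context


def resolve() -> str:
    # The name is looked up in the resolver module's own namespace at call time.
    return resolver_mod.get_db_context()


resolver_mod.resolve = resolve
sys.modules["fake_resolver"] = resolver_mod

# Patching the defining module leaves the resolver's imported reference intact.
with patch("fake_session.get_db_context", return_value="mock-db"):
    patched_at_definition = resolver_mod.resolve()

# Patching the consuming module intercepts the call.
with patch("fake_resolver.get_db_context", return_value="mock-db"):
    patched_at_use = resolver_mod.resolve()
```

Only the second `patch` changes what `resolve()` sees, which is why the review asks for the patch target to be the resolver module.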
backend/airweave/domains/sync_pipeline/tests/test_entity_action_resolver.py
a530a82 to f9b86a4
CodeQL found more than 20 potential problems in the proposed changes. Check the Files changed tab for more details.
    organization_id: UUID,
) -> int:
    """Delete all memberships for a source connection."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    organization_id: UUID,
) -> AccessContext:
    """Resolve user's access context by expanding group memberships."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    organization_id: UUID,
) -> Optional[AccessContext]:
    """Resolve user's access context scoped to a collection."""
    ...
Code scanning / CodeQL notice: Statement has no effect
2 issues found across 20 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/airweave/domains/sync_pipeline/factory.py">
<violation number="1" location="backend/airweave/domains/sync_pipeline/factory.py:32">
P1: Custom agent: **Check for Cursor Rules Drift**
Update the sync Cursor rule for the new `domains/sync_pipeline` layout. `.cursor/rules/sync-architecture.mdc` still targets `**/sync/**` and old `platform/sync/*` paths, so Cursor guidance will not apply to this refactored sync pipeline.</violation>
</file>
<file name="backend/airweave/domains/access_control/fakes/broker.py">
<violation number="1" location="backend/airweave/domains/access_control/fakes/broker.py:45">
P2: Returning `None` here makes the fake broker skip collection-scoped access checks by default, so tests can pass without exercising the same filtering behavior as production.</violation>
</file>
from airweave.domains.sync_pipeline.access_control_pipeline import AccessControlPipeline
from airweave.domains.sync_pipeline.access_control_resolver import ACActionResolver
from airweave.domains.sync_pipeline.builders import SyncContextBuilder
from airweave.domains.sync_pipeline.builders.destinations import DestinationsContextBuilder
P1: Custom agent: Check for Cursor Rules Drift
Update the sync Cursor rule for the new domains/sync_pipeline layout. .cursor/rules/sync-architecture.mdc still targets **/sync/** and old platform/sync/* paths, so Cursor guidance will not apply to this refactored sync pipeline.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/domains/sync_pipeline/factory.py, line 32:
<comment>Update the sync Cursor rule for the new `domains/sync_pipeline` layout. `.cursor/rules/sync-architecture.mdc` still targets `**/sync/**` and old `platform/sync/*` paths, so Cursor guidance will not apply to this refactored sync pipeline.</comment>
<file context>
@@ -28,8 +29,11 @@
from airweave.domains.sync_pipeline.access_control_pipeline import AccessControlPipeline
from airweave.domains.sync_pipeline.access_control_resolver import ACActionResolver
from airweave.domains.sync_pipeline.builders import SyncContextBuilder
+from airweave.domains.sync_pipeline.builders.destinations import DestinationsContextBuilder
+from airweave.domains.sync_pipeline.builders.source import SourceContextBuilder
from airweave.domains.sync_pipeline.builders.tracking import TrackingContextBuilder
</file context>
) -> Optional[AccessContext]:
    if self._access_context is not None:
        return self._access_context
    return None
P2: Returning None here makes the fake broker skip collection-scoped access checks by default, so tests can pass without exercising the same filtering behavior as production.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/domains/access_control/fakes/broker.py, line 45:
<comment>Returning `None` here makes the fake broker skip collection-scoped access checks by default, so tests can pass without exercising the same filtering behavior as production.</comment>
<file context>
@@ -0,0 +1,60 @@
+ ) -> Optional[AccessContext]:
+ if self._access_context is not None:
+ return self._access_context
+ return None
+
+ def check_entity_access(
</file context>
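One possible way to address the fake-broker finding is to make the "no access control" result an explicit opt-in. The sketch below assumes that a `None` result means collection-scoped checks are skipped, as the review comment describes; the `AccessContext` fields, the `resolve_for_collection` method name, and the `unrestricted` flag are all hypothetical.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class AccessContext:
    """Hypothetical minimal context: the principals a user resolves to."""

    principals: frozenset = field(default_factory=frozenset)


class FakeAccessBroker:
    """Test double whose default still forces filtering code paths to run."""

    def __init__(
        self,
        access_context: Optional[AccessContext] = None,
        *,
        unrestricted: bool = False,
    ) -> None:
        self._access_context = access_context
        self._unrestricted = unrestricted

    def resolve_for_collection(self, user_email: str) -> Optional[AccessContext]:
        if self._unrestricted:
            # Tests must ask for the "skip access checks" behavior explicitly.
            return None
        if self._access_context is not None:
            return self._access_context
        # Default: a context holding only the user, so filters are exercised.
        return AccessContext(principals=frozenset({f"user:{user_email}"}))


default_ctx = FakeAccessBroker().resolve_for_collection("a@example.com")
open_ctx = FakeAccessBroker(unrestricted=True).resolve_for_collection("a@example.com")
```

Whether the default should be restrictive or should mirror a specific production configuration is a design choice for the PR author; the sketch only shows that the permissive path can be made deliberate.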
Move SyncFactory, EntityPipeline, EntityActionResolver, EntityActionDispatcher to domains/sync_pipeline/ with protocols. Convert SyncFactory from classmethods to instance with injected deps (sc_repo, event_bus, embedders, entity_repo). Eliminate get_source_connection_id indirection chain. Remove direct embedder and access_token passing from SyncService/RunSyncActivity. Wire into DI container.
Add 15 tests for SyncFactory, EntityActionResolver, EntityPipeline covering DI wiring, action resolution (INSERT/UPDATE/KEEP), and orphan identification. Fix circular import in actions/__init__.py and actions/entity/__init__.py by converting eager re-exports to lazy __getattr__.
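The lazy `__getattr__` conversion mentioned in this commit relies on module-level `__getattr__` (PEP 562). A minimal sketch follows, including the return annotation requested in review. The export map and the stdlib target modules are stand-ins, since the real action modules are not shown in this thread.

```python
import importlib
import sys
import types
from typing import Any

# Public name -> module that really defines it (stdlib stand-ins).
_LAZY_EXPORTS = {"Decimal": "decimal", "Fraction": "fractions"}


def __getattr__(name: str) -> Any:
    """Import a re-exported name on first access instead of at package import."""
    if name in _LAZY_EXPORTS:
        module = importlib.import_module(_LAZY_EXPORTS[name])
        return getattr(module, name)
    raise AttributeError(f"module has no attribute {name!r}")


# In a real package the function above lives in __init__.py. To keep this
# sketch runnable as a single file, attach it to a throwaway module instead.
lazy_pkg = types.ModuleType("lazy_pkg")
lazy_pkg.__getattr__ = __getattr__
sys.modules["lazy_pkg"] = lazy_pkg

from lazy_pkg import Decimal  # resolved through __getattr__("Decimal")
```

Because nothing is imported until a name is requested, importing the package no longer pulls in the modules that previously formed the cycle.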
Move all remaining platform/sync/ modules (handlers, processors,
contexts, builders, config, types, actions, subscribers, tests) into
domains/sync_pipeline/. Replace direct crud.* calls with injected
repository protocols (EntityRepository, AccessControlMembershipRepository)
and inject ChunkEmbedProcessor via constructor DI through the factory chain.
- New domains/access_control/ domain with repo + protocol + fakes
- EntityRepository extended with 4 bulk write methods
- All tests updated to use injected mocks instead of module patches
- Circular imports eliminated; only platform/sync/{arf,token_manager,web_fetcher} remain
…with container DI
- Add ACActionResolverProtocol + ACActionDispatcherProtocol; update AccessControlPipeline to depend on protocols instead of concrete classes
- Eliminate isinstance check in EntityActionDispatcher by splitting into explicit destination_handlers + metadata_handler constructor params
- Move lazy imports in SyncFactory to top-level (no circular dep risk)
- Move AccessBroker from platform to domains/access_control with protocol, delete module-level singleton, wire via Container + factory
- Update all consumers (AccessControlFilter, admin.py, SearchService) to receive broker from container instead of importing singleton
- Add FakeAccessBroker, move broker tests to domains/, register in conftest
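As an illustration of the `EntityActionDispatcher` change in this commit, the sketch below passes the two handler roles as separate constructor parameters so the dispatcher never inspects handler types. The handler protocol, the `RecordingHandler` class, and the ordering (destinations first, metadata last) are assumptions for the example, not the actual implementation.

```python
import asyncio
from typing import List, Protocol, Sequence


class ActionHandler(Protocol):
    """Hypothetical handler contract; the real handler protocols expose more."""

    name: str

    async def handle_batch(self, batch: List[str]) -> int: ...


class RecordingHandler:
    """Test handler that records the order in which handlers were invoked."""

    def __init__(self, name: str, calls: List[str]) -> None:
        self.name = name
        self._calls = calls

    async def handle_batch(self, batch: List[str]) -> int:
        self._calls.append(self.name)
        return len(batch)


class EntityActionDispatcher:
    """Roles are fixed at construction time, so no isinstance() is needed."""

    def __init__(
        self,
        destination_handlers: Sequence[ActionHandler],
        metadata_handler: ActionHandler,
    ) -> None:
        self._destination_handlers = list(destination_handlers)
        self._metadata_handler = metadata_handler

    async def dispatch(self, batch: List[str]) -> None:
        # Assumed ordering: write to every destination first ...
        await asyncio.gather(
            *(h.handle_batch(batch) for h in self._destination_handlers)
        )
        # ... then record metadata once the destination writes have returned.
        await self._metadata_handler.handle_batch(batch)


calls: List[str] = []
dispatcher = EntityActionDispatcher(
    destination_handlers=[RecordingHandler("vector_db", calls)],
    metadata_handler=RecordingHandler("metadata", calls),
)
asyncio.run(dispatcher.dispatch(["entity-1", "entity-2"]))
```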
…iner init
builders/source.py has a top-level container import, so importing it during core/container/__init__.py execution resolves the `container` name to the submodule instead of the variable, causing "module has no attribute 'source_lifecycle_service'" at runtime.
Replace platform/converters singleton system with domains/converters/ domain. ConverterRegistry is built in the container factory and injected into TextualRepresentationBuilder and ChunkEmbedProcessor, eliminating initialize_converters() calls and lazy imports.
from airweave.domains.converters.fakes.registry import FakeConverterRegistry
from airweave.domains.sync_pipeline.processors.chunk_embed import ChunkEmbedProcessor

_TEXT_BUILDER_CLS = (
Code scanning / CodeQL notice (test): Unused global variable
import pytest

from airweave.domains.converters.txt import TxtConverter
from airweave.domains.sync_pipeline.exceptions import EntityProcessingError
Code scanning / CodeQL notice (test): Unused import
def for_extension(self, ext: str) -> Optional[BaseTextConverter]:
    """Return the converter for a given file extension, or None."""
    ...
Code scanning / CodeQL notice: Statement has no effect
def for_web(self) -> BaseTextConverter:
    """Return the web converter."""
    ...
Code scanning / CodeQL notice: Statement has no effect
        if text.count("\ufffd") == 0:
            logger.debug(f"Detected encoding {detected_encoding} for {os.path.basename(path)}")
            return text
    except (UnicodeDecodeError, LookupError):
Code scanning / CodeQL notice: Empty except
5 issues found across 41 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/airweave/domains/converters/registry.py">
<violation number="1" location="backend/airweave/domains/converters/registry.py:19">
P1: Custom agent: **Explicit Protocol Implementation**
Make `ConverterRegistry` explicitly inherit `ConverterRegistryProtocol`; this production registry is already consumed polymorphically through the container and sync pipeline.
(Based on your team's feedback about only requiring explicit protocol inheritance when the protocol has real polymorphic value.) [FEEDBACK_USED]</violation>
</file>
<file name="backend/airweave/domains/sync_pipeline/processors/chunk_embed.py">
<violation number="1" location="backend/airweave/domains/sync_pipeline/processors/chunk_embed.py:19">
P1: Custom agent: **Check for Cursor Rules Drift**
Update `.cursor/rules/sync-architecture.mdc` to match the `domains/sync_pipeline` refactor. Its frontmatter still scopes to `**/sync/**` and its architecture sections still describe `platform/sync/*`, so Cursor will miss these new files and continue suggesting the old sync layout.</violation>
<violation number="2" location="backend/airweave/domains/sync_pipeline/processors/chunk_embed.py:31">
P1: Custom agent: **Explicit Protocol Implementation**
Declare `ChunkEmbedProcessor` as explicitly implementing `ChunkEmbedProcessorProtocol`. The sync pipeline now depends on this processor through that protocol, so leaving the concrete class without the inheritance violates the explicit protocol implementation rule.
(Based on your team's feedback about only requiring explicit protocol inheritance when the protocol is actually used for polymorphic DI.) [FEEDBACK_USED]</violation>
</file>
<file name="backend/airweave/domains/converters/fakes/registry.py">
<violation number="1" location="backend/airweave/domains/converters/fakes/registry.py:26">
P2: `for_extension()` should preserve the real registry's `None` result for unsupported extensions; always returning a stub can hide unsupported-file regressions in tests.</violation>
</file>
<file name="backend/airweave/domains/sync_pipeline/pipeline/text_builder.py">
<violation number="1" location="backend/airweave/domains/sync_pipeline/pipeline/text_builder.py:30">
P1: This new required constructor breaks the existing `text_builder` module API that `SlackSource` still imports, so `airweave.platform.sources.slack` now fails to import.</violation>
</file>
from airweave.domains.converters.xlsx import XlsxConverter


class ConverterRegistry:
P1: Custom agent: Explicit Protocol Implementation
Make ConverterRegistry explicitly inherit ConverterRegistryProtocol; this production registry is already consumed polymorphically through the container and sync pipeline.
(Based on your team's feedback about only requiring explicit protocol inheritance when the protocol has real polymorphic value.)
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/domains/converters/registry.py, line 19:
<comment>Make `ConverterRegistry` explicitly inherit `ConverterRegistryProtocol`; this production registry is already consumed polymorphically through the container and sync pipeline.
(Based on your team's feedback about only requiring explicit protocol inheritance when the protocol has real polymorphic value.) </comment>
<file context>
@@ -0,0 +1,86 @@
+from airweave.domains.converters.xlsx import XlsxConverter
+
+
+class ConverterRegistry:
+ """Concrete registry that creates and owns all converter instances.
+
</file context>
    """
    """Unified processor that chunks text and computes embeddings."""

    def __init__(self, converter_registry: ConverterRegistryProtocol) -> None:
P1: Custom agent: Explicit Protocol Implementation
Declare ChunkEmbedProcessor as explicitly implementing ChunkEmbedProcessorProtocol. The sync pipeline now depends on this processor through that protocol, so leaving the concrete class without the inheritance violates the explicit protocol implementation rule.
(Based on your team's feedback about only requiring explicit protocol inheritance when the protocol is actually used for polymorphic DI.)
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/domains/sync_pipeline/processors/chunk_embed.py, line 31:
<comment>Declare `ChunkEmbedProcessor` as explicitly implementing `ChunkEmbedProcessorProtocol`. The sync pipeline now depends on this processor through that protocol, so leaving the concrete class without the inheritance violates the explicit protocol implementation rule.
(Based on your team's feedback about only requiring explicit protocol inheritance when the protocol is actually used for polymorphic DI.) </comment>
<file context>
@@ -25,24 +26,11 @@
- """
+ """Unified processor that chunks text and computes embeddings."""
+
+ def __init__(self, converter_registry: ConverterRegistryProtocol) -> None:
+ """Initialize with a converter registry for text building."""
+ self._text_builder = TextualRepresentationBuilder(converter_registry)
</file context>
@@ -14,35 +14,23 @@
import json
P1: Custom agent: Check for Cursor Rules Drift
Update .cursor/rules/sync-architecture.mdc to match the domains/sync_pipeline refactor. Its frontmatter still scopes to **/sync/** and its architecture sections still describe platform/sync/*, so Cursor will miss these new files and continue suggesting the old sync layout.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/domains/sync_pipeline/processors/chunk_embed.py, line 19:
<comment>Update `.cursor/rules/sync-architecture.mdc` to match the `domains/sync_pipeline` refactor. Its frontmatter still scopes to `**/sync/**` and its architecture sections still describe `platform/sync/*`, so Cursor will miss these new files and continue suggesting the old sync layout.</comment>
<file context>
@@ -14,8 +14,9 @@
+from airweave.domains.converters.protocols import ConverterRegistryProtocol
from airweave.domains.sync_pipeline.exceptions import SyncFailureError
-from airweave.domains.sync_pipeline.pipeline.text_builder import text_builder
+from airweave.domains.sync_pipeline.pipeline.text_builder import TextualRepresentationBuilder
from airweave.domains.sync_pipeline.processors.utils import filter_empty_representations
from airweave.platform.entities._base import BaseEntity, CodeFileEntity
</file context>
backend/airweave/domains/sync_pipeline/pipeline/text_builder.py
Outdated
    def __init__(self, text: str = "fake-markdown") -> None:
        self._stub = _StubConverter(text)

    def for_extension(self, ext: str) -> Optional[BaseTextConverter]:
P2: for_extension() should preserve the real registry's None result for unsupported extensions; always returning a stub can hide unsupported-file regressions in tests.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/domains/converters/fakes/registry.py, line 26:
<comment>`for_extension()` should preserve the real registry's `None` result for unsupported extensions; always returning a stub can hide unsupported-file regressions in tests.</comment>
<file context>
@@ -0,0 +1,30 @@
+ def __init__(self, text: str = "fake-markdown") -> None:
+ self._stub = _StubConverter(text)
+
+ def for_extension(self, ext: str) -> Optional[BaseTextConverter]:
+ return self._stub
+
</file context>
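A sketch of how the fake could keep the `None` contract for unsupported extensions is shown below. The `supported` parameter, its default extension list, and the minimal `BaseTextConverter` with a `convert` method are assumptions for illustration; the real registry's extension set is not shown in this thread.

```python
from typing import Optional, Tuple


class BaseTextConverter:
    """Hypothetical minimal converter base for this sketch."""

    def convert(self, path: str) -> str:
        raise NotImplementedError


class _StubConverter(BaseTextConverter):
    """Returns the same canned text for every file."""

    def __init__(self, text: str) -> None:
        self._text = text

    def convert(self, path: str) -> str:
        return self._text


class FakeConverterRegistry:
    """Fake that still answers None for extensions it was not told to support."""

    def __init__(
        self,
        text: str = "fake-markdown",
        supported: Tuple[str, ...] = (".txt", ".md", ".pdf"),
    ) -> None:
        self._stub = _StubConverter(text)
        self._supported = {ext.lower() for ext in supported}

    def for_extension(self, ext: str) -> Optional[BaseTextConverter]:
        # Mirror the Optional return type: unknown extensions yield None.
        return self._stub if ext.lower() in self._supported else None

    def for_web(self) -> BaseTextConverter:
        return self._stub


registry = FakeConverterRegistry()
known = registry.for_extension(".TXT")
unknown = registry.for_extension(".exe")
```

Tests that need an "everything converts" fake can still pass a broader `supported` tuple, while the default keeps the unsupported-file branch reachable.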
…k federated search)
… OCR
- Move ACL pipeline/resolver/dispatcher/handler/tracker/actions/schemas from sync_pipeline and platform/access_control into domains/access_control
- Reorganize entity pipeline into sync_pipeline/entity/ sub-package
- Consolidate OCR from core/protocols, adapters/ocr, platform/ocr into domains/ocr; eliminate MistralOcrAdapter passthrough
- Convert TYPE_CHECKING guards to regular imports in ACL protocols
- Update all importers across sources, tests, container factory
from airweave.domains.access_control.pipeline import AccessControlPipeline
from airweave.domains.sync_pipeline.contexts import SyncContext
from airweave.domains.sync_pipeline.contexts.runtime import SyncRuntime
from airweave.domains.sync_pipeline.entity.pipeline import EntityPipeline
Code scanning / CodeQL error: Module-level cyclic import
from airweave.domains.sync_pipeline.pipeline.hash_computer import hash_computer
from airweave.domains.sync_pipeline.protocols import (
    EntityActionDispatcherProtocol,
    EntityActionResolverProtocol,
Code scanning / CodeQL error: Module-level cyclic import
from airweave.domains.sync_pipeline.pipeline.entity_tracker import EntityTracker
from airweave.domains.sync_pipeline.pipeline.hash_computer import hash_computer
from airweave.domains.sync_pipeline.protocols import (
    EntityActionDispatcherProtocol,
Code scanning / CodeQL error: Module-level cyclic import
@property
def name(self) -> str:
    """Handler name for logging and debugging."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    Raises:
        SyncFailureError: If any operation fails
    """
    ...
Code scanning / CodeQL notice: Statement has no effect
@property
def name(self) -> str:
    """Handler name for logging and debugging."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    sync_context: "SyncContext",
) -> int:
    """Handle a full action batch (main entry point)."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    sync_context: "SyncContext",
) -> int:
    """Handle upsert actions."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    sync_context: "SyncContext",
) -> int:
    """Handle insert actions."""
    ...
Code scanning / CodeQL notice: Statement has no effect
    sync_context: "SyncContext",
) -> int:
    """Handle update actions."""
    ...
Code scanning / CodeQL notice: Statement has no effect
Move SyncFactory, EntityPipeline, EntityActionResolver, EntityActionDispatcher to domains/sync_pipeline/ with protocols. Convert SyncFactory from classmethods to instance with injected deps (sc_repo, event_bus, embedders, entity_repo). Eliminate get_source_connection_id indirection chain. Remove direct embedder and access_token passing from SyncService/RunSyncActivity. Wire into DI container.
Summary by cubic
Consolidated the sync pipeline into `airweave.domains.sync_pipeline` with protocol-based DI and moved converters, access control, and OCR into dedicated domains. This removes platform singletons/adapters, injects repositories/registries into processors and handlers, and updates container wiring.
Refactors
- … `SyncFactoryProtocol`, `ChunkEmbedProcessorProtocol`, `EntityRepositoryProtocol`, `AccessControlMembershipRepositoryProtocol`, `ACActionResolverProtocol`, `ACActionDispatcherProtocol`, and `ConverterRegistryProtocol`; replaced direct `crud.*` and converter lookups with injected repositories/registry.
- … `airweave.platform.sync.*` to `airweave.domains.sync_pipeline.*` (builders, contexts, handlers, processors, config, types); simplified `EntityActionDispatcher` (explicit `destination_handlers` and `metadata_handler`).
- … `airweave.domains.access_control.*` (pipeline, resolver, dispatcher, handler, tracker) with repository and broker protocols; `AccessBroker` now resolved via DI.
- … `platform` converter singletons with `airweave.domains.converters.*` and a DI-backed `ConverterRegistry` injected into `TextualRepresentationBuilder` and `ChunkEmbedProcessor`.
- … `airweave.domains.ocr.*` and moved `OcrProvider` under domains; removed `airweave.adapters.ocr.*` and the Mistral adapter passthrough.
- … `AccessBroker` as `AccessBrokerProtocol`, `AccessControlMembershipRepository`, `EntityRepository`, `ConverterRegistry`, `SyncFactoryProtocol`; updated consumers to resolve via the container and switched imports to `airweave.domains.sync_pipeline.config.SyncConfig`.
Bug Fixes
- … `ConverterRegistry` optional for metadata-only flows (e.g., Slack federated search).
- … `SourceContextBuilder`.
Written for commit 9afe7e3. Summary will update on new commits.