Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions backend/airweave/adapters/ocr/__init__.py

This file was deleted.

47 changes: 0 additions & 47 deletions backend/airweave/adapters/ocr/mistral.py

This file was deleted.

7 changes: 4 additions & 3 deletions backend/airweave/api/v1/endpoints/admin.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# mypy: ignore-errors
"""Admin-only API endpoints for organization management.

TODO: Enhance CRUD layer to support bypassing organization filtering cleanly.
Expand Down Expand Up @@ -34,6 +35,7 @@
from airweave.core.shared_models import FeatureFlag as FeatureFlagEnum
from airweave.crud.crud_organization_billing import organization_billing
from airweave.db.unit_of_work import UnitOfWork
from airweave.domains.access_control.protocols import AccessBrokerProtocol
from airweave.domains.billing.operations import BillingOperations
from airweave.domains.billing.repository import (
BillingPeriodRepository,
Expand All @@ -43,13 +45,13 @@
from airweave.domains.embedders.protocols import DenseEmbedderProtocol, SparseEmbedderProtocol
from airweave.domains.organizations.logic import generate_org_name
from airweave.domains.source_connections.protocols import SourceConnectionServiceProtocol
from airweave.domains.sync_pipeline.config import SyncConfig
from airweave.domains.syncs.protocols import SyncJobServiceProtocol
from airweave.domains.temporal.protocols import TemporalWorkflowServiceProtocol
from airweave.domains.usage.repository import UsageRepository
from airweave.models.organization import Organization
from airweave.models.organization_billing import OrganizationBilling
from airweave.models.user_organization import UserOrganization
from airweave.platform.sync.config import SyncConfig
from airweave.schemas.organization_billing import BillingPlan, BillingStatus

router = TrailingSlashRouter()
Expand Down Expand Up @@ -1409,14 +1411,13 @@ async def admin_get_user_principals(
db: AsyncSession = Depends(deps.get_db),
ctx: ApiContext = Depends(deps.get_context),
collection_repo: CollectionRepositoryProtocol = Inject(CollectionRepositoryProtocol),
access_broker: AccessBrokerProtocol = Inject(AccessBrokerProtocol),
) -> List[str]:
"""Admin-only: Get the resolved access principals for a user in a collection.

Returns all principals (user + group memberships) that would be used for
access control filtering when the user searches the collection.
"""
from airweave.platform.access_control.broker import access_broker

_require_admin_permission(ctx, FeatureFlagEnum.API_KEY_ADMIN_SYNC)

collection = await collection_repo.get_by_readable_id(
Expand Down
17 changes: 16 additions & 1 deletion backend/airweave/core/container/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from airweave.core.protocols.identity import IdentityProvider
from airweave.core.protocols.payment import PaymentGatewayProtocol
from airweave.domains.access_control.protocols import AccessBrokerProtocol
from airweave.domains.auth_provider.protocols import (
AuthProviderRegistryProtocol,
AuthProviderServiceProtocol,
Expand All @@ -45,14 +46,18 @@
)
from airweave.domains.connect.protocols import ConnectServiceProtocol
from airweave.domains.connections.protocols import ConnectionRepositoryProtocol
from airweave.domains.converters.protocols import ConverterRegistryProtocol
from airweave.domains.credentials.protocols import IntegrationCredentialRepositoryProtocol
from airweave.domains.embedders.protocols import (
DenseEmbedderProtocol,
DenseEmbedderRegistryProtocol,
SparseEmbedderProtocol,
SparseEmbedderRegistryProtocol,
)
from airweave.domains.entities.protocols import EntityDefinitionRegistryProtocol
from airweave.domains.entities.protocols import (
EntityDefinitionRegistryProtocol,
EntityRepositoryProtocol,
)
from airweave.domains.oauth.protocols import (
OAuth1ServiceProtocol,
OAuth2ServiceProtocol,
Expand All @@ -75,6 +80,7 @@
SourceRegistryProtocol,
SourceServiceProtocol,
)
from airweave.domains.sync_pipeline.protocols import SyncFactoryProtocol
from airweave.domains.syncs.protocols import (
SyncCursorRepositoryProtocol,
SyncJobRepositoryProtocol,
Expand Down Expand Up @@ -186,6 +192,12 @@ async def my_endpoint(event_bus: EventBus = Inject(EventBus)):
sync_job_service: SyncJobServiceProtocol
sync_service: SyncServiceProtocol
sync_lifecycle: SyncLifecycleServiceProtocol
sync_factory: SyncFactoryProtocol

entity_repo: EntityRepositoryProtocol

# Access control broker (resolves user → group principals)
access_broker: AccessBrokerProtocol

# Temporal domain
temporal_workflow_service: TemporalWorkflowServiceProtocol
Expand Down Expand Up @@ -224,6 +236,9 @@ async def my_endpoint(event_bus: EventBus = Inject(EventBus)):
# Connect domain service (session-based frontend integration flows)
connect_service: ConnectServiceProtocol

# Converter registry (maps file extensions to converter instances)
converter_registry: ConverterRegistryProtocol

# OCR provider (with fallback chain + circuit breaking)
# Optional: None when no OCR backend (Mistral/Docling) is configured
ocr_provider: Optional[OcrProvider] = None
Expand Down
61 changes: 49 additions & 12 deletions backend/airweave/core/container/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@
PrometheusHttpMetrics,
PrometheusMetricsRenderer,
)
from airweave.adapters.ocr.docling import DoclingOcrAdapter
from airweave.adapters.ocr.fallback import FallbackOcrProvider
from airweave.adapters.ocr.mistral import MistralOcrAdapter
from airweave.adapters.pubsub.redis import RedisPubSub
from airweave.adapters.webhooks.endpoint_verifier import HttpEndpointVerifier
from airweave.adapters.webhooks.svix import SvixAdapter
Expand All @@ -37,13 +34,15 @@
from airweave.core.health.service import HealthService
from airweave.core.logging import logger
from airweave.core.metrics_service import PrometheusMetricsService
from airweave.core.protocols import CircuitBreaker, OcrProvider, PubSub
from airweave.core.protocols import CircuitBreaker, PubSub
from airweave.core.protocols.event_bus import EventBus
from airweave.core.protocols.identity import IdentityProvider
from airweave.core.protocols.payment import PaymentGatewayProtocol
from airweave.core.protocols.webhooks import WebhookPublisher
from airweave.core.redis_client import redis_client
from airweave.db.session import health_check_engine
from airweave.domains.access_control.broker import AccessBroker
from airweave.domains.access_control.repository import AccessControlMembershipRepository
from airweave.domains.auth_provider.registry import AuthProviderRegistry
from airweave.domains.auth_provider.service import AuthProviderService
from airweave.domains.browse_tree.repository import NodeSelectionRepository
Expand All @@ -54,6 +53,7 @@
VectorDbDeploymentMetadataRepository,
)
from airweave.domains.connections.repository import ConnectionRepository
from airweave.domains.converters.registry import ConverterRegistry
from airweave.domains.credentials.repository import IntegrationCredentialRepository
from airweave.domains.embedders.config import (
DENSE_EMBEDDER,
Expand All @@ -67,6 +67,7 @@
FastEmbedSparseEmbedder as DomainFastEmbedSparseEmbedder,
)
from airweave.domains.entities.entity_count_repository import EntityCountRepository
from airweave.domains.entities.entity_repository import EntityRepository
from airweave.domains.entities.registry import EntityDefinitionRegistry
from airweave.domains.oauth.callback_service import OAuthCallbackService
from airweave.domains.oauth.flow_service import OAuthFlowService
Expand All @@ -76,6 +77,10 @@
OAuthInitSessionRepository,
OAuthRedirectSessionRepository,
)
from airweave.domains.ocr.docling import DoclingOcrAdapter
from airweave.domains.ocr.fallback import FallbackOcrProvider
from airweave.domains.ocr.mistral.converter import MistralOCR
from airweave.domains.ocr.protocols import OcrProvider
from airweave.domains.organizations.protocols import UserOrganizationRepositoryProtocol
from airweave.domains.organizations.repository import OrganizationRepository as OrgRepo
from airweave.domains.organizations.repository import UserOrganizationRepository
Expand All @@ -89,6 +94,10 @@
from airweave.domains.sources.registry import SourceRegistry
from airweave.domains.sources.service import SourceService
from airweave.domains.sources.validation import SourceValidationService
from airweave.domains.sync_pipeline.factory import SyncFactory
from airweave.domains.sync_pipeline.processors.chunk_embed import ChunkEmbedProcessor
from airweave.domains.sync_pipeline.subscribers.progress_relay import SyncProgressRelay
from airweave.domains.syncs.service import SyncService
from airweave.domains.syncs.sync_cursor_repository import SyncCursorRepository
from airweave.domains.syncs.sync_job_repository import SyncJobRepository
from airweave.domains.syncs.sync_job_service import SyncJobService
Expand All @@ -105,7 +114,6 @@
from airweave.domains.webhooks.service import WebhookServiceImpl
from airweave.domains.webhooks.subscribers import WebhookEventSubscriber
from airweave.platform.auth.settings import integration_settings
from airweave.platform.sync.subscribers.progress_relay import SyncProgressRelay
from airweave.platform.temporal.client import TemporalClient


Expand Down Expand Up @@ -376,7 +384,34 @@ def create_container(settings: Settings) -> Container:
sparse_embedder = _create_sparse_embedder(sparse_embedder_registry)

# -----------------------------------------------------------------
# Collection service (needs collection_repo, sc_repo, sync_lifecycle, dense_registry)
# Access control (membership repo + broker), converter registry, chunk embed processor
# -----------------------------------------------------------------
acl_membership_repo = AccessControlMembershipRepository()
access_broker = AccessBroker(acl_repo=acl_membership_repo)
converter_registry = ConverterRegistry(ocr_provider=ocr_provider)
chunk_embed_processor = ChunkEmbedProcessor(converter_registry=converter_registry)

# -----------------------------------------------------------------
# Sync factory + service
# -----------------------------------------------------------------
sync_factory = SyncFactory(
sc_repo=source_deps["sc_repo"],
event_bus=event_bus,
usage_checker=usage_checker,
dense_embedder=dense_embedder,
sparse_embedder=sparse_embedder,
entity_repo=sync_deps["entity_repo"],
acl_repo=acl_membership_repo,
processor=chunk_embed_processor,
)

sync_service = SyncService(
sync_job_service=sync_deps["sync_job_service"],
sync_factory=sync_factory,
)

# -----------------------------------------------------------------
# Collection service
# -----------------------------------------------------------------
collection_service = CollectionService(
collection_repo=source_deps["collection_repo"],
Expand Down Expand Up @@ -477,8 +512,12 @@ def create_container(settings: Settings) -> Container:
payment_gateway=billing_services["payment_gateway"],
sync_record_service=sync_deps["sync_record_service"],
sync_job_service=sync_deps["sync_job_service"],
sync_service=sync_deps["sync_service"],
sync_service=sync_service,
sync_lifecycle=sync_deps["sync_lifecycle"],
sync_factory=sync_factory,
entity_repo=sync_deps["entity_repo"],
access_broker=access_broker,
converter_registry=converter_registry,
temporal_workflow_service=sync_deps["temporal_workflow_service"],
temporal_schedule_service=sync_deps["temporal_schedule_service"],
usage_checker=usage_checker,
Expand Down Expand Up @@ -619,7 +658,7 @@ def _create_ocr_provider(
Returns None with a warning when no providers are available.
"""
try:
mistral_ocr = MistralOcrAdapter()
mistral_ocr = MistralOCR()
except Exception as e:
logger.error(f"Error creating Mistral OCR adapter: {e}")
mistral_ocr = None
Expand Down Expand Up @@ -852,12 +891,10 @@ def _create_sync_services(
4. SyncLifecycleService (needs everything above)
"""
entity_count_repo = EntityCountRepository()
entity_repo = EntityRepository()

sync_job_service = SyncJobService(sync_job_repo=sync_job_repo)

from airweave.domains.syncs.service import SyncService

sync_service = SyncService(sync_job_service=sync_job_service)
temporal_workflow_service = TemporalWorkflowService()

sync_record_service = SyncRecordService(
Expand Down Expand Up @@ -899,11 +936,11 @@ def _create_sync_services(
return {
"sync_record_service": sync_record_service,
"sync_job_service": sync_job_service,
"sync_service": sync_service,
"sync_lifecycle": sync_lifecycle,
"temporal_workflow_service": temporal_workflow_service,
"temporal_schedule_service": temporal_schedule_service,
"response_builder": response_builder,
"entity_repo": entity_repo,
}


Expand Down
2 changes: 1 addition & 1 deletion backend/airweave/core/protocols/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
MetricsService,
WorkerMetrics,
)
from airweave.core.protocols.ocr import OcrProvider
from airweave.core.protocols.payment import PaymentGatewayProtocol
from airweave.core.protocols.pubsub import PubSub, PubSubSubscription
from airweave.core.protocols.rate_limiter import RateLimiter
Expand All @@ -32,6 +31,7 @@
WebhookServiceProtocol,
)
from airweave.core.protocols.worker_metrics_registry import WorkerMetricsRegistryProtocol
from airweave.domains.ocr.protocols import OcrProvider

__all__ = [
"AgenticSearchMetrics",
Expand Down
2 changes: 1 addition & 1 deletion backend/airweave/core/sync_job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from airweave.core.logging import logger
from airweave.core.shared_models import SyncJobStatus
from airweave.db.session import get_db_context
from airweave.platform.sync.pipeline.entity_tracker import SyncStats
from airweave.domains.sync_pipeline.pipeline.entity_tracker import SyncStats


class SyncJobService:
Expand Down
1 change: 1 addition & 0 deletions backend/airweave/domains/access_control/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Access control domain — access broker, membership repository, schemas, and protocols."""
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import TYPE_CHECKING, List

if TYPE_CHECKING:
from airweave.platform.access_control.schemas import MembershipTuple
from airweave.domains.access_control.schemas import MembershipTuple


# =============================================================================
Expand Down
Loading
Loading