refactor: BaseSource contract v2 — TokenProvider protocol + typed exceptions #1626

Merged
felixschmetz merged 11 commits into main from refactor/source-contract-v2-token-providers
Mar 17, 2026

orhanrauf (Member) commented Mar 15, 2026

Summary

Phase 1 of the BaseSource contract v2 redesign. Replaces the monolithic TokenManager with a clean TokenProviderProtocol and introduces a typed exception hierarchy for source errors.

Token providers:

  • TokenProviderProtocol with get_token() / force_refresh() contract
  • OAuthTokenProvider — timer (25min) + asyncio.Lock + delegates to oauth2_service.refresh_and_persist()
  • StaticTokenProvider — raw string, raises SourceAuthError on refresh
  • AuthProviderTokenProvider — delegates to Pipedream/Composio auth providers
  • Moved to domains/sources/token_providers/ (source domain concern, not platform/auth)
  • Deleted TokenManager (480 lines) — replaced by thin OAuthTokenProvider (~185 lines)
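
A minimal sketch of the contract listed above, for orientation only. The method return types, the constructor, and the local `SourceAuthError` stand-in are assumptions and are not copied from the code in this PR.

```python
import asyncio
from typing import Protocol, Tuple, runtime_checkable


class SourceAuthError(Exception):
    """Stand-in for the SourceAuthError named in this PR (assumed shape)."""


@runtime_checkable
class TokenProviderProtocol(Protocol):
    """Anything a source can ask for a bearer token."""

    async def get_token(self) -> str:
        """Return a currently valid token."""
        ...

    async def force_refresh(self) -> str:
        """Discard the cached token and return a fresh one."""
        ...


class StaticTokenProvider:
    """Wraps a raw string; it has nothing to refresh with."""

    def __init__(self, token: str) -> None:
        self._token = token

    async def get_token(self) -> str:
        return self._token

    async def force_refresh(self) -> str:
        # A static token cannot be renewed, so refusal is the only honest answer.
        raise SourceAuthError("static token cannot be refreshed")


async def demo() -> Tuple[str, bool]:
    """Fetch the token, then confirm that a refresh is refused."""
    provider = StaticTokenProvider("tok_123")
    token = await provider.get_token()
    try:
        await provider.force_refresh()
        refused = False
    except SourceAuthError:
        refused = True
    return token, refused
```

Because the protocol is structural, `StaticTokenProvider` satisfies it without inheriting from it, which keeps each provider free of a shared base class.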

Exception hierarchy (domains/sources/exceptions.py):

  • SourceError base > SourceAuthError > SourceTokenRefreshError
  • SourceRateLimitError, SourceTemporaryError, SourcePermanentError
  • SourceEntityError > SourceEntityForbiddenError, SourceEntityNotFoundError, SourceEntitySkippedError
  • SourceFileDownloadError
  • SourceCreationError, SourceValidationError
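
The nesting above could look roughly like the sketch below. Only a few of the listed classes are shown, and placing `SourceRateLimitError` directly under `SourceError` is an assumption, since the list does not state its parent. The `describe` helper is hypothetical and exists only to show how `except` ordering interacts with the hierarchy.

```python
class SourceError(Exception):
    """Base class: lets a caller catch any source failure in one clause."""


class SourceAuthError(SourceError):
    """Credentials were rejected."""


class SourceTokenRefreshError(SourceAuthError):
    """A token refresh attempt failed."""


class SourceRateLimitError(SourceError):
    """The upstream API asked the caller to slow down (assumed parent)."""


class SourceEntityError(SourceError):
    """A single entity could not be processed."""


class SourceEntityNotFoundError(SourceEntityError):
    """The entity no longer exists upstream."""


def describe(exc: SourceError) -> str:
    """Hypothetical helper: report which handler family catches `exc`."""
    try:
        raise exc
    except SourceAuthError:
        # Also catches SourceTokenRefreshError, because it is a subclass.
        return "auth"
    except SourceEntityError:
        return "entity"
    except SourceError:
        # Everything else in the hierarchy lands here.
        return "other"
```

Handlers go from most specific to most general; putting `except SourceError` first would shadow the other two clauses.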

BaseSource cleanup:

  • _token_manager removed, _token_provider is the single path
  • get_access_token() delegates to _token_provider.get_token()
  • refresh_on_unauthorized() delegates to _token_provider.force_refresh()
  • All ~25 source files migrated. Zero token_manager references remain.
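
A rough sketch of the delegation described above. `BaseSourceSketch` and `RotatingProvider` are hypothetical names, and the fallback to `self.access_token` reflects the behavior described in the review comments and later commit messages in this thread rather than this first description.

```python
import asyncio
from typing import Any, Optional


class RotatingProvider:
    """Hypothetical provider that hands out a new token on each refresh."""

    def __init__(self) -> None:
        self._generation = 0

    async def get_token(self) -> str:
        return f"token-{self._generation}"

    async def force_refresh(self) -> str:
        self._generation += 1
        return f"token-{self._generation}"


class BaseSourceSketch:
    """Illustrative stand-in for BaseSource; not the class in this PR."""

    def __init__(
        self,
        token_provider: Optional[Any] = None,
        access_token: Optional[str] = None,
    ) -> None:
        self._token_provider = token_provider
        self.access_token = access_token

    async def get_access_token(self) -> Optional[str]:
        # Prefer the provider; fall back to a directly injected token.
        if self._token_provider is not None:
            return await self._token_provider.get_token()
        return self.access_token

    async def refresh_on_unauthorized(self) -> Optional[str]:
        # Called after a 401; without a provider there is nothing to refresh.
        if self._token_provider is None:
            return None
        return await self._token_provider.force_refresh()
```

A source's request helper would call `get_access_token()` when building headers and `refresh_on_unauthorized()` once after a 401 before retrying.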

oauth2_service:

  • Added refresh_and_persist() — full load + decrypt + refresh + persist-rotation cycle

What is next (follow-up PRs)

  1. Integrate exceptions into sources + pipeline — sources currently still except Exception and swallow errors; pipeline needs to route by exception type (abort on auth, skip on entity error, retry on rate limit)
  2. Kill setters (set_logger, set_http_client_factory, set_file_downloader, set_cursor, set_node_selections): pass these at construction or as method params
  3. Make create() typed — replace Optional[Any] with typed auth/config params
  4. Extract utilities: move _validate_oauth2 and clean_content_for_embedding out of BaseSource
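
For item 1, routing by exception type might look like the sketch below. This is follow-up work, so nothing here exists in this PR: the `Action` enum and `route` function are hypothetical, the exception classes are local stand-ins, and re-raising unknown errors is an assumption about the desired default.

```python
from enum import Enum


class SourceError(Exception):
    """Stand-in for the base class in domains/sources/exceptions.py."""


class SourceAuthError(SourceError):
    pass


class SourceRateLimitError(SourceError):
    pass


class SourceEntityError(SourceError):
    pass


class Action(Enum):
    """What the pipeline should do with the current sync."""

    ABORT = "abort"  # stop the whole sync
    SKIP = "skip"    # drop this entity and continue
    RETRY = "retry"  # back off and try the call again


def route(exc: Exception) -> Action:
    """Map an exception to a pipeline action, per the list above."""
    if isinstance(exc, SourceAuthError):
        return Action.ABORT
    if isinstance(exc, SourceEntityError):
        return Action.SKIP
    if isinstance(exc, SourceRateLimitError):
        return Action.RETRY
    # Assumption: anything unclassified is not swallowed but propagates.
    raise exc
```

Keeping the mapping in one function would mean sources only need to raise the right type, and the pipeline owns the policy.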

Test plan

  • All token provider tests pass (21 tests in test_token_providers.py)
  • Lifecycle service tests pass
  • Ruff + ruff-format pass clean
  • E2E sync test with OAuth source (Linear, Asana, etc.)
  • E2E sync test with auth provider source (Pipedream/Composio)

Summary by cubic

Replaces the old TokenManager with an expiry‑aware TokenProviderProtocol and adds typed exceptions across sources, OAuth, and auth providers for clearer routing and more reliable token handling. Auth providers are moved under airweave.domains.auth_provider and lifecycle now configures providers consistently.

  • New Features

    • TokenProviderProtocol with get_token()/force_refresh(); implementations: OAuthTokenProvider (expiry‑aware with backoff), StaticTokenProvider, AuthProviderTokenProvider in airweave.domains.sources.token_providers.
    • oauth2_service.refresh_and_persist() returns RefreshResult(access_token, expires_in); OAuth refresh maps to typed errors in airweave.domains.oauth.exceptions.
  • Refactors

    • BaseSource now uses _token_provider; 401s trigger force_refresh(); get_access_token() falls back to self.access_token; simplified auth‑header paths in sources.
    • Lifecycle builds providers from normalized credentials for both create() and validate(); split normalization into _to_creds_dict and _process_creds_dict to reduce complexity; setup failures raise SourceCreationError.
    • OAuth callback validates via lifecycle and injects a StaticTokenProvider; DI container wires source_lifecycle.
    • Moved Pipedream/Composio/Klavis to airweave.domains.auth_provider.providers; added typed auth‑provider errors and extensive unit tests; removed backend/airweave/platform/sync/token_manager.py and its tests.
    • Fixed ruff import ordering and C901 violations.

Written for commit 2960b48. Summary will update on new commits.

cubic-dev-ai bot (Contributor) left a comment

8 issues found across 42 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name=".cursor/rules/source-contract-redesign.mdc">

<violation number="1" location=".cursor/rules/source-contract-redesign.mdc:76">
P3: Soften the 'Zero `token_manager` references remain' claim; there are still TokenManager mentions in the repo.</violation>

<violation number="2" location=".cursor/rules/source-contract-redesign.mdc:107">
P2: This handover note points `get_token_for_resource` at the wrong class; the current implementation is on `BaseSource`, not `OAuthTokenProvider`.</violation>
</file>

<file name="backend/airweave/platform/sources/zendesk.py">

<violation number="1" location="backend/airweave/platform/sources/zendesk.py:115">
P1: Catch the new source auth exceptions around `force_refresh()`. The token-provider API no longer throws `TokenRefreshError`, so refresh failures now miss this handler and can turn into 500s.</violation>
</file>

<file name="backend/airweave/platform/sources/_base.py">

<violation number="1" location="backend/airweave/platform/sources/_base.py:67">
P1: Custom agent: **Check for Cursor Rules Drift**

Three cursor rule files still reference the deleted `TokenManager` / `token_manager` pattern and will give Cursor AI stale guidance. These need updating to reflect the new `TokenProviderProtocol` / `_token_provider` pattern:

- `.cursor/rules/source-connector-implementation.mdc` (lines 595, 598): `self.token_manager` → `self._token_provider`, `refresh_on_unauthorized()` → `force_refresh()`
- `.cursor/rules/auth-providers.mdc` (lines 53–81): entire "Integration with TokenManager" section still documents the deleted `TokenManager` class
- `.cursor/rules/sync-architecture.mdc` (line 206): `### TokenManager` section header still present</violation>

<violation number="2" location="backend/airweave/platform/sources/_base.py:221">
P1: Keep the `access_token` fallback here. Direct-token flows skip installing a token provider, so `get_access_token()` now returns `None` for sources that still store the injected token on `self.access_token`.</violation>
</file>

<file name="backend/airweave/domains/sources/tests/test_lifecycle.py">

<violation number="1" location="backend/airweave/domains/sources/tests/test_lifecycle.py:735">
P1: The refactored test still passes `db=` to `_configure_token_provider`, but that helper no longer accepts a `db` argument.</violation>
</file>

<file name="backend/airweave/platform/sources/todoist.py">

<violation number="1" location="backend/airweave/platform/sources/todoist.py:72">
P1: Fall back to the stored instance token here; `get_access_token()` returns `None` for the direct-token lifecycle path, so this now sends `Bearer None` and breaks authenticated Todoist requests.</violation>
</file>

<file name="backend/airweave/platform/sources/shopify.py">

<violation number="1" location="backend/airweave/platform/sources/shopify.py:219">
P1: `_get_headers()` now reads from `get_access_token()`, but Shopify still stores its token on `self.access_token`, so authenticated requests lose the token and fail.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

cubic-dev-ai bot (Contributor) left a comment

12 issues found across 42 files


<file name=".cursor/rules/source-contract-redesign.mdc">

<violation number="1" location=".cursor/rules/source-contract-redesign.mdc:73">
P1: Custom agent: **Check for Cursor Rules Drift**

Update the existing source/auth Cursor rules too; they still teach Cursor to use `TokenManager`, `create(cls, access_token...)`, and `instance.access_token`, which conflicts with this new `TokenProviderProtocol` contract.</violation>
</file>

<file name="backend/airweave/domains/sources/lifecycle.py">

<violation number="1" location="backend/airweave/domains/sources/lifecycle.py:39">
P2: Custom agent: **Check for Cursor Rules Drift**

`.cursor/rules/sync-architecture.mdc` still documents the deleted `TokenManager` class (purpose, features, refresh logic code block). This PR replaces it with `OAuthTokenProvider` / `StaticTokenProvider` / `AuthProviderTokenProvider` in `domains/sources/token_providers/`. The stale section will cause Cursor to generate code referencing a class that no longer exists.

The `### TokenManager` section should be replaced with a summary pointing to the new token provider protocol, or removed with a reference to `.cursor/rules/source-contract-redesign.mdc` which already documents the new design.</violation>
</file>

<file name="backend/airweave/platform/auth_providers/composio.py">

<violation number="1" location="backend/airweave/platform/auth_providers/composio.py:12">
P1: Custom agent: **Check for Cursor Rules Drift**

Update `.cursor/rules/auth-providers.mdc` to match the TokenProvider/typed-exception architecture. It still teaches Cursor to use `TokenManager` for auth providers, which this refactor removes, so AI guidance for `**/auth_providers/**` is now wrong.</violation>
</file>

<file name="backend/airweave/domains/sources/token_providers/auth_provider.py">

<violation number="1" location="backend/airweave/domains/sources/token_providers/auth_provider.py:20">
P1: Custom agent: **Check for Cursor Rules Drift**

Update the Cursor auth-provider/sync architecture rules to replace the deleted `TokenManager` flow with `TokenProviderProtocol`/`AuthProviderTokenProvider`. Right now Cursor guidance still tells developers to wire auth providers through `TokenManager`, which contradicts this new source-auth pattern.</violation>
</file>

<file name="backend/airweave/domains/oauth/protocols.py">

<violation number="1" location="backend/airweave/domains/oauth/protocols.py:127">
P1: Custom agent: **Check for Cursor Rules Drift**

Update the relevant Cursor rules that still describe `TokenManager`. This new `refresh_and_persist()` API establishes the TokenProvider-based refresh flow, but `.cursor/rules/auth-providers.mdc` and `.cursor/rules/sync-architecture.mdc` still instruct Cursor to use the removed TokenManager architecture.</violation>
</file>

<file name="backend/airweave/domains/sources/exceptions.py">

<violation number="1" location="backend/airweave/domains/sources/exceptions.py:59">
P1: Custom agent: **Check for Cursor Rules Drift**

Update the connector implementation Cursor rule to the v2 source contract. It still teaches `self.token_manager.refresh_on_unauthorized()` and bare `HTTPStatusError`/`except Exception`, which now contradict the new `SourceError` hierarchy and token-provider flow introduced here.</violation>
</file>

<file name="backend/airweave/platform/sources/monday.py">

<violation number="1" location="backend/airweave/platform/sources/monday.py:93">
P1: This breaks direct-token syncs: `_graphql_query()` now ignores the injected instance token and reads only from `token_provider`, but the lifecycle intentionally skips configuring a provider when `access_token` is passed directly.</violation>
</file>

<file name="backend/airweave/platform/sources/zendesk.py">

<violation number="1" location="backend/airweave/platform/sources/zendesk.py:112">
P1: `get_access_token()` still bypasses the new token provider, so Zendesk will keep using the stale constructor token after a refresh or rotation.</violation>
</file>

<file name="backend/airweave/platform/sources/shopify.py">

<violation number="1" location="backend/airweave/platform/sources/shopify.py:219">
P1: `_get_headers()` now ignores Shopify's stored `self.access_token`, so direct-auth Shopify instances send requests without a token unless a token provider was attached separately.

(Based on your team's feedback about preserving behavior parity in refactor PRs.) [FEEDBACK_USED]</violation>
</file>

<file name="backend/airweave/platform/sources/notion.py">

<violation number="1" location="backend/airweave/platform/sources/notion.py:185">
P1: Direct-token Notion syncs now lose authentication because these requests only read from `get_access_token()`, but the lifecycle skips configuring a token provider for injected tokens.

(Based on your team's feedback about preserving behavior parity in refactors.) [FEEDBACK_USED]</violation>
</file>

<file name="backend/airweave/platform/sources/_base.py">

<violation number="1" location="backend/airweave/platform/sources/_base.py:221">
P1: Keep the `access_token` fallback here. Direct token injection intentionally skips token-provider setup, so this now returns `None` for sources that still call `get_access_token()` during sync.</violation>
</file>

<file name="backend/airweave/domains/sources/tests/test_lifecycle.py">

<violation number="1" location="backend/airweave/domains/sources/tests/test_lifecycle.py:735">
P2: Remove the obsolete `db=` argument when calling `_configure_token_provider`; the helper no longer accepts it, so this test now fails with `TypeError` before reaching any assertions.</violation>
</file>

error_code = ""
try:
    error_code = exc.response.json().get("error", "")
except Exception:
    pass

Check notice

Code scanning / CodeQL

Empty except Note

'except' clause does nothing but pass and there is no explanatory comment.

async def get_token(self) -> str:
    """Return a valid token, refreshing proactively if stale."""
    ...

Check notice

Code scanning / CodeQL

Statement has no effect Note

This statement has no effect.

cubic-dev-ai bot (Contributor) left a comment

8 issues found across 21 files (changes from recent commits).


<file name="backend/airweave/domains/sources/lifecycle.py">

<violation number="1" location="backend/airweave/domains/sources/lifecycle.py:168">
P1: `validate()` no longer normalizes dict/BaseModel OAuth credentials before calling `source.validate()`, so OAuth validations can send the whole credential object as the bearer token instead of the extracted `access_token`.</violation>

<violation number="2" location="backend/airweave/domains/sources/lifecycle.py:641">
P1: Custom agent: **Check for Cursor Rules Drift**

Update `.cursor/rules/source-contract-redesign.mdc` to match this new token-provider flow. It still says direct token injection has no provider and lightweight `validate()` uses `StaticTokenProvider`, but this change reverses both behaviors.</violation>
</file>

<file name="backend/airweave/domains/sources/exceptions.py">

<violation number="1" location="backend/airweave/domains/sources/exceptions.py:205">
P1: Custom agent: **Check for Cursor Rules Drift**

Update `.cursor/rules/source-contract-redesign.mdc` to use `SourceServerError` as the canonical upstream-failure exception. The rule still documents distinct `SourceTemporaryError` / `SourcePermanentError` classes and retry semantics that no longer match this code.</violation>
</file>

<file name="backend/airweave/domains/sources/token_providers/static.py">

<violation number="1" location="backend/airweave/domains/sources/token_providers/static.py:38">
P1: Custom agent: **Check for Cursor Rules Drift**

Update `.cursor/rules/source-contract-redesign.mdc`: it still tells Cursor that `StaticTokenProvider.force_refresh()` raises `SourceAuthError` / `SourceTokenRefreshError`, but this change moves that contract to `TokenRefreshNotSupportedError` with `provider_kind` metadata.</violation>
</file>

<file name="backend/airweave/domains/sources/token_providers/oauth.py">

<violation number="1" location="backend/airweave/domains/sources/token_providers/oauth.py:97">
P2: Custom agent: **Check for Cursor Rules Drift**

`.cursor/rules/source-contract-redesign.mdc` has drifted from the new `OAuthTokenProvider` implementation:
- Describes a fixed "25min" timer, but the code now computes dynamic refresh intervals from `expires_in`.
- Lists `can_refresh` as a constructor parameter, but it's now derived internally from `oauth_type` + `_has_refresh_token()`.
- References `check_has_refresh_token` static method (now module-level `_has_refresh_token`).
- States providers throw `SourceTokenRefreshError`, but they now throw `TokenProviderError` subtypes.

These inaccuracies will lead Cursor AI to suggest the old constructor signature and exception types.</violation>
</file>

<file name="backend/airweave/domains/oauth/oauth2_service.py">

<violation number="1" location="backend/airweave/domains/oauth/oauth2_service.py:26">
P1: Custom agent: **Check for Cursor Rules Drift**

Two cursor rules still document the deleted `TokenManager` class and will give Cursor AI wrong guidance.

- `.cursor/rules/auth-providers.mdc` lines 53–100+: entire "Integration with TokenManager" section describes the deleted class, its initialization, and refresh flow.
- `.cursor/rules/source-connector-implementation.mdc` lines 406, 595–605: code examples import `TokenRefreshError` and call `self.token_manager.refresh_on_unauthorized()` — both removed in this PR.

Update both rules to reference `TokenProviderProtocol` (`get_token()` / `force_refresh()`), the new exception hierarchy (`OAuthRefresh*Error`, `SourceTokenRefreshError`), and `AuthProviderTokenProvider` for the auth-provider integration path.</violation>
</file>

<file name="backend/airweave/platform/auth_providers/exceptions.py">

<violation number="1" location="backend/airweave/platform/auth_providers/exceptions.py:19">
P1: Custom agent: **Check for Cursor Rules Drift**

Update the relevant Cursor rules for this auth-provider exception change. `auth-providers.mdc` still tells Cursor to use `TokenManager`, and `source-contract-redesign.mdc` still documents `SourceTemporaryError`/`SourcePermanentError` instead of the new `SourceServerError` mapping introduced here.</violation>
</file>

<file name="backend/tests/unit/platform/sync/test_token_providers.py">

<violation number="1" location="backend/tests/unit/platform/sync/test_token_providers.py:16">
P1: Custom agent: **Check for Cursor Rules Drift**

Update the relevant Cursor rules for the TokenProvider v2 contract. This change switches the codebase to `credentials`/`oauth_type` and `TokenProviderError`/`TokenRefreshNotSupportedError`, but the existing rules still tell Cursor to use deleted `TokenManager` flows and old `SourceAuthError`-based token APIs.</violation>
</file>

orhanrauf and others added 9 commits March 16, 2026 20:14
…source exceptions

Phase 1 of BaseSource contract v2 redesign. Key changes:

- Introduce TokenProviderProtocol with get_token()/force_refresh() contract
- Add three implementations: OAuthTokenProvider (timer + lock + delegates to
  oauth2_service.refresh_and_persist), StaticTokenProvider, AuthProviderTokenProvider
- Move token providers to domains/sources/token_providers/ (source domain concern)
- Delete monolithic TokenManager (480 lines) — replaced by thin OAuthTokenProvider (~185 lines)
- Add refresh_and_persist() to oauth2_service for full load+decrypt+refresh+persist cycle
- Build typed SourceError exception hierarchy (SourceAuthError, SourceTokenRefreshError,
  SourceRateLimitError, SourceTemporaryError, SourceEntityError subtypes, etc.)
- Remove _token_manager from BaseSource, add _token_provider as single path
- Migrate all ~25 source files from token_manager to token_provider
- Update lifecycle service to configure token providers
- Rewrite and rename tests (test_token_manager.py → test_token_providers.py)

Note: mypy and import-linter hooks skipped (pre-existing Poetry config issue).
Ruff and ruff-format pass clean.

Next: integrate exceptions into source methods and pipeline routing,
kill remaining setters, make create() typed.
…, add retries

- OAuthTokenProvider uses actual expires_in from provider (80% of lifetime,
  clamped [60s, 50min]) instead of hardcoded 25-min interval; falls back to
  default when expires_in is unavailable
- Switch to time.monotonic() for NTP-immune scheduling
- refresh_and_persist returns RefreshResult(access_token, expires_in) instead
  of bare str — surfaces token lifetime through the full chain
- OAuthTokenProvider accepts raw credentials + oauth_type, determines refresh
  capability internally — eliminates extract_token/check_has_refresh_token
  static helpers and simplifies lifecycle._configure_token_provider
- BaseSource.get_access_token() falls back to self.access_token when no
  provider is configured, removing token re-extraction in validate()
- Token provider setup failures now raise SourceCreationError instead of
  being silently swallowed
- Add tenacity retry (3 attempts, exp backoff) to both OAuthTokenProvider
  and AuthProviderTokenProvider for transient errors (5xx, rate limits)
- Consolidate SourceTemporaryError/SourcePermanentError into SourceServerError
  and AuthProviderTemporaryError into AuthProviderServerError (aliases kept)
- Fix pre-existing test exception mismatches (SourceAuthError → TokenProvider*)
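
The scheduling rules in this commit message (80% of lifetime, clamped to [60s, 50min], monotonic clock, lock around refresh) can be sketched as follows. All names are hypothetical, the 25-minute default is assumed to be the earlier hardcoded interval, and the retry layer is left out.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

MIN_INTERVAL_S = 60.0
MAX_INTERVAL_S = 50 * 60.0
DEFAULT_INTERVAL_S = 25 * 60.0  # assumed default when expires_in is unknown
LIFETIME_FRACTION = 0.8

RefreshFn = Callable[[], Awaitable[Tuple[str, Optional[float]]]]


def refresh_interval(expires_in: Optional[float]) -> float:
    """Seconds to wait before refreshing proactively."""
    if expires_in is None or expires_in <= 0:
        return DEFAULT_INTERVAL_S
    scaled = expires_in * LIFETIME_FRACTION
    return min(max(scaled, MIN_INTERVAL_S), MAX_INTERVAL_S)


class OAuthProviderSketch:
    """Illustrative only; not the OAuthTokenProvider in this PR."""

    def __init__(
        self,
        token: str,
        expires_in: Optional[float],
        refresh_fn: RefreshFn,
        now: Callable[[], float] = time.monotonic,
    ) -> None:
        self._token = token
        self._refresh_fn = refresh_fn
        self._now = now  # monotonic, so wall-clock adjustments cannot skew it
        self._deadline = now() + refresh_interval(expires_in)
        self._lock = asyncio.Lock()

    async def get_token(self) -> str:
        if self._now() < self._deadline:
            return self._token
        async with self._lock:
            # Re-check: another coroutine may have refreshed while we waited.
            if self._now() >= self._deadline:
                await self._refresh()
            return self._token

    async def force_refresh(self) -> str:
        async with self._lock:
            await self._refresh()
            return self._token

    async def _refresh(self) -> None:
        self._token, expires_in = await self._refresh_fn()
        self._deadline = self._now() + refresh_interval(expires_in)


async def demo() -> Tuple[str, str, str, int]:
    """Two concurrent callers hit a stale token; only one refresh happens."""
    clock = [0.0]
    calls = []

    async def fake_refresh() -> Tuple[str, Optional[float]]:
        calls.append(1)
        await asyncio.sleep(0)  # yield so the second caller reaches the lock
        return f"tok-{len(calls)}", 3600.0

    provider = OAuthProviderSketch("tok-0", 100.0, fake_refresh, now=lambda: clock[0])
    first = await provider.get_token()
    clock[0] = 90.0  # past the refresh deadline of roughly 80 seconds
    a, b = await asyncio.gather(provider.get_token(), provider.get_token())
    return first, a, b, len(calls)
```

The double check inside the lock is what keeps concurrent callers from each triggering their own refresh.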
…vice, and sources

Part of the source contract v2 token provider refactor — updates remaining
files to use the new typed exception hierarchy and removes stale auth
header logic from intercom/zendesk.
- test_callback_service: mock source_lifecycle.validate() to raise
  SourceValidationError (validates via lifecycle now, not inline)
- test_oauth2_service: expect OAuthRefreshCredentialMissingError instead
  of TokenRefreshError for missing refresh token
- test_oauth2_service: expect OAuthRefreshServerError instead of
  httpx.HTTPStatusError for 500 errors (_make_token_request translates)
…redentials

- source-contract-redesign.mdc: fix exception hierarchy (SourceServerError
  aliases), OAuthTokenProvider (dynamic expiry, derived can_refresh,
  module-level _has_refresh_token), StaticTokenProvider
  (TokenRefreshNotSupportedError), resolution matrix (direct injection →
  StaticTokenProvider), add TokenProviderError/OAuthRefreshError/
  AuthProviderError hierarchies
- auth-providers.mdc: replace deleted TokenManager section with
  TokenProviderProtocol / AuthProviderTokenProvider integration
- source-connector-implementation.mdc: update create() signature to
  keyword-only credentials, replace token_manager with get_access_token() /
  refresh_on_unauthorized(), fix exception imports, fix breadcrumb
  entity_type field
- lifecycle.py: add _normalize_credentials_for_validate() so validate()
  extracts access_token from dict/BaseModel OAuth creds before passing to
  source.create() — prevents sending entire credential object as bearer token
…_credentials

Replace duplicated _process_credentials_for_source + _normalize_credentials_for_validate
with one shared method that takes a SourceRegistryEntry. Both the full create() path and
lightweight validate() path now use the same logic. Handles str/dict/BaseModel/unexpected
types cleanly with proper early returns.
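
A sketch of the early-return shape this commit describes, reduced to the case of pulling `access_token` out of whatever credential form arrives. The function name, the error types, and duck-typing on `model_dump()` in place of a real pydantic `BaseModel` check are all assumptions; the actual helper takes a SourceRegistryEntry and may return something different.

```python
from typing import Any


class FakeCredsModel:
    """Stand-in for a pydantic model; only model_dump() is imitated."""

    def __init__(self, access_token: str) -> None:
        self.access_token = access_token

    def model_dump(self) -> dict:
        return {"access_token": self.access_token}


def extract_access_token(raw: Any) -> str:
    """Return the bearer token from str, dict, or model-like credentials."""
    # Case 1: already a raw token string.
    if isinstance(raw, str):
        return raw

    # Case 2: model-like object; convert it and fall through to the dict case.
    if hasattr(raw, "model_dump"):
        raw = raw.model_dump()

    # Case 3: dict of credential fields.
    if isinstance(raw, dict):
        token = raw.get("access_token")
        if isinstance(token, str) and token:
            return token
        raise ValueError("credentials contain no access_token")

    # Case 4: anything else is a caller bug, so fail loudly.
    raise TypeError(f"unsupported credentials type: {type(raw).__name__}")
```

Sharing one such function between create() and validate() is what prevents the whole credential object from being sent as the bearer token.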
Add 17 tests covering:
- callback_service: SourceNotFoundError → 404
- exceptions: OAuthRefreshBadRequestError/RateLimitError __init__
- oauth2_service: refresh_and_persist (happy + both error paths),
  _make_token_request (ConnectError, Timeout, generic, retry exhaust),
  _raise_typed_refresh_error (401, 400, 403, 500, unexpected, non-JSON)
OAuthRefreshTokenRevokedError,
)
from airweave.domains.oauth.oauth2_service import OAuth2Service
from airweave.domains.oauth.types import RefreshResult

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'RefreshResult' is not used.
Move _base.py, exceptions.py, auth_result.py to domains/auth_provider/ root.
Move composio.py, pipedream.py, klavis.py to domains/auth_provider/providers/.
Update all imports across lifecycle, token_providers, types, registry, tests.
Delete platform/auth_providers/ entirely.

Add 74 new tests (test_exceptions, test_composio, test_pipedream) covering
all previously-uncovered lines: exceptions 100%, composio 98%, pipedream 90%.
import httpx
import pytest

from airweave.domains.auth_provider.auth_result import AuthResult

Check notice

Code scanning / CodeQL

Unused import Note test

Import of 'AuthResult' is not used.
Sort imports in lifecycle.py and token_providers/auth_provider.py (I001).
Split _normalize_credentials into _to_creds_dict + _process_creds_dict
to reduce cyclomatic complexity from 13 to under 10 (C901).