Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .cursor/rules/backend-rules.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -114,3 +114,12 @@ If necessary (like editing ), refer [sync-architecture.mdc](mdc:.cursor/rules/sy
- **Comprehensive Data**: Database + PostHog contain all search metadata, performance metrics, and feature usage data
- **Non-blocking**: Both persistence methods fail gracefully without affecting search functionality
- **Automatic Tracking**: All search operations (regular, streaming, legacy) tracked uniformly via `SearchService`

### Security

- Never use `random.*` for security-sensitive values — ruff
rule `S311` bans it
- Use `secrets.choice()`, `secrets.randbelow()`,
`secrets.token_bytes()`, or `secrets.token_urlsafe()` instead
- Seeded `random.Random(seed)` is acceptable **only** for
deterministic test-data generation (stub sources)
5 changes: 5 additions & 0 deletions .cursor/rules/frontend-rules.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -341,3 +341,8 @@ try {
- Sensitive data stripped from errors
- No credentials in localStorage
- Secure OAuth state management

### 4. **Randomness**
- Never use `Math.random()` — it is banned by ESLint
- Use `crypto.getRandomValues()` for random bytes/integers
- Use `crypto.randomUUID()` for UUIDs
46 changes: 46 additions & 0 deletions backend/airweave/core/readable_id.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Readable ID generation using a CSPRNG.

Provides a single ``generate_readable_id`` helper that converts a
human-readable name into a URL-safe slug with a random suffix, e.g.
``"finance-data-ab12x9"``. The suffix is drawn from
:func:`secrets.choice` (CSPRNG) rather than :mod:`random` (Mersenne
Twister).
"""

import re
import secrets
import string

_ALPHABET = string.ascii_lowercase + string.digits


def generate_readable_id(name: str) -> str:
"""Generate a readable ID from a name.

Converts the name to lowercase, replaces spaces with hyphens,
removes special characters, and appends a cryptographically random
6-character suffix to ensure uniqueness.

Args:
name: The human-readable name to convert.

Returns:
A URL-safe readable identifier (e.g. ``"finance-data-ab123x"``).
"""
# Convert to lowercase and replace spaces with hyphens
readable_id = name.lower().strip()

# Replace any character that's not a letter, number, or space with nothing
readable_id = re.sub(r"[^a-z0-9\s]", "", readable_id)
# Replace spaces with hyphens
readable_id = re.sub(r"\s+", "-", readable_id)
# Ensure no consecutive hyphens
readable_id = re.sub(r"-+", "-", readable_id)
# Trim hyphens from start and end
readable_id = readable_id.strip("-")

# Add random alphanumeric suffix (CSPRNG)
suffix = "".join(secrets.choice(_ALPHABET) for _ in range(6))
readable_id = f"{(readable_id + '-') if readable_id else ''}{suffix}"

return readable_id
5 changes: 2 additions & 3 deletions backend/airweave/domains/oauth/oauth2_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import asyncio
import base64
import hashlib
import random
import secrets
from typing import Any, Optional, Tuple
from urllib.parse import urlencode
Expand Down Expand Up @@ -573,7 +572,7 @@ async def _make_token_request(
logger.info(f"Received response: Status {response.status_code}, ")

if self._is_oauth_rate_limit_error(response):
delay = base_delay * (2**attempt) + random.uniform(0, 2)
delay = base_delay * (2**attempt) + secrets.randbelow(2001) / 1000
logger.warning(
f"OAuth rate limit hit, waiting {delay:.1f}s before retry "
f"(attempt {attempt + 1}/{max_retries})"
Expand All @@ -586,7 +585,7 @@ async def _make_token_request(

except httpx.HTTPStatusError as e:
if self._is_oauth_rate_limit_error(e.response):
delay = base_delay * (2**attempt) + random.uniform(0, 2)
delay = base_delay * (2**attempt) + secrets.randbelow(2001) / 1000
logger.warning(
f"OAuth rate limit hit (exception), waiting {delay:.1f}s before retry "
f"(attempt {attempt + 1}/{max_retries})"
Expand Down
6 changes: 3 additions & 3 deletions backend/airweave/email/services.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Email service for sending emails via Resend."""

import asyncio
import random
import secrets
import time
from datetime import datetime, timedelta, timezone
from typing import Optional
Expand Down Expand Up @@ -139,7 +139,7 @@ def _send_welcome_email_sync(to_email: str, user_name: str) -> None:
resend.api_key = settings.RESEND_API_KEY

# Generate random delay between 10 and 40 minutes
delay_minutes = random.randint(10, 40)
delay_minutes = secrets.randbelow(31) + 10

# Calculate scheduled time using ISO 8601 format
scheduled_time = datetime.now(timezone.utc) + timedelta(minutes=delay_minutes)
Expand Down Expand Up @@ -203,7 +203,7 @@ def _send_welcome_followup_email_sync(to_email: str, user_name: str) -> None:
resend.api_key = settings.RESEND_API_KEY

# Generate random delay between 30 and 60 minutes
delay_minutes = random.randint(30, 60)
delay_minutes = secrets.randbelow(31) + 30

# Schedule for 5 days from now plus random delay
scheduled_time = datetime.now(timezone.utc) + timedelta(days=5, minutes=delay_minutes)
Expand Down
106 changes: 55 additions & 51 deletions backend/airweave/platform/sources/ctti.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

import asyncio
import random
import secrets
from typing import Any, AsyncGenerator, Dict, Optional, Union

import asyncpg
Expand Down Expand Up @@ -141,7 +141,7 @@ async def _retry_with_backoff(self, func, *args, max_retries: int = 3, **kwargs)
if attempt < max_retries:
# Calculate delay with exponential backoff and jitter
base_delay = 2**attempt # 1s, 2s, 4s
jitter = random.uniform(0.1, 0.5)
jitter = 0.1 + secrets.randbelow(401) / 1000
delay = base_delay + jitter

self.logger.warning(
Expand Down Expand Up @@ -189,6 +189,58 @@ async def _close_pool(self) -> None:
await self.pool.close()
self.pool = None

async def _fetch_records(self, last_nct_id: str, remaining: int, total_synced: int, limit: int):
"""Fetch clinical trial records from the AACT database.

Handles sync-mode logging, query construction, and query execution
with retry logic.
"""
if last_nct_id:
self.logger.debug(
f"📊 Incremental sync from NCT_ID > {last_nct_id} "
f"({total_synced}/{limit} synced, {remaining} remaining)"
)
else:
self.logger.debug(f"🔄 Full sync (no cursor), limit={limit}")

pool = await self._ensure_pool()

if last_nct_id:
query = f"""
SELECT nct_id
FROM "{self.AACT_SCHEMA}"."{self.AACT_TABLE}"
WHERE nct_id IS NOT NULL AND nct_id > $1
ORDER BY nct_id ASC
LIMIT {remaining}
"""
query_args = [last_nct_id]
else:
query = f"""
SELECT nct_id
FROM "{self.AACT_SCHEMA}"."{self.AACT_TABLE}"
WHERE nct_id IS NOT NULL
ORDER BY nct_id ASC
LIMIT {remaining}
"""
query_args = []

async def _execute_query():
async with pool.acquire() as conn:
if last_nct_id:
self.logger.debug(
f"Fetching up to {remaining} clinical trials from AACT "
f"(NCT_ID > {last_nct_id})"
)
else:
self.logger.debug(
f"Fetching up to {remaining} clinical trials from AACT (full sync)"
)
records = await conn.fetch(query, *query_args)
self.logger.debug(f"Fetched {len(records)} clinical trial records")
return records

return await self._retry_with_backoff(_execute_query)

async def generate_entities(self) -> AsyncGenerator[CTTIWebEntity, None]:
"""Generate WebEntity instances for each nct_id in the AACT studies table.

Expand Down Expand Up @@ -218,55 +270,7 @@ async def generate_entities(self) -> AsyncGenerator[CTTIWebEntity, None]:
)
return

# Log sync mode
if last_nct_id:
self.logger.debug(
f"📊 Incremental sync from NCT_ID > {last_nct_id} "
f"({total_synced}/{limit} synced, {remaining} remaining)"
)
else:
self.logger.debug(f"🔄 Full sync (no cursor), limit={limit}")

pool = await self._ensure_pool()

# Build query with cursor-based filtering
# Use parameterized query to prevent SQL injection
# Use 'remaining' as the limit to enforce total limit across syncs
if last_nct_id:
query = f"""
SELECT nct_id
FROM "{self.AACT_SCHEMA}"."{self.AACT_TABLE}"
WHERE nct_id IS NOT NULL AND nct_id > $1
ORDER BY nct_id ASC
LIMIT {remaining}
"""
query_args = [last_nct_id]
else:
query = f"""
SELECT nct_id
FROM "{self.AACT_SCHEMA}"."{self.AACT_TABLE}"
WHERE nct_id IS NOT NULL
ORDER BY nct_id ASC
LIMIT {remaining}
"""
query_args = []

async def _execute_query():
async with pool.acquire() as conn:
if last_nct_id:
self.logger.debug(
f"Fetching up to {remaining} clinical trials from AACT "
f"(NCT_ID > {last_nct_id})"
)
else:
self.logger.debug(
f"Fetching up to {remaining} clinical trials from AACT (full sync)"
)
records = await conn.fetch(query, *query_args)
self.logger.debug(f"Fetched {len(records)} clinical trial records")
return records

records = await self._retry_with_backoff(_execute_query)
records = await self._fetch_records(last_nct_id, remaining, total_synced, limit)

self.logger.debug(f"Processing {len(records)} records into entities")
entities_created = 0
Expand Down
8 changes: 4 additions & 4 deletions backend/airweave/platform/sync/web_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import asyncio
import hashlib
import os
import random
import secrets
from typing import List
from uuid import uuid4

Expand Down Expand Up @@ -217,7 +217,7 @@ async def _retry_with_backoff(
if is_connection_error:
# Longer delays for connection issues
base_delay = 5 * (attempt + 1) # 5, 10, 15 seconds
jitter = random.uniform(1.0, 2.0)
jitter = 1.0 + secrets.randbelow(1001) / 1000
delay = base_delay + jitter

logger.warning(
Expand All @@ -227,7 +227,7 @@ async def _retry_with_backoff(
elif is_rate_limited:
# Medium delay for rate limiting
base_delay = 3 ** (attempt + 1) # 3, 9, 27 seconds
jitter = random.uniform(0.5, 1.0)
jitter = 0.5 + secrets.randbelow(501) / 1000
delay = base_delay + jitter

logger.warning(
Expand All @@ -237,7 +237,7 @@ async def _retry_with_backoff(
else:
# Standard exponential backoff
base_delay = 2 * (attempt + 1) # 2, 4, 6 seconds
jitter = random.uniform(0.1, 0.5)
jitter = 0.1 + secrets.randbelow(401) / 1000
delay = base_delay + jitter

logger.warning(
Expand Down
36 changes: 1 addition & 35 deletions backend/airweave/schemas/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,51 +3,17 @@
A collection is a group of different data sources that you can search using a single endpoint.
"""

import random
import re
import string
from datetime import datetime
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator, model_validator

from airweave.core.readable_id import generate_readable_id
from airweave.core.shared_models import CollectionStatus
from airweave.platform.sync.config.base import SyncConfig


def generate_readable_id(name: str) -> str:
"""Generate a readable ID from a collection name.

Converts the name to lowercase, replaces spaces with hyphens,
removes special characters, and adds a random 6-character suffix
to ensure uniqueness.

Args:
name: The collection name to convert

Returns:
A URL-safe readable identifier (e.g., "finance-data-ab123")
"""
# Convert to lowercase and replace spaces with hyphens
readable_id = name.lower().strip()

# Replace any character that's not a letter, number, or space with nothing
readable_id = re.sub(r"[^a-z0-9\s]", "", readable_id)
# Replace spaces with hyphens
readable_id = re.sub(r"\s+", "-", readable_id)
# Ensure no consecutive hyphens
readable_id = re.sub(r"-+", "-", readable_id)
# Trim hyphens from start and end
readable_id = readable_id.strip("-")

# Add random alphanumeric suffix
suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
readable_id = f"{readable_id}-{suffix}"

return readable_id


class CollectionBase(BaseModel):
"""Base schema for collections with common fields."""

Expand Down
36 changes: 1 addition & 35 deletions backend/airweave/schemas/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,50 +6,16 @@

"""

import random
import re
import string
from datetime import datetime
from typing import Optional
from uuid import UUID

from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator, model_validator

from airweave.core.readable_id import generate_readable_id
from airweave.core.shared_models import ConnectionStatus, IntegrationType


def generate_readable_id(name: str) -> str:
"""Generate a readable ID from a connection name.

Converts the name to lowercase, replaces spaces with hyphens,
removes special characters, and adds a random 6-character suffix
to ensure uniqueness.

Args:
name: The connection name to convert

Returns:
A URL-safe readable identifier (e.g., "stripe-connection-ab123")
"""
# Convert to lowercase and replace spaces with hyphens
readable_id = name.lower().strip()

# Replace any character that's not a letter, number, or space with nothing
readable_id = re.sub(r"[^a-z0-9\s]", "", readable_id)
# Replace spaces with hyphens
readable_id = re.sub(r"\s+", "-", readable_id)
# Ensure no consecutive hyphens
readable_id = re.sub(r"-+", "-", readable_id)
# Trim hyphens from start and end
readable_id = readable_id.strip("-")

# Add random alphanumeric suffix
suffix = "".join(random.choices(string.ascii_lowercase + string.digits, k=6))
readable_id = f"{(readable_id + '-') if readable_id else ''}{suffix}"

return readable_id


class ConnectionBase(BaseModel):
"""Base schema for connections."""

Expand Down
Loading
Loading