Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 177 additions & 20 deletions backend/airweave/platform/destinations/vespa/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import json
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple
from urllib.parse import quote
from uuid import UUID

Expand All @@ -25,6 +25,8 @@
from airweave.platform.destinations.vespa.config import (
ALL_VESPA_SCHEMAS,
DELETE_BATCH_SIZE,
DELETE_CONCURRENCY,
DELETE_QUERY_HITS_LIMIT,
FEED_MAX_CONNECTIONS,
FEED_MAX_QUEUE_SIZE,
FEED_MAX_WORKERS,
Expand Down Expand Up @@ -183,10 +185,11 @@ def _feed_sync(docs_iter=doc_dicts, schema_name=schema):
# -------------------------------------------------------------------------

async def delete_by_selection(self, schema: str, selection: str) -> DeleteResult:
"""Delete documents using Vespa's selection-based bulk delete API.
"""Delete documents using Vespa's selection-based bulk delete API (visitor scan).

This is faster than query-then-delete because it performs server-side
deletion in a single streaming operation.
Uses a visitor that walks all buckets evaluating the selection expression.
Appropriate for broad deletions (by sync_id, collection_id) but slow for
targeted deletions -- use _delete_by_doc_ids for those.

Uses: DELETE /document/v1/{namespace}/{doctype}/docid?selection={expr}&cluster={cluster}

Expand Down Expand Up @@ -264,39 +267,193 @@ async def delete_by_parent_ids(
collection_id: UUID,
batch_size: int = DELETE_BATCH_SIZE,
) -> List[DeleteResult]:
"""Delete all documents for parent IDs across all schemas.
"""Delete all chunk documents for the given parent entity IDs.

Batches parent IDs to avoid overly long selection expressions.
Uses a two-step indexed approach:
1. YQL query on indexed original_entity_id to resolve Vespa doc IDs
2. Direct DELETE by document ID (O(1) per doc, parallelized)

Falls back to the old visitor-based selection delete on query failure.

Args:
parent_ids: List of parent entity IDs
parent_ids: List of parent entity IDs (pre-chunking IDs)
collection_id: Collection ID to scope deletion
batch_size: Max parent IDs per batch
batch_size: Max parent IDs per query batch

Returns:
List of DeleteResult for all operations
"""
if not parent_ids:
return []

results = []
total_deleted = 0
for i in range(0, len(parent_ids), batch_size):
batch = parent_ids[i : i + batch_size]

for schema in ALL_VESPA_SCHEMAS:
parent_conditions = " or ".join(
f"{schema}.airweave_system_metadata_original_entity_id=='{pid}'"
for pid in batch
)
selection = (
f"({parent_conditions}) and "
f"{schema}.airweave_system_metadata_collection_id=='{collection_id}'"
try:
doc_ids = await self._query_doc_ids_by_parent_ids(batch, collection_id)
if doc_ids:
count = await self._delete_by_doc_ids(doc_ids)
total_deleted += count
except Exception as e:
self._logger.warning(
f"[VespaClient] Fast delete failed for batch of {len(batch)} parent IDs, "
f"falling back to selection-based delete: {e}"
)
result = await self.delete_by_selection(schema, selection)
results.append(result)
total_deleted += await self._delete_by_parent_ids_selection(batch, collection_id)

return [DeleteResult(deleted_count=total_deleted, schema=None)]

async def _query_doc_ids_by_parent_ids(
self,
parent_ids: List[str],
collection_id: UUID,
) -> List[Tuple[str, str]]:
"""Query Vespa for all chunk document IDs belonging to the given parent entity IDs.

Uses indexed fields (both fast-search) for an efficient B-tree lookup
instead of a visitor scan.

Args:
parent_ids: Batch of parent entity IDs to resolve
collection_id: Collection ID to scope the query

Returns:
List of (schema_name, doc_id) tuples for direct deletion
"""
escaped_ids = ", ".join(f"'{pid}'" for pid in parent_ids)
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2: Parent IDs are interpolated into YQL without escaping single quotes, despite the variable being named escaped_ids. Replace embedded single quotes to avoid malformed YQL and misleading naming.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/platform/destinations/vespa/client.py, line 323:

<comment>Parent IDs are interpolated into YQL without escaping single quotes, despite the variable being named `escaped_ids`. Replace embedded single quotes to avoid malformed YQL and misleading naming.</comment>

<file context>
@@ -264,39 +267,193 @@ async def delete_by_parent_ids(
+        Returns:
+            List of (schema_name, doc_id) tuples for direct deletion
+        """
+        escaped_ids = ", ".join(f"'{pid}'" for pid in parent_ids)
+        source_list = ", ".join(ALL_VESPA_SCHEMAS)
+        yql = (
</file context>
Suggested change
escaped_ids = ", ".join(f"'{pid}'" for pid in parent_ids)
escaped_ids = ", ".join(f"'{pid.replace(chr(39), chr(39)*2)}'" for pid in parent_ids)
Fix with Cubic

source_list = ", ".join(ALL_VESPA_SCHEMAS)
yql = (
f"select documentid, sddocname() from sources {source_list} where "
f"airweave_system_metadata_original_entity_id in ({escaped_ids}) and "
f"airweave_system_metadata_collection_id = '{collection_id}'"
)
query_params = {
"yql": yql,
"hits": DELETE_QUERY_HITS_LIMIT,
Copy link
Contributor

@cubic-dev-ai cubic-dev-ai bot Mar 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Silent data truncation when matching documents exceed DELETE_QUERY_HITS_LIMIT. If the query returns exactly the limit, remaining documents are orphaned with no warning. Check response.json['root']['fields']['totalCount'] against len(hits) and either paginate or log a warning and fall back to the selection-based delete for completeness.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/airweave/platform/destinations/vespa/client.py, line 332:

<comment>Silent data truncation when matching documents exceed `DELETE_QUERY_HITS_LIMIT`. If the query returns exactly the limit, remaining documents are orphaned with no warning. Check `response.json['root']['fields']['totalCount']` against `len(hits)` and either paginate or log a warning and fall back to the selection-based delete for completeness.</comment>

<file context>
@@ -264,39 +267,193 @@ async def delete_by_parent_ids(
+        )
+        query_params = {
+            "yql": yql,
+            "hits": DELETE_QUERY_HITS_LIMIT,
+            "timeout": "30s",
+            "summary": "none",
</file context>
Fix with Cubic

"timeout": "30s",
"summary": "none",
}

start = time.perf_counter()
response: Any = await asyncio.to_thread(self.app.query, body=query_params)
elapsed_ms = (time.perf_counter() - start) * 1000

if not response.is_successful():
raw = response.json if hasattr(response, "json") else {}
error_msg = raw.get("root", {}).get("errors", str(raw))
raise RuntimeError(f"Doc ID query failed: {error_msg}")

hits: List[Dict[str, Any]] = response.hits or []
self._logger.debug(
f"[VespaClient] Resolved {len(hits)} doc IDs for {len(parent_ids)} parent IDs "
f"in {elapsed_ms:.1f}ms"
)

results: List[Tuple[str, str]] = []
for hit in hits:
full_id = hit.get("id", "")
schema, doc_id = self._parse_vespa_document_id(full_id)
if schema and doc_id:
results.append((schema, doc_id))

return results

@staticmethod
def _parse_vespa_document_id(full_id: str) -> Tuple[Optional[str], Optional[str]]:
"""Parse Vespa's full document ID into (schema, doc_id).

Vespa document IDs look like: id:airweave:file_entity::file_entity_slack_msg_123__chunk_0
Format: id:{namespace}:{schema}::{user_doc_id}

Returns:
(schema, doc_id) or (None, None) if parsing fails
"""
if not full_id.startswith("id:"):
return None, None
parts = full_id.split("::", 1)
if len(parts) != 2:
return None, None
prefix = parts[0]
doc_id = parts[1]
prefix_parts = prefix.split(":")
if len(prefix_parts) < 3:
return None, None
schema = prefix_parts[2]
return schema, doc_id

async def _delete_by_doc_ids(
    self,
    doc_ids: List[Tuple[str, str]],
) -> int:
    """Delete documents by their Vespa document IDs using parallel direct DELETEs.

    Each delete is a direct /document/v1 DELETE by ID (no visitor scan).
    Concurrency is capped by DELETE_CONCURRENCY via a semaphore so the
    Vespa document API is not flooded.

    Args:
        doc_ids: List of (schema, doc_id) tuples

    Returns:
        Number of successfully deleted documents
    """
    if not doc_ids:
        return 0

    base_url = f"{settings.VESPA_URL}:{settings.VESPA_PORT}"
    semaphore = asyncio.Semaphore(DELETE_CONCURRENCY)

    async with httpx.AsyncClient(timeout=settings.VESPA_TIMEOUT) as client:

        async def _delete_one(schema: str, doc_id: str) -> bool:
            # Percent-encode the doc ID (safe="") since user doc IDs may
            # contain '/' and other URL-significant characters.
            url = f"{base_url}/document/v1/airweave/{schema}/docid/{quote(doc_id, safe='')}"
            async with semaphore:
                resp = await client.delete(url)
                return resp.status_code == 200

        start = time.perf_counter()
        tasks = [_delete_one(schema, doc_id) for schema, doc_id in doc_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        elapsed_ms = (time.perf_counter() - start) * 1000

    deleted = sum(1 for r in results if r is True)
    failed = len(results) - deleted
    # Fix: previously, exceptions raised inside gather (timeouts, connection
    # errors) were silently folded into the failure count with no detail.
    # Surface the first exception so failures are debuggable.
    first_error = next((r for r in results if isinstance(r, BaseException)), None)

    if failed > 0:
        error_suffix = f" (first error: {first_error!r})" if first_error is not None else ""
        self._logger.warning(
            f"[VespaClient] Direct delete: {deleted} ok, {failed} failed "
            f"out of {len(doc_ids)} in {elapsed_ms:.0f}ms{error_suffix}"
        )
    else:
        self._logger.info(
            f"[VespaClient] Direct delete: {deleted}/{len(doc_ids)} docs in {elapsed_ms:.0f}ms"
        )

    return deleted

async def _delete_by_parent_ids_selection(
    self,
    parent_ids: List[str],
    collection_id: UUID,
) -> int:
    """Fallback path: visitor-based selection delete across every schema.

    Slower than the direct doc-ID path (bucket scan per schema) but does
    not depend on the YQL doc-ID query succeeding.
    """
    deleted_total = 0
    for schema_name in ALL_VESPA_SCHEMAS:
        # One clause per parent ID, OR'd together, then scoped to the
        # collection so only this collection's chunks are removed.
        clauses = [
            f"{schema_name}.airweave_system_metadata_original_entity_id=='{pid}'"
            for pid in parent_ids
        ]
        selection_expr = (
            "(" + " or ".join(clauses) + ") and "
            f"{schema_name}.airweave_system_metadata_collection_id=='{collection_id}'"
        )
        outcome = await self.delete_by_selection(schema_name, selection_expr)
        deleted_total += outcome.deleted_count
    return deleted_total

def _build_bulk_delete_url(self, schema: str, selection: str) -> str:
"""Build the URL for Vespa bulk delete operation."""
base_url = f"{settings.VESPA_URL}:{settings.VESPA_PORT}"
Expand Down
10 changes: 8 additions & 2 deletions backend/airweave/platform/destinations/vespa/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@
# Delete Settings
# =============================================================================

# Batch size for bulk_delete_by_parent_ids
DELETE_BATCH_SIZE = 50
# Max parent IDs per query batch in bulk_delete_by_parent_ids
DELETE_BATCH_SIZE = 200

# Max concurrent direct-delete HTTP requests (semaphore limit)
DELETE_CONCURRENCY = 20

# Max doc IDs to resolve per YQL query (Vespa query result limit)
DELETE_QUERY_HITS_LIMIT = 10000

# =============================================================================
# Vespa Schema Names
Expand Down
8 changes: 5 additions & 3 deletions backend/airweave/platform/destinations/vespa/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
Simple Pydantic models for type safety and clear interfaces between components.
"""

from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from pydantic import BaseModel, ConfigDict, Field

Expand Down Expand Up @@ -39,8 +39,10 @@ class DeleteResult(BaseModel):
model_config = ConfigDict(populate_by_name=True)

deleted_count: int = Field(default=0, description="Number of deleted documents")
schema_name: str = Field(
..., alias="schema", description="Schema the deletion was performed on"
schema_name: Optional[str] = Field(
default=None,
alias="schema",
description="Schema the deletion was performed on (None for cross-schema)",
)


Expand Down
Loading
Loading