From 078d6141e609af1b5d7f52e7a1ee7ade1efb2b6e Mon Sep 17 00:00:00 2001 From: Ashish-Abraham Date: Sat, 1 Nov 2025 19:07:15 +0530 Subject: [PATCH 1/3] feat(mcp): file locking mechanism on snapshot --- packages/mcp/package.json | 4 +- packages/mcp/src/handlers.ts | 26 ++++++++++++ packages/mcp/src/lock.ts | 77 ++++++++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 1 deletion(-) create mode 100644 packages/mcp/src/lock.ts diff --git a/packages/mcp/package.json b/packages/mcp/package.json index 40f65e94..e3fa00f8 100644 --- a/packages/mcp/package.json +++ b/packages/mcp/package.json @@ -20,10 +20,12 @@ "dependencies": { "@zilliz/claude-context-core": "workspace:*", "@modelcontextprotocol/sdk": "^1.12.1", - "zod": "^3.25.55" + "zod": "^3.25.55", + "proper-lockfile": "^4.1.2" }, "devDependencies": { "@types/node": "^20.0.0", + "@types/proper-lockfile": "^4.1.4", "tsx": "^4.19.4", "typescript": "^5.0.0" }, diff --git a/packages/mcp/src/handlers.ts b/packages/mcp/src/handlers.ts index 1530d0c3..e61ff777 100644 --- a/packages/mcp/src/handlers.ts +++ b/packages/mcp/src/handlers.ts @@ -1,6 +1,7 @@ import * as fs from "fs"; import * as path from "path"; import * as crypto from "crypto"; +import { isCurrentProcessLeader, acquireLock } from './lock'; import { Context, COLLECTION_LIMIT_MESSAGE } from "@zilliz/claude-context-core"; import { SnapshotManager } from "./snapshot.js"; import { ensureAbsolutePath, truncateContent, trackCodebasePath } from "./utils.js"; @@ -29,6 +30,11 @@ export class ToolHandlers { * - If local snapshot is missing directories (exist in cloud), ignore them */ private async syncIndexedCodebasesFromCloud(): Promise { + if (!isCurrentProcessLeader()) { + console.log('[SYNC-CLOUD] This process is a follower. Skipping cloud sync.'); + return; + } + try { console.log(`[SYNC-CLOUD] šŸ”„ Syncing indexed codebases from Zilliz Cloud...`); @@ -142,6 +148,16 @@ export class ToolHandlers { } public async handleIndexCodebase(args: any) { + if (!isCurrentProcessLeader()) { + return { + content: [{ + type: "text", + text: "Another process is already indexing. This process is a follower and cannot index." + }], + isError: true + }; + } + const { path: codebasePath, force, splitter, customExtensions, ignorePatterns } = args; const forceReindex = force || false; const splitterType = splitter || 'ast'; // Default to AST @@ -569,6 +585,16 @@ export class ToolHandlers { } public async handleClearIndex(args: any) { + if (!isCurrentProcessLeader()) { + return { + content: [{ + type: "text", + text: "Another process is already indexing. This process is a follower and cannot index." + }], + isError: true + }; + } + const { path: codebasePath } = args; if (this.snapshotManager.getIndexedCodebases().length === 0 && this.snapshotManager.getIndexingCodebases().length === 0) { diff --git a/packages/mcp/src/lock.ts b/packages/mcp/src/lock.ts new file mode 100644 index 00000000..bcbc6a92 --- /dev/null +++ b/packages/mcp/src/lock.ts @@ -0,0 +1,77 @@ + +import os from 'os'; +import path from 'path'; +import fs from 'fs'; +import lockfile from 'proper-lockfile'; + +const CONTEXT_DIR = path.join(os.homedir(), '.context'); +const LOCK_FILE = path.join(CONTEXT_DIR, 'leader.lock'); + +// Ensure the .context directory exists +if (!fs.existsSync(CONTEXT_DIR)) { + fs.mkdirSync(CONTEXT_DIR, { recursive: true }); +} + +let isLeader = false; +let lockInterval: NodeJS.Timeout | undefined; + +export async function acquireLock(): Promise { + if (isLeader) { + return true; + } + try { + // Using flock is generally more reliable as the lock is released by the OS if the process dies. + // proper-lockfile will use flock on systems that support it (Linux, BSD, etc.) + // and fall back to other mechanisms on systems that don't (like Windows). + await lockfile.lock(LOCK_FILE, { retries: 0, realpath: false }); + isLeader = true; + console.log('Acquired leader lock. This process is now the leader.'); + if (lockInterval) { + clearInterval(lockInterval); + lockInterval = undefined; + } + return true; + } catch (error) { + console.log('Could not acquire leader lock, running as follower.'); + isLeader = false; + if (!lockInterval) { + lockInterval = setInterval(acquireLock, 5000); // Check every 5 seconds + } + return false; + } +} + +export async function releaseLock(): Promise { + if (isLeader) { + try { + await lockfile.unlock(LOCK_FILE, { realpath: false }); + isLeader = false; + console.log('Released leader lock.'); + } catch (error) { + console.error('Error releasing leader lock:', error); + } + } +} + +export function isCurrentProcessLeader(): boolean { + return isLeader; +} + +export function getLockFilePath(): string { + return LOCK_FILE; +} + +// Graceful shutdown +process.on('exit', async () => { + await releaseLock(); +}); + +process.on('SIGINT', async () => { + await releaseLock(); + process.exit(); +}); + +process.on('SIGTERM', async () => { + await releaseLock(); + process.exit(); +}); From c9612d8c7a0eb3d49c91e4eeaa3385c6ba2ea5bb Mon Sep 17 00:00:00 2001 From: Ashish-Abraham Date: Tue, 20 Jan 2026 22:22:31 +0530 Subject: [PATCH 2/3] added acquire lock correstions --- packages/mcp/src/handlers.ts | 204 ++++++------- packages/mcp/src/index.ts | 21 +- packages/mcp/src/lock.ts | 266 +++++++++++----- packages/mcp/src/snapshot.ts | 574 ++++++++++------------------------- packages/mcp/src/sync.ts | 7 + 5 files changed, 465 insertions(+), 607 deletions(-) diff --git a/packages/mcp/src/handlers.ts b/packages/mcp/src/handlers.ts index e61ff777..cb3a3f6a 100644 --- a/packages/mcp/src/handlers.ts +++ b/packages/mcp/src/handlers.ts @@ -1,10 +1,12 @@ import * as fs from "fs"; import * as path from "path"; import * as crypto from "crypto"; -import { isCurrentProcessLeader, acquireLock } from './lock'; import { Context, COLLECTION_LIMIT_MESSAGE } from "@zilliz/claude-context-core"; import { SnapshotManager } from "./snapshot.js"; import { ensureAbsolutePath, truncateContent, trackCodebasePath } from "./utils.js"; +import { isCurrentProcessLeader } from "./lock.js"; + + export class ToolHandlers { private context: Context; @@ -18,7 +20,6 @@ export class ToolHandlers { this.currentWorkspace = process.cwd(); console.log(`[WORKSPACE] Current workspace: ${this.currentWorkspace}`); } - /** * Sync indexed codebases from Zilliz Cloud collections * This method fetches all collections from the vector database, @@ -30,8 +31,9 @@ export class ToolHandlers { * - If local snapshot is missing directories (exist in cloud), ignore them */ private async syncIndexedCodebasesFromCloud(): Promise { + // Leader check - followers should not sync from cloud (write operation) if (!isCurrentProcessLeader()) { - console.log('[SYNC-CLOUD] This process is a follower. Skipping cloud sync.'); + console.log('[SYNC-CLOUD] ā„¹ļø Follower mode: Skipping cloud sync (read-only)'); return; } @@ -56,7 +58,7 @@ export class ToolHandlers { this.snapshotManager.removeIndexedCodebase(codebasePath); console.log(`[SYNC-CLOUD] āž– Removed local codebase: ${codebasePath}`); } - this.snapshotManager.saveCodebaseSnapshot(); + await this.snapshotManager.saveCodebaseSnapshot(); console.log(`[SYNC-CLOUD] šŸ’¾ Updated snapshot to match empty cloud state`); } return; @@ -134,7 +136,7 @@ export class ToolHandlers { console.log(`[SYNC-CLOUD] ā„¹ļø Skipping addition of cloud codebases not present locally (per sync policy)`); if (hasChanges) { - this.snapshotManager.saveCodebaseSnapshot(); + await this.snapshotManager.saveCodebaseSnapshot(); console.log(`[SYNC-CLOUD] šŸ’¾ Updated snapshot to match cloud state`); } else { console.log(`[SYNC-CLOUD] āœ… Local snapshot already matches cloud state`); @@ -148,11 +150,12 @@ export class ToolHandlers { } public async handleIndexCodebase(args: any) { + // Leader check if (!isCurrentProcessLeader()) { return { content: [{ type: "text", - text: "Another process is already indexing. This process is a follower and cannot index." + text: "Error: This MCP instance is a follower and cannot perform indexing operations. Only the leader instance can modify the index." }], isError: true }; @@ -164,7 +167,11 @@ export class ToolHandlers { const customFileExtensions = customExtensions || []; const customIgnorePatterns = ignorePatterns || []; + try { + + await this.snapshotManager.refreshFromFile(); + // Sync indexed codebases from cloud first await this.syncIndexedCodebasesFromCloud(); @@ -246,9 +253,21 @@ export class ToolHandlers { // CRITICAL: Pre-index collection creation validation try { console.log(`[INDEX-VALIDATION] šŸ” Validating collection creation capability`); - const canCreateCollection = await this.context.getVectorDatabase().checkCollectionLimit(); + //dummy collection name + const collectionName = `dummy_collection_${Date.now()}`; + await this.context.getVectorDatabase().createCollection(collectionName, 128); + if (await this.context.getVectorDatabase().hasCollection(collectionName)) { + console.log(`[INDEX-VALIDATION] ā„¹ļø Dummy collection created successfully`); + await this.context.getVectorDatabase().dropCollection(collectionName); + } else { + console.log(`[INDEX-VALIDATION] āŒ Dummy collection creation failed`); + } + console.log(`[INDEX-VALIDATION] āœ… Collection creation validation completed`); + } catch (validationError: any) { + const errorMessage = typeof validationError === 'string' ? validationError : + (validationError instanceof Error ? validationError.message : String(validationError)); - if (!canCreateCollection) { + if (errorMessage === COLLECTION_LIMIT_MESSAGE || errorMessage.includes(COLLECTION_LIMIT_MESSAGE)) { console.error(`[INDEX-VALIDATION] āŒ Collection limit validation failed: ${absolutePath}`); // CRITICAL: Immediately return the COLLECTION_LIMIT_MESSAGE to MCP client @@ -259,19 +278,17 @@ export class ToolHandlers { }], isError: true }; + } else { + // Handle other collection creation errors + console.error(`[INDEX-VALIDATION] āŒ Collection creation validation failed:`, validationError); + return { + content: [{ + type: "text", + text: `Error validating collection creation: ${validationError.message || validationError}` + }], + isError: true + }; } - - console.log(`[INDEX-VALIDATION] āœ… Collection creation validation completed`); - } catch (validationError: any) { - // Handle other collection creation errors - console.error(`[INDEX-VALIDATION] āŒ Collection creation validation failed:`, validationError); - return { - content: [{ - type: "text", - text: `Error validating collection creation: ${validationError.message || validationError}` - }], - isError: true - }; } // Add custom extensions if provided @@ -286,22 +303,17 @@ export class ToolHandlers { this.context.addCustomIgnorePatterns(customIgnorePatterns); } - // Check current status and log if retrying after failure - const currentStatus = this.snapshotManager.getCodebaseStatus(absolutePath); - if (currentStatus === 'indexfailed') { - const failedInfo = this.snapshotManager.getCodebaseInfo(absolutePath) as any; - console.log(`[BACKGROUND-INDEX] Retrying indexing for previously failed codebase. Previous error: ${failedInfo?.errorMessage || 'Unknown error'}`); - } - - // Set to indexing status and save snapshot immediately - this.snapshotManager.setCodebaseIndexing(absolutePath, 0); - this.snapshotManager.saveCodebaseSnapshot(); + // Add to indexing list and save snapshot immediately + this.snapshotManager.addIndexingCodebase(absolutePath); + await this.snapshotManager.saveCodebaseSnapshot(); // Track the codebase path for syncing trackCodebasePath(absolutePath); // Start background indexing - now safe to proceed - this.startBackgroundIndexing(absolutePath, forceReindex, splitterType); + await this.startBackgroundIndexing(absolutePath, forceReindex, splitterType); + + const stats = this.indexingStats; const pathInfo = codebasePath !== absolutePath ? `\nNote: Input path '${codebasePath}' was resolved to absolute path '${absolutePath}'` @@ -315,10 +327,17 @@ export class ToolHandlers { ? `\nUsing ${customIgnorePatterns.length} custom ignore patterns: ${customIgnorePatterns.join(', ')}` : ''; + let message = `Completed indexing for codebase '${absolutePath}' using ${splitterType.toUpperCase()} splitter.${pathInfo}${extensionInfo}${ignoreInfo}`; + + if (stats) { + message += `\n\nIndexed ${stats.indexedFiles} files, ${stats.totalChunks} chunks.`; + console.log(stats) + } + return { content: [{ type: "text", - text: `Started background indexing for codebase '${absolutePath}' using ${splitterType.toUpperCase()} splitter.${pathInfo}${extensionInfo}${ignoreInfo}\n\nIndexing is running in the background. You can search the codebase while indexing is in progress, but results may be incomplete until indexing completes.` + text: message }] }; @@ -381,28 +400,29 @@ export class ToolHandlers { // Start indexing with the appropriate context and progress tracking console.log(`[BACKGROUND-INDEX] šŸš€ Beginning codebase indexing process...`); - const stats = await contextForThisTask.indexCodebase(absolutePath, (progress) => { - // Update progress in snapshot manager using new method - this.snapshotManager.setCodebaseIndexing(absolutePath, progress.percentage); + const stats = await contextForThisTask.indexCodebase(absolutePath, async (progress) => { + // Update progress in snapshot manager + this.snapshotManager.updateIndexingProgress(absolutePath, progress.percentage); // Save snapshot periodically (every 2 seconds to avoid too frequent saves) const currentTime = Date.now(); - if (currentTime - lastSaveTime >= 2000) { // 2 seconds = 2000ms - this.snapshotManager.saveCodebaseSnapshot(); + if (currentTime - lastSaveTime >= 5000) { // 2 seconds = 2000ms + await this.snapshotManager.saveCodebaseSnapshot(); lastSaveTime = currentTime; console.log(`[BACKGROUND-INDEX] šŸ’¾ Saved progress snapshot at ${progress.percentage.toFixed(1)}%`); } console.log(`[BACKGROUND-INDEX] Progress: ${progress.phase} - ${progress.percentage}% (${progress.current}/${progress.total})`); }); + await this.snapshotManager.saveCodebaseSnapshot(); console.log(`[BACKGROUND-INDEX] āœ… Indexing completed successfully! Files: ${stats.indexedFiles}, Chunks: ${stats.totalChunks}`); - // Set codebase to indexed status with complete statistics - this.snapshotManager.setCodebaseIndexed(absolutePath, stats); + // Move from indexing to indexed list + this.snapshotManager.moveFromIndexingToIndexed(absolutePath); this.indexingStats = { indexedFiles: stats.indexedFiles, totalChunks: stats.totalChunks }; // Save snapshot after updating codebase lists - this.snapshotManager.saveCodebaseSnapshot(); + await this.snapshotManager.saveCodebaseSnapshot(); let message = `Background indexing completed for '${absolutePath}' using ${splitterType.toUpperCase()} splitter.\nIndexed ${stats.indexedFiles} files, ${stats.totalChunks} chunks.`; if (stats.status === 'limit_reached') { @@ -413,17 +433,12 @@ export class ToolHandlers { } catch (error: any) { console.error(`[BACKGROUND-INDEX] Error during indexing for ${absolutePath}:`, error); - - // Get the last attempted progress - const lastProgress = this.snapshotManager.getIndexingProgress(absolutePath); - - // Set codebase to failed status with error information - const errorMessage = error.message || String(error); - this.snapshotManager.setCodebaseIndexFailed(absolutePath, errorMessage, lastProgress); - this.snapshotManager.saveCodebaseSnapshot(); + // Remove from indexing list on error + this.snapshotManager.removeIndexingCodebase(absolutePath); + await this.snapshotManager.saveCodebaseSnapshot(); // Log error but don't crash MCP service - indexing errors are handled gracefully - console.error(`[BACKGROUND-INDEX] Indexing failed for ${absolutePath}: ${errorMessage}`); + console.error(`[BACKGROUND-INDEX] Indexing failed for ${absolutePath}: ${error.message || error}`); } } @@ -432,6 +447,7 @@ export class ToolHandlers { const resultLimit = limit || 10; try { + await this.snapshotManager.refreshFromFile(); // Sync indexed codebases from cloud first await this.syncIndexedCodebasesFromCloud(); @@ -585,11 +601,12 @@ export class ToolHandlers { } public async handleClearIndex(args: any) { + // Leader check if (!isCurrentProcessLeader()) { return { content: [{ type: "text", - text: "Another process is already indexing. This process is a follower and cannot index." + text: "Error: This MCP instance is a follower and cannot perform clear operations. Only the leader instance can modify the index." }], isError: true }; @@ -664,14 +681,15 @@ export class ToolHandlers { }; } - // Completely remove the cleared codebase from snapshot - this.snapshotManager.removeCodebaseCompletely(absolutePath); + // Remove the cleared codebase from both lists + this.snapshotManager.removeIndexedCodebase(absolutePath); + this.snapshotManager.removeIndexingCodebase(absolutePath); // Reset indexing stats if this was the active codebase this.indexingStats = null; // Save snapshot after clearing index - this.snapshotManager.saveCodebaseSnapshot(); + await this.snapshotManager.saveCodebaseSnapshot(); let resultText = `Successfully cleared codebase '${absolutePath}'`; @@ -718,6 +736,9 @@ export class ToolHandlers { const { path: codebasePath } = args; try { + + await this.snapshotManager.refreshFromFile(); + // Force absolute path resolution const absolutePath = ensureAbsolutePath(codebasePath); @@ -744,62 +765,27 @@ export class ToolHandlers { }; } - // Check indexing status using new status system - const status = this.snapshotManager.getCodebaseStatus(absolutePath); - const info = this.snapshotManager.getCodebaseInfo(absolutePath); + // Check indexing status + const isIndexed = this.snapshotManager.getIndexedCodebases().includes(absolutePath); + const isIndexing = this.snapshotManager.getIndexingCodebases().includes(absolutePath); + const indexingProgress = this.snapshotManager.getIndexingProgress(absolutePath); let statusMessage = ''; - switch (status) { - case 'indexed': - if (info && 'indexedFiles' in info) { - const indexedInfo = info as any; - statusMessage = `āœ… Codebase '${absolutePath}' is fully indexed and ready for search.`; - statusMessage += `\nšŸ“Š Statistics: ${indexedInfo.indexedFiles} files, ${indexedInfo.totalChunks} chunks`; - statusMessage += `\nšŸ“… Status: ${indexedInfo.indexStatus}`; - statusMessage += `\nšŸ• Last updated: ${new Date(indexedInfo.lastUpdated).toLocaleString()}`; - } else { - statusMessage = `āœ… Codebase '${absolutePath}' is fully indexed and ready for search.`; - } - break; - - case 'indexing': - if (info && 'indexingPercentage' in info) { - const indexingInfo = info as any; - const progressPercentage = indexingInfo.indexingPercentage || 0; - statusMessage = `šŸ”„ Codebase '${absolutePath}' is currently being indexed. Progress: ${progressPercentage.toFixed(1)}%`; - - // Add more detailed status based on progress - if (progressPercentage < 10) { - statusMessage += ' (Preparing and scanning files...)'; - } else if (progressPercentage < 100) { - statusMessage += ' (Processing files and generating embeddings...)'; - } - statusMessage += `\nšŸ• Last updated: ${new Date(indexingInfo.lastUpdated).toLocaleString()}`; - } else { - statusMessage = `šŸ”„ Codebase '${absolutePath}' is currently being indexed.`; - } - break; - - case 'indexfailed': - if (info && 'errorMessage' in info) { - const failedInfo = info as any; - statusMessage = `āŒ Codebase '${absolutePath}' indexing failed.`; - statusMessage += `\n🚨 Error: ${failedInfo.errorMessage}`; - if (failedInfo.lastAttemptedPercentage !== undefined) { - statusMessage += `\nšŸ“Š Failed at: ${failedInfo.lastAttemptedPercentage.toFixed(1)}% progress`; - } - statusMessage += `\nšŸ• Failed at: ${new Date(failedInfo.lastUpdated).toLocaleString()}`; - statusMessage += `\nšŸ’” You can retry indexing by running the index_codebase command again.`; - } else { - statusMessage = `āŒ Codebase '${absolutePath}' indexing failed. You can retry indexing.`; - } - break; - - case 'not_found': - default: - statusMessage = `āŒ Codebase '${absolutePath}' is not indexed. Please use the index_codebase tool to index it first.`; - break; + if (isIndexed) { + statusMessage = `āœ… Codebase '${absolutePath}' is fully indexed and ready for search.`; + } else if (isIndexing) { + const progressPercentage = indexingProgress !== undefined ? indexingProgress : 0; + statusMessage = `šŸ”„ Codebase '${absolutePath}' is currently being indexed. Progress: ${progressPercentage.toFixed(1)}%`; + + // Add more detailed status based on progress + if (progressPercentage < 10) { + statusMessage += ' (Preparing and scanning files...)'; + } else if (progressPercentage < 100) { + statusMessage += ' (Processing files and generating embeddings...)'; + } + } else { + statusMessage = `āŒ Codebase '${absolutePath}' is not indexed. Please use the index_codebase tool to index it first.`; } const pathInfo = codebasePath !== absolutePath @@ -823,4 +809,8 @@ export class ToolHandlers { }; } } -} \ No newline at end of file +} + + + + diff --git a/packages/mcp/src/index.ts b/packages/mcp/src/index.ts index 8c4c3b28..ed49398e 100644 --- a/packages/mcp/src/index.ts +++ b/packages/mcp/src/index.ts @@ -23,6 +23,7 @@ import { } from "@modelcontextprotocol/sdk/types.js"; import { Context } from "@zilliz/claude-context-core"; import { MilvusVectorDatabase } from "@zilliz/claude-context-core"; +import { MilvusRestfulVectorDatabase } from "@zilliz/claude-context-core"; // Import our modular components import { createMcpConfig, logConfigurationSummary, showHelpMessage, ContextMcpConfig } from "./config.js"; @@ -30,6 +31,7 @@ import { createEmbeddingInstance, logEmbeddingProviderInfo } from "./embedding.j import { SnapshotManager } from "./snapshot.js"; import { SyncManager } from "./sync.js"; import { ToolHandlers } from "./handlers.js"; +import { acquireLeaderLock } from "./lock.js"; class ContextMcpServer { private server: Server; @@ -60,7 +62,11 @@ class ContextMcpServer { logEmbeddingProviderInfo(config, embedding); // Initialize vector database - const vectorDatabase = new MilvusVectorDatabase({ + // const vectorDatabase = new MilvusVectorDatabase({ + // address: config.milvusAddress, + // ...(config.milvusToken && { token: config.milvusToken }) + // }); + const vectorDatabase = new MilvusRestfulVectorDatabase({ address: config.milvusAddress, ...(config.milvusToken && { token: config.milvusToken }) }); @@ -77,11 +83,15 @@ class ContextMcpServer { this.toolHandlers = new ToolHandlers(this.context, this.snapshotManager); // Load existing codebase snapshot on startup - this.snapshotManager.loadCodebaseSnapshot(); + // this.snapshotManager.loadCodebaseSnapshot(); this.setupTools(); } + async init() { + await this.snapshotManager.loadCodebaseSnapshot(); + } + private setupTools() { const index_description = ` Index a codebase directory to enable semantic search using a configurable code splitter. @@ -246,6 +256,8 @@ This tool is versatile and can be used before completing various tasks to retrie } async start() { + + await this.init(); console.log('[SYNC-DEBUG] MCP server start() method called'); console.log('Starting Context MCP server...'); @@ -259,6 +271,11 @@ This tool is versatile and can be used before completing various tasks to retrie // Start background sync after server is connected console.log('[SYNC-DEBUG] Initializing background sync...'); this.syncManager.startBackgroundSync(); + + // Try to acquire leader lock (non-blocking, will be leader or follower) + console.log('[LEADER] Attempting to acquire leader lock...'); + await acquireLeaderLock(); + console.log('[SYNC-DEBUG] MCP server initialization complete'); } } diff --git a/packages/mcp/src/lock.ts b/packages/mcp/src/lock.ts index bcbc6a92..861f6fde 100644 --- a/packages/mcp/src/lock.ts +++ b/packages/mcp/src/lock.ts @@ -1,77 +1,189 @@ - -import os from 'os'; -import path from 'path'; -import fs from 'fs'; -import lockfile from 'proper-lockfile'; - -const CONTEXT_DIR = path.join(os.homedir(), '.context'); -const LOCK_FILE = path.join(CONTEXT_DIR, 'leader.lock'); - -// Ensure the .context directory exists -if (!fs.existsSync(CONTEXT_DIR)) { - fs.mkdirSync(CONTEXT_DIR, { recursive: true }); -} - -let isLeader = false; -let lockInterval: NodeJS.Timeout | undefined; - -export async function acquireLock(): Promise { - if (isLeader) { - return true; - } - try { - // Using flock is generally more reliable as the lock is released by the OS if the process dies. - // proper-lockfile will use flock on systems that support it (Linux, BSD, etc.) - // and fall back to other mechanisms on systems that don't (like Windows). - await lockfile.lock(LOCK_FILE, { retries: 0, realpath: false }); - isLeader = true; - console.log('Acquired leader lock. This process is now the leader.'); - if (lockInterval) { - clearInterval(lockInterval); - lockInterval = undefined; - } - return true; - } catch (error) { - console.log('Could not acquire leader lock, running as follower.'); - isLeader = false; - if (!lockInterval) { - lockInterval = setInterval(acquireLock, 5000); // Check every 5 seconds - } - return false; - } -} - -export async function releaseLock(): Promise { - if (isLeader) { - try { - await lockfile.unlock(LOCK_FILE, { realpath: false }); - isLeader = false; - console.log('Released leader lock.'); - } catch (error) { - console.error('Error releasing leader lock:', error); - } - } -} - -export function isCurrentProcessLeader(): boolean { - return isLeader; -} - -export function getLockFilePath(): string { - return LOCK_FILE; -} - -// Graceful shutdown -process.on('exit', async () => { - await releaseLock(); -}); - -process.on('SIGINT', async () => { - await releaseLock(); - process.exit(); -}); - -process.on('SIGTERM', async () => { - await releaseLock(); - process.exit(); -}); +import os from 'os'; +import path from 'path'; +import fs from 'fs'; +import lockfile from 'proper-lockfile'; + +const CONTEXT_DIR = path.join(os.homedir(), '.context'); +const SNAPSHOT_LOCK_FILE = path.join(CONTEXT_DIR, 'snapshot.lock'); +const LEADER_LOCK_FILE = path.join(CONTEXT_DIR, 'leader.lock'); + +// Ensure the .context directory exists +if (!fs.existsSync(CONTEXT_DIR)) { + fs.mkdirSync(CONTEXT_DIR, { recursive: true }); +} + +// Create snapshot lock file if it doesn't exist +if (!fs.existsSync(SNAPSHOT_LOCK_FILE)) { + fs.writeFileSync(SNAPSHOT_LOCK_FILE, ''); +} + +// Create leader lock file if it doesn't exist +if (!fs.existsSync(LEADER_LOCK_FILE)) { + fs.writeFileSync(LEADER_LOCK_FILE, ''); +} + +// Leader election state +let isLeader = false; +let lockInterval: any | undefined; + +/** + * Attempt to acquire the leader lock. + * If successful, this instance becomes the leader. + * If unsuccessful, it becomes a follower and retries periodically. + */ +export async function acquireLeaderLock(): Promise { + if (isLeader) { + return true; + } + + try { + // Try to acquire the lock immediately without retries + // proper-lockfile will use flock where available + await lockfile.lock(LEADER_LOCK_FILE, { + retries: 0, + realpath: false + }); + + isLeader = true; + console.log('[LEADER] Acquired leader lock. This process is now the LEADER.'); + + if (lockInterval) { + clearInterval(lockInterval); + lockInterval = undefined; + } + return true; + } catch (error: any) { + // Lock acquisition failed - someone else is leader + if (isLeader) { + console.log('[LEADER] Lost leader lock.'); + isLeader = false; + } + + if (!lockInterval) { + console.log('[LEADER] Could not acquire leader lock, running as FOLLOWER.'); + // Retry every 5 seconds + lockInterval = setInterval(async () => { + await acquireLeaderLock(); + }, 5000); + } + return false; + } +} + +/** + * Check if the current process is the leader + */ +export function isCurrentProcessLeader(): boolean { + return isLeader; +} + +/** + * Release the leader lock (e.g. on shutdown) + */ +export async function releaseLeaderLock(): Promise { + if (isLeader) { + try { + await lockfile.unlock(LEADER_LOCK_FILE, { realpath: false }); + isLeader = false; + console.log('[LEADER] Released leader lock.'); + } catch (error) { + console.error('[LEADER] Error releasing leader lock:', error); + } + } +} + +/** + * Execute a function with file lock protection (for snapshot file) + * Lock is acquired only for the duration of the function execution + */ +export async function withFileLock( + fn: () => Promise, + options: { maxRetries?: number; retryDelay?: number } = {} +): Promise { + const maxRetries = options.maxRetries ?? 10; + const retryDelay = options.retryDelay ?? 100; + + let lastError: Error | null = null; + + for (let attempt = 0; attempt <= maxRetries; attempt++) { + try { + // Acquire lock with short retry interval + const release = await lockfile.lock(SNAPSHOT_LOCK_FILE, { + retries: { + retries: 5, + minTimeout: 50, + maxTimeout: 200 + }, + realpath: false, + stale: 10000 // Consider lock stale after 10 seconds + }); + + try { + // Execute the protected function + const result = await fn(); + return result; + } finally { + // Always release the lock + try { + await release(); + } catch (releaseError) { + console.warn('[LOCK] Warning: Error releasing lock:', releaseError); + } + } + } catch (error: any) { + lastError = error; + + if (attempt < maxRetries) { + await new Promise(resolve => setTimeout(resolve, retryDelay)); + } + } + } + + throw new Error(`Failed to acquire lock after ${maxRetries + 1} attempts: ${lastError?.message}`); +} + +/** + * Check if snapshot lock file exists and is locked + */ +export function isLocked(): boolean { + try { + return lockfile.checkSync(SNAPSHOT_LOCK_FILE, { realpath: false }); + } catch { + return false; + } +} + +/** + * Force unlock in case of stale lock (use with caution) + */ +export async function forceUnlock(): Promise { + try { + if (fs.existsSync(SNAPSHOT_LOCK_FILE)) { + await lockfile.unlock(SNAPSHOT_LOCK_FILE, { realpath: false }); + console.log('[LOCK] Force unlocked stale snapshot lock'); + } + } catch (error) { + console.warn('[LOCK] Warning: Could not force unlock:', error); + } +} + +// Cleanup on process exit +process.on('exit', () => { + try { + if (isLocked()) { + lockfile.unlockSync(SNAPSHOT_LOCK_FILE, { realpath: false }); + } + } catch { } +}); + +process.on('SIGINT', async () => { + await releaseLeaderLock(); + await forceUnlock(); + process.exit(130); +}); + +process.on('SIGTERM', async () => { + await releaseLeaderLock(); + await forceUnlock(); + process.exit(143); +}); \ No newline at end of file diff --git a/packages/mcp/src/snapshot.ts b/packages/mcp/src/snapshot.ts index 81982b7b..5a5b934b 100644 --- a/packages/mcp/src/snapshot.ts +++ b/packages/mcp/src/snapshot.ts @@ -1,506 +1,238 @@ import * as fs from "fs"; import * as path from "path"; import * as os from "os"; -import { - CodebaseSnapshot, - CodebaseSnapshotV1, - CodebaseSnapshotV2, - CodebaseInfo, - CodebaseInfoIndexing, - CodebaseInfoIndexed, - CodebaseInfoIndexFailed -} from "./config.js"; +import { CodebaseSnapshot } from "./config.js"; +import { withFileLock } from './lock.js'; export class SnapshotManager { private snapshotFilePath: string; private indexedCodebases: string[] = []; - private indexingCodebases: Map = new Map(); // Map of codebase path to progress percentage - private codebaseFileCount: Map = new Map(); // Map of codebase path to indexed file count - private codebaseInfoMap: Map = new Map(); // Map of codebase path to complete info + private indexingCodebases: Map = new Map(); constructor() { - // Initialize snapshot file path this.snapshotFilePath = path.join(os.homedir(), '.context', 'mcp-codebase-snapshot.json'); } - /** - * Check if snapshot is v2 format - */ - private isV2Format(snapshot: any): snapshot is CodebaseSnapshotV2 { - return snapshot && snapshot.formatVersion === 'v2'; - } - - /** - * Convert v1 format to internal state - */ - private loadV1Format(snapshot: CodebaseSnapshotV1): void { - console.log('[SNAPSHOT-DEBUG] Loading v1 format snapshot'); - - // Validate that the codebases still exist - const validCodebases: string[] = []; - for (const codebasePath of snapshot.indexedCodebases) { - if (fs.existsSync(codebasePath)) { - validCodebases.push(codebasePath); - console.log(`[SNAPSHOT-DEBUG] Validated codebase: ${codebasePath}`); - } else { - console.warn(`[SNAPSHOT-DEBUG] Codebase no longer exists, removing: ${codebasePath}`); - } - } - - // Handle indexing codebases - treat them as not indexed since they were interrupted - let indexingCodebasesList: string[] = []; - if (Array.isArray(snapshot.indexingCodebases)) { - // Legacy format: string[] - indexingCodebasesList = snapshot.indexingCodebases; - console.log(`[SNAPSHOT-DEBUG] Found legacy indexingCodebases array format with ${indexingCodebasesList.length} entries`); - } else if (snapshot.indexingCodebases && typeof snapshot.indexingCodebases === 'object') { - // New format: Record - indexingCodebasesList = Object.keys(snapshot.indexingCodebases); - console.log(`[SNAPSHOT-DEBUG] Found new indexingCodebases object format with ${indexingCodebasesList.length} entries`); - } - - for (const codebasePath of indexingCodebasesList) { - if (fs.existsSync(codebasePath)) { - console.warn(`[SNAPSHOT-DEBUG] Found interrupted indexing codebase: ${codebasePath}. Treating as not indexed.`); - // Don't add to validIndexingCodebases - treat as not indexed - } else { - console.warn(`[SNAPSHOT-DEBUG] Interrupted indexing codebase no longer exists: ${codebasePath}`); - } - } - - // Restore state - only fully indexed codebases - this.indexedCodebases = validCodebases; - this.indexingCodebases = new Map(); // Reset indexing codebases since they were interrupted - this.codebaseFileCount = new Map(); // No file count info in v1 format - - // Populate codebaseInfoMap for v1 indexed codebases (with minimal info) - this.codebaseInfoMap = new Map(); - const now = new Date().toISOString(); - for (const codebasePath of validCodebases) { - const info: CodebaseInfoIndexed = { - status: 'indexed', - indexedFiles: 0, // Unknown in v1 format - totalChunks: 0, // Unknown in v1 format - indexStatus: 'completed', // Assume completed for v1 format - lastUpdated: now - }; - this.codebaseInfoMap.set(codebasePath, info); - } - } - - /** - * Convert v2 format to internal state - */ - private loadV2Format(snapshot: CodebaseSnapshotV2): void { - console.log('[SNAPSHOT-DEBUG] Loading v2 format snapshot'); - - const validIndexedCodebases: string[] = []; - const validIndexingCodebases = new Map(); - const validFileCount = new Map(); - const validCodebaseInfoMap = new Map(); - - for (const [codebasePath, info] of Object.entries(snapshot.codebases)) { - if (!fs.existsSync(codebasePath)) { - console.warn(`[SNAPSHOT-DEBUG] Codebase no longer exists, removing: ${codebasePath}`); - continue; - } - - // Store the complete info for this codebase - validCodebaseInfoMap.set(codebasePath, info); - - if (info.status === 'indexed') { - validIndexedCodebases.push(codebasePath); - if ('indexedFiles' in info) { - validFileCount.set(codebasePath, info.indexedFiles); - } - console.log(`[SNAPSHOT-DEBUG] Validated indexed codebase: ${codebasePath} (${info.indexedFiles || 'unknown'} files, ${info.totalChunks || 'unknown'} chunks)`); - } else if (info.status === 'indexing') { - if ('indexingPercentage' in info) { - validIndexingCodebases.set(codebasePath, info.indexingPercentage); - } - console.warn(`[SNAPSHOT-DEBUG] Found interrupted indexing codebase: ${codebasePath} (${info.indexingPercentage || 0}%). Treating as not indexed.`); - // Don't add to indexed - treat interrupted indexing as not indexed - } else if (info.status === 'indexfailed') { - console.warn(`[SNAPSHOT-DEBUG] Found failed indexing codebase: ${codebasePath}. Error: ${info.errorMessage}`); - // Failed indexing codebases are not added to indexed or indexing lists - // But we keep the info for potential retry - } - } - - // Restore state - this.indexedCodebases = validIndexedCodebases; - this.indexingCodebases = new Map(); // Reset indexing codebases since they were interrupted - this.codebaseFileCount = validFileCount; - this.codebaseInfoMap = validCodebaseInfoMap; - } - public getIndexedCodebases(): string[] { - // Read from JSON file to ensure consistency and persistence - try { - if (!fs.existsSync(this.snapshotFilePath)) { - return []; - } - - const snapshotData = fs.readFileSync(this.snapshotFilePath, 'utf8'); - const snapshot: CodebaseSnapshot = JSON.parse(snapshotData); - - if (this.isV2Format(snapshot)) { - return Object.entries(snapshot.codebases) - .filter(([_, info]) => info.status === 'indexed') - .map(([path, _]) => path); - } else { - // V1 format - return snapshot.indexedCodebases || []; - } - } catch (error) { - console.warn(`[SNAPSHOT-DEBUG] Error reading indexed codebases from file:`, error); - // Fallback to memory if file reading fails - return [...this.indexedCodebases]; - } + return [...this.indexedCodebases]; } public getIndexingCodebases(): string[] { - // Read from JSON file to ensure consistency and persistence - try { - if (!fs.existsSync(this.snapshotFilePath)) { - return []; - } - - const snapshotData = fs.readFileSync(this.snapshotFilePath, 'utf8'); - const snapshot: CodebaseSnapshot = JSON.parse(snapshotData); - - if (this.isV2Format(snapshot)) { - return Object.entries(snapshot.codebases) - .filter(([_, info]) => info.status === 'indexing') - .map(([path, _]) => path); - } else { - // V1 format - Handle both legacy array format and new object format - if (Array.isArray(snapshot.indexingCodebases)) { - // Legacy format: return the array directly - return snapshot.indexingCodebases; - } else if (snapshot.indexingCodebases && typeof snapshot.indexingCodebases === 'object') { - // New format: return the keys of the object - return Object.keys(snapshot.indexingCodebases); - } - } - - return []; - } catch (error) { - console.warn(`[SNAPSHOT-DEBUG] Error reading indexing codebases from file:`, error); - // Fallback to memory if file reading fails - return Array.from(this.indexingCodebases.keys()); - } + return Array.from(this.indexingCodebases.keys()); } - /** - * @deprecated Use getCodebaseInfo() for individual codebases or iterate through codebases for v2 format support - */ public getIndexingCodebasesWithProgress(): Map { return new Map(this.indexingCodebases); } public getIndexingProgress(codebasePath: string): number | undefined { - // Read from JSON file to ensure consistency and persistence - try { - if (!fs.existsSync(this.snapshotFilePath)) { - return undefined; - } - - const snapshotData = fs.readFileSync(this.snapshotFilePath, 'utf8'); - const snapshot: CodebaseSnapshot = JSON.parse(snapshotData); - - if (this.isV2Format(snapshot)) { - const info = snapshot.codebases[codebasePath]; - if (info && info.status === 'indexing') { - return info.indexingPercentage || 0; - } - return undefined; - } else { - // V1 format - Handle both legacy array format and new object format - if (Array.isArray(snapshot.indexingCodebases)) { - // Legacy format: if path exists in array, assume 0% progress - return snapshot.indexingCodebases.includes(codebasePath) ? 0 : undefined; - } else if (snapshot.indexingCodebases && typeof snapshot.indexingCodebases === 'object') { - // New format: return the actual progress percentage - return snapshot.indexingCodebases[codebasePath]; - } - } - - return undefined; - } catch (error) { - console.warn(`[SNAPSHOT-DEBUG] Error reading progress from file for ${codebasePath}:`, error); - // Fallback to memory if file reading fails - return this.indexingCodebases.get(codebasePath); - } + return this.indexingCodebases.get(codebasePath); } - /** - * @deprecated Use setCodebaseIndexing() instead for v2 format support - */ public addIndexingCodebase(codebasePath: string, progress: number = 0): void { this.indexingCodebases.set(codebasePath, progress); - - // Also update codebaseInfoMap for v2 compatibility - const info: CodebaseInfoIndexing = { - status: 'indexing', - indexingPercentage: progress, - lastUpdated: new Date().toISOString() - }; - this.codebaseInfoMap.set(codebasePath, info); } - /** - * @deprecated Use setCodebaseIndexing() instead for v2 format support - */ public updateIndexingProgress(codebasePath: string, progress: number): void { if (this.indexingCodebases.has(codebasePath)) { this.indexingCodebases.set(codebasePath, progress); - - // Also update codebaseInfoMap for v2 compatibility - const info: CodebaseInfoIndexing = { - status: 'indexing', - indexingPercentage: progress, - lastUpdated: new Date().toISOString() - }; - this.codebaseInfoMap.set(codebasePath, info); } } - /** - * @deprecated Use removeCodebaseCompletely() or state-specific methods instead for v2 format support - */ public removeIndexingCodebase(codebasePath: string): void { this.indexingCodebases.delete(codebasePath); - // Also remove from codebaseInfoMap for v2 compatibility - this.codebaseInfoMap.delete(codebasePath); } - /** - * @deprecated Use setCodebaseIndexed() instead for v2 format support - */ - public addIndexedCodebase(codebasePath: string, fileCount?: number): void { + public addIndexedCodebase(codebasePath: string): void { if (!this.indexedCodebases.includes(codebasePath)) { this.indexedCodebases.push(codebasePath); } - if (fileCount !== undefined) { - this.codebaseFileCount.set(codebasePath, fileCount); - } - - // Also update codebaseInfoMap for v2 compatibility - const info: CodebaseInfoIndexed = { - status: 'indexed', - indexedFiles: fileCount || 0, - totalChunks: 0, // Unknown in v1 method - indexStatus: 'completed', - lastUpdated: new Date().toISOString() - }; - this.codebaseInfoMap.set(codebasePath, info); } - /** - * @deprecated Use removeCodebaseCompletely() or state-specific methods instead for v2 format support - */ public removeIndexedCodebase(codebasePath: string): void { this.indexedCodebases = this.indexedCodebases.filter(path => path !== codebasePath); - this.codebaseFileCount.delete(codebasePath); - // Also remove from codebaseInfoMap for v2 compatibility - this.codebaseInfoMap.delete(codebasePath); } - /** - * @deprecated Use setCodebaseIndexed() instead for v2 format support - */ - public moveFromIndexingToIndexed(codebasePath: string, fileCount?: number): void { + public moveFromIndexingToIndexed(codebasePath: string): void { this.removeIndexingCodebase(codebasePath); - this.addIndexedCodebase(codebasePath, fileCount); + this.addIndexedCodebase(codebasePath); } /** - * @deprecated Use getCodebaseInfo() and check indexedFiles property instead for v2 format support + * Load snapshot from file with file lock protection */ - public getIndexedFileCount(codebasePath: string): number | undefined { - return this.codebaseFileCount.get(codebasePath); - } + public async loadCodebaseSnapshot(): Promise { + console.log('[SNAPSHOT-DEBUG] Loading codebase snapshot from:', this.snapshotFilePath); - /** - * @deprecated Use setCodebaseIndexed() with complete stats instead for v2 format support - */ - public setIndexedFileCount(codebasePath: string, fileCount: number): void { - this.codebaseFileCount.set(codebasePath, fileCount); - } + try { + await withFileLock(async () => { + if (!fs.existsSync(this.snapshotFilePath)) { + console.log('[SNAPSHOT-DEBUG] Snapshot file does not exist. Starting with empty codebase list.'); + return; + } - /** - * Set codebase to indexing status - */ - public setCodebaseIndexing(codebasePath: string, progress: number = 0): void { - this.indexingCodebases.set(codebasePath, progress); + const snapshotData = fs.readFileSync(this.snapshotFilePath, 'utf8'); + const snapshot: CodebaseSnapshot = JSON.parse(snapshotData); - // Remove from other states - this.indexedCodebases = this.indexedCodebases.filter(path => path !== codebasePath); - this.codebaseFileCount.delete(codebasePath); - - // Update info map - const info: CodebaseInfoIndexing = { - status: 'indexing', - indexingPercentage: progress, - lastUpdated: new Date().toISOString() - }; - this.codebaseInfoMap.set(codebasePath, info); - } + console.log('[SNAPSHOT-DEBUG] Loaded snapshot:', snapshot); - /** - * Set codebase to indexed status with complete statistics - */ - public setCodebaseIndexed( - codebasePath: string, - stats: { indexedFiles: number; totalChunks: number; status: 'completed' | 'limit_reached' } - ): void { - // Add to indexed list if not already there - if (!this.indexedCodebases.includes(codebasePath)) { - this.indexedCodebases.push(codebasePath); - } + // Validate indexed codebases + const validCodebases: string[] = []; + for (const codebasePath of snapshot.indexedCodebases) { + if (fs.existsSync(codebasePath)) { + validCodebases.push(codebasePath); + console.log(`[SNAPSHOT-DEBUG] Validated codebase: ${codebasePath}`); + } else { + console.warn(`[SNAPSHOT-DEBUG] Codebase no longer exists, removing: ${codebasePath}`); + } + } - // Remove from indexing state - this.indexingCodebases.delete(codebasePath); + // Handle indexing codebases + let indexingCodebasesList: string[] = []; + let indexingProgress: Record = {}; - // Update file count and info - this.codebaseFileCount.set(codebasePath, stats.indexedFiles); - - const info: CodebaseInfoIndexed = { - status: 'indexed', - indexedFiles: stats.indexedFiles, - totalChunks: stats.totalChunks, - indexStatus: stats.status, - lastUpdated: new Date().toISOString() - }; - this.codebaseInfoMap.set(codebasePath, info); - } + if (Array.isArray(snapshot.indexingCodebases)) { + indexingCodebasesList = snapshot.indexingCodebases; + console.log(`[SNAPSHOT-DEBUG] Found legacy indexingCodebases array format with ${indexingCodebasesList.length} entries`); + } else if (snapshot.indexingCodebases && typeof snapshot.indexingCodebases === 'object') { + indexingProgress = snapshot.indexingCodebases; + indexingCodebasesList = Object.keys(indexingProgress); + console.log(`[SNAPSHOT-DEBUG] Found new indexingCodebases object format with ${indexingCodebasesList.length} entries`); + } - /** - * Set codebase to failed status - */ - public setCodebaseIndexFailed( - codebasePath: string, - errorMessage: string, - lastAttemptedPercentage?: number - ): void { - // Remove from other states - this.indexedCodebases = this.indexedCodebases.filter(path => path !== codebasePath); - this.indexingCodebases.delete(codebasePath); - this.codebaseFileCount.delete(codebasePath); - - // Update info map - const info: CodebaseInfoIndexFailed = { - status: 'indexfailed', - errorMessage: errorMessage, - lastAttemptedPercentage: lastAttemptedPercentage, - lastUpdated: new Date().toISOString() - }; - this.codebaseInfoMap.set(codebasePath, info); - } + // Restore valid indexing codebases with their progress + const validIndexingCodebases = new Map(); + for (const codebasePath of indexingCodebasesList) { + if (fs.existsSync(codebasePath)) { + const progress = indexingProgress[codebasePath] ?? 0; + validIndexingCodebases.set(codebasePath, progress); + console.log(`[SNAPSHOT-DEBUG] Restored indexing codebase: ${codebasePath} (${progress}%)`); + } else { + console.warn(`[SNAPSHOT-DEBUG] Indexing codebase no longer exists: ${codebasePath}`); + } + } - /** - * Get codebase status - */ - public getCodebaseStatus(codebasePath: string): 'indexed' | 'indexing' | 'indexfailed' | 'not_found' { - const info = this.codebaseInfoMap.get(codebasePath); - if (!info) return 'not_found'; - return info.status; - } + // Update in-memory state + this.indexedCodebases = validCodebases; + this.indexingCodebases = validIndexingCodebases; - /** - * Get complete codebase information - */ - public getCodebaseInfo(codebasePath: string): CodebaseInfo | undefined { - return this.codebaseInfoMap.get(codebasePath); - } + console.log(`[SNAPSHOT-DEBUG] Restored ${validCodebases.length} fully indexed codebases.`); + console.log(`[SNAPSHOT-DEBUG] Restored ${validIndexingCodebases.size} indexing codebases.`); + }); - /** - * Get all failed codebases - */ - public getFailedCodebases(): string[] { - return Array.from(this.codebaseInfoMap.entries()) - .filter(([_, info]) => info.status === 'indexfailed') - .map(([path, _]) => path); + } catch (error: any) { + console.error('[SNAPSHOT-DEBUG] Error loading snapshot:', error); + console.log('[SNAPSHOT-DEBUG] Starting with empty codebase list due to snapshot error.'); + } } /** - * Completely remove a codebase from all tracking (for clear_index operation) + * Save snapshot to file with file lock protection */ - public removeCodebaseCompletely(codebasePath: string): void { - // Remove from all internal state - this.indexedCodebases = this.indexedCodebases.filter(path => path !== codebasePath); - this.indexingCodebases.delete(codebasePath); - this.codebaseFileCount.delete(codebasePath); - this.codebaseInfoMap.delete(codebasePath); + public async saveCodebaseSnapshot(): Promise { + console.log('[SNAPSHOT-DEBUG] Saving codebase snapshot to:', this.snapshotFilePath); - console.log(`[SNAPSHOT-DEBUG] Completely removed codebase from snapshot: ${codebasePath}`); - } + try { + await withFileLock(async () => { + // Ensure directory exists + const snapshotDir = path.dirname(this.snapshotFilePath); + if (!fs.existsSync(snapshotDir)) { + fs.mkdirSync(snapshotDir, { recursive: true }); + console.log('[SNAPSHOT-DEBUG] Created snapshot directory:', snapshotDir); + } - public loadCodebaseSnapshot(): void { - console.log('[SNAPSHOT-DEBUG] Loading codebase snapshot from:', this.snapshotFilePath); + // Re-read the file to merge any changes made by other processes + let existingSnapshot: CodebaseSnapshot | null = null; + if (fs.existsSync(this.snapshotFilePath)) { + try { + const existingData = fs.readFileSync(this.snapshotFilePath, 'utf8'); + existingSnapshot = JSON.parse(existingData); + } catch (error) { + console.warn('[SNAPSHOT-DEBUG] Could not read existing snapshot for merging:', error); + } + } - try { - if (!fs.existsSync(this.snapshotFilePath)) { - console.log('[SNAPSHOT-DEBUG] Snapshot file does not exist. Starting with empty codebase list.'); - return; - } + // Merge logic: combine data from file and in-memory + const mergedIndexed = new Set(this.indexedCodebases); + const mergedIndexing = new Map(this.indexingCodebases); + + if (existingSnapshot) { + // Add indexed codebases from file + existingSnapshot.indexedCodebases.forEach(path => mergedIndexed.add(path)); + + // Merge indexing codebases (keep higher progress) + if (typeof existingSnapshot.indexingCodebases === 'object' && !Array.isArray(existingSnapshot.indexingCodebases)) { + Object.entries(existingSnapshot.indexingCodebases).forEach(([path, progress]) => { + const currentProgress = mergedIndexing.get(path) ?? 0; + mergedIndexing.set(path, Math.max(currentProgress, progress)); + }); + } + } - const snapshotData = fs.readFileSync(this.snapshotFilePath, 'utf8'); - const snapshot: CodebaseSnapshot = JSON.parse(snapshotData); + // Convert to snapshot format + const indexingCodebasesObject: Record = {}; + mergedIndexing.forEach((progress, path) => { + indexingCodebasesObject[path] = progress; + }); - console.log('[SNAPSHOT-DEBUG] Loaded snapshot:', snapshot); + const snapshot: CodebaseSnapshot = { + indexedCodebases: Array.from(mergedIndexed), + indexingCodebases: indexingCodebasesObject, + lastUpdated: new Date().toISOString() + }; - if (this.isV2Format(snapshot)) { - this.loadV2Format(snapshot); - } else { - this.loadV1Format(snapshot); - } + // Write atomically using temp file + const tempFile = `${this.snapshotFilePath}.tmp`; + fs.writeFileSync(tempFile, JSON.stringify(snapshot, null, 2)); + fs.renameSync(tempFile, this.snapshotFilePath); - // Always save in v2 format after loading (migration) - this.saveCodebaseSnapshot(); + console.log('[SNAPSHOT-DEBUG] Snapshot saved successfully. Indexed:', mergedIndexed.size, 'Indexing:', mergedIndexing.size); + }); } catch (error: any) { - console.error('[SNAPSHOT-DEBUG] Error loading snapshot:', error); - console.log('[SNAPSHOT-DEBUG] Starting with empty codebase list due to snapshot error.'); + console.error('[SNAPSHOT-DEBUG] Error saving snapshot:', error); } } - public saveCodebaseSnapshot(): void { - console.log('[SNAPSHOT-DEBUG] Saving codebase snapshot to:', this.snapshotFilePath); + /** + * Refresh in-memory state from file + * Useful to sync with changes made by other processes + */ + public async refreshFromFile(): Promise { + console.log('[SNAPSHOT-DEBUG] Refreshing from file...'); try { - // Ensure directory exists - const snapshotDir = path.dirname(this.snapshotFilePath); - if (!fs.existsSync(snapshotDir)) { - fs.mkdirSync(snapshotDir, { recursive: true }); - console.log('[SNAPSHOT-DEBUG] Created snapshot directory:', snapshotDir); - } - - // Build v2 format snapshot using the complete info map - const codebases: Record = {}; - - // Add all codebases from the info map - for (const [codebasePath, info] of this.codebaseInfoMap) { - codebases[codebasePath] = info; - } - - const snapshot: CodebaseSnapshotV2 = { - formatVersion: 'v2', - codebases: codebases, - lastUpdated: new Date().toISOString() - }; - - fs.writeFileSync(this.snapshotFilePath, JSON.stringify(snapshot, null, 2)); - - const indexedCount = this.indexedCodebases.length; - const indexingCount = this.indexingCodebases.size; - const failedCount = this.getFailedCodebases().length; + await withFileLock(async () => { + if (!fs.existsSync(this.snapshotFilePath)) { + return; + } - console.log(`[SNAPSHOT-DEBUG] Snapshot saved successfully in v2 format. Indexed: ${indexedCount}, Indexing: ${indexingCount}, Failed: ${failedCount}`); + const snapshotData = fs.readFileSync(this.snapshotFilePath, 'utf8'); + const snapshot: CodebaseSnapshot = JSON.parse(snapshotData); + + // Update indexed codebases (merge with existing) + const updatedIndexed = new Set(this.indexedCodebases); + snapshot.indexedCodebases.forEach(path => { + if (fs.existsSync(path)) { + updatedIndexed.add(path); + } + }); + this.indexedCodebases = Array.from(updatedIndexed); + + // Update indexing codebases (merge progress, keep higher values) + if (typeof snapshot.indexingCodebases === 'object' && !Array.isArray(snapshot.indexingCodebases)) { + Object.entries(snapshot.indexingCodebases).forEach(([path, progress]) => { + if (fs.existsSync(path)) { + const currentProgress = this.indexingCodebases.get(path) ?? 0; + this.indexingCodebases.set(path, Math.max(currentProgress, progress)); + } + }); + } - } catch (error: any) { - console.error('[SNAPSHOT-DEBUG] Error saving snapshot:', error); + console.log('[SNAPSHOT-DEBUG] Refreshed from file. Indexed:', this.indexedCodebases.length, 'Indexing:', this.indexingCodebases.size); + }); + } catch (error) { + console.warn('[SNAPSHOT-DEBUG] Error refreshing from file:', error); } } -} \ No newline at end of file +} diff --git a/packages/mcp/src/sync.ts b/packages/mcp/src/sync.ts index 0f53bab8..862f134b 100644 --- a/packages/mcp/src/sync.ts +++ b/packages/mcp/src/sync.ts @@ -1,6 +1,7 @@ import * as fs from "fs"; import { Context, FileSynchronizer } from "@zilliz/claude-context-core"; import { SnapshotManager } from "./snapshot.js"; +import { isCurrentProcessLeader } from "./lock.js"; export class SyncManager { private context: Context; @@ -13,6 +14,12 @@ export class SyncManager { } public async handleSyncIndex(): Promise { + // Leader check - followers should not sync + if (!isCurrentProcessLeader()) { + console.log('[SYNC-DEBUG] Follower mode: Skipping background sync (read-only)'); + return; + } + const syncStartTime = Date.now(); console.log(`[SYNC-DEBUG] handleSyncIndex() called at ${new Date().toISOString()}`); From 097bacc6b8df63515b9be5b512850e017fe96ab8 Mon Sep 17 00:00:00 2001 From: aabraha2 Date: Wed, 21 Jan 2026 06:51:09 +0000 Subject: [PATCH 3/3] CodebaseSnapshot version fix --- packages/mcp/src/snapshot.ts | 120 ++++++++++++++++++++++++----------- 1 file changed, 83 insertions(+), 37 deletions(-) diff --git a/packages/mcp/src/snapshot.ts b/packages/mcp/src/snapshot.ts index 5a5b934b..0a512279 100644 --- a/packages/mcp/src/snapshot.ts +++ b/packages/mcp/src/snapshot.ts @@ -1,9 +1,13 @@ import * as fs from "fs"; import * as path from "path"; import * as os from "os"; -import { CodebaseSnapshot } from "./config.js"; +import { CodebaseSnapshot, CodebaseSnapshotV2 } from "./config.js"; import { withFileLock } from './lock.js'; +function isV2(snapshot: CodebaseSnapshot): snapshot is CodebaseSnapshotV2 { + return (snapshot as CodebaseSnapshotV2).formatVersion === 'v2'; +} + export class SnapshotManager { private snapshotFilePath: string; private indexedCodebases: string[] = []; @@ -78,26 +82,43 @@ export class SnapshotManager { // Validate indexed codebases const validCodebases: string[] = []; - for (const codebasePath of snapshot.indexedCodebases) { - if (fs.existsSync(codebasePath)) { - validCodebases.push(codebasePath); - console.log(`[SNAPSHOT-DEBUG] Validated codebase: ${codebasePath}`); - } else { - console.warn(`[SNAPSHOT-DEBUG] Codebase no longer exists, removing: ${codebasePath}`); - } - } - - // Handle indexing codebases let indexingCodebasesList: string[] = []; let indexingProgress: Record = {}; - if (Array.isArray(snapshot.indexingCodebases)) { - indexingCodebasesList = snapshot.indexingCodebases; - console.log(`[SNAPSHOT-DEBUG] Found legacy indexingCodebases array format with ${indexingCodebasesList.length} entries`); - } else if (snapshot.indexingCodebases && typeof snapshot.indexingCodebases === 'object') { - indexingProgress = snapshot.indexingCodebases; - indexingCodebasesList = Object.keys(indexingProgress); - console.log(`[SNAPSHOT-DEBUG] Found new indexingCodebases object format with ${indexingCodebasesList.length} entries`); + if (isV2(snapshot)) { + // Handle V2 + for (const [path, info] of Object.entries(snapshot.codebases)) { + if (info.status === 'indexed') { + if (fs.existsSync(path)) { + validCodebases.push(path); + console.log(`[SNAPSHOT-DEBUG] Validated codebase: ${path}`); + } else { + console.warn(`[SNAPSHOT-DEBUG] Codebase no longer exists, removing: ${path}`); + } + } else if (info.status === 'indexing') { + indexingCodebasesList.push(path); + indexingProgress[path] = info.indexingPercentage; + } + } + } else { + // Handle V1 + for (const codebasePath of snapshot.indexedCodebases) { + if (fs.existsSync(codebasePath)) { + validCodebases.push(codebasePath); + console.log(`[SNAPSHOT-DEBUG] Validated codebase: ${codebasePath}`); + } else { + console.warn(`[SNAPSHOT-DEBUG] Codebase no longer exists, removing: ${codebasePath}`); + } + } + + if (Array.isArray(snapshot.indexingCodebases)) { + indexingCodebasesList = snapshot.indexingCodebases; + console.log(`[SNAPSHOT-DEBUG] Found legacy indexingCodebases array format with ${indexingCodebasesList.length} entries`); + } else if (snapshot.indexingCodebases && typeof snapshot.indexingCodebases === 'object') { + indexingProgress = snapshot.indexingCodebases; + indexingCodebasesList = Object.keys(indexingProgress); + console.log(`[SNAPSHOT-DEBUG] Found new indexingCodebases object format with ${indexingCodebasesList.length} entries`); + } } // Restore valid indexing codebases with their progress @@ -157,15 +178,26 @@ export class SnapshotManager { const mergedIndexing = new Map(this.indexingCodebases); if (existingSnapshot) { - // Add indexed codebases from file - existingSnapshot.indexedCodebases.forEach(path => mergedIndexed.add(path)); - - // Merge indexing codebases (keep higher progress) - if (typeof existingSnapshot.indexingCodebases === 'object' && !Array.isArray(existingSnapshot.indexingCodebases)) { - Object.entries(existingSnapshot.indexingCodebases).forEach(([path, progress]) => { - const currentProgress = mergedIndexing.get(path) ?? 0; - mergedIndexing.set(path, Math.max(currentProgress, progress)); + if (isV2(existingSnapshot)) { + Object.entries(existingSnapshot.codebases).forEach(([path, info]) => { + if (info.status === 'indexed') { + mergedIndexed.add(path); + } else if (info.status === 'indexing') { + const currentProgress = mergedIndexing.get(path) ?? 0; + mergedIndexing.set(path, Math.max(currentProgress, info.indexingPercentage)); + } }); + } else { + // Add indexed codebases from file + existingSnapshot.indexedCodebases.forEach(path => mergedIndexed.add(path)); + + // Merge indexing codebases (keep higher progress) + if (typeof existingSnapshot.indexingCodebases === 'object' && !Array.isArray(existingSnapshot.indexingCodebases)) { + Object.entries(existingSnapshot.indexingCodebases).forEach(([path, progress]) => { + const currentProgress = mergedIndexing.get(path) ?? 0; + mergedIndexing.set(path, Math.max(currentProgress, progress)); + }); + } } } @@ -212,23 +244,37 @@ export class SnapshotManager { // Update indexed codebases (merge with existing) const updatedIndexed = new Set(this.indexedCodebases); - snapshot.indexedCodebases.forEach(path => { - if (fs.existsSync(path)) { - updatedIndexed.add(path); - } - }); - this.indexedCodebases = Array.from(updatedIndexed); - // Update indexing codebases (merge progress, keep higher values) - if (typeof snapshot.indexingCodebases === 'object' && !Array.isArray(snapshot.indexingCodebases)) { - Object.entries(snapshot.indexingCodebases).forEach(([path, progress]) => { - if (fs.existsSync(path)) { + if (isV2(snapshot)) { + Object.entries(snapshot.codebases).forEach(([path, info]) => { + if (info.status === 'indexed' && fs.existsSync(path)) { + updatedIndexed.add(path); + } + if (info.status === 'indexing' && fs.existsSync(path)) { const currentProgress = this.indexingCodebases.get(path) ?? 0; - this.indexingCodebases.set(path, Math.max(currentProgress, progress)); + this.indexingCodebases.set(path, Math.max(currentProgress, info.indexingPercentage)); + } + }); + } else { + snapshot.indexedCodebases.forEach(path => { + if (fs.existsSync(path)) { + updatedIndexed.add(path); } }); + + // Update indexing codebases (merge progress, keep higher values) + if (typeof snapshot.indexingCodebases === 'object' && !Array.isArray(snapshot.indexingCodebases)) { + Object.entries(snapshot.indexingCodebases).forEach(([path, progress]) => { + if (fs.existsSync(path)) { + const currentProgress = this.indexingCodebases.get(path) ?? 0; + this.indexingCodebases.set(path, Math.max(currentProgress, progress)); + } + }); + } } + this.indexedCodebases = Array.from(updatedIndexed); + console.log('[SNAPSHOT-DEBUG] Refreshed from file. Indexed:', this.indexedCodebases.length, 'Indexing:', this.indexingCodebases.size); }); } catch (error) {