diff --git a/package.json b/package.json index 140f8202244..095e592881b 100644 --- a/package.json +++ b/package.json @@ -482,6 +482,7 @@ "test:performance:mongodb": "npm run transpile && cross-env DEFAULT_STORAGE=mongodb mocha --config ./config/.mocharc.cjs ./test_tmp/performance.test.js --unhandled-rejections=strict --expose-gc", "test:performance:custom:browser": "npm run transpile && cross-env DEFAULT_STORAGE=custom CI=true karma start ./config/karma.performance.conf.cjs --single-run", "test:performance:custom:node": "npm run transpile && cross-env DEFAULT_STORAGE=custom mocha --config ./config/.mocharc.cjs ./test_tmp/performance.test.js --unhandled-rejections=strict --expose-gc --experimental-sqlite", + "test:performance:sqlite-trial": "npm run transpile && cross-env DEFAULT_STORAGE=sqlite-trial mocha --config ./config/.mocharc.cjs ./test_tmp/performance.test.js --unhandled-rejections=strict --expose-gc --experimental-sqlite", "test:performance": "npm run build && npm run test:performance:dexie && npm run test:performance:memory:browser && npm run test:performance:memory:node", "test:query-correctness-fuzzing:memory:node": "npm run transpile && cross-env DEFAULT_STORAGE=memory mocha --config ./config/.mocharc.cjs ./test_tmp/query-correctness-fuzzing.test.js --unhandled-rejections=strict --expose-gc", "test:query-correctness-fuzzing:custom:node": "npm run transpile && cross-env DEFAULT_STORAGE=custom mocha --config ./config/.mocharc.cjs ./test_tmp/query-correctness-fuzzing.test.js --unhandled-rejections=strict --expose-gc", diff --git a/src/plugins/cleanup/cleanup.ts b/src/plugins/cleanup/cleanup.ts index 88da82a64c0..d778996cdb8 100644 --- a/src/plugins/cleanup/cleanup.ts +++ b/src/plugins/cleanup/cleanup.ts @@ -43,11 +43,29 @@ export async function startCleanupForRxCollection( export async function initialCleanupWait(collection: RxCollection, cleanupPolicy: RxCleanupPolicy) { + /** + * Always wait for the database storage token to be resolved first. + * When collection creation runs in parallel with the internal store bulkWrite, + * the cleanup hook can fire before addCollections() has returned to the caller. + * Waiting for the storage token ensures the database startup is complete. + */ + await collection.database.storageToken; + + /** + * Yield the event loop to let addCollections() return to the caller + * before cleanup begins. This ensures the caller has a chance to set up + * replications (which block cleanup via awaitReplicationsInSync) + * before the first cleanup run. + */ + await new Promise(resolve => setTimeout(resolve, 0)); + /** * Wait until minimumDatabaseInstanceAge is reached * or collection is closed. */ - await collection.promiseWait(cleanupPolicy.minimumCollectionAge); + if (cleanupPolicy.minimumCollectionAge) { + await collection.promiseWait(cleanupPolicy.minimumCollectionAge); + } if (collection.closed) { return; } diff --git a/src/rx-collection.ts b/src/rx-collection.ts index c53bb4082ed..11bb2dc16c2 100644 --- a/src/rx-collection.ts +++ b/src/rx-collection.ts @@ -325,7 +325,16 @@ export class RxCollectionBase< this._subs.push(listenToRemoveSub); - const databaseStorageToken = await this.database.storageToken; + /** + * Do not await the storageToken here to keep it off the critical path. + * The token is resolved before any write can emit events, because + * addCollections() resolves it in the same bulkWrite. We cache + * it once resolved and use it synchronously in the callback. + */ + let databaseStorageToken: string = ''; + this.database.storageToken.then(t => { + databaseStorageToken = t; + }); const subDocs = this.storageInstance.changeStream().subscribe((eventBulk: any) => { const changeEventBulk: RxChangeEventBulk = { id: eventBulk.id, diff --git a/src/rx-database-internal-store.ts b/src/rx-database-internal-store.ts index 2bf45cc08ef..a167d194985 100644 --- a/src/rx-database-internal-store.ts +++ b/src/rx-database-internal-store.ts @@ -16,6 +16,7 @@ import type { RxDatabase, RxDocumentData, RxJsonSchema, + RxStorageBulkWriteResponse, RxStorageInstance, RxStorageWriteErrorConflict } from './types/index.d.ts'; @@ -244,6 +245,87 @@ export async function ensureStorageTokenDocumentExists( + rxDatabase: RxDatabase, + passwordHash: string | undefined +): RxDocumentData { + const storageToken = randomToken(10); + return { + id: STORAGE_TOKEN_DOCUMENT_ID, + context: INTERNAL_CONTEXT_STORAGE_TOKEN, + key: STORAGE_TOKEN_DOCUMENT_KEY, + data: { + rxdbVersion: rxDatabase.rxdbVersion, + token: storageToken, + instanceToken: rxDatabase.token, + passwordHash + }, + _deleted: false, + _meta: getDefaultRxDocumentMeta(), + _rev: getDefaultRevision(), + _attachments: {} + }; +} + + +/** + * Processes the storage token result from a combined bulkWrite + * that included the token document alongside other documents. + */ +export function processStorageTokenResult( + rxDatabase: RxDatabase, + tokenDocData: RxDocumentData, + writeResult: RxStorageBulkWriteResponse> +): RxDocumentData { + const tokenError = writeResult.error.find( + err => err.documentId === STORAGE_TOKEN_DOCUMENT_ID + ); + + if (!tokenError) { + return tokenDocData; + } + + if ( + tokenError.isError && + isBulkWriteConflictError(tokenError) + ) { + const conflictError = (tokenError as RxStorageWriteErrorConflict); + + if ( + !isDatabaseStateVersionCompatibleWithDatabaseCode( + conflictError.documentInDb.data.rxdbVersion, + rxDatabase.rxdbVersion + ) + ) { + throw newRxError('DM5', { + args: { + database: rxDatabase.name, + databaseStateVersion: conflictError.documentInDb.data.rxdbVersion, + codeVersion: rxDatabase.rxdbVersion + } + }); + } + + if ( + tokenDocData.data.passwordHash && + tokenDocData.data.passwordHash !== conflictError.documentInDb.data.passwordHash + ) { + throw newRxError('DB1', { + passwordHash: tokenDocData.data.passwordHash, + existingPasswordHash: conflictError.documentInDb.data.passwordHash + }); + } + + return ensureNotFalsy(conflictError.documentInDb); + } + throw tokenError; +} + + export function isDatabaseStateVersionCompatibleWithDatabaseCode( databaseStateVersion: string, codeVersion: string diff --git a/src/rx-database.ts b/src/rx-database.ts index f830ff92afc..1365936f096 100644 --- a/src/rx-database.ts +++ b/src/rx-database.ts @@ -43,7 +43,8 @@ import { getDefaultRevision, getDefaultRxDocumentMeta, defaultHashSha256, - RXDB_VERSION + RXDB_VERSION, + hasPremiumFlag } from './plugins/utils/index.ts'; import { newRxError @@ -77,6 +78,9 @@ import { import type { RxBackupState } from './plugins/backup/index.ts'; import { ensureStorageTokenDocumentExists, + buildStorageTokenDocumentData, + processStorageTokenResult, + STORAGE_TOKEN_DOCUMENT_ID, getAllCollectionDocuments, getPrimaryKeyOfInternalDocument, INTERNAL_CONTEXT_COLLECTION, @@ -163,18 +167,49 @@ export class RxDatabaseBase< ); /** - * Start writing the storage token. - * Do not await the creation because it would run - * in a critical path that increases startup time. - * - * Writing the token takes about 20 milliseconds - * even on a fast adapter, so this is worth it. + * Start computing the password hash early (async). + * The hash is needed for the storage token document. + * By starting it here, it runs in parallel with + * any work the caller does before calling addCollections(). */ - this.storageTokenDocument = ensureStorageTokenDocumentExists(this.asRxDatabase) + this._passwordHashPromise = this.password + ? this.hashFunction(JSON.stringify(this.password)) + : undefined; + + /** + * Pre-trigger the premium flag check so it's cached + * before collection.prepare() is called. This avoids + * a crypto.subtle.digest call on the critical path + * for each collection during prepare(). + */ + hasPremiumFlag(); + + /** + * Defer writing the storage token. + * addCollections() will include the token document in its + * bulkWrite to save one IndexedDB transaction. If addCollections() + * is never called, a fallback writes the token separately. + */ + const tokenDocResolvers = createPromiseWithResolvers>(); + this._tokenDocResolvers = tokenDocResolvers; + this._storageTokenWriteStarted = false; + + this.storageTokenDocument = tokenDocResolvers.promise .catch(err => this.startupErrors.push(err) as any); this.storageToken = this.storageTokenDocument .then(doc => doc.data.token) .catch(err => this.startupErrors.push(err) as any); + + /** + * Fallback: write the token separately if addCollections() + * has not been called. setTimeout(0) runs after the current + * microtask queue, giving addCollections() a chance to batch it. + */ + setTimeout(() => { + if (!this._storageTokenWriteStarted) { + this._writeStorageTokenFallback(); + } + }, 0); } } @@ -257,6 +292,33 @@ export class RxDatabaseBase< */ public storageTokenDocument: Promise> = PROMISE_RESOLVE_FALSE as any; + /** + * Internal state for deferred storage token writing. + * The token document is prepared in the constructor + * and written together with collection metadata in addCollections(). + */ + public _passwordHashPromise: Promise | undefined = undefined; + public _tokenDocResolvers: { + promise: Promise>; + resolve: (value: RxDocumentData) => void; + reject: (reason?: any) => void; + } = undefined as any; + public _storageTokenWriteStarted: boolean = false; + + /** + * Fallback: write the token separately if addCollections() + * was not called to batch the write. + */ + public _writeStorageTokenFallback(): void { + if (this._storageTokenWriteStarted) { + return; + } + this._storageTokenWriteStarted = true; + ensureStorageTokenDocumentExists(this.asRxDatabase) + .then(result => this._tokenDocResolvers.resolve(result)) + .catch(err => this._tokenDocResolvers.reject(err)); + } + /** * Contains the ids of all event bulks that have been emitted * by the database. @@ -319,75 +381,64 @@ export class RxDatabaseBase< }): Promise<{ [key in keyof CreatedCollections]: RxCollection }> { const jsonSchemas: { [key in keyof CreatedCollections]: RxJsonSchema } = {} as any; const schemas: { [key in keyof CreatedCollections]: RxSchema } = {} as any; - const bulkPutDocs: BulkWriteRow[] = []; + const bulkPutDocs: BulkWriteRow>[] = []; const useArgsByCollectionName: any = {}; - await Promise.all( - Object.entries(collectionCreators).map(async ([name, args]) => { - const collectionName: keyof CreatedCollections = name as any; - const rxJsonSchema = (args as RxCollectionCreator).schema; - jsonSchemas[collectionName] = rxJsonSchema; - const schema = createRxSchema(rxJsonSchema, this.hashFunction); - schemas[collectionName] = schema; - - // collection already exists - if ((this.collections as any)[name]) { - throw newRxError('DB3', { - name - }); - } + /** + * Phase 1: Synchronous setup — create schemas, check duplicates, + * prepare useArgs, and eagerly trigger schema hash computation. + * Accessing schema.hash starts the crypto.subtle.digest call + * immediately (non-blocking), so it runs in parallel with + * the storage instance creation in Phase 2. + */ + const hashPromises: { [key: string]: Promise; } = {}; + Object.entries(collectionCreators).forEach(([name, args]) => { + const collectionName: keyof CreatedCollections = name as any; + const rxJsonSchema = (args as RxCollectionCreator).schema; + jsonSchemas[collectionName] = rxJsonSchema; + const schema = createRxSchema(rxJsonSchema, this.hashFunction); + schemas[collectionName] = schema; + + /** + * Eagerly trigger schema hash computation. + * The getter calls JSON.stringify (sync) + hashFunction (async crypto.subtle.digest). + * Starting it here lets the digest run in parallel with the rest of + * Phase 1 work and all of Phase 2's storage instance creation. + */ + hashPromises[name] = schema.hash; - const collectionNameWithVersion = _collectionNamePrimary(name, rxJsonSchema); - const collectionDocData: RxDocumentData = { - id: getPrimaryKeyOfInternalDocument( - collectionNameWithVersion, - INTERNAL_CONTEXT_COLLECTION - ), - key: collectionNameWithVersion, - context: INTERNAL_CONTEXT_COLLECTION, - data: { - name: collectionName as any, - schemaHash: await schema.hash, - schema: schema.jsonSchema, - version: schema.version, - connectedStorages: [] - }, - _deleted: false, - _meta: getDefaultRxDocumentMeta(), - _rev: getDefaultRevision(), - _attachments: {} - }; - bulkPutDocs.push({ - document: collectionDocData + if ((this.collections as any)[name]) { + throw newRxError('DB3', { + name }); + } - const useArgs: any = Object.assign( - {}, - args, - { - name: collectionName, - schema, - database: this - } - ); - - // run hooks - const hookData: RxCollectionCreator & { name: string; } = flatClone(args) as any; - (hookData as any).database = this; - hookData.name = name; - runPluginHooks('preCreateRxCollection', hookData); - useArgs.conflictHandler = hookData.conflictHandler; - - useArgsByCollectionName[collectionName] = useArgs; - }) - ); + const useArgs: any = Object.assign( + {}, + args, + { + name: collectionName, + schema, + database: this + } + ); + + const hookData: RxCollectionCreator & { name: string; } = flatClone(args) as any; + (hookData as any).database = this; + hookData.name = name; + runPluginHooks('preCreateRxCollection', hookData); + useArgs.conflictHandler = hookData.conflictHandler; + + useArgsByCollectionName[collectionName] = useArgs; + }); /** - * Optimization: Start creating collection storage instances - * in parallel with the internal store bulkWrite and startup error check. - * Storage instance creation is independent of the internal store write, - * so we can overlap these I/O operations to reduce time-to-first-insert. + * Phase 2: Start all async I/O in parallel: + * - collection storage instance creation (opens IndexedDB databases) + * - schema hash computation (crypto.subtle.digest) + * - password hash (if not already computed, for token doc) + * All of these are independent and can overlap. */ const collectionStorageInstancePromises: { [key: string]: Promise>; } = {}; Object.keys(collectionCreators).forEach((collectionName) => { @@ -407,71 +458,89 @@ export class RxDatabaseBase< this.asRxDatabase, storageInstanceCreationParams ); - /** - * Prevent unhandled promise rejection warnings. - * If ensureNoStartupErrors() or the bulkWrite error handling throws - * (e.g. password mismatch, schema mismatch), these promises might - * never be awaited. The actual errors are still propagated when - * the promises are awaited in Phase 5 below. - */ promise.catch(() => { }); collectionStorageInstancePromises[collectionName] = promise; }); /** - * If the ensureNoStartupErrors or the bulkWrite error handling throws, - * we must close any pre-created storage instances to avoid resource leaks. + * Phase 3: Build all bulkWrite documents in a single Promise.all so that + * password hash + schema hashes resolve in parallel. The storage token + * document is included to combine both writes into one IDB transaction. */ - let putDocsResult; - try { - [putDocsResult] = await Promise.all([ - this.internalStore.bulkWrite( - bulkPutDocs, - 'rx-database-add-collection' + let includesTokenDoc = false; + let tokenDocData: RxDocumentData | undefined; + const shouldIncludeToken = !this._storageTokenWriteStarted; + if (shouldIncludeToken) { + this._storageTokenWriteStarted = true; + includesTokenDoc = true; + } + + const collectionDocs: BulkWriteRow>[] = []; + const buildPromises: Promise[] = Object.entries(collectionCreators).map(async ([name]) => { + const collectionName: keyof CreatedCollections = name as any; + const rxJsonSchema = jsonSchemas[collectionName]; + const schema = schemas[collectionName]; + const schemaHash = await hashPromises[name]; + + const collectionNameWithVersion = _collectionNamePrimary(name, rxJsonSchema); + const collectionDocData: RxDocumentData = { + id: getPrimaryKeyOfInternalDocument( + collectionNameWithVersion, + INTERNAL_CONTEXT_COLLECTION ), - ensureNoStartupErrors(this) - ]); + key: collectionNameWithVersion, + context: INTERNAL_CONTEXT_COLLECTION, + data: { + name: collectionName as any, + schemaHash, + schema: schema.jsonSchema, + version: schema.version, + connectedStorages: [] + }, + _deleted: false, + _meta: getDefaultRxDocumentMeta(), + _rev: getDefaultRevision(), + _attachments: {} + }; + collectionDocs.push({ + document: collectionDocData + } as any); + }); - await Promise.all( - putDocsResult.error.map(async (error) => { - if (error.status !== 409) { - throw newRxError('DB12', { - database: this.name, - writeError: error - }); - } - const docInDb: RxDocumentData = ensureNotFalsy(error.documentInDb); - const collectionName = docInDb.data.name; - const schema = (schemas as any)[collectionName]; - // collection already exists but has different schema - if (docInDb.data.schemaHash !== await schema.hash) { - throw newRxError('DB6', { - database: this.name, - collection: collectionName, - previousSchemaHash: docInDb.data.schemaHash, - schemaHash: await schema.hash, - previousSchema: docInDb.data.schema, - schema: ensureNotFalsy((jsonSchemas as any)[collectionName]) - }); - } - }) + // Build token doc in parallel with schema hash computation + if (shouldIncludeToken) { + buildPromises.push( + (async () => { + const passwordHash = this._passwordHashPromise + ? await this._passwordHashPromise + : undefined; + tokenDocData = buildStorageTokenDocumentData(this.asRxDatabase, passwordHash); + })() ); - } catch (err) { - /** - * Close any pre-created storage instances on error. - * Some instances might have failed to create (rejected promise), - * so we catch and ignore errors during cleanup. - */ - await Promise.all( - Object.values(collectionStorageInstancePromises).map( - p => p.then(instance => instance.close()).catch(() => { }) - ) - ); - throw err; } + await Promise.all(buildPromises); + + // Assemble final bulkWrite array: token doc first (if included), then collection docs + if (includesTokenDoc && tokenDocData) { + bulkPutDocs.push({ document: tokenDocData } as any); + } + for (let i = 0; i < collectionDocs.length; i++) { + bulkPutDocs.push(collectionDocs[i]); + } + + + /** + * Phase 4: Run the internal store bulkWrite AND collection creation + prepare() + * in parallel. Collection prepare() only needs the storage instance (from Phase 2), + * not the internal store write result. This overlaps the IDB transaction for + * metadata with the collection setup work (wrapping storage, creating caches, + * subscribing to change streams). + */ const ret: { [key in keyof CreatedCollections]: RxCollection } = {} as any; - await Promise.all( + + // Start collection creation immediately (doesn't need bulkWrite result) + const collectionCreationPromise = Promise.all( Object.keys(collectionCreators).map(async (collectionName) => { const useArgs = useArgsByCollectionName[collectionName]; const storageInstance = await collectionStorageInstancePromises[collectionName]; @@ -494,6 +563,80 @@ export class RxDatabaseBase< } }) ); + collectionCreationPromise.catch(() => { }); + + // Run bulkWrite in parallel with collection creation + const bulkWritePromise = (async () => { + const putDocsResult = await this.internalStore.bulkWrite( + bulkPutDocs as any, + 'rx-database-add-collection' + ); + + if (includesTokenDoc) { + try { + const resolvedTokenDoc = processStorageTokenResult( + this.asRxDatabase, + ensureNotFalsy(tokenDocData), + putDocsResult + ); + this._tokenDocResolvers.resolve(resolvedTokenDoc); + } catch (tokenErr: any) { + this._tokenDocResolvers.reject(tokenErr); + throw tokenErr; + } + } else { + await ensureNoStartupErrors(this); + } + + // Handle collection metadata write errors + await Promise.all( + putDocsResult.error + .filter(error => error.documentId !== STORAGE_TOKEN_DOCUMENT_ID) + .map(async (error) => { + if (error.status !== 409) { + throw newRxError('DB12', { + database: this.name, + writeError: error + }); + } + const docInDb: RxDocumentData = ensureNotFalsy(error.documentInDb); + const collectionName = docInDb.data.name; + const schema = (schemas as any)[collectionName]; + if (docInDb.data.schemaHash !== await schema.hash) { + throw newRxError('DB6', { + database: this.name, + collection: collectionName, + previousSchemaHash: docInDb.data.schemaHash, + schemaHash: await schema.hash, + previousSchema: docInDb.data.schema, + schema: ensureNotFalsy((jsonSchemas as any)[collectionName]) + }); + } + }) + ); + })(); + + try { + /** + * Use allSettled so we can check both results. + * The bulkWrite error (e.g. DB6 schema mismatch) takes priority + * over storage instance errors (plain Error from storage). + */ + const [bulkWriteResult, collectionResult] = await Promise.allSettled([bulkWritePromise, collectionCreationPromise]); + if (bulkWriteResult.status === 'rejected') { + throw bulkWriteResult.reason; + } + if (collectionResult.status === 'rejected') { + throw collectionResult.reason; + } + } catch (err) { + await Promise.all( + Object.values(collectionStorageInstancePromises).map( + p => p.then(instance => instance.close()).catch(() => { }) + ) + ); + throw err; + } return ret; } @@ -912,6 +1055,9 @@ export async function isRxDatabaseFirstTimeInstantiated( export async function ensureNoStartupErrors( rxDatabase: RxDatabaseBase ) { + if (!rxDatabase._storageTokenWriteStarted) { + rxDatabase._writeStorageTokenFallback(); + } await rxDatabase.storageToken; if (rxDatabase.startupErrors[0]) { throw rxDatabase.startupErrors[0]; diff --git a/test/performance.test.ts b/test/performance.test.ts index 3d74ca1a45e..ac7765f942f 100644 --- a/test/performance.test.ts +++ b/test/performance.test.ts @@ -31,10 +31,19 @@ describe('performance.test.ts', () => { this.timeout(500 * 1000); const runs = isFastMode() ? 1 : 40; const perfStorage = config.storage.getPerformanceStorage(); + /** + * The SQLite trial storage has a 300-document cap per collection, + * so we reduce the bulk insert count to stay within the limit. + */ + const isSQLiteTrial = config.storage.name === 'sqlite-trial'; await runPerformanceTests( perfStorage.storage, perfStorage.description, - { runs } + { + runs, + collectionsAmount: 10, + ...(isSQLiteTrial ? { docsAmount: 120, serialDocsAmount: 10 } : {}) + } ); }); /**