Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@
"test:performance:mongodb": "npm run transpile && cross-env DEFAULT_STORAGE=mongodb mocha --config ./config/.mocharc.cjs ./test_tmp/performance.test.js --unhandled-rejections=strict --expose-gc",
"test:performance:custom:browser": "npm run transpile && cross-env DEFAULT_STORAGE=custom CI=true karma start ./config/karma.performance.conf.cjs --single-run",
"test:performance:custom:node": "npm run transpile && cross-env DEFAULT_STORAGE=custom mocha --config ./config/.mocharc.cjs ./test_tmp/performance.test.js --unhandled-rejections=strict --expose-gc --experimental-sqlite",
"test:performance:sqlite-trial": "npm run transpile && cross-env DEFAULT_STORAGE=sqlite-trial mocha --config ./config/.mocharc.cjs ./test_tmp/performance.test.js --unhandled-rejections=strict --expose-gc --experimental-sqlite",
"test:performance": "npm run build && npm run test:performance:dexie && npm run test:performance:memory:browser && npm run test:performance:memory:node",
"test:query-correctness-fuzzing:memory:node": "npm run transpile && cross-env DEFAULT_STORAGE=memory mocha --config ./config/.mocharc.cjs ./test_tmp/query-correctness-fuzzing.test.js --unhandled-rejections=strict --expose-gc",
"test:query-correctness-fuzzing:custom:node": "npm run transpile && cross-env DEFAULT_STORAGE=custom mocha --config ./config/.mocharc.cjs ./test_tmp/query-correctness-fuzzing.test.js --unhandled-rejections=strict --expose-gc",
Expand Down
20 changes: 19 additions & 1 deletion src/plugins/cleanup/cleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,29 @@ export async function startCleanupForRxCollection(


export async function initialCleanupWait(collection: RxCollection, cleanupPolicy: RxCleanupPolicy) {
/**
* Always wait for the database storage token to be resolved first.
* When collection creation runs in parallel with the internal store bulkWrite,
* the cleanup hook can fire before addCollections() has returned to the caller.
* Waiting for the storage token ensures the database startup is complete.
*/
await collection.database.storageToken;

/**
* Yield the event loop to let addCollections() return to the caller
* before cleanup begins. This ensures the caller has a chance to set up
* replications (which block cleanup via awaitReplicationsInSync)
* before the first cleanup run.
*/
await new Promise(resolve => setTimeout(resolve, 0));

/**
 * Wait until minimumCollectionAge is reached
 * or the collection is closed.
 */
await collection.promiseWait(cleanupPolicy.minimumCollectionAge);
if (cleanupPolicy.minimumCollectionAge) {
await collection.promiseWait(cleanupPolicy.minimumCollectionAge);
}
if (collection.closed) {
return;
}
Expand Down
11 changes: 10 additions & 1 deletion src/rx-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,16 @@ export class RxCollectionBase<
this._subs.push(listenToRemoveSub);


const databaseStorageToken = await this.database.storageToken;
/**
 * Do not await the storageToken here, to keep it off the critical path.
 * We cache the token once resolved and read it synchronously in the
 * change-stream callback below.
 * NOTE(review): this assumes the token promise settles before the first
 * change event is emitted (addCollections() resolves it in the same
 * bulkWrite) — if an event ever arrives earlier, the cached value is
 * still the empty string. Confirm this ordering holds for all storages,
 * and consider a .catch() so a rejected storageToken does not surface
 * as an unhandled rejection.
 */
let databaseStorageToken: string = '';
this.database.storageToken.then(t => {
databaseStorageToken = t;
});
const subDocs = this.storageInstance.changeStream().subscribe((eventBulk: any) => {
const changeEventBulk: RxChangeEventBulk<RxDocumentType | RxLocalDocumentData> = {
id: eventBulk.id,
Expand Down
82 changes: 82 additions & 0 deletions src/rx-database-internal-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
RxDatabase,
RxDocumentData,
RxJsonSchema,
RxStorageBulkWriteResponse,
RxStorageInstance,
RxStorageWriteErrorConflict
} from './types/index.d.ts';
Expand Down Expand Up @@ -244,6 +245,87 @@ export async function ensureStorageTokenDocumentExists<Collections extends Colle
}


/**
 * Builds the storage-token document payload WITHOUT persisting it,
 * so the caller can batch it into a single combined bulkWrite
 * together with other internal documents.
 *
 * @param rxDatabase database whose version/instance token is embedded in the doc
 * @param passwordHash optional hash used later to detect password mismatches
 * @returns a fully initialized (default revision/meta, not deleted) token document
 */
export function buildStorageTokenDocumentData<Collections extends CollectionsOfDatabase = any>(
    rxDatabase: RxDatabase<Collections>,
    passwordHash: string | undefined
): RxDocumentData<InternalStoreStorageTokenDocType> {
    // Fresh random token; on a write conflict the already-stored
    // document wins and this value is discarded by the caller.
    const freshToken = randomToken(10);
    const docData: RxDocumentData<InternalStoreStorageTokenDocType> = {
        id: STORAGE_TOKEN_DOCUMENT_ID,
        context: INTERNAL_CONTEXT_STORAGE_TOKEN,
        key: STORAGE_TOKEN_DOCUMENT_KEY,
        data: {
            rxdbVersion: rxDatabase.rxdbVersion,
            token: freshToken,
            instanceToken: rxDatabase.token,
            passwordHash
        },
        _deleted: false,
        _meta: getDefaultRxDocumentMeta(),
        _rev: getDefaultRevision(),
        _attachments: {}
    };
    return docData;
}


/**
 * Interprets the outcome of a combined bulkWrite that included the
 * storage-token document alongside other internal documents.
 *
 * - No error for the token doc: the freshly built document was written,
 *   so it is returned unchanged.
 * - Conflict error: another instance already wrote a token document.
 *   The stored document is validated (state-version compatibility,
 *   password hash) and returned as the authoritative token doc.
 * - Any other error: rethrown to the caller.
 *
 * @throws DM5 when the stored database state version is incompatible with this code version
 * @throws DB1 when the given password hash differs from the stored one
 */
export function processStorageTokenResult<Collections extends CollectionsOfDatabase = any>(
    rxDatabase: RxDatabase<Collections>,
    tokenDocData: RxDocumentData<InternalStoreStorageTokenDocType>,
    writeResult: RxStorageBulkWriteResponse<InternalStoreDocType<any>>
): RxDocumentData<InternalStoreStorageTokenDocType> {
    // Pick out the error (if any) that belongs to the token document.
    const writeError = writeResult.error.find(
        err => err.documentId === STORAGE_TOKEN_DOCUMENT_ID
    );

    // Clean write: our freshly built token document is now the stored one.
    if (!writeError) {
        return tokenDocData;
    }

    // Anything other than a write conflict is unexpected here → surface it.
    if (!writeError.isError || !isBulkWriteConflictError(writeError)) {
        throw writeError;
    }

    const conflict = (writeError as RxStorageWriteErrorConflict<InternalStoreStorageTokenDocType>);
    const storedDoc = conflict.documentInDb;

    // The database state on disk must be usable by this code version.
    if (
        !isDatabaseStateVersionCompatibleWithDatabaseCode(
            storedDoc.data.rxdbVersion,
            rxDatabase.rxdbVersion
        )
    ) {
        throw newRxError('DM5', {
            args: {
                database: rxDatabase.name,
                databaseStateVersion: storedDoc.data.rxdbVersion,
                codeVersion: rxDatabase.rxdbVersion
            }
        });
    }

    // A provided password hash must match what was stored previously.
    const givenHash = tokenDocData.data.passwordHash;
    if (givenHash && givenHash !== storedDoc.data.passwordHash) {
        throw newRxError('DB1', {
            passwordHash: givenHash,
            existingPasswordHash: storedDoc.data.passwordHash
        });
    }

    // The conflicting document in the store is the authoritative token doc.
    return ensureNotFalsy(storedDoc);
}


export function isDatabaseStateVersionCompatibleWithDatabaseCode(
databaseStateVersion: string,
codeVersion: string
Expand Down
Loading
Loading