-
Notifications
You must be signed in to change notification settings - Fork 5
feat: move docs map to docroom #83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 6 commits
d26a5e3
3fa9174
89f3d1a
606b269
34abb04
38d341d
b76692c
c24dff9
b896242
4a1b60c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,9 +24,6 @@ const wsReadyStateOpen = 1; | |
| // disable gc when using snapshots! | ||
| const gcEnabled = false; | ||
|
|
||
| // The local cache of ydocs | ||
| const docs = new Map(); | ||
|
|
||
| const messageSync = 0; | ||
| const messageAwareness = 1; | ||
| const MAX_STORAGE_KEYS = 128; | ||
|
|
@@ -39,14 +36,12 @@ const MAX_STORAGE_VALUE_SIZE = 131072; | |
| * @param {WebSocket} conn - the websocket connection to close. | ||
| */ | ||
| export const closeConn = (doc, conn) => { | ||
| // eslint-disable-next-line no-console | ||
| console.log('Closing connection for - removing awareness states', doc.name); | ||
| if (doc.conns.has(conn)) { | ||
| const controlledIds = doc.conns.get(conn); | ||
| doc.conns.delete(conn); | ||
| awarenessProtocol.removeAwarenessStates(doc.awareness, Array.from(controlledIds), null); | ||
|
|
||
| if (doc.conns.size === 0) { | ||
| docs.delete(doc.name); | ||
| } | ||
| } | ||
| conn.close(); | ||
| }; | ||
|
|
@@ -168,7 +163,7 @@ export const showError = (ydoc, err) => { | |
| }; | ||
|
|
||
| export const persistence = { | ||
| closeConn: closeConn.bind(this), | ||
| closeConn, | ||
|
|
||
| /** | ||
| * Get the document from da-admin. If da-admin doesn't have the doc, a new empty doc is | ||
|
|
@@ -278,7 +273,7 @@ export const persistence = { | |
| * @param {WebSocket} conn - the websocket connection | ||
| * @param {TransactionalStorage} storage - the worker transactional storage object | ||
| */ | ||
| bindState: async (docName, ydoc, conn, storage) => { | ||
| bindState: async (docName, ydoc, conn, storage, docsCache) => { | ||
| let timingReadStateDuration; | ||
| let timingDaAdminGetDuration; | ||
|
|
||
|
|
@@ -337,7 +332,7 @@ export const persistence = { | |
| // eslint-disable-next-line no-console | ||
| console.log('Could not be restored, trying to restore from da-admin', docName); | ||
| setTimeout(() => { | ||
| if (ydoc === docs.get(docName)) { | ||
| if (ydoc === docsCache.get(docName)) { | ||
| const rootType = ydoc.getXmlFragment('prosemirror'); | ||
| ydoc.transact(() => { | ||
| try { | ||
|
|
@@ -360,15 +355,15 @@ export const persistence = { | |
|
|
||
| ydoc.on('update', async () => { | ||
| // Whenever we receive an update on the document store it in the local storage | ||
| if (ydoc === docs.get(docName)) { // make sure this ydoc is still active | ||
| if (ydoc === docsCache.get(docName)) { // make sure this ydoc is still active | ||
| storeState(docName, Y.encodeStateAsUpdate(ydoc), storage); | ||
| } | ||
| }); | ||
|
|
||
| ydoc.on('update', debounce(async () => { | ||
| // If we receive an update on the document, store it in da-admin, but debounce it | ||
| // to avoid excessive da-admin calls. | ||
| if (ydoc === docs.get(docName)) { | ||
| if (ydoc === docsCache.get(docName)) { | ||
| current = await persistence.update(ydoc, current); | ||
| } | ||
| }, 2000, { maxWait: 10000 })); | ||
|
|
@@ -436,25 +431,20 @@ export class WSSharedDoc extends Y.Doc { | |
| * @param {boolean} gc - whether garbage collection is enabled | ||
| * @returns The Yjs document object, which may be shared across multiple sockets. | ||
| */ | ||
| export const getYDoc = async (docname, conn, env, storage, timingData, gc = true) => { | ||
| let doc = docs.get(docname); | ||
| if (doc === undefined) { | ||
| // The doc is not yet in the cache, create a new one. | ||
| doc = new WSSharedDoc(docname); | ||
| doc.gc = gc; | ||
| docs.set(docname, doc); | ||
| } | ||
|
|
||
| if (!doc.conns.get(conn)) { | ||
| doc.conns.set(conn, new Set()); | ||
| } | ||
| export const createYDoc = (docname, conn, env, storage, docsCache, gc = true) => { | ||
| const doc = new WSSharedDoc(docname); | ||
| doc.gc = gc; | ||
|
|
||
| // Store the service binding to da-admin which we receive through the environment in the doc | ||
| doc.daadmin = env.daadmin; | ||
| if (!doc.promise) { | ||
| // The doc is not yet bound to the persistence layer, do so now. The promise will be resolved | ||
| // when bound. | ||
| doc.promise = persistence.bindState(docname, doc, conn, storage); | ||
| doc.promise = persistence.bindState(docname, doc, conn, storage, docsCache); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is not doing what happened before. IIRC, the idea was to use the promise to make sure the doc doesn't get bound to persistence called concurrently. @bosschaert, are you sure this is still correct (I would have expected to have this move up to the caller like it used to for the await)...
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah yes, the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok, I restored the logic but split the method in 2: |
||
|
|
||
| return doc; | ||
| }; | ||
|
|
||
| export const setupYDoc = async (doc, conn, timingData) => { | ||
| if (!doc.conns.get(conn)) { | ||
| doc.conns.set(conn, new Set()); | ||
| } | ||
|
|
||
| // We wait for the promise, for second and subsequent connections to the same doc, this will | ||
|
|
@@ -463,12 +453,8 @@ export const getYDoc = async (docname, conn, env, storage, timingData, gc = true | |
| if (timingData) { | ||
| timings.forEach((v, k) => timingData.set(k, v)); | ||
| } | ||
| return doc; | ||
| }; | ||
|
|
||
| // For testing | ||
| export const setYDoc = (docname, ydoc) => docs.set(docname, ydoc); | ||
|
|
||
| // This read sync message handles readonly connections | ||
| const readSyncMessage = (decoder, encoder, doc, readOnly, transactionOrigin) => { | ||
| const messageType = decoding.readVarUint(decoder); | ||
|
|
@@ -489,10 +475,11 @@ const readSyncMessage = (decoder, encoder, doc, readOnly, transactionOrigin) => | |
| }; | ||
|
|
||
| export const messageListener = (conn, doc, message) => { | ||
| let messageType; | ||
| try { | ||
| const encoder = encoding.createEncoder(); | ||
| const decoder = decoding.createDecoder(message); | ||
| const messageType = decoding.readVarUint(decoder); | ||
| messageType = decoding.readVarUint(decoder); | ||
| switch (messageType) { | ||
| case messageSync: | ||
| encoding.writeVarUint(encoder, messageSync); | ||
|
|
@@ -515,7 +502,7 @@ export const messageListener = (conn, doc, message) => { | |
| } | ||
| } catch (err) { | ||
| // eslint-disable-next-line no-console | ||
| console.error('Error in messageListener', err); | ||
| console.error(`Error in messageListener ${doc?.name} - messageType: ${messageType}`, err); | ||
| showError(doc, err); | ||
| } | ||
| }; | ||
|
|
@@ -528,18 +515,18 @@ export const messageListener = (conn, doc, message) => { | |
| * @param {string} docName - The name of the document | ||
| * @returns true if the document was found and invalidated, false otherwise. | ||
| */ | ||
| export const invalidateFromAdmin = async (docName) => { | ||
| // eslint-disable-next-line no-console | ||
| console.log('Invalidate from Admin received', docName); | ||
| const ydoc = docs.get(docName); | ||
| export const invalidateFromAdmin = async (ydoc) => { | ||
| if (ydoc) { | ||
| // eslint-disable-next-line no-console | ||
| console.log('Invalidating document', ydoc.name); | ||
|
|
||
| // As we are closing all connections, the ydoc will be removed from the docs map | ||
| ydoc.conns.forEach((_, c) => closeConn(ydoc, c)); | ||
|
|
||
| return true; | ||
| } else { | ||
| // eslint-disable-next-line no-console | ||
| console.log('Document not found', docName); | ||
| console.log('No document to invalidate'); | ||
| } | ||
| return false; | ||
| }; | ||
|
|
@@ -552,38 +539,32 @@ export const invalidateFromAdmin = async (docName) => { | |
| * @param {TransactionalStorage} storage - The worker transactional storage object | ||
| * @returns {Promise<void>} - The return value of this | ||
| */ | ||
| export const setupWSConnection = async (conn, docName, env, storage) => { | ||
| const timingData = new Map(); | ||
|
|
||
| export const setupWSConnection = async (conn, ydoc) => { | ||
| // eslint-disable-next-line no-param-reassign | ||
| conn.binaryType = 'arraybuffer'; | ||
| // get doc, initialize if it does not exist yet | ||
| const doc = await getYDoc(docName, conn, env, storage, timingData, true); | ||
|
|
||
| // listen and reply to events | ||
| conn.addEventListener('message', (message) => messageListener(conn, doc, new Uint8Array(message.data))); | ||
| conn.addEventListener('message', (message) => messageListener(conn, ydoc, new Uint8Array(message.data))); | ||
|
|
||
| // Check if connection is still alive | ||
| conn.addEventListener('close', () => { | ||
| closeConn(doc, conn); | ||
| closeConn(ydoc, conn); | ||
| }); | ||
| // put the following in a variables in a block so the interval handlers don't keep in in | ||
| // scope | ||
| { | ||
| // send sync step 1 | ||
| let encoder = encoding.createEncoder(); | ||
| encoding.writeVarUint(encoder, messageSync); | ||
| syncProtocol.writeSyncStep1(encoder, doc); | ||
| send(doc, conn, encoding.toUint8Array(encoder)); | ||
| const awarenessStates = doc.awareness.getStates(); | ||
| syncProtocol.writeSyncStep1(encoder, ydoc); | ||
| send(ydoc, conn, encoding.toUint8Array(encoder)); | ||
| const awarenessStates = ydoc.awareness.getStates(); | ||
| if (awarenessStates.size > 0) { | ||
| encoder = encoding.createEncoder(); | ||
| encoding.writeVarUint(encoder, messageAwareness); | ||
| encoding.writeVarUint8Array(encoder, awarenessProtocol | ||
| .encodeAwarenessUpdate(doc.awareness, Array.from(awarenessStates.keys()))); | ||
| send(doc, conn, encoding.toUint8Array(encoder)); | ||
| .encodeAwarenessUpdate(ydoc.awareness, Array.from(awarenessStates.keys()))); | ||
| send(ydoc, conn, encoding.toUint8Array(encoder)); | ||
| } | ||
| } | ||
|
|
||
| return timingData; | ||
| }; | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this is correct. Wouldn't this basically remove the doc if a connection goes away?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On connection close, the doc is removed from the map ("local cache"). This is the same logic as before, just moved the code here to have access to the cache.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, what was missing in the new version is the check if there is no more connections. Added.