From 304812fb07680f7c4c3265561bf57d7fade1cac2 Mon Sep 17 00:00:00 2001
From: Kevin Jahns
Date: Sun, 17 Jan 2021 15:22:36 +0100
Subject: [PATCH] concept for improved implementation of pending updates

---
 src/index.js             |   4 +-
 src/utils/StructStore.js |  19 ++----
 src/utils/encoding.js    | 129 +++++++++++++++++++--------------------
 3 files changed, 69 insertions(+), 83 deletions(-)

diff --git a/src/index.js b/src/index.js
index 5a2893f2..ec2fe7f9 100644
--- a/src/index.js
+++ b/src/index.js
@@ -80,5 +80,7 @@ export {
   parseUpdateMeta,
   parseUpdateMetaV2,
   encodeStateVectorFromUpdate,
-  encodeStateVectorFromUpdateV2
+  encodeStateVectorFromUpdateV2,
+  encodeRelativePosition,
+  decodeRelativePosition
 } from './internals.js'
diff --git a/src/utils/StructStore.js b/src/utils/StructStore.js
index f2c7fd8a..4a773127 100644
--- a/src/utils/StructStore.js
+++ b/src/utils/StructStore.js
@@ -15,24 +15,13 @@ export class StructStore {
      */
     this.clients = new Map()
     /**
-     * Store incompleted struct reads here
-     * `i` denotes to the next read operation
-     * We could shift the array of refs instead, but shift is incredible
-     * slow in Chrome for arrays with more than 100k elements
-     * @see tryResumePendingStructRefs
-     * @type {Map<number,{i:number,refs:Array<GC|Item>}>}
+     * @type {null | { missing: Map<number, number>, update: Uint8Array }}
      */
-    this.pendingClientsStructRefs = new Map()
+    this.pendingStructs = null
     /**
-     * Stack of pending structs waiting for struct dependencies
-     * Maximum length of stack is structReaders.size
-     * @type {Array<GC|Item>}
+     * @type {null | Uint8Array}
      */
-    this.pendingStack = []
-    /**
-     * @type {Array<DSDecoderV1 | DSDecoderV2>}
-     */
-    this.pendingDeleteReaders = []
+    this.pendingDs = null
   }
 }
diff --git a/src/utils/encoding.js b/src/utils/encoding.js
index 1318cd3d..3d43e579 100644
--- a/src/utils/encoding.js
+++ b/src/utils/encoding.js
@@ -33,6 +33,7 @@ import {
   DSEncoderV2,
   DSDecoderV1,
   DSEncoderV1,
+  mergeUpdates,
   Doc, Transaction, GC, Item, StructStore, ID // eslint-disable-line
 } from '../internals.js'
 
@@ -40,6 +41,7 @@ import * as encoding from 'lib0/encoding.js'
 import * as decoding from 'lib0/decoding.js'
 import * as binary from 'lib0/binary.js'
 import * as map from 'lib0/map.js'
+import { mergeUpdatesV2 } from './updates.js'
 
 /**
  * @param {UpdateEncoderV1 | UpdateEncoderV2} encoder
@@ -98,14 +100,17 @@ export const writeClientsStructs = (encoder, store, _sm) => {
 
 /**
  * @param {UpdateDecoderV1 | UpdateDecoderV2} decoder The decoder object to read data from.
- * @param {Map<number,Array<GC|Item>>} clientRefs
  * @param {Doc} doc
- * @return {Map<number,Array<GC|Item>>}
+ * @return {Map<number, Array<Item | GC>>}
  *
  * @private
  * @function
  */
-export const readClientsStructRefs = (decoder, clientRefs, doc) => {
+export const readClientsStructRefs = (decoder, doc) => {
+  /**
+   * @type {Map<number, Array<Item | GC>>}
+   */
+  const clientRefs = map.create()
   const numOfStateUpdates = decoding.readVarUint(decoder.restDecoder)
   for (let i = 0; i < numOfStateUpdates; i++) {
     const numberOfStructs = decoding.readVarUint(decoder.restDecoder)
@@ -199,17 +204,18 @@
  *
  * @param {Transaction} transaction
  * @param {StructStore} store
+ * @param {Map<number, { i: number, refs: Array<GC | Item> }>} clientsStructRefs
+ * @return {null | { missing: { client: number, clock: number }, ssRest: Uint8Array }}
  *
  * @private
  * @function
  */
-const resumeStructIntegration = (transaction, store) => {
+const integrateStructs = (transaction, store, clientsStructRefs) => {
   const stack = store.pendingStack // @todo don't forget to append stackhead at the end
-  const clientsStructRefs = store.pendingClientsStructRefs
   // sort them so that we take the higher id first, in case of conflicts the lower id will probably not conflict with the id from the higher user.
   const clientsStructRefsIds = Array.from(clientsStructRefs.keys()).sort((a, b) => a - b)
   if (clientsStructRefsIds.length === 0) {
-    return
+    return null
   }
   const getNextStructTarget = () => {
     let nextStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
@@ -301,21 +307,6 @@ const resumeStructIntegration = (transaction, store) => {
   store.pendingClientsStructRefs.clear()
 }
 
-/**
- * @param {Transaction} transaction
- * @param {StructStore} store
- *
- * @private
- * @function
- */
-export const tryResumePendingDeleteReaders = (transaction, store) => {
-  const pendingReaders = store.pendingDeleteReaders
-  store.pendingDeleteReaders = []
-  for (let i = 0; i < pendingReaders.length; i++) {
-    readAndApplyDeleteSet(pendingReaders[i], transaction, store)
-  }
-}
-
 /**
  * @param {UpdateEncoderV1 | UpdateEncoderV2} encoder
  * @param {Transaction} transaction
@@ -325,46 +316,6 @@ export const tryResumePendingDeleteReaders = (transaction, store) => {
  */
 export const writeStructsFromTransaction = (encoder, transaction) => writeClientsStructs(encoder, transaction.doc.store, transaction.beforeState)
 
-/**
- * @param {StructStore} store
- * @param {Map<number,Array<GC|Item>>} clientsStructsRefs
- *
- * @private
- * @function
- */
-const mergeReadStructsIntoPendingReads = (store, clientsStructsRefs) => {
-  const pendingClientsStructRefs = store.pendingClientsStructRefs
-  clientsStructsRefs.forEach((structRefs, client) => {
-    const pendingStructRefs = pendingClientsStructRefs.get(client)
-    if (pendingStructRefs === undefined) {
-      pendingClientsStructRefs.set(client, { refs: structRefs, i: 0 })
-    } else {
-      // merge into existing structRefs
-      const merged = pendingStructRefs.i > 0 ? pendingStructRefs.refs.slice(pendingStructRefs.i) : pendingStructRefs.refs
-      for (let i = 0; i < structRefs.length; i++) {
-        merged.push(structRefs[i])
-      }
-      pendingStructRefs.i = 0
-      pendingStructRefs.refs = merged.sort((r1, r2) => r1.id.clock - r2.id.clock)
-    }
-  })
-}
-
-/**
- * @param {Map<number,{refs:Array<GC|Item>,i:number}>} pendingClientsStructRefs
- */
-const cleanupPendingStructs = pendingClientsStructRefs => {
-  // cleanup pendingClientsStructs if not fully finished
-  pendingClientsStructRefs.forEach((refs, client) => {
-    if (refs.i === refs.refs.length) {
-      pendingClientsStructRefs.delete(client)
-    } else {
-      refs.refs.splice(0, refs.i)
-      refs.i = 0
-    }
-  })
-}
-
 /**
  * Read the next Item in a Decoder and fill this Item with the read data.
 *
@@ -378,23 +329,67 @@ const cleanupPendingStructs = pendingClientsStructRefs => {
  * @param {UpdateDecoderV1 | UpdateDecoderV2} decoder The decoder object to read data from.
  * @param {Transaction} transaction
  * @param {StructStore} store
  *
  * @private
  * @function
  */
 export const readStructs = (decoder, transaction, store) => {
-  const clientsStructRefs = new Map()
+  let retry = false
+  const doc = transaction.doc
   // let start = performance.now()
-  readClientsStructRefs(decoder, clientsStructRefs, transaction.doc)
+  const ss = readClientsStructRefs(decoder, doc)
   // console.log('time to read structs: ', performance.now() - start) // @todo remove
   // start = performance.now()
-  mergeReadStructsIntoPendingReads(store, clientsStructRefs)
+
   // console.log('time to merge: ', performance.now() - start) // @todo remove
   // start = performance.now()
-  resumeStructIntegration(transaction, store)
+  const { missing, ssRest } = integrateStructs(transaction, store, ss) || {}
+  const pending = store.pendingStructs
+  if (pending) {
+    // check if we can apply something
+    for (const [client, clock] of pending.missing) {
+      if (clock < getState(store, client)) {
+        retry = true
+        break
+      }
+    }
+  }
+  if (missing) {
+    if (pending) {
+      pending.missing.set(missing.client, missing.clock)
+      // @todo support v2 as well
+      pending.update = mergeUpdates([pending.update, ssRest])
+    } else {
+      store.pendingStructs = { missing: map.create(), update: ssRest }
+      store.pendingStructs.missing.set(missing.client, missing.clock)
+    }
+  }
+
   // console.log('time to integrate: ', performance.now() - start) // @todo remove
   // start = performance.now()
-  cleanupPendingStructs(store.pendingClientsStructRefs)
+  const dsRest = readAndApplyDeleteSet(decoder, transaction, store)
+  if (store.pendingDs) {
+    // @todo we could make a lower-bound state-vector check as we do above
+    const dsRest2 = readAndApplyDeleteSet(store.pendingDs, transaction, store)
+    if (dsRest && dsRest2) {
+      // case 1: ds1 != null && ds2 != null
+      store.pendingDs = mergeUpdatesV2([dsRest, dsRest2])
+    } else {
+      // case 2: ds1 != null
+      // case 3: ds2 != null
+      // case 4: ds1 == null && ds2 == null
+      store.pendingDs = dsRest || dsRest2
+    }
+  } else {
+    // Either dsRest == null && pendingDs == null OR dsRest != null
+    store.pendingDs = dsRest
+  }
   // console.log('time to cleanup: ', performance.now() - start) // @todo remove
   // start = performance.now()
-  tryResumePendingDeleteReaders(transaction, store)
+
  // console.log('time to resume delete readers: ', performance.now() - start) // @todo remove
  // start = performance.now()
+  if (retry) {
+    const update = store.pendingStructs.update
+    store.pendingStructs = null
+    applyUpdate(doc, update)
+  }
 }
 
 /**
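Illustration, outside the patch itself: a minimal sketch of the behaviour the new pendingStructs / pendingDs fields are meant to support, written against the public Yjs API (Y.Doc, Y.encodeStateAsUpdate, Y.encodeStateVector, Y.applyUpdate). The comments about what gets buffered refer to the internal StructStore fields introduced above and describe this concept, not documented API.

import * as Y from 'yjs'

const source = new Y.Doc()
const target = new Y.Doc()

source.getText('t').insert(0, 'a')
const updateA = Y.encodeStateAsUpdate(source)          // contains the struct at clock 0
const afterA = Y.encodeStateVector(source)

source.getText('t').insert(1, 'b')
const updateB = Y.encodeStateAsUpdate(source, afterA)  // only the struct at clock 1, which depends on clock 0

// Deliver the updates out of order: B arrives before its dependency A.
Y.applyUpdate(target, updateB)
// The structs that cannot be integrated yet are kept as a single pending update
// on the receiving StructStore (pendingStructs = { missing, update }) instead of
// the old per-client pendingClientsStructRefs / pendingStack bookkeeping.
console.log(target.getText('t').toString()) // ''

Y.applyUpdate(target, updateA)
// The missing clock is now satisfied, so the buffered update is applied as well.
console.log(target.getText('t').toString()) // 'ab'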