concept for improved implementation of pending updates

This commit is contained in:
Kevin Jahns 2021-01-17 15:22:36 +01:00
parent dbd1b3cb59
commit 304812fb07
3 changed files with 69 additions and 83 deletions

View File

@ -80,5 +80,7 @@ export {
parseUpdateMeta,
parseUpdateMetaV2,
encodeStateVectorFromUpdate,
encodeStateVectorFromUpdateV2
encodeStateVectorFromUpdateV2,
encodeRelativePosition,
decodeRelativePosition
} from './internals.js'

View File

@ -15,24 +15,13 @@ export class StructStore {
*/
this.clients = new Map()
/**
* Store incompleted struct reads here
* `i` denotes to the next read operation
* We could shift the array of refs instead, but shift is incredible
* slow in Chrome for arrays with more than 100k elements
* @see tryResumePendingStructRefs
* @type {Map<number,{i:number,refs:Array<GC|Item>}>}
* @type {null | { missing: Map<number, number>, update: Uint8Array }}
*/
this.pendingClientsStructRefs = new Map()
this.pendingStructs = null
/**
* Stack of pending structs waiting for struct dependencies
* Maximum length of stack is structReaders.size
* @type {Array<GC|Item>}
* @type {null | Uint8Array}
*/
this.pendingStack = []
/**
* @type {Array<DSDecoderV2>}
*/
this.pendingDeleteReaders = []
this.pendingDs = null
}
}

View File

@ -33,6 +33,7 @@ import {
DSEncoderV2,
DSDecoderV1,
DSEncoderV1,
mergeUpdates,
Doc, Transaction, GC, Item, StructStore, ID // eslint-disable-line
} from '../internals.js'
@ -40,6 +41,7 @@ import * as encoding from 'lib0/encoding.js'
import * as decoding from 'lib0/decoding.js'
import * as binary from 'lib0/binary.js'
import * as map from 'lib0/map.js'
import { mergeUpdatesV2 } from './updates.js'
/**
* @param {UpdateEncoderV1 | UpdateEncoderV2} encoder
@ -98,14 +100,17 @@ export const writeClientsStructs = (encoder, store, _sm) => {
/**
* @param {UpdateDecoderV1 | UpdateDecoderV2} decoder The decoder object to read data from.
* @param {Map<number,Array<GC|Item>>} clientRefs
* @param {Doc} doc
* @return {Map<number,Array<GC|Item>>}
* @return {Map<number, Array<Item | GC>>}
*
* @private
* @function
*/
export const readClientsStructRefs = (decoder, clientRefs, doc) => {
export const readClientsStructRefs = (decoder, doc) => {
/**
* @type {Map<number, Array<Item | GC>>}
*/
const clientRefs = map.create()
const numOfStateUpdates = decoding.readVarUint(decoder.restDecoder)
for (let i = 0; i < numOfStateUpdates; i++) {
const numberOfStructs = decoding.readVarUint(decoder.restDecoder)
@ -199,17 +204,18 @@ export const readClientsStructRefs = (decoder, clientRefs, doc) => {
*
* @param {Transaction} transaction
* @param {StructStore} store
* @param {Map<number, { i: number, refs: (GC | Item)[] }} clientsStructRefs
* @return { null | { restStructs: Uint8Array, missing: { client: number, clock: number } } }
*
* @private
* @function
*/
const resumeStructIntegration = (transaction, store) => {
const integrateStructs = (transaction, store, clientsStructRefs) => {
const stack = store.pendingStack // @todo don't forget to append stackhead at the end
const clientsStructRefs = store.pendingClientsStructRefs
// sort them so that we take the higher id first, in case of conflicts the lower id will probably not conflict with the id from the higher user.
const clientsStructRefsIds = Array.from(clientsStructRefs.keys()).sort((a, b) => a - b)
if (clientsStructRefsIds.length === 0) {
return
return null
}
const getNextStructTarget = () => {
let nextStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
@ -301,21 +307,6 @@ const resumeStructIntegration = (transaction, store) => {
store.pendingClientsStructRefs.clear()
}
/**
* @param {Transaction} transaction
* @param {StructStore} store
*
* @private
* @function
*/
export const tryResumePendingDeleteReaders = (transaction, store) => {
const pendingReaders = store.pendingDeleteReaders
store.pendingDeleteReaders = []
for (let i = 0; i < pendingReaders.length; i++) {
readAndApplyDeleteSet(pendingReaders[i], transaction, store)
}
}
/**
* @param {UpdateEncoderV1 | UpdateEncoderV2} encoder
* @param {Transaction} transaction
@ -325,46 +316,6 @@ export const tryResumePendingDeleteReaders = (transaction, store) => {
*/
export const writeStructsFromTransaction = (encoder, transaction) => writeClientsStructs(encoder, transaction.doc.store, transaction.beforeState)
/**
* @param {StructStore} store
* @param {Map<number, Array<GC|Item>>} clientsStructsRefs
*
* @private
* @function
*/
const mergeReadStructsIntoPendingReads = (store, clientsStructsRefs) => {
const pendingClientsStructRefs = store.pendingClientsStructRefs
clientsStructsRefs.forEach((structRefs, client) => {
const pendingStructRefs = pendingClientsStructRefs.get(client)
if (pendingStructRefs === undefined) {
pendingClientsStructRefs.set(client, { refs: structRefs, i: 0 })
} else {
// merge into existing structRefs
const merged = pendingStructRefs.i > 0 ? pendingStructRefs.refs.slice(pendingStructRefs.i) : pendingStructRefs.refs
for (let i = 0; i < structRefs.length; i++) {
merged.push(structRefs[i])
}
pendingStructRefs.i = 0
pendingStructRefs.refs = merged.sort((r1, r2) => r1.id.clock - r2.id.clock)
}
})
}
/**
* @param {Map<number,{refs:Array<GC|Item>,i:number}>} pendingClientsStructRefs
*/
const cleanupPendingStructs = pendingClientsStructRefs => {
// cleanup pendingClientsStructs if not fully finished
pendingClientsStructRefs.forEach((refs, client) => {
if (refs.i === refs.refs.length) {
pendingClientsStructRefs.delete(client)
} else {
refs.refs.splice(0, refs.i)
refs.i = 0
}
})
}
/**
* Read the next Item in a Decoder and fill this Item with the read data.
*
@ -378,23 +329,67 @@ const cleanupPendingStructs = pendingClientsStructRefs => {
* @function
*/
export const readStructs = (decoder, transaction, store) => {
const clientsStructRefs = new Map()
let retry = false
const doc = transaction.doc
// let start = performance.now()
readClientsStructRefs(decoder, clientsStructRefs, transaction.doc)
const ss = readClientsStructRefs(decoder, doc)
// console.log('time to read structs: ', performance.now() - start) // @todo remove
// start = performance.now()
mergeReadStructsIntoPendingReads(store, clientsStructRefs)
// console.log('time to merge: ', performance.now() - start) // @todo remove
// start = performance.now()
resumeStructIntegration(transaction, store)
const { missing, ssRest } = integrateStructs(transaction, store, ss)
const pending = store.pendingStructs
if (pending) {
// check if we can apply something
for (const [clock, client] of pending.missing) {
if (clock < getState(store, client)) {
retry = true
break
}
}
}
if (missing) {
if (pending) {
pending.missing.set(missing.client, missing.clock)
// @todo support v2 as well
pending.update = mergeUpdates([pending.update, ssRest])
} else {
store.pendingStructs = { missing: map.create(), update: ssRest }
store.pendingStructs.missing.set(missing.client, missing.clock)
}
}
// console.log('time to integrate: ', performance.now() - start) // @todo remove
// start = performance.now()
cleanupPendingStructs(store.pendingClientsStructRefs)
const dsRest = readAndApplyDeleteSet(decoder, transaction, store)
if (store.pendingDs) {
// @todo we could make a lower-bound state-vector check as we do above
const dsRest2 = readAndApplyDeleteSet(store.pendingDs, transaction, store)
if (dsRest1 && dsRest2) {
// case 1: ds1 != null && ds2 != null
store.pendingDs = mergeUpdatesV2([dsRest, dsRest2])
} else {
// case 2: ds1 != null
// case 3: ds2 != null
// case 4: ds1 == null && ds2 == null
store.pendingDs = dsRest || dsRest2
}
} else {
// Either dsRest == null && pendingDs == null OR dsRest != null
store.pendingDs = dsRest
}
// console.log('time to cleanup: ', performance.now() - start) // @todo remove
// start = performance.now()
tryResumePendingDeleteReaders(transaction, store)
// console.log('time to resume delete readers: ', performance.now() - start) // @todo remove
// start = performance.now()
if (retry) {
const update = store.pendingStructs.update
store.pendingStructs = null
applyUpdate(update)
}
}
/**