integration refactor with stackHead magic
This commit is contained in:
parent
3449687280
commit
6e8167fe51
@ -485,7 +485,7 @@ export class Item extends AbstractStruct {
|
|||||||
this.content.integrate(transaction, this)
|
this.content.integrate(transaction, this)
|
||||||
// add parent to transaction.changed
|
// add parent to transaction.changed
|
||||||
addChangedTypeToTransaction(transaction, /** @type {AbstractType<any>} */ (this.parent), this.parentSub)
|
addChangedTypeToTransaction(transaction, /** @type {AbstractType<any>} */ (this.parent), this.parentSub)
|
||||||
if ((/** @type {AbstractType<any>} */ (this.parent)._item !== null && /** @type {AbstractType<any>} */ (this.parent)._item.deleted) || (this.right !== null && this.parentSub !== null)) {
|
if ((/** @type {AbstractType<any>} */ (this.parent)._item !== null && /** @type {AbstractType<any>} */ (this.parent)._item.deleted) || (this.parentSub !== null && this.right !== null)) {
|
||||||
// delete if parent is deleted or if this is not the current attribute value of parent
|
// delete if parent is deleted or if this is not the current attribute value of parent
|
||||||
this.delete(transaction)
|
this.delete(transaction)
|
||||||
}
|
}
|
||||||
|
@ -39,6 +39,7 @@ import {
|
|||||||
import * as encoding from 'lib0/encoding.js'
|
import * as encoding from 'lib0/encoding.js'
|
||||||
import * as decoding from 'lib0/decoding.js'
|
import * as decoding from 'lib0/decoding.js'
|
||||||
import * as binary from 'lib0/binary.js'
|
import * as binary from 'lib0/binary.js'
|
||||||
|
import * as map from 'lib0/map.js'
|
||||||
|
|
||||||
export let DefaultDSEncoder = DSEncoderV1
|
export let DefaultDSEncoder = DSEncoderV1
|
||||||
export let DefaultDSDecoder = DSDecoderV1
|
export let DefaultDSDecoder = DSDecoderV1
|
||||||
@ -222,38 +223,55 @@ export const readClientsStructRefs = (decoder, clientRefs, doc) => {
|
|||||||
* @function
|
* @function
|
||||||
*/
|
*/
|
||||||
const resumeStructIntegration = (transaction, store) => {
|
const resumeStructIntegration = (transaction, store) => {
|
||||||
const stack = store.pendingStack
|
const stack = store.pendingStack // @todo don't forget to append stackhead at the end
|
||||||
const clientsStructRefs = store.pendingClientsStructRefs
|
const clientsStructRefs = store.pendingClientsStructRefs
|
||||||
// sort them so that we take the higher id first, in case of conflicts the lower id will probably not conflict with the id from the higher user.
|
// sort them so that we take the higher id first, in case of conflicts the lower id will probably not conflict with the id from the higher user.
|
||||||
const clientsStructRefsIds = Array.from(clientsStructRefs.keys()).sort((a, b) => a - b)
|
const clientsStructRefsIds = Array.from(clientsStructRefs.keys()).sort((a, b) => a - b)
|
||||||
let curStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
|
if (clientsStructRefsIds.length === 0) {
|
||||||
// iterate over all struct readers until we are done
|
return
|
||||||
while (stack.length !== 0 || clientsStructRefsIds.length > 0) {
|
}
|
||||||
if (stack.length === 0) {
|
const getNextStructTarget = () => {
|
||||||
// take any first struct from clientsStructRefs and put it on the stack
|
let nextStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
|
||||||
if (curStructsTarget.i < curStructsTarget.refs.length) {
|
while (nextStructsTarget.refs.length === nextStructsTarget.i) {
|
||||||
stack.push(curStructsTarget.refs[curStructsTarget.i++])
|
clientsStructRefsIds.pop()
|
||||||
|
if (clientsStructRefsIds.length > 0) {
|
||||||
|
nextStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
|
||||||
} else {
|
} else {
|
||||||
clientsStructRefsIds.pop()
|
store.pendingClientsStructRefs.clear()
|
||||||
if (clientsStructRefsIds.length > 0) {
|
return null
|
||||||
curStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
|
|
||||||
}
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const ref = stack[stack.length - 1]
|
return nextStructsTarget
|
||||||
const localClock = getState(store, ref.id.client)
|
}
|
||||||
const offset = ref.id.clock < localClock ? localClock - ref.id.clock : 0
|
let curStructsTarget = getNextStructTarget()
|
||||||
if (ref.id.clock + offset !== localClock) {
|
if (curStructsTarget === null && stack.length === 0) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* @type {GC|Item}
|
||||||
|
*/
|
||||||
|
let stackHead = stack.length > 0
|
||||||
|
? /** @type {GC|Item} */ (stack.pop())
|
||||||
|
: /** @type {any} */ (curStructsTarget).refs[/** @type {any} */ (curStructsTarget).i++]
|
||||||
|
// caching the state because it is used very often
|
||||||
|
const state = new Map()
|
||||||
|
// iterate over all struct readers until we are done
|
||||||
|
while (true) {
|
||||||
|
const localClock = map.setIfUndefined(state, stackHead.id.client, () => getState(store, stackHead.id.client))
|
||||||
|
const offset = stackHead.id.clock < localClock ? localClock - stackHead.id.clock : 0
|
||||||
|
if (stackHead.id.clock + offset !== localClock) {
|
||||||
// A previous message from this client is missing
|
// A previous message from this client is missing
|
||||||
// check if there is a pending structRef with a smaller clock and switch them
|
// check if there is a pending structRef with a smaller clock and switch them
|
||||||
const structRefs = clientsStructRefs.get(ref.id.client) || { refs: [], i: 0 }
|
/**
|
||||||
|
* @type {{ refs: Array<GC|Item>, i: number }}
|
||||||
|
*/
|
||||||
|
const structRefs = clientsStructRefs.get(stackHead.id.client) || { refs: [], i: 0 }
|
||||||
if (structRefs.refs.length !== structRefs.i) {
|
if (structRefs.refs.length !== structRefs.i) {
|
||||||
const r = structRefs.refs[structRefs.i]
|
const r = structRefs.refs[structRefs.i]
|
||||||
if (r.id.clock < ref.id.clock) {
|
if (r.id.clock < stackHead.id.clock) {
|
||||||
// put ref with smaller clock on stack instead and continue
|
// put ref with smaller clock on stack instead and continue
|
||||||
structRefs.refs[structRefs.i] = ref
|
structRefs.refs[structRefs.i] = stackHead
|
||||||
stack[stack.length - 1] = r
|
stackHead = r
|
||||||
// sort the set because this approach might bring the list out of order
|
// sort the set because this approach might bring the list out of order
|
||||||
structRefs.refs = structRefs.refs.slice(structRefs.i).sort((r1, r2) => r1.id.clock - r2.id.clock)
|
structRefs.refs = structRefs.refs.slice(structRefs.i).sort((r1, r2) => r1.id.clock - r2.id.clock)
|
||||||
structRefs.i = 0
|
structRefs.i = 0
|
||||||
@ -261,22 +279,42 @@ const resumeStructIntegration = (transaction, store) => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
// wait until missing struct is available
|
// wait until missing struct is available
|
||||||
|
stack.push(stackHead)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
const missing = ref.getMissing(transaction, store)
|
const missing = stackHead.getMissing(transaction, store)
|
||||||
if (missing === null) {
|
if (missing === null) {
|
||||||
if (offset < ref.length) {
|
if (offset === 0 || offset < stackHead.length) {
|
||||||
ref.integrate(transaction, offset)
|
stackHead.integrate(transaction, offset)
|
||||||
|
state.set(stackHead.id.client, stackHead.id.clock + stackHead.length)
|
||||||
|
}
|
||||||
|
// iterate to next stackHead
|
||||||
|
if (stack.length > 0) {
|
||||||
|
stackHead = /** @type {GC|Item} */ (stack.pop())
|
||||||
|
} else if (curStructsTarget !== null && curStructsTarget.i < curStructsTarget.refs.length) {
|
||||||
|
stackHead = /** @type {GC|Item} */ (curStructsTarget.refs[curStructsTarget.i++])
|
||||||
|
} else {
|
||||||
|
curStructsTarget = getNextStructTarget()
|
||||||
|
if (curStructsTarget === null) {
|
||||||
|
// we are done!
|
||||||
|
break
|
||||||
|
} else {
|
||||||
|
stackHead = /** @type {GC|Item} */ (curStructsTarget.refs[curStructsTarget.i++])
|
||||||
|
}
|
||||||
}
|
}
|
||||||
stack.pop()
|
|
||||||
} else {
|
} else {
|
||||||
// get the struct reader that has the missing struct
|
// get the struct reader that has the missing struct
|
||||||
|
/**
|
||||||
|
* @type {{ refs: Array<GC|Item>, i: number }}
|
||||||
|
*/
|
||||||
const structRefs = clientsStructRefs.get(missing) || { refs: [], i: 0 }
|
const structRefs = clientsStructRefs.get(missing) || { refs: [], i: 0 }
|
||||||
if (structRefs.refs.length === structRefs.i) {
|
if (structRefs.refs.length === structRefs.i) {
|
||||||
// This update message causally depends on another update message.
|
// This update message causally depends on another update message.
|
||||||
|
stack.push(stackHead)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
stack.push(structRefs.refs[structRefs.i++])
|
stack.push(stackHead)
|
||||||
|
stackHead = structRefs.refs[structRefs.i++]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
store.pendingClientsStructRefs.clear()
|
store.pendingClientsStructRefs.clear()
|
||||||
@ -360,22 +398,22 @@ const cleanupPendingStructs = pendingClientsStructRefs => {
|
|||||||
*/
|
*/
|
||||||
export const readStructs = (decoder, transaction, store) => {
|
export const readStructs = (decoder, transaction, store) => {
|
||||||
const clientsStructRefs = new Map()
|
const clientsStructRefs = new Map()
|
||||||
let start = performance.now()
|
// let start = performance.now()
|
||||||
readClientsStructRefs(decoder, clientsStructRefs, transaction.doc)
|
readClientsStructRefs(decoder, clientsStructRefs, transaction.doc)
|
||||||
console.log('time to read structs: ', performance.now() - start) // @todo remove
|
// console.log('time to read structs: ', performance.now() - start) // @todo remove
|
||||||
start = performance.now()
|
// start = performance.now()
|
||||||
mergeReadStructsIntoPendingReads(store, clientsStructRefs)
|
mergeReadStructsIntoPendingReads(store, clientsStructRefs)
|
||||||
console.log('time to merge: ', performance.now() - start) // @todo remove
|
// console.log('time to merge: ', performance.now() - start) // @todo remove
|
||||||
start = performance.now()
|
// start = performance.now()
|
||||||
resumeStructIntegration(transaction, store)
|
resumeStructIntegration(transaction, store)
|
||||||
console.log('time to integrate: ', performance.now() - start) // @todo remove
|
// console.log('time to integrate: ', performance.now() - start) // @todo remove
|
||||||
start = performance.now()
|
// start = performance.now()
|
||||||
cleanupPendingStructs(store.pendingClientsStructRefs)
|
cleanupPendingStructs(store.pendingClientsStructRefs)
|
||||||
console.log('time to cleanup: ', performance.now() - start) // @todo remove
|
// console.log('time to cleanup: ', performance.now() - start) // @todo remove
|
||||||
start = performance.now()
|
// start = performance.now()
|
||||||
tryResumePendingDeleteReaders(transaction, store)
|
tryResumePendingDeleteReaders(transaction, store)
|
||||||
console.log('time to resume delete readers: ', performance.now() - start) // @todo remove
|
// console.log('time to resume delete readers: ', performance.now() - start) // @todo remove
|
||||||
start = performance.now()
|
// start = performance.now()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
Loading…
x
Reference in New Issue
Block a user