refactor read/write of structs
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
|
||||
import {
|
||||
findIndexSS,
|
||||
createID,
|
||||
getState,
|
||||
AbstractItem, StructStore, Transaction, ID // eslint-disable-line
|
||||
} from '../internals.js'
|
||||
|
||||
@@ -163,21 +165,24 @@ export const writeDeleteSet = (encoder, ds) => {
|
||||
|
||||
/**
|
||||
* @param {decoding.Decoder} decoder
|
||||
* @param {StructStore} store
|
||||
* @param {Transaction} transaction
|
||||
* @param {StructStore} store
|
||||
*/
|
||||
export const readDeleteSet = (decoder, store, transaction) => {
|
||||
export const readDeleteSet = (decoder, transaction, store) => {
|
||||
const unappliedDS = new DeleteSet()
|
||||
const numClients = decoding.readVarUint(decoder)
|
||||
for (let i = 0; i < numClients; i++) {
|
||||
const client = decoding.readVarUint(decoder)
|
||||
const numberOfDeletes = decoding.readVarUint(decoder)
|
||||
const structs = store.clients.get(client) || []
|
||||
const lastStruct = structs[structs.length - 1]
|
||||
const state = lastStruct.id.clock + lastStruct.length
|
||||
const state = getState(store, client)
|
||||
for (let i = 0; i < numberOfDeletes; i++) {
|
||||
const clock = decoding.readVarUint(decoder)
|
||||
const len = decoding.readVarUint(decoder)
|
||||
if (clock < state) {
|
||||
if (state < clock + len) {
|
||||
addToDeleteSet(unappliedDS, createID(client, state), clock + len - state)
|
||||
}
|
||||
let index = findIndexSS(structs, clock)
|
||||
/**
|
||||
* We can ignore the case of GC and Delete structs, because we are going to skip them
|
||||
@@ -206,7 +211,14 @@ export const readDeleteSet = (decoder, store, transaction) => {
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
addToDeleteSet(unappliedDS, createID(client, state), len)
|
||||
}
|
||||
}
|
||||
}
|
||||
if (unappliedDS.clients.size > 0) {
|
||||
const unappliedDSEncoder = encoding.createEncoder()
|
||||
writeDeleteSet(unappliedDSEncoder, unappliedDS)
|
||||
store.pendingDeleteReaders.push(decoding.createDecoder(encoding.toBuffer(unappliedDSEncoder)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
|
||||
import {
|
||||
Transaction, ID, ItemType, AbstractItem, AbstractStruct // eslint-disable-line
|
||||
AbstractRef, ID, ItemType, AbstractItem, AbstractStruct // eslint-disable-line
|
||||
} from '../internals.js'
|
||||
|
||||
import * as math from 'lib0/math.js'
|
||||
@@ -14,6 +14,16 @@ export class StructStore {
|
||||
* @type {Map<number,Array<AbstractStruct>>}
|
||||
*/
|
||||
this.clients = new Map()
|
||||
/**
|
||||
* Store uncompleted struct readers here
|
||||
* @see tryResumePendingReaders
|
||||
* @type {Set<{stack:Array<AbstractRef>,structReaders:Map<number,IterableIterator<AbstractRef>>,missing:ID}>}
|
||||
*/
|
||||
this.pendingStructReaders = new Set()
|
||||
/**
|
||||
* @type {Array<decoding.Decoder>}
|
||||
*/
|
||||
this.pendingDeleteReaders = []
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -2,8 +2,8 @@
|
||||
import {
|
||||
findIndexSS,
|
||||
exists,
|
||||
ItemBinaryRef,
|
||||
GCRef,
|
||||
ItemBinaryRef,
|
||||
ItemDeletedRef,
|
||||
ItemEmbedRef,
|
||||
ItemFormatRef,
|
||||
@@ -15,6 +15,9 @@ import {
|
||||
readID,
|
||||
getState,
|
||||
getStates,
|
||||
readDeleteSet,
|
||||
writeDeleteSet,
|
||||
createDeleteSetFromStructStore,
|
||||
Transaction, AbstractStruct, AbstractRef, StructStore, ID // eslint-disable-line
|
||||
} from '../internals.js'
|
||||
|
||||
@@ -28,9 +31,9 @@ import * as iterator from 'lib0/iterator.js'
|
||||
* @typedef {Map<number, number>} StateMap
|
||||
*/
|
||||
|
||||
const structRefs = [
|
||||
ItemBinaryRef,
|
||||
export const structRefs = [
|
||||
GCRef,
|
||||
ItemBinaryRef,
|
||||
ItemDeletedRef,
|
||||
ItemEmbedRef,
|
||||
ItemFormatRef,
|
||||
@@ -44,7 +47,7 @@ const structRefs = [
|
||||
* @param {number} structsLen
|
||||
* @param {ID} nextID
|
||||
* @param {number} localState next expected clock by nextID.client
|
||||
* @return {Iterator<AbstractRef>}
|
||||
* @return {IterableIterator<AbstractRef>}
|
||||
*/
|
||||
const createStructReaderIterator = (decoder, structsLen, nextID, localState) => iterator.createIterator(() => {
|
||||
let done = false
|
||||
@@ -83,6 +86,7 @@ export const writeStructs = (encoder, store, _sm) => {
|
||||
// we use it in readStructs to jump ahead to the end of the message
|
||||
encoding.writeUint32(encoder, 0)
|
||||
_sm.forEach((clock, client) => {
|
||||
// only write if new structs are available
|
||||
if (getState(store, client) > clock) {
|
||||
sm.set(client, clock)
|
||||
}
|
||||
@@ -125,30 +129,17 @@ export const writeStructs = (encoder, store, _sm) => {
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the next Item in a Decoder and fill this Item with the read data.
|
||||
*
|
||||
* This is called when data is received from a remote peer.
|
||||
*
|
||||
* @param {decoding.Decoder} decoder The decoder object to read data from.
|
||||
* @param {Transaction} transaction
|
||||
* @param {StructStore} store
|
||||
*
|
||||
* @private
|
||||
* @param {Map<number,number>} localState
|
||||
* @return {Map<number,IterableIterator<AbstractRef>>}
|
||||
*/
|
||||
export const readStructs = (decoder, transaction, store) => {
|
||||
const readStructReaders = (decoder, localState) => {
|
||||
/**
|
||||
* @type {Map<number,Iterator<AbstractRef>>}
|
||||
* @type {Map<number,IterableIterator<AbstractRef>>}
|
||||
*/
|
||||
const structReaders = new Map()
|
||||
const endOfMessagePos = decoder.pos + decoding.readUint32(decoder)
|
||||
const clientbeforeState = decoding.readVarUint(decoder)
|
||||
/**
|
||||
* Stack of pending structs waiting for struct dependencies.
|
||||
* Maximum length of stack is structReaders.size.
|
||||
* @type {Array<AbstractRef>}
|
||||
*/
|
||||
const stack = []
|
||||
const localState = getStates(store)
|
||||
for (let i = 0; i < clientbeforeState; i++) {
|
||||
const nextID = readID(decoder)
|
||||
const decoderPos = decoder.pos + decoding.readUint32(decoder)
|
||||
@@ -160,29 +151,152 @@ export const readStructs = (decoder, transaction, store) => {
|
||||
// Jump ahead to end of message so that reading can continue.
|
||||
// We will use the created struct readers for the remaining part of this workflow.
|
||||
decoder.pos = endOfMessagePos
|
||||
for (const it of structReaders.values()) {
|
||||
// todo try for in of it
|
||||
for (let res = it.next(); !res.done; res = it.next()) {
|
||||
stack.push(res.value)
|
||||
while (stack.length > 0) {
|
||||
const ref = stack[stack.length - 1]
|
||||
const m = ref._missing
|
||||
while (m.length > 0) {
|
||||
const nextMissing = m[m.length - 1]
|
||||
if (!exists(store, nextMissing)) {
|
||||
// @ts-ignore must not be undefined, otherwise unexpected case
|
||||
stack.push(structReaders.get(nextMissing.client).next().value)
|
||||
break
|
||||
return structReaders
|
||||
}
|
||||
|
||||
/**
|
||||
* Resume computing structs generated by struct readers.
|
||||
*
|
||||
* While there is something to do, we integrate structs in this order
|
||||
* 1. top element on stack, if stack is not empty
|
||||
* 2. next element from current struct reader (if empty, use next struct reader)
|
||||
*
|
||||
* If struct causally depends on another struct (ref.missing), we put next reader of
|
||||
* `ref.id.client` on top of stack.
|
||||
*
|
||||
* At some point we find a struct that has no causal dependencies,
|
||||
* then we start emptying the stack.
|
||||
*
|
||||
* It is not possible to have circles: i.e. struct1 (from client1) depends on struct2 (from client2)
|
||||
* depends on struct3 (from client1). Therefore the max stack size is eqaul to `structReaders.length`.
|
||||
*
|
||||
* This method is implemented in a way so that we can resume computation if this update
|
||||
* causally depends on another update.
|
||||
*
|
||||
* @param {Transaction} transaction
|
||||
* @param {StructStore} store
|
||||
* @param {Map<number,number>} localState
|
||||
* @param {Map<number,IterableIterator<AbstractRef>>} structReaders
|
||||
* @param {Array<AbstractRef>} stack Stack of pending structs waiting for struct dependencies.
|
||||
* Maximum length of stack is structReaders.size.
|
||||
*
|
||||
* @todo reimplement without iterators - read everything in arrays instead
|
||||
*/
|
||||
const execStructReaders = (transaction, store, localState, structReaders, stack) => {
|
||||
// iterate over all struct readers until we are done
|
||||
const structReaderIterator = structReaders.values()
|
||||
let structReaderIteratorResult = structReaderIterator.next()
|
||||
while (stack.length !== 0 || !structReaderIteratorResult.done) {
|
||||
if (stack.length === 0) {
|
||||
// stack is empty. We know that there there are more structReaders to be processed
|
||||
const nextStructRes = structReaderIteratorResult.value.next()
|
||||
if (nextStructRes.done) {
|
||||
// current structReaderIteratorResult is empty, use next one
|
||||
structReaderIteratorResult = structReaderIterator.next()
|
||||
} else {
|
||||
stack.push(nextStructRes.value)
|
||||
}
|
||||
} else {
|
||||
const ref = stack[stack.length - 1]
|
||||
const m = ref._missing
|
||||
while (m.length > 0) {
|
||||
const missing = m[m.length - 1]
|
||||
if (!exists(store, missing)) {
|
||||
// get the struct reader that has the missing struct
|
||||
const reader = structReaders.get(missing.client)
|
||||
const nextRef = reader === undefined ? undefined : reader.next().value
|
||||
if (nextRef === undefined) {
|
||||
// This update message causally depends on another update message.
|
||||
// Store current stack and readers in StructStore and resume the computation at another time
|
||||
store.pendingStructReaders.add({ stack, structReaders, missing })
|
||||
return
|
||||
}
|
||||
ref._missing.pop()
|
||||
stack.push(nextRef)
|
||||
break
|
||||
}
|
||||
if (m.length === 0) {
|
||||
const localClock = (localState.get(ref.id.client) || 0)
|
||||
const offset = ref.id.clock < localClock ? localClock - ref.id.clock : 0
|
||||
ref.toStruct(transaction, offset).integrate(transaction)
|
||||
stack.pop()
|
||||
ref._missing.pop()
|
||||
}
|
||||
if (m.length === 0) {
|
||||
const localClock = (localState.get(ref.id.client) || 0)
|
||||
const offset = ref.id.clock < localClock ? localClock - ref.id.clock : 0
|
||||
if (offset < ref.length) {
|
||||
ref.toStruct(transaction.y, store, offset).integrate(transaction)
|
||||
}
|
||||
stack.pop()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to resume pending struct readers in `store.pendingReaders` while `pendingReaders.nextMissing`
|
||||
* exists.
|
||||
*
|
||||
* @param {Transaction} transaction
|
||||
* @param {StructStore} store
|
||||
*/
|
||||
const tryResumePendingStructReaders = (transaction, store) => {
|
||||
let resume = true
|
||||
const pendingReaders = store.pendingStructReaders
|
||||
while (resume) {
|
||||
resume = false
|
||||
for (const pendingReader of pendingReaders) {
|
||||
if (exists(store, pendingReader.missing)) {
|
||||
resume = true // found at least one more reader to execute
|
||||
pendingReaders.delete(pendingReader)
|
||||
execStructReaders(transaction, store, getStates(store), pendingReader.structReaders, pendingReader.stack)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Transaction} transaction
|
||||
* @param {StructStore} store
|
||||
*/
|
||||
export const tryResumePendingDeleteReaders = (transaction, store) => {
|
||||
const pendingReaders = store.pendingDeleteReaders
|
||||
store.pendingDeleteReaders = []
|
||||
for (let i = 0; i < pendingReaders.length; i++) {
|
||||
readDeleteSet(pendingReaders[i], transaction, store)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read the next Item in a Decoder and fill this Item with the read data.
|
||||
*
|
||||
* This is called when data is received from a remote peer.
|
||||
*
|
||||
* @param {decoding.Decoder} decoder The decoder object to read data from.
|
||||
* @param {Transaction} transaction
|
||||
* @param {StructStore} store
|
||||
*
|
||||
* @private
|
||||
*/
|
||||
export const readStructs = (decoder, transaction, store) => {
|
||||
const localState = getStates(store)
|
||||
const readers = readStructReaders(decoder, localState)
|
||||
execStructReaders(transaction, store, localState, readers, [])
|
||||
tryResumePendingStructReaders(transaction, store)
|
||||
tryResumePendingDeleteReaders(transaction, store)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {decoding.Decoder} decoder
|
||||
* @param {Transaction} transaction
|
||||
* @param {StructStore} store
|
||||
*/
|
||||
export const readModel = (decoder, transaction, store) => {
|
||||
readStructs(decoder, transaction, store)
|
||||
readDeleteSet(decoder, transaction, store)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {encoding.Encoder} encoder
|
||||
* @param {StructStore} store
|
||||
* @param {Map<number,number>} [targetState] The state of the target that receives the update. Leave empty to write all known structs
|
||||
*/
|
||||
export const writeModel = (encoder, store, targetState = new Map()) => {
|
||||
writeStructs(encoder, store, targetState)
|
||||
writeDeleteSet(encoder, createDeleteSetFromStructStore(store))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user