complete refactor of update mechanism to allow encoding of pending updates - #263
This commit is contained in:
		
							parent
							
								
									304812fb07
								
							
						
					
					
						commit
						7edbb2485f
					
				@ -4,6 +4,7 @@ import {
 | 
				
			|||||||
  getState,
 | 
					  getState,
 | 
				
			||||||
  splitItem,
 | 
					  splitItem,
 | 
				
			||||||
  iterateStructs,
 | 
					  iterateStructs,
 | 
				
			||||||
 | 
					  UpdateEncoderV2,
 | 
				
			||||||
  DSDecoderV1, DSEncoderV1, DSDecoderV2, DSEncoderV2, Item, GC, StructStore, Transaction, ID // eslint-disable-line
 | 
					  DSDecoderV1, DSEncoderV1, DSDecoderV2, DSEncoderV2, Item, GC, StructStore, Transaction, ID // eslint-disable-line
 | 
				
			||||||
} from '../internals.js'
 | 
					} from '../internals.js'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -263,6 +264,7 @@ export const readDeleteSet = decoder => {
 | 
				
			|||||||
 * @param {DSDecoderV1 | DSDecoderV2} decoder
 | 
					 * @param {DSDecoderV1 | DSDecoderV2} decoder
 | 
				
			||||||
 * @param {Transaction} transaction
 | 
					 * @param {Transaction} transaction
 | 
				
			||||||
 * @param {StructStore} store
 | 
					 * @param {StructStore} store
 | 
				
			||||||
 | 
					 * @return {Uint8Array|null} Returns a v2 update containing all deletes that couldn't be applied yet; or null if all deletes were applied successfully.
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * @private
 | 
					 * @private
 | 
				
			||||||
 * @function
 | 
					 * @function
 | 
				
			||||||
@ -315,9 +317,10 @@ export const readAndApplyDeleteSet = (decoder, transaction, store) => {
 | 
				
			|||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  if (unappliedDS.clients.size > 0) {
 | 
					  if (unappliedDS.clients.size > 0) {
 | 
				
			||||||
    // TODO: no need for encoding+decoding ds anymore
 | 
					    const ds = new UpdateEncoderV2()
 | 
				
			||||||
    const unappliedDSEncoder = new DSEncoderV2()
 | 
					    encoding.writeVarUint(ds.restEncoder, 0) // encode 0 structs
 | 
				
			||||||
    writeDeleteSet(unappliedDSEncoder, unappliedDS)
 | 
					    writeDeleteSet(ds, unappliedDS)
 | 
				
			||||||
    store.pendingDeleteReaders.push(new DSDecoderV2(decoding.createDecoder((unappliedDSEncoder.toUint8Array()))))
 | 
					    return ds.toUint8Array()
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
 | 
					  return null
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -33,15 +33,17 @@ import {
 | 
				
			|||||||
  DSEncoderV2,
 | 
					  DSEncoderV2,
 | 
				
			||||||
  DSDecoderV1,
 | 
					  DSDecoderV1,
 | 
				
			||||||
  DSEncoderV1,
 | 
					  DSEncoderV1,
 | 
				
			||||||
  mergeUpdates,
 | 
					  mergeUpdatesV2,
 | 
				
			||||||
  Doc, Transaction, GC, Item, StructStore, ID // eslint-disable-line
 | 
					  Skip,
 | 
				
			||||||
 | 
					  diffUpdate,
 | 
				
			||||||
 | 
					  Doc, Transaction, GC, Item, StructStore // eslint-disable-line
 | 
				
			||||||
} from '../internals.js'
 | 
					} from '../internals.js'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
import * as encoding from 'lib0/encoding.js'
 | 
					import * as encoding from 'lib0/encoding.js'
 | 
				
			||||||
import * as decoding from 'lib0/decoding.js'
 | 
					import * as decoding from 'lib0/decoding.js'
 | 
				
			||||||
import * as binary from 'lib0/binary.js'
 | 
					import * as binary from 'lib0/binary.js'
 | 
				
			||||||
import * as map from 'lib0/map.js'
 | 
					import * as map from 'lib0/map.js'
 | 
				
			||||||
import { mergeUpdatesV2 } from './updates.js'
 | 
					import * as math from 'lib0/math.js'
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 * @param {UpdateEncoderV1 | UpdateEncoderV2} encoder
 | 
					 * @param {UpdateEncoderV1 | UpdateEncoderV2} encoder
 | 
				
			||||||
@ -53,6 +55,7 @@ import { mergeUpdatesV2 } from './updates.js'
 | 
				
			|||||||
 */
 | 
					 */
 | 
				
			||||||
const writeStructs = (encoder, structs, client, clock) => {
 | 
					const writeStructs = (encoder, structs, client, clock) => {
 | 
				
			||||||
  // write first id
 | 
					  // write first id
 | 
				
			||||||
 | 
					  clock = math.max(clock, structs[0].id.clock) // make sure the first id exists
 | 
				
			||||||
  const startNewStructs = findIndexSS(structs, clock)
 | 
					  const startNewStructs = findIndexSS(structs, clock)
 | 
				
			||||||
  // write # encoded structs
 | 
					  // write # encoded structs
 | 
				
			||||||
  encoding.writeVarUint(encoder.restEncoder, structs.length - startNewStructs)
 | 
					  encoding.writeVarUint(encoder.restEncoder, structs.length - startNewStructs)
 | 
				
			||||||
@ -101,14 +104,14 @@ export const writeClientsStructs = (encoder, store, _sm) => {
 | 
				
			|||||||
/**
 | 
					/**
 | 
				
			||||||
 * @param {UpdateDecoderV1 | UpdateDecoderV2} decoder The decoder object to read data from.
 | 
					 * @param {UpdateDecoderV1 | UpdateDecoderV2} decoder The decoder object to read data from.
 | 
				
			||||||
 * @param {Doc} doc
 | 
					 * @param {Doc} doc
 | 
				
			||||||
 * @return {Map<number, Array<Item | GC>>}
 | 
					 * @return {Map<number, { i: number, refs: Array<Item | GC> }>}
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * @private
 | 
					 * @private
 | 
				
			||||||
 * @function
 | 
					 * @function
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
export const readClientsStructRefs = (decoder, doc) => {
 | 
					export const readClientsStructRefs = (decoder, doc) => {
 | 
				
			||||||
  /**
 | 
					  /**
 | 
				
			||||||
   * @type {Map<number, Array<Item | GC>>}
 | 
					   * @type {Map<number, { i: number, refs: Array<Item | GC> }>}
 | 
				
			||||||
   */
 | 
					   */
 | 
				
			||||||
  const clientRefs = map.create()
 | 
					  const clientRefs = map.create()
 | 
				
			||||||
  const numOfStateUpdates = decoding.readVarUint(decoder.restDecoder)
 | 
					  const numOfStateUpdates = decoding.readVarUint(decoder.restDecoder)
 | 
				
			||||||
@ -121,10 +124,24 @@ export const readClientsStructRefs = (decoder, doc) => {
 | 
				
			|||||||
    const client = decoder.readClient()
 | 
					    const client = decoder.readClient()
 | 
				
			||||||
    let clock = decoding.readVarUint(decoder.restDecoder)
 | 
					    let clock = decoding.readVarUint(decoder.restDecoder)
 | 
				
			||||||
    // const start = performance.now()
 | 
					    // const start = performance.now()
 | 
				
			||||||
    clientRefs.set(client, refs)
 | 
					    clientRefs.set(client, { i: 0, refs })
 | 
				
			||||||
    for (let i = 0; i < numberOfStructs; i++) {
 | 
					    for (let i = 0; i < numberOfStructs; i++) {
 | 
				
			||||||
      const info = decoder.readInfo()
 | 
					      const info = decoder.readInfo()
 | 
				
			||||||
      if ((binary.BITS5 & info) !== 0) {
 | 
					      switch (binary.BITS5 & info) {
 | 
				
			||||||
 | 
					        case 0: { // GC
 | 
				
			||||||
 | 
					          const len = decoder.readLen()
 | 
				
			||||||
 | 
					          refs[i] = new GC(createID(client, clock), len)
 | 
				
			||||||
 | 
					          clock += len
 | 
				
			||||||
 | 
					          break
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        case 10: { // Skip Struct (nothing to apply)
 | 
				
			||||||
 | 
					          // @todo we could reduce the amount of checks by adding Skip struct to clientRefs so we know that something is missing.
 | 
				
			||||||
 | 
					          const len = decoder.readLen()
 | 
				
			||||||
 | 
					          refs[i] = new Skip(createID(client, clock), len)
 | 
				
			||||||
 | 
					          clock += len
 | 
				
			||||||
 | 
					          break
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        default: { // Item with content
 | 
				
			||||||
          /**
 | 
					          /**
 | 
				
			||||||
           * The optimized implementation doesn't use any variables because inlining variables is faster.
 | 
					           * The optimized implementation doesn't use any variables because inlining variables is faster.
 | 
				
			||||||
           * Below a non-optimized version is shown that implements the basic algorithm with
 | 
					           * Below a non-optimized version is shown that implements the basic algorithm with
 | 
				
			||||||
@ -172,10 +189,7 @@ export const readClientsStructRefs = (decoder, doc) => {
 | 
				
			|||||||
          */
 | 
					          */
 | 
				
			||||||
          refs[i] = struct
 | 
					          refs[i] = struct
 | 
				
			||||||
          clock += struct.length
 | 
					          clock += struct.length
 | 
				
			||||||
      } else {
 | 
					        }
 | 
				
			||||||
        const len = decoder.readLen()
 | 
					 | 
				
			||||||
        refs[i] = new GC(createID(client, clock), len)
 | 
					 | 
				
			||||||
        clock += len
 | 
					 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    // console.log('time to read: ', performance.now() - start) // @todo remove
 | 
					    // console.log('time to read: ', performance.now() - start) // @todo remove
 | 
				
			||||||
@ -204,27 +218,32 @@ export const readClientsStructRefs = (decoder, doc) => {
 | 
				
			|||||||
 *
 | 
					 *
 | 
				
			||||||
 * @param {Transaction} transaction
 | 
					 * @param {Transaction} transaction
 | 
				
			||||||
 * @param {StructStore} store
 | 
					 * @param {StructStore} store
 | 
				
			||||||
 * @param {Map<number, { i: number, refs: (GC | Item)[] }} clientsStructRefs
 | 
					 * @param {Map<number, { i: number, refs: (GC | Item)[] }>} clientsStructRefs
 | 
				
			||||||
 * @return { null | { restStructs: Uint8Array, missing: { client: number, clock: number } } }
 | 
					 * @return { null | { update: Uint8Array, missing: Map<number,number> } }
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * @private
 | 
					 * @private
 | 
				
			||||||
 * @function
 | 
					 * @function
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
const integrateStructs = (transaction, store, clientsStructRefs) => {
 | 
					const integrateStructs = (transaction, store, clientsStructRefs) => {
 | 
				
			||||||
  const stack = store.pendingStack // @todo don't forget to append stackhead at the end
 | 
					  /**
 | 
				
			||||||
 | 
					   * @type {Array<Item | GC>}
 | 
				
			||||||
 | 
					   */
 | 
				
			||||||
 | 
					  const stack = []
 | 
				
			||||||
  // sort them so that we take the higher id first, in case of conflicts the lower id will probably not conflict with the id from the higher user.
 | 
					  // sort them so that we take the higher id first, in case of conflicts the lower id will probably not conflict with the id from the higher user.
 | 
				
			||||||
  const clientsStructRefsIds = Array.from(clientsStructRefs.keys()).sort((a, b) => a - b)
 | 
					  let clientsStructRefsIds = Array.from(clientsStructRefs.keys()).sort((a, b) => a - b)
 | 
				
			||||||
  if (clientsStructRefsIds.length === 0) {
 | 
					  if (clientsStructRefsIds.length === 0) {
 | 
				
			||||||
    return null
 | 
					    return null
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  const getNextStructTarget = () => {
 | 
					  const getNextStructTarget = () => {
 | 
				
			||||||
 | 
					    if (clientsStructRefsIds.length === 0) {
 | 
				
			||||||
 | 
					      return null
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
    let nextStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
 | 
					    let nextStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
 | 
				
			||||||
    while (nextStructsTarget.refs.length === nextStructsTarget.i) {
 | 
					    while (nextStructsTarget.refs.length === nextStructsTarget.i) {
 | 
				
			||||||
      clientsStructRefsIds.pop()
 | 
					      clientsStructRefsIds.pop()
 | 
				
			||||||
      if (clientsStructRefsIds.length > 0) {
 | 
					      if (clientsStructRefsIds.length > 0) {
 | 
				
			||||||
        nextStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
 | 
					        nextStructsTarget = /** @type {{i:number,refs:Array<GC|Item>}} */ (clientsStructRefs.get(clientsStructRefsIds[clientsStructRefsIds.length - 1]))
 | 
				
			||||||
      } else {
 | 
					      } else {
 | 
				
			||||||
        store.pendingClientsStructRefs.clear()
 | 
					 | 
				
			||||||
        return null
 | 
					        return null
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
@ -232,49 +251,87 @@ const integrateStructs = (transaction, store, clientsStructRefs) => {
 | 
				
			|||||||
  }
 | 
					  }
 | 
				
			||||||
  let curStructsTarget = getNextStructTarget()
 | 
					  let curStructsTarget = getNextStructTarget()
 | 
				
			||||||
  if (curStructsTarget === null && stack.length === 0) {
 | 
					  if (curStructsTarget === null && stack.length === 0) {
 | 
				
			||||||
    return
 | 
					    return null
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  /**
 | 
				
			||||||
 | 
					   * @type {StructStore}
 | 
				
			||||||
 | 
					   */
 | 
				
			||||||
 | 
					  const restStructs = new StructStore()
 | 
				
			||||||
 | 
					  const missingSV = new Map()
 | 
				
			||||||
 | 
					  /**
 | 
				
			||||||
 | 
					   * @param {number} client
 | 
				
			||||||
 | 
					   * @param {number} clock
 | 
				
			||||||
 | 
					   */
 | 
				
			||||||
 | 
					  const updateMissingSv = (client, clock) => {
 | 
				
			||||||
 | 
					    const mclock = missingSV.get(client)
 | 
				
			||||||
 | 
					    if (mclock == null || mclock > clock) {
 | 
				
			||||||
 | 
					      missingSV.set(client, clock)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  /**
 | 
					  /**
 | 
				
			||||||
   * @type {GC|Item}
 | 
					   * @type {GC|Item}
 | 
				
			||||||
   */
 | 
					   */
 | 
				
			||||||
  let stackHead = stack.length > 0
 | 
					  let stackHead = /** @type {any} */ (curStructsTarget).refs[/** @type {any} */ (curStructsTarget).i++]
 | 
				
			||||||
    ? /** @type {GC|Item} */ (stack.pop())
 | 
					 | 
				
			||||||
    : /** @type {any} */ (curStructsTarget).refs[/** @type {any} */ (curStructsTarget).i++]
 | 
					 | 
				
			||||||
  // caching the state because it is used very often
 | 
					  // caching the state because it is used very often
 | 
				
			||||||
  const state = new Map()
 | 
					  const state = new Map()
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					  const addStackToRestSS = () => {
 | 
				
			||||||
 | 
					    for (const item of stack) {
 | 
				
			||||||
 | 
					      const client = item.id.client
 | 
				
			||||||
 | 
					      const unapplicableItems = clientsStructRefs.get(client)
 | 
				
			||||||
 | 
					      if (unapplicableItems) {
 | 
				
			||||||
 | 
					        // decrement because we weren't able to apply previous operation
 | 
				
			||||||
 | 
					        unapplicableItems.i--
 | 
				
			||||||
 | 
					        restStructs.clients.set(client, unapplicableItems.refs.slice(unapplicableItems.i))
 | 
				
			||||||
 | 
					        clientsStructRefs.delete(client)
 | 
				
			||||||
 | 
					        unapplicableItems.i = 0
 | 
				
			||||||
 | 
					        unapplicableItems.refs = []
 | 
				
			||||||
 | 
					      } else {
 | 
				
			||||||
 | 
					        // item was the last item on clientsStructRefs and the field was already cleared. Add item to restStructs and continue
 | 
				
			||||||
 | 
					        restStructs.clients.set(client, [item])
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					      // remove client from clientsStructRefsIds to prevent users from applying the same update again
 | 
				
			||||||
 | 
					      clientsStructRefsIds = clientsStructRefsIds.filter(c => c !== client)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    stack.length = 0
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  // iterate over all struct readers until we are done
 | 
					  // iterate over all struct readers until we are done
 | 
				
			||||||
  while (true) {
 | 
					  while (true) {
 | 
				
			||||||
 | 
					    if (stackHead.constructor !== Skip) {
 | 
				
			||||||
      const localClock = map.setIfUndefined(state, stackHead.id.client, () => getState(store, stackHead.id.client))
 | 
					      const localClock = map.setIfUndefined(state, stackHead.id.client, () => getState(store, stackHead.id.client))
 | 
				
			||||||
    const offset = stackHead.id.clock < localClock ? localClock - stackHead.id.clock : 0
 | 
					      const offset = localClock - stackHead.id.clock
 | 
				
			||||||
    if (stackHead.id.clock + offset !== localClock) {
 | 
					      if (offset < 0) {
 | 
				
			||||||
      // A previous message from this client is missing
 | 
					        // update from the same client is missing
 | 
				
			||||||
      // check if there is a pending structRef with a smaller clock and switch them
 | 
					        stack.push(stackHead)
 | 
				
			||||||
 | 
					        updateMissingSv(stackHead.id.client, stackHead.id.clock - 1)
 | 
				
			||||||
 | 
					        // hid a dead wall, add all items from stack to restSS
 | 
				
			||||||
 | 
					        addStackToRestSS()
 | 
				
			||||||
 | 
					      } else {
 | 
				
			||||||
 | 
					        const missing = stackHead.getMissing(transaction, store)
 | 
				
			||||||
 | 
					        if (missing !== null) {
 | 
				
			||||||
 | 
					          stack.push(stackHead)
 | 
				
			||||||
 | 
					          // get the struct reader that has the missing struct
 | 
				
			||||||
          /**
 | 
					          /**
 | 
				
			||||||
           * @type {{ refs: Array<GC|Item>, i: number }}
 | 
					           * @type {{ refs: Array<GC|Item>, i: number }}
 | 
				
			||||||
           */
 | 
					           */
 | 
				
			||||||
      const structRefs = clientsStructRefs.get(stackHead.id.client) || { refs: [], i: 0 }
 | 
					          const structRefs = clientsStructRefs.get(/** @type {number} */ (missing)) || { refs: [], i: 0 }
 | 
				
			||||||
      if (structRefs.refs.length !== structRefs.i) {
 | 
					          if (structRefs.refs.length === structRefs.i) {
 | 
				
			||||||
        const r = structRefs.refs[structRefs.i]
 | 
					            // This update message causally depends on another update message that doesn't exist yet
 | 
				
			||||||
        if (r.id.clock < stackHead.id.clock) {
 | 
					            updateMissingSv(/** @type {number} */ (missing), getState(store, missing))
 | 
				
			||||||
          // put ref with smaller clock on stack instead and continue
 | 
					            addStackToRestSS()
 | 
				
			||||||
          structRefs.refs[structRefs.i] = stackHead
 | 
					          } else {
 | 
				
			||||||
          stackHead = r
 | 
					            stackHead = structRefs.refs[structRefs.i++]
 | 
				
			||||||
          // sort the set because this approach might bring the list out of order
 | 
					 | 
				
			||||||
          structRefs.refs = structRefs.refs.slice(structRefs.i).sort((r1, r2) => r1.id.clock - r2.id.clock)
 | 
					 | 
				
			||||||
          structRefs.i = 0
 | 
					 | 
				
			||||||
            continue
 | 
					            continue
 | 
				
			||||||
          }
 | 
					          }
 | 
				
			||||||
      }
 | 
					        } else if (offset === 0 || offset < stackHead.length) {
 | 
				
			||||||
      // wait until missing struct is available
 | 
					          // all fine, apply the stackhead
 | 
				
			||||||
      stack.push(stackHead)
 | 
					 | 
				
			||||||
      return
 | 
					 | 
				
			||||||
    }
 | 
					 | 
				
			||||||
    const missing = stackHead.getMissing(transaction, store)
 | 
					 | 
				
			||||||
    if (missing === null) {
 | 
					 | 
				
			||||||
      if (offset === 0 || offset < stackHead.length) {
 | 
					 | 
				
			||||||
          stackHead.integrate(transaction, offset)
 | 
					          stackHead.integrate(transaction, offset)
 | 
				
			||||||
          state.set(stackHead.id.client, stackHead.id.clock + stackHead.length)
 | 
					          state.set(stackHead.id.client, stackHead.id.clock + stackHead.length)
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
    // iterate to next stackHead
 | 
					    // iterate to next stackHead
 | 
				
			||||||
    if (stack.length > 0) {
 | 
					    if (stack.length > 0) {
 | 
				
			||||||
      stackHead = /** @type {GC|Item} */ (stack.pop())
 | 
					      stackHead = /** @type {GC|Item} */ (stack.pop())
 | 
				
			||||||
@ -289,22 +346,16 @@ const integrateStructs = (transaction, store, clientsStructRefs) => {
 | 
				
			|||||||
        stackHead = /** @type {GC|Item} */ (curStructsTarget.refs[curStructsTarget.i++])
 | 
					        stackHead = /** @type {GC|Item} */ (curStructsTarget.refs[curStructsTarget.i++])
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
    } else {
 | 
					 | 
				
			||||||
      // get the struct reader that has the missing struct
 | 
					 | 
				
			||||||
      /**
 | 
					 | 
				
			||||||
       * @type {{ refs: Array<GC|Item>, i: number }}
 | 
					 | 
				
			||||||
       */
 | 
					 | 
				
			||||||
      const structRefs = clientsStructRefs.get(missing) || { refs: [], i: 0 }
 | 
					 | 
				
			||||||
      if (structRefs.refs.length === structRefs.i) {
 | 
					 | 
				
			||||||
        // This update message causally depends on another update message.
 | 
					 | 
				
			||||||
        stack.push(stackHead)
 | 
					 | 
				
			||||||
        return
 | 
					 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
      stack.push(stackHead)
 | 
					  if (restStructs.clients.size > 0) {
 | 
				
			||||||
      stackHead = structRefs.refs[structRefs.i++]
 | 
					    const encoder = new UpdateEncoderV2()
 | 
				
			||||||
 | 
					    writeClientsStructs(encoder, restStructs, new Map())
 | 
				
			||||||
 | 
					    // write empty deleteset
 | 
				
			||||||
 | 
					    // writeDeleteSet(encoder, new DeleteSet())
 | 
				
			||||||
 | 
					    encoding.writeVarUint(encoder.restEncoder, 0) // => no need for an extra function call, just write 0 deletes
 | 
				
			||||||
 | 
					    return { missing: missingSV, update: encoder.toUint8Array() }
 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  }
 | 
					  return null
 | 
				
			||||||
  store.pendingClientsStructRefs.clear()
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
@ -317,57 +368,60 @@ const integrateStructs = (transaction, store, clientsStructRefs) => {
 | 
				
			|||||||
export const writeStructsFromTransaction = (encoder, transaction) => writeClientsStructs(encoder, transaction.doc.store, transaction.beforeState)
 | 
					export const writeStructsFromTransaction = (encoder, transaction) => writeClientsStructs(encoder, transaction.doc.store, transaction.beforeState)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 * Read the next Item in a Decoder and fill this Item with the read data.
 | 
					 * Read and apply a document update.
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * This is called when data is received from a remote peer.
 | 
					 * This function has the same effect as `applyUpdate` but accepts an decoder.
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * @param {UpdateDecoderV1 | UpdateDecoderV2} decoder The decoder object to read data from.
 | 
					 * @param {decoding.Decoder} decoder
 | 
				
			||||||
 * @param {Transaction} transaction
 | 
					 * @param {Doc} ydoc
 | 
				
			||||||
 * @param {StructStore} store
 | 
					 * @param {any} [transactionOrigin] This will be stored on `transaction.origin` and `.on('update', (update, origin))`
 | 
				
			||||||
 | 
					 * @param {UpdateDecoderV1 | UpdateDecoderV2} [structDecoder]
 | 
				
			||||||
 *
 | 
					 *
 | 
				
			||||||
 * @private
 | 
					 | 
				
			||||||
 * @function
 | 
					 * @function
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
export const readStructs = (decoder, transaction, store) => {
 | 
					export const readUpdateV2 = (decoder, ydoc, transactionOrigin, structDecoder = new UpdateDecoderV2(decoder)) =>
 | 
				
			||||||
 | 
					  transact(ydoc, transaction => {
 | 
				
			||||||
    let retry = false
 | 
					    let retry = false
 | 
				
			||||||
    const doc = transaction.doc
 | 
					    const doc = transaction.doc
 | 
				
			||||||
 | 
					    const store = doc.store
 | 
				
			||||||
    // let start = performance.now()
 | 
					    // let start = performance.now()
 | 
				
			||||||
  const ss = readClientsStructRefs(decoder, doc)
 | 
					    const ss = readClientsStructRefs(structDecoder, doc)
 | 
				
			||||||
    // console.log('time to read structs: ', performance.now() - start) // @todo remove
 | 
					    // console.log('time to read structs: ', performance.now() - start) // @todo remove
 | 
				
			||||||
    // start = performance.now()
 | 
					    // start = performance.now()
 | 
				
			||||||
 | 
					 | 
				
			||||||
    // console.log('time to merge: ', performance.now() - start) // @todo remove
 | 
					    // console.log('time to merge: ', performance.now() - start) // @todo remove
 | 
				
			||||||
    // start = performance.now()
 | 
					    // start = performance.now()
 | 
				
			||||||
  const { missing, ssRest } = integrateStructs(transaction, store, ss)
 | 
					    const restStructs = integrateStructs(transaction, store, ss)
 | 
				
			||||||
    const pending = store.pendingStructs
 | 
					    const pending = store.pendingStructs
 | 
				
			||||||
    if (pending) {
 | 
					    if (pending) {
 | 
				
			||||||
      // check if we can apply something
 | 
					      // check if we can apply something
 | 
				
			||||||
    for (const [clock, client] of pending.missing) {
 | 
					      for (const [client, clock] of pending.missing) {
 | 
				
			||||||
        if (clock < getState(store, client)) {
 | 
					        if (clock < getState(store, client)) {
 | 
				
			||||||
          retry = true
 | 
					          retry = true
 | 
				
			||||||
          break
 | 
					          break
 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					      if (restStructs) {
 | 
				
			||||||
 | 
					        // merge restStructs into store.pending
 | 
				
			||||||
 | 
					        for (const [client, clock] of restStructs.missing) {
 | 
				
			||||||
 | 
					          const mclock = pending.missing.get(client)
 | 
				
			||||||
 | 
					          if (mclock == null || mclock > clock) {
 | 
				
			||||||
 | 
					            pending.missing.set(client, clock)
 | 
				
			||||||
 | 
					          }
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					        pending.update = mergeUpdatesV2([pending.update, restStructs.update])
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
  if (missing) {
 | 
					 | 
				
			||||||
    if (pending) {
 | 
					 | 
				
			||||||
      pending.missing.set(missing.client, missing.clock)
 | 
					 | 
				
			||||||
      // @todo support v2 as well
 | 
					 | 
				
			||||||
      pending.update = mergeUpdates([pending.update, ssRest])
 | 
					 | 
				
			||||||
    } else {
 | 
					    } else {
 | 
				
			||||||
      store.pendingStructs = { missing: map.create(), update: ssRest }
 | 
					      store.pendingStructs = restStructs
 | 
				
			||||||
      store.pendingStructs.missing.set(missing.client, missing.clock)
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
  }
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    // console.log('time to integrate: ', performance.now() - start) // @todo remove
 | 
					    // console.log('time to integrate: ', performance.now() - start) // @todo remove
 | 
				
			||||||
    // start = performance.now()
 | 
					    // start = performance.now()
 | 
				
			||||||
  const dsRest = readAndApplyDeleteSet(decoder, transaction, store)
 | 
					    const dsRest = readAndApplyDeleteSet(structDecoder, transaction, store)
 | 
				
			||||||
    if (store.pendingDs) {
 | 
					    if (store.pendingDs) {
 | 
				
			||||||
      // @todo we could make a lower-bound state-vector check as we do above
 | 
					      // @todo we could make a lower-bound state-vector check as we do above
 | 
				
			||||||
    const dsRest2 = readAndApplyDeleteSet(store.pendingDs, transaction, store)
 | 
					      const pendingDSUpdate = new UpdateDecoderV2(decoding.createDecoder(store.pendingDs))
 | 
				
			||||||
    if (dsRest1 && dsRest2) {
 | 
					      decoding.readVarUint(pendingDSUpdate.restDecoder) // read 0 structs, because we only encode deletes in pendingdsupdate
 | 
				
			||||||
 | 
					      const dsRest2 = readAndApplyDeleteSet(pendingDSUpdate, transaction, store)
 | 
				
			||||||
 | 
					      if (dsRest && dsRest2) {
 | 
				
			||||||
        // case 1: ds1 != null && ds2 != null
 | 
					        // case 1: ds1 != null && ds2 != null
 | 
				
			||||||
        store.pendingDs = mergeUpdatesV2([dsRest, dsRest2])
 | 
					        store.pendingDs = mergeUpdatesV2([dsRest, dsRest2])
 | 
				
			||||||
      } else {
 | 
					      } else {
 | 
				
			||||||
@ -386,28 +440,10 @@ export const readStructs = (decoder, transaction, store) => {
 | 
				
			|||||||
    // console.log('time to resume delete readers: ', performance.now() - start) // @todo remove
 | 
					    // console.log('time to resume delete readers: ', performance.now() - start) // @todo remove
 | 
				
			||||||
    // start = performance.now()
 | 
					    // start = performance.now()
 | 
				
			||||||
    if (retry) {
 | 
					    if (retry) {
 | 
				
			||||||
    const update = store.pendingStructs.update
 | 
					      const update = /** @type {{update: Uint8Array}} */ (store.pendingStructs).update
 | 
				
			||||||
      store.pendingStructs = null
 | 
					      store.pendingStructs = null
 | 
				
			||||||
    applyUpdate(update)
 | 
					      applyUpdateV2(transaction.doc, update)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
/**
 | 
					 | 
				
			||||||
 * Read and apply a document update.
 | 
					 | 
				
			||||||
 *
 | 
					 | 
				
			||||||
 * This function has the same effect as `applyUpdate` but accepts an decoder.
 | 
					 | 
				
			||||||
 *
 | 
					 | 
				
			||||||
 * @param {decoding.Decoder} decoder
 | 
					 | 
				
			||||||
 * @param {Doc} ydoc
 | 
					 | 
				
			||||||
 * @param {any} [transactionOrigin] This will be stored on `transaction.origin` and `.on('update', (update, origin))`
 | 
					 | 
				
			||||||
 * @param {UpdateDecoderV1 | UpdateDecoderV2} [structDecoder]
 | 
					 | 
				
			||||||
 *
 | 
					 | 
				
			||||||
 * @function
 | 
					 | 
				
			||||||
 */
 | 
					 | 
				
			||||||
export const readUpdateV2 = (decoder, ydoc, transactionOrigin, structDecoder = new UpdateDecoderV2(decoder)) =>
 | 
					 | 
				
			||||||
  transact(ydoc, transaction => {
 | 
					 | 
				
			||||||
    readStructs(structDecoder, transaction, ydoc.store)
 | 
					 | 
				
			||||||
    readAndApplyDeleteSet(structDecoder, transaction, ydoc.store)
 | 
					 | 
				
			||||||
  }, transactionOrigin, false)
 | 
					  }, transactionOrigin, false)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
@ -484,7 +520,21 @@ export const writeStateAsUpdate = (encoder, doc, targetStateVector = new Map())
 | 
				
			|||||||
export const encodeStateAsUpdateV2 = (doc, encodedTargetStateVector, encoder = new UpdateEncoderV2()) => {
 | 
					export const encodeStateAsUpdateV2 = (doc, encodedTargetStateVector, encoder = new UpdateEncoderV2()) => {
 | 
				
			||||||
  const targetStateVector = encodedTargetStateVector == null ? new Map() : decodeStateVector(encodedTargetStateVector)
 | 
					  const targetStateVector = encodedTargetStateVector == null ? new Map() : decodeStateVector(encodedTargetStateVector)
 | 
				
			||||||
  writeStateAsUpdate(encoder, doc, targetStateVector)
 | 
					  writeStateAsUpdate(encoder, doc, targetStateVector)
 | 
				
			||||||
  return encoder.toUint8Array()
 | 
					  const updates = [encoder.toUint8Array()]
 | 
				
			||||||
 | 
					  // also add the pending updates (if there are any)
 | 
				
			||||||
 | 
					  // @todo support diffirent encoders
 | 
				
			||||||
 | 
					  if (encoder.constructor === UpdateEncoderV2) {
 | 
				
			||||||
 | 
					    if (doc.store.pendingDs) {
 | 
				
			||||||
 | 
					      updates.push(doc.store.pendingDs)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    if (doc.store.pendingStructs) {
 | 
				
			||||||
 | 
					      updates.push(diffUpdate(doc.store.pendingStructs.update, encodedTargetStateVector))
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					    if (updates.length > 1) {
 | 
				
			||||||
 | 
					      return mergeUpdatesV2(updates)
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					  }
 | 
				
			||||||
 | 
					  return updates[0]
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 | 
				
			|||||||
@ -151,6 +151,7 @@ export const encodeStateVectorFromUpdateV2 = (update, YEncoder = DSEncoderV2, YD
 | 
				
			|||||||
    let size = 1
 | 
					    let size = 1
 | 
				
			||||||
    let currClient = curr.id.client
 | 
					    let currClient = curr.id.client
 | 
				
			||||||
    let currClock = curr.id.clock
 | 
					    let currClock = curr.id.clock
 | 
				
			||||||
 | 
					    let stopCounting = false
 | 
				
			||||||
    for (; curr !== null; curr = updateDecoder.next()) {
 | 
					    for (; curr !== null; curr = updateDecoder.next()) {
 | 
				
			||||||
      if (currClient !== curr.id.client) {
 | 
					      if (currClient !== curr.id.client) {
 | 
				
			||||||
        size++
 | 
					        size++
 | 
				
			||||||
@ -159,9 +160,15 @@ export const encodeStateVectorFromUpdateV2 = (update, YEncoder = DSEncoderV2, YD
 | 
				
			|||||||
        encoding.writeVarUint(encoder.restEncoder, currClient)
 | 
					        encoding.writeVarUint(encoder.restEncoder, currClient)
 | 
				
			||||||
        encoding.writeVarUint(encoder.restEncoder, currClock)
 | 
					        encoding.writeVarUint(encoder.restEncoder, currClock)
 | 
				
			||||||
        currClient = curr.id.client
 | 
					        currClient = curr.id.client
 | 
				
			||||||
 | 
					        stopCounting = false
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					      if (curr.constructor === Skip) {
 | 
				
			||||||
 | 
					        stopCounting = true
 | 
				
			||||||
 | 
					      }
 | 
				
			||||||
 | 
					      if (!stopCounting) {
 | 
				
			||||||
        currClock = curr.id.clock + curr.length
 | 
					        currClock = curr.id.clock + curr.length
 | 
				
			||||||
      }
 | 
					      }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
    // write what we have
 | 
					    // write what we have
 | 
				
			||||||
    encoding.writeVarUint(encoder.restEncoder, currClient)
 | 
					    encoding.writeVarUint(encoder.restEncoder, currClient)
 | 
				
			||||||
    encoding.writeVarUint(encoder.restEncoder, currClock)
 | 
					    encoding.writeVarUint(encoder.restEncoder, currClock)
 | 
				
			||||||
@ -181,7 +188,7 @@ export const encodeStateVectorFromUpdateV2 = (update, YEncoder = DSEncoderV2, YD
 | 
				
			|||||||
 * @param {Uint8Array} update
 | 
					 * @param {Uint8Array} update
 | 
				
			||||||
 * @return {Uint8Array}
 | 
					 * @return {Uint8Array}
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
export const encodeStateVectorFromUpdate = update => encodeStateVectorFromUpdateV2(update, DSEncoderV1, UpdateDecoderV2)
 | 
					export const encodeStateVectorFromUpdate = update => encodeStateVectorFromUpdateV2(update, DSEncoderV1, UpdateDecoderV1)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 * @param {Uint8Array} update
 | 
					 * @param {Uint8Array} update
 | 
				
			||||||
@ -388,9 +395,10 @@ export const mergeUpdatesV2 = (updates, YDecoder = UpdateDecoderV2, YEncoder = U
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
/**
 | 
					/**
 | 
				
			||||||
 * @param {Uint8Array} update
 | 
					 * @param {Uint8Array} update
 | 
				
			||||||
 * @param {Uint8Array} sv
 | 
					 * @param {Uint8Array} [sv]
 | 
				
			||||||
 */
 | 
					 */
 | 
				
			||||||
export const diffUpdate = (update, sv) => {
 | 
					export const diffUpdate = (update, sv = new Uint8Array([0])) => {
 | 
				
			||||||
 | 
					  // @todo!!!
 | 
				
			||||||
  return update
 | 
					  return update
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
				
			|||||||
@ -336,9 +336,8 @@ export const compare = users => {
 | 
				
			|||||||
  const userXmlValues = users.map(u => u.get('xml', Y.YXmlElement).toString())
 | 
					  const userXmlValues = users.map(u => u.get('xml', Y.YXmlElement).toString())
 | 
				
			||||||
  const userTextValues = users.map(u => u.getText('text').toDelta())
 | 
					  const userTextValues = users.map(u => u.getText('text').toDelta())
 | 
				
			||||||
  for (const u of users) {
 | 
					  for (const u of users) {
 | 
				
			||||||
    t.assert(u.store.pendingDeleteReaders.length === 0)
 | 
					    t.assert(u.store.pendingDs === null)
 | 
				
			||||||
    t.assert(u.store.pendingStack.length === 0)
 | 
					    t.assert(u.store.pendingStructs === null)
 | 
				
			||||||
    t.assert(u.store.pendingClientsStructRefs.size === 0)
 | 
					 | 
				
			||||||
  }
 | 
					  }
 | 
				
			||||||
  // Test Array iterator
 | 
					  // Test Array iterator
 | 
				
			||||||
  t.compare(users[0].getArray('array').toArray(), Array.from(users[0].getArray('array')))
 | 
					  t.compare(users[0].getArray('array').toArray(), Array.from(users[0].getArray('array')))
 | 
				
			||||||
 | 
				
			|||||||
@ -52,17 +52,17 @@ const encDoc = {
 | 
				
			|||||||
  mergeUpdates: (updates) => {
 | 
					  mergeUpdates: (updates) => {
 | 
				
			||||||
    const ydoc = new Y.Doc()
 | 
					    const ydoc = new Y.Doc()
 | 
				
			||||||
    updates.forEach(update => {
 | 
					    updates.forEach(update => {
 | 
				
			||||||
      Y.applyUpdate(ydoc, update)
 | 
					      Y.applyUpdateV2(ydoc, update)
 | 
				
			||||||
    })
 | 
					    })
 | 
				
			||||||
    return Y.encodeStateAsUpdate(ydoc)
 | 
					    return Y.encodeStateAsUpdateV2(ydoc)
 | 
				
			||||||
  },
 | 
					  },
 | 
				
			||||||
  encodeStateAsUpdate: Y.encodeStateAsUpdate,
 | 
					  encodeStateAsUpdate: Y.encodeStateAsUpdateV2,
 | 
				
			||||||
  applyUpdate: Y.applyUpdate,
 | 
					  applyUpdate: Y.applyUpdateV2,
 | 
				
			||||||
  logUpdate: Y.logUpdate,
 | 
					  logUpdate: Y.logUpdateV2,
 | 
				
			||||||
  parseUpdateMeta: Y.parseUpdateMetaV2,
 | 
					  parseUpdateMeta: Y.parseUpdateMetaV2,
 | 
				
			||||||
  encodeStateVectorFromUpdate: Y.encodeStateVectorFromUpdateV2,
 | 
					  encodeStateVectorFromUpdate: Y.encodeStateVectorFromUpdateV2,
 | 
				
			||||||
  encodeStateVector: Y.encodeStateVector,
 | 
					  encodeStateVector: Y.encodeStateVectorV2,
 | 
				
			||||||
  updateEventName: 'update',
 | 
					  updateEventName: 'updateV2',
 | 
				
			||||||
  description: 'Merge via Y.Doc'
 | 
					  description: 'Merge via Y.Doc'
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@ -131,13 +131,13 @@ const checkUpdateCases = (ydoc, updates, enc) => {
 | 
				
			|||||||
  // Case 5: overlapping with many duplicates
 | 
					  // Case 5: overlapping with many duplicates
 | 
				
			||||||
  cases.push(enc.mergeUpdates(cases))
 | 
					  cases.push(enc.mergeUpdates(cases))
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  const targetState = enc.encodeStateAsUpdate(ydoc)
 | 
					  // const targetState = enc.encodeStateAsUpdate(ydoc)
 | 
				
			||||||
  t.info('Target State: ')
 | 
					  // t.info('Target State: ')
 | 
				
			||||||
  enc.logUpdate(targetState)
 | 
					  // enc.logUpdate(targetState)
 | 
				
			||||||
 | 
					
 | 
				
			||||||
  cases.forEach((updates, i) => {
 | 
					  cases.forEach((updates, i) => {
 | 
				
			||||||
    t.info('State Case $' + i + ':')
 | 
					    // t.info('State Case $' + i + ':')
 | 
				
			||||||
    enc.logUpdate(updates)
 | 
					    // enc.logUpdate(updates)
 | 
				
			||||||
    const merged = new Y.Doc()
 | 
					    const merged = new Y.Doc()
 | 
				
			||||||
    enc.applyUpdate(merged, updates)
 | 
					    enc.applyUpdate(merged, updates)
 | 
				
			||||||
    t.compareArrays(merged.getArray().toArray(), ydoc.getArray().toArray())
 | 
					    t.compareArrays(merged.getArray().toArray(), ydoc.getArray().toArray())
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user