From 654510f3ffd26e004099ef2cdc6b29e94a22262e Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Wed, 10 Apr 2019 18:52:22 +0200 Subject: [PATCH] read struct refs as array --- src/index.js | 1 - src/structs/AbstractItem.js | 16 -- src/types/YXmlElement.js | 2 +- src/utils/StructStore.js | 18 ++- src/utils/Transaction.js | 2 + src/utils/Y.js | 2 - src/utils/structEncoding.js | 294 +++++++++++++++++------------------- tests/testHelper.js | 3 +- 8 files changed, 161 insertions(+), 177 deletions(-) diff --git a/src/index.js b/src/index.js index a995c3d5..d32657eb 100644 --- a/src/index.js +++ b/src/index.js @@ -22,7 +22,6 @@ export { ID, createID, compareIDs, - writeStructs, writeStructsFromTransaction, readStructs, getState, diff --git a/src/structs/AbstractItem.js b/src/structs/AbstractItem.js index 09b82c00..12b92eb6 100644 --- a/src/structs/AbstractItem.js +++ b/src/structs/AbstractItem.js @@ -131,22 +131,6 @@ export class AbstractItem extends AbstractStruct { const parent = this.parent const parentSub = this.parentSub const length = this.length - /* - # $this has to find a unique position between origin and the next known character - # case 1: $origin equals $o.origin: the $creator parameter decides if left or right - # let $OL= [o1,o2,o3,o4], whereby $this is to be inserted between o1 and o4 - # o2,o3 and o4 origin is 1 (the position of o2) - # there is the case that $this.creator < o2.creator, but o3.creator < $this.creator - # then o2 knows o3. Since on another client $OL could be [o1,o3,o4] the problem is complex - # therefore $this would be always to the right of o3 - # case 2: $origin < $o.origin - # if current $this insert_position > $o origin: $this ins - # else $insert_position will not change - # (maybe we encounter case 1 later, then this will be to the right of $o) - # case 3: $origin > $o.origin - # $this insert_position is to the left of $o (forever!) - */ - // handle conflicts /** * @type {AbstractItem|null} */ diff --git a/src/types/YXmlElement.js b/src/types/YXmlElement.js index 7c7c0a37..b1b608e7 100644 --- a/src/types/YXmlElement.js +++ b/src/types/YXmlElement.js @@ -173,7 +173,7 @@ export class YXmlFragment extends AbstractType { * Returns all YXmlElements that match the query. * Similar to Dom's {@link querySelectorAll}. * - * TODO: Does not yet support all queries. Currently only query by tagName. + * @todo Does not yet support all queries. Currently only query by tagName. * * @param {CSS_Selector} query The query on the children * @return {Array} The elements that match this query. diff --git a/src/utils/StructStore.js b/src/utils/StructStore.js index db72b970..ce382e39 100644 --- a/src/utils/StructStore.js +++ b/src/utils/StructStore.js @@ -1,3 +1,4 @@ +// todo rename AbstractRef to abstractStructRef import { GC, @@ -16,11 +17,20 @@ export class StructStore { */ this.clients = new Map() /** - * Store uncompleted struct readers here - * @see tryResumePendingReaders - * @type {Set<{stack:Array,structReaders:Map>,missing:ID,structReaderIterator:IterableIterator>,structReaderIteratorResult:IteratorResult>}>} + * Store incompleted struct reads here + * `i` denotes to the next read operation + * We could shift the array of refs instead, but shift is incredible + * slow in Chrome for arrays with more than 100k elements + * @see tryResumePendingStructRefs + * @type {Map}>} */ - this.pendingStructReaders = new Set() + this.pendingClientsStructRefs = new Map() + /** + * Stack of pending structs waiting for struct dependencies + * Maximum length of stack is structReaders.size + * @type {Array} + */ + this.pendingStack = [] /** * @type {Array} */ diff --git a/src/utils/Transaction.js b/src/utils/Transaction.js index bf691de7..04543d7a 100644 --- a/src/utils/Transaction.js +++ b/src/utils/Transaction.js @@ -115,6 +115,8 @@ export const nextID = transaction => { } /** + * Implements the functionality of `y.transact(()=>{..})` + * * @param {Y} y * @param {function(Transaction):void} f */ diff --git a/src/utils/Y.js b/src/utils/Y.js index 05ad8812..080a7c49 100644 --- a/src/utils/Y.js +++ b/src/utils/Y.js @@ -53,8 +53,6 @@ export class Y extends Observable { * other peers. * * @param {function(Transaction):void} f The function that should be executed as a transaction - * - * @todo separate this into a separate function */ transact (f) { transact(this, f) diff --git a/src/utils/structEncoding.js b/src/utils/structEncoding.js index 0e4edc5d..7bd220d1 100644 --- a/src/utils/structEncoding.js +++ b/src/utils/structEncoding.js @@ -23,9 +23,7 @@ import { import * as encoding from 'lib0/encoding.js' import * as decoding from 'lib0/decoding.js' -import * as map from 'lib0/map.js' import * as binary from 'lib0/binary.js' -import * as iterator from 'lib0/iterator.js' /** * @typedef {Map} StateMap @@ -43,48 +41,53 @@ export const structRefs = [ ] /** - * @param {decoding.Decoder} decoder - * @param {number} structsLen - * @param {ID} nextID - * @param {number} localState next expected clock by nextID.client - * @return {IterableIterator} + * @param {encoding.Encoder} encoder + * @param {Array} structs All structs by `client` + * @param {number} client + * @param {number} clock write structs starting with `ID(client,clock)` */ -const createStructReaderIterator = (decoder, structsLen, nextID, localState) => iterator.createIterator(() => { - let done = false - let value - do { - if (structsLen === 0) { - done = true - value = undefined - break - } - const info = decoding.readUint8(decoder) - value = new structRefs[binary.BITS5 & info](decoder, nextID, info) - nextID = createID(nextID.client, nextID.clock + value.length) - structsLen-- - } while (nextID.clock <= localState) // read until we find something new (check nextID.clock instead because it equals `clock+len`) - return { done, value } -}) +const writeStructs = (encoder, structs, client, clock) => { + // write first id + const startNewStructs = findIndexSS(structs, clock) + // write # encoded structs + encoding.writeVarUint(encoder, structs.length - startNewStructs) + writeID(encoder, createID(client, clock)) + const firstStruct = structs[startNewStructs] + // write first struct with an offset + firstStruct.write(encoder, clock - firstStruct.id.clock, 0) + for (let i = startNewStructs + 1; i < structs.length; i++) { + structs[i].write(encoder, 0, 0) + } +} /** - * @param {encoding.Encoder} encoder - * @param {Transaction} transaction + * @param {decoding.Decoder} decoder + * @param {number} numOfStructs + * @param {ID} nextID + * @return {Array} */ -export const writeStructsFromTransaction = (encoder, transaction) => writeStructs(encoder, transaction.y.store, transaction.beforeState) +const readStructRefs = (decoder, numOfStructs, nextID) => { + /** + * @type {Array} + */ + const refs = [] + for (let i = 0; i < numOfStructs; i++) { + const info = decoding.readUint8(decoder) + const ref = new structRefs[binary.BITS5 & info](decoder, nextID, info) + nextID = createID(nextID.client, nextID.clock + ref.length) + refs.push(ref) + } + return refs +} /** * @param {encoding.Encoder} encoder * @param {StructStore} store * @param {StateMap} _sm */ -export const writeStructs = (encoder, store, _sm) => { +export const writeClientsStructs = (encoder, store, _sm) => { // we filter all valid _sm entries into sm const sm = new Map() - const encoderUserPosMap = map.create() - const startMessagePos = encoding.length(encoder) - // write diff to pos of end of this message - // we use it in readStructs to jump ahead to the end of the message - encoding.writeUint32(encoder, 0) _sm.forEach((clock, client) => { // only write if new structs are available if (getState(store, client) > clock) { @@ -99,59 +102,28 @@ export const writeStructs = (encoder, store, _sm) => { // write # states that were updated encoding.writeVarUint(encoder, sm.size) sm.forEach((clock, client) => { - // write first id - writeID(encoder, createID(client, clock)) - encoderUserPosMap.set(client, encoding.length(encoder)) - // write diff to pos where structs are written - encoding.writeUint32(encoder, 0) - }) - sm.forEach((clock, client) => { - const decPos = encoderUserPosMap.get(client) - // fill out diff to pos where structs are written - encoding.setUint32(encoder, decPos, encoding.length(encoder) - decPos) - /** - * @type {Array} - */ // @ts-ignore - const structs = store.clients.get(client) - const startNewStructs = findIndexSS(structs, clock) - // write # encoded structs - encoding.writeVarUint(encoder, structs.length - startNewStructs) - const firstStruct = structs[startNewStructs] - // write first struct with an offset (may be 0) - firstStruct.write(encoder, clock - firstStruct.id.clock, 0) - for (let i = startNewStructs + 1; i < structs.length; i++) { - structs[i].write(encoder, 0, 0) - } + writeStructs(encoder, store.clients.get(client), client, clock) }) - // fill out diff to pos of end of message - encoding.setUint32(encoder, startMessagePos, encoding.length(encoder) - startMessagePos) } /** * @param {decoding.Decoder} decoder The decoder object to read data from. - * @param {Map} localState - * @return {Map>} + * @return {Map>} */ -const readStructReaders = (decoder, localState) => { +export const readClientsStructRefs = decoder => { /** - * @type {Map>} + * @type {Map>} */ - const structReaders = new Map() - const endOfMessagePos = decoder.pos + decoding.readUint32(decoder) - const clientbeforeState = decoding.readVarUint(decoder) - for (let i = 0; i < clientbeforeState; i++) { + const clientRefs = new Map() + const numOfStateUpdates = decoding.readVarUint(decoder) + for (let i = 0; i < numOfStateUpdates; i++) { + const numberOfStructs = decoding.readVarUint(decoder) const nextID = readID(decoder) - const decoderPos = decoder.pos + decoding.readUint32(decoder) - const structReaderDecoder = decoding.clone(decoder, decoderPos) - const numberOfStructs = decoding.readVarUint(structReaderDecoder) - structReaders.set(nextID.client, createStructReaderIterator(structReaderDecoder, numberOfStructs, nextID, localState.get(nextID.client) || 0)) + const refs = readStructRefs(decoder, numberOfStructs, nextID) + clientRefs.set(nextID.client, refs) } - // Decoder is still stuck at creating struct readers. - // Jump ahead to end of message so that reading can continue. - // We will use the created struct readers for the remaining part of this workflow. - decoder.pos = endOfMessagePos - return structReaders + return clientRefs } /** @@ -175,86 +147,69 @@ const readStructReaders = (decoder, localState) => { * * @param {Transaction} transaction * @param {StructStore} store - * @param {Map} localState - * @param {Map>} structReaders - * @param {Array} stack Stack of pending structs waiting for struct dependencies. - * Maximum length of stack is structReaders.size. - * @param {IterableIterator>} structReaderIterator - * @param {IteratorResult>} structReaderIteratorResult * * @todo reimplement without iterators - read everything in arrays instead */ -const execStructReaders = (transaction, store, localState, structReaders, stack, structReaderIterator, structReaderIteratorResult) => { +const resumeStructIntegration = (transaction, store) => { + const stack = store.pendingStack + const clientsStructRefs = store.pendingClientsStructRefs // iterate over all struct readers until we are done - while (stack.length !== 0 || !structReaderIteratorResult.done) { + while (stack.length !== 0 || clientsStructRefs.size !== 0) { if (stack.length === 0) { - // stack is empty. We know that there there are more structReaders to be processed - const nextStructRes = structReaderIteratorResult.value.next() - if (nextStructRes.done) { - // current structReaderIteratorResult is empty, use next one - structReaderIteratorResult = structReaderIterator.next() - } else { - stack.push(nextStructRes.value) - } - } else { - const ref = stack[stack.length - 1] - const m = ref._missing - while (m.length > 0) { - const missing = m[m.length - 1] - if (!exists(store, missing)) { - // get the struct reader that has the missing struct - const reader = structReaders.get(missing.client) - const nextRef = reader === undefined ? undefined : reader.next().value - if (nextRef === undefined) { - // This update message causally depends on another update message. - // Store current stack and readers in StructStore and resume the computation at another time - store.pendingStructReaders.add({ stack, structReaders, missing, structReaderIterator, structReaderIteratorResult }) - return - } - stack.push(nextRef) - break - } - ref._missing.pop() - } - if (m.length === 0) { - const localClock = (localState.get(ref.id.client) || 0) - const offset = ref.id.clock < localClock ? localClock - ref.id.clock : 0 - if (offset < ref.length) { - if (ref.id.clock + offset !== localClock) { - // A previous message from this client is missing - // Store current stack and readers in StructStore and resume the computation at another time - store.pendingStructReaders.add({ stack, structReaders, missing: createID(ref.id.client, localClock), structReaderIterator, structReaderIteratorResult }) - return - } - ref.toStruct(transaction.y, store, offset).integrate(transaction) - } - stack.pop() + // take any first struct from clientsStructRefs and put it on the stack + const [client, structRefs] = clientsStructRefs.entries().next().value + stack.push(structRefs.refs[structRefs.i++]) + if (structRefs.refs.length === structRefs.i) { + clientsStructRefs.delete(client) } } - } - if (stack.length > 0) { - store.pendingStructReaders.add({ stack, structReaders, missing: stack[stack.length - 1].id, structReaderIterator, structReaderIteratorResult }) - } -} - -/** - * Try to resume pending struct readers in `store.pendingReaders` while `pendingReaders.nextMissing` - * exists. - * - * @param {Transaction} transaction - * @param {StructStore} store - */ -const tryResumePendingStructReaders = (transaction, store) => { - let resume = true - const pendingReaders = store.pendingStructReaders - while (resume) { - resume = false - for (const pendingReader of pendingReaders) { - if (exists(store, pendingReader.missing)) { - resume = true // found at least one more reader to execute - pendingReaders.delete(pendingReader) - execStructReaders(transaction, store, getStates(store), pendingReader.structReaders, pendingReader.stack, pendingReader.structReaderIterator, pendingReader.structReaderIteratorResult) + const ref = stack[stack.length - 1] + const m = ref._missing + const client = ref.id.client + const localClock = getState(store, client) + const offset = ref.id.clock < localClock ? localClock - ref.id.clock : 0 + if (ref.id.clock + offset !== localClock) { + // A previous message from this client is missing + // check if there is a pending structRef with a smaller clock and switch them + const structRefs = clientsStructRefs.get(client) + if (structRefs !== undefined) { + const r = structRefs.refs[structRefs.i] + if (r.id.clock < ref.id.clock) { + // put ref with smaller clock on stack instead and continue + structRefs.refs[structRefs.i] = ref + stack[stack.length - 1] = r + // sort the set because this approach might bring the list out of order + structRefs.refs = structRefs.refs.slice(structRefs.i).sort((r1, r2) => r1.id.client - r2.id.client) + structRefs.i = 0 + continue + } } + // wait until missing struct is available + return + } + while (m.length > 0) { + const missing = m[m.length - 1] + if (!exists(store, missing)) { + const client = missing.client + // get the struct reader that has the missing struct + const structRefs = clientsStructRefs.get(client) + if (structRefs === undefined) { + // This update message causally depends on another update message. + return + } + stack.push(structRefs.refs[structRefs.i++]) + if (structRefs.i === structRefs.refs.length) { + clientsStructRefs.delete(client) + } + break + } + ref._missing.pop() + } + if (m.length === 0) { + if (offset < ref.length) { + ref.toStruct(transaction.y, store, offset).integrate(transaction) + } + stack.pop() } } } @@ -271,6 +226,43 @@ export const tryResumePendingDeleteReaders = (transaction, store) => { } } +/** + * @param {Map,i:number}>} pendingClientsStructRefs + * @param {number} client + * @param {Array} refs + */ +const setPendingClientsStructRefs = (pendingClientsStructRefs, client, refs) => { + pendingClientsStructRefs.set(client, { refs, i: 0 }) +} + +/** + * @param {encoding.Encoder} encoder + * @param {Transaction} transaction + */ +export const writeStructsFromTransaction = (encoder, transaction) => writeClientsStructs(encoder, transaction.y.store, transaction.beforeState) + +/** + * @param {StructStore} store + * @param {Map>} clientsStructsRefs + */ +const mergeReadStructsIntoPendingReads = (store, clientsStructsRefs) => { + const pendingClientsStructRefs = store.pendingClientsStructRefs + for (const [client, structRefs] of clientsStructsRefs) { + const pendingStructRefs = pendingClientsStructRefs.get(client) + if (pendingStructRefs === undefined) { + setPendingClientsStructRefs(pendingClientsStructRefs, client, structRefs) + } else { + // merge into existing structRefs + const merged = pendingStructRefs.i > 0 ? pendingStructRefs.refs.slice(pendingStructRefs.i) : pendingStructRefs.refs + for (let i = 0; i < structRefs.length; i++) { + merged.push(structRefs[i]) + } + pendingStructRefs.i = 0 + pendingStructRefs.refs = merged.sort((r1, r2) => r1.id.clock - r2.id.clock) + } + } +} + /** * Read the next Item in a Decoder and fill this Item with the read data. * @@ -283,11 +275,9 @@ export const tryResumePendingDeleteReaders = (transaction, store) => { * @private */ export const readStructs = (decoder, transaction, store) => { - const localState = getStates(store) - const readers = readStructReaders(decoder, localState) - const structReaderIterator = readers.values() - execStructReaders(transaction, store, localState, readers, [], structReaderIterator, structReaderIterator.next()) - tryResumePendingStructReaders(transaction, store) + const clientsStructRefs = readClientsStructRefs(decoder) + mergeReadStructsIntoPendingReads(store, clientsStructRefs) + resumeStructIntegration(transaction, store) tryResumePendingDeleteReaders(transaction, store) } @@ -307,6 +297,6 @@ export const readModel = (decoder, transaction, store) => { * @param {Map} [targetState] The state of the target that receives the update. Leave empty to write all known structs */ export const writeModel = (encoder, store, targetState = new Map()) => { - writeStructs(encoder, store, targetState) + writeClientsStructs(encoder, store, targetState) writeDeleteSet(encoder, createDeleteSetFromStructStore(store)) } diff --git a/tests/testHelper.js b/tests/testHelper.js index 44b94883..06adaffd 100644 --- a/tests/testHelper.js +++ b/tests/testHelper.js @@ -276,7 +276,8 @@ export const compare = users => { const userTextValues = users.map(u => u.getText('text').toDelta()) for (const u of users) { t.assert(u.store.pendingDeleteReaders.length === 0) - t.assert(u.store.pendingStructReaders.size === 0) + t.assert(u.store.pendingStack.length === 0) + t.assert(u.store.pendingClientsStructRefs.size === 0) } for (let i = 0; i < users.length - 1; i++) { t.compare(userArrayValues[i].length, users[i].getArray('array').length)