diff --git a/src/utils/updates.js b/src/utils/updates.js index 9bfcfc01..40de0152 100644 --- a/src/utils/updates.js +++ b/src/utils/updates.js @@ -5,29 +5,13 @@ import * as encoding from 'lib0/encoding.js' import { createID, readItemContent, - Item, GC, AbstractUpdateDecoder, AbstractUpdateEncoder, UpdateDecoderV1, UpdateDecoderV2, UpdateEncoderV1, UpdateEncoderV2 // eslint-disable-line + Item, GC, AbstractUpdateDecoder, UpdateDecoderV1, UpdateDecoderV2, UpdateEncoderV1, UpdateEncoderV2 // eslint-disable-line } from '../internals.js' -/** - * @param {Array} updates - * @return {Uint8Array} - */ -export const mergeUpdates = updates => { - return updates[0] -} - -/** - * @param {Uint8Array} update - * @param {Uint8Array} sv - */ -export const diffUpdate = (update, sv) => { - return update -} - /** * @param {AbstractUpdateDecoder} decoder */ -export function * lazyStructReaderGenerator (decoder) { +function * lazyStructReaderGenerator (decoder) { const numOfStateUpdates = decoding.readVarUint(decoder.restDecoder) for (let i = 0; i < numOfStateUpdates; i++) { const numberOfStructs = decoding.readVarUint(decoder.restDecoder) @@ -96,6 +80,13 @@ export class LazyStructWriter { */ constructor (encoder) { this.fresh = true + /** + * We keep the last written struct around in case we want to + * merge it with the next written struct. + * When a new struct is received: if mergeable ⇒ merge; otherwise ⇒ write curr and keep new struct around. + * @type {null | Item | GC} + */ + this.curr = null this.currClient = 0 this.startClock = 0 this.written = 0 @@ -112,57 +103,192 @@ export class LazyStructWriter { */ this.clientStructs = [] } +} - flushCurr () { - if (!this.fresh) { - this.clientStructs.push({ written: this.written, restEncoder: encoding.toUint8Array(this.encoder.restEncoder) }) - this.encoder.restEncoder = encoding.createEncoder() - this.fresh = true - } - } +/** + * @param {Array} updates + * @return {Uint8Array} + */ +export const mergeUpdates = updates => mergeUpdatesV2(updates, UpdateDecoderV1, UpdateEncoderV2) - /** - * @param {Item | GC} struct - * @param {number} offset - */ - write (struct, offset) { - // flush curr if we start another client - if (!this.fresh && this.currClient !== struct.id.client) { - this.flushCurr() - this.currClient = struct.id.client - // write next client - this.encoder.writeClient(struct.id.client) - // write startClock - encoding.writeVarUint(this.encoder.restEncoder, struct.id.clock) - } - struct.write(this.encoder, offset) - this.written++ - } - - toUint8Array () { - this.flushCurr() - - // this is a fresh encoder because we called flushCurr - const restEncoder = this.encoder.restEncoder - - /** - * Now we put all the fragments together. - * This works similarly to `writeClientsStructs` - */ - - // write # states that were updated - i.e. the clients - encoding.writeVarUint(restEncoder, this.clientStructs.length) - - for (let i = 0; i < this.clientStructs.length; i++) { - const partStructs = this.clientStructs[i] - /** - * Works similarly to `writeStructs` - */ - // write # encoded structs - encoding.writeVarUint(restEncoder, partStructs.written) - // write the rest of the fragment - encoding.writeUint8Array(restEncoder, partStructs.restEncoder) - } - return this.encoder.toUint8Array() +/** + * This method is intended to slice any kind of struct and retrieve the right part. + * It does not handle side-effects, so it should only be used by the lazy-encoder. + * + * @param {Item | GC} left + * @param {number} diff + * @return {Item | GC} + */ +const sliceStruct = (left, diff) => { + if (left.constructor === GC) { + const { client, clock } = left.id + return new GC(createID(client, clock + diff), left.length - diff) + } else { + const leftItem = /** @type {Item} */ (left) + const { client, clock } = leftItem.id + return new Item( + createID(client, clock + diff), + null, + createID(client, clock + diff - 1), + null, + leftItem.rightOrigin, + leftItem.parent, + leftItem.parentSub, + leftItem.content.splice(diff) + ) + } +} + +/** + * + * This function works similarly to `readUpdateV2`. + * + * @param {Array} updates + * @param {typeof UpdateDecoderV1 | typeof UpdateDecoderV2} [YDecoder] + * @param {typeof UpdateEncoderV1 | typeof UpdateEncoderV2} [YEncoder] + * @return {Uint8Array} + */ +export const mergeUpdatesV2 = (updates, YDecoder = UpdateDecoderV2, YEncoder = UpdateEncoderV2) => { + const updateDecoders = updates.map(update => new UpdateDecoderV1(decoding.createDecoder(update))) + let lazyStructDecoders = updateDecoders.map(decoder => new LazyStructReader(decoder)) + + /** + * @todo we don't need offset because we always slice before + * @type {null | { struct: Item | GC, offset: number }} + */ + let currWrite = null + + const updateEncoder = new YEncoder() + // write structs lazily + const lazyStructEncoder = new LazyStructWriter(updateEncoder) + + // Note: We need to ensure that all lazyStructDecoders are fully consumed + // Note: Should merge document updates whenever possible - even from different updates + // Note: Should handle that some operations cannot be applied yet () + + while (true) { + // Write higher clients first ⇒ sort by clientID & clock and remove decoders without content + lazyStructDecoders = lazyStructDecoders.filter(dec => dec.curr !== null) + lazyStructDecoders.sort( + /** @type {function(any,any):number} */ (dec1, dec2) => + dec1.curr.id.client === dec2.curr.id.client + ? dec1.curr.id.clock - dec2.curr.id.clock + : dec1.curr.id.client - dec2.curr.id.client + ) + if (lazyStructDecoders.length === 0) { + break + } + const currDecoder = lazyStructDecoders[0] + // write from currDecoder until the next operation is from another client or if filler-struct + // then we need to reorder the decoders and find the next operation to write + const firstClient = /** @type {Item | GC} */ (currDecoder.curr).id.client + if (currWrite !== null) { + let curr = /** @type {Item | GC} */ (currDecoder.curr) + if (firstClient !== currWrite.struct.id.client) { + writeStructToLazyStructWriter(lazyStructEncoder, currWrite.struct, currWrite.offset) + currWrite = { struct: curr, offset: 0 } + currDecoder.next() + } else if (currWrite.struct.id.clock + currWrite.struct.length < curr.id.clock) { + // @todo write currStruct & set currStruct = Skip(clock = currStruct.id.clock + currStruct.length, length = curr.id.clock - self.clock) + throw new Error('unhandled case') // @Todo ! + } else if (currWrite.struct.id.clock + currWrite.struct.length >= curr.id.clock) { + const diff = currWrite.struct.id.clock + currWrite.struct.length - curr.id.clock + if (diff > 0) { + curr = sliceStruct(curr, diff) + } + if (!currWrite.struct.mergeWith(/** @type {any} */ (curr))) { + writeStructToLazyStructWriter(lazyStructEncoder, currWrite.struct, currWrite.offset) + currWrite = { struct: curr, offset: 0 } + currDecoder.next() + } + } + } else { + currWrite = { struct: /** @type {Item | GC} */ (currDecoder.curr), offset: 0 } + currDecoder.next() + } + for ( + let next = currDecoder.curr; + next !== null && next.id.client === firstClient && next.id.clock === currWrite.struct.id.clock + currWrite.struct.length; // @Todo && next.constructor !== skippable + next = currDecoder.next() + ) { + writeStructToLazyStructWriter(lazyStructEncoder, currWrite.struct, currWrite.offset) + currWrite = { struct: next, offset: 0 } + } + } + finishLazyStructWriting(lazyStructEncoder) + + // Read DeleteSets and merge them. + // Write merged deleteset. + // -- updateEncoder.writeDs() + return updateEncoder.toUint8Array() +} + +/** + * @param {Uint8Array} update + * @param {Uint8Array} sv + */ +export const diffUpdate = (update, sv) => { + return update +} + +/** + * @param {LazyStructWriter} lazyWriter + */ +const flushLazyStructWriter = lazyWriter => { + if (!lazyWriter.fresh) { + lazyWriter.clientStructs.push({ written: lazyWriter.written, restEncoder: encoding.toUint8Array(lazyWriter.encoder.restEncoder) }) + lazyWriter.encoder.restEncoder = encoding.createEncoder() + lazyWriter.fresh = true + } +} + +/** + * @param {LazyStructWriter} lazyWriter + * @param {Item | GC} struct + * @param {number} offset + */ +const writeStructToLazyStructWriter = (lazyWriter, struct, offset) => { + // flush curr if we start another client + if (!lazyWriter.fresh && lazyWriter.currClient !== struct.id.client) { + flushLazyStructWriter(lazyWriter) + lazyWriter.currClient = struct.id.client + // write next client + lazyWriter.encoder.writeClient(struct.id.client) + // write startClock + encoding.writeVarUint(lazyWriter.encoder.restEncoder, struct.id.clock) + } + struct.write(lazyWriter.encoder, offset) + lazyWriter.written++ +} +/** + * Call this function when we collected all parts and want to + * put all the parts together. After calling this method, + * you can continue using the UpdateEncoder. + * + * @param {LazyStructWriter} lazyWriter + */ +const finishLazyStructWriting = (lazyWriter) => { + flushLazyStructWriter(lazyWriter) + + // this is a fresh encoder because we called flushCurr + const restEncoder = lazyWriter.encoder.restEncoder + + /** + * Now we put all the fragments together. + * This works similarly to `writeClientsStructs` + */ + + // write # states that were updated - i.e. the clients + encoding.writeVarUint(restEncoder, lazyWriter.clientStructs.length) + + for (let i = 0; i < lazyWriter.clientStructs.length; i++) { + const partStructs = lazyWriter.clientStructs[i] + /** + * Works similarly to `writeStructs` + */ + // write # encoded structs + encoding.writeVarUint(restEncoder, partStructs.written) + // write the rest of the fragment + encoding.writeUint8Array(restEncoder, partStructs.restEncoder) } }