implement merge-logic - #263
This commit is contained in:
parent
783c4d8209
commit
320da29b69
@ -5,29 +5,13 @@ import * as encoding from 'lib0/encoding.js'
|
||||
import {
|
||||
createID,
|
||||
readItemContent,
|
||||
Item, GC, AbstractUpdateDecoder, AbstractUpdateEncoder, UpdateDecoderV1, UpdateDecoderV2, UpdateEncoderV1, UpdateEncoderV2 // eslint-disable-line
|
||||
Item, GC, AbstractUpdateDecoder, UpdateDecoderV1, UpdateDecoderV2, UpdateEncoderV1, UpdateEncoderV2 // eslint-disable-line
|
||||
} from '../internals.js'
|
||||
|
||||
/**
|
||||
* @param {Array<Uint8Array>} updates
|
||||
* @return {Uint8Array}
|
||||
*/
|
||||
export const mergeUpdates = updates => {
|
||||
return updates[0]
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Uint8Array} update
|
||||
* @param {Uint8Array} sv
|
||||
*/
|
||||
export const diffUpdate = (update, sv) => {
|
||||
return update
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {AbstractUpdateDecoder} decoder
|
||||
*/
|
||||
export function * lazyStructReaderGenerator (decoder) {
|
||||
function * lazyStructReaderGenerator (decoder) {
|
||||
const numOfStateUpdates = decoding.readVarUint(decoder.restDecoder)
|
||||
for (let i = 0; i < numOfStateUpdates; i++) {
|
||||
const numberOfStructs = decoding.readVarUint(decoder.restDecoder)
|
||||
@ -96,6 +80,13 @@ export class LazyStructWriter {
|
||||
*/
|
||||
constructor (encoder) {
|
||||
this.fresh = true
|
||||
/**
|
||||
* We keep the last written struct around in case we want to
|
||||
* merge it with the next written struct.
|
||||
* When a new struct is received: if mergeable ⇒ merge; otherwise ⇒ write curr and keep new struct around.
|
||||
* @type {null | Item | GC}
|
||||
*/
|
||||
this.curr = null
|
||||
this.currClient = 0
|
||||
this.startClock = 0
|
||||
this.written = 0
|
||||
@ -112,57 +103,192 @@ export class LazyStructWriter {
|
||||
*/
|
||||
this.clientStructs = []
|
||||
}
|
||||
}
|
||||
|
||||
flushCurr () {
|
||||
if (!this.fresh) {
|
||||
this.clientStructs.push({ written: this.written, restEncoder: encoding.toUint8Array(this.encoder.restEncoder) })
|
||||
this.encoder.restEncoder = encoding.createEncoder()
|
||||
this.fresh = true
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param {Array<Uint8Array>} updates
|
||||
* @return {Uint8Array}
|
||||
*/
|
||||
export const mergeUpdates = updates => mergeUpdatesV2(updates, UpdateDecoderV1, UpdateEncoderV2)
|
||||
|
||||
/**
|
||||
* @param {Item | GC} struct
|
||||
* @param {number} offset
|
||||
*/
|
||||
write (struct, offset) {
|
||||
// flush curr if we start another client
|
||||
if (!this.fresh && this.currClient !== struct.id.client) {
|
||||
this.flushCurr()
|
||||
this.currClient = struct.id.client
|
||||
// write next client
|
||||
this.encoder.writeClient(struct.id.client)
|
||||
// write startClock
|
||||
encoding.writeVarUint(this.encoder.restEncoder, struct.id.clock)
|
||||
}
|
||||
struct.write(this.encoder, offset)
|
||||
this.written++
|
||||
}
|
||||
|
||||
toUint8Array () {
|
||||
this.flushCurr()
|
||||
|
||||
// this is a fresh encoder because we called flushCurr
|
||||
const restEncoder = this.encoder.restEncoder
|
||||
|
||||
/**
|
||||
* Now we put all the fragments together.
|
||||
* This works similarly to `writeClientsStructs`
|
||||
*/
|
||||
|
||||
// write # states that were updated - i.e. the clients
|
||||
encoding.writeVarUint(restEncoder, this.clientStructs.length)
|
||||
|
||||
for (let i = 0; i < this.clientStructs.length; i++) {
|
||||
const partStructs = this.clientStructs[i]
|
||||
/**
|
||||
* Works similarly to `writeStructs`
|
||||
*/
|
||||
// write # encoded structs
|
||||
encoding.writeVarUint(restEncoder, partStructs.written)
|
||||
// write the rest of the fragment
|
||||
encoding.writeUint8Array(restEncoder, partStructs.restEncoder)
|
||||
}
|
||||
return this.encoder.toUint8Array()
|
||||
/**
|
||||
* This method is intended to slice any kind of struct and retrieve the right part.
|
||||
* It does not handle side-effects, so it should only be used by the lazy-encoder.
|
||||
*
|
||||
* @param {Item | GC} left
|
||||
* @param {number} diff
|
||||
* @return {Item | GC}
|
||||
*/
|
||||
const sliceStruct = (left, diff) => {
|
||||
if (left.constructor === GC) {
|
||||
const { client, clock } = left.id
|
||||
return new GC(createID(client, clock + diff), left.length - diff)
|
||||
} else {
|
||||
const leftItem = /** @type {Item} */ (left)
|
||||
const { client, clock } = leftItem.id
|
||||
return new Item(
|
||||
createID(client, clock + diff),
|
||||
null,
|
||||
createID(client, clock + diff - 1),
|
||||
null,
|
||||
leftItem.rightOrigin,
|
||||
leftItem.parent,
|
||||
leftItem.parentSub,
|
||||
leftItem.content.splice(diff)
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* This function works similarly to `readUpdateV2`.
|
||||
*
|
||||
* @param {Array<Uint8Array>} updates
|
||||
* @param {typeof UpdateDecoderV1 | typeof UpdateDecoderV2} [YDecoder]
|
||||
* @param {typeof UpdateEncoderV1 | typeof UpdateEncoderV2} [YEncoder]
|
||||
* @return {Uint8Array}
|
||||
*/
|
||||
export const mergeUpdatesV2 = (updates, YDecoder = UpdateDecoderV2, YEncoder = UpdateEncoderV2) => {
|
||||
const updateDecoders = updates.map(update => new UpdateDecoderV1(decoding.createDecoder(update)))
|
||||
let lazyStructDecoders = updateDecoders.map(decoder => new LazyStructReader(decoder))
|
||||
|
||||
/**
|
||||
* @todo we don't need offset because we always slice before
|
||||
* @type {null | { struct: Item | GC, offset: number }}
|
||||
*/
|
||||
let currWrite = null
|
||||
|
||||
const updateEncoder = new YEncoder()
|
||||
// write structs lazily
|
||||
const lazyStructEncoder = new LazyStructWriter(updateEncoder)
|
||||
|
||||
// Note: We need to ensure that all lazyStructDecoders are fully consumed
|
||||
// Note: Should merge document updates whenever possible - even from different updates
|
||||
// Note: Should handle that some operations cannot be applied yet ()
|
||||
|
||||
while (true) {
|
||||
// Write higher clients first ⇒ sort by clientID & clock and remove decoders without content
|
||||
lazyStructDecoders = lazyStructDecoders.filter(dec => dec.curr !== null)
|
||||
lazyStructDecoders.sort(
|
||||
/** @type {function(any,any):number} */ (dec1, dec2) =>
|
||||
dec1.curr.id.client === dec2.curr.id.client
|
||||
? dec1.curr.id.clock - dec2.curr.id.clock
|
||||
: dec1.curr.id.client - dec2.curr.id.client
|
||||
)
|
||||
if (lazyStructDecoders.length === 0) {
|
||||
break
|
||||
}
|
||||
const currDecoder = lazyStructDecoders[0]
|
||||
// write from currDecoder until the next operation is from another client or if filler-struct
|
||||
// then we need to reorder the decoders and find the next operation to write
|
||||
const firstClient = /** @type {Item | GC} */ (currDecoder.curr).id.client
|
||||
if (currWrite !== null) {
|
||||
let curr = /** @type {Item | GC} */ (currDecoder.curr)
|
||||
if (firstClient !== currWrite.struct.id.client) {
|
||||
writeStructToLazyStructWriter(lazyStructEncoder, currWrite.struct, currWrite.offset)
|
||||
currWrite = { struct: curr, offset: 0 }
|
||||
currDecoder.next()
|
||||
} else if (currWrite.struct.id.clock + currWrite.struct.length < curr.id.clock) {
|
||||
// @todo write currStruct & set currStruct = Skip(clock = currStruct.id.clock + currStruct.length, length = curr.id.clock - self.clock)
|
||||
throw new Error('unhandled case') // @Todo !
|
||||
} else if (currWrite.struct.id.clock + currWrite.struct.length >= curr.id.clock) {
|
||||
const diff = currWrite.struct.id.clock + currWrite.struct.length - curr.id.clock
|
||||
if (diff > 0) {
|
||||
curr = sliceStruct(curr, diff)
|
||||
}
|
||||
if (!currWrite.struct.mergeWith(/** @type {any} */ (curr))) {
|
||||
writeStructToLazyStructWriter(lazyStructEncoder, currWrite.struct, currWrite.offset)
|
||||
currWrite = { struct: curr, offset: 0 }
|
||||
currDecoder.next()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
currWrite = { struct: /** @type {Item | GC} */ (currDecoder.curr), offset: 0 }
|
||||
currDecoder.next()
|
||||
}
|
||||
for (
|
||||
let next = currDecoder.curr;
|
||||
next !== null && next.id.client === firstClient && next.id.clock === currWrite.struct.id.clock + currWrite.struct.length; // @Todo && next.constructor !== skippable
|
||||
next = currDecoder.next()
|
||||
) {
|
||||
writeStructToLazyStructWriter(lazyStructEncoder, currWrite.struct, currWrite.offset)
|
||||
currWrite = { struct: next, offset: 0 }
|
||||
}
|
||||
}
|
||||
finishLazyStructWriting(lazyStructEncoder)
|
||||
|
||||
// Read DeleteSets and merge them.
|
||||
// Write merged deleteset.
|
||||
// -- updateEncoder.writeDs()
|
||||
return updateEncoder.toUint8Array()
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Uint8Array} update
|
||||
* @param {Uint8Array} sv
|
||||
*/
|
||||
export const diffUpdate = (update, sv) => {
|
||||
return update
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {LazyStructWriter} lazyWriter
|
||||
*/
|
||||
const flushLazyStructWriter = lazyWriter => {
|
||||
if (!lazyWriter.fresh) {
|
||||
lazyWriter.clientStructs.push({ written: lazyWriter.written, restEncoder: encoding.toUint8Array(lazyWriter.encoder.restEncoder) })
|
||||
lazyWriter.encoder.restEncoder = encoding.createEncoder()
|
||||
lazyWriter.fresh = true
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {LazyStructWriter} lazyWriter
|
||||
* @param {Item | GC} struct
|
||||
* @param {number} offset
|
||||
*/
|
||||
const writeStructToLazyStructWriter = (lazyWriter, struct, offset) => {
|
||||
// flush curr if we start another client
|
||||
if (!lazyWriter.fresh && lazyWriter.currClient !== struct.id.client) {
|
||||
flushLazyStructWriter(lazyWriter)
|
||||
lazyWriter.currClient = struct.id.client
|
||||
// write next client
|
||||
lazyWriter.encoder.writeClient(struct.id.client)
|
||||
// write startClock
|
||||
encoding.writeVarUint(lazyWriter.encoder.restEncoder, struct.id.clock)
|
||||
}
|
||||
struct.write(lazyWriter.encoder, offset)
|
||||
lazyWriter.written++
|
||||
}
|
||||
/**
|
||||
* Call this function when we collected all parts and want to
|
||||
* put all the parts together. After calling this method,
|
||||
* you can continue using the UpdateEncoder.
|
||||
*
|
||||
* @param {LazyStructWriter} lazyWriter
|
||||
*/
|
||||
const finishLazyStructWriting = (lazyWriter) => {
|
||||
flushLazyStructWriter(lazyWriter)
|
||||
|
||||
// this is a fresh encoder because we called flushCurr
|
||||
const restEncoder = lazyWriter.encoder.restEncoder
|
||||
|
||||
/**
|
||||
* Now we put all the fragments together.
|
||||
* This works similarly to `writeClientsStructs`
|
||||
*/
|
||||
|
||||
// write # states that were updated - i.e. the clients
|
||||
encoding.writeVarUint(restEncoder, lazyWriter.clientStructs.length)
|
||||
|
||||
for (let i = 0; i < lazyWriter.clientStructs.length; i++) {
|
||||
const partStructs = lazyWriter.clientStructs[i]
|
||||
/**
|
||||
* Works similarly to `writeStructs`
|
||||
*/
|
||||
// write # encoded structs
|
||||
encoding.writeVarUint(restEncoder, partStructs.written)
|
||||
// write the rest of the fragment
|
||||
encoding.writeUint8Array(restEncoder, partStructs.restEncoder)
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user