feat: add encodeStateAsStreamOfUpdates

This commit is contained in:
Jens Claes 2022-09-23 12:05:48 +02:00
parent 24d1e2984c
commit 4f7584195b
No known key found for this signature in database
GPG Key ID: 680C2A47F80C295D
3 changed files with 511 additions and 5 deletions

View File

@ -57,6 +57,7 @@ export {
readUpdate,
readUpdateV2,
encodeStateAsUpdate,
encodeStateAsStreamOfUpdates,
encodeStateAsUpdateV2,
encodeStateVector,
UndoManager,

View File

@ -49,13 +49,24 @@ import * as math from 'lib0/math'
/**
* @param {Array<GC|Item>} structs All structs by `client`
* @param {number} minClock write structs starting with `ID(client,minClock)`
* @param {number | null} maxClock write structs with clock < maxClock for client
*
* @function
*/
const getStructsToWrite = (structs, minClock) => {
const getStructsToWrite = (structs, minClock, maxClock = null) => {
minClock = math.max(minClock, structs[0].id.clock) // make sure the first id exists
const startNewStructs = findIndexSS(structs, minClock)
return structs.slice(startNewStructs)
const lastStruct = structs[structs.length - 1]
if (maxClock == null || maxClock > lastStruct.id.clock) {
return structs.slice(startNewStructs)
}
let endNewStructs = findIndexSS(structs, maxClock)
if (maxClock > structs[endNewStructs].id.clock) {
// We write the last fully, so we don't split it until maxClock
// Therefore we need to also include the last one
endNewStructs += 1
}
return structs.slice(startNewStructs, endNewStructs)
}
/**
@ -63,11 +74,12 @@ const getStructsToWrite = (structs, minClock) => {
* @param {Array<GC|Item>} structs All structs by `client`
* @param {number} client
* @param {number} clock write structs starting with `ID(client,clock)`
* @param {number | null} maxClock write structs with clock < maxClock `ID(client,clock)`
* @returns {number} the last clock written
*
* @function
*/
const writeStructs = (encoder, structs, client, clock) => {
// write first id
const writeStructs = (encoder, structs, client, clock, maxClock = null) => {
clock = math.max(clock, structs[0].id.clock) // make sure the first id exists
const newStructs = getStructsToWrite(structs, clock, maxClock)
// write # encoded structs
@ -80,6 +92,8 @@ const writeStructs = (encoder, structs, client, clock) => {
for (let i = 1; i < newStructs.length; i++) {
newStructs[i].write(encoder, 0)
}
const lastStruct = newStructs[newStructs.length - 1]
return lastStruct == null ? clock : lastStruct.id.clock + lastStruct.length
}
/**
@ -532,7 +546,84 @@ export const writeStateAsUpdate = (encoder, doc, targetStateVector = new Map())
}
/**
* Write all the document as a single update message that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will
* @param {Array<[number, number]>} clientClocks
* @return {Array<[number, number]>}
*
* @function
*/
const sortClientsLargestToSmallest = (clientClocks) => {
return clientClocks.sort((a, z) => z[0] - a[0])
}
/**
* Write the whole document as a stream of update messages. If you specify the state of the remote client (`targetStateVector`) it will only write the operations that are missing.
*
* @param {() => UpdateEncoderV1 | UpdateEncoderV2} getEncoder
* @param {Doc} doc
* @param {object} options
* @param {(client: number, clock: number, maxClock: number) => Iterable<number> | Generator<number, void, number>} [options.clockSplits] For this client, where would you like the updates to be splitted. If a clockSplit is in the middle of an item, it might return the full item and so this function doesn't split exactly on the given clockSplits. Use the injected clock to know where the last split happened
* @param {(clientClocks: Array<[number, number]>) => Array<[number, number]>} [options.sortClients] How to sort the clients. In general, it's better to sort with higher client ids first as this heavily improves the conflict algorithm. This is also the default implementation.
* @param {Map<number,number>} [targetStateVector] The state of the target that receives the update. Leave empty to write all known structs
* @return {Iterable<UpdateEncoderV1 | UpdateEncoderV2>}
*
* @generator
*/
export const writeStateAsStreamOfUpdates = function * (getEncoder, doc, options, targetStateVector = new Map()) {
const deleteEncoder = getEncoder()
// no updates / structs to write
encoding.writeVarUint(deleteEncoder.restEncoder, 0)
writeDeleteSet(deleteEncoder, createDeleteSetFromStructStore(doc.store))
yield deleteEncoder
const sm = getStatesToWrite(doc.store, targetStateVector)
const sortClients = options.sortClients ?? sortClientsLargestToSmallest
for (let [client, clock] of sortClients(Array.from(sm.entries()))) {
const lastClockClient = getState(doc.store, client)
/** @type {Array<GC | Item> | undefined} */
const structs = doc.store.clients.get(client)
if (structs == null) {
continue
}
if (options.clockSplits != null) {
const iterator = options.clockSplits(client, clock, lastClockClient)[Symbol.iterator]()
while (true) {
// @ts-expect-error clock is number and iterator expects no argument...
const clockSplit = iterator.next(clock)
if (clockSplit.done || clockSplit.value >= lastClockClient) {
break
}
if (clockSplit.value <= clock) {
continue
}
const encoder = getEncoder()
// 1 client has structs to write
encoding.writeVarUint(encoder.restEncoder, 1)
clock = writeStructs(encoder, structs, client, clock, clockSplit.value)
// no deletes to write
encoding.writeVarUint(encoder.restEncoder, 0)
yield encoder
}
}
if (clock < lastClockClient) {
const encoder = getEncoder()
// 1 client has structs to write
encoding.writeVarUint(encoder.restEncoder, 1)
clock = writeStructs(encoder, structs, client, clock)
// no deletes to write
encoding.writeVarUint(encoder.restEncoder, 0)
yield encoder
}
}
}
/**
* Write the document as a single update message that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will
* only write the operations that are missing.
*
* Use `writeStateAsUpdate` instead if you are working with lib0/encoding.js#Encoder
@ -579,6 +670,61 @@ export const encodeStateAsUpdateV2 = (doc, encodedTargetStateVector = new Uint8A
*/
export const encodeStateAsUpdate = (doc, encodedTargetStateVector) => encodeStateAsUpdateV2(doc, encodedTargetStateVector, new UpdateEncoderV1())
/**
* Write the whole document as a stream of update messages that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will only write the operations that are missing.
*
* Use `writeStateAsStreamOfUpdates` instead if you are working with lib0/encoding.js#Encoder
*
* @param {Doc} doc
* @param {object} options
* @param {(client: number, clock: number, maxClock: number) => Iterable<number> | Generator<number, void, number>} [options.clockSplits]
* @param {(clientClocks: Array<[number, number]>) => Array<[number, number]>} [options.sortClients]
* @param {Uint8Array} [encodedTargetStateVector] The state of the target that receives the update. Leave empty to write all known structs
* @param {() => UpdateEncoderV1 | UpdateEncoderV2} [getEncoder]
* @return {Iterable<Uint8Array>}
*
* @generator
*/
export const encodeStateAsStreamOfUpdatesV2 = function * (doc, options, encodedTargetStateVector = new Uint8Array([0]), getEncoder = () => new UpdateEncoderV2()) {
const targetStateVector = decodeStateVector(encodedTargetStateVector)
for (const encoder of writeStateAsStreamOfUpdates(getEncoder, doc, options, targetStateVector)) {
yield encoder.toUint8Array()
}
const updates = []
// also add the pending updates (if there are any)
if (doc.store.pendingDs) {
updates.push(doc.store.pendingDs)
}
if (doc.store.pendingStructs) {
updates.push(diffUpdateV2(doc.store.pendingStructs.update, encodedTargetStateVector))
}
if (updates.length > 0) {
const encoder = getEncoder()
if (encoder.constructor === UpdateEncoderV1) {
yield mergeUpdates(updates.map((update, i) => i === 0 ? update : convertUpdateFormatV2ToV1(update)))
} else if (encoder.constructor === UpdateEncoderV2) {
yield mergeUpdatesV2(updates)
}
}
}
/**
* Write the whole document as a stream of update messages that can be applied on the remote document. If you specify the state of the remote client (`targetState`) it will only write the operations that are missing.
*
* Use `writeStateAsStreamOfUpdates` instead if you are working with lib0/encoding.js#Encoder
*
* @param {Doc} doc
* @param {object} options
* @param {(client: number, clock: number, maxClock: number) => Iterable<number> | Generator<number, void, number>} [options.clockSplits]
* @param {(clientClocks: Array<[number, number]>) => Array<[number, number]>} [options.sortClients]
* @param {Uint8Array} [encodedTargetStateVector] The state of the target that receives the update. Leave empty to write all known structs
* @return {Iterable<Uint8Array>}
*
* @function
*/
export const encodeStateAsStreamOfUpdates = (doc, options, encodedTargetStateVector) => encodeStateAsStreamOfUpdatesV2(doc, options, encodedTargetStateVector, () => new UpdateEncoderV1())
/**
* Read state vector from Decoder and return as Map
*

View File

@ -106,3 +106,362 @@ export const testDiffStateVectorOfUpdateIgnoresSkips = tc => {
t.assert(state.get(ydoc.clientID) === 1)
t.assert(state.size === 1)
}
/** @function
* @param {number} x
*/
const splitClocksBy = (x) => {
/**
* @param {number} _client
* @param {number} clock
* @param {number} maxClock
*/
return function * (_client, clock, maxClock) {
while (clock < maxClock) {
clock = Math.min(clock + x, maxClock)
clock = yield clock
}
}
}
/**
* @param {t.TestCase} tc
*/
export const testEncodeStateAsUpdatesWithOneClient = tc => {
const yDoc = new Y.Doc()
const yText = yDoc.getText('textBlock')
yText.applyDelta([{ insert: 'r' }])
yText.applyDelta([{ insert: 'o' }])
yText.applyDelta([{ insert: 'n' }])
yText.applyDelta([{ insert: 'e' }])
yText.applyDelta([{ insert: 'n' }])
const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, { clockSplits: splitClocksBy(1) }))
const yDocToAssert = new Y.Doc()
updates.forEach((update) => {
Y.applyUpdate(yDocToAssert, update)
})
t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nenor')
// yDoc did 5 updates
// 1 (empty) delete set
t.compare(6, updates.length)
}
/**
* @param {t.TestCase} tc
*/
export const testEncodeStateAsUpdatesWithTwoClients = tc => {
// Arrange
const yDoc = new Y.Doc()
const yText = yDoc.getText('textBlock')
yText.applyDelta([{ insert: 'r' }])
yText.applyDelta([{ insert: 'o' }])
yText.applyDelta([{ insert: 'n' }])
const remoteDoc = new Y.Doc()
Y.applyUpdate(remoteDoc, Y.encodeStateAsUpdate(yDoc))
remoteDoc.getText('textBlock').applyDelta([{ insert: 'e' }])
Y.applyUpdate(yDoc, Y.encodeStateAsUpdate(remoteDoc))
yText.applyDelta([{ insert: 'n' }])
// Act
const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, { clockSplits: splitClocksBy(1) }))
// Assert
const yDocToAssert = new Y.Doc()
updates.forEach((update) => {
Y.applyUpdate(yDocToAssert, update)
})
t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nenor')
// yDoc did 3+1=4 updates
// remoteDoc did 1 update
// 1 (empty) delete set
t.compare(6, updates.length)
}
/**
* @param {t.TestCase} tc
*/
export const testEncodeStateAsUpdatesWithItemsOfLength2 = tc => {
// Arrange
const yDoc = new Y.Doc()
const yText = yDoc.getText('textBlock')
yText.applyDelta([{ insert: 'or' }])
yText.applyDelta([{ insert: 'n' }])
yText.applyDelta([{ insert: 'ne' }])
// Act
const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, { clockSplits: splitClocksBy(1) }))
// Assert
// yDoc did 3 updates (ne will keep together, even if we use clockSplit of 1)
// 1 (empty) delete set
t.compare(3 + 1, updates.length)
const yDocToAssert = new Y.Doc()
Y.applyUpdate(yDocToAssert, updates[0]) // delete set
t.compareStrings(yDocToAssert.getText('textBlock').toString(), '')
t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), 0)
Y.applyUpdate(yDocToAssert, updates[1]) // delete set
t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'or')
t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), 2)
Y.applyUpdate(yDocToAssert, updates[2]) // delete set
t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nor')
t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), 3)
Y.applyUpdate(yDocToAssert, updates[3]) // delete set
t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nenor')
t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), 5)
}
/**
* @param {t.TestCase} tc
*/
export const testEncodeStateAsUpdatesWithBadClockSplits = tc => {
const yDoc = new Y.Doc()
const yText = yDoc.getText('textBlock')
yText.applyDelta([{ insert: 'r' }])
yText.applyDelta([{ insert: 'o' }])
yText.applyDelta([{ insert: 'n' }])
yText.applyDelta([{ insert: 'e' }])
yText.applyDelta([{ insert: 'n' }])
const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, {
clockSplits: function * (_client, clock, maxClock) {
clock = yield clock - 1
clock = yield clock + 1 // first message
clock = yield clock
clock = yield clock
clock = yield clock
clock = yield clock + 1 // second message
clock = yield maxClock + 100 // last message
}
}))
const yDocToAssert = new Y.Doc()
// Delete set message
Y.applyUpdate(yDocToAssert, updates[0])
t.compareStrings(yDocToAssert.getText('textBlock').toString(), '')
// first message
Y.applyUpdate(yDocToAssert, updates[1])
t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'r')
// second message
Y.applyUpdate(yDocToAssert, updates[2])
t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'or')
Y.applyUpdate(yDocToAssert, updates[3])
t.compareStrings(yDocToAssert.getText('textBlock').toString(), 'nenor')
t.compare(4, updates.length)
}
/**
* @param {t.TestCase} tc
*/
export const testEncodeStateAsUpdatesShouldRespectClockSplits = tc => {
// Arrange
const yDoc = new Y.Doc()
/**
* @type {Array<number>}
*/
const clockSplits = []
/**
* @type {Array<Uint8Array>}
*/
const expectedUpdates = []
yDoc.on('update', (update) => {
clockSplits.push(Y.getState(yDoc.store, yDoc.clientID))
expectedUpdates.push(update)
})
const cells = yDoc.getArray('cells')
const cell0 = new Y.Map()
cell0.set('id', new Y.Text('zero'))
cell0.set('source', new Y.Text('# Hello World'))
cells.push([cell0])
const cell1 = new Y.Map()
cell1.set('id', new Y.Text('one'))
cell1.set('source', new Y.Text('import pandas as pd'))
cells.push([cell1])
yDoc.transact(() => {
yDoc.getMap('meta').set('language', 'python')
yDoc.getMap('state').set('version', 3)
})
// Act
const streamOfUpdates = Y.encodeStateAsStreamOfUpdates(yDoc, {
clockSplits: () => clockSplits
})
// Assert
const yDocToAssert = new Y.Doc()
let i = -1
for (const update of streamOfUpdates) {
Y.applyUpdate(yDocToAssert, update)
if (i >= 0) { // i == -1 is the delete set message
t.compare(update, expectedUpdates[i], 'updates match')
t.compare(Y.getState(yDocToAssert.store, yDoc.clientID), clockSplits[i], 'correct clock afterwards')
}
i++
}
t.compare(yDocToAssert.getArray('cells').toJSON(), [
{ id: 'zero', source: '# Hello World' },
{ id: 'one', source: 'import pandas as pd' }
])
t.compare(yDocToAssert.getMap('meta').toJSON(), { language: 'python' })
t.compare(yDocToAssert.getMap('state').toJSON(), { version: 3 })
}
/**
* @param {t.TestCase} tc
*/
export const testEncodeStateAsUpdatesWithMaps = tc => {
// Arrange
const yDoc = new Y.Doc()
const yMap = yDoc.getMap('myMap')
yMap.set('foo', 'foo1')
yMap.set('bar', 'bar1')
yMap.set('quux', 'quux1')
yMap.set('bar', 'bar2')
const expectedMap = {
foo: 'foo1',
bar: 'bar2',
quux: 'quux1'
}
// Act
const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yDoc, {
clockSplits: splitClocksBy(2)
}))
// Assert
t.compare(3, updates.length)
const yDocToAssert = new Y.Doc()
// Delete set message
Y.applyUpdate(yDocToAssert, updates[0])
t.compare(10, updates[0].length) // There is a delete set!
t.compare(0, updates[0][0]) // No updates by clients
t.compareObjects(yDocToAssert.getMap('myMap').toJSON(), {}, 'after update 1')
// First 2 updates
Y.applyUpdate(yDocToAssert, updates[1])
// bar is not here because the item is in the delete set
t.compareObjects(yDocToAssert.getMap('myMap').toJSON(), { foo: 'foo1' }, 'after update 2')
// Last 2 updates
Y.applyUpdate(yDocToAssert, updates[2])
t.compareObjects(yDocToAssert.getMap('myMap').toJSON(), { foo: 'foo1', bar: 'bar2', quux: 'quux1' }, 'after update 3')
t.compareObjects(yDocToAssert.getMap('myMap').toJSON(), expectedMap)
}
/**
* @param {t.TestCase} tc
*/
export const testEncodeStateAsUpdatesWithDifferentSortingAndEditsByClients = tc => {
// Arrange
const yNotebook = new Y.Doc()
/**
* @type {Array<number>}
*/
const clockSplits = []
yNotebook.on('update', (update) => {
clockSplits.push(Y.getState(yNotebook.store, yNotebook.clientID))
})
const cells = yNotebook.getArray('cells')
const cell0 = new Y.Map()
cell0.set('id', new Y.Text('zero'))
cell0.set('source', new Y.Text('# Hello World'))
cells.push([cell0])
const cell1 = new Y.Map()
cell1.set('id', new Y.Text('one'))
cell1.set('source', new Y.Text('import pandas as pd'))
cells.push([cell1])
const cell2 = new Y.Map()
cell2.set('id', new Y.Text('two'))
cell2.set('source', new Y.Text('# Conclusion'))
cells.push([cell2])
yNotebook.transact(() => {
yNotebook.getMap('meta').set('language', 'python')
yNotebook.getMap('state').set('version', 3)
})
const clientDoc = new Y.Doc()
Y.applyUpdate(clientDoc, Y.encodeStateAsUpdate(yNotebook))
const source = clientDoc.getArray('cells').get(1).get('source')
source.insert(source.length, '\nimport random')
t.compare(source.toString(), 'import pandas as pd\nimport random', 'clientDoc should have right code')
Y.applyUpdate(yNotebook, Y.encodeStateAsUpdate(clientDoc))
console.log('clockSplits', clockSplits, yNotebook.clientID)
const updates = Array.from(Y.encodeStateAsStreamOfUpdates(yNotebook, {
clockSplits: (client) => {
if (client === yNotebook.clientID) {
return clockSplits
}
return []
},
sortClients: clientClocks => {
return [
...clientClocks.filter(([client]) => client === clientDoc.clientID),
...clientClocks.filter(([client]) => client === yNotebook.clientID)
]
}
}))
const ydoc = new Y.Doc()
Y.applyUpdate(ydoc, updates[0]) // delete set
t.compare(ydoc.getArray('cells').toJSON(), [])
Y.applyUpdate(ydoc, updates[1]) // clientDoc updates
t.compare(ydoc.getArray('cells').toJSON(), [])
Y.applyUpdate(ydoc, updates[2]) // cell 0 initialized
t.compare(ydoc.getArray('cells').toJSON(), [{ id: 'zero', source: '# Hello World' }])
Y.applyUpdate(ydoc, updates[3]) // cell 1 initialized, immediately applies edits by clients
t.compare(ydoc.getArray('cells').toJSON(), [
{ id: 'zero', source: '# Hello World' },
{ id: 'one', source: 'import pandas as pd\nimport random' }
])
Y.applyUpdate(ydoc, updates[4]) // cell 2 initialized
t.compare(ydoc.getArray('cells').toJSON(), [
{ id: 'zero', source: '# Hello World' },
{ id: 'one', source: 'import pandas as pd\nimport random' },
{ id: 'two', source: '# Conclusion' }
])
Y.applyUpdate(ydoc, updates[5]) // metadata
t.compare(ydoc.getArray('cells').toJSON(), [
{ id: 'zero', source: '# Hello World' },
{ id: 'one', source: 'import pandas as pd\nimport random' },
{ id: 'two', source: '# Conclusion' }
])
t.compare(6, updates.length)
}