diff --git a/src/structs/AbstractItem.js b/src/structs/AbstractItem.js index f61a5484..75bd466e 100644 --- a/src/structs/AbstractItem.js +++ b/src/structs/AbstractItem.js @@ -421,15 +421,16 @@ export class AbstractItem extends AbstractStruct { } /** - * @param {Y} y + * @param {Transaction} transaction + * @param {StructStore} store */ - gcChildren (y) {} + gcChildren (transaction, store) { } /** - * @param {Y} y - * @return {GC|ItemDeleted} + * @param {Transaction} transaction + * @param {StructStore} store */ - gc (y) { + gc (transaction, store) { let r if (this.parent._item !== null && this.parent._item.deleted) { r = new GC(this.id, this.length) @@ -449,8 +450,8 @@ export class AbstractItem extends AbstractStruct { } } } - replaceStruct(y.store, this, r) - return r + replaceStruct(store, this, r) + transaction._replacedItems.add(r) } /** @@ -584,6 +585,10 @@ export const changeItemRefOffset = (item, offset) => { */ export const computeItemParams = (y, store, leftid, rightid, parentid, parentSub, parentYKey) => { const left = leftid === null ? null : getItemCleanEnd(store, leftid) + if (left !== null && left.constructor !== GC && left.right !== null && left.right.id.client === left.id.client && left.right.id.clock === left.id.clock + left.length) { + // we split a merged op, we may need to merge it again after the transaction + y.transaction._replacedItems.add(left) + } const right = rightid === null ? null : getItemCleanStart(store, rightid) let parent = null if (parentid !== null) { diff --git a/src/structs/GC.js b/src/structs/GC.js index f79b3ecd..a2af4af1 100644 --- a/src/structs/GC.js +++ b/src/structs/GC.js @@ -5,7 +5,6 @@ import { AbstractRef, AbstractStruct, createID, - writeID, addStruct, Y, StructStore, Transaction, ID // eslint-disable-line } from '../internals.js' @@ -58,12 +57,7 @@ export class GC extends AbstractStruct { */ write (encoder, offset) { encoding.writeUint8(encoder, structGCRefNumber) - if (offset === 0) { - writeID(encoder, this.id) - } else { - writeID(encoder, createID(this.id.client, this.id.clock + offset)) - } - encoding.writeVarUint(encoder, this._len) + encoding.writeVarUint(encoder, this._len - offset) } } diff --git a/src/structs/ItemType.js b/src/structs/ItemType.js index ea30a6c7..2e546c43 100644 --- a/src/structs/ItemType.js +++ b/src/structs/ItemType.js @@ -22,12 +22,13 @@ import * as encoding from 'lib0/encoding.js' // eslint-disable-line import * as decoding from 'lib0/decoding.js' /** - * @param {Y} y + * @param {Transaction} transaction + * @param {StructStore} store * @param {AbstractItem | null} item */ -const gcChildren = (y, item) => { +const gcChildren = (transaction, store, item) => { while (item !== null) { - item.gc(y) + item.gc(transaction, store) item = item.right } } @@ -109,7 +110,6 @@ export class ItemType extends AbstractItem { * @private */ delete (transaction) { - const y = transaction.y super.delete(transaction) transaction.changed.delete(this.type) transaction.changedParentTypes.delete(this.type) @@ -128,29 +128,30 @@ export class ItemType extends AbstractItem { t = t.right } if (gcChildren) { - this.gcChildren(y) + this.gcChildren(transaction, transaction.y.store) } } /** - * @param {Y} y + * @param {Transaction} transaction + * @param {StructStore} store */ - gcChildren (y) { - gcChildren(y, this.type._start) + gcChildren (transaction, store) { + gcChildren(transaction, store, this.type._start) this.type._start = null this.type._map.forEach(item => { - gcChildren(y, item) + gcChildren(transaction, store, item) }) this._map = new Map() } /** - * @param {Y} y - * @return {ItemDeleted|GC} + * @param {Transaction} transaction + * @param {StructStore} store */ - gc (y) { - this.gcChildren(y) - return super.gc(y) + gc (transaction, store) { + super.gc(transaction, store) + this.gcChildren(transaction, store) } } diff --git a/src/utils/DeleteSet.js b/src/utils/DeleteSet.js index b1f437af..2940ea7b 100644 --- a/src/utils/DeleteSet.js +++ b/src/utils/DeleteSet.js @@ -85,12 +85,13 @@ export const isDeleted = (ds, id) => { export const sortAndMergeDeleteSet = ds => { ds.clients.forEach(dels => { dels.sort((a, b) => a.clock - b.clock) + // merge items without filtering or splicing the array // i is the current pointer // j refers to the current insert position for the pointed item - // try to merge dels[i] with dels[i-1] + // try to merge dels[i] into dels[j-1] or set dels[j]=dels[i] let i, j for (i = 1, j = 1; i < dels.length; i++) { - const left = dels[i - 1] + const left = dels[j - 1] const right = dels[i] if (left.clock + left.len === right.clock) { left.len += right.len @@ -131,7 +132,7 @@ export const createDeleteSetFromStructStore = ss => { const clock = struct.id.clock let len = struct.length if (i + 1 < structs.length) { - for (let next = structs[i + 1]; i + 1 < structs.length && next.id.clock === clock + len; i++) { + for (let next = structs[i + 1]; i + 1 < structs.length && next.id.clock === clock + len && next.deleted; next = structs[++i + 1]) { len += next.length } } @@ -210,7 +211,7 @@ export const readDeleteSet = (decoder, transaction, store) => { } } } else { - addToDeleteSet(unappliedDS, createID(client, state), len) + addToDeleteSet(unappliedDS, createID(client, clock), len) } } } diff --git a/src/utils/StructStore.js b/src/utils/StructStore.js index bb42a6f0..91865b85 100644 --- a/src/utils/StructStore.js +++ b/src/utils/StructStore.js @@ -1,5 +1,6 @@ import { + GC, AbstractRef, ID, ItemType, AbstractItem, AbstractStruct // eslint-disable-line } from '../internals.js' @@ -173,7 +174,7 @@ export const getItemCleanStart = (store, id) => { * @type {AbstractItem} */ let struct = structs[index] - if (struct.id.clock < id.clock) { + if (struct.id.clock < id.clock && struct.constructor !== GC) { struct = struct.splitAt(store, id.clock - struct.id.clock) structs.splice(index + 1, 0, struct) } @@ -196,7 +197,7 @@ export const getItemCleanEnd = (store, id) => { const structs = store.clients.get(id.client) const index = findIndexSS(structs, id.clock) const struct = structs[index] - if (id.clock !== struct.id.clock + struct.length - 1) { + if (id.clock !== struct.id.clock + struct.length - 1 && struct.constructor !== GC) { structs.splice(index + 1, 0, struct.splitAt(store, id.clock - struct.id.clock + 1)) } return struct diff --git a/src/utils/Transaction.js b/src/utils/Transaction.js index 66a04fb7..0076484c 100644 --- a/src/utils/Transaction.js +++ b/src/utils/Transaction.js @@ -10,10 +10,11 @@ import { DeleteSet, sortAndMergeDeleteSet, getStates, - AbstractType, AbstractItem, YEvent, ItemType, Y // eslint-disable-line + AbstractType, AbstractStruct, YEvent, Y // eslint-disable-line } from '../internals.js' import * as encoding from 'lib0/encoding.js' +import * as map from 'lib0/map.js' /** * A transaction is created for every change on the Yjs model. It is possible @@ -79,12 +80,17 @@ export class Transaction { * @type {encoding.Encoder|null} */ this._updateMessage = null + /** + * @type {Set} + */ + this._replacedItems = new Set() } /** - * @type {encoding.Encoder} + * @type {encoding.Encoder|null} */ get updateMessage () { - if (this._updateMessage === null) { + // only create if content was added in transaction (state or ds changed) + if (this._updateMessage === null && (this.deleteSet.clients.size > 0 || map.any(this.afterState, (clock, client) => this.beforeState.get(client) !== clock))) { const encoder = encoding.createEncoder() sortAndMergeDeleteSet(this.deleteSet) writeStructsFromTransaction(encoder, this) diff --git a/src/utils/Y.js b/src/utils/Y.js index 37c724d7..e9a4e700 100644 --- a/src/utils/Y.js +++ b/src/utils/Y.js @@ -97,88 +97,82 @@ export class Y extends Observable { callEventHandlerListeners(type._dEH, [events, transaction]) }) // only call afterTransaction listeners if anything changed - const transactionChangedContent = transaction.changedParentTypes.size > 0 || transaction.deleteSet.clients.size > 0 - if (transactionChangedContent) { - transaction.afterState = getStates(transaction.y.store) - // when all changes & events are processed, emit afterTransaction event - // transaction cleanup - const store = transaction.y.store - const ds = transaction.deleteSet - // replace deleted items with ItemDeleted / GC - sortAndMergeDeleteSet(ds) - this.emit('afterTransaction', [this, transaction]) + transaction.afterState = getStates(transaction.y.store) + // when all changes & events are processed, emit afterTransaction event + // transaction cleanup + const store = transaction.y.store + const ds = transaction.deleteSet + // replace deleted items with ItemDeleted / GC + sortAndMergeDeleteSet(ds) + this.emit('afterTransaction', [this, transaction]) + for (const [client, deleteItems] of ds.clients) { /** - * @type {Set} + * @type {Array} */ - const replacedItems = new Set() - for (const [client, deleteItems] of ds.clients) { - /** - * @type {Array} - */ - // @ts-ignore - const structs = store.clients.get(client) - for (let di = 0; di < deleteItems.length; di++) { - const deleteItem = deleteItems[di] - for (let si = findIndexSS(structs, deleteItem.clock); si < structs.length; si++) { - const struct = structs[si] - if (deleteItem.clock + deleteItem.len < struct.id.clock) { - break - } - if (struct.deleted && struct instanceof AbstractItem && (struct.constructor !== ItemDeleted || (struct.parent._item !== null && struct.parent._item.deleted))) { - // check if we can GC - replacedItems.add(struct.gc(this)) - } + // @ts-ignore + const structs = store.clients.get(client) + for (let di = 0; di < deleteItems.length; di++) { + const deleteItem = deleteItems[di] + for (let si = findIndexSS(structs, deleteItem.clock); si < structs.length; si++) { + const struct = structs[si] + if (deleteItem.clock + deleteItem.len < struct.id.clock) { + break + } + if (struct.deleted && struct instanceof AbstractItem && (struct.constructor !== ItemDeleted || (struct.parent._item !== null && struct.parent._item.deleted))) { + // check if we can GC + struct.gc(transaction, store) } } } - /** - * @param {Array} structs - * @param {number} pos - */ - const tryToMergeWithLeft = (structs, pos) => { - const left = structs[pos - 1] - const right = structs[pos] - if (left.deleted === right.deleted && left.constructor === right.constructor) { - if (left.mergeWith(right)) { - structs.splice(pos, 1) - } - } - } - // on all affected store.clients props, try to merge - for (const [client, clock] of transaction.beforeState) { - if (transaction.afterState.get(client) !== clock) { - /** - * @type {Array} - */ - // @ts-ignore - const structs = store.clients.get(client) - // we iterate from right to left so we can safely remove entries - const firstChangePos = math.max(findIndexSS(structs, clock), 1) - for (let i = structs.length - 1; i >= firstChangePos; i--) { - tryToMergeWithLeft(structs, i) - } - } - } - // try to merge replacedItems - for (const replacedItem of replacedItems) { - const id = replacedItem.id - const client = id.client - const clock = id.clock - /** - * @type {Array} - */ - // @ts-ignore - const structs = store.clients.get(client) - const replacedStructPos = findIndexSS(structs, clock) - if (replacedStructPos + 1 < structs.length) { - tryToMergeWithLeft(structs, replacedStructPos + 1) - } - if (replacedStructPos > 0) { - tryToMergeWithLeft(structs, replacedStructPos) - } - } - this.emit('afterTransactionCleanup', [this, transaction]) } + /** + * @param {Array} structs + * @param {number} pos + */ + const tryToMergeWithLeft = (structs, pos) => { + const left = structs[pos - 1] + const right = structs[pos] + if (left.deleted === right.deleted && left.constructor === right.constructor) { + if (left.mergeWith(right)) { + structs.splice(pos, 1) + } + } + } + // on all affected store.clients props, try to merge + for (const [client, clock] of transaction.afterState) { + const beforeClock = transaction.beforeState.get(client) || 0 + if (beforeClock !== clock) { + /** + * @type {Array} + */ + // @ts-ignore + const structs = store.clients.get(client) + // we iterate from right to left so we can safely remove entries + const firstChangePos = math.max(findIndexSS(structs, beforeClock), 1) + for (let i = structs.length - 1; i >= firstChangePos; i--) { + tryToMergeWithLeft(structs, i) + } + } + } + // try to merge replacedItems + for (const replacedItem of transaction._replacedItems) { + const id = replacedItem.id + const client = id.client + const clock = id.clock + /** + * @type {Array} + */ + // @ts-ignore + const structs = store.clients.get(client) + const replacedStructPos = findIndexSS(structs, clock) + if (replacedStructPos + 1 < structs.length) { + tryToMergeWithLeft(structs, replacedStructPos + 1) + } + if (replacedStructPos > 0) { + tryToMergeWithLeft(structs, replacedStructPos) + } + } + this.emit('afterTransactionCleanup', [this, transaction]) } } } diff --git a/src/utils/structEncoding.js b/src/utils/structEncoding.js index 2cc89006..0e4edc5d 100644 --- a/src/utils/structEncoding.js +++ b/src/utils/structEncoding.js @@ -179,13 +179,13 @@ const readStructReaders = (decoder, localState) => { * @param {Map>} structReaders * @param {Array} stack Stack of pending structs waiting for struct dependencies. * Maximum length of stack is structReaders.size. + * @param {IterableIterator>} structReaderIterator + * @param {IteratorResult>} structReaderIteratorResult * * @todo reimplement without iterators - read everything in arrays instead */ -const execStructReaders = (transaction, store, localState, structReaders, stack) => { +const execStructReaders = (transaction, store, localState, structReaders, stack, structReaderIterator, structReaderIteratorResult) => { // iterate over all struct readers until we are done - const structReaderIterator = structReaders.values() - let structReaderIteratorResult = structReaderIterator.next() while (stack.length !== 0 || !structReaderIteratorResult.done) { if (stack.length === 0) { // stack is empty. We know that there there are more structReaders to be processed @@ -208,7 +208,7 @@ const execStructReaders = (transaction, store, localState, structReaders, stack) if (nextRef === undefined) { // This update message causally depends on another update message. // Store current stack and readers in StructStore and resume the computation at another time - store.pendingStructReaders.add({ stack, structReaders, missing }) + store.pendingStructReaders.add({ stack, structReaders, missing, structReaderIterator, structReaderIteratorResult }) return } stack.push(nextRef) @@ -220,6 +220,12 @@ const execStructReaders = (transaction, store, localState, structReaders, stack) const localClock = (localState.get(ref.id.client) || 0) const offset = ref.id.clock < localClock ? localClock - ref.id.clock : 0 if (offset < ref.length) { + if (ref.id.clock + offset !== localClock) { + // A previous message from this client is missing + // Store current stack and readers in StructStore and resume the computation at another time + store.pendingStructReaders.add({ stack, structReaders, missing: createID(ref.id.client, localClock), structReaderIterator, structReaderIteratorResult }) + return + } ref.toStruct(transaction.y, store, offset).integrate(transaction) } stack.pop() @@ -227,7 +233,7 @@ const execStructReaders = (transaction, store, localState, structReaders, stack) } } if (stack.length > 0) { - store.pendingStructReaders.add({ stack, structReaders, missing: stack[stack.length - 1].id }) + store.pendingStructReaders.add({ stack, structReaders, missing: stack[stack.length - 1].id, structReaderIterator, structReaderIteratorResult }) } } @@ -247,7 +253,7 @@ const tryResumePendingStructReaders = (transaction, store) => { if (exists(store, pendingReader.missing)) { resume = true // found at least one more reader to execute pendingReaders.delete(pendingReader) - execStructReaders(transaction, store, getStates(store), pendingReader.structReaders, pendingReader.stack) + execStructReaders(transaction, store, getStates(store), pendingReader.structReaders, pendingReader.stack, pendingReader.structReaderIterator, pendingReader.structReaderIteratorResult) } } } @@ -279,7 +285,8 @@ export const tryResumePendingDeleteReaders = (transaction, store) => { export const readStructs = (decoder, transaction, store) => { const localState = getStates(store) const readers = readStructReaders(decoder, localState) - execStructReaders(transaction, store, localState, readers, []) + const structReaderIterator = readers.values() + execStructReaders(transaction, store, localState, readers, [], structReaderIterator, structReaderIterator.next()) tryResumePendingStructReaders(transaction, store) tryResumePendingDeleteReaders(transaction, store) } diff --git a/tests/testHelper.js b/tests/testHelper.js index 19fea109..ab11a8a7 100644 --- a/tests/testHelper.js +++ b/tests/testHelper.js @@ -20,9 +20,12 @@ import * as syncProtocol from 'y-protocols/sync.js' */ const afterTransaction = (y, transaction) => { y.mMux(() => { - const encoder = encoding.createEncoder() - syncProtocol.writeUpdate(encoder, transaction.updateMessage) - broadcastMessage(y, encoding.toBuffer(encoder)) + const m = transaction.updateMessage + if (m !== null) { + const encoder = encoding.createEncoder() + syncProtocol.writeUpdate(encoder, m) + broadcastMessage(y, encoding.toBuffer(encoder)) + } }) } diff --git a/tests/y-array.tests.js b/tests/y-array.tests.js index aa50bdef..abb29143 100644 --- a/tests/y-array.tests.js +++ b/tests/y-array.tests.js @@ -366,7 +366,6 @@ export const testRepeatGeneratingYarrayTests300 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYarrayTests400 = tc => { - t.skip(!t.production) applyRandomTests(tc, arrayTransactions, 400) } @@ -374,7 +373,6 @@ export const testRepeatGeneratingYarrayTests400 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYarrayTests500 = tc => { - t.skip(!t.production) applyRandomTests(tc, arrayTransactions, 500) } @@ -382,7 +380,6 @@ export const testRepeatGeneratingYarrayTests500 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYarrayTests600 = tc => { - t.skip(!t.production) applyRandomTests(tc, arrayTransactions, 600) } @@ -390,7 +387,6 @@ export const testRepeatGeneratingYarrayTests600 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYarrayTests1000 = tc => { - t.skip(!t.production) applyRandomTests(tc, arrayTransactions, 1000) } @@ -398,7 +394,6 @@ export const testRepeatGeneratingYarrayTests1000 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYarrayTests1800 = tc => { - t.skip(!t.production) applyRandomTests(tc, arrayTransactions, 1800) } @@ -406,6 +401,13 @@ export const testRepeatGeneratingYarrayTests1800 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYarrayTests10000 = tc => { - t.skip(!t.production) applyRandomTests(tc, arrayTransactions, 10000) } + +/** + * @param {t.TestCase} tc + */ +export const testRepeatGeneratingYarrayTests30000 = tc => { + t.skip(!t.production) + applyRandomTests(tc, arrayTransactions, 30000) +} diff --git a/tests/y-map.tests.js b/tests/y-map.tests.js index 20abb0e4..873c43f5 100644 --- a/tests/y-map.tests.js +++ b/tests/y-map.tests.js @@ -347,8 +347,8 @@ const mapTransactions = [ /** * @param {t.TestCase} tc */ -export const testRepeatGeneratingYmapTests20 = tc => { - applyRandomTests(tc, mapTransactions, 8) +export const testRepeatGeneratingYmapTests10 = tc => { + applyRandomTests(tc, mapTransactions, 10) } /** @@ -404,7 +404,6 @@ export const testRepeatGeneratingYmapTests300 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYmapTests400 = tc => { - t.skip(!t.production) applyRandomTests(tc, mapTransactions, 400) } @@ -412,7 +411,6 @@ export const testRepeatGeneratingYmapTests400 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYmapTests500 = tc => { - t.skip(!t.production) applyRandomTests(tc, mapTransactions, 500) } @@ -420,7 +418,6 @@ export const testRepeatGeneratingYmapTests500 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYmapTests600 = tc => { - t.skip(!t.production) applyRandomTests(tc, mapTransactions, 600) } @@ -428,7 +425,6 @@ export const testRepeatGeneratingYmapTests600 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYmapTests1000 = tc => { - t.skip(!t.production) applyRandomTests(tc, mapTransactions, 1000) } @@ -436,7 +432,6 @@ export const testRepeatGeneratingYmapTests1000 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYmapTests1800 = tc => { - t.skip(!t.production) applyRandomTests(tc, mapTransactions, 1800) } @@ -444,6 +439,13 @@ export const testRepeatGeneratingYmapTests1800 = tc => { * @param {t.TestCase} tc */ export const testRepeatGeneratingYmapTests10000 = tc => { - t.skip(!t.production) applyRandomTests(tc, mapTransactions, 10000) } + +/** + * @param {t.TestCase} tc + */ +export const testRepeatGeneratingYmapTests100000 = tc => { + t.skip(!t.production) + applyRandomTests(tc, mapTransactions, 100000) +}