fix remaining random tests
This commit is contained in:
@@ -85,12 +85,13 @@ export const isDeleted = (ds, id) => {
|
||||
export const sortAndMergeDeleteSet = ds => {
|
||||
ds.clients.forEach(dels => {
|
||||
dels.sort((a, b) => a.clock - b.clock)
|
||||
// merge items without filtering or splicing the array
|
||||
// i is the current pointer
|
||||
// j refers to the current insert position for the pointed item
|
||||
// try to merge dels[i] with dels[i-1]
|
||||
// try to merge dels[i] into dels[j-1] or set dels[j]=dels[i]
|
||||
let i, j
|
||||
for (i = 1, j = 1; i < dels.length; i++) {
|
||||
const left = dels[i - 1]
|
||||
const left = dels[j - 1]
|
||||
const right = dels[i]
|
||||
if (left.clock + left.len === right.clock) {
|
||||
left.len += right.len
|
||||
@@ -131,7 +132,7 @@ export const createDeleteSetFromStructStore = ss => {
|
||||
const clock = struct.id.clock
|
||||
let len = struct.length
|
||||
if (i + 1 < structs.length) {
|
||||
for (let next = structs[i + 1]; i + 1 < structs.length && next.id.clock === clock + len; i++) {
|
||||
for (let next = structs[i + 1]; i + 1 < structs.length && next.id.clock === clock + len && next.deleted; next = structs[++i + 1]) {
|
||||
len += next.length
|
||||
}
|
||||
}
|
||||
@@ -210,7 +211,7 @@ export const readDeleteSet = (decoder, transaction, store) => {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
addToDeleteSet(unappliedDS, createID(client, state), len)
|
||||
addToDeleteSet(unappliedDS, createID(client, clock), len)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
|
||||
import {
|
||||
GC,
|
||||
AbstractRef, ID, ItemType, AbstractItem, AbstractStruct // eslint-disable-line
|
||||
} from '../internals.js'
|
||||
|
||||
@@ -173,7 +174,7 @@ export const getItemCleanStart = (store, id) => {
|
||||
* @type {AbstractItem}
|
||||
*/
|
||||
let struct = structs[index]
|
||||
if (struct.id.clock < id.clock) {
|
||||
if (struct.id.clock < id.clock && struct.constructor !== GC) {
|
||||
struct = struct.splitAt(store, id.clock - struct.id.clock)
|
||||
structs.splice(index + 1, 0, struct)
|
||||
}
|
||||
@@ -196,7 +197,7 @@ export const getItemCleanEnd = (store, id) => {
|
||||
const structs = store.clients.get(id.client)
|
||||
const index = findIndexSS(structs, id.clock)
|
||||
const struct = structs[index]
|
||||
if (id.clock !== struct.id.clock + struct.length - 1) {
|
||||
if (id.clock !== struct.id.clock + struct.length - 1 && struct.constructor !== GC) {
|
||||
structs.splice(index + 1, 0, struct.splitAt(store, id.clock - struct.id.clock + 1))
|
||||
}
|
||||
return struct
|
||||
|
||||
@@ -10,10 +10,11 @@ import {
|
||||
DeleteSet,
|
||||
sortAndMergeDeleteSet,
|
||||
getStates,
|
||||
AbstractType, AbstractItem, YEvent, ItemType, Y // eslint-disable-line
|
||||
AbstractType, AbstractStruct, YEvent, Y // eslint-disable-line
|
||||
} from '../internals.js'
|
||||
|
||||
import * as encoding from 'lib0/encoding.js'
|
||||
import * as map from 'lib0/map.js'
|
||||
|
||||
/**
|
||||
* A transaction is created for every change on the Yjs model. It is possible
|
||||
@@ -79,12 +80,17 @@ export class Transaction {
|
||||
* @type {encoding.Encoder|null}
|
||||
*/
|
||||
this._updateMessage = null
|
||||
/**
|
||||
* @type {Set<AbstractStruct>}
|
||||
*/
|
||||
this._replacedItems = new Set()
|
||||
}
|
||||
/**
|
||||
* @type {encoding.Encoder}
|
||||
* @type {encoding.Encoder|null}
|
||||
*/
|
||||
get updateMessage () {
|
||||
if (this._updateMessage === null) {
|
||||
// only create if content was added in transaction (state or ds changed)
|
||||
if (this._updateMessage === null && (this.deleteSet.clients.size > 0 || map.any(this.afterState, (clock, client) => this.beforeState.get(client) !== clock))) {
|
||||
const encoder = encoding.createEncoder()
|
||||
sortAndMergeDeleteSet(this.deleteSet)
|
||||
writeStructsFromTransaction(encoder, this)
|
||||
|
||||
146
src/utils/Y.js
146
src/utils/Y.js
@@ -97,88 +97,82 @@ export class Y extends Observable {
|
||||
callEventHandlerListeners(type._dEH, [events, transaction])
|
||||
})
|
||||
// only call afterTransaction listeners if anything changed
|
||||
const transactionChangedContent = transaction.changedParentTypes.size > 0 || transaction.deleteSet.clients.size > 0
|
||||
if (transactionChangedContent) {
|
||||
transaction.afterState = getStates(transaction.y.store)
|
||||
// when all changes & events are processed, emit afterTransaction event
|
||||
// transaction cleanup
|
||||
const store = transaction.y.store
|
||||
const ds = transaction.deleteSet
|
||||
// replace deleted items with ItemDeleted / GC
|
||||
sortAndMergeDeleteSet(ds)
|
||||
this.emit('afterTransaction', [this, transaction])
|
||||
transaction.afterState = getStates(transaction.y.store)
|
||||
// when all changes & events are processed, emit afterTransaction event
|
||||
// transaction cleanup
|
||||
const store = transaction.y.store
|
||||
const ds = transaction.deleteSet
|
||||
// replace deleted items with ItemDeleted / GC
|
||||
sortAndMergeDeleteSet(ds)
|
||||
this.emit('afterTransaction', [this, transaction])
|
||||
for (const [client, deleteItems] of ds.clients) {
|
||||
/**
|
||||
* @type {Set<ItemDeleted|GC>}
|
||||
* @type {Array<AbstractStruct>}
|
||||
*/
|
||||
const replacedItems = new Set()
|
||||
for (const [client, deleteItems] of ds.clients) {
|
||||
/**
|
||||
* @type {Array<AbstractStruct>}
|
||||
*/
|
||||
// @ts-ignore
|
||||
const structs = store.clients.get(client)
|
||||
for (let di = 0; di < deleteItems.length; di++) {
|
||||
const deleteItem = deleteItems[di]
|
||||
for (let si = findIndexSS(structs, deleteItem.clock); si < structs.length; si++) {
|
||||
const struct = structs[si]
|
||||
if (deleteItem.clock + deleteItem.len < struct.id.clock) {
|
||||
break
|
||||
}
|
||||
if (struct.deleted && struct instanceof AbstractItem && (struct.constructor !== ItemDeleted || (struct.parent._item !== null && struct.parent._item.deleted))) {
|
||||
// check if we can GC
|
||||
replacedItems.add(struct.gc(this))
|
||||
}
|
||||
// @ts-ignore
|
||||
const structs = store.clients.get(client)
|
||||
for (let di = 0; di < deleteItems.length; di++) {
|
||||
const deleteItem = deleteItems[di]
|
||||
for (let si = findIndexSS(structs, deleteItem.clock); si < structs.length; si++) {
|
||||
const struct = structs[si]
|
||||
if (deleteItem.clock + deleteItem.len < struct.id.clock) {
|
||||
break
|
||||
}
|
||||
if (struct.deleted && struct instanceof AbstractItem && (struct.constructor !== ItemDeleted || (struct.parent._item !== null && struct.parent._item.deleted))) {
|
||||
// check if we can GC
|
||||
struct.gc(transaction, store)
|
||||
}
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param {Array<AbstractStruct>} structs
|
||||
* @param {number} pos
|
||||
*/
|
||||
const tryToMergeWithLeft = (structs, pos) => {
|
||||
const left = structs[pos - 1]
|
||||
const right = structs[pos]
|
||||
if (left.deleted === right.deleted && left.constructor === right.constructor) {
|
||||
if (left.mergeWith(right)) {
|
||||
structs.splice(pos, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
// on all affected store.clients props, try to merge
|
||||
for (const [client, clock] of transaction.beforeState) {
|
||||
if (transaction.afterState.get(client) !== clock) {
|
||||
/**
|
||||
* @type {Array<AbstractStruct>}
|
||||
*/
|
||||
// @ts-ignore
|
||||
const structs = store.clients.get(client)
|
||||
// we iterate from right to left so we can safely remove entries
|
||||
const firstChangePos = math.max(findIndexSS(structs, clock), 1)
|
||||
for (let i = structs.length - 1; i >= firstChangePos; i--) {
|
||||
tryToMergeWithLeft(structs, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
// try to merge replacedItems
|
||||
for (const replacedItem of replacedItems) {
|
||||
const id = replacedItem.id
|
||||
const client = id.client
|
||||
const clock = id.clock
|
||||
/**
|
||||
* @type {Array<AbstractStruct>}
|
||||
*/
|
||||
// @ts-ignore
|
||||
const structs = store.clients.get(client)
|
||||
const replacedStructPos = findIndexSS(structs, clock)
|
||||
if (replacedStructPos + 1 < structs.length) {
|
||||
tryToMergeWithLeft(structs, replacedStructPos + 1)
|
||||
}
|
||||
if (replacedStructPos > 0) {
|
||||
tryToMergeWithLeft(structs, replacedStructPos)
|
||||
}
|
||||
}
|
||||
this.emit('afterTransactionCleanup', [this, transaction])
|
||||
}
|
||||
/**
|
||||
* @param {Array<AbstractStruct>} structs
|
||||
* @param {number} pos
|
||||
*/
|
||||
const tryToMergeWithLeft = (structs, pos) => {
|
||||
const left = structs[pos - 1]
|
||||
const right = structs[pos]
|
||||
if (left.deleted === right.deleted && left.constructor === right.constructor) {
|
||||
if (left.mergeWith(right)) {
|
||||
structs.splice(pos, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
// on all affected store.clients props, try to merge
|
||||
for (const [client, clock] of transaction.afterState) {
|
||||
const beforeClock = transaction.beforeState.get(client) || 0
|
||||
if (beforeClock !== clock) {
|
||||
/**
|
||||
* @type {Array<AbstractStruct>}
|
||||
*/
|
||||
// @ts-ignore
|
||||
const structs = store.clients.get(client)
|
||||
// we iterate from right to left so we can safely remove entries
|
||||
const firstChangePos = math.max(findIndexSS(structs, beforeClock), 1)
|
||||
for (let i = structs.length - 1; i >= firstChangePos; i--) {
|
||||
tryToMergeWithLeft(structs, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
// try to merge replacedItems
|
||||
for (const replacedItem of transaction._replacedItems) {
|
||||
const id = replacedItem.id
|
||||
const client = id.client
|
||||
const clock = id.clock
|
||||
/**
|
||||
* @type {Array<AbstractStruct>}
|
||||
*/
|
||||
// @ts-ignore
|
||||
const structs = store.clients.get(client)
|
||||
const replacedStructPos = findIndexSS(structs, clock)
|
||||
if (replacedStructPos + 1 < structs.length) {
|
||||
tryToMergeWithLeft(structs, replacedStructPos + 1)
|
||||
}
|
||||
if (replacedStructPos > 0) {
|
||||
tryToMergeWithLeft(structs, replacedStructPos)
|
||||
}
|
||||
}
|
||||
this.emit('afterTransactionCleanup', [this, transaction])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -179,13 +179,13 @@ const readStructReaders = (decoder, localState) => {
|
||||
* @param {Map<number,IterableIterator<AbstractRef>>} structReaders
|
||||
* @param {Array<AbstractRef>} stack Stack of pending structs waiting for struct dependencies.
|
||||
* Maximum length of stack is structReaders.size.
|
||||
* @param {IterableIterator<IterableIterator<AbstractRef>>} structReaderIterator
|
||||
* @param {IteratorResult<IterableIterator<AbstractRef>>} structReaderIteratorResult
|
||||
*
|
||||
* @todo reimplement without iterators - read everything in arrays instead
|
||||
*/
|
||||
const execStructReaders = (transaction, store, localState, structReaders, stack) => {
|
||||
const execStructReaders = (transaction, store, localState, structReaders, stack, structReaderIterator, structReaderIteratorResult) => {
|
||||
// iterate over all struct readers until we are done
|
||||
const structReaderIterator = structReaders.values()
|
||||
let structReaderIteratorResult = structReaderIterator.next()
|
||||
while (stack.length !== 0 || !structReaderIteratorResult.done) {
|
||||
if (stack.length === 0) {
|
||||
// stack is empty. We know that there there are more structReaders to be processed
|
||||
@@ -208,7 +208,7 @@ const execStructReaders = (transaction, store, localState, structReaders, stack)
|
||||
if (nextRef === undefined) {
|
||||
// This update message causally depends on another update message.
|
||||
// Store current stack and readers in StructStore and resume the computation at another time
|
||||
store.pendingStructReaders.add({ stack, structReaders, missing })
|
||||
store.pendingStructReaders.add({ stack, structReaders, missing, structReaderIterator, structReaderIteratorResult })
|
||||
return
|
||||
}
|
||||
stack.push(nextRef)
|
||||
@@ -220,6 +220,12 @@ const execStructReaders = (transaction, store, localState, structReaders, stack)
|
||||
const localClock = (localState.get(ref.id.client) || 0)
|
||||
const offset = ref.id.clock < localClock ? localClock - ref.id.clock : 0
|
||||
if (offset < ref.length) {
|
||||
if (ref.id.clock + offset !== localClock) {
|
||||
// A previous message from this client is missing
|
||||
// Store current stack and readers in StructStore and resume the computation at another time
|
||||
store.pendingStructReaders.add({ stack, structReaders, missing: createID(ref.id.client, localClock), structReaderIterator, structReaderIteratorResult })
|
||||
return
|
||||
}
|
||||
ref.toStruct(transaction.y, store, offset).integrate(transaction)
|
||||
}
|
||||
stack.pop()
|
||||
@@ -227,7 +233,7 @@ const execStructReaders = (transaction, store, localState, structReaders, stack)
|
||||
}
|
||||
}
|
||||
if (stack.length > 0) {
|
||||
store.pendingStructReaders.add({ stack, structReaders, missing: stack[stack.length - 1].id })
|
||||
store.pendingStructReaders.add({ stack, structReaders, missing: stack[stack.length - 1].id, structReaderIterator, structReaderIteratorResult })
|
||||
}
|
||||
}
|
||||
|
||||
@@ -247,7 +253,7 @@ const tryResumePendingStructReaders = (transaction, store) => {
|
||||
if (exists(store, pendingReader.missing)) {
|
||||
resume = true // found at least one more reader to execute
|
||||
pendingReaders.delete(pendingReader)
|
||||
execStructReaders(transaction, store, getStates(store), pendingReader.structReaders, pendingReader.stack)
|
||||
execStructReaders(transaction, store, getStates(store), pendingReader.structReaders, pendingReader.stack, pendingReader.structReaderIterator, pendingReader.structReaderIteratorResult)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -279,7 +285,8 @@ export const tryResumePendingDeleteReaders = (transaction, store) => {
|
||||
export const readStructs = (decoder, transaction, store) => {
|
||||
const localState = getStates(store)
|
||||
const readers = readStructReaders(decoder, localState)
|
||||
execStructReaders(transaction, store, localState, readers, [])
|
||||
const structReaderIterator = readers.values()
|
||||
execStructReaders(transaction, store, localState, readers, [], structReaderIterator, structReaderIterator.next())
|
||||
tryResumePendingStructReaders(transaction, store)
|
||||
tryResumePendingDeleteReaders(transaction, store)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user