order observer and transaction cleanups after one another

This commit is contained in:
Kevin Jahns 2019-04-26 13:31:00 +02:00
parent 21d86cd2be
commit a336cc167c
5 changed files with 137 additions and 99 deletions

View File

@ -215,10 +215,10 @@ const insertAttributes = (transaction, parent, left, right, currentAttributes, a
// insert format-start items // insert format-start items
for (let key in attributes) { for (let key in attributes) {
const val = attributes[key] const val = attributes[key]
const currentVal = currentAttributes.get(key) const currentVal = currentAttributes.get(key) || null
if (currentVal !== val) { if (currentVal !== val) {
// save negated attribute (set null if currentVal undefined) // save negated attribute (set null if currentVal undefined)
negatedAttributes.set(key, currentVal || null) negatedAttributes.set(key, currentVal)
left = new ItemFormat(nextID(transaction), left, left === null ? null : left.lastId, right, right === null ? null : right.id, parent, null, key, val) left = new ItemFormat(nextID(transaction), left, left === null ? null : left.lastId, right, right === null ? null : right.id, parent, null, key, val)
left.integrate(transaction) left.integrate(transaction)
} }

View File

@ -129,121 +129,128 @@ export const nextID = transaction => {
* @function * @function
*/ */
export const transact = (y, f) => { export const transact = (y, f) => {
const transactionCleanups = y._transactionCleanups
let initialCall = false let initialCall = false
if (y._transaction === null) { if (y._transaction === null) {
initialCall = true initialCall = true
y._transaction = new Transaction(y) y._transaction = new Transaction(y)
transactionCleanups.push(y._transaction)
y.emit('beforeTransaction', [y._transaction, y]) y.emit('beforeTransaction', [y._transaction, y])
} }
const transaction = y._transaction
try { try {
f(transaction) f(y._transaction)
} finally { } finally {
if (initialCall) { if (initialCall && transactionCleanups[0] === y._transaction) {
y._transaction = null // The first transaction ended, now process observer calls.
y.emit('beforeObserverCalls', [transaction, y]) // Observer call may create new transactions for which we need to call the observers and do cleanup.
// emit change events on changed types // We don't want to nest these calls, so we execute these calls one after another
transaction.changed.forEach((subs, itemtype) => { for (let i = 0; i < transactionCleanups.length; i++) {
itemtype._callObserver(transaction, subs) const transaction = transactionCleanups[i]
}) const store = transaction.y.store
transaction.changedParentTypes.forEach((events, type) => { const ds = transaction.deleteSet
events = events sortAndMergeDeleteSet(ds)
.filter(event => transaction.afterState = getStates(transaction.y.store)
event.target._item === null || !event.target._item.deleted y._transaction = null
) y.emit('beforeObserverCalls', [transaction, y])
events // emit change events on changed types
.forEach(event => { transaction.changed.forEach((subs, itemtype) => {
event.currentTarget = type itemtype._callObserver(transaction, subs)
}) })
// we don't need to check for events.length transaction.changedParentTypes.forEach((events, type) => {
// because we know it has at least one element events = events
callEventHandlerListeners(type._dEH, events, transaction) .filter(event =>
}) event.target._item === null || !event.target._item.deleted
// only call afterTransaction listeners if anything changed )
transaction.afterState = getStates(transaction.y.store) events
// when all changes & events are processed, emit afterTransaction event .forEach(event => {
// transaction cleanup event.currentTarget = type
const store = transaction.y.store })
const ds = transaction.deleteSet // we don't need to check for events.length
sortAndMergeDeleteSet(ds) // because we know it has at least one element
y.emit('afterTransaction', [transaction, y]) callEventHandlerListeners(type._dEH, events, transaction)
// replace deleted items with ItemDeleted / GC })
for (const [client, deleteItems] of ds.clients) { y.emit('afterTransaction', [transaction, y])
/** // replace deleted items with ItemDeleted / GC
* @type {Array<AbstractStruct>} for (const [client, deleteItems] of ds.clients) {
*/
// @ts-ignore
const structs = store.clients.get(client)
for (let di = 0; di < deleteItems.length; di++) {
const deleteItem = deleteItems[di]
for (let si = findIndexSS(structs, deleteItem.clock); si < structs.length; si++) {
const struct = structs[si]
if (deleteItem.clock + deleteItem.len <= struct.id.clock) {
break
}
if (struct.deleted && struct instanceof AbstractItem) {
if (struct.constructor !== ItemDeleted || (struct.parent._item !== null && struct.parent._item.deleted)) {
// check if we can GC
struct.gc(transaction, store)
} else {
// otherwise only gc children (if there are any)
struct.gcChildren(transaction, store)
}
}
}
}
}
/**
* @param {Array<AbstractStruct>} structs
* @param {number} pos
*/
const tryToMergeWithLeft = (structs, pos) => {
const left = structs[pos - 1]
const right = structs[pos]
if (left.deleted === right.deleted && left.constructor === right.constructor) {
if (left.mergeWith(right)) {
structs.splice(pos, 1)
if (right instanceof AbstractItem && right.parentSub !== null && right.parent._map.get(right.parentSub) === right) {
// @ts-ignore we already did a constructor check above
right.parent._map.set(right.parentSub, left)
}
}
}
}
// on all affected store.clients props, try to merge
for (const [client, clock] of transaction.afterState) {
const beforeClock = transaction.beforeState.get(client) || 0
if (beforeClock !== clock) {
/** /**
* @type {Array<AbstractStruct>} * @type {Array<AbstractStruct>}
*/ */
// @ts-ignore // @ts-ignore
const structs = store.clients.get(client) const structs = store.clients.get(client)
// we iterate from right to left so we can safely remove entries for (let di = 0; di < deleteItems.length; di++) {
const firstChangePos = math.max(findIndexSS(structs, beforeClock), 1) const deleteItem = deleteItems[di]
for (let i = structs.length - 1; i >= firstChangePos; i--) { for (let si = findIndexSS(structs, deleteItem.clock); si < structs.length; si++) {
tryToMergeWithLeft(structs, i) const struct = structs[si]
if (deleteItem.clock + deleteItem.len <= struct.id.clock) {
break
}
if (struct.deleted && struct instanceof AbstractItem) {
if (struct.constructor !== ItemDeleted || (struct.parent._item !== null && struct.parent._item.deleted)) {
// check if we can GC
struct.gc(transaction, store)
} else {
// otherwise only gc children (if there are any)
struct.gcChildren(transaction, store)
}
}
}
} }
} }
}
// try to merge mergeStructs
for (const mid of transaction._mergeStructs) {
const client = mid.client
const clock = mid.clock
/** /**
* @type {Array<AbstractStruct>} * @param {Array<AbstractStruct>} structs
* @param {number} pos
*/ */
// @ts-ignore const tryToMergeWithLeft = (structs, pos) => {
const structs = store.clients.get(client) const left = structs[pos - 1]
const replacedStructPos = findIndexSS(structs, clock) const right = structs[pos]
if (replacedStructPos + 1 < structs.length) { if (left.deleted === right.deleted && left.constructor === right.constructor) {
tryToMergeWithLeft(structs, replacedStructPos + 1) if (left.mergeWith(right)) {
structs.splice(pos, 1)
if (right instanceof AbstractItem && right.parentSub !== null && right.parent._map.get(right.parentSub) === right) {
// @ts-ignore we already did a constructor check above
right.parent._map.set(right.parentSub, left)
}
}
}
} }
if (replacedStructPos > 0) { // on all affected store.clients props, try to merge
tryToMergeWithLeft(structs, replacedStructPos) for (const [client, clock] of transaction.afterState) {
const beforeClock = transaction.beforeState.get(client) || 0
if (beforeClock !== clock) {
/**
* @type {Array<AbstractStruct>}
*/
// @ts-ignore
const structs = store.clients.get(client)
// we iterate from right to left so we can safely remove entries
const firstChangePos = math.max(findIndexSS(structs, beforeClock), 1)
for (let i = structs.length - 1; i >= firstChangePos; i--) {
tryToMergeWithLeft(structs, i)
}
}
} }
// try to merge mergeStructs
for (const mid of transaction._mergeStructs) {
const client = mid.client
const clock = mid.clock
/**
* @type {Array<AbstractStruct>}
*/
// @ts-ignore
const structs = store.clients.get(client)
const replacedStructPos = findIndexSS(structs, clock)
if (replacedStructPos + 1 < structs.length) {
tryToMergeWithLeft(structs, replacedStructPos + 1)
}
if (replacedStructPos > 0) {
tryToMergeWithLeft(structs, replacedStructPos)
}
}
// @todo Merge all the transactions into one and send the data as a single update message
// @todo implement a dedicated event that we can use to send updates to other peers
y.emit('afterTransactionCleanup', [transaction, y])
} }
y.emit('afterTransactionCleanup', [transaction, y]) y._transactionCleanups = []
} }
} }
} }

View File

@ -39,6 +39,11 @@ export class Y extends Observable {
* @private * @private
*/ */
this._transaction = null this._transaction = null
/**
* @type {Array<Transaction>}
* @private
*/
this._transactionCleanups = []
} }
/** /**
* Changes that happen inside of a transaction are bundled. This means that * Changes that happen inside of a transaction are bundled. This means that

View File

@ -233,7 +233,7 @@ export class TestConnector {
* @template T * @template T
* @param {t.TestCase} tc * @param {t.TestCase} tc
* @param {{users?:number}} conf * @param {{users?:number}} conf
* @param {InitTestObjectCallback<T>} initTestObject * @param {InitTestObjectCallback<T>} [initTestObject]
* @return {{testObjects:Array<any>,testConnector:TestConnector,users:Array<TestYInstance>,array0:Y.Array<any>,array1:Y.Array<any>,array2:Y.Array<any>,map0:Y.Map<any>,map1:Y.Map<any>,map2:Y.Map<any>,map3:Y.Map<any>,text0:Y.Text,text1:Y.Text,text2:Y.Text,xml0:Y.XmlElement,xml1:Y.XmlElement,xml2:Y.XmlElement}} * @return {{testObjects:Array<any>,testConnector:TestConnector,users:Array<TestYInstance>,array0:Y.Array<any>,array1:Y.Array<any>,array2:Y.Array<any>,map0:Y.Map<any>,map1:Y.Map<any>,map2:Y.Map<any>,map3:Y.Map<any>,text0:Y.Text,text1:Y.Text,text2:Y.Text,xml0:Y.XmlElement,xml1:Y.XmlElement,xml2:Y.XmlElement}}
*/ */
export const init = (tc, { users = 5 } = {}, initTestObject) => { export const init = (tc, { users = 5 } = {}, initTestObject) => {
@ -256,7 +256,7 @@ export const init = (tc, { users = 5 } = {}, initTestObject) => {
result['text' + i] = y.get('text', Y.Text) result['text' + i] = y.get('text', Y.Text)
} }
testConnector.syncAll() testConnector.syncAll()
result.testObjects = result.users.map(initTestObject) result.testObjects = result.users.map(initTestObject || (() => null))
// @ts-ignore // @ts-ignore
return result return result
} }

View File

@ -144,6 +144,32 @@ export const testInsertAndDeleteEvents = tc => {
compare(users) compare(users)
} }
/**
* @param {t.TestCase} tc
*/
export const testNestedObserverEvents = tc => {
const { array0, users } = init(tc, { users: 2 })
/**
* @type {Array<number>}
*/
const vals = []
array0.observe(e => {
if (array0.length === 1) {
// inserting, will call this observer again
// we expect that this observer is called after this event handler finished
array0.insert(1, [1])
vals.push(0)
} else {
// this should be called the second time an element is inserted (above case)
vals.push(1)
}
})
array0.insert(0, [0])
t.compareArrays(vals, [0, 1])
t.compareArrays(array0.toArray(), [0, 1])
compare(users)
}
/** /**
* @param {t.TestCase} tc * @param {t.TestCase} tc
*/ */