delay errors in observe callbacks to throw after cleanup is done
This commit is contained in:
parent
f4c919d9ec
commit
f53dff5043
@ -30,7 +30,7 @@ import * as encoding from 'lib0/encoding.js' // eslint-disable-line
|
||||
* @param {EventType} event
|
||||
*/
|
||||
export const callTypeObservers = (type, transaction, event) => {
|
||||
callEventHandlerListeners(type._eH, event, transaction)
|
||||
const changedType = type
|
||||
const changedParentTypes = transaction.changedParentTypes
|
||||
while (true) {
|
||||
// @ts-ignore
|
||||
@ -40,6 +40,7 @@ export const callTypeObservers = (type, transaction, event) => {
|
||||
}
|
||||
type = type._item.parent
|
||||
}
|
||||
callEventHandlerListeners(changedType._eH, event, transaction)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -17,6 +17,7 @@ import * as encoding from 'lib0/encoding.js'
|
||||
import * as map from 'lib0/map.js'
|
||||
import * as math from 'lib0/math.js'
|
||||
import * as set from 'lib0/set.js'
|
||||
import { callAll } from 'lib0/function.js'
|
||||
|
||||
/**
|
||||
* A transaction is created for every change on the Yjs model. It is possible
|
||||
@ -144,6 +145,164 @@ export const addChangedTypeToTransaction = (transaction, type, parentSub) => {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Array<Transaction>} transactionCleanups
|
||||
* @param {number} i
|
||||
*/
|
||||
const cleanupTransactions = (transactionCleanups, i) => {
|
||||
if (i < transactionCleanups.length) {
|
||||
const transaction = transactionCleanups[i]
|
||||
const doc = transaction.doc
|
||||
const store = doc.store
|
||||
const ds = transaction.deleteSet
|
||||
try {
|
||||
sortAndMergeDeleteSet(ds)
|
||||
transaction.afterState = getStateVector(transaction.doc.store)
|
||||
doc._transaction = null
|
||||
doc.emit('beforeObserverCalls', [transaction, doc])
|
||||
/**
|
||||
* An array of event callbacks.
|
||||
*
|
||||
* Each callback is called even if the other ones throw errors.
|
||||
*
|
||||
* @type {Array<function():void>}
|
||||
*/
|
||||
const fs = []
|
||||
// observe events on changed types
|
||||
transaction.changed.forEach((subs, itemtype) =>
|
||||
fs.push(() => {
|
||||
if (itemtype._item === null || !itemtype._item.deleted) {
|
||||
itemtype._callObserver(transaction, subs)
|
||||
}
|
||||
})
|
||||
)
|
||||
fs.push(() => {
|
||||
// deep observe events
|
||||
transaction.changedParentTypes.forEach((events, type) =>
|
||||
fs.push(() => {
|
||||
// We need to think about the possibility that the user transforms the
|
||||
// Y.Doc in the event.
|
||||
if (type._item === null || !type._item.deleted) {
|
||||
events = events
|
||||
.filter(event =>
|
||||
event.target._item === null || !event.target._item.deleted
|
||||
)
|
||||
events
|
||||
.forEach(event => {
|
||||
event.currentTarget = type
|
||||
})
|
||||
// We don't need to check for events.length
|
||||
// because we know it has at least one element
|
||||
callEventHandlerListeners(type._dEH, events, transaction)
|
||||
}
|
||||
})
|
||||
)
|
||||
fs.push(() => doc.emit('afterTransaction', [transaction, doc]))
|
||||
})
|
||||
callAll(fs, [])
|
||||
} finally {
|
||||
/**
|
||||
* @param {Array<AbstractStruct>} structs
|
||||
* @param {number} pos
|
||||
*/
|
||||
const tryToMergeWithLeft = (structs, pos) => {
|
||||
const left = structs[pos - 1]
|
||||
const right = structs[pos]
|
||||
if (left.deleted === right.deleted && left.constructor === right.constructor) {
|
||||
if (left.mergeWith(right)) {
|
||||
structs.splice(pos, 1)
|
||||
if (right instanceof Item && right.parentSub !== null && right.parent._map.get(right.parentSub) === right) {
|
||||
right.parent._map.set(right.parentSub, /** @type {Item} */ (left))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Replace deleted items with ItemDeleted / GC.
|
||||
// This is where content is actually remove from the Yjs Doc.
|
||||
if (doc.gc) {
|
||||
for (const [client, deleteItems] of ds.clients) {
|
||||
const structs = /** @type {Array<AbstractStruct>} */ (store.clients.get(client))
|
||||
for (let di = deleteItems.length - 1; di >= 0; di--) {
|
||||
const deleteItem = deleteItems[di]
|
||||
const endDeleteItemClock = deleteItem.clock + deleteItem.len
|
||||
for (
|
||||
let si = findIndexSS(structs, deleteItem.clock), struct = structs[si];
|
||||
si < structs.length && struct.id.clock < endDeleteItemClock;
|
||||
struct = structs[++si]
|
||||
) {
|
||||
const struct = structs[si]
|
||||
if (deleteItem.clock + deleteItem.len <= struct.id.clock) {
|
||||
break
|
||||
}
|
||||
if (struct instanceof Item && struct.deleted && !struct.keep) {
|
||||
struct.gc(store, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// try to merge deleted / gc'd items
|
||||
// merge from right to left for better efficiecy and so we don't miss any merge targets
|
||||
for (const [client, deleteItems] of ds.clients) {
|
||||
const structs = /** @type {Array<AbstractStruct>} */ (store.clients.get(client))
|
||||
for (let di = deleteItems.length - 1; di >= 0; di--) {
|
||||
const deleteItem = deleteItems[di]
|
||||
// start with merging the item next to the last deleted item
|
||||
const mostRightIndexToCheck = math.min(structs.length - 1, 1 + findIndexSS(structs, deleteItem.clock + deleteItem.len - 1))
|
||||
for (
|
||||
let si = mostRightIndexToCheck, struct = structs[si];
|
||||
si > 0 && struct.id.clock >= deleteItem.clock;
|
||||
struct = structs[--si]
|
||||
) {
|
||||
tryToMergeWithLeft(structs, si)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// on all affected store.clients props, try to merge
|
||||
for (const [client, clock] of transaction.afterState) {
|
||||
const beforeClock = transaction.beforeState.get(client) || 0
|
||||
if (beforeClock !== clock) {
|
||||
const structs = /** @type {Array<AbstractStruct>} */ (store.clients.get(client))
|
||||
// we iterate from right to left so we can safely remove entries
|
||||
const firstChangePos = math.max(findIndexSS(structs, beforeClock), 1)
|
||||
for (let i = structs.length - 1; i >= firstChangePos; i--) {
|
||||
tryToMergeWithLeft(structs, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
// try to merge mergeStructs
|
||||
// @todo: it makes more sense to transform mergeStructs to a DS, sort it, and merge from right to left
|
||||
// but at the moment DS does not handle duplicates
|
||||
for (const mid of transaction._mergeStructs) {
|
||||
const client = mid.client
|
||||
const clock = mid.clock
|
||||
const structs = /** @type {Array<AbstractStruct>} */ (store.clients.get(client))
|
||||
const replacedStructPos = findIndexSS(structs, clock)
|
||||
if (replacedStructPos + 1 < structs.length) {
|
||||
tryToMergeWithLeft(structs, replacedStructPos + 1)
|
||||
}
|
||||
if (replacedStructPos > 0) {
|
||||
tryToMergeWithLeft(structs, replacedStructPos)
|
||||
}
|
||||
}
|
||||
// @todo Merge all the transactions into one and provide send the data as a single update message
|
||||
doc.emit('afterTransactionCleanup', [transaction, doc])
|
||||
if (doc._observers.has('update')) {
|
||||
const updateMessage = computeUpdateMessageFromTransaction(transaction)
|
||||
if (updateMessage !== null) {
|
||||
doc.emit('update', [encoding.toUint8Array(updateMessage), transaction.origin, doc])
|
||||
}
|
||||
}
|
||||
if (transactionCleanups.length <= i + 1) {
|
||||
doc._transactionCleanups = []
|
||||
} else {
|
||||
cleanupTransactions(transactionCleanups, i + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements the functionality of `y.transact(()=>{..})`
|
||||
*
|
||||
@ -169,134 +328,13 @@ export const transact = (doc, f, origin = null, local = true) => {
|
||||
if (initialCall && transactionCleanups[0] === doc._transaction) {
|
||||
// The first transaction ended, now process observer calls.
|
||||
// Observer call may create new transactions for which we need to call the observers and do cleanup.
|
||||
// We don't want to nest these calls, so we execute these calls one after another
|
||||
for (let i = 0; i < transactionCleanups.length; i++) {
|
||||
const transaction = transactionCleanups[i]
|
||||
const store = transaction.doc.store
|
||||
const ds = transaction.deleteSet
|
||||
sortAndMergeDeleteSet(ds)
|
||||
transaction.afterState = getStateVector(transaction.doc.store)
|
||||
doc._transaction = null
|
||||
doc.emit('beforeObserverCalls', [transaction, doc])
|
||||
// emit change events on changed types
|
||||
transaction.changed.forEach((subs, itemtype) => {
|
||||
if (itemtype._item === null || !itemtype._item.deleted) {
|
||||
itemtype._callObserver(transaction, subs)
|
||||
}
|
||||
})
|
||||
transaction.changedParentTypes.forEach((events, type) => {
|
||||
// We need to think about the possibility that the user transforms the
|
||||
// Y.Doc in the event.
|
||||
if (type._item === null || !type._item.deleted) {
|
||||
events = events
|
||||
.filter(event =>
|
||||
event.target._item === null || !event.target._item.deleted
|
||||
)
|
||||
events
|
||||
.forEach(event => {
|
||||
event.currentTarget = type
|
||||
})
|
||||
// We don't need to check for events.length
|
||||
// because we know it has at least one element
|
||||
callEventHandlerListeners(type._dEH, events, transaction)
|
||||
}
|
||||
})
|
||||
doc.emit('afterTransaction', [transaction, doc])
|
||||
/**
|
||||
* @param {Array<AbstractStruct>} structs
|
||||
* @param {number} pos
|
||||
*/
|
||||
const tryToMergeWithLeft = (structs, pos) => {
|
||||
const left = structs[pos - 1]
|
||||
const right = structs[pos]
|
||||
if (left.deleted === right.deleted && left.constructor === right.constructor) {
|
||||
if (left.mergeWith(right)) {
|
||||
structs.splice(pos, 1)
|
||||
if (right instanceof Item && right.parentSub !== null && right.parent._map.get(right.parentSub) === right) {
|
||||
right.parent._map.set(right.parentSub, /** @type {Item} */ (left))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// Replace deleted items with ItemDeleted / GC.
|
||||
// This is where content is actually remove from the Yjs Doc.
|
||||
if (doc.gc) {
|
||||
for (const [client, deleteItems] of ds.clients) {
|
||||
const structs = /** @type {Array<AbstractStruct>} */ (store.clients.get(client))
|
||||
for (let di = deleteItems.length - 1; di >= 0; di--) {
|
||||
const deleteItem = deleteItems[di]
|
||||
const endDeleteItemClock = deleteItem.clock + deleteItem.len
|
||||
for (
|
||||
let si = findIndexSS(structs, deleteItem.clock), struct = structs[si];
|
||||
si < structs.length && struct.id.clock < endDeleteItemClock;
|
||||
struct = structs[++si]
|
||||
) {
|
||||
const struct = structs[si]
|
||||
if (deleteItem.clock + deleteItem.len <= struct.id.clock) {
|
||||
break
|
||||
}
|
||||
if (struct instanceof Item && struct.deleted && !struct.keep) {
|
||||
struct.gc(store, false)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// try to merge deleted / gc'd items
|
||||
// merge from right to left for better efficiecy and so we don't miss any merge targets
|
||||
for (const [client, deleteItems] of ds.clients) {
|
||||
const structs = /** @type {Array<AbstractStruct>} */ (store.clients.get(client))
|
||||
for (let di = deleteItems.length - 1; di >= 0; di--) {
|
||||
const deleteItem = deleteItems[di]
|
||||
// start with merging the item next to the last deleted item
|
||||
const mostRightIndexToCheck = math.min(structs.length - 1, 1 + findIndexSS(structs, deleteItem.clock + deleteItem.len - 1))
|
||||
for (
|
||||
let si = mostRightIndexToCheck, struct = structs[si];
|
||||
si > 0 && struct.id.clock >= deleteItem.clock;
|
||||
struct = structs[--si]
|
||||
) {
|
||||
tryToMergeWithLeft(structs, si)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// on all affected store.clients props, try to merge
|
||||
for (const [client, clock] of transaction.afterState) {
|
||||
const beforeClock = transaction.beforeState.get(client) || 0
|
||||
if (beforeClock !== clock) {
|
||||
const structs = /** @type {Array<AbstractStruct>} */ (store.clients.get(client))
|
||||
// we iterate from right to left so we can safely remove entries
|
||||
const firstChangePos = math.max(findIndexSS(structs, beforeClock), 1)
|
||||
for (let i = structs.length - 1; i >= firstChangePos; i--) {
|
||||
tryToMergeWithLeft(structs, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
// try to merge mergeStructs
|
||||
// @todo: it makes more sense to transform mergeStructs to a DS, sort it, and merge from right to left
|
||||
// but at the moment DS does not handle duplicates
|
||||
for (const mid of transaction._mergeStructs) {
|
||||
const client = mid.client
|
||||
const clock = mid.clock
|
||||
const structs = /** @type {Array<AbstractStruct>} */ (store.clients.get(client))
|
||||
const replacedStructPos = findIndexSS(structs, clock)
|
||||
if (replacedStructPos + 1 < structs.length) {
|
||||
tryToMergeWithLeft(structs, replacedStructPos + 1)
|
||||
}
|
||||
if (replacedStructPos > 0) {
|
||||
tryToMergeWithLeft(structs, replacedStructPos)
|
||||
}
|
||||
}
|
||||
// @todo Merge all the transactions into one and provide send the data as a single update message
|
||||
doc.emit('afterTransactionCleanup', [transaction, doc])
|
||||
if (doc._observers.has('update')) {
|
||||
const updateMessage = computeUpdateMessageFromTransaction(transaction)
|
||||
if (updateMessage !== null) {
|
||||
doc.emit('update', [encoding.toUint8Array(updateMessage), transaction.origin, doc])
|
||||
}
|
||||
}
|
||||
}
|
||||
doc._transactionCleanups = []
|
||||
// We don't want to nest these calls, so we execute these calls one after
|
||||
// another.
|
||||
// Also we need to ensure that all cleanups are called, even if the
|
||||
// observes throw errors.
|
||||
// This file is full of hacky try {} finally {} blocks to ensure that an
|
||||
// event can throw errors and also that the cleanup is called.
|
||||
cleanupTransactions(transactionCleanups, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -207,7 +207,7 @@ export const testChangeEvent = tc => {
|
||||
const newArr = new Y.Array()
|
||||
array0.insert(0, [newArr, 4, 'dtrn'])
|
||||
t.assert(changes !== null && changes.added.size === 2 && changes.deleted.size === 0)
|
||||
t.compare(changes.delta, [{insert: [newArr, 4, 'dtrn']}])
|
||||
t.compare(changes.delta, [{ insert: [newArr, 4, 'dtrn'] }])
|
||||
changes = null
|
||||
array0.delete(0, 2)
|
||||
t.assert(changes !== null && changes.added.size === 0 && changes.deleted.size === 2)
|
||||
|
@ -340,6 +340,56 @@ export const testChangeEvent = tc => {
|
||||
compare(users)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {t.TestCase} tc
|
||||
*/
|
||||
export const testYmapEventExceptionsShouldCompleteTransaction = tc => {
|
||||
const doc = new Y.Doc()
|
||||
const map = doc.getMap('map')
|
||||
|
||||
let updateCalled = false
|
||||
let throwingObserverCalled = false
|
||||
let throwingDeepObserverCalled = false
|
||||
doc.on('update', () => {
|
||||
updateCalled = true
|
||||
})
|
||||
|
||||
const throwingObserver = () => {
|
||||
throwingObserverCalled = true
|
||||
throw new Error('Failure')
|
||||
}
|
||||
|
||||
const throwingDeepObserver = () => {
|
||||
throwingDeepObserverCalled = true
|
||||
throw new Error('Failure')
|
||||
}
|
||||
|
||||
map.observe(throwingObserver)
|
||||
map.observeDeep(throwingDeepObserver)
|
||||
|
||||
t.fails(() => {
|
||||
map.set('y', '2')
|
||||
})
|
||||
|
||||
t.assert(updateCalled)
|
||||
t.assert(throwingObserverCalled)
|
||||
t.assert(throwingDeepObserverCalled)
|
||||
|
||||
// check if it works again
|
||||
updateCalled = false
|
||||
throwingObserverCalled = false
|
||||
throwingDeepObserverCalled = false
|
||||
t.fails(() => {
|
||||
map.set('z', '3')
|
||||
})
|
||||
|
||||
t.assert(updateCalled)
|
||||
t.assert(throwingObserverCalled)
|
||||
t.assert(throwingDeepObserverCalled)
|
||||
|
||||
t.assert(map.get('z') === '3')
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {t.TestCase} tc
|
||||
*/
|
||||
|
@ -81,10 +81,10 @@ export const testBasicFormat = tc => {
|
||||
export const testGetDeltaWithEmbeds = tc => {
|
||||
const { text0 } = init(tc, { users: 1 })
|
||||
text0.applyDelta([{
|
||||
insert: {linebreak: 's'}
|
||||
insert: { linebreak: 's' }
|
||||
}])
|
||||
t.compare(text0.toDelta(), [{
|
||||
insert: {linebreak: 's'}
|
||||
insert: { linebreak: 's' }
|
||||
}])
|
||||
}
|
||||
|
||||
@ -127,7 +127,7 @@ export const testSnapshot = tc => {
|
||||
delete v.attributes.ychange.user
|
||||
}
|
||||
})
|
||||
t.compare(state2Diff, [{insert: 'a'}, {insert: 'x', attributes: {ychange: { type: 'added' }}}, {insert: 'b', attributes: {ychange: { type: 'removed' }}}, { insert: 'cd' }])
|
||||
t.compare(state2Diff, [{ insert: 'a' }, { insert: 'x', attributes: { ychange: { type: 'added' } } }, { insert: 'b', attributes: { ychange: { type: 'removed' } } }, { insert: 'cd' }])
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -38,7 +38,10 @@
|
||||
"moduleResolution": "node", /* Specify module resolution strategy: 'node' (Node.js) or 'classic' (TypeScript pre-1.6). */
|
||||
"baseUrl": "./", /* Base directory to resolve non-absolute module names. */
|
||||
"paths": {
|
||||
"yjs": ["./src/index.js"]
|
||||
"yjs": ["./src/index.js"],
|
||||
"lib0/*": ["node_modules/lib0/*"],
|
||||
"lib0/set.js": ["node_modules/lib0/set.js"],
|
||||
"lib0/function.js": ["node_modules/lib0/function.js"]
|
||||
}, /* A series of entries which re-map imports to lookup locations relative to the 'baseUrl'. */
|
||||
// "rootDirs": [], /* List of root folders whose combined content represents the structure of the project at runtime. */
|
||||
// "typeRoots": [], /* List of folders to include type definitions from. */
|
||||
|
Loading…
x
Reference in New Issue
Block a user