fix some tests, implement event classes for types, and re-implement logging
This commit is contained in:
@@ -3,7 +3,7 @@ import BinaryDecoder from './Binary/Decoder.js'
|
||||
|
||||
import { sendSyncStep1, readSyncStep1 } from './MessageHandler/syncStep1.js'
|
||||
import { readSyncStep2 } from './MessageHandler/syncStep2.js'
|
||||
import { readUpdate } from './MessageHandler/update.js'
|
||||
import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'
|
||||
|
||||
import debug from 'debug'
|
||||
|
||||
@@ -136,19 +136,21 @@ export default class AbstractConnector {
|
||||
}
|
||||
|
||||
send (uid, buffer) {
|
||||
const y = this.y
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - don\'t use this method to send custom messages')
|
||||
}
|
||||
this.log('%s: Send \'%y\' to %s', this.y.userID, buffer, uid)
|
||||
this.logMessage('Message: %Y', buffer)
|
||||
this.log('%s: Send \'%y\' to %s', y.userID, buffer, uid)
|
||||
this.logMessage('Message: %Y', [y, buffer])
|
||||
}
|
||||
|
||||
broadcast (buffer) {
|
||||
const y = this.y
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - don\'t use this method to send custom messages')
|
||||
}
|
||||
this.log('%s: Broadcast \'%y\'', this.y.userID, buffer)
|
||||
this.logMessage('Message: %Y', buffer)
|
||||
this.log('%s: Broadcast \'%y\'', y.userID, buffer)
|
||||
this.logMessage('Message: %Y', [y, buffer])
|
||||
}
|
||||
|
||||
/*
|
||||
@@ -177,7 +179,7 @@ export default class AbstractConnector {
|
||||
this.broadcast(this.broadcastBuffer.createBuffer())
|
||||
this.broadcastBuffer = new BinaryEncoder()
|
||||
}
|
||||
})
|
||||
}, 0)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -199,11 +201,13 @@ export default class AbstractConnector {
|
||||
You received a raw message, and you know that it is intended for Yjs. Then call this function.
|
||||
*/
|
||||
receiveMessage (sender, buffer, skipAuth) {
|
||||
const y = this.y
|
||||
const userID = y.userID
|
||||
skipAuth = skipAuth || false
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
|
||||
}
|
||||
if (sender === this.y.userID) {
|
||||
if (sender === userID) {
|
||||
return Promise.resolve()
|
||||
}
|
||||
let decoder = new BinaryDecoder(buffer)
|
||||
@@ -212,8 +216,8 @@ export default class AbstractConnector {
|
||||
encoder.writeVarString(roomname)
|
||||
let messageType = decoder.readVarString()
|
||||
let senderConn = this.connections.get(sender)
|
||||
this.log('%s: Receive \'%s\' from %s', this.y.userID, messageType, sender)
|
||||
this.logMessage('Message: %Y', buffer)
|
||||
this.log('%s: Receive \'%s\' from %s', userID, messageType, sender)
|
||||
this.logMessage('Message: %Y', [y, buffer])
|
||||
if (senderConn == null && !skipAuth) {
|
||||
throw new Error('Received message from unknown peer!')
|
||||
}
|
||||
@@ -222,10 +226,10 @@ export default class AbstractConnector {
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
|
||||
// check auth
|
||||
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
|
||||
return this.checkAuth(auth, y, sender).then(authPermissions => {
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.auth = authPermissions
|
||||
this.y.emit('userAuthenticated', {
|
||||
y.emit('userAuthenticated', {
|
||||
user: senderConn.uid,
|
||||
auth: authPermissions
|
||||
})
|
||||
@@ -250,16 +254,17 @@ export default class AbstractConnector {
|
||||
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
||||
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
||||
readSyncStep1(decoder, encoder, this.y, senderConn, sender)
|
||||
} else if (messageType === 'sync step 2' && senderConn.auth === 'write') {
|
||||
this.y.transact(() => {
|
||||
readSyncStep2(decoder, encoder, this.y, senderConn, sender)
|
||||
})
|
||||
} else if (messageType === 'update' && (skipAuth || senderConn.auth === 'write')) {
|
||||
this.y.transact(() => {
|
||||
readUpdate(decoder, encoder, this.y, senderConn, sender)
|
||||
})
|
||||
} else {
|
||||
throw new Error('Unable to receive message')
|
||||
const y = this.y
|
||||
y.transact(function () {
|
||||
if (messageType === 'sync step 2' && senderConn.auth === 'write') {
|
||||
readSyncStep2(decoder, encoder, y, senderConn, sender)
|
||||
} else if (messageType === 'update' && (skipAuth || senderConn.auth === 'write')) {
|
||||
integrateRemoteStructs(decoder, encoder, y, senderConn, sender)
|
||||
} else {
|
||||
throw new Error('Unable to receive message')
|
||||
}
|
||||
}, true)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -28,9 +28,7 @@ function _integrateRemoteStructHelper (y, struct) {
|
||||
missingDef.missing--
|
||||
if (missingDef.missing === 0) {
|
||||
let missing = missingDef.struct._fromBinary(y, missingDef.decoder)
|
||||
if (missing.length > 0) {
|
||||
console.error('Missing should be empty!')
|
||||
} else {
|
||||
if (missing.length === 0) {
|
||||
y._readyToIntegrate.push(missingDef.struct)
|
||||
}
|
||||
}
|
||||
@@ -42,6 +40,21 @@ function _integrateRemoteStructHelper (y, struct) {
|
||||
}
|
||||
}
|
||||
|
||||
export function stringifyStructs (y, decoder, strBuilder) {
|
||||
while (decoder.length !== decoder.pos) {
|
||||
let reference = decoder.readVarUint()
|
||||
let Constr = getStruct(reference)
|
||||
let struct = new Constr()
|
||||
let missing = struct._fromBinary(y, decoder)
|
||||
let logMessage = struct._logString()
|
||||
if (missing.length > 0) {
|
||||
logMessage += missing.map(id => `ID (user: ${id.user}, clock: ${id.clock})`).join(', ')
|
||||
}
|
||||
logMessage += '\n'
|
||||
strBuilder.push(logMessage)
|
||||
}
|
||||
}
|
||||
|
||||
export function integrateRemoteStructs (decoder, encoder, y) {
|
||||
while (decoder.length !== decoder.pos) {
|
||||
let decoderPos = decoder.pos
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
import BinaryDecoder from '../Binary/Decoder.js'
|
||||
import { stringifyUpdate } from './update.js'
|
||||
import { stringifyStructs } from './integrateRemoteStructs.js'
|
||||
import { stringifySyncStep1 } from './syncStep1.js'
|
||||
import { stringifySyncStep2 } from './syncStep2.js'
|
||||
|
||||
export function messageToString (y, buffer) {
|
||||
export function messageToString ([y, buffer]) {
|
||||
let decoder = new BinaryDecoder(buffer)
|
||||
decoder.readVarString() // read roomname
|
||||
let type = decoder.readVarString()
|
||||
let strBuilder = []
|
||||
strBuilder.push('\n === ' + type + ' ===\n')
|
||||
if (type === 'update') {
|
||||
stringifyUpdate(y, decoder, strBuilder)
|
||||
stringifyStructs(y, decoder, strBuilder)
|
||||
} else if (type === 'sync step 1') {
|
||||
stringifySyncStep1(y, decoder, strBuilder)
|
||||
} else if (type === 'sync step 2') {
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
import { integrateRemoteStructs } from './integrateRemoteStructs.js'
|
||||
import { stringifyUpdate } from './update.js'
|
||||
import { stringifyStructs, integrateRemoteStructs } from './integrateRemoteStructs.js'
|
||||
import { readDeleteSet } from './deleteSet.js'
|
||||
|
||||
export function stringifySyncStep2 (y, decoder, strBuilder) {
|
||||
strBuilder.push(' - auth: ' + decoder.readVarString() + '\n')
|
||||
strBuilder.push(' == OS: \n')
|
||||
stringifyUpdate(y, decoder, strBuilder)
|
||||
// write DS to string
|
||||
strBuilder.push(' == DS: \n')
|
||||
let len = decoder.readUint32()
|
||||
@@ -20,6 +18,7 @@ export function stringifySyncStep2 (y, decoder, strBuilder) {
|
||||
strBuilder.push(`[${from}, ${to}, ${gc}]`)
|
||||
}
|
||||
}
|
||||
stringifyStructs(y, decoder, strBuilder)
|
||||
}
|
||||
|
||||
export function readSyncStep2 (decoder, encoder, y, senderConn, sender) {
|
||||
|
||||
@@ -1,19 +0,0 @@
|
||||
|
||||
import { getStruct } from '../Util/structReferences.js'
|
||||
|
||||
export function stringifyUpdate (y, decoder, strBuilder) {
|
||||
while (decoder.length !== decoder.pos) {
|
||||
let reference = decoder.readVarUint()
|
||||
let Constr = getStruct(reference)
|
||||
let struct = new Constr()
|
||||
let missing = struct._fromBinary(y, decoder)
|
||||
let logMessage = struct._logString()
|
||||
if (missing.length > 0) {
|
||||
logMessage += missing.map(m => m._logString()).join(', ')
|
||||
}
|
||||
logMessage += '\n'
|
||||
strBuilder.push(logMessage)
|
||||
}
|
||||
}
|
||||
|
||||
export { integrateRemoteStructs as readUpdate } from './integrateRemoteStructs.js'
|
||||
@@ -15,7 +15,9 @@ export default class OperationStore extends Tree {
|
||||
struct = new Constr()
|
||||
struct._id = id
|
||||
struct._parent = y
|
||||
struct._integrate(y)
|
||||
y.transact(() => {
|
||||
struct._integrate(y)
|
||||
})
|
||||
this.put(struct)
|
||||
}
|
||||
return struct
|
||||
|
||||
@@ -2,6 +2,7 @@ import { getReference } from '../Util/structReferences.js'
|
||||
import ID from '../Util/ID.js'
|
||||
import { RootFakeUserID } from '../Util/RootID.js'
|
||||
import Delete from './Delete.js'
|
||||
import { transactionTypeChanged } from '../Transaction.js'
|
||||
|
||||
/**
|
||||
* Helper utility to split an Item (see _splitAt)
|
||||
@@ -64,10 +65,7 @@ export default class Item {
|
||||
del._length = this._length
|
||||
del._integrate(y, true)
|
||||
}
|
||||
const parent = this._parent
|
||||
if (parent !== y && !parent._deleted) {
|
||||
y._transactionChangedTypes.set(parent, this._parentSub)
|
||||
}
|
||||
transactionTypeChanged(y, this._parent, this._parentSub)
|
||||
}
|
||||
/**
|
||||
* This is called right before this struct receives any children.
|
||||
@@ -98,7 +96,7 @@ export default class Item {
|
||||
// missing content from user
|
||||
throw new Error('Can not apply yet!')
|
||||
}
|
||||
if (!parent._deleted && !y._transactionChangedTypes.has(parent) && !y._transactionNewTypes.has(parent)) {
|
||||
if (!parent._deleted && !y._transaction.changedTypes.has(parent) && !y._transaction.newTypes.has(parent)) {
|
||||
// this is the first time parent is updated
|
||||
// or this types is new
|
||||
this._parent._beforeChange()
|
||||
@@ -178,10 +176,7 @@ export default class Item {
|
||||
}
|
||||
}
|
||||
y.os.put(this)
|
||||
if (parent !== y && !parent._deleted) {
|
||||
y._transactionChangedTypes.set(parent, parentSub)
|
||||
}
|
||||
|
||||
transactionTypeChanged(y, parent, parentSub)
|
||||
if (this._id.user !== RootFakeUserID) {
|
||||
if (y.connector._forwardAppliedStructs || this._id.user === y.userID) {
|
||||
y.connector.broadcastStruct(this)
|
||||
|
||||
@@ -29,7 +29,7 @@ export default class Type extends Item {
|
||||
this._eventHandler.removeEventListener(f)
|
||||
}
|
||||
_integrate (y) {
|
||||
y._transactionNewTypes.add(this)
|
||||
y._transaction.newTypes.add(this)
|
||||
super._integrate(y)
|
||||
this._y = y
|
||||
// when integrating children we must make sure to
|
||||
@@ -48,7 +48,7 @@ export default class Type extends Item {
|
||||
}
|
||||
_delete (y, createDelete) {
|
||||
super._delete(y, createDelete)
|
||||
y._transactionChangedTypes.delete(this)
|
||||
y._transaction.changedTypes.delete(this)
|
||||
// delete map types
|
||||
for (let value of this._map.values()) {
|
||||
if (value instanceof Item && !value._deleted) {
|
||||
|
||||
@@ -1,9 +1,16 @@
|
||||
import Type from '../Struct/Type.js'
|
||||
import ItemJSON from '../Struct/ItemJSON.js'
|
||||
|
||||
class YArrayEvent {
|
||||
constructor (yarray, remote) {
|
||||
this.target = yarray
|
||||
this.remote = remote
|
||||
}
|
||||
}
|
||||
|
||||
export default class YArray extends Type {
|
||||
_callObserver () {
|
||||
this._eventHandler.callEventListeners({})
|
||||
_callObserver (parentSubs, remote) {
|
||||
this._eventHandler.callEventListeners(new YArrayEvent(this, remote))
|
||||
}
|
||||
get (i) {
|
||||
// TODO: This can be improved!
|
||||
@@ -107,12 +114,13 @@ export default class YArray extends Type {
|
||||
}
|
||||
item = item._right
|
||||
}
|
||||
if (length > 0) {
|
||||
throw new Error('Delete exceeds the range of the YArray')
|
||||
}
|
||||
})
|
||||
if (length > 0) {
|
||||
throw new Error('Delete exceeds the range of the YArray')
|
||||
}
|
||||
}
|
||||
insertAfter (left, content) {
|
||||
const y = this._y
|
||||
const apply = () => {
|
||||
let right
|
||||
if (left === null) {
|
||||
@@ -123,10 +131,13 @@ export default class YArray extends Type {
|
||||
let prevJsonIns = null
|
||||
for (let i = 0; i < content.length; i++) {
|
||||
let c = content[i]
|
||||
if (typeof c === 'function') {
|
||||
c = new c() // eslint-disable-line new-cap
|
||||
}
|
||||
if (c instanceof Type) {
|
||||
if (prevJsonIns !== null) {
|
||||
if (this._y !== null) {
|
||||
prevJsonIns._integrate(this._y)
|
||||
if (y !== null) {
|
||||
prevJsonIns._integrate(y)
|
||||
}
|
||||
left = prevJsonIns
|
||||
prevJsonIns = null
|
||||
@@ -136,8 +147,8 @@ export default class YArray extends Type {
|
||||
c._right = right
|
||||
c._right_origin = right
|
||||
c._parent = this
|
||||
if (this._y !== null) {
|
||||
c._integrate(this._y)
|
||||
if (y !== null) {
|
||||
c._integrate(y)
|
||||
} else if (left === null) {
|
||||
this._start = c
|
||||
}
|
||||
@@ -155,12 +166,12 @@ export default class YArray extends Type {
|
||||
prevJsonIns._content.push(c)
|
||||
}
|
||||
}
|
||||
if (prevJsonIns !== null && this._y !== null) {
|
||||
prevJsonIns._integrate(this._y)
|
||||
if (prevJsonIns !== null && y !== null) {
|
||||
prevJsonIns._integrate(y)
|
||||
}
|
||||
}
|
||||
if (this._y !== null) {
|
||||
this._y.transact(apply)
|
||||
if (y !== null) {
|
||||
y.transact(apply)
|
||||
} else {
|
||||
apply()
|
||||
}
|
||||
@@ -170,13 +181,19 @@ export default class YArray extends Type {
|
||||
let left = null
|
||||
let right = this._start
|
||||
let count = 0
|
||||
const y = this._y
|
||||
while (right !== null) {
|
||||
if (count <= pos && pos < count + right._length) {
|
||||
right = right._splitAt(this._y, pos - count)
|
||||
const rightLen = right._deleted ? 0 : (right._length - 1)
|
||||
if (count <= pos && pos <= count + rightLen) {
|
||||
const splitDiff = pos - count
|
||||
right = right._splitAt(y, splitDiff)
|
||||
left = right._left
|
||||
count += splitDiff
|
||||
break
|
||||
}
|
||||
count += right._length
|
||||
if (!right._deleted) {
|
||||
count += right._length
|
||||
}
|
||||
left = right
|
||||
right = right._right
|
||||
}
|
||||
|
||||
@@ -2,11 +2,17 @@ import Type from '../Struct/Type.js'
|
||||
import Item from '../Struct/Item.js'
|
||||
import ItemJSON from '../Struct/ItemJSON.js'
|
||||
|
||||
class YMapEvent {
|
||||
constructor (ymap, subs, remote) {
|
||||
this.target = ymap
|
||||
this.keysChanged = subs
|
||||
this.remote = remote
|
||||
}
|
||||
}
|
||||
|
||||
export default class YMap extends Type {
|
||||
_callObserver (parentSub) {
|
||||
this._eventHandler.callEventListeners({
|
||||
name: parentSub
|
||||
})
|
||||
_callObserver (parentSubs, remote) {
|
||||
this._eventHandler.callEventListeners(new YMapEvent(this, parentSubs, remote))
|
||||
}
|
||||
toJSON () {
|
||||
const map = {}
|
||||
@@ -36,13 +42,21 @@ export default class YMap extends Type {
|
||||
})
|
||||
}
|
||||
set (key, value) {
|
||||
this._y.transact(() => {
|
||||
const y = this._y
|
||||
y.transact(() => {
|
||||
const old = this._map.get(key) || null
|
||||
if (old !== null) {
|
||||
old._delete(this._y)
|
||||
if (old instanceof ItemJSON && old._content[0] === value) {
|
||||
// Trying to overwrite with same value
|
||||
// break here
|
||||
return value
|
||||
}
|
||||
old._delete(y)
|
||||
}
|
||||
let v
|
||||
if (value instanceof Item) {
|
||||
if (typeof value === 'function') {
|
||||
v = new value() // eslint-disable-line new-cap
|
||||
} else if (value instanceof Item) {
|
||||
v = value
|
||||
} else {
|
||||
v = new ItemJSON()
|
||||
@@ -52,7 +66,7 @@ export default class YMap extends Type {
|
||||
v._right_origin = old
|
||||
v._parent = this
|
||||
v._parentSub = key
|
||||
v._integrate(this._y)
|
||||
v._integrate(y)
|
||||
})
|
||||
return value
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import { defaultDomFilter, applyChangesFromDom, reflectChangesOnDom } from './ut
|
||||
|
||||
import YArray from '../YArray.js'
|
||||
import YXmlText from './YXmlText.js'
|
||||
import YXmlEvent from './YXmlEvent.js'
|
||||
|
||||
function domToYXml (parent, doms) {
|
||||
const types = []
|
||||
@@ -65,22 +66,8 @@ export default class YXmlFragment extends YArray {
|
||||
xml.setDomFilter(f)
|
||||
})
|
||||
}
|
||||
_callObserver (parentSub) {
|
||||
let event
|
||||
if (parentSub !== null) {
|
||||
event = {
|
||||
type: 'attributeChanged',
|
||||
name: parentSub,
|
||||
value: this.getAttribute(parentSub),
|
||||
target: this
|
||||
}
|
||||
} else {
|
||||
event = {
|
||||
type: 'contentChanged',
|
||||
target: this
|
||||
}
|
||||
}
|
||||
this._eventHandler.callEventListeners(event)
|
||||
_callObserver (parentSubs, remote) {
|
||||
this._eventHandler.callEventListeners(new YXmlEvent(this, parentSubs, remote))
|
||||
}
|
||||
toString () {
|
||||
return this.map(xml => xml.toString()).join('')
|
||||
|
||||
@@ -131,13 +131,17 @@ export function reflectChangesOnDom (event) {
|
||||
yxml._mutualExclude(() => {
|
||||
// TODO: do this once before applying stuff
|
||||
// let anchorViewPosition = getAnchorViewPosition(yxml._scrollElement)
|
||||
if (event.type === 'attributeChanged') {
|
||||
if (event.value === undefined) {
|
||||
dom.removeAttribute(event.name)
|
||||
|
||||
// update attributes
|
||||
event.attributesChanged.forEach(attributeName => {
|
||||
const value = yxml.getAttribute(attributeName)
|
||||
if (value === undefined) {
|
||||
dom.remoteAttribute(attributeName)
|
||||
} else {
|
||||
dom.setAttribute(event.name, event.value)
|
||||
dom.setAttribute(attributeName, value)
|
||||
}
|
||||
} else if (event.type === 'contentChanged') {
|
||||
})
|
||||
if (event.childListChanged) {
|
||||
// create fragment of undeleted nodes
|
||||
const fragment = document.createDocumentFragment()
|
||||
yxml.forEach(function (t) {
|
||||
|
||||
31
src/Y.js
31
src/Y.js
@@ -16,12 +16,7 @@ import { YXmlFragment, YXmlElement, YXmlText } from './Type/y-xml/y-xml.js'
|
||||
import BinaryDecoder from './Binary/Decoder.js'
|
||||
|
||||
import debug from 'debug'
|
||||
|
||||
function callTypesAfterTransaction (y) {
|
||||
y._transactionChangedTypes.forEach(function (parentSub, type) {
|
||||
type._callObserver(parentSub)
|
||||
})
|
||||
}
|
||||
import Transaction from './Transaction.js'
|
||||
|
||||
export default class Y extends NamedEventHandler {
|
||||
constructor (opts) {
|
||||
@@ -42,25 +37,27 @@ export default class Y extends NamedEventHandler {
|
||||
this._missingStructs = new Map()
|
||||
this._readyToIntegrate = []
|
||||
this._transactionsInProgress = 0
|
||||
// types added during transaction
|
||||
this._transactionNewTypes = new Set()
|
||||
// changed types (does not include new types)
|
||||
this._transactionChangedTypes = new Map()
|
||||
this.on('afterTransaction', callTypesAfterTransaction)
|
||||
this._transaction = null
|
||||
}
|
||||
_beforeChange () {}
|
||||
transact (f) {
|
||||
this._transactionsInProgress++
|
||||
transact (f, remote = false) {
|
||||
let initialCall = this._transaction === null
|
||||
if (initialCall) {
|
||||
this._transaction = new Transaction(this)
|
||||
}
|
||||
try {
|
||||
f()
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
}
|
||||
this._transactionsInProgress--
|
||||
if (this._transactionsInProgress === 0) {
|
||||
if (initialCall) {
|
||||
// emit change events on changed types
|
||||
this._transaction.changedTypes.forEach(function (subs, type) {
|
||||
type._callObserver(subs, remote)
|
||||
})
|
||||
this._transaction = null
|
||||
// when all changes & events are processed, emit afterTransaction event
|
||||
this.emit('afterTransaction', this)
|
||||
this._transactionChangedTypes = new Map()
|
||||
this._transactionNewTypes = new Set()
|
||||
}
|
||||
}
|
||||
// fake _start for root properties (y.set('name', type))
|
||||
|
||||
Reference in New Issue
Block a user