diff --git a/src/Binary/Encoder.js b/src/Binary/Encoder.js index f080ccaf..4fbeff4b 100644 --- a/src/Binary/Encoder.js +++ b/src/Binary/Encoder.js @@ -6,6 +6,7 @@ const bits8 = 0b11111111 export default class BinaryEncoder { constructor () { + // TODO: implement chained Uint8Array buffers instead of Array buffer this.data = [] } diff --git a/src/Connector.js b/src/Connector.js index dbf85047..b0f6c65c 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -88,6 +88,7 @@ export default class AbstractConnector { isSynced: false, role: role, processAfterAuth: [], + processAfterSync: [], auth: auth || null, receivedSyncStep2: false }) @@ -122,17 +123,15 @@ export default class AbstractConnector { } _fireIsSyncedListeners () { - setTimeout(() => { - if (!this.isSynced) { - this.isSynced = true - // It is safer to remove this! - // call whensynced listeners - for (var f of this.whenSyncedListeners) { - f() - } - this.whenSyncedListeners = [] + if (!this.isSynced) { + this.isSynced = true + // It is safer to remove this! + // call whensynced listeners + for (var f of this.whenSyncedListeners) { + f() } - }, 0) + this.whenSyncedListeners = [] + } } send (uid, buffer) { @@ -237,16 +236,16 @@ export default class AbstractConnector { let messages = senderConn.processAfterAuth senderConn.processAfterAuth = [] - return messages.reduce((p, m) => - p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4])) - , Promise.resolve()) + messages.forEach(m => + this.computeMessage(m[0], m[1], m[2], m[3], m[4]) + ) }) } } - if (skipAuth || senderConn.auth != null) { - return this.computeMessage(messageType, senderConn, decoder, encoder, sender, skipAuth) + if ((skipAuth || senderConn.auth != null) && (messageType !== 'update' || senderConn.isSynced)) { + this.computeMessage(messageType, senderConn, decoder, encoder, sender, skipAuth) } else { - senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender, false]) + senderConn.processAfterSync.push([messageType, senderConn, decoder, encoder, sender, false]) } } @@ -270,9 +269,15 @@ export default class AbstractConnector { _setSyncedWith (user) { if (user != null) { - this.connections.get(user).isSynced = true + const userConn = this.connections.get(user) + userConn.isSynced = true + const messages = userConn.processAfterSync + userConn.processAfterSync = [] + messages.forEach(m => { + this.computeMessage(m[0], m[1], m[2], m[3], m[4]) + }) } - let conns = Array.from(this.connections.values()) + const conns = Array.from(this.connections.values()) if (conns.length > 0 && conns.every(u => u.isSynced)) { this._fireIsSyncedListeners() } diff --git a/src/MessageHandler/integrateRemoteStructs.js b/src/MessageHandler/integrateRemoteStructs.js index 074c6b3b..8f6a7df8 100644 --- a/src/MessageHandler/integrateRemoteStructs.js +++ b/src/MessageHandler/integrateRemoteStructs.js @@ -19,22 +19,27 @@ class MissingEntry { function _integrateRemoteStructHelper (y, struct) { struct._integrate(y) if (struct.constructor !== Delete) { - let msu = y._missingStructs.get(struct._id.user) + const id = struct._id + let msu = y._missingStructs.get(id.user) if (msu != null) { - let len = struct._length - for (let i = 0; i < len; i++) { - if (msu.has(struct._id.clock + i)) { - let msuc = msu.get(struct._id.clock + i) - msuc.forEach(missingDef => { + let clock = id.clock + const finalClock = clock + struct._length + for (;clock < finalClock; clock++) { + const missingStructs = msu.get(clock) + if (missingStructs !== undefined) { + missingStructs.forEach(missingDef => { missingDef.missing-- if (missingDef.missing === 0) { - let missing = missingDef.struct._fromBinary(y, missingDef.decoder) + const decoder = missingDef.decoder + let oldPos = decoder.pos + let missing = missingDef.struct._fromBinary(y, decoder) + decoder.pos = oldPos if (missing.length === 0) { y._readyToIntegrate.push(missingDef.struct) } } }) - msu.delete(struct._id.clock) + msu.delete(clock) } } } @@ -57,10 +62,10 @@ export function stringifyStructs (y, decoder, strBuilder) { export function integrateRemoteStructs (decoder, encoder, y) { while (decoder.length !== decoder.pos) { - let decoderPos = decoder.pos let reference = decoder.readVarUint() let Constr = getStruct(reference) let struct = new Constr() + let decoderPos = decoder.pos let missing = struct._fromBinary(y, decoder) if (missing.length === 0) { while (struct != null) { diff --git a/src/Struct/Item.js b/src/Struct/Item.js index c7ceffed..8b88f96b 100644 --- a/src/Struct/Item.js +++ b/src/Struct/Item.js @@ -16,11 +16,27 @@ export function splitHelper (y, a, b, diff) { b._id = new ID(aID.user, aID.clock + diff) b._origin = a b._left = a + b._right = a._right + if (b._right !== null) { + b._right._left = b + } + b._right_origin = a._right_origin + // do not set a._right_origin, as this will lead to problems when syncing a._right = b - a._right_origin = b b._parent = a._parent b._parentSub = a._parentSub b._deleted = a._deleted + // now search all relevant items to the right and update origin + // if origin is not it foundOrigins, we don't have to search any longer + let foundOrigins = new Set() + foundOrigins.add(a) + let o = b._right + while (o !== null && foundOrigins.has(o._origin)) { + if (o._origin === a) { + o._origin = b + } + o = o._right + } y.os.put(b) } @@ -133,21 +149,25 @@ export default class Item { // Note that conflictingItems is a subset of itemsBeforeOrigin while (o !== null && o !== this._right) { itemsBeforeOrigin.add(o) + conflictingItems.add(o) if (this._origin === o._origin) { // case 1 if (o._id.user < this._id.user) { this._left = o - conflictingItems = new Set() + conflictingItems.clear() } - } else if (itemsBeforeOrigin.has(o)) { + } else if (itemsBeforeOrigin.has(o._origin)) { // case 2 - if (conflictingItems.has(o)) { + if (!conflictingItems.has(o._origin)) { this._left = o - conflictingItems = new Set() + conflictingItems.clear() } } else { break } + // TODO: try to use right_origin instead. + // Then you could basically omit conflictingItems! + // Note: you probably can't use right_origin in every case.. only when setting _left o = o._right } // reconnect left/right + update parent map/start if necessary @@ -193,9 +213,12 @@ export default class Item { if (this._origin !== null) { info += 0b1 // origin is defined } + // TODO: remove + /* no longer send _left if (this._left !== this._origin) { info += 0b10 // do not copy origin to left } + */ if (this._right_origin !== null) { info += 0b100 } @@ -236,24 +259,9 @@ export default class Item { missing.push(originID) } else { this._origin = origin - } - } - } - // read left - if (info & 0b10) { - // left !== origin - const leftID = decoder.readID() - if (this._left === null) { - const left = y.os.getItemCleanEnd(leftID) - if (left === null) { - // use origin instead this._left = this._origin - } else { - this._left = left } } - } else { - this._left = this._origin } // read right if (info & 0b100) { diff --git a/src/Struct/ItemJSON.js b/src/Struct/ItemJSON.js index f17b9bae..3c5fba14 100644 --- a/src/Struct/ItemJSON.js +++ b/src/Struct/ItemJSON.js @@ -28,7 +28,9 @@ export default class ItemJSON extends Item { } } _logString () { - return `ItemJSON(id:${logID(this._id)},content:${JSON.stringify(this._content)},left:${logID(this._left)},origin:${logID(this._origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` + const left = this._left !== null ? this._left._lastId : null + const origin = this._origin !== null ? this._origin._lastId : null + return `ItemJSON(id:${logID(this._id)},content:${JSON.stringify(this._content)},left:${logID(left)},origin:${logID(origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` } _splitAt (y, diff) { if (diff === 0) { diff --git a/src/Struct/ItemString.js b/src/Struct/ItemString.js index eaa504f8..3409ae42 100644 --- a/src/Struct/ItemString.js +++ b/src/Struct/ItemString.js @@ -19,7 +19,9 @@ export default class ItemString extends Item { encoder.writeVarString(this._content) } _logString () { - return `ItemJSON(id:${logID(this._id)},content:${JSON.stringify(this._content)},left:${logID(this._left)},origin:${logID(this._origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` + const left = this._left !== null ? this._left._lastId : null + const origin = this._origin !== null ? this._origin._lastId : null + return `ItemJSON(id:${logID(this._id)},content:${JSON.stringify(this._content)},left:${logID(left)},origin:${logID(origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` } _splitAt (y, diff) { if (diff === 0) { diff --git a/src/Type/YArray.js b/src/Type/YArray.js index 8673d117..49ef0d79 100644 --- a/src/Type/YArray.js +++ b/src/Type/YArray.js @@ -204,6 +204,8 @@ export default class YArray extends Type { this.insertAfter(left, content) } _logString () { - return `YArray(id:${logID(this._id)},start:${logID(this._start)},left:${logID(this._left)},origin:${logID(this._origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` + const left = this._left !== null ? this._left._lastId : null + const origin = this._origin !== null ? this._origin._lastId : null + return `YArray(id:${logID(this._id)},start:${logID(this._start)},left:${logID(left)},origin:${logID(origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` } } diff --git a/src/Type/YMap.js b/src/Type/YMap.js index 473f04ff..1c7b5fdd 100644 --- a/src/Type/YMap.js +++ b/src/Type/YMap.js @@ -83,6 +83,8 @@ export default class YMap extends Type { } } _logString () { - return `YMap(id:${logID(this._id)},mapSize:${this._map.size},left:${logID(this._left)},origin:${logID(this._origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` + const left = this._left !== null ? this._left._lastId : null + const origin = this._origin !== null ? this._origin._lastId : null + return `YMap(id:${logID(this._id)},mapSize:${this._map.size},left:${logID(left)},origin:${logID(origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` } } diff --git a/src/Type/YText.js b/src/Type/YText.js index 86f1951f..f1ac0414 100644 --- a/src/Type/YText.js +++ b/src/Type/YText.js @@ -52,6 +52,8 @@ export default class YText extends YArray { }) } _logString () { - return `YText(id:${logID(this._id)},start:${logID(this._start)},left:${logID(this._left)},origin:${logID(this._origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` + const left = this._left !== null ? this._left._lastId : null + const origin = this._origin !== null ? this._origin._lastId : null + return `YText(id:${logID(this._id)},start:${logID(this._start)},left:${logID(left)},origin:${logID(origin)},right:${logID(this._right)},parent:${logID(this._parent)},parentSub:${logID(this._parentSub)})` } } diff --git a/src/Type/y-xml/YXmlFragment.js b/src/Type/y-xml/YXmlFragment.js index a86823a1..c3cfcd1a 100644 --- a/src/Type/y-xml/YXmlFragment.js +++ b/src/Type/y-xml/YXmlFragment.js @@ -153,6 +153,8 @@ export default class YXmlFragment extends YArray { } } _logString () { - return `YXml(id:${logID(this._id)},parent:${logID(this._parent)},parentSub:${this._parentSub})` + const left = this._left !== null ? this._left._lastId : null + const origin = this._origin !== null ? this._origin._lastId : null + return `YXml(id:${logID(this._id)},left:${logID(left)},origin:${logID(origin)},right:${this._right},parent:${logID(this._parent)},parentSub:${this._parentSub})` } } diff --git a/test/y-array.tests.js b/test/y-array.tests.js index ec285913..a32c5a80 100644 --- a/test/y-array.tests.js +++ b/test/y-array.tests.js @@ -272,7 +272,7 @@ var arrayTransactions = [ ] test('y-array: Random tests (5)', async function randomArray5 (t) { - await applyRandomTests(t, arrayTransactions, 5) + await applyRandomTests(t, arrayTransactions, 12) }) test('y-array: Random tests (42)', async function randomArray42 (t) { diff --git a/tests-lib/helper.js b/tests-lib/helper.js index ebf7fe9a..15f536e8 100644 --- a/tests-lib/helper.js +++ b/tests-lib/helper.js @@ -248,6 +248,8 @@ export function wait (t) { export async function applyRandomTests (t, mods, iterations) { const chance = new Chance(t.getSeed() * 1000000000) + // TODO: remove + console.info('seed: ' + t._seed) var initInformation = await initArrays(t, { users: 5, chance: chance }) let { users } = initInformation for (var i = 0; i < iterations; i++) { diff --git a/tests-lib/test-connector.js b/tests-lib/test-connector.js index 335d4182..8461b1b6 100644 --- a/tests-lib/test-connector.js +++ b/tests-lib/test-connector.js @@ -119,22 +119,20 @@ export default function extendTestConnector (Y) { }) } receiveMessage (sender, m) { - setTimeout(() => { - if (this.y.userID !== sender && this.connections.has(sender)) { - var buffer = this.connections.get(sender).buffer - if (buffer == null) { - buffer = this.connections.get(sender).buffer = [] - } - buffer.push(m) - if (this.chance.bool({likelihood: 30})) { - // flush 1/2 with 30% chance - var flushLength = Math.round(buffer.length / 2) - buffer.splice(0, flushLength).forEach(m => { - super.receiveMessage(sender, m) - }) - } + if (this.y.userID !== sender && this.connections.has(sender)) { + var buffer = this.connections.get(sender).buffer + if (buffer == null) { + buffer = this.connections.get(sender).buffer = [] } - }, 0) + buffer.push(m) + if (this.chance.bool({likelihood: 30})) { + // flush 1/2 with 30% chance + var flushLength = Math.round(buffer.length / 2) + buffer.splice(0, flushLength).forEach(m => { + super.receiveMessage(sender, m) + }) + } + } } async _flushAll (flushUsers) { if (flushUsers.some(u => u.connector.y.userID === this.y.userID)) {