From 6a13419c62fe2955e51bdaa47cf109e708a18000 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Thu, 8 Oct 2015 02:12:20 +0200 Subject: [PATCH] fixed several bugs in multi join/rejoin --- src/Connector.js | 22 +++++-- src/Connectors/Test.js | 24 ++++++-- src/Helper.spec.js | 29 ++++++--- src/OperationStore.js | 98 ++++++++++++++++-------------- src/OperationStores/Memory.js | 17 +++++- src/OperationStores/Memory.spec.js | 19 +++++- src/Types/Array.spec.js | 4 +- 7 files changed, 142 insertions(+), 71 deletions(-) diff --git a/src/Connector.js b/src/Connector.js index b787d55c..cd431d26 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -55,6 +55,9 @@ class AbstractConnector { this.currentSyncTarget = null this.findNextSyncTarget() } + this.syncingClients = this.syncingClients.filter(function (cli) { + return cli !== user + }) for (var f of this.userEventListeners) { f({ action: 'userLeft', @@ -142,7 +145,7 @@ class AbstractConnector { return } if (this.debug) { - console.log(`receive ${sender} -> ${this.userId}: ${m.type}`, m) // eslint-disable-line + console.log(`receive ${sender} -> ${this.userId}: ${m.type}`, JSON.parse(JSON.stringify(m))) // eslint-disable-line } if (m.type === 'sync step 1') { // TODO: make transaction, stream the ops @@ -168,6 +171,7 @@ class AbstractConnector { conn.send(sender, { type: 'sync done' }) + conn._setSyncedWith(sender) }, conn.syncingClientDuration) } else { conn.send(sender, { @@ -199,11 +203,7 @@ class AbstractConnector { } }) } else if (m.type === 'sync done') { - this.connections[sender].isSynced = true - if (sender === this.currentSyncTarget) { - this.currentSyncTarget = null - this.findNextSyncTarget() - } + this._setSyncedWith(sender) } else if (m.type === 'update') { if (this.forwardToSyncingClients) { for (var client of this.syncingClients) { @@ -213,6 +213,16 @@ class AbstractConnector { this.y.db.apply(m.ops) } } + _setSyncedWith (user) { + var conn = this.connections[user] + if (conn != null) { + conn.isSynced = true + } + if (user === this.currentSyncTarget) { + this.currentSyncTarget = null + this.findNextSyncTarget() + } + } /* Currently, the HB encodes operations as JSON. For the moment I want to keep it that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want diff --git a/src/Connectors/Test.js b/src/Connectors/Test.js index 7a48d06d..1d0fd150 100644 --- a/src/Connectors/Test.js +++ b/src/Connectors/Test.js @@ -58,25 +58,36 @@ class Test extends Y.AbstractConnector { this.setUserId((userIdCounter++) + '') globalRoom.addUser(this) this.globalRoom = globalRoom + this.syncingClientDuration = 0 } receiveMessage (sender, m) { super.receiveMessage(sender, JSON.parse(JSON.stringify(m))) } send (userId, message) { - globalRoom.buffers[userId].push(JSON.parse(JSON.stringify([this.userId, message]))) + var buffer = globalRoom.buffers[userId] + if (buffer != null) { + buffer.push(JSON.parse(JSON.stringify([this.userId, message]))) + } } broadcast (message) { for (var key in globalRoom.buffers) { globalRoom.buffers[key].push(JSON.parse(JSON.stringify([this.userId, message]))) } } + isDisconnected () { + return globalRoom.users[this.userId] == null + } reconnect () { - globalRoom.addUser(this) - super.reconnect() + if (this.isDisconnected()) { + globalRoom.addUser(this) + super.reconnect() + } } disconnect () { - globalRoom.removeUser(this.userId) - super.disconnect() + if (!this.isDisconnected()) { + globalRoom.removeUser(this.userId) + super.disconnect() + } } flush () { var buff = globalRoom.buffers[this.userId] @@ -107,6 +118,9 @@ class Test extends Y.AbstractConnector { wait(0).then(nextFlush) }) } + /* + Flushes an operation for some user.. + */ flushOne () { flushOne() } diff --git a/src/Helper.spec.js b/src/Helper.spec.js index c3655e53..898974c4 100644 --- a/src/Helper.spec.js +++ b/src/Helper.spec.js @@ -36,7 +36,7 @@ function wait (t) { return new Promise(function (resolve) { setTimeout(function () { resolve() - }, t) + }, t * 7) }) } g.wait = wait @@ -71,29 +71,40 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj var f = getRandom(transactions) f(root) } - function applyTransactions (relAmount) { + function * applyTransactions (relAmount) { for (var i = 0; i < numberOfTransactions * relAmount + 1; i++) { var r = Math.random() if (r >= 0.9) { // 10% chance to flush - users[0].connector.flushOne() - } else { + users[0].connector.flushOne() // flushes for some user.. (not necessarily 0) + } else if (r >= 0.1) { + // 80% chance to create operation randomTransaction(getRandom(objects)) + } else { + // 10% chance to disconnect/reconnect + var u = getRandom(users) + if (u.connector.isDisconnected()) { + u.reconnect() + } else { + u.disconnect() + } } - wait() + yield wait() } } - applyTransactions(0.5) + yield* applyTransactions(0.5) yield users[0].connector.flushAll() yield g.garbageCollectAllUsers(users) yield wait() users[0].disconnect() yield wait() - applyTransactions(0.5) + yield* applyTransactions(0.5) yield users[0].connector.flushAll() + yield wait(50) + for (var u in users) { + users[u].reconnect() + } yield wait(100) - users[0].reconnect() - yield wait() yield users[0].connector.flushAll() }) diff --git a/src/OperationStore.js b/src/OperationStore.js index b1eb02cf..8f472981 100644 --- a/src/OperationStore.js +++ b/src/OperationStore.js @@ -158,57 +158,55 @@ class AbstractTransaction { } } * garbageCollectOperation (id) { - var o = yield* this.getOperation(id) - - if (o == null) { - return - } - - if (!o.deleted) { - yield* this.deleteOperation(id) - o = yield* this.getOperation(id) - } - - // TODO: I don't think that this is necessary!! // check to increase the state of the respective user var state = yield* this.getState(id[0]) if (state.clock === id[1]) { + state.clock++ // also check if more expected operations were gc'd yield* this.checkDeleteStoreForState(state) // then set the state yield* this.setState(state) } + this.ds.markGarbageCollected(id) - // remove gc'd op from the left op, if it exists - if (o.left != null) { - var left = yield* this.getOperation(o.left) - left.right = o.right - yield* this.setOperation(left) + // if op exists, then clean that mess up.. + var o = yield* this.getOperation(id) + if (o != null) { + if (!o.deleted) { + yield* this.deleteOperation(id) + o = yield* this.getOperation(id) + } + + // remove gc'd op from the left op, if it exists + if (o.left != null) { + var left = yield* this.getOperation(o.left) + left.right = o.right + yield* this.setOperation(left) + } + // remove gc'd op from the right op, if it exists + if (o.right != null) { + var right = yield* this.getOperation(o.right) + right.left = o.left + yield* this.setOperation(right) + } + // remove gc'd op from parent, if it exists + var parent = yield* this.getOperation(o.parent) + var setParent = false // whether to save parent to the os + if (Y.utils.compareIds(parent.start, o.id)) { + // gc'd op is the start + setParent = true + parent.start = o.right + } + if (Y.utils.compareIds(parent.end, o.id)) { + // gc'd op is the end + setParent = true + parent.end = o.left + } + if (setParent) { + yield* this.setOperation(parent) + } + yield* this.removeOperation(o.id) // actually remove it from the os } - // remove gc'd op from the right op, if it exists - if (o.right != null) { - var right = yield* this.getOperation(o.right) - right.left = o.left - yield* this.setOperation(right) - } - // remove gc'd op from parent, if it exists - var parent = yield* this.getOperation(o.parent) - var setParent = false // whether to save parent to the os - if (Y.utils.compareIds(parent.start, o.id)) { - // gc'd op is the start - setParent = true - parent.start = o.right - } - if (Y.utils.compareIds(parent.end, o.id)) { - // gc'd op is the end - setParent = true - parent.end = o.left - } - if (setParent) { - yield* this.setOperation(parent) - } - yield* this.removeOperation(o.id) // actually remove it from the os - this.ds.markGarbageCollected(o.id) } } Y.AbstractTransaction = AbstractTransaction @@ -276,8 +274,20 @@ class AbstractOperationStore { } } stopGarbageCollector () { - this.gc1 = [] - this.gc2 = [] + var self = this + return new Promise(function (resolve) { + self.requestTransaction(function * () { + var ungc = self.gc1.concat(self.gc2) + self.gc1 = [] + self.gc2 = [] + for (var i in ungc) { + var op = yield* this.getOperation(ungc[i]) + delete op.gc + yield* this.setOperation(op) + } + resolve() + }) + }) } garbageCollectAfterSync () { var os = this.os @@ -307,7 +317,7 @@ class AbstractOperationStore { op.deleted === true && this.y.connector.isSynced && left != null && - left.deleted + left.deleted === true ) { op.gc = true this.gc1.push(op.id) diff --git a/src/OperationStores/Memory.js b/src/OperationStores/Memory.js index cf3c869e..63434c9b 100644 --- a/src/OperationStores/Memory.js +++ b/src/OperationStores/Memory.js @@ -5,7 +5,8 @@ class DeleteStore extends Y.utils.RBTree { constructor () { super() // TODO: debugggg - this.mem = [] + this.mem = []; + this.memDS = []; } isDeleted (id) { var n = this.findNodeWithUpperBound(id) @@ -17,7 +18,7 @@ class DeleteStore extends Y.utils.RBTree { returns the delete node */ markGarbageCollected (id) { - this.mem.push({"gc": id}) + this.mem.push({"gc": id}); var n = this.markDeleted(id) this.mem.pop() if (!n.val.gc) { @@ -64,7 +65,7 @@ class DeleteStore extends Y.utils.RBTree { returns the delete node */ markDeleted (id) { - this.mem.push({"del": id}) + this.mem.push({"del": id}); var n = this.findNodeWithUpperBound(id) if (n != null && n.val.id[0] === id[0]) { if (n.val.id[1] <= id[1] && id[1] < n.val.id[1] + n.val.len) { @@ -124,6 +125,8 @@ Y.Memory = (function () { this.ss = store.ss this.os = store.os this.ds = store.ds + + this.memDS = store.ds.memDS; // TODO: remove } * checkDeleteStoreForState (state) { var n = this.ds.findNodeWithUpperBound([state.user, state.clock]) @@ -145,6 +148,12 @@ Y.Memory = (function () { deletions.push([user, c, gc]) } } + + var memAction = { + before: yield* this.getDeleteSet(), + applied: JSON.parse(JSON.stringify(ds)) + }; + for (var user in ds) { var dv = ds[user] var pos = 0 @@ -206,6 +215,8 @@ Y.Memory = (function () { yield* this.deleteOperation(id) } } + memAction.after = yield* this.getDeleteSet(); + this.memDS.push(memAction); } * isDeleted (id) { return this.ds.isDeleted(id) diff --git a/src/OperationStores/Memory.spec.js b/src/OperationStores/Memory.spec.js index 43e88b49..16535613 100644 --- a/src/OperationStores/Memory.spec.js +++ b/src/OperationStores/Memory.spec.js @@ -46,7 +46,7 @@ describe('Memory', function () { ds.markGarbageCollected(['291', 2]) expect(ds.toDeleteSet()).toEqual({'291': [[2, 1, true]], '293': [[0, 1, true], [1, 1, false]]}) }) - it('Debug #2', function () { + it('Debug #3', function () { ds.markDeleted(['581', 0]) ds.markDeleted(['581', 1]) ds.markDeleted(['580', 0]) @@ -62,7 +62,7 @@ describe('Memory', function () { ds.markGarbageCollected(['580', 1]) expect(ds.toDeleteSet()).toEqual({'580': [[0, 1, false], [1, 1, true], [2, 1, false]], '581': [[0, 3, true]]}) }) - it('Debug #2', function () { + it('Debug #4', function () { ds.markDeleted(['544', 0]) ds.markDeleted(['543', 2]) ds.markDeleted(['544', 0]) @@ -81,5 +81,20 @@ describe('Memory', function () { ds.markGarbageCollected(['543', 3]) expect(ds.toDeleteSet()).toEqual({'543': [[2, 3, true]], '544': [[0, 1, true], [1, 1, false], [2, 1, true]], '545': [[1, 1, false]]}) }) + it('Debug #5', async(function * (done) { + var store = new Y.Memory(null, { + db: { + name: 'Memory', + gcTimeout: -1 + } + }) + store.requestTransaction(function * () { + yield* this.applyDeleteSet({'16': [[1, 2, false]], '17': [[0, 1, true], [1, 3, false]]}) + expect(this.ds.toDeleteSet()).toEqual({'16': [[1, 2, false]], '17': [[0, 1, true], [1, 3, false]]}) + yield* this.applyDeleteSet({'16': [[1, 2, false]], '17': [[0, 4, true]]}) + expect(this.ds.toDeleteSet()).toEqual({'16': [[1, 2, false]], '17': [[0, 4, true]]}) + done() + }) + })) }) }) diff --git a/src/Types/Array.spec.js b/src/Types/Array.spec.js index a65d45ac..53ba97f7 100644 --- a/src/Types/Array.spec.js +++ b/src/Types/Array.spec.js @@ -1,8 +1,8 @@ /* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, garbageCollectAllUsers, describeManyTimes */ /* eslint-env browser,jasmine */ -var numberOfYArrayTests = 200 -var repeatArrayTests = 1 +var numberOfYArrayTests = 20 +var repeatArrayTests = 1000 describe('Array Type', function () { var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll