diff --git a/src/Connector.js b/src/Connector.js index bee0e41e..88f7ca41 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -22,6 +22,7 @@ class AbstractConnector { } this.role = opts.role this.connections = {} + this.isSynced = false this.userEventListeners = [] this.whenSyncedListeners = [] this.currentSyncTarget = null @@ -97,7 +98,7 @@ class AbstractConnector { true otherwise */ findNextSyncTarget () { - if (this.currentSyncTarget != null) { + if (this.currentSyncTarget != null || this.isSynced) { return // "The current sync has not finished!" } @@ -118,19 +119,18 @@ class AbstractConnector { deleteSet: yield* this.getDeleteSet() }) }) - } - // This user synced with at least one user, set the state to synced (TODO: does this suffice?) - if (!this.isSynced) { + } else { this.isSynced = true for (var f of this.whenSyncedListeners) { f() } this.whenSyncedListeners = [] + this.y.db.garbageCollectAfterSync() } } send (uid, message) { if (this.debug) { - console.log(`me -> ${uid}: ${message.type}`, m);// eslint-disable-line + console.log(`send ${this.userId} -> ${uid}: ${message.type}`, m);// eslint-disable-line } } /* @@ -141,36 +141,26 @@ class AbstractConnector { return } if (this.debug) { - console.log(`${sender} -> me: ${m.type}`, m);// eslint-disable-line - if (m.os != null && m.os.some(function(o){return o.deleted })){ - console.log("bullshit.. ") - debugger - } + console.log(`receive ${sender} -> ${this.userId}: ${m.type}`, m);// eslint-disable-line } if (m.type === 'sync step 1') { // TODO: make transaction, stream the ops let conn = this this.y.db.requestTransaction(function *() { + var currentStateSet = yield* this.getStateSet() + var dels = yield* this.getOpsFromDeleteSet(m.deleteSet) + for (var i in dels) { + // TODO: no longer get delete ops (just get the ids..)! + yield* Y.Struct.Delete.delete.call(this, dels[i].target) + } + var ops = yield* this.getOperations(m.stateSet) conn.send(sender, { type: 'sync step 2', os: ops, - stateSet: yield* this.getStateSet(), - deleteSet: yield* this.getDeleteSet() // TODO: consider that you have a ds from the other user.. + stateSet: currentStateSet, + deleteSet: yield* this.getDeleteSet() }) - var dels = yield* this.getOpsFromDeleteSet(m.deleteSet) - if (dels.length > 0) { - for (var i in dels) { - // TODO: no longer get delete ops (just get the ids..)! - yield* Y.Struct.Delete.delete.call(this, dels[i].target) - } - /*/ broadcast missing dels from syncing client - this.store.y.connector.broadcast({ - type: 'update', - ops: dels - }) - */ - } if (this.forwardToSyncingClients) { conn.syncingClients.push(sender) setTimeout(function () { @@ -192,11 +182,13 @@ class AbstractConnector { let conn = this var broadcastHB = !this.broadcastedHB this.broadcastedHB = true - this.y.db.requestTransaction(function *() { - var ops = yield* this.getOperations(m.stateSet) + this.y.db.requestTransaction(function * () { var dels = yield* this.getOpsFromDeleteSet(m.deleteSet) + for (var i in dels) { + yield* Y.Struct.Delete.delete.call(this, dels[i].target) + } + var ops = yield* this.getOperations(m.stateSet) this.store.apply(m.os) - this.store.apply(dels) if (ops.length > 0) { m = { type: 'update', diff --git a/src/Helper.spec.js b/src/Helper.spec.js index 3e9d84ee..95d9f234 100644 --- a/src/Helper.spec.js +++ b/src/Helper.spec.js @@ -71,8 +71,8 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj var f = getRandom(transactions) f(root) } - function applyTransactions () { - for (var i = 0; i < numberOfTransactions / 2 + 1; i++) { + function applyTransactions (relAmount) { + for (var i = 0; i < numberOfTransactions * relAmount + 1; i++) { var r = Math.random() if (r >= 0.9) { // 10% chance to flush @@ -83,16 +83,17 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj wait() } } - applyTransactions() + applyTransactions(0.5) yield users[0].connector.flushAll() yield g.garbageCollectAllUsers(users) + yield wait() users[0].disconnect() yield wait() - applyTransactions() + applyTransactions(0.5) yield users[0].connector.flushAll() - yield g.garbageCollectAllUsers(users) - users[0].reconnect() yield wait(100) + users[0].reconnect() + yield wait() yield users[0].connector.flushAll() }) @@ -219,14 +220,25 @@ function async (makeGenerator) { return handle(generator.throw(err)) }) } - // this may throw errors here, but its ok since this is used only for debugging - return handle(generator.next()) - /* try { + try { return handle(generator.next()) } catch (ex) { generator.throw(ex) // TODO: check this out // return Promise.reject(ex) - }*/ + } } } g.async = async + +function logUsers (self) { + if (self.constructor === Array) { + self = {users: self} + } + console.log('User 1: ', self.users[0].connector.userId) // eslint-disable-line + self.users[0].db.os.logTable() // eslint-disable-line + console.log('User 2: ', self.users[1].connector.userId) // eslint-disable-line + self.users[1].db.os.logTable() // eslint-disable-line + console.log('User 3: ', self.users[2].connector.userId) // eslint-disable-line + self.users[2].db.os.logTable() // eslint-disable-line +} +g.logUsers = logUsers diff --git a/src/OperationStore.js b/src/OperationStore.js index 59b08c60..7594c06c 100644 --- a/src/OperationStore.js +++ b/src/OperationStore.js @@ -148,7 +148,7 @@ class AbstractOperationStore { this.waitingOperations = new Y.utils.RBTree() this.gc1 = [] // first stage - this.gc2 = [] // second stage -> after that, kill it + this.gc2 = [] // second stage -> after that, remove the op this.gcTimeout = opts.gcTimeout || 5000 var os = this function garbageCollect () { @@ -197,10 +197,46 @@ class AbstractOperationStore { garbageCollect() } } - addToGarbageCollector (op) { - if (op.gc == null) { + garbageCollectAfterSync () { + var os = this.os + var self = this + os.iterate(null, null, function (op) { + if (op.deleted && op.left != null && op.right != null) { + var left = os.find(op.left) + var right = os.find(op.right) + self.addToGarbageCollector(op, left, right) + } + }) + } + /* + Try to add to GC. + + TODO: rename this function + + Only gc when + * creator of op is online + * left & right defined and both are from the same creator as op + + returns true iff op was added to GC + */ + addToGarbageCollector (op, left, right) { + if ( + op.gc == null && + op.deleted === true && + this.y.connector.isSynced && + // (this.y.connector.connections[op.id[0]] != null || op.id[0] === this.y.connector.userId) && + left != null && + right != null && + left.deleted && + right.deleted && + left.id[0] === op.id[0] && + right.id[0] === op.id[0] + ) { op.gc = true this.gc1.push(op.id) + return true + } else { + return false } } removeFromGarbageCollector (op) { diff --git a/src/OperationStores/Memory.js b/src/OperationStores/Memory.js index 422a46e2..d2fa4ffe 100644 --- a/src/OperationStores/Memory.js +++ b/src/OperationStores/Memory.js @@ -175,7 +175,7 @@ Y.Memory = (function () { return stateVector } * getStateSet () { - return this.ss + return Y.utils.copyObject(this.ss) } * getOperations (startSS) { // TODO: use bounds here! diff --git a/src/Struct.js b/src/Struct.js index 1a96cde6..4da01341 100644 --- a/src/Struct.js +++ b/src/Struct.js @@ -46,34 +46,51 @@ var Struct = { */ delete: function * (targetId) { var target = yield* this.getOperation(targetId) - if (target != null && !target.deleted) { - target.deleted = true - if (target.left != null && (yield* this.getOperation(target.left)).deleted) { - // left is defined & the left op is already deleted. - // => Then this may get gc'd - this.store.addToGarbageCollector(target) - } - if (target.right != null) { - var right = yield* this.getOperation(target.right) - if (right.deleted && right.gc == null) { - this.store.addToGarbageCollector(right) - yield* this.setOperation(right) - } - } - yield* this.setOperation(target) - var t = this.store.initializedTypes[JSON.stringify(target.parent)] - if (t != null) { - yield* t._changed(this, { - struct: 'Delete', - target: targetId - }) + + if (target == null || !target.deleted) { + this.ds.delete(targetId) + var state = yield* this.getState(targetId[0]) + if (state.clock === targetId[1]) { + yield* this.checkDeleteStoreForState(state) + yield* this.setState(state) } } - this.ds.delete(targetId) - var state = yield* this.getState(targetId[0]) - if (state.clock === targetId[1]) { - yield* this.checkDeleteStoreForState(state) - yield* this.setState(state) + + if (target != null && target.gc == null) { + if (!target.deleted) { + // set deleted & notify type + target.deleted = true + var type = this.store.initializedTypes[JSON.stringify(target.parent)] + if (type != null) { + yield* type._changed(this, { + struct: 'Delete', + target: targetId + }) + } + } + var left = target.left != null ? yield* this.getOperation(target.left) : null + var right = target.right != null ? yield* this.getOperation(target.right) : null + + this.store.addToGarbageCollector(target, left, right) + + // set here because it was deleted and/or gc'd + yield* this.setOperation(target) + + if ( + left != null && + left.left != null && + this.store.addToGarbageCollector(left, yield* this.getOperation(left.left), target) + ) { + yield* this.setOperation(left) + } + + if ( + right != null && + right.right != null && + this.store.addToGarbageCollector(right, target, yield* this.getOperation(right.right)) + ) { + yield* this.setOperation(right) + } } }, execute: function * (op) { @@ -215,6 +232,12 @@ var Struct = { left = yield* this.getOperation(op.left) op.right = left.right left.right = op.id + + // if left exists, and it is supposed to be gc'd. Remove it from the gc + if (left.gc != null) { + this.store.removeFromGarbageCollector(left) + } + yield* this.setOperation(left) } else { op.right = op.parentSub ? parent.map[op.parentSub] || null : parent.start @@ -228,7 +251,6 @@ var Struct = { if (right.gc != null) { this.store.removeFromGarbageCollector(right) } - yield* this.setOperation(right) } diff --git a/src/Types/Array.spec.js b/src/Types/Array.spec.js index d9b465c3..38d9adc4 100644 --- a/src/Types/Array.spec.js +++ b/src/Types/Array.spec.js @@ -1,17 +1,17 @@ /* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, garbageCollectAllUsers, describeManyTimes */ /* eslint-env browser,jasmine */ -var numberOfYArrayTests = 100 -var repeatArrayTests = 1 +var numberOfYArrayTests = 50 +var repeatArrayTests = 300 describe('Array Type', function () { var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll beforeEach(async(function * (done) { - yield createUsers(this, 2) + yield createUsers(this, 3) y1 = (yconfig1 = this.users[0]).root y2 = (yconfig2 = this.users[1]).root - // y3 = (yconfig3 = this.users[2]).root + y3 = (yconfig3 = this.users[2]).root flushAll = this.users[0].connector.flushAll yield wait(10) done() @@ -244,7 +244,9 @@ describe('Array Type', function () { } yield applyRandomTransactions(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests) yield flushAll() + yield garbageCollectAllUsers(this.users) yield compareArrayValues(this.arrays) + yield compareAllUsers(this.users) done() })) }) diff --git a/src/Types/Map.spec.js b/src/Types/Map.spec.js index 16d5d0ba..22b173c8 100644 --- a/src/Types/Map.spec.js +++ b/src/Types/Map.spec.js @@ -1,7 +1,7 @@ /* global createUsers, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, describeManyTimes */ /* eslint-env browser,jasmine */ -var numberOfYMapTests = 150 +var numberOfYMapTests = 70 var repeatMapTeasts = 1 describe('Map Type', function () { diff --git a/src/y.js b/src/y.js index f5e98f36..81a3089b 100644 --- a/src/y.js +++ b/src/y.js @@ -36,12 +36,15 @@ class YConfig { this.connector.disconnect() } reconnect () { + this.connector.reconnect() + /* TODO: maybe do this.. Promise.all([ this.db.garbageCollect(), this.db.garbageCollect() ]).then(() => { this.connector.reconnect() }) + */ } destroy () { this.connector.disconnect()