From b08aeee4fc868a5365ba310c6b0764a4d9db50ae Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Sat, 26 Sep 2015 14:42:50 +0200 Subject: [PATCH] updating some changes i forgot to commit --- gulpfile.js | 13 +++++++----- src/Connector.js | 32 ++++++++++++++++++----------- src/Connectors/Test.js | 3 +++ src/Helper.spec.js | 13 ++++-------- src/OperationStore.js | 18 ++++++++-------- src/OperationStores/Memory.js | 20 ++++++++---------- src/OperationStores/RedBlackTree.js | 24 +++++++++++++++------- src/Struct.js | 2 +- src/Types/Array.spec.js | 4 ++-- 9 files changed, 74 insertions(+), 55 deletions(-) diff --git a/gulpfile.js b/gulpfile.js index a79a8245..761464c9 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -28,14 +28,17 @@ Specify which specs to use! Commands: - - build: + - build Build this library - - dev: + - dev:browser + Watch the ./src directory. + Builds the library on changes. + Starts an http-server and serves the test suite on http://127.0.0.1:8888. + - dev:node Watch the ./src directory. Builds and specs the library on changes. - Starts an http-server and serves the test suite on http://127.0.0.1:8888. - - build_test: - Builds the test suite + Usefull to run with node-inspector. + `node-debug $(which gulp) dev:node - test: Test this library */ diff --git a/src/Connector.js b/src/Connector.js index 912f6845..bee0e41e 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -142,27 +142,35 @@ class AbstractConnector { } if (this.debug) { console.log(`${sender} -> me: ${m.type}`, m);// eslint-disable-line + if (m.os != null && m.os.some(function(o){return o.deleted })){ + console.log("bullshit.. ") + debugger + } } if (m.type === 'sync step 1') { // TODO: make transaction, stream the ops let conn = this this.y.db.requestTransaction(function *() { var ops = yield* this.getOperations(m.stateSet) - var dels = yield* this.getOpsFromDeleteSet(m.deleteSet) - if (dels.length > 0) { - this.store.apply(dels) - // broadcast missing dels from syncing client - this.store.y.connector.broadcast({ - type: 'update', - ops: dels - }) - } conn.send(sender, { type: 'sync step 2', os: ops, stateSet: yield* this.getStateSet(), - deleteSet: yield* this.getDeleteSet() + deleteSet: yield* this.getDeleteSet() // TODO: consider that you have a ds from the other user.. }) + var dels = yield* this.getOpsFromDeleteSet(m.deleteSet) + if (dels.length > 0) { + for (var i in dels) { + // TODO: no longer get delete ops (just get the ids..)! + yield* Y.Struct.Delete.delete.call(this, dels[i].target) + } + /*/ broadcast missing dels from syncing client + this.store.y.connector.broadcast({ + type: 'update', + ops: dels + }) + */ + } if (this.forwardToSyncingClients) { conn.syncingClients.push(sender) setTimeout(function () { @@ -187,14 +195,14 @@ class AbstractConnector { this.y.db.requestTransaction(function *() { var ops = yield* this.getOperations(m.stateSet) var dels = yield* this.getOpsFromDeleteSet(m.deleteSet) - this.store.apply(dels) this.store.apply(m.os) + this.store.apply(dels) if (ops.length > 0) { m = { type: 'update', ops: ops } - if (!broadcastHB) { + if (!broadcastHB || true) { // TODO: consider to broadcast here.. conn.send(sender, m) } else { // broadcast only once! diff --git a/src/Connectors/Test.js b/src/Connectors/Test.js index b40cbbe4..7a48d06d 100644 --- a/src/Connectors/Test.js +++ b/src/Connectors/Test.js @@ -59,6 +59,9 @@ class Test extends Y.AbstractConnector { globalRoom.addUser(this) this.globalRoom = globalRoom } + receiveMessage (sender, m) { + super.receiveMessage(sender, JSON.parse(JSON.stringify(m))) + } send (userId, message) { globalRoom.buffers[userId].push(JSON.parse(JSON.stringify([this.userId, message]))) } diff --git a/src/Helper.spec.js b/src/Helper.spec.js index c6b0a5b8..3e9d84ee 100644 --- a/src/Helper.spec.js +++ b/src/Helper.spec.js @@ -84,16 +84,15 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj } } applyTransactions() - applyTransactions() - /* TODO: call applyTransactions here.. yield users[0].connector.flushAll() + yield g.garbageCollectAllUsers(users) users[0].disconnect() yield wait() applyTransactions() yield users[0].connector.flushAll() + yield g.garbageCollectAllUsers(users) users[0].reconnect() - */ - yield wait() + yield wait(100) yield users[0].connector.flushAll() }) @@ -104,7 +103,7 @@ g.garbageCollectAllUsers = async(function * garbageCollectAllUsers (users) { } }) -g.compareAllUsers = async(function * compareAllUsers (users) { //eslint-disable-line +g.compareAllUsers = async(function * compareAllUsers (users) { var s1, s2 // state sets var ds1, ds2 // delete sets var allDels1, allDels2 // all deletions @@ -129,11 +128,7 @@ g.compareAllUsers = async(function * compareAllUsers (users) { //eslint-disable- } yield users[0].connector.flushAll() // gc two times because of the two gc phases (really collect everything) - yield wait(100) yield g.garbageCollectAllUsers(users) - yield wait(100) - yield g.garbageCollectAllUsers(users) - yield wait(100) for (var uid = 0; uid < users.length; uid++) { var u = users[uid] diff --git a/src/OperationStore.js b/src/OperationStore.js index a80496f8..59b08c60 100644 --- a/src/OperationStore.js +++ b/src/OperationStore.js @@ -122,7 +122,7 @@ Y.AbstractTransaction = AbstractTransaction * destroy() - destroy the database */ -class AbstractOperationStore { // eslint-disable-line no-unused-vars +class AbstractOperationStore { constructor (y, opts) { this.y = y // E.g. this.listenersById[id] : Array @@ -239,7 +239,7 @@ class AbstractOperationStore { // eslint-disable-line no-unused-vars apply (ops) { for (var key in ops) { var o = ops[key] - if (!o.gc) { + if (o.gc == null) { // TODO: why do i get the same op twice? var required = Y.Struct[o.struct].requiredOps(o) this.whenOperationsExist(required, o) } else { @@ -316,17 +316,19 @@ class AbstractOperationStore { // eslint-disable-line no-unused-vars } else { while (op != null) { var state = yield* this.getState(op.id[0]) - if (op.id[1] === state.clock) { - state.clock++ - yield* this.checkDeleteStoreForState(state) - yield* this.setState(state) - var isDeleted = this.store.ds.isDeleted(op.id) + if (op.id[1] === state.clock || (op.id[1] < state.clock && (yield* this.getOperation(op.id)) == null)) { + // either its a new operation (1. case), or it is an operation that was deleted, but is not yet in the OS + if (op.id[1] === state.clock) { + state.clock++ + yield* this.checkDeleteStoreForState(state) + yield* this.setState(state) + } yield* Y.Struct[op.struct].execute.call(this, op) yield* this.addOperation(op) yield* this.store.operationAdded(this, op) - if (isDeleted) { + if (this.store.ds.isDeleted(op.id)) { yield* Y.Struct['Delete'].execute.call(this, {struct: 'Delete', target: op.id}) } diff --git a/src/OperationStores/Memory.js b/src/OperationStores/Memory.js index f2ad5a9a..422a46e2 100644 --- a/src/OperationStores/Memory.js +++ b/src/OperationStores/Memory.js @@ -193,17 +193,15 @@ Y.Memory = (function () { var startPos = startSS[user] || 0 var endPos = endState.clock - this.os.iterate([user, startPos], [user, endPos], function (op) {// eslint-disable-line - if (!op.gc) { - ops.push(Y.Struct[op.struct].encode(op)) - } + this.os.iterate([user, startPos], [user, endPos], function (op) { + ops.push(op) }) } var res = [] for (var op of ops) { res.push(yield* this.makeOperationReady(startSS, op)) var state = startSS[op.id[0]] || 0 - if (state === op.id[1] || true) { + if ((state === op.id[1]) || true) { startSS[op.id[0]] = state + 1 } else { throw new Error('Unexpected operation!') @@ -212,32 +210,32 @@ Y.Memory = (function () { return res } * makeOperationReady (ss, op) { + op = Y.Struct[op.struct].encode(op) // instead of ss, you could use currSS (a ss that increments when you add an operation) op = Y.utils.copyObject(op) var o = op - var clock while (o.right != null) { // while unknown, go to the right - clock = ss[o.right[0]] || 0 - if (o.right[1] < clock && !o.gc) { + if (o.right[1] < (ss[o.right[0]] || 0)) { break } o = yield* this.getOperation(o.right) } + // new right is not gc'd and known according to the ss op.right = o.right while (o.left != null) { // while unknown, go to the right - clock = ss[o.left[0]] || 0 - if (o.left[1] < clock && !o.gc) { + if (o.left[1] < (ss[o.left[0]] || 0)) { break } o = yield* this.getOperation(o.left) } + // new left is not gc'd and known according to the ss op.left = o.left return op } } - class OperationStore extends Y.AbstractOperationStore { // eslint-disable-line no-undef + class OperationStore extends Y.AbstractOperationStore { constructor (y, opts) { super(y, opts) this.os = new Y.utils.RBTree() diff --git a/src/OperationStores/RedBlackTree.js b/src/OperationStores/RedBlackTree.js index 4af6f043..faa1177f 100644 --- a/src/OperationStores/RedBlackTree.js +++ b/src/OperationStores/RedBlackTree.js @@ -194,17 +194,27 @@ class RBTree { } return true } - logTable (from, to) { + logTable (from, to, filter) { + if (filter == null) { + filter = function () { + return true + } + } if (from == null) { from = null } if (to == null) { to = null } var os = [] this.iterate(from, to, function (o) { - var o_ = Y.utils.copyObject(o) - var id = o_.id - delete o_.id - o_['id[0]'] = id[0] - o_['id[1]'] = id[1] - os.push(o_) + if (filter(o)) { + var o_ = {} + for (var key in o) { + if (typeof o[key] === 'object') { + o_[key] = JSON.stringify(o[key]) + } else { + o_[key] = o[key] + } + } + os.push(o_) + } }) if (console.table != null) { console.table(os) diff --git a/src/Struct.js b/src/Struct.js index 88e4ecd8..1a96cde6 100644 --- a/src/Struct.js +++ b/src/Struct.js @@ -33,7 +33,7 @@ var Struct = { return op }, requiredOps: function (op) { - return [op.target] + return [] // [op.target] }, /* Delete an operation from the OS, and add it to the GC, if necessary. diff --git a/src/Types/Array.spec.js b/src/Types/Array.spec.js index cdbe312f..d9b465c3 100644 --- a/src/Types/Array.spec.js +++ b/src/Types/Array.spec.js @@ -8,10 +8,10 @@ describe('Array Type', function () { var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll beforeEach(async(function * (done) { - yield createUsers(this, 3) + yield createUsers(this, 2) y1 = (yconfig1 = this.users[0]).root y2 = (yconfig2 = this.users[1]).root - y3 = (yconfig3 = this.users[2]).root + // y3 = (yconfig3 = this.users[2]).root flushAll = this.users[0].connector.flushAll yield wait(10) done()