From 382d06f6d493db9a5446e27e337866cc69ab0537 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Mon, 3 Jul 2017 23:19:11 -0700 Subject: [PATCH] reworked getOperations (decrease size of sent operations, fixe some gc issues). garbageCollectOperation now sets origin to the direct left operation, if possible --- src/Connector.js | 10 +++++----- src/Database.js | 4 ---- src/Transaction.js | 43 +++++++++++++++++++++++++++---------------- src/Utils.js | 1 + tests-lib/helper.js | 8 +++++--- 5 files changed, 38 insertions(+), 28 deletions(-) diff --git a/src/Connector.js b/src/Connector.js index 0e8e69fc..c67e3fcc 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -185,7 +185,7 @@ export default function extendConnector (Y/* :any */) { this.currentSyncTarget = syncUser this.y.db.requestTransaction(function * () { var stateSet = yield * this.getStateSet() - var deleteSet = yield * this.getDeleteSet() + // var deleteSet = yield * this.getDeleteSet() var answer = { type: 'sync step 1', stateSet: stateSet, @@ -346,21 +346,21 @@ export default function extendConnector (Y/* :any */) { let m = message // apply operations first db.requestTransaction(function * () { - yield * this.applyDeleteSet(m.deleteSet) + // yield * this.applyDeleteSet(m.deleteSet) if (m.osUntransformed != null) { yield * this.applyOperationsUntransformed(m.osUntransformed, m.stateSet) } else { this.store.apply(m.os) } - defer.resolve() + // defer.resolve() }) - /*/ then apply ds + // then apply ds db.whenTransactionsFinished().then(() => { db.requestTransaction(function * () { yield * this.applyDeleteSet(m.deleteSet) }) defer.resolve() - })*/ + }) return defer.promise } else if (message.type === 'sync done') { var self = this diff --git a/src/Database.js b/src/Database.js index ed59119a..f529c2ab 100644 --- a/src/Database.js +++ b/src/Database.js @@ -81,10 +81,6 @@ export default function extendDatabase (Y /* :any */) { function garbageCollect () { return os.whenTransactionsFinished().then(function () { if (os.gcTimeout > 0 && (os.gc1.length > 0 || os.gc2.length > 0)) { - // debug - if (os.y.connector.isSynced === false) { - debugger - } if (!os.y.connector.isSynced) { console.warn('gc should be empty when not synced!') } diff --git a/src/Transaction.js b/src/Transaction.js index e58e348c..4dd4f1fd 100644 --- a/src/Transaction.js +++ b/src/Transaction.js @@ -366,6 +366,7 @@ export default function extendTransaction (Y) { operations that can be gc'd and add them to the garbage collector. */ * garbageCollectAfterSync () { + // debugger if (this.store.gc1.length > 0 || this.store.gc2.length > 0) { console.warn('gc should be empty after sync') } @@ -459,16 +460,8 @@ export default function extendTransaction (Y) { if (o.originOf != null && o.originOf.length > 0) { // find new origin of right ops - // origin is the first left deleted operation + // origin is the first left operation var neworigin = o.left - var neworigin_ = null - while (neworigin != null) { - neworigin_ = yield * this.getInsertion(neworigin) - if (neworigin_.deleted) { - break - } - neworigin = neworigin_.left - } // reset origin of all right ops (except first right - duh!), @@ -513,6 +506,7 @@ export default function extendTransaction (Y) { } } if (neworigin != null) { + var neworigin_ = yield * this.getInsertion(neworigin) if (neworigin_.originOf == null) { neworigin_.originOf = o.originOf } else { @@ -937,29 +931,38 @@ export default function extendTransaction (Y) { var send = [] var endSV = yield * this.getStateVector() - for (var endState of endSV) { - var user = endState.user + for (let endState of endSV) { + let user = endState.user if (user === '_') { continue } - var startPos = startSS[user] || 0 + let startPos = startSS[user] || 0 if (startPos > 0) { // There is a change that [user, startPos] is in a composed Insertion (with a smaller counter) // find out if that is the case - var firstMissing = yield * this.getInsertion([user, startPos]) + let firstMissing = yield * this.getInsertion([user, startPos]) if (firstMissing != null) { // update startPos startPos = firstMissing.id[1] - startSS[user] = startPos } } + startSS[user] = startPos + } + for (let endState of endSV) { + let user = endState.user + let startPos = startSS[user] + if (user === '_') { + continue + } yield * this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) { op = Y.Struct[op.struct].encode(op) if (op.struct !== 'Insert') { send.push(op) } else if (op.right == null || op.right[1] < (startSS[op.right[0]] || 0)) { // case 1. op.right is known - var o = op + // this case is only reached if op.right is known. + // => this is not called for op.left, as op.right is unknown + let o = op // Remember: ? // -> set op.right // 1. to the first operation that is known (according to startSS) @@ -972,11 +975,14 @@ export default function extendTransaction (Y) { if (o.left == null) { op.left = null send.push(op) - if (!Y.utils.compareIds(o.id, op.id)) { + /* not necessary, as o is already sent.. + if (!Y.utils.compareIds(o.id, op.id) && o.id[1] >= (startSS[o.id[0]] || 0)) { + // o is not op && o is unknown o = Y.Struct[op.struct].encode(o) o.right = missingOrigins[missingOrigins.length - 1].id send.push(o) } + */ break } o = yield * this.getInsertion(o.left) @@ -1012,6 +1018,11 @@ export default function extendTransaction (Y) { } }) } + if (send.some(o => send.filter(p => Y.utils.compareIds(o.id, p.id)).length > 1)) { + console.warn('getOperations os contains duplicates') + console.warn(send.map(o => send.filter(p => Y.utils.compareIds(o.id, p.id)).length)) + debugger + } return send.reverse() } /* diff --git a/src/Utils.js b/src/Utils.js index 7e844d6a..c18f112a 100644 --- a/src/Utils.js +++ b/src/Utils.js @@ -610,6 +610,7 @@ export default function Utils (Y) { } } } + return false } Y.utils.matchesId = matchesId diff --git a/tests-lib/helper.js b/tests-lib/helper.js index bdba1501..93164d89 100644 --- a/tests-lib/helper.js +++ b/tests-lib/helper.js @@ -30,12 +30,14 @@ export async function compareUsers (t, users) { await wait(100) } await flushAll(t, users) - - await users[0].db.garbageCollect() - await users[0].db.garbageCollect() + await wait() + await flushAll(t, users) var userTypeContents = users.map(u => u.share.array._content.map(c => c.val || JSON.stringify(c.type))) + await users[0].db.garbageCollect() + await users[0].db.garbageCollect() + // disconnect all except user 0 await Promise.all(users.slice(1).map(async u => u.disconnect()