reworked getOperations (decrease size of sent operations, fixe some gc issues). garbageCollectOperation now sets origin to the direct left operation, if possible

This commit is contained in:
Kevin Jahns 2017-07-03 23:19:11 -07:00
parent 66de422749
commit 382d06f6d4
5 changed files with 38 additions and 28 deletions

View File

@ -185,7 +185,7 @@ export default function extendConnector (Y/* :any */) {
this.currentSyncTarget = syncUser this.currentSyncTarget = syncUser
this.y.db.requestTransaction(function * () { this.y.db.requestTransaction(function * () {
var stateSet = yield * this.getStateSet() var stateSet = yield * this.getStateSet()
var deleteSet = yield * this.getDeleteSet() // var deleteSet = yield * this.getDeleteSet()
var answer = { var answer = {
type: 'sync step 1', type: 'sync step 1',
stateSet: stateSet, stateSet: stateSet,
@ -346,21 +346,21 @@ export default function extendConnector (Y/* :any */) {
let m = message let m = message
// apply operations first // apply operations first
db.requestTransaction(function * () { db.requestTransaction(function * () {
yield * this.applyDeleteSet(m.deleteSet) // yield * this.applyDeleteSet(m.deleteSet)
if (m.osUntransformed != null) { if (m.osUntransformed != null) {
yield * this.applyOperationsUntransformed(m.osUntransformed, m.stateSet) yield * this.applyOperationsUntransformed(m.osUntransformed, m.stateSet)
} else { } else {
this.store.apply(m.os) this.store.apply(m.os)
} }
defer.resolve() // defer.resolve()
}) })
/*/ then apply ds // then apply ds
db.whenTransactionsFinished().then(() => { db.whenTransactionsFinished().then(() => {
db.requestTransaction(function * () { db.requestTransaction(function * () {
yield * this.applyDeleteSet(m.deleteSet) yield * this.applyDeleteSet(m.deleteSet)
}) })
defer.resolve() defer.resolve()
})*/ })
return defer.promise return defer.promise
} else if (message.type === 'sync done') { } else if (message.type === 'sync done') {
var self = this var self = this

View File

@ -81,10 +81,6 @@ export default function extendDatabase (Y /* :any */) {
function garbageCollect () { function garbageCollect () {
return os.whenTransactionsFinished().then(function () { return os.whenTransactionsFinished().then(function () {
if (os.gcTimeout > 0 && (os.gc1.length > 0 || os.gc2.length > 0)) { if (os.gcTimeout > 0 && (os.gc1.length > 0 || os.gc2.length > 0)) {
// debug
if (os.y.connector.isSynced === false) {
debugger
}
if (!os.y.connector.isSynced) { if (!os.y.connector.isSynced) {
console.warn('gc should be empty when not synced!') console.warn('gc should be empty when not synced!')
} }

View File

@ -366,6 +366,7 @@ export default function extendTransaction (Y) {
operations that can be gc'd and add them to the garbage collector. operations that can be gc'd and add them to the garbage collector.
*/ */
* garbageCollectAfterSync () { * garbageCollectAfterSync () {
// debugger
if (this.store.gc1.length > 0 || this.store.gc2.length > 0) { if (this.store.gc1.length > 0 || this.store.gc2.length > 0) {
console.warn('gc should be empty after sync') console.warn('gc should be empty after sync')
} }
@ -459,16 +460,8 @@ export default function extendTransaction (Y) {
if (o.originOf != null && o.originOf.length > 0) { if (o.originOf != null && o.originOf.length > 0) {
// find new origin of right ops // find new origin of right ops
// origin is the first left deleted operation // origin is the first left operation
var neworigin = o.left var neworigin = o.left
var neworigin_ = null
while (neworigin != null) {
neworigin_ = yield * this.getInsertion(neworigin)
if (neworigin_.deleted) {
break
}
neworigin = neworigin_.left
}
// reset origin of all right ops (except first right - duh!), // reset origin of all right ops (except first right - duh!),
@ -513,6 +506,7 @@ export default function extendTransaction (Y) {
} }
} }
if (neworigin != null) { if (neworigin != null) {
var neworigin_ = yield * this.getInsertion(neworigin)
if (neworigin_.originOf == null) { if (neworigin_.originOf == null) {
neworigin_.originOf = o.originOf neworigin_.originOf = o.originOf
} else { } else {
@ -937,29 +931,38 @@ export default function extendTransaction (Y) {
var send = [] var send = []
var endSV = yield * this.getStateVector() var endSV = yield * this.getStateVector()
for (var endState of endSV) { for (let endState of endSV) {
var user = endState.user let user = endState.user
if (user === '_') { if (user === '_') {
continue continue
} }
var startPos = startSS[user] || 0 let startPos = startSS[user] || 0
if (startPos > 0) { if (startPos > 0) {
// There is a change that [user, startPos] is in a composed Insertion (with a smaller counter) // There is a change that [user, startPos] is in a composed Insertion (with a smaller counter)
// find out if that is the case // find out if that is the case
var firstMissing = yield * this.getInsertion([user, startPos]) let firstMissing = yield * this.getInsertion([user, startPos])
if (firstMissing != null) { if (firstMissing != null) {
// update startPos // update startPos
startPos = firstMissing.id[1] startPos = firstMissing.id[1]
startSS[user] = startPos
} }
} }
startSS[user] = startPos
}
for (let endState of endSV) {
let user = endState.user
let startPos = startSS[user]
if (user === '_') {
continue
}
yield * this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) { yield * this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) {
op = Y.Struct[op.struct].encode(op) op = Y.Struct[op.struct].encode(op)
if (op.struct !== 'Insert') { if (op.struct !== 'Insert') {
send.push(op) send.push(op)
} else if (op.right == null || op.right[1] < (startSS[op.right[0]] || 0)) { } else if (op.right == null || op.right[1] < (startSS[op.right[0]] || 0)) {
// case 1. op.right is known // case 1. op.right is known
var o = op // this case is only reached if op.right is known.
// => this is not called for op.left, as op.right is unknown
let o = op
// Remember: ? // Remember: ?
// -> set op.right // -> set op.right
// 1. to the first operation that is known (according to startSS) // 1. to the first operation that is known (according to startSS)
@ -972,11 +975,14 @@ export default function extendTransaction (Y) {
if (o.left == null) { if (o.left == null) {
op.left = null op.left = null
send.push(op) send.push(op)
if (!Y.utils.compareIds(o.id, op.id)) { /* not necessary, as o is already sent..
if (!Y.utils.compareIds(o.id, op.id) && o.id[1] >= (startSS[o.id[0]] || 0)) {
// o is not op && o is unknown
o = Y.Struct[op.struct].encode(o) o = Y.Struct[op.struct].encode(o)
o.right = missingOrigins[missingOrigins.length - 1].id o.right = missingOrigins[missingOrigins.length - 1].id
send.push(o) send.push(o)
} }
*/
break break
} }
o = yield * this.getInsertion(o.left) o = yield * this.getInsertion(o.left)
@ -1012,6 +1018,11 @@ export default function extendTransaction (Y) {
} }
}) })
} }
if (send.some(o => send.filter(p => Y.utils.compareIds(o.id, p.id)).length > 1)) {
console.warn('getOperations os contains duplicates')
console.warn(send.map(o => send.filter(p => Y.utils.compareIds(o.id, p.id)).length))
debugger
}
return send.reverse() return send.reverse()
} }
/* /*

View File

@ -610,6 +610,7 @@ export default function Utils (Y) {
} }
} }
} }
return false
} }
Y.utils.matchesId = matchesId Y.utils.matchesId = matchesId

View File

@ -30,12 +30,14 @@ export async function compareUsers (t, users) {
await wait(100) await wait(100)
} }
await flushAll(t, users) await flushAll(t, users)
await wait()
await users[0].db.garbageCollect() await flushAll(t, users)
await users[0].db.garbageCollect()
var userTypeContents = users.map(u => u.share.array._content.map(c => c.val || JSON.stringify(c.type))) var userTypeContents = users.map(u => u.share.array._content.map(c => c.val || JSON.stringify(c.type)))
await users[0].db.garbageCollect()
await users[0].db.garbageCollect()
// disconnect all except user 0 // disconnect all except user 0
await Promise.all(users.slice(1).map(async u => await Promise.all(users.slice(1).map(async u =>
u.disconnect() u.disconnect()