fixed several bugs in multi join/rejoin

This commit is contained in:
Kevin Jahns 2015-10-08 02:12:20 +02:00
parent 1ace3e3120
commit 6a13419c62
7 changed files with 142 additions and 71 deletions

View File

@ -55,6 +55,9 @@ class AbstractConnector {
this.currentSyncTarget = null this.currentSyncTarget = null
this.findNextSyncTarget() this.findNextSyncTarget()
} }
this.syncingClients = this.syncingClients.filter(function (cli) {
return cli !== user
})
for (var f of this.userEventListeners) { for (var f of this.userEventListeners) {
f({ f({
action: 'userLeft', action: 'userLeft',
@ -142,7 +145,7 @@ class AbstractConnector {
return return
} }
if (this.debug) { if (this.debug) {
console.log(`receive ${sender} -> ${this.userId}: ${m.type}`, m) // eslint-disable-line console.log(`receive ${sender} -> ${this.userId}: ${m.type}`, JSON.parse(JSON.stringify(m))) // eslint-disable-line
} }
if (m.type === 'sync step 1') { if (m.type === 'sync step 1') {
// TODO: make transaction, stream the ops // TODO: make transaction, stream the ops
@ -168,6 +171,7 @@ class AbstractConnector {
conn.send(sender, { conn.send(sender, {
type: 'sync done' type: 'sync done'
}) })
conn._setSyncedWith(sender)
}, conn.syncingClientDuration) }, conn.syncingClientDuration)
} else { } else {
conn.send(sender, { conn.send(sender, {
@ -199,11 +203,7 @@ class AbstractConnector {
} }
}) })
} else if (m.type === 'sync done') { } else if (m.type === 'sync done') {
this.connections[sender].isSynced = true this._setSyncedWith(sender)
if (sender === this.currentSyncTarget) {
this.currentSyncTarget = null
this.findNextSyncTarget()
}
} else if (m.type === 'update') { } else if (m.type === 'update') {
if (this.forwardToSyncingClients) { if (this.forwardToSyncingClients) {
for (var client of this.syncingClients) { for (var client of this.syncingClients) {
@ -213,6 +213,16 @@ class AbstractConnector {
this.y.db.apply(m.ops) this.y.db.apply(m.ops)
} }
} }
_setSyncedWith (user) {
var conn = this.connections[user]
if (conn != null) {
conn.isSynced = true
}
if (user === this.currentSyncTarget) {
this.currentSyncTarget = null
this.findNextSyncTarget()
}
}
/* /*
Currently, the HB encodes operations as JSON. For the moment I want to keep it Currently, the HB encodes operations as JSON. For the moment I want to keep it
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want

View File

@ -58,25 +58,36 @@ class Test extends Y.AbstractConnector {
this.setUserId((userIdCounter++) + '') this.setUserId((userIdCounter++) + '')
globalRoom.addUser(this) globalRoom.addUser(this)
this.globalRoom = globalRoom this.globalRoom = globalRoom
this.syncingClientDuration = 0
} }
receiveMessage (sender, m) { receiveMessage (sender, m) {
super.receiveMessage(sender, JSON.parse(JSON.stringify(m))) super.receiveMessage(sender, JSON.parse(JSON.stringify(m)))
} }
send (userId, message) { send (userId, message) {
globalRoom.buffers[userId].push(JSON.parse(JSON.stringify([this.userId, message]))) var buffer = globalRoom.buffers[userId]
if (buffer != null) {
buffer.push(JSON.parse(JSON.stringify([this.userId, message])))
}
} }
broadcast (message) { broadcast (message) {
for (var key in globalRoom.buffers) { for (var key in globalRoom.buffers) {
globalRoom.buffers[key].push(JSON.parse(JSON.stringify([this.userId, message]))) globalRoom.buffers[key].push(JSON.parse(JSON.stringify([this.userId, message])))
} }
} }
isDisconnected () {
return globalRoom.users[this.userId] == null
}
reconnect () { reconnect () {
globalRoom.addUser(this) if (this.isDisconnected()) {
super.reconnect() globalRoom.addUser(this)
super.reconnect()
}
} }
disconnect () { disconnect () {
globalRoom.removeUser(this.userId) if (!this.isDisconnected()) {
super.disconnect() globalRoom.removeUser(this.userId)
super.disconnect()
}
} }
flush () { flush () {
var buff = globalRoom.buffers[this.userId] var buff = globalRoom.buffers[this.userId]
@ -107,6 +118,9 @@ class Test extends Y.AbstractConnector {
wait(0).then(nextFlush) wait(0).then(nextFlush)
}) })
} }
/*
Flushes an operation for some user..
*/
flushOne () { flushOne () {
flushOne() flushOne()
} }

View File

@ -36,7 +36,7 @@ function wait (t) {
return new Promise(function (resolve) { return new Promise(function (resolve) {
setTimeout(function () { setTimeout(function () {
resolve() resolve()
}, t) }, t * 7)
}) })
} }
g.wait = wait g.wait = wait
@ -71,29 +71,40 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj
var f = getRandom(transactions) var f = getRandom(transactions)
f(root) f(root)
} }
function applyTransactions (relAmount) { function * applyTransactions (relAmount) {
for (var i = 0; i < numberOfTransactions * relAmount + 1; i++) { for (var i = 0; i < numberOfTransactions * relAmount + 1; i++) {
var r = Math.random() var r = Math.random()
if (r >= 0.9) { if (r >= 0.9) {
// 10% chance to flush // 10% chance to flush
users[0].connector.flushOne() users[0].connector.flushOne() // flushes for some user.. (not necessarily 0)
} else { } else if (r >= 0.1) {
// 80% chance to create operation
randomTransaction(getRandom(objects)) randomTransaction(getRandom(objects))
} else {
// 10% chance to disconnect/reconnect
var u = getRandom(users)
if (u.connector.isDisconnected()) {
u.reconnect()
} else {
u.disconnect()
}
} }
wait() yield wait()
} }
} }
applyTransactions(0.5) yield* applyTransactions(0.5)
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
yield g.garbageCollectAllUsers(users) yield g.garbageCollectAllUsers(users)
yield wait() yield wait()
users[0].disconnect() users[0].disconnect()
yield wait() yield wait()
applyTransactions(0.5) yield* applyTransactions(0.5)
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
yield wait(50)
for (var u in users) {
users[u].reconnect()
}
yield wait(100) yield wait(100)
users[0].reconnect()
yield wait()
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
}) })

View File

@ -158,57 +158,55 @@ class AbstractTransaction {
} }
} }
* garbageCollectOperation (id) { * garbageCollectOperation (id) {
var o = yield* this.getOperation(id)
if (o == null) {
return
}
if (!o.deleted) {
yield* this.deleteOperation(id)
o = yield* this.getOperation(id)
}
// TODO: I don't think that this is necessary!!
// check to increase the state of the respective user // check to increase the state of the respective user
var state = yield* this.getState(id[0]) var state = yield* this.getState(id[0])
if (state.clock === id[1]) { if (state.clock === id[1]) {
state.clock++
// also check if more expected operations were gc'd // also check if more expected operations were gc'd
yield* this.checkDeleteStoreForState(state) yield* this.checkDeleteStoreForState(state)
// then set the state // then set the state
yield* this.setState(state) yield* this.setState(state)
} }
this.ds.markGarbageCollected(id)
// remove gc'd op from the left op, if it exists // if op exists, then clean that mess up..
if (o.left != null) { var o = yield* this.getOperation(id)
var left = yield* this.getOperation(o.left) if (o != null) {
left.right = o.right if (!o.deleted) {
yield* this.setOperation(left) yield* this.deleteOperation(id)
o = yield* this.getOperation(id)
}
// remove gc'd op from the left op, if it exists
if (o.left != null) {
var left = yield* this.getOperation(o.left)
left.right = o.right
yield* this.setOperation(left)
}
// remove gc'd op from the right op, if it exists
if (o.right != null) {
var right = yield* this.getOperation(o.right)
right.left = o.left
yield* this.setOperation(right)
}
// remove gc'd op from parent, if it exists
var parent = yield* this.getOperation(o.parent)
var setParent = false // whether to save parent to the os
if (Y.utils.compareIds(parent.start, o.id)) {
// gc'd op is the start
setParent = true
parent.start = o.right
}
if (Y.utils.compareIds(parent.end, o.id)) {
// gc'd op is the end
setParent = true
parent.end = o.left
}
if (setParent) {
yield* this.setOperation(parent)
}
yield* this.removeOperation(o.id) // actually remove it from the os
} }
// remove gc'd op from the right op, if it exists
if (o.right != null) {
var right = yield* this.getOperation(o.right)
right.left = o.left
yield* this.setOperation(right)
}
// remove gc'd op from parent, if it exists
var parent = yield* this.getOperation(o.parent)
var setParent = false // whether to save parent to the os
if (Y.utils.compareIds(parent.start, o.id)) {
// gc'd op is the start
setParent = true
parent.start = o.right
}
if (Y.utils.compareIds(parent.end, o.id)) {
// gc'd op is the end
setParent = true
parent.end = o.left
}
if (setParent) {
yield* this.setOperation(parent)
}
yield* this.removeOperation(o.id) // actually remove it from the os
this.ds.markGarbageCollected(o.id)
} }
} }
Y.AbstractTransaction = AbstractTransaction Y.AbstractTransaction = AbstractTransaction
@ -276,8 +274,20 @@ class AbstractOperationStore {
} }
} }
stopGarbageCollector () { stopGarbageCollector () {
this.gc1 = [] var self = this
this.gc2 = [] return new Promise(function (resolve) {
self.requestTransaction(function * () {
var ungc = self.gc1.concat(self.gc2)
self.gc1 = []
self.gc2 = []
for (var i in ungc) {
var op = yield* this.getOperation(ungc[i])
delete op.gc
yield* this.setOperation(op)
}
resolve()
})
})
} }
garbageCollectAfterSync () { garbageCollectAfterSync () {
var os = this.os var os = this.os
@ -307,7 +317,7 @@ class AbstractOperationStore {
op.deleted === true && op.deleted === true &&
this.y.connector.isSynced && this.y.connector.isSynced &&
left != null && left != null &&
left.deleted left.deleted === true
) { ) {
op.gc = true op.gc = true
this.gc1.push(op.id) this.gc1.push(op.id)

View File

@ -5,7 +5,8 @@ class DeleteStore extends Y.utils.RBTree {
constructor () { constructor () {
super() super()
// TODO: debugggg // TODO: debugggg
this.mem = [] this.mem = [];
this.memDS = [];
} }
isDeleted (id) { isDeleted (id) {
var n = this.findNodeWithUpperBound(id) var n = this.findNodeWithUpperBound(id)
@ -17,7 +18,7 @@ class DeleteStore extends Y.utils.RBTree {
returns the delete node returns the delete node
*/ */
markGarbageCollected (id) { markGarbageCollected (id) {
this.mem.push({"gc": id}) this.mem.push({"gc": id});
var n = this.markDeleted(id) var n = this.markDeleted(id)
this.mem.pop() this.mem.pop()
if (!n.val.gc) { if (!n.val.gc) {
@ -64,7 +65,7 @@ class DeleteStore extends Y.utils.RBTree {
returns the delete node returns the delete node
*/ */
markDeleted (id) { markDeleted (id) {
this.mem.push({"del": id}) this.mem.push({"del": id});
var n = this.findNodeWithUpperBound(id) var n = this.findNodeWithUpperBound(id)
if (n != null && n.val.id[0] === id[0]) { if (n != null && n.val.id[0] === id[0]) {
if (n.val.id[1] <= id[1] && id[1] < n.val.id[1] + n.val.len) { if (n.val.id[1] <= id[1] && id[1] < n.val.id[1] + n.val.len) {
@ -124,6 +125,8 @@ Y.Memory = (function () {
this.ss = store.ss this.ss = store.ss
this.os = store.os this.os = store.os
this.ds = store.ds this.ds = store.ds
this.memDS = store.ds.memDS; // TODO: remove
} }
* checkDeleteStoreForState (state) { * checkDeleteStoreForState (state) {
var n = this.ds.findNodeWithUpperBound([state.user, state.clock]) var n = this.ds.findNodeWithUpperBound([state.user, state.clock])
@ -145,6 +148,12 @@ Y.Memory = (function () {
deletions.push([user, c, gc]) deletions.push([user, c, gc])
} }
} }
var memAction = {
before: yield* this.getDeleteSet(),
applied: JSON.parse(JSON.stringify(ds))
};
for (var user in ds) { for (var user in ds) {
var dv = ds[user] var dv = ds[user]
var pos = 0 var pos = 0
@ -206,6 +215,8 @@ Y.Memory = (function () {
yield* this.deleteOperation(id) yield* this.deleteOperation(id)
} }
} }
memAction.after = yield* this.getDeleteSet();
this.memDS.push(memAction);
} }
* isDeleted (id) { * isDeleted (id) {
return this.ds.isDeleted(id) return this.ds.isDeleted(id)

View File

@ -46,7 +46,7 @@ describe('Memory', function () {
ds.markGarbageCollected(['291', 2]) ds.markGarbageCollected(['291', 2])
expect(ds.toDeleteSet()).toEqual({'291': [[2, 1, true]], '293': [[0, 1, true], [1, 1, false]]}) expect(ds.toDeleteSet()).toEqual({'291': [[2, 1, true]], '293': [[0, 1, true], [1, 1, false]]})
}) })
it('Debug #2', function () { it('Debug #3', function () {
ds.markDeleted(['581', 0]) ds.markDeleted(['581', 0])
ds.markDeleted(['581', 1]) ds.markDeleted(['581', 1])
ds.markDeleted(['580', 0]) ds.markDeleted(['580', 0])
@ -62,7 +62,7 @@ describe('Memory', function () {
ds.markGarbageCollected(['580', 1]) ds.markGarbageCollected(['580', 1])
expect(ds.toDeleteSet()).toEqual({'580': [[0, 1, false], [1, 1, true], [2, 1, false]], '581': [[0, 3, true]]}) expect(ds.toDeleteSet()).toEqual({'580': [[0, 1, false], [1, 1, true], [2, 1, false]], '581': [[0, 3, true]]})
}) })
it('Debug #2', function () { it('Debug #4', function () {
ds.markDeleted(['544', 0]) ds.markDeleted(['544', 0])
ds.markDeleted(['543', 2]) ds.markDeleted(['543', 2])
ds.markDeleted(['544', 0]) ds.markDeleted(['544', 0])
@ -81,5 +81,20 @@ describe('Memory', function () {
ds.markGarbageCollected(['543', 3]) ds.markGarbageCollected(['543', 3])
expect(ds.toDeleteSet()).toEqual({'543': [[2, 3, true]], '544': [[0, 1, true], [1, 1, false], [2, 1, true]], '545': [[1, 1, false]]}) expect(ds.toDeleteSet()).toEqual({'543': [[2, 3, true]], '544': [[0, 1, true], [1, 1, false], [2, 1, true]], '545': [[1, 1, false]]})
}) })
it('Debug #5', async(function * (done) {
var store = new Y.Memory(null, {
db: {
name: 'Memory',
gcTimeout: -1
}
})
store.requestTransaction(function * () {
yield* this.applyDeleteSet({'16': [[1, 2, false]], '17': [[0, 1, true], [1, 3, false]]})
expect(this.ds.toDeleteSet()).toEqual({'16': [[1, 2, false]], '17': [[0, 1, true], [1, 3, false]]})
yield* this.applyDeleteSet({'16': [[1, 2, false]], '17': [[0, 4, true]]})
expect(this.ds.toDeleteSet()).toEqual({'16': [[1, 2, false]], '17': [[0, 4, true]]})
done()
})
}))
}) })
}) })

View File

@ -1,8 +1,8 @@
/* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, garbageCollectAllUsers, describeManyTimes */ /* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, garbageCollectAllUsers, describeManyTimes */
/* eslint-env browser,jasmine */ /* eslint-env browser,jasmine */
var numberOfYArrayTests = 200 var numberOfYArrayTests = 20
var repeatArrayTests = 1 var repeatArrayTests = 1000
describe('Array Type', function () { describe('Array Type', function () {
var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll