late sync with insertions only work now

This commit is contained in:
Kevin Jahns 2015-07-25 23:26:52 +00:00
parent 5e0d602e12
commit 56165a3c10
7 changed files with 182 additions and 32 deletions

View File

@ -26,6 +26,16 @@ class AbstractConnector { // eslint-disable-line no-unused-vars
this.debug = opts.debug === true this.debug = opts.debug === true
this.broadcastedHB = false this.broadcastedHB = false
} }
reconnect () {
}
disconnect () {
this.connections = {}
this.isSynced = false
this.currentSyncTarget = null
this.broadcastedHB = false
this.syncingClients = []
this.whenSyncedListeners = []
}
setUserId (userId) { setUserId (userId) {
this.userId = userId this.userId = userId
this.y.db.setUserId(userId) this.y.db.setUserId(userId)
@ -97,7 +107,8 @@ class AbstractConnector { // eslint-disable-line no-unused-vars
this.y.db.requestTransaction(function *() { this.y.db.requestTransaction(function *() {
conn.send(syncUser, { conn.send(syncUser, {
type: 'sync step 1', type: 'sync step 1',
stateVector: yield* this.getStateVector() stateSet: yield* this.getStateSet(),
deleteSet: yield* this.getDeleteSet()
}) })
}) })
} }
@ -107,13 +118,12 @@ class AbstractConnector { // eslint-disable-line no-unused-vars
for (var f of this.whenSyncedListeners) { for (var f of this.whenSyncedListeners) {
f() f()
} }
this.whenSyncedListeners = null this.whenSyncedListeners = []
} }
} }
send (uid, message) { send (uid, message) {
if (this.debug) { if (this.debug) {
console.log(`me -> ${uid}: ${message.type}`);// eslint-disable-line console.log(`me -> ${uid}: ${message.type}`, m);// eslint-disable-line
console.dir(m); // eslint-disable-line
} }
super(uid, message) super(uid, message)
} }
@ -123,19 +133,27 @@ class AbstractConnector { // eslint-disable-line no-unused-vars
return return
} }
if (this.debug) { if (this.debug) {
console.log(`${sender} -> me: ${m.type}`);// eslint-disable-line console.log(`${sender} -> me: ${m.type}`, m);// eslint-disable-line
console.dir(m); // eslint-disable-line
} }
if (m.type === 'sync step 1') { if (m.type === 'sync step 1') {
// TODO: make transaction, stream the ops // TODO: make transaction, stream the ops
let conn = this let conn = this
this.y.db.requestTransaction(function *() { this.y.db.requestTransaction(function *() {
var ops = yield* this.getOperations(m.stateVector) var ops = yield* this.getOperations(m.stateSet)
var sv = yield* this.getStateVector() var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
if (dels.length > 0) {
this.store.apply(dels)
// broadcast missing dels from syncing client
this.store.y.connector.broadcast({
type: 'update',
ops: dels
})
}
conn.send(sender, { conn.send(sender, {
type: 'sync step 2', type: 'sync step 2',
os: ops, os: ops,
stateVector: sv stateSet: yield* this.getStateSet(),
deleteSet: yield* this.getDeleteSet()
}) })
if (this.forwardToSyncingClients) { if (this.forwardToSyncingClients) {
conn.syncingClients.push(sender) conn.syncingClients.push(sender)
@ -159,7 +177,10 @@ class AbstractConnector { // eslint-disable-line no-unused-vars
var broadcastHB = !this.broadcastedHB var broadcastHB = !this.broadcastedHB
this.broadcastedHB = true this.broadcastedHB = true
this.y.db.requestTransaction(function *() { this.y.db.requestTransaction(function *() {
var ops = yield* this.getOperations(m.stateVector) var ops = yield* this.getOperations(m.stateSet)
var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
this.store.apply(dels)
this.store.apply(m.os)
if (ops.length > 0) { if (ops.length > 0) {
m = { m = {
type: 'update', type: 'update',

View File

@ -64,8 +64,13 @@ class Test extends AbstractConnector {
globalRoom.buffers[key].push(JSON.parse(JSON.stringify([this.userId, message]))) globalRoom.buffers[key].push(JSON.parse(JSON.stringify([this.userId, message])))
} }
} }
reconnect () {
globalRoom.addUser(this)
super()
}
disconnect () { disconnect () {
globalRoom.removeUser(this.userId) globalRoom.removeUser(this.userId)
super()
} }
flush () { flush () {
var buff = globalRoom.buffers[this.userId] var buff = globalRoom.buffers[this.userId]

View File

@ -38,16 +38,27 @@ async function applyRandomTransactions (users, objects, transactions, numberOfTr
var f = getRandom(transactions) var f = getRandom(transactions)
f(root) f(root)
} }
for (var i = 0; i < numberOfTransactions; i++) { function applyTransactions () {
var r = Math.random() for (var i = 0; i < numberOfTransactions / 2 + 1; i++) {
if (r >= 0.9) { var r = Math.random()
// 10% chance to flush if (r >= 0.9) {
users[0].connector.flushOne() // 10% chance to flush
} else { users[0].connector.flushOne()
randomTransaction(getRandom(objects)) } else {
randomTransaction(getRandom(objects))
}
wait()
} }
wait()
} }
applyTransactions()
await users[0].connector.flushAll()
users[0].disconnect()
await wait()
applyTransactions()
await users[0].connector.flushAll()
users[0].reconnect()
await wait()
await users[0].connector.flushAll()
} }
async function compareAllUsers(users){//eslint-disable-line async function compareAllUsers(users){//eslint-disable-line
@ -55,7 +66,7 @@ async function compareAllUsers(users){//eslint-disable-line
var db1 = [] var db1 = []
function * t1 () { function * t1 () {
s1 = yield* this.getStateSet() s1 = yield* this.getStateSet()
ds1 = yield* this.getDeletionSet() ds1 = yield* this.getDeleteSet()
allDels1 = [] allDels1 = []
yield* this.ds.iterate(null, null, function (d) { yield* this.ds.iterate(null, null, function (d) {
allDels1.push(d) allDels1.push(d)
@ -63,7 +74,7 @@ async function compareAllUsers(users){//eslint-disable-line
} }
function * t2 () { function * t2 () {
s2 = yield* this.getStateSet() s2 = yield* this.getStateSet()
ds2 = yield* this.getDeletionSet() ds2 = yield* this.getDeleteSet()
allDels2 = [] allDels2 = []
yield* this.ds.iterate(null, null, function (d) { yield* this.ds.iterate(null, null, function (d) {
allDels2.push(d) allDels2.push(d)

View File

@ -107,11 +107,12 @@ class DeleteStore extends RBTree { // eslint-disable-line
} }
} }
}) })
for (; pos < dv.len; pos++) { for (; pos < dv.length; pos++) {
d = dv[pos] d = dv[pos]
createDeletions(user, d[0], d[1]) createDeletions(user, d[0], d[1])
} }
} }
return deletions
} }
} }
@ -124,13 +125,13 @@ Y.Memory = (function () { // eslint-disable-line no-unused-vars
this.os = store.os this.os = store.os
this.ds = store.ds this.ds = store.ds
} }
* getDeletionSet (id) { * getDeleteSet (id) {
return this.ds.toDeleteSet(id) return this.ds.toDeleteSet(id)
} }
* isDeleted (id) { * isDeleted (id) {
return this.ds.isDeleted(id) return this.ds.isDeleted(id)
} }
* getDeletions (ds) { * getOpsFromDeleteSet (ds) {
return this.ds.getDeletions(ds) return this.ds.getDeletions(ds)
} }
* setOperation (op) { // eslint-disable-line * setOperation (op) { // eslint-disable-line
@ -201,13 +202,20 @@ Y.Memory = (function () { // eslint-disable-line no-unused-vars
var res = [] var res = []
for (var op of ops) { for (var op of ops) {
res.push(yield* this.makeOperationReady(startSS, op)) res.push(yield* this.makeOperationReady(startSS, op))
var state = startSS[op.id[0]] || 0
if (state === op.id[1]) {
startSS[op.id[0]] = state + 1
} else {
throw new Error('Unexpected operation!')
}
} }
return res return res
} }
* makeOperationReady (ss, op) { * makeOperationReady (ss, op) {
// instead of ss, you could use currSS (a ss that increments when you add an operation) // instead of ss, you could use currSS (a ss that increments when you add an operation)
var clock op = copyObject(op)
var o = op var o = op
var clock
while (o.right != null) { while (o.right != null) {
// while unknown, go to the right // while unknown, go to the right
clock = ss[o.right[0]] clock = ss[o.right[0]]
@ -216,8 +224,16 @@ Y.Memory = (function () { // eslint-disable-line no-unused-vars
} }
o = yield* this.getOperation(o.right) o = yield* this.getOperation(o.right)
} }
op = copyObject(op)
op.right = o.right op.right = o.right
while (o.left != null) {
// while unknown, go to the right
clock = ss[o.left[0]]
if (clock != null && o.left[1] < clock) {
break
}
o = yield* this.getOperation(o.left)
}
op.left = o.left
return op return op
} }
} }

View File

@ -19,5 +19,14 @@ describe('Memory', function () {
expect(ds.isDeleted(['u1', 11])).toBeTruthy() expect(ds.isDeleted(['u1', 11])).toBeTruthy()
expect(ds.toDeleteSet()).toBeTruthy({'u1': [10, 2]}) expect(ds.toDeleteSet()).toBeTruthy({'u1': [10, 2]})
}) })
it('Creates operations', function () {
ds.add({id: ['5', 3], len: 2})
var dels = ds.getDeletions({5: [[4, 1]]})
expect(dels.length === 1).toBeTruthy()
expect(dels[0]).toEqual({
struct: 'Delete',
target: ['5', 3]
})
})
}) })
}) })

View File

@ -1,17 +1,17 @@
/* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions */ /* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions */
/* eslint-env browser,jasmine */ /* eslint-env browser,jasmine */
var numberOfYArrayTests = 80 var numberOfYArrayTests = 10
describe('Array Type', function () { describe('Array Type', function () {
var y1, y2, y3, flushAll var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll
jasmine.DEFAULT_TIMEOUT_INTERVAL = 50000 jasmine.DEFAULT_TIMEOUT_INTERVAL = 5000
beforeEach(async function (done) { beforeEach(async function (done) {
await createUsers(this, 5) await createUsers(this, 5)
y1 = this.users[0].root y1 = (yconfig1 = this.users[0]).root
y2 = this.users[1].root y2 = (yconfig2 = this.users[1]).root
y3 = this.users[2].root y3 = (yconfig3 = this.users[2]).root
flushAll = this.users[0].connector.flushAll flushAll = this.users[0].connector.flushAll
done() done()
}) })
@ -59,6 +59,42 @@ describe('Array Type', function () {
expect(l2.toArray()).toEqual([0, 2, 'y']) expect(l2.toArray()).toEqual([0, 2, 'y'])
done() done()
}) })
it('Handles getOperations ascending ids bug in late sync', async function (done) {
var l1, l2
l1 = await y1.set('Array', Y.Array)
l1.insert(0, ['x', 'y'])
await flushAll()
yconfig3.disconnect()
yconfig2.disconnect()
await wait()
l2 = await y2.get('Array')
l2.insert(1, [2])
l2.insert(1, [3])
await flushAll()
yconfig2.reconnect()
yconfig3.reconnect()
await wait()
await flushAll()
expect(l1.toArray()).toEqual(l2.toArray())
done()
})
it('Handles deletions in late sync', async function (done) {
var l1, l2
l1 = await y1.set('Array', Y.Array)
l1.insert(0, ['x', 'y'])
await flushAll()
yconfig2.disconnect()
await wait()
l2 = await y2.get('Array')
l2.delete(1, 1)
l1.delete(0, 2)
await flushAll()
yconfig2.reconnect()
await wait()
await flushAll()
expect(l1.toArray()).toEqual(l2.toArray())
done()
})
it('Basic insert. Then delete the whole array', async function (done) { it('Basic insert. Then delete the whole array', async function (done) {
var l1, l2, l3 var l1, l2, l3
l1 = await y1.set('Array', Y.Array) l1 = await y1.set('Array', Y.Array)
@ -73,6 +109,42 @@ describe('Array Type', function () {
expect(l2.toArray()).toEqual([]) expect(l2.toArray()).toEqual([])
done() done()
}) })
it('Basic insert. Then delete the whole array (merge listeners on late sync)', async function (done) {
var l1, l2, l3
l1 = await y1.set('Array', Y.Array)
l1.insert(0, ['x', 'y', 'z'])
await flushAll()
yconfig2.disconnect()
l1.delete(0, 3)
l2 = await y2.get('Array')
await wait()
yconfig2.reconnect()
await wait()
l3 = await y3.get('Array')
await flushAll()
expect(l1.toArray()).toEqual(l2.toArray())
expect(l2.toArray()).toEqual(l3.toArray())
expect(l2.toArray()).toEqual([])
done()
})
it('Basic insert. Then delete the whole array (merge deleter on late sync)', async function (done) {
var l1, l2, l3
l1 = await y1.set('Array', Y.Array)
l1.insert(0, ['x', 'y', 'z'])
await flushAll()
yconfig1.disconnect()
l1.delete(0, 3)
l2 = await y2.get('Array')
await wait()
yconfig1.reconnect()
await wait()
l3 = await y3.get('Array')
await flushAll()
expect(l1.toArray()).toEqual(l2.toArray())
expect(l2.toArray()).toEqual(l3.toArray())
expect(l2.toArray()).toEqual([])
done()
})
it('throw insert & delete events', async function (done) { it('throw insert & delete events', async function (done) {
var array = await this.users[0].root.set('array', Y.Array) var array = await this.users[0].root.set('array', Y.Array)
var event var event
@ -97,7 +169,7 @@ describe('Array Type', function () {
done() done()
}) })
}) })
describe(`${numberOfYArrayTests} Random tests`, function () { describe(`Random tests`, function () {
var randomArrayTransactions = [ var randomArrayTransactions = [
function insert (array) { function insert (array) {
array.insert(getRandomNumber(array.toArray().length), [getRandomNumber()]) array.insert(getRandomNumber(array.toArray().length), [getRandomNumber()])
@ -136,6 +208,13 @@ describe('Array Type', function () {
done() done()
}) })
it(`succeed after ${numberOfYArrayTests} actions`, async function (done) { it(`succeed after ${numberOfYArrayTests} actions`, async function (done) {
while (this.users.length > 2) {
this.users.pop().disconnect()
this.arrays.pop()
}
for (var u of this.users) {
u.connector.debug = true
}
await applyRandomTransactions(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests) await applyRandomTransactions(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests)
await flushAll() await flushAll()
await compareArrayValues(this.arrays) await compareArrayValues(this.arrays)

View File

@ -29,6 +29,15 @@ class YConfig { // eslint-disable-line no-unused-vars
callback(yconfig) callback(yconfig)
}) })
} }
isConnected () {
return this.connector.isSynced
}
disconnect () {
this.connector.disconnect()
}
reconnect () {
this.connector.reconnect()
}
destroy () { destroy () {
this.connector.disconnect() this.connector.disconnect()
this.db.removeDatabase() this.db.removeDatabase()