late join works (also when activating garbage collector), added some tests to verify (havent tested for large >500 operations)

This commit is contained in:
Kevin Jahns 2015-10-12 15:17:12 +02:00
parent 9c4074e3e3
commit e32aef4c9f
10 changed files with 178 additions and 83 deletions

View File

@ -4,7 +4,7 @@
"description": "A framework for real-time p2p shared editing on arbitrary complex data types", "description": "A framework for real-time p2p shared editing on arbitrary complex data types",
"main": "y.js", "main": "y.js",
"scripts": { "scripts": {
"test": "./node_modules/.bin/gulp test", "test": "node --harmony ./node_modules/.bin/gulp test",
"lint": "./node_modules/.bin/standard", "lint": "./node_modules/.bin/standard",
"build": "./node_modules/.bin/standard build" "build": "./node_modules/.bin/standard build"
}, },

View File

@ -156,13 +156,13 @@ class AbstractConnector {
var currentStateSet = yield* this.getStateSet() var currentStateSet = yield* this.getStateSet()
yield* this.applyDeleteSet(m.deleteSet) yield* this.applyDeleteSet(m.deleteSet)
var ds = yield* this.getDeleteSet()
var ops = yield* this.getOperations(m.stateSet) var ops = yield* this.getOperations(m.stateSet)
ops = JSON.parse(JSON.stringify(ops)) // TODO: don't do something like that!!
conn.send(sender, { conn.send(sender, {
type: 'sync step 2', type: 'sync step 2',
os: ops, os: ops,
stateSet: currentStateSet, stateSet: currentStateSet,
deleteSet: yield* this.getDeleteSet() deleteSet: ds
}) })
if (this.forwardToSyncingClients) { if (this.forwardToSyncingClients) {
conn.syncingClients.push(sender) conn.syncingClients.push(sender)

View File

@ -1,4 +1,4 @@
/* global getRandom, Y, wait */ /* global getRandom, Y, wait, async */
'use strict' 'use strict'
var globalRoom = { var globalRoom = {
@ -82,19 +82,25 @@ class Test extends Y.AbstractConnector {
globalRoom.addUser(this) globalRoom.addUser(this)
super.reconnect() super.reconnect()
} }
return this.flushAll()
} }
disconnect () { disconnect () {
if (!this.isDisconnected()) { if (!this.isDisconnected()) {
globalRoom.removeUser(this.userId) globalRoom.removeUser(this.userId)
super.disconnect() super.disconnect()
} }
return wait()
} }
flush () { flush () {
var buff = globalRoom.buffers[this.userId] var self = this
while (buff.length > 0) { return async(function * () {
var m = buff.shift() yield wait()
while (globalRoom.buffers[self.userId].length > 0) {
var m = globalRoom.buffers[self.userId].shift()
this.receiveMessage(m[0], m[1]) this.receiveMessage(m[0], m[1])
yield wait()
} }
})
} }
flushAll () { flushAll () {
return new Promise(function (resolve) { return new Promise(function (resolve) {
@ -115,7 +121,7 @@ class Test extends Y.AbstractConnector {
} }
// in the case that there are // in the case that there are
// still actions that want to be performed // still actions that want to be performed
wait(0).then(nextFlush) wait().then(nextFlush)
}) })
} }
/* /*

View File

@ -66,56 +66,68 @@ function getRandomNumber (n) {
} }
g.getRandomNumber = getRandomNumber g.getRandomNumber = getRandomNumber
g.applyRandomTransactions = async(function * applyRandomTransactions (users, objects, transactions, numberOfTransactions) { function * applyTransactions (relAmount, numberOfTransactions, objects, users, transactions) {
function randomTransaction (root) { function randomTransaction (root) {
var f = getRandom(transactions) var f = getRandom(transactions)
f(root) f(root)
} }
function * applyTransactions (relAmount) {
for (var i = 0; i < numberOfTransactions * relAmount + 1; i++) { for (var i = 0; i < numberOfTransactions * relAmount + 1; i++) {
var r = Math.random() var r = Math.random()
if (r >= 0.9) { if (r >= 0.5) {
// 10% chance to flush // 50% chance to flush
users[0].connector.flushOne() // flushes for some user.. (not necessarily 0) users[0].connector.flushOne() // flushes for some user.. (not necessarily 0)
} else if (r >= 0.1) { } else if (r >= 0.05) {
// 80% chance to create operation // 45% chance to create operation
randomTransaction(getRandom(objects)) randomTransaction(getRandom(objects))
} else { } else {
// 10% chance to disconnect/reconnect // 5% chance to disconnect/reconnect
var u = getRandom(users) var u = getRandom(users)
if (u.connector.isDisconnected()) { if (u.connector.isDisconnected()) {
u.reconnect() yield u.reconnect()
} else { } else {
u.disconnect() yield u.disconnect()
} }
} }
yield wait() yield wait()
} }
} }
yield* applyTransactions(0.5)
g.applyRandomTransactionsAllRejoinNoGC = async(function * applyRandomTransactions (users, objects, transactions, numberOfTransactions) {
yield* applyTransactions(1, numberOfTransactions, objects, users, transactions)
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
yield g.garbageCollectAllUsers(users)
yield wait() yield wait()
users[0].disconnect()
yield wait()
yield* applyTransactions(0.5)
yield users[0].connector.flushAll()
yield wait(50)
for (var u in users) { for (var u in users) {
users[u].reconnect() yield users[u].reconnect()
} }
yield wait(100) yield wait(100)
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
yield g.garbageCollectAllUsers(users)
})
g.applyRandomTransactionsWithGC = async(function * applyRandomTransactions (users, objects, transactions, numberOfTransactions) {
yield* applyTransactions(1, numberOfTransactions, objects, users.slice(1), transactions)
yield users[0].connector.flushAll()
yield g.garbageCollectAllUsers(users)
yield wait(100)
for (var u in users) {
// TODO: here, we enforce that two users never sync at the same time with u[0]
// enforce that in the connector itself!
yield users[u].reconnect()
}
yield wait(100)
yield users[0].connector.flushAll()
yield wait(100)
yield g.garbageCollectAllUsers(users)
}) })
g.garbageCollectAllUsers = async(function * garbageCollectAllUsers (users) { g.garbageCollectAllUsers = async(function * garbageCollectAllUsers (users) {
return yield wait(100)// TODO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! // gc two times because of the two gc phases (really collect everything)
/* yield wait(100)
for (var i in users) { for (var i in users) {
yield users[i].db.garbageCollect() yield users[i].db.garbageCollect()
yield users[i].db.garbageCollect() yield users[i].db.garbageCollect()
} }
*/ yield wait(100)
}) })
g.compareAllUsers = async(function * compareAllUsers (users) { g.compareAllUsers = async(function * compareAllUsers (users) {
@ -142,7 +154,6 @@ g.compareAllUsers = async(function * compareAllUsers (users) {
}) })
} }
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
// gc two times because of the two gc phases (really collect everything)
yield g.garbageCollectAllUsers(users) yield g.garbageCollectAllUsers(users)
for (var uid = 0; uid < users.length; uid++) { for (var uid = 0; uid < users.length; uid++) {
@ -178,6 +189,8 @@ g.compareAllUsers = async(function * compareAllUsers (users) {
u.db.requestTransaction(t1) u.db.requestTransaction(t1)
yield wait() yield wait()
u.db.os.iterate(null, null, function (o) { u.db.os.iterate(null, null, function (o) {
o = Y.utils.copyObject(o)
delete o.origin
db1.push(o) db1.push(o)
}) })
} else { } else {
@ -188,6 +201,8 @@ g.compareAllUsers = async(function * compareAllUsers (users) {
expect(ds1).toEqual(ds2) // exported structure expect(ds1).toEqual(ds2) // exported structure
var count = 0 var count = 0
u.db.os.iterate(null, null, function (o) { u.db.os.iterate(null, null, function (o) {
o = Y.utils.copyObject(o)
delete o.origin
expect(db1[count++]).toEqual(o) expect(db1[count++]).toEqual(o)
}) })
} }

View File

@ -158,6 +158,15 @@ class AbstractTransaction {
} }
} }
} }
/*
Really remove an op and all its effects.
The complicated case here is the Insert operation:
* reset left
* reset right
* reset parent.start
* reset parent.end
* reset origins of all right ops
*/
* garbageCollectOperation (id) { * garbageCollectOperation (id) {
// check to increase the state of the respective user // check to increase the state of the respective user
var state = yield* this.getState(id[0]) var state = yield* this.getState(id[0])
@ -185,11 +194,45 @@ class AbstractTransaction {
yield* this.setOperation(left) yield* this.setOperation(left)
} }
// remove gc'd op from the right op, if it exists // remove gc'd op from the right op, if it exists
// also reset origins of right ops
if (o.right != null) { if (o.right != null) {
var right = yield* this.getOperation(o.right) var right = yield* this.getOperation(o.right)
right.left = o.left right.left = o.left
if (Y.utils.compareIds(right.origin, o.id)) { // rights origin is o
// find new origin of right ops
// origin is the first left deleted operation
var neworigin = o.left
while (neworigin != null) {
var neworigin_ = yield* this.getOperation(neworigin)
if (neworigin_.deleted) {
break
}
neworigin = neworigin_.left
}
// reset origin of right
right.origin = neworigin
// reset origin of all right ops (except first right - duh!),
// until you find origin pointer to the left of o
var i = right.right == null ? null : yield* this.getOperation(right.right)
var ids = [o.id, o.right]
while (i != null && ids.some(function (id) {
return Y.utils.compareIds(id, i.origin)
})) {
if (Y.utils.compareIds(i.origin, o.id)) {
// reset origin of i
i.origin = neworigin
yield* this.setOperation(i)
}
// get next i
i = i.right == null ? null : yield* this.getOperation(i.right)
}
} /* otherwise, rights origin is to the left of o,
then there is no right op (from o), that origins in o */
yield* this.setOperation(right) yield* this.setOperation(right)
} }
// remove gc'd op from parent, if it exists // remove gc'd op from parent, if it exists
var parent = yield* this.getOperation(o.parent) var parent = yield* this.getOperation(o.parent)
var setParent = false // whether to save parent to the os var setParent = false // whether to save parent to the os
@ -255,6 +298,7 @@ class AbstractOperationStore {
function garbageCollect () { function garbageCollect () {
return new Promise((resolve) => { return new Promise((resolve) => {
os.requestTransaction(function * () { os.requestTransaction(function * () {
if (os.y.connector.isSynced) {
for (var i in os.gc2) { for (var i in os.gc2) {
var oid = os.gc2[i] var oid = os.gc2[i]
yield* this.garbageCollectOperation(oid) yield* this.garbageCollectOperation(oid)
@ -264,6 +308,7 @@ class AbstractOperationStore {
if (os.gcTimeout > 0) { if (os.gcTimeout > 0) {
os.gcInterval = setTimeout(garbageCollect, os.gcTimeout) os.gcInterval = setTimeout(garbageCollect, os.gcTimeout)
} }
}
resolve() resolve()
}) })
}) })

View File

@ -4,6 +4,7 @@
class DeleteStore extends Y.utils.RBTree { class DeleteStore extends Y.utils.RBTree {
constructor () { constructor () {
super() super()
this.mem = []
} }
isDeleted (id) { isDeleted (id) {
var n = this.findNodeWithUpperBound(id) var n = this.findNodeWithUpperBound(id)
@ -15,6 +16,7 @@ class DeleteStore extends Y.utils.RBTree {
returns the delete node returns the delete node
*/ */
markGarbageCollected (id) { markGarbageCollected (id) {
// this.mem.push(["gc", id]);
var n = this.markDeleted(id) var n = this.markDeleted(id)
if (!n.val.gc) { if (!n.val.gc) {
if (n.val.id[1] < id[1]) { if (n.val.id[1] < id[1]) {
@ -23,6 +25,9 @@ class DeleteStore extends Y.utils.RBTree {
n.val.len -= newlen n.val.len -= newlen
n = this.add({id: id, len: newlen, gc: false}) n = this.add({id: id, len: newlen, gc: false})
} }
// get prev&next before adding a new operation
var prev = n.prev()
var next = n.next()
if (id[1] < n.val.id[1] + n.val.len - 1) { if (id[1] < n.val.id[1] + n.val.len - 1) {
// un-extend right // un-extend right
this.add({id: [id[0], id[1] + 1], len: n.val.len - 1, gc: false}) this.add({id: [id[0], id[1] + 1], len: n.val.len - 1, gc: false})
@ -30,8 +35,6 @@ class DeleteStore extends Y.utils.RBTree {
} }
// set gc'd // set gc'd
n.val.gc = true n.val.gc = true
var prev = n.prev()
var next = n.next()
// can extend left? // can extend left?
if ( if (
prev != null && prev != null &&
@ -52,7 +55,6 @@ class DeleteStore extends Y.utils.RBTree {
super.delete(next.val.id) super.delete(next.val.id)
} }
} }
return n
} }
/* /*
Mark an operation as deleted. Mark an operation as deleted.
@ -60,6 +62,7 @@ class DeleteStore extends Y.utils.RBTree {
returns the delete node returns the delete node
*/ */
markDeleted (id) { markDeleted (id) {
// this.mem.push(["del", id]);
var n = this.findNodeWithUpperBound(id) var n = this.findNodeWithUpperBound(id)
if (n != null && n.val.id[0] === id[0]) { if (n != null && n.val.id[0] === id[0]) {
if (n.val.id[1] <= id[1] && id[1] < n.val.id[1] + n.val.len) { if (n.val.id[1] <= id[1] && id[1] < n.val.id[1] + n.val.len) {
@ -85,9 +88,11 @@ class DeleteStore extends Y.utils.RBTree {
) { ) {
n.val.len = n.val.len + next.val.len n.val.len = n.val.len + next.val.len
super.delete(next.val.id) super.delete(next.val.id)
} return this.findNode(n.val.id)
} else {
return n return n
} }
}
/* /*
A DeleteSet (ds) describes all the deleted ops in the OS A DeleteSet (ds) describes all the deleted ops in the OS
*/ */

View File

@ -83,10 +83,8 @@ describe('Memory', function () {
}) })
it('Debug #5', async(function * (done) { it('Debug #5', async(function * (done) {
var store = new Y.Memory(null, { var store = new Y.Memory(null, {
db: {
name: 'Memory', name: 'Memory',
gcTimeout: -1 gcTimeout: -1
}
}) })
store.requestTransaction(function * () { store.requestTransaction(function * () {
yield* this.applyDeleteSet({'16': [[1, 2, false]], '17': [[0, 1, true], [1, 3, false]]}) yield* this.applyDeleteSet({'16': [[1, 2, false]], '17': [[0, 1, true], [1, 3, false]]})
@ -98,10 +96,8 @@ describe('Memory', function () {
})) }))
it('Debug #6', async(function * (done) { it('Debug #6', async(function * (done) {
var store = new Y.Memory(null, { var store = new Y.Memory(null, {
db: {
name: 'Memory', name: 'Memory',
gcTimeout: -1 gcTimeout: -1
}
}) })
store.requestTransaction(function * () { store.requestTransaction(function * () {
yield* this.applyDeleteSet({'40': [[0, 3, false]]}) yield* this.applyDeleteSet({'40': [[0, 3, false]]})
@ -111,5 +107,23 @@ describe('Memory', function () {
done() done()
}) })
})) }))
it('Debug #7', function () {
ds.markDeleted(['9', 2])
ds.markDeleted(['11', 2])
ds.markDeleted(['11', 4])
ds.markDeleted(['11', 1])
ds.markDeleted(['9', 4])
ds.markDeleted(['10', 0])
ds.markGarbageCollected(['11', 2])
ds.markDeleted(['11', 2])
ds.markGarbageCollected(['11', 3])
ds.markDeleted(['11', 3])
ds.markDeleted(['11', 3])
ds.markDeleted(['9', 4])
ds.markDeleted(['10', 0])
ds.markGarbageCollected(['11', 1])
ds.markDeleted(['11', 1])
expect(ds.toDeleteSet()).toEqual({'9': [[2, 1, false], [4, 1, false]], '10': [[0, 1, false]], '11': [[1, 3, true], [4, 1, false]]})
})
}) })
}) })

View File

@ -1,7 +1,7 @@
/* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, garbageCollectAllUsers, describeManyTimes */ /* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactionsAllRejoinNoGC, applyRandomTransactionsWithGC, async, garbageCollectAllUsers, describeManyTimes */
/* eslint-env browser,jasmine */ /* eslint-env browser,jasmine */
var numberOfYArrayTests = 10 var numberOfYArrayTests = 100
var repeatArrayTests = 1 var repeatArrayTests = 1
describe('Array Type', function () { describe('Array Type', function () {
@ -71,11 +71,8 @@ describe('Array Type', function () {
l2 = yield y2.get('Array') l2 = yield y2.get('Array')
l2.insert(1, [2]) l2.insert(1, [2])
l2.insert(1, [3]) l2.insert(1, [3])
yield flushAll() yield yconfig2.reconnect()
yconfig2.reconnect() yield yconfig3.reconnect()
yconfig3.reconnect()
yield wait()
yield flushAll()
expect(l1.toArray()).toEqual(l2.toArray()) expect(l1.toArray()).toEqual(l2.toArray())
done() done()
})) }))
@ -84,15 +81,12 @@ describe('Array Type', function () {
l1 = yield y1.set('Array', Y.Array) l1 = yield y1.set('Array', Y.Array)
l1.insert(0, ['x', 'y']) l1.insert(0, ['x', 'y'])
yield flushAll() yield flushAll()
yconfig2.disconnect() yield yconfig2.disconnect()
yield wait() yield wait()
l2 = yield y2.get('Array') l2 = yield y2.get('Array')
l2.delete(1, 1) l2.delete(1, 1)
l1.delete(0, 2) l1.delete(0, 2)
yield flushAll() yield yconfig2.reconnect()
yconfig2.reconnect()
yield wait()
yield flushAll()
expect(l1.toArray()).toEqual(l2.toArray()) expect(l1.toArray()).toEqual(l2.toArray())
done() done()
})) }))
@ -130,7 +124,7 @@ describe('Array Type', function () {
l1.delete(0, 3) l1.delete(0, 3)
l2 = yield y2.get('Array') l2 = yield y2.get('Array')
yield wait() yield wait()
yconfig2.reconnect() yield yconfig2.reconnect()
yield wait() yield wait()
l3 = yield y3.get('Array') l3 = yield y3.get('Array')
yield flushAll() yield flushAll()
@ -148,7 +142,7 @@ describe('Array Type', function () {
yconfig1.disconnect() yconfig1.disconnect()
l1.delete(0, 3) l1.delete(0, 3)
l2 = yield y2.get('Array') l2 = yield y2.get('Array')
yconfig1.reconnect() yield yconfig1.reconnect()
l3 = yield y3.get('Array') l3 = yield y3.get('Array')
yield flushAll() yield flushAll()
expect(l1.toArray()).toEqual(l2.toArray()) expect(l1.toArray()).toEqual(l2.toArray())
@ -188,7 +182,7 @@ describe('Array Type', function () {
l1.delete(0, 3) l1.delete(0, 3)
l2 = yield y2.get('Array') l2 = yield y2.get('Array')
yield wait() yield wait()
yconfig1.reconnect() yield yconfig1.reconnect()
yield wait() yield wait()
l3 = yield y3.get('Array') l3 = yield y3.get('Array')
yield flushAll() yield flushAll()
@ -238,11 +232,21 @@ describe('Array Type', function () {
expect(this.arrays.length).toEqual(this.users.length) expect(this.arrays.length).toEqual(this.users.length)
done() done()
})) }))
it(`succeed after ${numberOfYArrayTests} actions`, async(function * (done) { it(`succeed after ${numberOfYArrayTests} actions, no GC, all users disconnecting/reconnecting`, async(function * (done) {
for (var u of this.users) { for (var u of this.users) {
u.connector.debug = true u.connector.debug = true
} }
yield applyRandomTransactions(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests) yield applyRandomTransactionsAllRejoinNoGC(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests)
yield flushAll()
yield compareArrayValues(this.arrays)
yield compareAllUsers(this.users)
done()
}))
it(`succeed after ${numberOfYArrayTests} actions, GC, user[0] is not disconnecting`, async(function * (done) {
for (var u of this.users) {
u.connector.debug = true
}
yield applyRandomTransactionsWithGC(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests)
yield flushAll() yield flushAll()
yield compareArrayValues(this.arrays) yield compareArrayValues(this.arrays)
yield compareAllUsers(this.users) yield compareAllUsers(this.users)

View File

@ -1,4 +1,4 @@
/* global createUsers, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, describeManyTimes */ /* global createUsers, Y, compareAllUsers, getRandomNumber, applyRandomTransactionsAllRejoinNoGC, applyRandomTransactionsWithGC, async, describeManyTimes */
/* eslint-env browser,jasmine */ /* eslint-env browser,jasmine */
var numberOfYMapTests = 100 var numberOfYMapTests = 100
@ -201,8 +201,14 @@ describe('Map Type', function () {
this.maps = yield Promise.all(promises) this.maps = yield Promise.all(promises)
done() done()
})) }))
it(`succeed after ${numberOfYMapTests} actions`, async(function * (done) { it(`succeed after ${numberOfYMapTests} actions, no GC, all users disconnecting/reconnecting`, async(function * (done) {
yield applyRandomTransactions(this.users, this.maps, randomMapTransactions, numberOfYMapTests) yield applyRandomTransactionsAllRejoinNoGC(this.users, this.maps, randomMapTransactions, numberOfYMapTests)
yield flushAll()
yield compareMapValues(this.maps)
done()
}))
it(`succeed after ${numberOfYMapTests} actions, GC, user[0] is not disconnecting`, async(function * (done) {
yield applyRandomTransactionsWithGC(this.users, this.maps, randomMapTransactions, numberOfYMapTests)
yield flushAll() yield flushAll()
yield compareMapValues(this.maps) yield compareMapValues(this.maps)
done() done()

View File

@ -33,13 +33,13 @@ class YConfig {
return this.connector.isSynced return this.connector.isSynced
} }
disconnect () { disconnect () {
this.connector.disconnect() return this.connector.disconnect()
} }
reconnect () { reconnect () {
this.connector.reconnect() return this.connector.reconnect()
} }
destroy () { destroy () {
this.connector.disconnect() this.disconnect()
this.db.destroy() this.db.destroy()
this.connector = null this.connector = null
this.db = null this.db = null