fixed late join issues when gc is turned off
This commit is contained in:
parent
aadef59934
commit
9c4074e3e3
@ -120,7 +120,7 @@ gulp.task('build:test', function () {
|
||||
if (!options.regenerator) {
|
||||
babelOptions.blacklist = 'regenerator'
|
||||
}
|
||||
gulp.src('src/**/*.js')
|
||||
return gulp.src('src/**/*.js')
|
||||
.pipe(sourcemaps.init())
|
||||
.pipe(babel(babelOptions))
|
||||
.pipe(sourcemaps.write())
|
||||
|
@ -30,6 +30,7 @@ class AbstractConnector {
|
||||
this.forwardToSyncingClients = opts.forwardToSyncingClients !== false
|
||||
this.debug = opts.debug === true
|
||||
this.broadcastedHB = false
|
||||
this.syncStep2 = Promise.resolve()
|
||||
}
|
||||
reconnect () {
|
||||
}
|
||||
@ -184,27 +185,34 @@ class AbstractConnector {
|
||||
let conn = this
|
||||
var broadcastHB = !this.broadcastedHB
|
||||
this.broadcastedHB = true
|
||||
this.y.db.requestTransaction(function * () {
|
||||
yield* this.applyDeleteSet(m.deleteSet)
|
||||
this.store.apply(m.os)
|
||||
})
|
||||
this.y.db.requestTransaction(function * () {
|
||||
var ops = yield* this.getOperations(m.stateSet)
|
||||
if (ops.length > 0) {
|
||||
m = {
|
||||
type: 'update',
|
||||
ops: ops
|
||||
}
|
||||
if (!broadcastHB || true) { // TODO: consider to broadcast here..
|
||||
conn.send(sender, m)
|
||||
} else {
|
||||
// broadcast only once!
|
||||
conn.broadcast(m)
|
||||
}
|
||||
}
|
||||
var db = this.y.db
|
||||
this.syncStep2 = new Promise(function (resolve) {
|
||||
db.requestTransaction(function * () {
|
||||
yield* this.applyDeleteSet(m.deleteSet)
|
||||
this.store.apply(m.os)
|
||||
db.requestTransaction(function * () {
|
||||
var ops = yield* this.getOperations(m.stateSet)
|
||||
if (ops.length > 0) {
|
||||
m = {
|
||||
type: 'update',
|
||||
ops: ops
|
||||
}
|
||||
if (!broadcastHB) { // TODO: consider to broadcast here..
|
||||
conn.send(sender, m)
|
||||
} else {
|
||||
// broadcast only once!
|
||||
conn.broadcast(m)
|
||||
}
|
||||
}
|
||||
resolve()
|
||||
})
|
||||
})
|
||||
})
|
||||
} else if (m.type === 'sync done') {
|
||||
this._setSyncedWith(sender)
|
||||
var self = this
|
||||
this.syncStep2.then(function () {
|
||||
self._setSyncedWith(sender)
|
||||
})
|
||||
} else if (m.type === 'update') {
|
||||
if (this.forwardToSyncingClients) {
|
||||
for (var client of this.syncingClients) {
|
||||
|
@ -18,7 +18,7 @@ g.g = g
|
||||
|
||||
g.YConcurrency_TestingMode = true
|
||||
|
||||
jasmine.DEFAULT_TIMEOUT_INTERVAL = 5000
|
||||
jasmine.DEFAULT_TIMEOUT_INTERVAL = 10000
|
||||
|
||||
g.describeManyTimes = function describeManyTimes (times, name, f) {
|
||||
for (var i = 0; i < times; i++) {
|
||||
@ -31,12 +31,12 @@ g.describeManyTimes = function describeManyTimes (times, name, f) {
|
||||
*/
|
||||
function wait (t) {
|
||||
if (t == null) {
|
||||
t = 5
|
||||
t = 10
|
||||
}
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(function () {
|
||||
resolve()
|
||||
}, t)
|
||||
}, t * 2)
|
||||
})
|
||||
}
|
||||
g.wait = wait
|
||||
@ -110,10 +110,12 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj
|
||||
|
||||
g.garbageCollectAllUsers = async(function * garbageCollectAllUsers (users) {
|
||||
return yield wait(100)// TODO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
/*
|
||||
for (var i in users) {
|
||||
yield users[i].db.garbageCollect()
|
||||
yield users[i].db.garbageCollect()
|
||||
}
|
||||
*/
|
||||
})
|
||||
|
||||
g.compareAllUsers = async(function * compareAllUsers (users) {
|
||||
@ -239,7 +241,7 @@ function async (makeGenerator) {
|
||||
try {
|
||||
return handle(generator.next())
|
||||
} catch (ex) {
|
||||
generator.throw(ex) // TODO: check this out
|
||||
generator.throw(ex)
|
||||
// return Promise.reject(ex)
|
||||
}
|
||||
}
|
||||
|
@ -4,9 +4,6 @@
|
||||
class DeleteStore extends Y.utils.RBTree {
|
||||
constructor () {
|
||||
super()
|
||||
// TODO: debugggg
|
||||
this.mem = [];
|
||||
this.memDS = [];
|
||||
}
|
||||
isDeleted (id) {
|
||||
var n = this.findNodeWithUpperBound(id)
|
||||
@ -18,9 +15,7 @@ class DeleteStore extends Y.utils.RBTree {
|
||||
returns the delete node
|
||||
*/
|
||||
markGarbageCollected (id) {
|
||||
this.mem.push({"gc": id});
|
||||
var n = this.markDeleted(id)
|
||||
this.mem.pop()
|
||||
if (!n.val.gc) {
|
||||
if (n.val.id[1] < id[1]) {
|
||||
// un-extend left
|
||||
@ -65,7 +60,6 @@ class DeleteStore extends Y.utils.RBTree {
|
||||
returns the delete node
|
||||
*/
|
||||
markDeleted (id) {
|
||||
this.mem.push({"del": id});
|
||||
var n = this.findNodeWithUpperBound(id)
|
||||
if (n != null && n.val.id[0] === id[0]) {
|
||||
if (n.val.id[1] <= id[1] && id[1] < n.val.id[1] + n.val.len) {
|
||||
@ -125,8 +119,6 @@ Y.Memory = (function () {
|
||||
this.ss = store.ss
|
||||
this.os = store.os
|
||||
this.ds = store.ds
|
||||
|
||||
this.memDS = store.ds.memDS; // TODO: remove
|
||||
}
|
||||
* checkDeleteStoreForState (state) {
|
||||
var n = this.ds.findNodeWithUpperBound([state.user, state.clock])
|
||||
@ -149,11 +141,6 @@ Y.Memory = (function () {
|
||||
}
|
||||
}
|
||||
|
||||
var memAction = {
|
||||
before: yield* this.getDeleteSet(),
|
||||
applied: JSON.parse(JSON.stringify(ds))
|
||||
};
|
||||
|
||||
for (var user in ds) {
|
||||
var dv = ds[user]
|
||||
var pos = 0
|
||||
@ -215,8 +202,6 @@ Y.Memory = (function () {
|
||||
yield* this.deleteOperation(id)
|
||||
}
|
||||
}
|
||||
memAction.after = yield* this.getDeleteSet();
|
||||
this.memDS.push(memAction);
|
||||
}
|
||||
* isDeleted (id) {
|
||||
return this.ds.isDeleted(id)
|
||||
@ -294,43 +279,67 @@ Y.Memory = (function () {
|
||||
var res = []
|
||||
for (var op of ops) {
|
||||
res.push(yield* this.makeOperationReady(startSS, op))
|
||||
/*
|
||||
var state = startSS[op.id[0]] || 0
|
||||
if ((state === op.id[1]) || true) {
|
||||
startSS[op.id[0]] = op.id[1] + 1
|
||||
} else {
|
||||
throw new Error('Unexpected operation!')
|
||||
}
|
||||
*/
|
||||
}
|
||||
return res
|
||||
}
|
||||
* makeOperationReady (ss, op) {
|
||||
/*
|
||||
Here, we make op executable for the receiving user.
|
||||
|
||||
Notes:
|
||||
startSS: denotes to the SV that the remote user sent
|
||||
currSS: denotes to the state vector that the user should have if he
|
||||
applies all already sent operations (increases is each step)
|
||||
|
||||
We face several problems:
|
||||
* Execute op as is won't work because ops depend on each other
|
||||
-> find a way so that they do not anymore
|
||||
* When changing left, must not go more to the left than the origin
|
||||
* When changing right, you have to consider that other ops may have op
|
||||
as their origin, this means that you must not set one of these ops
|
||||
as the new right (interdependencies of ops)
|
||||
* can't just go to the right until you find the first known operation,
|
||||
With currSS
|
||||
-> interdependency of ops is a problem
|
||||
With startSS
|
||||
-> leads to inconsistencies when two users join at the same time.
|
||||
Then the position depends on the order of execution -> error!
|
||||
|
||||
Solution:
|
||||
-> re-create originial situation
|
||||
-> set op.left = op.origin (which never changes)
|
||||
-> set op.right
|
||||
to the first operation that is known (according to startSS)
|
||||
or to the first operation that has an origin that is not to the
|
||||
right of op.
|
||||
-> Enforces unique execution order -> happy user
|
||||
|
||||
Improvements: TODO
|
||||
* Could set left to origin, or the first known operation
|
||||
(startSS or currSS.. ?)
|
||||
-> Could be necessary when I turn GC again.
|
||||
-> Is a bad(ish) idea because it requires more computation
|
||||
*/
|
||||
* makeOperationReady (startSS, op) {
|
||||
op = Y.Struct[op.struct].encode(op)
|
||||
// instead of ss, you could use currSS (a ss that increments when you add an operation)
|
||||
op = Y.utils.copyObject(op)
|
||||
var o = op
|
||||
|
||||
var ids = [op.id]
|
||||
// search for the new op.right
|
||||
// it is either the first known op (according to startSS)
|
||||
// or the o that has no origin to the right of op
|
||||
// (this is why we use the ids array)
|
||||
while (o.right != null) {
|
||||
// while unknown, go to the right
|
||||
if (o.right[1] < (ss[o.right[0]] || 0)) { // && !Y.utils.compareIds(op.id, o.origin)
|
||||
var right = yield* this.getOperation(o.right)
|
||||
if (o.right[1] < (startSS[o.right[0]] || 0) || !ids.some(function (id) {
|
||||
return Y.utils.compareIds(id, right.origin)
|
||||
})) {
|
||||
break
|
||||
}
|
||||
o = yield* this.getOperation(o.right)
|
||||
ids.push(o.right)
|
||||
o = right
|
||||
}
|
||||
// new right is known according to the ss
|
||||
op.right = o.right
|
||||
/*
|
||||
while (o.left != null) {
|
||||
// while unknown, go to the right
|
||||
if (o.left[1] < (ss[o.left[0]] || 0)) {
|
||||
break
|
||||
}
|
||||
o = yield* this.getOperation(o.left)
|
||||
}
|
||||
// new left is known according to the ss
|
||||
op.left = o.left
|
||||
*/
|
||||
op.left = op.origin
|
||||
return op
|
||||
}
|
||||
}
|
||||
|
@ -2,7 +2,7 @@
|
||||
/* eslint-env browser,jasmine */
|
||||
|
||||
var numberOfYArrayTests = 10
|
||||
var repeatArrayTests = 1000
|
||||
var repeatArrayTests = 1
|
||||
|
||||
describe('Array Type', function () {
|
||||
var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll
|
||||
|
Loading…
x
Reference in New Issue
Block a user