fixed DS bugs (i guess..) now handling more complicated scenarios
This commit is contained in:
parent
6a13419c62
commit
aadef59934
@ -90,7 +90,7 @@ class AbstractConnector {
|
||||
// Execute a function _when_ we are connected.
|
||||
// If not connected, wait until connected
|
||||
whenSynced (f) {
|
||||
if (this.isSynced === true) {
|
||||
if (this.isSynced) {
|
||||
f()
|
||||
} else {
|
||||
this.whenSyncedListeners.push(f)
|
||||
@ -125,6 +125,7 @@ class AbstractConnector {
|
||||
})
|
||||
} else {
|
||||
this.isSynced = true
|
||||
// call when synced listeners
|
||||
for (var f of this.whenSyncedListeners) {
|
||||
f()
|
||||
}
|
||||
@ -171,13 +172,13 @@ class AbstractConnector {
|
||||
conn.send(sender, {
|
||||
type: 'sync done'
|
||||
})
|
||||
conn._setSyncedWith(sender)
|
||||
}, conn.syncingClientDuration)
|
||||
} else {
|
||||
conn.send(sender, {
|
||||
type: 'sync done'
|
||||
})
|
||||
}
|
||||
conn._setSyncedWith(sender)
|
||||
})
|
||||
} else if (m.type === 'sync step 2') {
|
||||
let conn = this
|
||||
|
@ -36,7 +36,7 @@ function wait (t) {
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(function () {
|
||||
resolve()
|
||||
}, t * 7)
|
||||
}, t)
|
||||
})
|
||||
}
|
||||
g.wait = wait
|
||||
@ -109,6 +109,7 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj
|
||||
})
|
||||
|
||||
g.garbageCollectAllUsers = async(function * garbageCollectAllUsers (users) {
|
||||
return yield wait(100)// TODO!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
|
||||
for (var i in users) {
|
||||
yield users[i].db.garbageCollect()
|
||||
yield users[i].db.garbageCollect()
|
||||
|
@ -41,6 +41,7 @@
|
||||
* addOperation(op)
|
||||
- add an operation to the database.
|
||||
This may only be called once for every op.id
|
||||
Must return a function that returns the next operation in the database (ordered by id)
|
||||
* getOperation(id)
|
||||
* removeOperation(id)
|
||||
- remove an operation from the database. This is called when an operation
|
||||
@ -105,9 +106,9 @@ class AbstractTransaction {
|
||||
for (var i = 0; i < ops.length; i++) {
|
||||
var op = ops[i]
|
||||
yield* this.store.tryExecute.call(this, op)
|
||||
send.push(Y.utils.copyObject(Y.Struct[op.struct].encode(op)))
|
||||
send.push(Y.Struct[op.struct].encode(op))
|
||||
}
|
||||
if (this.store.y.connector.broadcastedHB) {
|
||||
if (!this.store.y.connector.isDisconnected()) {
|
||||
this.store.y.connector.broadcast({
|
||||
type: 'update',
|
||||
ops: send
|
||||
@ -246,7 +247,6 @@ class AbstractOperationStore {
|
||||
// wont be kept in memory.
|
||||
this.initializedTypes = {}
|
||||
this.whenUserIdSetListener = null
|
||||
this.waitingOperations = new Y.utils.RBTree()
|
||||
|
||||
this.gc1 = [] // first stage
|
||||
this.gc2 = [] // second stage -> after that, remove the op
|
||||
@ -441,56 +441,39 @@ class AbstractOperationStore {
|
||||
}
|
||||
/*
|
||||
Actually execute an operation, when all expected operations are available.
|
||||
If op is not yet expected, add it to the list of waiting operations.
|
||||
|
||||
This will also try to execute waiting operations
|
||||
(ops that were not expected yet), after it was applied
|
||||
*/
|
||||
* tryExecute (op) {
|
||||
if (op.struct === 'Delete') {
|
||||
yield* Y.Struct.Delete.execute.call(this, op)
|
||||
} else {
|
||||
while (op != null) {
|
||||
var state = yield* this.getState(op.id[0])
|
||||
if (op.id[1] === state.clock) {
|
||||
// either its a new operation (1. case), or it is an operation that was deleted, but is not yet in the OS
|
||||
if (op.id[1] === state.clock) {
|
||||
state.clock++
|
||||
yield* this.checkDeleteStoreForState(state)
|
||||
yield* this.setState(state)
|
||||
}
|
||||
} else if ((yield* this.getOperation(op.id)) == null) {
|
||||
yield* Y.Struct[op.struct].execute.call(this, op)
|
||||
var next = yield* this.addOperation(op)
|
||||
yield* this.store.operationAdded(this, op, next)
|
||||
|
||||
yield* Y.Struct[op.struct].execute.call(this, op)
|
||||
yield* this.addOperation(op)
|
||||
yield* this.store.operationAdded(this, op)
|
||||
|
||||
// Delete if DS says this is actually deleted
|
||||
if (this.store.ds.isDeleted(op.id)) {
|
||||
yield* Y.Struct['Delete'].execute.call(this, {struct: 'Delete', target: op.id})
|
||||
}
|
||||
|
||||
// find next operation to execute
|
||||
op = this.store.waitingOperations.find([op.id[0], state.clock])
|
||||
if (op != null) {
|
||||
this.store.waitingOperations.delete([op.id[0], state.clock])
|
||||
}
|
||||
} else {
|
||||
if (op.id[1] > state.clock) {
|
||||
// has to be executed at some point later
|
||||
this.store.waitingOperations.add(op)
|
||||
}
|
||||
op = null
|
||||
}
|
||||
// Delete if DS says this is actually deleted
|
||||
if (this.store.ds.isDeleted(op.id)) {
|
||||
yield* Y.Struct['Delete'].execute.call(this, {struct: 'Delete', target: op.id})
|
||||
}
|
||||
}
|
||||
}
|
||||
// called by a transaction when an operation is added
|
||||
* operationAdded (transaction, op) {
|
||||
* operationAdded (transaction, op, next) {
|
||||
// increase SS
|
||||
var o = op
|
||||
var state = yield* transaction.getState(op.id[0])
|
||||
while (o != null && o.id[1] === state.clock && op.id[0] === o.id[0]) {
|
||||
// either its a new operation (1. case), or it is an operation that was deleted, but is not yet in the OS
|
||||
state.clock++
|
||||
yield* transaction.checkDeleteStoreForState(state)
|
||||
o = next()
|
||||
}
|
||||
yield* transaction.setState(state)
|
||||
|
||||
// notify whenOperation listeners (by id)
|
||||
var sid = JSON.stringify(op.id)
|
||||
var l = this.listenersById[sid]
|
||||
delete this.listenersById[sid]
|
||||
|
||||
// notify whenOperation listeners (by id)
|
||||
if (l != null) {
|
||||
for (var key in l) {
|
||||
var listener = l[key]
|
||||
|
@ -186,7 +186,7 @@ Y.Memory = (function () {
|
||||
if (d[2] && !n.gc) {
|
||||
// d marks as gc'd but n does not
|
||||
// then delete either way
|
||||
createDeletions(user, d[0], diff, d[2])
|
||||
createDeletions(user, d[0], Math.min(diff, d[1]), d[2])
|
||||
}
|
||||
}
|
||||
if (d[1] <= diff) {
|
||||
@ -228,7 +228,15 @@ Y.Memory = (function () {
|
||||
return op
|
||||
}
|
||||
* addOperation (op) {
|
||||
this.os.add(op)
|
||||
var n = this.os.add(op)
|
||||
return function () {
|
||||
if (n != null) {
|
||||
n = n.next()
|
||||
return n != null ? n.val : null
|
||||
} else {
|
||||
return null
|
||||
}
|
||||
}
|
||||
}
|
||||
* getOperation (id) {
|
||||
return this.os.find(id)
|
||||
@ -286,12 +294,14 @@ Y.Memory = (function () {
|
||||
var res = []
|
||||
for (var op of ops) {
|
||||
res.push(yield* this.makeOperationReady(startSS, op))
|
||||
/*
|
||||
var state = startSS[op.id[0]] || 0
|
||||
if ((state === op.id[1]) || true) {
|
||||
startSS[op.id[0]] = state + 1
|
||||
startSS[op.id[0]] = op.id[1] + 1
|
||||
} else {
|
||||
throw new Error('Unexpected operation!')
|
||||
}
|
||||
*/
|
||||
}
|
||||
return res
|
||||
}
|
||||
@ -300,15 +310,17 @@ Y.Memory = (function () {
|
||||
// instead of ss, you could use currSS (a ss that increments when you add an operation)
|
||||
op = Y.utils.copyObject(op)
|
||||
var o = op
|
||||
|
||||
while (o.right != null) {
|
||||
// while unknown, go to the right
|
||||
if (o.right[1] < (ss[o.right[0]] || 0)) {
|
||||
if (o.right[1] < (ss[o.right[0]] || 0)) { // && !Y.utils.compareIds(op.id, o.origin)
|
||||
break
|
||||
}
|
||||
o = yield* this.getOperation(o.right)
|
||||
}
|
||||
// new right is not gc'd and known according to the ss
|
||||
// new right is known according to the ss
|
||||
op.right = o.right
|
||||
/*
|
||||
while (o.left != null) {
|
||||
// while unknown, go to the right
|
||||
if (o.left[1] < (ss[o.left[0]] || 0)) {
|
||||
@ -316,8 +328,9 @@ Y.Memory = (function () {
|
||||
}
|
||||
o = yield* this.getOperation(o.left)
|
||||
}
|
||||
// new left is not gc'd and known according to the ss
|
||||
// new left is known according to the ss
|
||||
op.left = o.left
|
||||
*/
|
||||
return op
|
||||
}
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
/* global Y */
|
||||
/* global Y, async */
|
||||
/* eslint-env browser,jasmine,console */
|
||||
|
||||
describe('Memory', function () {
|
||||
@ -96,5 +96,20 @@ describe('Memory', function () {
|
||||
done()
|
||||
})
|
||||
}))
|
||||
it('Debug #6', async(function * (done) {
|
||||
var store = new Y.Memory(null, {
|
||||
db: {
|
||||
name: 'Memory',
|
||||
gcTimeout: -1
|
||||
}
|
||||
})
|
||||
store.requestTransaction(function * () {
|
||||
yield* this.applyDeleteSet({'40': [[0, 3, false]]})
|
||||
expect(this.ds.toDeleteSet()).toEqual({'40': [[0, 3, false]]})
|
||||
yield* this.applyDeleteSet({'39': [[2, 2, false]], '40': [[0, 1, true], [1, 2, false]], '41': [[2, 1, false]]})
|
||||
expect(this.ds.toDeleteSet()).toEqual({'39': [[2, 2, false]], '40': [[0, 1, true], [1, 2, false]], '41': [[2, 1, false]]})
|
||||
done()
|
||||
})
|
||||
}))
|
||||
})
|
||||
})
|
||||
|
@ -80,6 +80,9 @@ var Struct = {
|
||||
if (op.right != null) {
|
||||
ids.push(op.right)
|
||||
}
|
||||
if (op.origin != null && !Y.utils.compareIds(op.left, op.origin)) {
|
||||
ids.push(op.origin)
|
||||
}
|
||||
// if (op.right == null && op.left == null) {
|
||||
ids.push(op.parent)
|
||||
|
||||
@ -175,12 +178,6 @@ var Struct = {
|
||||
op.right = left.right
|
||||
left.right = op.id
|
||||
|
||||
/*/ if left exists, and it is supposed to be gc'd. Remove it from the gc
|
||||
if (left.gc != null) {
|
||||
this.store.removeFromGarbageCollector(left)
|
||||
}
|
||||
*/
|
||||
|
||||
yield* this.setOperation(left)
|
||||
} else {
|
||||
op.right = op.parentSub ? parent.map[op.parentSub] || null : parent.start
|
||||
|
@ -1,7 +1,7 @@
|
||||
/* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, garbageCollectAllUsers, describeManyTimes */
|
||||
/* eslint-env browser,jasmine */
|
||||
|
||||
var numberOfYArrayTests = 20
|
||||
var numberOfYArrayTests = 10
|
||||
var repeatArrayTests = 1000
|
||||
|
||||
describe('Array Type', function () {
|
||||
|
Loading…
x
Reference in New Issue
Block a user