updating some changes i forgot to commit

This commit is contained in:
Kevin Jahns 2015-09-26 14:42:50 +02:00
parent 5e4c56af29
commit b08aeee4fc
9 changed files with 74 additions and 55 deletions

View File

@ -28,14 +28,17 @@
Specify which specs to use! Specify which specs to use!
Commands: Commands:
- build: - build
Build this library Build this library
- dev: - dev:browser
Watch the ./src directory.
Builds the library on changes.
Starts an http-server and serves the test suite on http://127.0.0.1:8888.
- dev:node
Watch the ./src directory. Watch the ./src directory.
Builds and specs the library on changes. Builds and specs the library on changes.
Starts an http-server and serves the test suite on http://127.0.0.1:8888. Usefull to run with node-inspector.
- build_test: `node-debug $(which gulp) dev:node
Builds the test suite
- test: - test:
Test this library Test this library
*/ */

View File

@ -142,27 +142,35 @@ class AbstractConnector {
} }
if (this.debug) { if (this.debug) {
console.log(`${sender} -> me: ${m.type}`, m);// eslint-disable-line console.log(`${sender} -> me: ${m.type}`, m);// eslint-disable-line
if (m.os != null && m.os.some(function(o){return o.deleted })){
console.log("bullshit.. ")
debugger
}
} }
if (m.type === 'sync step 1') { if (m.type === 'sync step 1') {
// TODO: make transaction, stream the ops // TODO: make transaction, stream the ops
let conn = this let conn = this
this.y.db.requestTransaction(function *() { this.y.db.requestTransaction(function *() {
var ops = yield* this.getOperations(m.stateSet) var ops = yield* this.getOperations(m.stateSet)
var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
if (dels.length > 0) {
this.store.apply(dels)
// broadcast missing dels from syncing client
this.store.y.connector.broadcast({
type: 'update',
ops: dels
})
}
conn.send(sender, { conn.send(sender, {
type: 'sync step 2', type: 'sync step 2',
os: ops, os: ops,
stateSet: yield* this.getStateSet(), stateSet: yield* this.getStateSet(),
deleteSet: yield* this.getDeleteSet() deleteSet: yield* this.getDeleteSet() // TODO: consider that you have a ds from the other user..
}) })
var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
if (dels.length > 0) {
for (var i in dels) {
// TODO: no longer get delete ops (just get the ids..)!
yield* Y.Struct.Delete.delete.call(this, dels[i].target)
}
/*/ broadcast missing dels from syncing client
this.store.y.connector.broadcast({
type: 'update',
ops: dels
})
*/
}
if (this.forwardToSyncingClients) { if (this.forwardToSyncingClients) {
conn.syncingClients.push(sender) conn.syncingClients.push(sender)
setTimeout(function () { setTimeout(function () {
@ -187,14 +195,14 @@ class AbstractConnector {
this.y.db.requestTransaction(function *() { this.y.db.requestTransaction(function *() {
var ops = yield* this.getOperations(m.stateSet) var ops = yield* this.getOperations(m.stateSet)
var dels = yield* this.getOpsFromDeleteSet(m.deleteSet) var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
this.store.apply(dels)
this.store.apply(m.os) this.store.apply(m.os)
this.store.apply(dels)
if (ops.length > 0) { if (ops.length > 0) {
m = { m = {
type: 'update', type: 'update',
ops: ops ops: ops
} }
if (!broadcastHB) { if (!broadcastHB || true) { // TODO: consider to broadcast here..
conn.send(sender, m) conn.send(sender, m)
} else { } else {
// broadcast only once! // broadcast only once!

View File

@ -59,6 +59,9 @@ class Test extends Y.AbstractConnector {
globalRoom.addUser(this) globalRoom.addUser(this)
this.globalRoom = globalRoom this.globalRoom = globalRoom
} }
receiveMessage (sender, m) {
super.receiveMessage(sender, JSON.parse(JSON.stringify(m)))
}
send (userId, message) { send (userId, message) {
globalRoom.buffers[userId].push(JSON.parse(JSON.stringify([this.userId, message]))) globalRoom.buffers[userId].push(JSON.parse(JSON.stringify([this.userId, message])))
} }

View File

@ -84,16 +84,15 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj
} }
} }
applyTransactions() applyTransactions()
applyTransactions()
/* TODO: call applyTransactions here..
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
yield g.garbageCollectAllUsers(users)
users[0].disconnect() users[0].disconnect()
yield wait() yield wait()
applyTransactions() applyTransactions()
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
yield g.garbageCollectAllUsers(users)
users[0].reconnect() users[0].reconnect()
*/ yield wait(100)
yield wait()
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
}) })
@ -104,7 +103,7 @@ g.garbageCollectAllUsers = async(function * garbageCollectAllUsers (users) {
} }
}) })
g.compareAllUsers = async(function * compareAllUsers (users) { //eslint-disable-line g.compareAllUsers = async(function * compareAllUsers (users) {
var s1, s2 // state sets var s1, s2 // state sets
var ds1, ds2 // delete sets var ds1, ds2 // delete sets
var allDels1, allDels2 // all deletions var allDels1, allDels2 // all deletions
@ -129,11 +128,7 @@ g.compareAllUsers = async(function * compareAllUsers (users) { //eslint-disable-
} }
yield users[0].connector.flushAll() yield users[0].connector.flushAll()
// gc two times because of the two gc phases (really collect everything) // gc two times because of the two gc phases (really collect everything)
yield wait(100)
yield g.garbageCollectAllUsers(users) yield g.garbageCollectAllUsers(users)
yield wait(100)
yield g.garbageCollectAllUsers(users)
yield wait(100)
for (var uid = 0; uid < users.length; uid++) { for (var uid = 0; uid < users.length; uid++) {
var u = users[uid] var u = users[uid]

View File

@ -122,7 +122,7 @@ Y.AbstractTransaction = AbstractTransaction
* destroy() * destroy()
- destroy the database - destroy the database
*/ */
class AbstractOperationStore { // eslint-disable-line no-unused-vars class AbstractOperationStore {
constructor (y, opts) { constructor (y, opts) {
this.y = y this.y = y
// E.g. this.listenersById[id] : Array<Listener> // E.g. this.listenersById[id] : Array<Listener>
@ -239,7 +239,7 @@ class AbstractOperationStore { // eslint-disable-line no-unused-vars
apply (ops) { apply (ops) {
for (var key in ops) { for (var key in ops) {
var o = ops[key] var o = ops[key]
if (!o.gc) { if (o.gc == null) { // TODO: why do i get the same op twice?
var required = Y.Struct[o.struct].requiredOps(o) var required = Y.Struct[o.struct].requiredOps(o)
this.whenOperationsExist(required, o) this.whenOperationsExist(required, o)
} else { } else {
@ -316,17 +316,19 @@ class AbstractOperationStore { // eslint-disable-line no-unused-vars
} else { } else {
while (op != null) { while (op != null) {
var state = yield* this.getState(op.id[0]) var state = yield* this.getState(op.id[0])
if (op.id[1] === state.clock || (op.id[1] < state.clock && (yield* this.getOperation(op.id)) == null)) {
// either its a new operation (1. case), or it is an operation that was deleted, but is not yet in the OS
if (op.id[1] === state.clock) { if (op.id[1] === state.clock) {
state.clock++ state.clock++
yield* this.checkDeleteStoreForState(state) yield* this.checkDeleteStoreForState(state)
yield* this.setState(state) yield* this.setState(state)
var isDeleted = this.store.ds.isDeleted(op.id) }
yield* Y.Struct[op.struct].execute.call(this, op) yield* Y.Struct[op.struct].execute.call(this, op)
yield* this.addOperation(op) yield* this.addOperation(op)
yield* this.store.operationAdded(this, op) yield* this.store.operationAdded(this, op)
if (isDeleted) { if (this.store.ds.isDeleted(op.id)) {
yield* Y.Struct['Delete'].execute.call(this, {struct: 'Delete', target: op.id}) yield* Y.Struct['Delete'].execute.call(this, {struct: 'Delete', target: op.id})
} }

View File

@ -193,17 +193,15 @@ Y.Memory = (function () {
var startPos = startSS[user] || 0 var startPos = startSS[user] || 0
var endPos = endState.clock var endPos = endState.clock
this.os.iterate([user, startPos], [user, endPos], function (op) {// eslint-disable-line this.os.iterate([user, startPos], [user, endPos], function (op) {
if (!op.gc) { ops.push(op)
ops.push(Y.Struct[op.struct].encode(op))
}
}) })
} }
var res = [] var res = []
for (var op of ops) { for (var op of ops) {
res.push(yield* this.makeOperationReady(startSS, op)) res.push(yield* this.makeOperationReady(startSS, op))
var state = startSS[op.id[0]] || 0 var state = startSS[op.id[0]] || 0
if (state === op.id[1] || true) { if ((state === op.id[1]) || true) {
startSS[op.id[0]] = state + 1 startSS[op.id[0]] = state + 1
} else { } else {
throw new Error('Unexpected operation!') throw new Error('Unexpected operation!')
@ -212,32 +210,32 @@ Y.Memory = (function () {
return res return res
} }
* makeOperationReady (ss, op) { * makeOperationReady (ss, op) {
op = Y.Struct[op.struct].encode(op)
// instead of ss, you could use currSS (a ss that increments when you add an operation) // instead of ss, you could use currSS (a ss that increments when you add an operation)
op = Y.utils.copyObject(op) op = Y.utils.copyObject(op)
var o = op var o = op
var clock
while (o.right != null) { while (o.right != null) {
// while unknown, go to the right // while unknown, go to the right
clock = ss[o.right[0]] || 0 if (o.right[1] < (ss[o.right[0]] || 0)) {
if (o.right[1] < clock && !o.gc) {
break break
} }
o = yield* this.getOperation(o.right) o = yield* this.getOperation(o.right)
} }
// new right is not gc'd and known according to the ss
op.right = o.right op.right = o.right
while (o.left != null) { while (o.left != null) {
// while unknown, go to the right // while unknown, go to the right
clock = ss[o.left[0]] || 0 if (o.left[1] < (ss[o.left[0]] || 0)) {
if (o.left[1] < clock && !o.gc) {
break break
} }
o = yield* this.getOperation(o.left) o = yield* this.getOperation(o.left)
} }
// new left is not gc'd and known according to the ss
op.left = o.left op.left = o.left
return op return op
} }
} }
class OperationStore extends Y.AbstractOperationStore { // eslint-disable-line no-undef class OperationStore extends Y.AbstractOperationStore {
constructor (y, opts) { constructor (y, opts) {
super(y, opts) super(y, opts)
this.os = new Y.utils.RBTree() this.os = new Y.utils.RBTree()

View File

@ -194,17 +194,27 @@ class RBTree {
} }
return true return true
} }
logTable (from, to) { logTable (from, to, filter) {
if (filter == null) {
filter = function () {
return true
}
}
if (from == null) { from = null } if (from == null) { from = null }
if (to == null) { to = null } if (to == null) { to = null }
var os = [] var os = []
this.iterate(from, to, function (o) { this.iterate(from, to, function (o) {
var o_ = Y.utils.copyObject(o) if (filter(o)) {
var id = o_.id var o_ = {}
delete o_.id for (var key in o) {
o_['id[0]'] = id[0] if (typeof o[key] === 'object') {
o_['id[1]'] = id[1] o_[key] = JSON.stringify(o[key])
} else {
o_[key] = o[key]
}
}
os.push(o_) os.push(o_)
}
}) })
if (console.table != null) { if (console.table != null) {
console.table(os) console.table(os)

View File

@ -33,7 +33,7 @@ var Struct = {
return op return op
}, },
requiredOps: function (op) { requiredOps: function (op) {
return [op.target] return [] // [op.target]
}, },
/* /*
Delete an operation from the OS, and add it to the GC, if necessary. Delete an operation from the OS, and add it to the GC, if necessary.

View File

@ -8,10 +8,10 @@ describe('Array Type', function () {
var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll
beforeEach(async(function * (done) { beforeEach(async(function * (done) {
yield createUsers(this, 3) yield createUsers(this, 2)
y1 = (yconfig1 = this.users[0]).root y1 = (yconfig1 = this.users[0]).root
y2 = (yconfig2 = this.users[1]).root y2 = (yconfig2 = this.users[1]).root
y3 = (yconfig3 = this.users[2]).root // y3 = (yconfig3 = this.users[2]).root
flushAll = this.users[0].connector.flushAll flushAll = this.users[0].connector.flushAll
yield wait(10) yield wait(10)
done() done()