added option for servers that want to propagate applied operations (aka the websockets connector)

This commit is contained in:
Kevin Jahns 2015-11-15 02:04:06 +01:00
parent ae12b087e7
commit 58a612eaa1
3 changed files with 32 additions and 1 deletions

View File

@ -20,6 +20,7 @@ module.exports = function (Y) {
} else { } else {
throw new Error("Role must be either 'master' or 'slave'!") throw new Error("Role must be either 'master' or 'slave'!")
} }
this.y.db.forwardAppliedOperations = opts.forwardAppliedOperations || false
this.role = opts.role this.role = opts.role
this.connections = {} this.connections = {}
this.isSynced = false this.isSynced = false
@ -221,6 +222,17 @@ module.exports = function (Y) {
this.send(client, m) this.send(client, m)
} }
} }
if (this.y.db.forwardAppliedOperations) {
var delops = m.ops.filter(function (o) {
return o.struct === 'Delete'
})
if (delops.length > 0) {
this.broadcast({
type: 'update',
ops: delops
})
}
}
this.y.db.apply(m.ops) this.y.db.apply(m.ops)
} }
} }

View File

@ -16,6 +16,8 @@ module.exports = function (Y) {
class AbstractDatabase { class AbstractDatabase {
constructor (y, opts) { constructor (y, opts) {
this.y = y this.y = y
// whether to broadcast all applied operations (insert & delete hook)
this.forwardAppliedOperations = false
// E.g. this.listenersById[id] : Array<Listener> // E.g. this.listenersById[id] : Array<Listener>
this.listenersById = {} this.listenersById = {}
// Execute the next time a transaction is requested // Execute the next time a transaction is requested

View File

@ -104,7 +104,8 @@ module.exports = function (Y) {
yield* this.store.tryExecute.call(this, op) yield* this.store.tryExecute.call(this, op)
send.push(Y.Struct[op.struct].encode(op)) send.push(Y.Struct[op.struct].encode(op))
} }
if (!this.store.y.connector.isDisconnected()) { if (!this.store.y.connector.isDisconnected()) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops)
// is connected, and this is not going to be send in addOperation
this.store.y.connector.broadcast({ this.store.y.connector.broadcast({
type: 'update', type: 'update',
ops: send ops: send
@ -483,6 +484,15 @@ module.exports = function (Y) {
yield* this.garbageCollectOperation(id) yield* this.garbageCollectOperation(id)
} }
} }
if (this.store.forwardAppliedOperations) {
var ops = deletions.map(function(d){
return {struct: 'Delete', target: [d[0], d[1]]}
})
this.store.y.connector.broadcast({
type: 'update',
ops: ops
})
}
} }
* isGarbageCollected (id) { * isGarbageCollected (id) {
var n = yield* this.ds.findWithUpperBound(id) var n = yield* this.ds.findWithUpperBound(id)
@ -517,6 +527,13 @@ module.exports = function (Y) {
} }
* addOperation (op) { * addOperation (op) {
yield* this.os.put(op) yield* this.os.put(op)
if (!this.store.y.connector.isDisconnected() && this.store.forwardAppliedOperations) {
// is connected, and this is not going to be send in addOperation
this.store.y.connector.broadcast({
type: 'update',
ops: [op]
})
}
} }
* getOperation (id) { * getOperation (id) {
return yield* this.os.find(id) return yield* this.os.find(id)