improve data exchange performance

This commit is contained in:
Kevin Jahns 2016-01-15 17:57:06 +01:00
parent 102555a3b0
commit a5f55359c3
2 changed files with 35 additions and 22 deletions

View File

@ -50,6 +50,7 @@ module.exports = function (Y/* :any */) {
this.debug = opts.debug === true this.debug = opts.debug === true
this.broadcastedHB = false this.broadcastedHB = false
this.syncStep2 = Promise.resolve() this.syncStep2 = Promise.resolve()
this.broadcastOpBuffer = []
} }
reconnect () { reconnect () {
} }
@ -166,6 +167,31 @@ module.exports = function (Y/* :any */) {
console.log(`send ${this.userId} -> ${uid}: ${message.type}`, message) // eslint-disable-line console.log(`send ${this.userId} -> ${uid}: ${message.type}`, message) // eslint-disable-line
} }
} }
/*
Buffer operations, and broadcast them when ready.
*/
broadcastOps (ops) {
var self = this
function broadcastOperations () {
if (self.broadcastOpBuffer.length > 0) {
self.broadcast({
type: 'update',
ops: self.broadcastOpBuffer
})
self.broadcastOpBuffer = []
}
}
if (this.broadcastOpBuffer.length === 0) {
this.broadcastOpBuffer = ops
if (this.y.db.transactionInProgress) {
this.y.db.whenTransactionsFinished().then(broadcastOperations)
} else {
setTimeout(broadcastOperations, 0)
}
} else {
this.broadcastOpBuffer = this.broadcastOpBuffer.concat(ops)
}
}
/* /*
You received a raw message, and you know that it is intended for Yjs. Then call this function. You received a raw message, and you know that it is intended for Yjs. Then call this function.
*/ */
@ -226,15 +252,14 @@ module.exports = function (Y/* :any */) {
db.requestTransaction(function * () { db.requestTransaction(function * () {
var ops = yield* this.getOperations(m.stateSet) var ops = yield* this.getOperations(m.stateSet)
if (ops.length > 0) { if (ops.length > 0) {
var update /* :MessageUpdate */ = {
type: 'update',
ops: ops
}
if (!broadcastHB) { // TODO: consider to broadcast here.. if (!broadcastHB) { // TODO: consider to broadcast here..
conn.send(sender, update) conn.send(sender, {
type: 'update',
ops: ops
})
} else { } else {
// broadcast only once! // broadcast only once!
conn.broadcast(update) conn.broadcastOps(ops)
} }
} }
defer.resolve() defer.resolve()
@ -256,10 +281,7 @@ module.exports = function (Y/* :any */) {
return o.struct === 'Delete' return o.struct === 'Delete'
}) })
if (delops.length > 0) { if (delops.length > 0) {
this.broadcast({ this.broadcastOps(delops)
type: 'update',
ops: delops
})
} }
} }
this.y.db.apply(message.ops) this.y.db.apply(message.ops)

View File

@ -123,10 +123,7 @@ module.exports = function (Y/* :any */) {
} }
if (!this.store.y.connector.isDisconnected() && send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops) if (!this.store.y.connector.isDisconnected() && send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops)
// is connected, and this is not going to be send in addOperation // is connected, and this is not going to be send in addOperation
this.store.y.connector.broadcast({ this.store.y.connector.broadcastOps(send)
type: 'update',
ops: send
})
} }
} }
@ -522,10 +519,7 @@ module.exports = function (Y/* :any */) {
var ops = deletions.map(function (d) { var ops = deletions.map(function (d) {
return {struct: 'Delete', target: [d[0], d[1]]} return {struct: 'Delete', target: [d[0], d[1]]}
}) })
this.store.y.connector.broadcast({ this.store.y.connector.broadcastOps(ops)
type: 'update',
ops: ops
})
} }
} }
* isGarbageCollected (id) { * isGarbageCollected (id) {
@ -563,10 +557,7 @@ module.exports = function (Y/* :any */) {
yield* this.os.put(op) yield* this.os.put(op)
if (!this.store.y.connector.isDisconnected() && this.store.forwardAppliedOperations && op.id[0] !== '_') { if (!this.store.y.connector.isDisconnected() && this.store.forwardAppliedOperations && op.id[0] !== '_') {
// is connected, and this is not going to be send in addOperation // is connected, and this is not going to be send in addOperation
this.store.y.connector.broadcast({ this.store.y.connector.broadcastOps([op])
type: 'update',
ops: [op]
})
} }
} }
* getOperation (id/* :any */)/* :Transaction<any> */ { * getOperation (id/* :any */)/* :Transaction<any> */ {