From a5f55359c32c62d4ed69c0147bfcb04a8061a019 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Fri, 15 Jan 2016 17:57:06 +0100 Subject: [PATCH] improve data exchange performance --- src/Connector.js | 42 ++++++++++++++++++++++++++++++++---------- src/Transaction.js | 15 +++------------ 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/src/Connector.js b/src/Connector.js index bc55ccf2..4d537ede 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -50,6 +50,7 @@ module.exports = function (Y/* :any */) { this.debug = opts.debug === true this.broadcastedHB = false this.syncStep2 = Promise.resolve() + this.broadcastOpBuffer = [] } reconnect () { } @@ -166,6 +167,31 @@ module.exports = function (Y/* :any */) { console.log(`send ${this.userId} -> ${uid}: ${message.type}`, message) // eslint-disable-line } } + /* + Buffer operations, and broadcast them when ready. + */ + broadcastOps (ops) { + var self = this + function broadcastOperations () { + if (self.broadcastOpBuffer.length > 0) { + self.broadcast({ + type: 'update', + ops: self.broadcastOpBuffer + }) + self.broadcastOpBuffer = [] + } + } + if (this.broadcastOpBuffer.length === 0) { + this.broadcastOpBuffer = ops + if (this.y.db.transactionInProgress) { + this.y.db.whenTransactionsFinished().then(broadcastOperations) + } else { + setTimeout(broadcastOperations, 0) + } + } else { + this.broadcastOpBuffer = this.broadcastOpBuffer.concat(ops) + } + } /* You received a raw message, and you know that it is intended for Yjs. Then call this function. */ @@ -226,15 +252,14 @@ module.exports = function (Y/* :any */) { db.requestTransaction(function * () { var ops = yield* this.getOperations(m.stateSet) if (ops.length > 0) { - var update /* :MessageUpdate */ = { - type: 'update', - ops: ops - } if (!broadcastHB) { // TODO: consider to broadcast here.. - conn.send(sender, update) + conn.send(sender, { + type: 'update', + ops: ops + }) } else { // broadcast only once! - conn.broadcast(update) + conn.broadcastOps(ops) } } defer.resolve() @@ -256,10 +281,7 @@ module.exports = function (Y/* :any */) { return o.struct === 'Delete' }) if (delops.length > 0) { - this.broadcast({ - type: 'update', - ops: delops - }) + this.broadcastOps(delops) } } this.y.db.apply(message.ops) diff --git a/src/Transaction.js b/src/Transaction.js index 0d0b609d..da82223c 100644 --- a/src/Transaction.js +++ b/src/Transaction.js @@ -123,10 +123,7 @@ module.exports = function (Y/* :any */) { } if (!this.store.y.connector.isDisconnected() && send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops) // is connected, and this is not going to be send in addOperation - this.store.y.connector.broadcast({ - type: 'update', - ops: send - }) + this.store.y.connector.broadcastOps(send) } } @@ -522,10 +519,7 @@ module.exports = function (Y/* :any */) { var ops = deletions.map(function (d) { return {struct: 'Delete', target: [d[0], d[1]]} }) - this.store.y.connector.broadcast({ - type: 'update', - ops: ops - }) + this.store.y.connector.broadcastOps(ops) } } * isGarbageCollected (id) { @@ -563,10 +557,7 @@ module.exports = function (Y/* :any */) { yield* this.os.put(op) if (!this.store.y.connector.isDisconnected() && this.store.forwardAppliedOperations && op.id[0] !== '_') { // is connected, and this is not going to be send in addOperation - this.store.y.connector.broadcast({ - type: 'update', - ops: [op] - }) + this.store.y.connector.broadcastOps([op]) } } * getOperation (id/* :any */)/* :Transaction */ {