From 4f57c91b82d5644b2ade287fe9ff27e073baa4b2 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Thu, 3 Aug 2017 00:24:01 +0200 Subject: [PATCH] fix syncing protocol - compute messages after auth --- src/Connector.js | 68 +++++++++++++++++++++++++----------------------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/src/Connector.js b/src/Connector.js index b5bfe38f..6956f6eb 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -156,22 +156,24 @@ export default function extendConnector (Y/* :any */) { sendSyncStep1(this, syncUser) } else { if (!conn.isSynced) { - this.y.db.requestTransaction(function * () { - if (!conn.isSynced) { - // it is crucial that isSynced is set at the time garbageCollectAfterSync is called - conn.isSynced = true - // It is safer to remove this! - // TODO: remove: yield * this.garbageCollectAfterSync() - // call whensynced listeners - for (var f of conn.whenSyncedListeners) { - f() - } - conn.whenSyncedListeners = [] - } - }) + conn._fireIsSyncedListeners() } } } + _fireIsSyncedListeners () { + this.y.db.whenTransactionsFinished().then(() => { + if (!this.isSynced) { + this.isSynced = true + // It is safer to remove this! + // TODO: remove: yield * this.garbageCollectAfterSync() + // call whensynced listeners + for (var f of this.whenSyncedListeners) { + f() + } + this.whenSyncedListeners = [] + } + }) + } send (uid, buffer) { if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) { throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - please don\'t use this method to send custom messages') @@ -244,33 +246,32 @@ export default function extendConnector (Y/* :any */) { if (messageType === 'sync step 1' || messageType === 'sync step 2') { let auth = decoder.readVarUint() if (senderConn.auth == null) { - senderConn.processAfterAuth.push([sender, buffer]) - + senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]) // check auth return this.checkAuth(auth, this.y, sender).then(authPermissions => { - senderConn.auth = authPermissions - this.y.emit('userAuthenticated', { - user: senderConn.uid, - auth: authPermissions - }) - return senderConn.syncStep2.promise - }).then(() => { - if (senderConn.processAfterAuth == null) { - return Promise.resolve() + if (senderConn.auth == null) { + senderConn.auth = authPermissions + this.y.emit('userAuthenticated', { + user: senderConn.uid, + auth: authPermissions + }) } let messages = senderConn.processAfterAuth - senderConn.processAfterAuth = null - return Promise.all(messages.map(m => - this.receiveMessage(m[0], m[1]) - )) + senderConn.processAfterAuth = [] + + return messages.reduce((p, m) => + p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4])) + , Promise.resolve()) }) } } - if (senderConn.auth == null) { - senderConn.processAfterAuth.push([sender, buffer]) - return Promise.resolve() + if (senderConn.auth != null) { + return this.computeMessage(messageType, senderConn, decoder, encoder, sender) + } else { + senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]) } - + } + computeMessage (messageType, senderConn, decoder, encoder, sender) { if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) { // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock) computeMessageSyncStep1(decoder, encoder, this, senderConn, sender) @@ -292,6 +293,9 @@ export default function extendConnector (Y/* :any */) { this.currentSyncTarget = null this.findNextSyncTarget() } + if (this.role === 'slave' && conn.role === 'master') { + this._fireIsSyncedListeners() + } } /* Currently, the HB encodes operations as JSON. For the moment I want to keep it