From e2ec53be6583efe5ecdb414c15695633a45b71b3 Mon Sep 17 00:00:00 2001
From: Kevin Jahns <kevin.jahns@rwth-aachen.de>
Date: Mon, 31 Jul 2017 02:06:07 +0200
Subject: [PATCH] implemented three-way sync for master-slave apps

---
 src/Connector.js      | 21 ++++++---------------
 src/MessageHandler.js | 28 ++++++++++++++++++++--------
 2 files changed, 26 insertions(+), 23 deletions(-)

diff --git a/src/Connector.js b/src/Connector.js
index 3a22d776..d544e63a 100644
--- a/src/Connector.js
+++ b/src/Connector.js
@@ -1,5 +1,5 @@
 import { BinaryEncoder, BinaryDecoder } from './Encoding.js'
-import { computeMessageSyncStep1, computeMessageSyncStep2, computeMessageUpdate } from './MessageHandler.js'
+import { sendSyncStep1, computeMessageSyncStep1, computeMessageSyncStep2, computeMessageUpdate } from './MessageHandler.js'
 
 export default function extendConnector (Y/* :any */) {
   class AbstractConnector {
@@ -111,7 +111,8 @@ export default function extendConnector (Y/* :any */) {
         uid: user,
         isSynced: false,
         role: role,
-        processAfterAuth: []
+        processAfterAuth: [],
+        receivedSyncStep2: false
       })
       let defer = {}
       defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
@@ -137,8 +138,8 @@ export default function extendConnector (Y/* :any */) {
       }
     }
     findNextSyncTarget () {
-      if (this.currentSyncTarget != null) {
-        return // "The current sync has not finished!"
+      if (this.currentSyncTarget != null || this.role === 'slave') {
+        return // "The current sync has not finished or this is controlled by a master!"
       }
 
       var syncUser = null
@@ -151,17 +152,7 @@ export default function extendConnector (Y/* :any */) {
       var conn = this
       if (syncUser != null) {
         this.currentSyncTarget = syncUser
-        this.y.db.requestTransaction(function * () {
-          let encoder = new BinaryEncoder()
-          encoder.writeVarString(conn.opts.room || '')
-          encoder.writeVarString('sync step 1')
-          encoder.writeVarString(conn.authInfo || '')
-          encoder.writeVarUint(conn.protocolVersion)
-          let preferUntransformed = conn.preferUntransformed && this.os.length === 0 // TODO: length may not be defined
-          encoder.writeUint8(preferUntransformed ? 1 : 0)
-          yield * this.writeStateSet(encoder)
-          conn.send(syncUser, encoder.createBuffer())
-        })
+        sendSyncStep1(this, syncUser)
       } else {
         if (!conn.isSynced) {
           this.y.db.requestTransaction(function * () {
diff --git a/src/MessageHandler.js b/src/MessageHandler.js
index 0081d759..e2b01603 100644
--- a/src/MessageHandler.js
+++ b/src/MessageHandler.js
@@ -1,6 +1,6 @@
 
 import Y from './y.js'
-import { BinaryDecoder } from './Encoding.js'
+import { BinaryDecoder, BinaryEncoder } from './Encoding.js'
 
 export function formatYjsMessage (buffer) {
   let decoder = new BinaryDecoder(buffer)
@@ -52,6 +52,20 @@ export async function computeMessageUpdate (decoder, encoder, conn) {
   conn.y.db.applyOperations(decoder)
 }
 
+export function sendSyncStep1 (conn, syncUser) {
+  conn.y.db.requestTransaction(function * () {
+    let encoder = new BinaryEncoder()
+    encoder.writeVarString(conn.opts.room || '')
+    encoder.writeVarString('sync step 1')
+    encoder.writeVarString(conn.authInfo || '')
+    encoder.writeVarUint(conn.protocolVersion)
+    let preferUntransformed = conn.preferUntransformed && this.os.length === 0 // TODO: length may not be defined
+    encoder.writeUint8(preferUntransformed ? 1 : 0)
+    yield * this.writeStateSet(encoder)
+    conn.send(syncUser, encoder.createBuffer())
+  })
+}
+
 export function logMessageSyncStep1 (decoder, strBuilder) {
   let auth = decoder.readVarString()
   let protocolVersion = decoder.readVarUint()
@@ -78,13 +92,7 @@ export async function computeMessageSyncStep1 (decoder, encoder, conn, senderCon
     conn.y.destroy()
   }
 
-  if (conn.role === 'slave') {
-    // wait for sync step 2 to complete
-    await Promise.all(Array.from(conn.connections.values())
-      .filter(conn => conn.role === 'master')
-      .map(conn => conn.syncStep2.promise)
-    )
-  }
+  // send sync step 2
   conn.y.db.requestTransaction(function * () {
     encoder.writeVarString('sync step 2')
     encoder.writeVarString(conn.authInfo || '')
@@ -100,7 +108,11 @@ export async function computeMessageSyncStep1 (decoder, encoder, conn, senderCon
 
     yield * this.writeDeleteSet(encoder)
     conn.send(senderConn.uid, encoder.createBuffer())
+    senderConn.receivedSyncStep2 = true
   })
+  if (conn.role === 'slave') {
+    sendSyncStep1(conn, sender)
+  }
   await conn.y.db.whenTransactionsFinished()
 }