diff --git a/src/Connector.js b/src/Connector.js index 3a22d776..d544e63a 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -1,5 +1,5 @@ import { BinaryEncoder, BinaryDecoder } from './Encoding.js' -import { computeMessageSyncStep1, computeMessageSyncStep2, computeMessageUpdate } from './MessageHandler.js' +import { sendSyncStep1, computeMessageSyncStep1, computeMessageSyncStep2, computeMessageUpdate } from './MessageHandler.js' export default function extendConnector (Y/* :any */) { class AbstractConnector { @@ -111,7 +111,8 @@ export default function extendConnector (Y/* :any */) { uid: user, isSynced: false, role: role, - processAfterAuth: [] + processAfterAuth: [], + receivedSyncStep2: false }) let defer = {} defer.promise = new Promise(function (resolve) { defer.resolve = resolve }) @@ -137,8 +138,8 @@ export default function extendConnector (Y/* :any */) { } } findNextSyncTarget () { - if (this.currentSyncTarget != null) { - return // "The current sync has not finished!" + if (this.currentSyncTarget != null || this.role === 'slave') { + return // "The current sync has not finished or this is controlled by a master!" } var syncUser = null @@ -151,17 +152,7 @@ export default function extendConnector (Y/* :any */) { var conn = this if (syncUser != null) { this.currentSyncTarget = syncUser - this.y.db.requestTransaction(function * () { - let encoder = new BinaryEncoder() - encoder.writeVarString(conn.opts.room || '') - encoder.writeVarString('sync step 1') - encoder.writeVarString(conn.authInfo || '') - encoder.writeVarUint(conn.protocolVersion) - let preferUntransformed = conn.preferUntransformed && this.os.length === 0 // TODO: length may not be defined - encoder.writeUint8(preferUntransformed ? 1 : 0) - yield * this.writeStateSet(encoder) - conn.send(syncUser, encoder.createBuffer()) - }) + sendSyncStep1(this, syncUser) } else { if (!conn.isSynced) { this.y.db.requestTransaction(function * () { diff --git a/src/MessageHandler.js b/src/MessageHandler.js index 0081d759..e2b01603 100644 --- a/src/MessageHandler.js +++ b/src/MessageHandler.js @@ -1,6 +1,6 @@ import Y from './y.js' -import { BinaryDecoder } from './Encoding.js' +import { BinaryDecoder, BinaryEncoder } from './Encoding.js' export function formatYjsMessage (buffer) { let decoder = new BinaryDecoder(buffer) @@ -52,6 +52,20 @@ export async function computeMessageUpdate (decoder, encoder, conn) { conn.y.db.applyOperations(decoder) } +export function sendSyncStep1 (conn, syncUser) { + conn.y.db.requestTransaction(function * () { + let encoder = new BinaryEncoder() + encoder.writeVarString(conn.opts.room || '') + encoder.writeVarString('sync step 1') + encoder.writeVarString(conn.authInfo || '') + encoder.writeVarUint(conn.protocolVersion) + let preferUntransformed = conn.preferUntransformed && this.os.length === 0 // TODO: length may not be defined + encoder.writeUint8(preferUntransformed ? 1 : 0) + yield * this.writeStateSet(encoder) + conn.send(syncUser, encoder.createBuffer()) + }) +} + export function logMessageSyncStep1 (decoder, strBuilder) { let auth = decoder.readVarString() let protocolVersion = decoder.readVarUint() @@ -78,13 +92,7 @@ export async function computeMessageSyncStep1 (decoder, encoder, conn, senderCon conn.y.destroy() } - if (conn.role === 'slave') { - // wait for sync step 2 to complete - await Promise.all(Array.from(conn.connections.values()) - .filter(conn => conn.role === 'master') - .map(conn => conn.syncStep2.promise) - ) - } + // send sync step 2 conn.y.db.requestTransaction(function * () { encoder.writeVarString('sync step 2') encoder.writeVarString(conn.authInfo || '') @@ -100,7 +108,11 @@ export async function computeMessageSyncStep1 (decoder, encoder, conn, senderCon yield * this.writeDeleteSet(encoder) conn.send(senderConn.uid, encoder.createBuffer()) + senderConn.receivedSyncStep2 = true }) + if (conn.role === 'slave') { + sendSyncStep1(conn, sender) + } await conn.y.db.whenTransactionsFinished() }