implemented three-way sync for master-slave apps

This commit is contained in:
Kevin Jahns 2017-07-31 02:06:07 +02:00
parent aa6edcfd9b
commit e2ec53be65
2 changed files with 26 additions and 23 deletions

View File

@ -1,5 +1,5 @@
import { BinaryEncoder, BinaryDecoder } from './Encoding.js' import { BinaryEncoder, BinaryDecoder } from './Encoding.js'
import { computeMessageSyncStep1, computeMessageSyncStep2, computeMessageUpdate } from './MessageHandler.js' import { sendSyncStep1, computeMessageSyncStep1, computeMessageSyncStep2, computeMessageUpdate } from './MessageHandler.js'
export default function extendConnector (Y/* :any */) { export default function extendConnector (Y/* :any */) {
class AbstractConnector { class AbstractConnector {
@ -111,7 +111,8 @@ export default function extendConnector (Y/* :any */) {
uid: user, uid: user,
isSynced: false, isSynced: false,
role: role, role: role,
processAfterAuth: [] processAfterAuth: [],
receivedSyncStep2: false
}) })
let defer = {} let defer = {}
defer.promise = new Promise(function (resolve) { defer.resolve = resolve }) defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
@ -137,8 +138,8 @@ export default function extendConnector (Y/* :any */) {
} }
} }
findNextSyncTarget () { findNextSyncTarget () {
if (this.currentSyncTarget != null) { if (this.currentSyncTarget != null || this.role === 'slave') {
return // "The current sync has not finished!" return // "The current sync has not finished or this is controlled by a master!"
} }
var syncUser = null var syncUser = null
@ -151,17 +152,7 @@ export default function extendConnector (Y/* :any */) {
var conn = this var conn = this
if (syncUser != null) { if (syncUser != null) {
this.currentSyncTarget = syncUser this.currentSyncTarget = syncUser
this.y.db.requestTransaction(function * () { sendSyncStep1(this, syncUser)
let encoder = new BinaryEncoder()
encoder.writeVarString(conn.opts.room || '')
encoder.writeVarString('sync step 1')
encoder.writeVarString(conn.authInfo || '')
encoder.writeVarUint(conn.protocolVersion)
let preferUntransformed = conn.preferUntransformed && this.os.length === 0 // TODO: length may not be defined
encoder.writeUint8(preferUntransformed ? 1 : 0)
yield * this.writeStateSet(encoder)
conn.send(syncUser, encoder.createBuffer())
})
} else { } else {
if (!conn.isSynced) { if (!conn.isSynced) {
this.y.db.requestTransaction(function * () { this.y.db.requestTransaction(function * () {

View File

@ -1,6 +1,6 @@
import Y from './y.js' import Y from './y.js'
import { BinaryDecoder } from './Encoding.js' import { BinaryDecoder, BinaryEncoder } from './Encoding.js'
export function formatYjsMessage (buffer) { export function formatYjsMessage (buffer) {
let decoder = new BinaryDecoder(buffer) let decoder = new BinaryDecoder(buffer)
@ -52,6 +52,20 @@ export async function computeMessageUpdate (decoder, encoder, conn) {
conn.y.db.applyOperations(decoder) conn.y.db.applyOperations(decoder)
} }
export function sendSyncStep1 (conn, syncUser) {
conn.y.db.requestTransaction(function * () {
let encoder = new BinaryEncoder()
encoder.writeVarString(conn.opts.room || '')
encoder.writeVarString('sync step 1')
encoder.writeVarString(conn.authInfo || '')
encoder.writeVarUint(conn.protocolVersion)
let preferUntransformed = conn.preferUntransformed && this.os.length === 0 // TODO: length may not be defined
encoder.writeUint8(preferUntransformed ? 1 : 0)
yield * this.writeStateSet(encoder)
conn.send(syncUser, encoder.createBuffer())
})
}
export function logMessageSyncStep1 (decoder, strBuilder) { export function logMessageSyncStep1 (decoder, strBuilder) {
let auth = decoder.readVarString() let auth = decoder.readVarString()
let protocolVersion = decoder.readVarUint() let protocolVersion = decoder.readVarUint()
@ -78,13 +92,7 @@ export async function computeMessageSyncStep1 (decoder, encoder, conn, senderCon
conn.y.destroy() conn.y.destroy()
} }
if (conn.role === 'slave') { // send sync step 2
// wait for sync step 2 to complete
await Promise.all(Array.from(conn.connections.values())
.filter(conn => conn.role === 'master')
.map(conn => conn.syncStep2.promise)
)
}
conn.y.db.requestTransaction(function * () { conn.y.db.requestTransaction(function * () {
encoder.writeVarString('sync step 2') encoder.writeVarString('sync step 2')
encoder.writeVarString(conn.authInfo || '') encoder.writeVarString(conn.authInfo || '')
@ -100,7 +108,11 @@ export async function computeMessageSyncStep1 (decoder, encoder, conn, senderCon
yield * this.writeDeleteSet(encoder) yield * this.writeDeleteSet(encoder)
conn.send(senderConn.uid, encoder.createBuffer()) conn.send(senderConn.uid, encoder.createBuffer())
senderConn.receivedSyncStep2 = true
}) })
if (conn.role === 'slave') {
sendSyncStep1(conn, sender)
}
await conn.y.db.whenTransactionsFinished() await conn.y.db.whenTransactionsFinished()
} }