diff --git a/src/Connector.js b/src/Connector.js index 3f935cae..0e8e69fc 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -59,7 +59,6 @@ export default function extendConnector (Y/* :any */) { this.syncingClients = [] this.forwardToSyncingClients = opts.forwardToSyncingClients !== false this.debug = opts.debug === true - this.syncStep2 = Promise.resolve() this.broadcastOpBuffer = [] this.protocolVersion = 11 this.authInfo = opts.auth || null @@ -146,6 +145,9 @@ export default function extendConnector (Y/* :any */) { isSynced: false, role: role } + let defer = {} + defer.promise = new Promise(function (resolve) { defer.resolve = resolve }) + this.connections[user].syncStep2 = defer for (var f of this.userEventListeners) { f({ action: 'userJoined', @@ -187,7 +189,7 @@ export default function extendConnector (Y/* :any */) { var answer = { type: 'sync step 1', stateSet: stateSet, - deleteSet: deleteSet, + // deleteSet: deleteSet, protocolVersion: conn.protocolVersion, auth: conn.authInfo } @@ -289,51 +291,59 @@ export default function extendConnector (Y/* :any */) { if (message.type === 'sync step 1' && canRead(auth)) { let conn = this let m = message + let wait // wait for sync step 2 to complete + if (this.role === 'slave') { + wait = Promise.all(Object.keys(this.connections) + .map(uid => this.connections[uid]) + .filter(conn => conn.role === 'master') + .map(conn => conn.syncStep2.promise) + ) + } else { + wait = Promise.resolve() + } + wait.then(() => { + this.y.db.requestTransaction(function * () { + var currentStateSet = yield * this.getStateSet() + // TODO: remove + // if (canWrite(auth)) { + // yield * this.applyDeleteSet(m.deleteSet) + // } - this.y.db.requestTransaction(function * () { - var currentStateSet = yield * this.getStateSet() - if (canWrite(auth)) { - yield * this.applyDeleteSet(m.deleteSet) - } - - var ds = yield * this.getDeleteSet() - var answer = { - type: 'sync step 2', - stateSet: currentStateSet, - deleteSet: ds, - protocolVersion: this.protocolVersion, - auth: this.authInfo - } - if (message.preferUntransformed === true && Object.keys(m.stateSet).length === 0) { - answer.osUntransformed = yield * this.getOperationsUntransformed() - } else { - answer.os = yield * this.getOperations(m.stateSet) - } - conn.send(sender, answer) - if (this.forwardToSyncingClients) { - conn.syncingClients.push(sender) - setTimeout(function () { - conn.syncingClients = conn.syncingClients.filter(function (cli) { - return cli !== sender - }) + var ds = yield * this.getDeleteSet() + var answer = { + type: 'sync step 2', + stateSet: currentStateSet, + deleteSet: ds, + protocolVersion: this.protocolVersion, + auth: this.authInfo + } + if (message.preferUntransformed === true && Object.keys(m.stateSet).length === 0) { + answer.osUntransformed = yield * this.getOperationsUntransformed() + } else { + answer.os = yield * this.getOperations(m.stateSet) + } + conn.send(sender, answer) + if (this.forwardToSyncingClients) { + conn.syncingClients.push(sender) + setTimeout(function () { + conn.syncingClients = conn.syncingClients.filter(function (cli) { + return cli !== sender + }) + conn.send(sender, { + type: 'sync done' + }) + }, 5000) // TODO: conn.syncingClientDuration) + } else { conn.send(sender, { type: 'sync done' }) - }, 5000) // TODO: conn.syncingClientDuration) - } else { - conn.send(sender, { - type: 'sync done' - }) - } + } + }) }) } else if (message.type === 'sync step 2' && canWrite(auth)) { var db = this.y.db - var defer = {} - defer.promise = new Promise(function (resolve) { - defer.resolve = resolve - }) - this.syncStep2 = defer.promise - let m /* :MessageSyncStep2 */ = message + let defer = this.connections[sender].syncStep2 + let m = message // apply operations first db.requestTransaction(function * () { yield * this.applyDeleteSet(m.deleteSet) @@ -344,18 +354,17 @@ export default function extendConnector (Y/* :any */) { } defer.resolve() }) - /* - then apply ds + /*/ then apply ds db.whenTransactionsFinished().then(() => { db.requestTransaction(function * () { yield * this.applyDeleteSet(m.deleteSet) }) defer.resolve() })*/ - return this.syncStep2 + return defer.promise } else if (message.type === 'sync done') { var self = this - this.syncStep2.then(function () { + this.connections[sender].syncStep2.promise.then(function () { self._setSyncedWith(sender) }) } else if (message.type === 'update' && canWrite(auth)) { diff --git a/tests-lib/helper.js b/tests-lib/helper.js index 4d0ed0ad..bdba1501 100644 --- a/tests-lib/helper.js +++ b/tests-lib/helper.js @@ -93,9 +93,19 @@ export async function initArrays (t, opts) { var chance = opts.chance || new Chance(t.getSeed() * 1000000000) var connector = Object.assign({ room: 'debugging_' + t.name, testContext: t, chance }, opts.connector) for (let i = 0; i < opts.users; i++) { + let dbOpts + let connOpts + if (i === 0) { + // Only one instance can gc! + dbOpts = Object.assign({ gc: true }, opts.db) + connOpts = Object.assign({ role: 'master' }, connector) + } else { + dbOpts = Object.assign({ gc: false }, opts.db) + connOpts = Object.assign({ role: 'slave' }, connector) + } let y = await Y({ - connector: connector, - db: Object.assign({ gc: i === 0 }, opts.db), // Only one instance can gc! + connector: connOpts, + db: dbOpts, share: share }) result.users.push(y) diff --git a/tests-lib/test-connector.js b/tests-lib/test-connector.js index cec03492..d75c2591 100644 --- a/tests-lib/test-connector.js +++ b/tests-lib/test-connector.js @@ -1,4 +1,5 @@ /* global Y */ +import { wait } from './helper.js' var rooms = {} @@ -13,8 +14,8 @@ export class TestRoom { connector.setUserId('' + (this.nextUserId++)) } Object.keys(this.users).forEach(uid => { - this.users[uid].userJoined(connector.userId, 'master') - connector.userJoined(uid, 'master') + this.users[uid].userJoined(connector.userId, connector.role) + connector.userJoined(uid, this.users[uid].role) }) this.users[connector.userId] = connector } @@ -43,6 +44,7 @@ export class TestRoom { users = allUserIds.map(id => this.users[id].y) } while (flushing) { + await wait(10) let res = await Promise.all(allUserIds.map(id => this.users[id]._flushAll(users))) flushing = res.some(status => status === 'flushing') } @@ -65,7 +67,6 @@ export default function extendTestConnector (Y) { if (options.room == null) { throw new Error('You must define a room name!') } - options.role = 'slave' super(y, options) this.options = options this.room = options.room