fix several sync issues. improve performance a bit by removing ds from first sync step

This commit is contained in:
Kevin Jahns 2017-06-29 15:04:36 -07:00
parent 409a9414f1
commit a64730e651
3 changed files with 69 additions and 49 deletions

View File

@ -59,7 +59,6 @@ export default function extendConnector (Y/* :any */) {
this.syncingClients = [] this.syncingClients = []
this.forwardToSyncingClients = opts.forwardToSyncingClients !== false this.forwardToSyncingClients = opts.forwardToSyncingClients !== false
this.debug = opts.debug === true this.debug = opts.debug === true
this.syncStep2 = Promise.resolve()
this.broadcastOpBuffer = [] this.broadcastOpBuffer = []
this.protocolVersion = 11 this.protocolVersion = 11
this.authInfo = opts.auth || null this.authInfo = opts.auth || null
@ -146,6 +145,9 @@ export default function extendConnector (Y/* :any */) {
isSynced: false, isSynced: false,
role: role role: role
} }
let defer = {}
defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
this.connections[user].syncStep2 = defer
for (var f of this.userEventListeners) { for (var f of this.userEventListeners) {
f({ f({
action: 'userJoined', action: 'userJoined',
@ -187,7 +189,7 @@ export default function extendConnector (Y/* :any */) {
var answer = { var answer = {
type: 'sync step 1', type: 'sync step 1',
stateSet: stateSet, stateSet: stateSet,
deleteSet: deleteSet, // deleteSet: deleteSet,
protocolVersion: conn.protocolVersion, protocolVersion: conn.protocolVersion,
auth: conn.authInfo auth: conn.authInfo
} }
@ -289,51 +291,59 @@ export default function extendConnector (Y/* :any */) {
if (message.type === 'sync step 1' && canRead(auth)) { if (message.type === 'sync step 1' && canRead(auth)) {
let conn = this let conn = this
let m = message let m = message
let wait // wait for sync step 2 to complete
if (this.role === 'slave') {
wait = Promise.all(Object.keys(this.connections)
.map(uid => this.connections[uid])
.filter(conn => conn.role === 'master')
.map(conn => conn.syncStep2.promise)
)
} else {
wait = Promise.resolve()
}
wait.then(() => {
this.y.db.requestTransaction(function * () {
var currentStateSet = yield * this.getStateSet()
// TODO: remove
// if (canWrite(auth)) {
// yield * this.applyDeleteSet(m.deleteSet)
// }
this.y.db.requestTransaction(function * () { var ds = yield * this.getDeleteSet()
var currentStateSet = yield * this.getStateSet() var answer = {
if (canWrite(auth)) { type: 'sync step 2',
yield * this.applyDeleteSet(m.deleteSet) stateSet: currentStateSet,
} deleteSet: ds,
protocolVersion: this.protocolVersion,
var ds = yield * this.getDeleteSet() auth: this.authInfo
var answer = { }
type: 'sync step 2', if (message.preferUntransformed === true && Object.keys(m.stateSet).length === 0) {
stateSet: currentStateSet, answer.osUntransformed = yield * this.getOperationsUntransformed()
deleteSet: ds, } else {
protocolVersion: this.protocolVersion, answer.os = yield * this.getOperations(m.stateSet)
auth: this.authInfo }
} conn.send(sender, answer)
if (message.preferUntransformed === true && Object.keys(m.stateSet).length === 0) { if (this.forwardToSyncingClients) {
answer.osUntransformed = yield * this.getOperationsUntransformed() conn.syncingClients.push(sender)
} else { setTimeout(function () {
answer.os = yield * this.getOperations(m.stateSet) conn.syncingClients = conn.syncingClients.filter(function (cli) {
} return cli !== sender
conn.send(sender, answer) })
if (this.forwardToSyncingClients) { conn.send(sender, {
conn.syncingClients.push(sender) type: 'sync done'
setTimeout(function () { })
conn.syncingClients = conn.syncingClients.filter(function (cli) { }, 5000) // TODO: conn.syncingClientDuration)
return cli !== sender } else {
})
conn.send(sender, { conn.send(sender, {
type: 'sync done' type: 'sync done'
}) })
}, 5000) // TODO: conn.syncingClientDuration) }
} else { })
conn.send(sender, {
type: 'sync done'
})
}
}) })
} else if (message.type === 'sync step 2' && canWrite(auth)) { } else if (message.type === 'sync step 2' && canWrite(auth)) {
var db = this.y.db var db = this.y.db
var defer = {} let defer = this.connections[sender].syncStep2
defer.promise = new Promise(function (resolve) { let m = message
defer.resolve = resolve
})
this.syncStep2 = defer.promise
let m /* :MessageSyncStep2 */ = message
// apply operations first // apply operations first
db.requestTransaction(function * () { db.requestTransaction(function * () {
yield * this.applyDeleteSet(m.deleteSet) yield * this.applyDeleteSet(m.deleteSet)
@ -344,18 +354,17 @@ export default function extendConnector (Y/* :any */) {
} }
defer.resolve() defer.resolve()
}) })
/* /*/ then apply ds
then apply ds
db.whenTransactionsFinished().then(() => { db.whenTransactionsFinished().then(() => {
db.requestTransaction(function * () { db.requestTransaction(function * () {
yield * this.applyDeleteSet(m.deleteSet) yield * this.applyDeleteSet(m.deleteSet)
}) })
defer.resolve() defer.resolve()
})*/ })*/
return this.syncStep2 return defer.promise
} else if (message.type === 'sync done') { } else if (message.type === 'sync done') {
var self = this var self = this
this.syncStep2.then(function () { this.connections[sender].syncStep2.promise.then(function () {
self._setSyncedWith(sender) self._setSyncedWith(sender)
}) })
} else if (message.type === 'update' && canWrite(auth)) { } else if (message.type === 'update' && canWrite(auth)) {

View File

@ -93,9 +93,19 @@ export async function initArrays (t, opts) {
var chance = opts.chance || new Chance(t.getSeed() * 1000000000) var chance = opts.chance || new Chance(t.getSeed() * 1000000000)
var connector = Object.assign({ room: 'debugging_' + t.name, testContext: t, chance }, opts.connector) var connector = Object.assign({ room: 'debugging_' + t.name, testContext: t, chance }, opts.connector)
for (let i = 0; i < opts.users; i++) { for (let i = 0; i < opts.users; i++) {
let dbOpts
let connOpts
if (i === 0) {
// Only one instance can gc!
dbOpts = Object.assign({ gc: true }, opts.db)
connOpts = Object.assign({ role: 'master' }, connector)
} else {
dbOpts = Object.assign({ gc: false }, opts.db)
connOpts = Object.assign({ role: 'slave' }, connector)
}
let y = await Y({ let y = await Y({
connector: connector, connector: connOpts,
db: Object.assign({ gc: i === 0 }, opts.db), // Only one instance can gc! db: dbOpts,
share: share share: share
}) })
result.users.push(y) result.users.push(y)

View File

@ -1,4 +1,5 @@
/* global Y */ /* global Y */
import { wait } from './helper.js'
var rooms = {} var rooms = {}
@ -13,8 +14,8 @@ export class TestRoom {
connector.setUserId('' + (this.nextUserId++)) connector.setUserId('' + (this.nextUserId++))
} }
Object.keys(this.users).forEach(uid => { Object.keys(this.users).forEach(uid => {
this.users[uid].userJoined(connector.userId, 'master') this.users[uid].userJoined(connector.userId, connector.role)
connector.userJoined(uid, 'master') connector.userJoined(uid, this.users[uid].role)
}) })
this.users[connector.userId] = connector this.users[connector.userId] = connector
} }
@ -43,6 +44,7 @@ export class TestRoom {
users = allUserIds.map(id => this.users[id].y) users = allUserIds.map(id => this.users[id].y)
} }
while (flushing) { while (flushing) {
await wait(10)
let res = await Promise.all(allUserIds.map(id => this.users[id]._flushAll(users))) let res = await Promise.all(allUserIds.map(id => this.users[id]._flushAll(users)))
flushing = res.some(status => status === 'flushing') flushing = res.some(status => status === 'flushing')
} }
@ -65,7 +67,6 @@ export default function extendTestConnector (Y) {
if (options.room == null) { if (options.room == null) {
throw new Error('You must define a room name!') throw new Error('You must define a room name!')
} }
options.role = 'slave'
super(y, options) super(y, options)
this.options = options this.options = options
this.room = options.room this.room = options.room