userJoined accepts auth parameter. Sync with all users at once, instead of one at a time

This commit is contained in:
Kevin Jahns 2017-08-04 16:27:07 +02:00
parent 90b2a895b8
commit d093ef56c8
2 changed files with 27 additions and 40 deletions

View File

@ -43,10 +43,12 @@ export default function extendConnector (Y/* :any */) {
this.setUserId(Y.utils.generateUserId())
}
}
reconnect () {
this.log('reconnecting..')
return this.y.db.startGarbageCollector()
}
disconnect () {
this.log('discronnecting..')
this.connections = new Map()
@ -56,13 +58,16 @@ export default function extendConnector (Y/* :any */) {
this.y.db.stopGarbageCollector()
return this.y.db.whenTransactionsFinished()
}
repair () {
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
this.connections.forEach(user => { user.isSynced = false })
this.isSynced = false
this.currentSyncTarget = null
this.findNextSyncTarget()
this.connections.forEach((user, userId) => {
user.isSynced = false
this._syncWithUser(userId)
})
}
setUserId (userId) {
if (this.userId == null) {
if (!Number.isInteger(userId)) {
@ -77,20 +82,21 @@ export default function extendConnector (Y/* :any */) {
return null
}
}
onUserEvent (f) {
this.userEventListeners.push(f)
}
removeUserEventListener (f) {
this.userEventListeners = this.userEventListeners.filter(g => f !== g)
}
userLeft (user) {
if (this.connections.has(user)) {
this.log('%s: User left %s', this.userId, user)
this.connections.delete(user)
if (user === this.currentSyncTarget) {
this.currentSyncTarget = null
this.findNextSyncTarget()
}
// check if isSynced event can be sent now
this._setSyncedWith(null)
for (var f of this.userEventListeners) {
f({
action: 'userLeft',
@ -99,7 +105,7 @@ export default function extendConnector (Y/* :any */) {
}
}
}
userJoined (user, role) {
userJoined (user, role, auth) {
if (role == null) {
throw new Error('You must specify the role of the joined user!')
}
@ -112,7 +118,7 @@ export default function extendConnector (Y/* :any */) {
isSynced: false,
role: role,
processAfterAuth: [],
auth: null,
auth: auth || null,
receivedSyncStep2: false
})
let defer = {}
@ -125,9 +131,7 @@ export default function extendConnector (Y/* :any */) {
role: role
})
}
if (this.currentSyncTarget == null) {
this.findNextSyncTarget()
}
this._syncWithUser(user)
}
// Execute a function _when_ we are connected.
// If not connected, wait until connected
@ -138,27 +142,11 @@ export default function extendConnector (Y/* :any */) {
this.whenSyncedListeners.push(f)
}
}
findNextSyncTarget () {
if (this.currentSyncTarget != null || this.role === 'slave') {
_syncWithUser (userid) {
if (this.role === 'slave') {
return // "The current sync has not finished or this is controlled by a master!"
}
var syncUser = null
for (var [uid, user] of this.connections) {
if (!user.isSynced) {
syncUser = uid
break
}
}
var conn = this
if (syncUser != null) {
this.currentSyncTarget = syncUser
sendSyncStep1(this, syncUser)
} else {
if (!conn.isSynced) {
conn._fireIsSyncedListeners()
}
}
sendSyncStep1(this, userid)
}
_fireIsSyncedListeners () {
this.y.db.whenTransactionsFinished().then(() => {
@ -271,6 +259,7 @@ export default function extendConnector (Y/* :any */) {
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
}
}
computeMessage (messageType, senderConn, decoder, encoder, sender) {
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
@ -284,19 +273,17 @@ export default function extendConnector (Y/* :any */) {
return Promise.reject(new Error('Unable to receive message'))
}
}
_setSyncedWith (user) {
var conn = this.connections.get(user)
if (conn != null) {
conn.isSynced = true
if (user != null) {
this.connections.get(user).isSynced = true
}
if (user === this.currentSyncTarget) {
this.currentSyncTarget = null
this.findNextSyncTarget()
}
if (this.role === 'slave' && conn.role === 'master') {
let conns = Array.from(this.connections.values())
if (conns.length > 0 && conns.every(u => u.isSynced)) {
this._fireIsSyncedListeners()
}
}
/*
Currently, the HB encodes operations as JSON. For the moment I want to keep it
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want

View File

@ -141,7 +141,7 @@ export async function initArrays (t, opts) {
let connOpts
if (i === 0) {
// Only one instance can gc!
dbOpts = Object.assign({ gc: true }, opts.db)
dbOpts = Object.assign({ gc: false }, opts.db)
connOpts = Object.assign({ role: 'master' }, connector)
} else {
dbOpts = Object.assign({ gc: false }, opts.db)