fix syncing protocol - compute messages after auth
This commit is contained in:
parent
3e1d89253f
commit
4f57c91b82
@ -156,22 +156,24 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
sendSyncStep1(this, syncUser)
|
sendSyncStep1(this, syncUser)
|
||||||
} else {
|
} else {
|
||||||
if (!conn.isSynced) {
|
if (!conn.isSynced) {
|
||||||
this.y.db.requestTransaction(function * () {
|
conn._fireIsSyncedListeners()
|
||||||
if (!conn.isSynced) {
|
|
||||||
// it is crucial that isSynced is set at the time garbageCollectAfterSync is called
|
|
||||||
conn.isSynced = true
|
|
||||||
// It is safer to remove this!
|
|
||||||
// TODO: remove: yield * this.garbageCollectAfterSync()
|
|
||||||
// call whensynced listeners
|
|
||||||
for (var f of conn.whenSyncedListeners) {
|
|
||||||
f()
|
|
||||||
}
|
|
||||||
conn.whenSyncedListeners = []
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
_fireIsSyncedListeners () {
|
||||||
|
this.y.db.whenTransactionsFinished().then(() => {
|
||||||
|
if (!this.isSynced) {
|
||||||
|
this.isSynced = true
|
||||||
|
// It is safer to remove this!
|
||||||
|
// TODO: remove: yield * this.garbageCollectAfterSync()
|
||||||
|
// call whensynced listeners
|
||||||
|
for (var f of this.whenSyncedListeners) {
|
||||||
|
f()
|
||||||
|
}
|
||||||
|
this.whenSyncedListeners = []
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
send (uid, buffer) {
|
send (uid, buffer) {
|
||||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||||
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - please don\'t use this method to send custom messages')
|
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - please don\'t use this method to send custom messages')
|
||||||
@ -244,33 +246,32 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
||||||
let auth = decoder.readVarUint()
|
let auth = decoder.readVarUint()
|
||||||
if (senderConn.auth == null) {
|
if (senderConn.auth == null) {
|
||||||
senderConn.processAfterAuth.push([sender, buffer])
|
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
|
||||||
|
|
||||||
// check auth
|
// check auth
|
||||||
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
|
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
|
||||||
senderConn.auth = authPermissions
|
if (senderConn.auth == null) {
|
||||||
this.y.emit('userAuthenticated', {
|
senderConn.auth = authPermissions
|
||||||
user: senderConn.uid,
|
this.y.emit('userAuthenticated', {
|
||||||
auth: authPermissions
|
user: senderConn.uid,
|
||||||
})
|
auth: authPermissions
|
||||||
return senderConn.syncStep2.promise
|
})
|
||||||
}).then(() => {
|
|
||||||
if (senderConn.processAfterAuth == null) {
|
|
||||||
return Promise.resolve()
|
|
||||||
}
|
}
|
||||||
let messages = senderConn.processAfterAuth
|
let messages = senderConn.processAfterAuth
|
||||||
senderConn.processAfterAuth = null
|
senderConn.processAfterAuth = []
|
||||||
return Promise.all(messages.map(m =>
|
|
||||||
this.receiveMessage(m[0], m[1])
|
return messages.reduce((p, m) =>
|
||||||
))
|
p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4]))
|
||||||
|
, Promise.resolve())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (senderConn.auth == null) {
|
if (senderConn.auth != null) {
|
||||||
senderConn.processAfterAuth.push([sender, buffer])
|
return this.computeMessage(messageType, senderConn, decoder, encoder, sender)
|
||||||
return Promise.resolve()
|
} else {
|
||||||
|
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
computeMessage (messageType, senderConn, decoder, encoder, sender) {
|
||||||
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
||||||
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
||||||
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
|
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
|
||||||
@ -292,6 +293,9 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
this.currentSyncTarget = null
|
this.currentSyncTarget = null
|
||||||
this.findNextSyncTarget()
|
this.findNextSyncTarget()
|
||||||
}
|
}
|
||||||
|
if (this.role === 'slave' && conn.role === 'master') {
|
||||||
|
this._fireIsSyncedListeners()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
Currently, the HB encodes operations as JSON. For the moment I want to keep it
|
Currently, the HB encodes operations as JSON. For the moment I want to keep it
|
||||||
|
Loading…
x
Reference in New Issue
Block a user