implemented auth utilities for yjs

This commit is contained in:
Kevin Jahns 2016-10-24 11:57:59 +02:00
parent 666ab8285c
commit 0521fac8d8
2 changed files with 143 additions and 98 deletions

View File

@ -1,6 +1,9 @@
/* @flow */ /* @flow */
'use strict' 'use strict'
function canRead (auth) { return auth === 'read' || auth === 'write' }
function canWrite (auth) { return auth === 'write' }
module.exports = function (Y/* :any */) { module.exports = function (Y/* :any */) {
class AbstractConnector { class AbstractConnector {
/* :: /* ::
@ -54,6 +57,8 @@ module.exports = function (Y/* :any */) {
this.syncStep2 = Promise.resolve() this.syncStep2 = Promise.resolve()
this.broadcastOpBuffer = [] this.broadcastOpBuffer = []
this.protocolVersion = 11 this.protocolVersion = 11
this.authInfo = opts.auth || null
this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') } // default is everyone has write access
} }
reconnect () { reconnect () {
} }
@ -87,6 +92,9 @@ module.exports = function (Y/* :any */) {
onUserEvent (f) { onUserEvent (f) {
this.userEventListeners.push(f) this.userEventListeners.push(f)
} }
removeUserEventListener (f) {
this.userEventListeners = this.userEventListeners.filter(g => { f !== g })
}
userLeft (user) { userLeft (user) {
if (this.connections[user] != null) { if (this.connections[user] != null) {
delete this.connections[user] delete this.connections[user]
@ -163,7 +171,8 @@ module.exports = function (Y/* :any */) {
type: 'sync step 1', type: 'sync step 1',
stateSet: stateSet, stateSet: stateSet,
deleteSet: deleteSet, deleteSet: deleteSet,
protocolVersion: conn.protocolVersion protocolVersion: conn.protocolVersion,
auth: conn.authInfo
}) })
}) })
} else { } else {
@ -217,7 +226,7 @@ module.exports = function (Y/* :any */) {
*/ */
receiveMessage (sender/* :UserId */, message/* :Message */) { receiveMessage (sender/* :UserId */, message/* :Message */) {
if (sender === this.userId) { if (sender === this.userId) {
return return Promise.resolve()
} }
if (this.debug) { if (this.debug) {
console.log(`receive ${sender} -> ${this.userId}: ${message.type}`, JSON.parse(JSON.stringify(message))) // eslint-disable-line console.log(`receive ${sender} -> ${this.userId}: ${message.type}`, JSON.parse(JSON.stringify(message))) // eslint-disable-line
@ -232,91 +241,118 @@ module.exports = function (Y/* :any */) {
type: 'sync stop', type: 'sync stop',
protocolVersion: this.protocolVersion protocolVersion: this.protocolVersion
}) })
return return Promise.reject('Incompatible protocol version')
} }
if (message.type === 'sync step 1') { if (message.auth != null && this.connections[sender] != null) {
let conn = this // authenticate using auth in message
let m = message var auth = this.checkAuth(message.auth, this.y)
this.y.db.requestTransaction(function *() { this.connections[sender].auth = auth
var currentStateSet = yield* this.getStateSet() auth.then(auth => {
yield* this.applyDeleteSet(m.deleteSet) for (var f of this.userEventListeners) {
f({
var ds = yield* this.getDeleteSet() action: 'userAuthenticated',
var ops = yield* this.getOperations(m.stateSet) user: sender,
conn.send(sender, { auth: auth
type: 'sync step 2',
os: ops,
stateSet: currentStateSet,
deleteSet: ds,
protocolVersion: this.protocolVersion
})
if (this.forwardToSyncingClients) {
conn.syncingClients.push(sender)
setTimeout(function () {
conn.syncingClients = conn.syncingClients.filter(function (cli) {
return cli !== sender
})
conn.send(sender, {
type: 'sync done'
})
}, 5000) // TODO: conn.syncingClientDuration)
} else {
conn.send(sender, {
type: 'sync done'
}) })
} }
conn._setSyncedWith(sender)
}) })
} else if (message.type === 'sync step 2') { } else if (this.connections[sender] != null && this.connections[sender].auth == null) {
let conn = this // authenticate without otherwise
var broadcastHB = !this.broadcastedHB this.connections[sender].auth = this.checkAuth(null, this.y)
this.broadcastedHB = true }
var db = this.y.db if (this.connections[sender] != null && this.connections[sender].auth != null) {
var defer = {} return this.connections[sender].auth.then((auth) => {
defer.promise = new Promise(function (resolve) { if (message.type === 'sync step 1' && canRead(auth)) {
defer.resolve = resolve let conn = this
}) let m = message
this.syncStep2 = defer.promise
let m /* :MessageSyncStep2 */ = message this.y.db.requestTransaction(function *() {
db.requestTransaction(function * () { var currentStateSet = yield* this.getStateSet()
yield* this.applyDeleteSet(m.deleteSet) if (canWrite(auth)) {
this.store.apply(m.os) yield* this.applyDeleteSet(m.deleteSet)
db.requestTransaction(function * () { }
var ops = yield* this.getOperations(m.stateSet)
if (ops.length > 0) { var ds = yield* this.getDeleteSet()
if (!broadcastHB) { // TODO: consider to broadcast here.. var ops = yield* this.getOperations(m.stateSet)
conn.send(sender, { conn.send(sender, {
type: 'update', type: 'sync step 2',
ops: ops os: ops,
}) stateSet: currentStateSet,
deleteSet: ds,
protocolVersion: this.protocolVersion,
auth: this.authInfo
})
if (this.forwardToSyncingClients) {
conn.syncingClients.push(sender)
setTimeout(function () {
conn.syncingClients = conn.syncingClients.filter(function (cli) {
return cli !== sender
})
conn.send(sender, {
type: 'sync done'
})
}, 5000) // TODO: conn.syncingClientDuration)
} else { } else {
// broadcast only once! conn.send(sender, {
conn.broadcastOps(ops) type: 'sync done'
})
}
conn._setSyncedWith(sender)
})
} else if (message.type === 'sync step 2' && canWrite(auth)) {
let conn = this
var broadcastHB = !this.broadcastedHB
this.broadcastedHB = true
var db = this.y.db
var defer = {}
defer.promise = new Promise(function (resolve) {
defer.resolve = resolve
})
this.syncStep2 = defer.promise
let m /* :MessageSyncStep2 */ = message
db.requestTransaction(function * () {
yield* this.applyDeleteSet(m.deleteSet)
this.store.apply(m.os)
db.requestTransaction(function * () {
var ops = yield* this.getOperations(m.stateSet)
if (ops.length > 0) {
if (!broadcastHB) { // TODO: consider to broadcast here..
conn.send(sender, {
type: 'update',
ops: ops
})
} else {
// broadcast only once!
conn.broadcastOps(ops)
}
}
defer.resolve()
})
})
} else if (message.type === 'sync done') {
var self = this
this.syncStep2.then(function () {
self._setSyncedWith(sender)
})
} else if (message.type === 'update' && canWrite(auth)) {
if (this.forwardToSyncingClients) {
for (var client of this.syncingClients) {
this.send(client, message)
} }
} }
defer.resolve() if (this.y.db.forwardAppliedOperations) {
}) var delops = message.ops.filter(function (o) {
}) return o.struct === 'Delete'
} else if (message.type === 'sync done') { })
var self = this if (delops.length > 0) {
this.syncStep2.then(function () { this.broadcastOps(delops)
self._setSyncedWith(sender) }
}) }
} else if (message.type === 'update') { this.y.db.apply(message.ops)
if (this.forwardToSyncingClients) {
for (var client of this.syncingClients) {
this.send(client, message)
} }
} })
if (this.y.db.forwardAppliedOperations) { } else {
var delops = message.ops.filter(function (o) { return Promise.reject('Unable to deliver message')
return o.struct === 'Delete'
})
if (delops.length > 0) {
this.broadcastOps(delops)
}
}
this.y.db.apply(message.ops)
} }
} }
_setSyncedWith (user) { _setSyncedWith (user) {

View File

@ -24,11 +24,21 @@ module.exports = function (Y) {
} }
}, },
whenTransactionsFinished: function () { whenTransactionsFinished: function () {
var ps = [] var self = this
for (var name in this.users) { return new Promise (function (resolve, reject) {
ps.push(this.users[name].y.db.whenTransactionsFinished()) // The connector first has to send the messages to the db.
} // Wait for the checkAuth-function to resolve
return Promise.all(ps) // The test lib only has a simple checkAuth function: `() => Promise.resolve()`
// Just add a function to the event-queue, in order to wait for the event.
// TODO: this may be buggy in test applications (but it isn't be for real-life apps)
setTimeout(function () {
var ps = []
for (var name in self.users) {
ps.push(self.users[name].y.db.whenTransactionsFinished())
}
Promise.all(ps).then(resolve, reject)
}, 0)
})
}, },
flushOne: function flushOne () { flushOne: function flushOne () {
var bufs = [] var bufs = []
@ -54,8 +64,9 @@ module.exports = function (Y) {
delete buff[sender] delete buff[sender]
} }
var user = globalRoom.users[userId] var user = globalRoom.users[userId]
user.receiveMessage(m[0], m[1]) return user.receiveMessage(m[0], m[1]).then(function () {
return user.y.db.whenTransactionsFinished() return user.y.db.whenTransactionsFinished()
}, function () {})
} else { } else {
return false return false
} }
@ -72,16 +83,14 @@ module.exports = function (Y) {
} }
globalRoom.whenTransactionsFinished().then(nextFlush) globalRoom.whenTransactionsFinished().then(nextFlush)
} else { } else {
setTimeout(function () { var c = globalRoom.flushOne()
var c = globalRoom.flushOne() if (c) {
if (c) { c.then(function () {
c.then(function () { globalRoom.whenTransactionsFinished().then(nextFlush)
globalRoom.whenTransactionsFinished().then(nextFlush) })
}) } else {
} else { resolve()
resolve() }
}
}, 0)
} }
} }
globalRoom.whenTransactionsFinished().then(nextFlush) globalRoom.whenTransactionsFinished().then(nextFlush)
@ -107,7 +116,7 @@ module.exports = function (Y) {
this.syncingClientDuration = 0 this.syncingClientDuration = 0
} }
receiveMessage (sender, m) { receiveMessage (sender, m) {
super.receiveMessage(sender, JSON.parse(JSON.stringify(m))) return super.receiveMessage(sender, JSON.parse(JSON.stringify(m)))
} }
send (userId, message) { send (userId, message) {
var buffer = globalRoom.buffers[userId] var buffer = globalRoom.buffers[userId]
@ -154,7 +163,7 @@ module.exports = function (Y) {
if (buff[sender].length === 0) { if (buff[sender].length === 0) {
delete buff[sender] delete buff[sender]
} }
this.receiveMessage(m[0], m[1]) yield this.receiveMessage(m[0], m[1])
} }
yield self.whenTransactionsFinished() yield self.whenTransactionsFinished()
}) })