494 lines
17 KiB
JavaScript
494 lines
17 KiB
JavaScript
/* @flow */
|
|
'use strict'
|
|
|
|
/*
  Tell whether an access level permits reading.
  Both 'read' and 'write' grant read access; anything else does not.
*/
function canRead (auth) {
  return ['read', 'write'].includes(auth)
}
|
|
/*
  Tell whether an access level permits writing.
  Only the exact string 'write' grants write access.
*/
function canWrite (auth) {
  const grantsWrite = auth === 'write'
  return grantsWrite
}
|
|
|
|
/*
  Defines the AbstractConnector class and attaches it to the given Y object
  as Y.AbstractConnector. Concrete connectors are expected to build on it;
  the `send` and `broadcast` methods defined here only log.
*/
export default function extendConnector (Y/* :any */) {
  class AbstractConnector {
    /* ::
    y: YConfig;
    role: SyncRole;
    connections: Object;
    isSynced: boolean;
    userEventListeners: Array<Function>;
    whenSyncedListeners: Array<Function>;
    currentSyncTarget: ?UserId;
    syncingClients: Array<UserId>;
    forwardToSyncingClients: boolean;
    debug: boolean;
    syncStep2: Promise;
    userId: UserId;
    send: Function;
    broadcast: Function;
    broadcastOpBuffer: Array<Operation>;
    protocolVersion: number;
    */

    /*
      y is the Yjs instance this connector belongs to (its `db` is used).
      opts (optional) may contain:
        role : String                     "master" (default) or "slave"
        preferUntransformed : Boolean     ask peers for untransformed operations
        forwardAppliedOperations : Boolean  stored on y.db
        forwardToSyncingClients : Boolean enabled unless explicitly `false`
        debug : Boolean                   only `true` enables it
        auth : any                        auth info sent to peers
        checkAuth : Function              (auth, y) => Promise of 'read' | 'write'
        generateUserId : Boolean          unless `false`, a guid is used as user id
      Throws if role is neither "master" nor "slave".
    */
    constructor (y, opts) {
      this.y = y
      if (opts == null) {
        opts = {}
      }
      // Prefer to receive untransformed operations. This does only work if
      // this client receives operations from only one other client.
      // In particular, this does not work with y-webrtc.
      // It will work with y-websockets-client
      this.preferUntransformed = opts.preferUntransformed || false
      // The validated role is final. It must not be overwritten with the raw
      // option afterwards, otherwise an omitted role ends up as `undefined`.
      if (opts.role == null || opts.role === 'master') {
        this.role = 'master'
      } else if (opts.role === 'slave') {
        this.role = 'slave'
      } else {
        throw new Error("Role must be either 'master' or 'slave'!")
      }
      this.log = Y.debug('y:connector')
      this.logMessage = Y.debug('y:connector-message')
      this.y.db.forwardAppliedOperations = opts.forwardAppliedOperations || false
      this.connections = {}
      this.isSynced = false
      this.userEventListeners = []
      this.whenSyncedListeners = []
      this.currentSyncTarget = null
      this.syncingClients = []
      this.forwardToSyncingClients = opts.forwardToSyncingClients !== false
      this.debug = opts.debug === true
      this.broadcastOpBuffer = []
      this.protocolVersion = 11
      this.authInfo = opts.auth || null
      // default is everyone has write access
      this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') }
      if (opts.generateUserId !== false) {
        this.setUserId(Y.utils.generateGuid())
      }
    }

    /*
      Replace the auth info and, if it actually changed, broadcast an
      'auth' message carrying the new value.
    */
    resetAuth (auth) {
      if (this.authInfo !== auth) {
        this.authInfo = auth
        this.broadcast({
          type: 'auth',
          auth: this.authInfo
        })
      }
    }

    /*
      Log the reconnect and start the db garbage collector.
      Returns whatever y.db.startGarbageCollector() returns.
    */
    reconnect () {
      this.log('reconnecting..')
      return this.y.db.startGarbageCollector()
    }

    /*
      Forget all connections and sync state, drop pending whenSynced
      listeners and stop the garbage collector.
      Returns the result of y.db.whenTransactionsFinished().
    */
    disconnect () {
      this.log('disconnecting..')
      this.connections = {}
      this.isSynced = false
      this.currentSyncTarget = null
      this.syncingClients = []
      this.whenSyncedListeners = []
      this.y.db.stopGarbageCollector()
      return this.y.db.whenTransactionsFinished()
    }

    /*
      Mark every connection (and this connector) as not synced and start
      syncing again with the next available target.
    */
    repair () {
      this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
      for (const name in this.connections) {
        this.connections[name].isSynced = false
      }
      this.isSynced = false
      this.currentSyncTarget = null
      this.findNextSyncTarget()
    }

    /*
      Set the user id once. If an id is already set the call is ignored and
      null is returned; otherwise the result of y.db.setUserId is returned.
    */
    setUserId (userId) {
      if (this.userId != null) {
        return null
      }
      this.log('Set userId to "%s"', userId)
      this.userId = userId
      return this.y.db.setUserId(userId)
    }

    /*
      Register a listener for user events. Listeners are called with an
      object whose `action` is 'userJoined', 'userLeft' or 'userAuthenticated'.
    */
    onUserEvent (f) {
      this.userEventListeners.push(f)
    }

    /*
      Remove every registration of the given user event listener.
    */
    removeUserEventListener (f) {
      this.userEventListeners = this.userEventListeners.filter(g => f !== g)
    }

    /*
      Handle a user leaving. Unknown users are ignored. If the user was the
      current sync target, the next target is looked up.
    */
    userLeft (user) {
      if (this.connections[user] == null) {
        return
      }
      this.log('User left: %s', user)
      delete this.connections[user]
      if (user === this.currentSyncTarget) {
        this.currentSyncTarget = null
        this.findNextSyncTarget()
      }
      this.syncingClients = this.syncingClients.filter(cli => cli !== user)
      for (const f of this.userEventListeners) {
        f({
          action: 'userLeft',
          user: user
        })
      }
    }

    /*
      Handle a user joining. Throws if no role is given or if the user
      already joined. Creates the connection entry together with a deferred
      `syncStep2` ({ promise, resolve }) that is resolved once a
      'sync step 2' message from that user has been applied.
    */
    userJoined (user, role) {
      if (role == null) {
        throw new Error('You must specify the role of the joined user!')
      }
      if (this.connections[user] != null) {
        throw new Error('This user already joined!')
      }
      this.log('User joined: %s', user)
      const defer = {}
      defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
      this.connections[user] = {
        isSynced: false,
        role: role,
        syncStep2: defer
      }
      for (const f of this.userEventListeners) {
        f({
          action: 'userJoined',
          user: user,
          role: role
        })
      }
      if (this.currentSyncTarget == null) {
        this.findNextSyncTarget()
      }
    }

    // Execute a function _when_ we are synced.
    // If not synced yet, the function is queued until we are.
    whenSynced (f) {
      if (this.isSynced) {
        f()
      } else {
        this.whenSyncedListeners.push(f)
      }
    }

    /*
      Pick the next connection that is not synced yet and send it a
      'sync step 1' message. When no such connection exists, mark this
      connector as synced and call the queued whenSynced listeners.
      Does nothing while a sync with another target is still running.
    */
    findNextSyncTarget () {
      if (this.currentSyncTarget != null) {
        return // "The current sync has not finished!"
      }

      let syncUser = null
      for (const uid in this.connections) {
        if (!this.connections[uid].isSynced) {
          syncUser = uid
          break
        }
      }
      // Inside the transaction generators below `this` is not the connector
      // (it provides getStateSet), so connector state is accessed via `conn`.
      const conn = this
      if (syncUser != null) {
        this.currentSyncTarget = syncUser
        this.y.db.requestTransaction(function * () {
          const stateSet = yield * this.getStateSet()
          const answer = {
            type: 'sync step 1',
            stateSet: stateSet,
            protocolVersion: conn.protocolVersion,
            auth: conn.authInfo
          }
          if (conn.preferUntransformed && Object.keys(stateSet).length === 0) {
            answer.preferUntransformed = true
          }
          conn.send(syncUser, answer)
        })
      } else if (!conn.isSynced) {
        this.y.db.requestTransaction(function * () {
          // checked again: the state may have changed before the transaction runs
          if (!conn.isSynced) {
            // it is crucial that isSynced is set at the time garbageCollectAfterSync is called
            conn.isSynced = true
            // It is safer to remove this!
            // TODO: remove: yield * this.garbageCollectAfterSync()
            // call whensynced listeners
            for (const f of conn.whenSyncedListeners) {
              f()
            }
            conn.whenSyncedListeners = []
          }
        })
      }
    }

    /*
      Send a message to one user. This base implementation only logs.
    */
    send (uid, message) {
      this.log('Send \'%s\' to %s', message.type, uid)
      this.logMessage('Message: %j', message)
    }

    /*
      Send a message to all users. This base implementation only logs.
    */
    broadcast (message) {
      this.log('Broadcast \'%s\'', message.type)
      this.logMessage('Message: %j', message)
    }

    /*
      Buffer operations, and broadcast them when ready.
      The operations are encoded first. The flush is scheduled only when the
      buffer was empty; later calls just append to the pending buffer.
    */
    broadcastOps (ops) {
      ops = ops.map(function (op) {
        return Y.Struct[op.struct].encode(op)
      })
      const self = this
      function broadcastOperations () {
        if (self.broadcastOpBuffer.length > 0) {
          self.broadcast({
            type: 'update',
            ops: self.broadcastOpBuffer
          })
          self.broadcastOpBuffer = []
        }
      }
      if (this.broadcastOpBuffer.length === 0) {
        this.broadcastOpBuffer = ops
        this.y.db.whenTransactionsFinished().then(broadcastOperations)
      } else {
        this.broadcastOpBuffer = this.broadcastOpBuffer.concat(ops)
      }
    }

    /*
      You received a raw message, and you know that it is intended for Yjs. Then call this function.

      Returns a promise. It is rejected when the protocol versions differ or
      when the sender is not a known connection. Messages sent by this user
      itself are ignored.
    */
    receiveMessage (sender/* :UserId */, message/* :Message */) {
      if (sender === this.userId) {
        return Promise.resolve()
      }
      this.log('Receive \'%s\' from %s', message.type, sender)
      this.logMessage('Message: %j', message)
      if (message.protocolVersion != null && message.protocolVersion !== this.protocolVersion) {
        this.log(
          `You tried to sync with a yjs instance that has a different protocol version
          (You: ${this.protocolVersion}, Client: ${message.protocolVersion}).
          The sync was stopped. You need to upgrade your dependencies (especially Yjs & the Connector)!
          `)
        this.send(sender, {
          type: 'sync stop',
          protocolVersion: this.protocolVersion
        })
        return Promise.reject(new Error('Incompatible protocol version'))
      }
      if (message.auth != null && this.connections[sender] != null) {
        // authenticate using auth in message
        const pendingAuth = this.checkAuth(message.auth, this.y)
        this.connections[sender].auth = pendingAuth
        pendingAuth.then(granted => {
          for (const f of this.userEventListeners) {
            f({
              action: 'userAuthenticated',
              user: sender,
              auth: granted
            })
          }
        })
      } else if (this.connections[sender] != null && this.connections[sender].auth == null) {
        // authenticate without otherwise
        this.connections[sender].auth = this.checkAuth(null, this.y)
      }
      if (this.connections[sender] == null || this.connections[sender].auth == null) {
        return Promise.reject(new Error('Unable to deliver message'))
      }
      return this.connections[sender].auth.then((auth) => {
        if (message.type === 'sync step 1' && canRead(auth)) {
          const conn = this
          const m = message
          let wait // wait for sync step 2 to complete
          if (this.role === 'slave') {
            // a slave answers only after every master has delivered its sync step 2
            wait = Promise.all(Object.keys(this.connections)
              .map(uid => this.connections[uid])
              .filter(peer => peer.role === 'master')
              .map(peer => peer.syncStep2.promise)
            )
          } else {
            wait = Promise.resolve()
          }
          wait.then(() => {
            this.y.db.requestTransaction(function * () {
              // `this` is not the connector in here (it provides getStateSet
              // etc.), so connector fields must be read through `conn`.
              const currentStateSet = yield * this.getStateSet()
              // TODO: remove
              // if (canWrite(auth)) {
              //  yield * this.applyDeleteSet(m.deleteSet)
              // }

              const ds = yield * this.getDeleteSet()
              const answer = {
                type: 'sync step 2',
                stateSet: currentStateSet,
                deleteSet: ds,
                protocolVersion: conn.protocolVersion,
                auth: conn.authInfo
              }
              if (m.preferUntransformed === true && Object.keys(m.stateSet).length === 0) {
                answer.osUntransformed = yield * this.getOperationsUntransformed()
              } else {
                answer.os = yield * this.getOperations(m.stateSet)
              }
              conn.send(sender, answer)
              // NOTE(review): this flag is read from the transaction context,
              // not from the connector, so this branch is presumably never
              // taken. Reading `conn.forwardToSyncingClients` instead would
              // delay 'sync done' by 5s with the default options - confirm
              // the intended behavior before changing it.
              if (this.forwardToSyncingClients) {
                conn.syncingClients.push(sender)
                setTimeout(function () {
                  conn.syncingClients = conn.syncingClients.filter(function (cli) {
                    return cli !== sender
                  })
                  conn.send(sender, {
                    type: 'sync done'
                  })
                }, 5000) // TODO: conn.syncingClientDuration)
              } else {
                conn.send(sender, {
                  type: 'sync done'
                })
              }
            })
          })
        } else if (message.type === 'sync step 2' && canWrite(auth)) {
          const db = this.y.db
          const defer = this.connections[sender].syncStep2
          const m = message
          // apply operations first
          db.requestTransaction(function * () {
            if (m.osUntransformed != null) {
              yield * this.applyOperationsUntransformed(m.osUntransformed, m.stateSet)
            } else {
              this.store.apply(m.os)
            }
          })
          // then apply ds
          db.whenTransactionsFinished().then(() => {
            db.requestTransaction(function * () {
              yield * this.applyDeleteSet(m.deleteSet)
            })
            defer.resolve()
          })
          return defer.promise
        } else if (message.type === 'sync done') {
          // only mark the sender as synced after its sync step 2 was handled
          const self = this
          this.connections[sender].syncStep2.promise.then(function () {
            self._setSyncedWith(sender)
          })
        } else if (message.type === 'update' && canWrite(auth)) {
          if (this.forwardToSyncingClients) {
            for (const client of this.syncingClients) {
              this.send(client, message)
            }
          }
          if (this.y.db.forwardAppliedOperations) {
            const delops = message.ops.filter(function (o) {
              return o.struct === 'Delete'
            })
            if (delops.length > 0) {
              this.broadcastOps(delops)
            }
          }
          this.y.db.apply(message.ops)
        }
      })
    }

    /*
      Mark the connection to `user` (if it still exists) as synced. If that
      user was the current sync target, continue with the next one.
    */
    _setSyncedWith (user) {
      const conn = this.connections[user]
      if (conn != null) {
        conn.isSynced = true
      }
      if (user === this.currentSyncTarget) {
        this.currentSyncTarget = null
        this.findNextSyncTarget()
      }
    }

    /*
      Currently, the HB encodes operations as JSON. For the moment I want to keep it
      that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
      too much overhead. Y is very likely to get changed a lot in the future

      Because we don't want to encode JSON as string (with character escaping, which makes it pretty much unreadable)
      we encode the JSON as XML.

      When the HB supports encoding as XML, the format should look pretty much like this.

      does not support primitive values as array elements
      expects an ltx (less than xml) object

      Returns the parsed object. Attribute values that are the canonical
      decimal representation of an integer become numbers, all others stay
      strings. Child elements become nested objects, or arrays when their
      'isArray' attribute is 'true'.
    */
    parseMessageFromXml (m/* :any */) {
      // Collect every child of an array node (not just the first one).
      function parseArray (node) {
        const elements = []
        for (const n of node.children) {
          if (n.getAttribute('isArray') === 'true') {
            elements.push(parseArray(n))
          } else {
            elements.push(parseObject(n))
          }
        }
        return elements
      }
      function parseObject (node/* :any */) {
        const json = {}
        for (const attrName in node.attrs) {
          const value = node.attrs[attrName]
          const int = parseInt(value, 10)
          if (isNaN(int) || ('' + int) !== value) {
            json[attrName] = value
          } else {
            json[attrName] = int
          }
        }
        // for...of: the child nodes themselves are needed, not their indices
        for (const n/* :any */ of node.children) {
          const name = n.name
          if (n.getAttribute('isArray') === 'true') {
            json[name] = parseArray(n)
          } else {
            json[name] = parseObject(n)
          }
        }
        return json
      }
      return parseObject(m)
    }

    /*
      encode message in xml
      we use string because Strophe only accepts an "xml-string"..
      So {a:4,b:{c:5}} will look like
      <y a="4">
        <b c="5"></b>
      </y>
      msg - ltx element (the encoded <y> element is appended via msg.c)
      obj - Object or Array to encode

      Properties whose value is null or undefined are skipped.
      Throws if obj is neither a plain object nor an array, or if an array
      contains a primitive or null element.
    */
    encodeMessageToXml (msg, obj) {
      function encodeObject (m, json) {
        for (const name in json) {
          const value = json[name]
          if (value == null) {
            // nop - there is nothing to encode for null / undefined
          } else if (value.constructor === Object) {
            encodeObject(m.c(name), value)
          } else if (value.constructor === Array) {
            encodeArray(m.c(name), value)
          } else {
            m.setAttribute(name, value)
          }
        }
      }
      function encodeArray (m, array) {
        m.setAttribute('isArray', 'true')
        for (const e of array) {
          if (e == null || typeof e !== 'object') {
            // primitives are not supported as array elements; fail with a
            // clear error instead of a TypeError or unbounded recursion
            throw new Error("I can't encode this json!")
          } else if (e.constructor === Object) {
            encodeObject(m.c('array-element'), e)
          } else {
            encodeArray(m.c('array-element'), e)
          }
        }
      }
      if (obj != null && obj.constructor === Object) {
        encodeObject(msg.c('y', { xmlns: 'http://y.ninja/connector-stanza' }), obj)
      } else if (obj != null && obj.constructor === Array) {
        encodeArray(msg.c('y', { xmlns: 'http://y.ninja/connector-stanza' }), obj)
      } else {
        throw new Error("I can't encode this json!")
      }
    }
  }
  Y.AbstractConnector = AbstractConnector
}
|