298 lines
9.7 KiB
JavaScript
298 lines
9.7 KiB
JavaScript
import BinaryEncoder from './Util/Binary/Encoder.js'
|
|
import BinaryDecoder from './Util/Binary/Decoder.js'
|
|
|
|
import { sendSyncStep1, readSyncStep1 } from './MessageHandler/syncStep1.js'
|
|
import { readSyncStep2 } from './MessageHandler/syncStep2.js'
|
|
import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'
|
|
|
|
import debug from 'debug'
|
|
|
|
// TODO: rename Connector
|
|
|
|
export default class AbstractConnector {
|
|
constructor (y, opts) {
|
|
this.y = y
|
|
this.opts = opts
|
|
if (opts.role == null || opts.role === 'master') {
|
|
this.role = 'master'
|
|
} else if (opts.role === 'slave') {
|
|
this.role = 'slave'
|
|
} else {
|
|
throw new Error("Role must be either 'master' or 'slave'!")
|
|
}
|
|
this.log = debug('y:connector')
|
|
this.logMessage = debug('y:connector-message')
|
|
this._forwardAppliedStructs = opts.forwardAppliedOperations || false // TODO: rename
|
|
this.role = opts.role
|
|
this.connections = new Map()
|
|
this.isSynced = false
|
|
this.userEventListeners = []
|
|
this.whenSyncedListeners = []
|
|
this.currentSyncTarget = null
|
|
this.debug = opts.debug === true
|
|
this.broadcastBuffer = new BinaryEncoder()
|
|
this.broadcastBufferSize = 0
|
|
this.protocolVersion = 11
|
|
this.authInfo = opts.auth || null
|
|
this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') } // default is everyone has write access
|
|
if (opts.maxBufferLength == null) {
|
|
this.maxBufferLength = -1
|
|
} else {
|
|
this.maxBufferLength = opts.maxBufferLength
|
|
}
|
|
}
|
|
|
|
reconnect () {
|
|
this.log('reconnecting..')
|
|
}
|
|
|
|
disconnect () {
|
|
this.log('discronnecting..')
|
|
this.connections = new Map()
|
|
this.isSynced = false
|
|
this.currentSyncTarget = null
|
|
this.whenSyncedListeners = []
|
|
return Promise.resolve()
|
|
}
|
|
|
|
onUserEvent (f) {
|
|
this.userEventListeners.push(f)
|
|
}
|
|
|
|
removeUserEventListener (f) {
|
|
this.userEventListeners = this.userEventListeners.filter(g => f !== g)
|
|
}
|
|
|
|
userLeft (user) {
|
|
if (this.connections.has(user)) {
|
|
this.log('%s: User left %s', this.y.userID, user)
|
|
this.connections.delete(user)
|
|
// check if isSynced event can be sent now
|
|
this._setSyncedWith(null)
|
|
for (var f of this.userEventListeners) {
|
|
f({
|
|
action: 'userLeft',
|
|
user: user
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
userJoined (user, role, auth) {
|
|
if (role == null) {
|
|
throw new Error('You must specify the role of the joined user!')
|
|
}
|
|
if (this.connections.has(user)) {
|
|
throw new Error('This user already joined!')
|
|
}
|
|
this.log('%s: User joined %s', this.y.userID, user)
|
|
this.connections.set(user, {
|
|
uid: user,
|
|
isSynced: false,
|
|
role: role,
|
|
processAfterAuth: [],
|
|
processAfterSync: [],
|
|
auth: auth || null,
|
|
receivedSyncStep2: false
|
|
})
|
|
let defer = {}
|
|
defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
|
|
this.connections.get(user).syncStep2 = defer
|
|
for (var f of this.userEventListeners) {
|
|
f({
|
|
action: 'userJoined',
|
|
user: user,
|
|
role: role
|
|
})
|
|
}
|
|
this._syncWithUser(user)
|
|
}
|
|
|
|
// Execute a function _when_ we are connected.
|
|
// If not connected, wait until connected
|
|
whenSynced (f) {
|
|
if (this.isSynced) {
|
|
f()
|
|
} else {
|
|
this.whenSyncedListeners.push(f)
|
|
}
|
|
}
|
|
|
|
_syncWithUser (userID) {
|
|
if (this.role === 'slave') {
|
|
return // "The current sync has not finished or this is controlled by a master!"
|
|
}
|
|
sendSyncStep1(this, userID)
|
|
}
|
|
|
|
_fireIsSyncedListeners () {
|
|
if (!this.isSynced) {
|
|
this.isSynced = true
|
|
// It is safer to remove this!
|
|
// call whensynced listeners
|
|
for (var f of this.whenSyncedListeners) {
|
|
f()
|
|
}
|
|
this.whenSyncedListeners = []
|
|
this.y._setContentReady()
|
|
this.y.emit('synced')
|
|
}
|
|
}
|
|
|
|
send (uid, buffer) {
|
|
const y = this.y
|
|
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
|
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - don\'t use this method to send custom messages')
|
|
}
|
|
this.log('User%s to User%s: Send \'%y\'', y.userID, uid, buffer)
|
|
this.logMessage('User%s to User%s: Send %Y', y.userID, uid, [y, buffer])
|
|
}
|
|
|
|
broadcast (buffer) {
|
|
const y = this.y
|
|
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
|
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - don\'t use this method to send custom messages')
|
|
}
|
|
this.log('User%s: Broadcast \'%y\'', y.userID, buffer)
|
|
this.logMessage('User%s: Broadcast: %Y', y.userID, [y, buffer])
|
|
}
|
|
|
|
/*
|
|
Buffer operations, and broadcast them when ready.
|
|
*/
|
|
broadcastStruct (struct) {
|
|
const firstContent = this.broadcastBuffer.length === 0
|
|
if (firstContent) {
|
|
this.broadcastBuffer.writeVarString(this.y.room)
|
|
this.broadcastBuffer.writeVarString('update')
|
|
this.broadcastBufferSize = 0
|
|
this.broadcastBufferSizePos = this.broadcastBuffer.pos
|
|
this.broadcastBuffer.writeUint32(0)
|
|
}
|
|
this.broadcastBufferSize++
|
|
struct._toBinary(this.broadcastBuffer)
|
|
if (this.maxBufferLength > 0 && this.broadcastBuffer.length > this.maxBufferLength) {
|
|
// it is necessary to send the buffer now
|
|
// cache the buffer and check if server is responsive
|
|
const buffer = this.broadcastBuffer
|
|
buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize)
|
|
this.broadcastBuffer = new BinaryEncoder()
|
|
this.whenRemoteResponsive().then(() => {
|
|
this.broadcast(buffer.createBuffer())
|
|
})
|
|
} else if (firstContent) {
|
|
// send the buffer when all transactions are finished
|
|
// (or buffer exceeds maxBufferLength)
|
|
setTimeout(() => {
|
|
if (this.broadcastBuffer.length > 0) {
|
|
const buffer = this.broadcastBuffer
|
|
buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize)
|
|
this.broadcast(buffer.createBuffer())
|
|
this.broadcastBuffer = new BinaryEncoder()
|
|
}
|
|
}, 0)
|
|
}
|
|
}
|
|
|
|
/*
|
|
* Somehow check the responsiveness of the remote clients/server
|
|
* Default behavior:
|
|
* Wait 100ms before broadcasting the next batch of operations
|
|
*
|
|
* Only used when maxBufferLength is set
|
|
*
|
|
*/
|
|
whenRemoteResponsive () {
|
|
return new Promise(function (resolve) {
|
|
setTimeout(resolve, 100)
|
|
})
|
|
}
|
|
|
|
/*
|
|
You received a raw message, and you know that it is intended for Yjs. Then call this function.
|
|
*/
|
|
receiveMessage (sender, buffer, skipAuth) {
|
|
const y = this.y
|
|
const userID = y.userID
|
|
skipAuth = skipAuth || false
|
|
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
|
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
|
|
}
|
|
if (sender === userID) {
|
|
return Promise.resolve()
|
|
}
|
|
let decoder = new BinaryDecoder(buffer)
|
|
let encoder = new BinaryEncoder()
|
|
let roomname = decoder.readVarString() // read room name
|
|
encoder.writeVarString(roomname)
|
|
let messageType = decoder.readVarString()
|
|
let senderConn = this.connections.get(sender)
|
|
this.log('User%s from User%s: Receive \'%s\'', userID, sender, messageType)
|
|
this.logMessage('User%s from User%s: Receive %Y', userID, sender, [y, buffer])
|
|
if (senderConn == null && !skipAuth) {
|
|
throw new Error('Received message from unknown peer!')
|
|
}
|
|
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
|
let auth = decoder.readVarUint()
|
|
if (senderConn.auth == null) {
|
|
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
|
|
// check auth
|
|
return this.checkAuth(auth, y, sender).then(authPermissions => {
|
|
if (senderConn.auth == null) {
|
|
senderConn.auth = authPermissions
|
|
y.emit('userAuthenticated', {
|
|
user: senderConn.uid,
|
|
auth: authPermissions
|
|
})
|
|
}
|
|
let messages = senderConn.processAfterAuth
|
|
senderConn.processAfterAuth = []
|
|
|
|
messages.forEach(m =>
|
|
this.computeMessage(m[0], m[1], m[2], m[3], m[4])
|
|
)
|
|
})
|
|
}
|
|
}
|
|
if ((skipAuth || senderConn.auth != null) && (messageType !== 'update' || senderConn.isSynced)) {
|
|
this.computeMessage(messageType, senderConn, decoder, encoder, sender, skipAuth)
|
|
} else {
|
|
senderConn.processAfterSync.push([messageType, senderConn, decoder, encoder, sender, false])
|
|
}
|
|
}
|
|
|
|
computeMessage (messageType, senderConn, decoder, encoder, sender, skipAuth) {
|
|
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
|
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
|
readSyncStep1(decoder, encoder, this.y, senderConn, sender)
|
|
} else {
|
|
const y = this.y
|
|
y.transact(function () {
|
|
if (messageType === 'sync step 2' && senderConn.auth === 'write') {
|
|
readSyncStep2(decoder, encoder, y, senderConn, sender)
|
|
} else if (messageType === 'update' && (skipAuth || senderConn.auth === 'write')) {
|
|
integrateRemoteStructs(y, decoder)
|
|
} else {
|
|
throw new Error('Unable to receive message')
|
|
}
|
|
}, true)
|
|
}
|
|
}
|
|
|
|
_setSyncedWith (user) {
|
|
if (user != null) {
|
|
const userConn = this.connections.get(user)
|
|
userConn.isSynced = true
|
|
const messages = userConn.processAfterSync
|
|
userConn.processAfterSync = []
|
|
messages.forEach(m => {
|
|
this.computeMessage(m[0], m[1], m[2], m[3], m[4])
|
|
})
|
|
}
|
|
const conns = Array.from(this.connections.values())
|
|
if (conns.length > 0 && conns.every(u => u.isSynced)) {
|
|
this._fireIsSyncedListeners()
|
|
}
|
|
}
|
|
}
|