Home Reference Source

src/Connector.js

import BinaryEncoder from './Util/Binary/Encoder.js'
import BinaryDecoder from './Util/Binary/Decoder.js'

import { sendSyncStep1, readSyncStep1 } from './MessageHandler/syncStep1.js'
import { readSyncStep2 } from './MessageHandler/syncStep2.js'
import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'

import debug from 'debug'

// TODO: rename Connector

export default class AbstractConnector {
  constructor (y, opts) {
    this.y = y
    this.opts = opts
    if (opts.role == null || opts.role === 'master') {
      this.role = 'master'
    } else if (opts.role === 'slave') {
      this.role = 'slave'
    } else {
      throw new Error("Role must be either 'master' or 'slave'!")
    }
    this.log = debug('y:connector')
    this.logMessage = debug('y:connector-message')
    this._forwardAppliedStructs = opts.forwardAppliedOperations || false // TODO: rename
    this.role = opts.role
    this.connections = new Map()
    this.isSynced = false
    this.userEventListeners = []
    this.whenSyncedListeners = []
    this.currentSyncTarget = null
    this.debug = opts.debug === true
    this.broadcastBuffer = new BinaryEncoder()
    this.broadcastBufferSize = 0
    this.protocolVersion = 11
    this.authInfo = opts.auth || null
    this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') } // default is everyone has write access
    if (opts.maxBufferLength == null) {
      this.maxBufferLength = -1
    } else {
      this.maxBufferLength = opts.maxBufferLength
    }
  }

  reconnect () {
    this.log('reconnecting..')
  }

  disconnect () {
    this.log('discronnecting..')
    this.connections = new Map()
    this.isSynced = false
    this.currentSyncTarget = null
    this.whenSyncedListeners = []
    return Promise.resolve()
  }

  onUserEvent (f) {
    this.userEventListeners.push(f)
  }

  removeUserEventListener (f) {
    this.userEventListeners = this.userEventListeners.filter(g => f !== g)
  }

  userLeft (user) {
    if (this.connections.has(user)) {
      this.log('%s: User left %s', this.y.userID, user)
      this.connections.delete(user)
      // check if isSynced event can be sent now
      this._setSyncedWith(null)
      for (var f of this.userEventListeners) {
        f({
          action: 'userLeft',
          user: user
        })
      }
    }
  }

  userJoined (user, role, auth) {
    if (role == null) {
      throw new Error('You must specify the role of the joined user!')
    }
    if (this.connections.has(user)) {
      throw new Error('This user already joined!')
    }
    this.log('%s: User joined %s', this.y.userID, user)
    this.connections.set(user, {
      uid: user,
      isSynced: false,
      role: role,
      processAfterAuth: [],
      processAfterSync: [],
      auth: auth || null,
      receivedSyncStep2: false
    })
    let defer = {}
    defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
    this.connections.get(user).syncStep2 = defer
    for (var f of this.userEventListeners) {
      f({
        action: 'userJoined',
        user: user,
        role: role
      })
    }
    this._syncWithUser(user)
  }

  // Execute a function _when_ we are connected.
  // If not connected, wait until connected
  whenSynced (f) {
    if (this.isSynced) {
      f()
    } else {
      this.whenSyncedListeners.push(f)
    }
  }

  _syncWithUser (userID) {
    if (this.role === 'slave') {
      return // "The current sync has not finished or this is controlled by a master!"
    }
    sendSyncStep1(this, userID)
  }

  _fireIsSyncedListeners () {
    if (!this.isSynced) {
      this.isSynced = true
      // It is safer to remove this!
      // call whensynced listeners
      for (var f of this.whenSyncedListeners) {
        f()
      }
      this.whenSyncedListeners = []
      this.y._setContentReady()
      this.y.emit('synced')
    }
  }

  send (uid, buffer) {
    const y = this.y
    if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
      throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - don\'t use this method to send custom messages')
    }
    this.log('User%s to User%s: Send \'%y\'', y.userID, uid, buffer)
    this.logMessage('User%s to User%s: Send %Y', y.userID, uid, [y, buffer])
  }

  broadcast (buffer) {
    const y = this.y
    if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
      throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - don\'t use this method to send custom messages')
    }
    this.log('User%s: Broadcast \'%y\'', y.userID, buffer)
    this.logMessage('User%s: Broadcast: %Y', y.userID, [y, buffer])
  }

  /*
    Buffer operations, and broadcast them when ready.
  */
  broadcastStruct (struct) {
    const firstContent = this.broadcastBuffer.length === 0
    if (firstContent) {
      this.broadcastBuffer.writeVarString(this.y.room)
      this.broadcastBuffer.writeVarString('update')
      this.broadcastBufferSize = 0
      this.broadcastBufferSizePos = this.broadcastBuffer.pos
      this.broadcastBuffer.writeUint32(0)
    }
    this.broadcastBufferSize++
    struct._toBinary(this.broadcastBuffer)
    if (this.maxBufferLength > 0 && this.broadcastBuffer.length > this.maxBufferLength) {
      // it is necessary to send the buffer now
      // cache the buffer and check if server is responsive
      const buffer = this.broadcastBuffer
      buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize)
      this.broadcastBuffer = new BinaryEncoder()
      this.whenRemoteResponsive().then(() => {
        this.broadcast(buffer.createBuffer())
      })
    } else if (firstContent) {
      // send the buffer when all transactions are finished
      // (or buffer exceeds maxBufferLength)
      setTimeout(() => {
        if (this.broadcastBuffer.length > 0) {
          const buffer = this.broadcastBuffer
          buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize)
          this.broadcast(buffer.createBuffer())
          this.broadcastBuffer = new BinaryEncoder()
        }
      }, 0)
    }
  }

  /*
   * Somehow check the responsiveness of the remote clients/server
   * Default behavior:
   *   Wait 100ms before broadcasting the next batch of operations
   *
   * Only used when maxBufferLength is set
   *
   */
  whenRemoteResponsive () {
    return new Promise(function (resolve) {
      setTimeout(resolve, 100)
    })
  }

  /*
    You received a raw message, and you know that it is intended for Yjs. Then call this function.
  */
  receiveMessage (sender, buffer, skipAuth) {
    const y = this.y
    const userID = y.userID
    skipAuth = skipAuth || false
    if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
      return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
    }
    if (sender === userID) {
      return Promise.resolve()
    }
    let decoder = new BinaryDecoder(buffer)
    let encoder = new BinaryEncoder()
    let roomname = decoder.readVarString() // read room name
    encoder.writeVarString(roomname)
    let messageType = decoder.readVarString()
    let senderConn = this.connections.get(sender)
    this.log('User%s from User%s: Receive \'%s\'', userID, sender, messageType)
    this.logMessage('User%s from User%s: Receive %Y', userID, sender, [y, buffer])
    if (senderConn == null && !skipAuth) {
      throw new Error('Received message from unknown peer!')
    }
    if (messageType === 'sync step 1' || messageType === 'sync step 2') {
      let auth = decoder.readVarUint()
      if (senderConn.auth == null) {
        senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
        // check auth
        return this.checkAuth(auth, y, sender).then(authPermissions => {
          if (senderConn.auth == null) {
            senderConn.auth = authPermissions
            y.emit('userAuthenticated', {
              user: senderConn.uid,
              auth: authPermissions
            })
          }
          let messages = senderConn.processAfterAuth
          senderConn.processAfterAuth = []

          messages.forEach(m =>
            this.computeMessage(m[0], m[1], m[2], m[3], m[4])
          )
        })
      }
    }
    if ((skipAuth || senderConn.auth != null) && (messageType !== 'update' || senderConn.isSynced)) {
      this.computeMessage(messageType, senderConn, decoder, encoder, sender, skipAuth)
    } else {
      senderConn.processAfterSync.push([messageType, senderConn, decoder, encoder, sender, false])
    }
  }

  computeMessage (messageType, senderConn, decoder, encoder, sender, skipAuth) {
    if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
      // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
      readSyncStep1(decoder, encoder, this.y, senderConn, sender)
    } else {
      const y = this.y
      y.transact(function () {
        if (messageType === 'sync step 2' && senderConn.auth === 'write') {
          readSyncStep2(decoder, encoder, y, senderConn, sender)
        } else if (messageType === 'update' && (skipAuth || senderConn.auth === 'write')) {
          integrateRemoteStructs(y, decoder)
        } else {
          throw new Error('Unable to receive message')
        }
      }, true)
    }
  }

  _setSyncedWith (user) {
    if (user != null) {
      const userConn = this.connections.get(user)
      userConn.isSynced = true
      const messages = userConn.processAfterSync
      userConn.processAfterSync = []
      messages.forEach(m => {
        this.computeMessage(m[0], m[1], m[2], m[3], m[4])
      })
    }
    const conns = Array.from(this.connections.values())
    if (conns.length > 0 && conns.every(u => u.isSynced)) {
      this._fireIsSyncedListeners()
    }
  }
}