start refactoring
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
|
||||
import { createMutualExclude } from '../Util/mutualExclude.js'
|
||||
import { createMutualExclude } from '../../lib/mutualExclude.js'
|
||||
|
||||
/**
|
||||
* Abstract class for bindings.
|
||||
|
||||
@@ -4,7 +4,7 @@ import {
|
||||
iterateUntilUndeleted,
|
||||
removeAssociation,
|
||||
insertNodeHelper } from './util.js'
|
||||
import diff from '../../Util/simpleDiff.js'
|
||||
import diff from '../../../lib/simpleDiff.js'
|
||||
import YXmlFragment from '../../Types/YXml/YXmlFragment.js'
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
|
||||
import Binding from '../Binding.js'
|
||||
import simpleDiff from '../../Util/simpleDiff.js'
|
||||
import simpleDiff from '../../../lib/simpleDiff.js'
|
||||
import { getRelativePosition, fromRelativePosition } from '../../Util/relativePosition.js'
|
||||
|
||||
function typeObserver () {
|
||||
|
||||
298
src/Connector.js
298
src/Connector.js
@@ -1,298 +0,0 @@
|
||||
import BinaryEncoder from './Util/Binary/Encoder.js'
|
||||
import BinaryDecoder from './Util/Binary/Decoder.js'
|
||||
|
||||
import { sendSyncStep1, readSyncStep1 } from './MessageHandler/syncStep1.js'
|
||||
import { readSyncStep2 } from './MessageHandler/syncStep2.js'
|
||||
import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'
|
||||
|
||||
// TODO: reintroduce or remove
|
||||
// import debug from 'debug'
|
||||
|
||||
// TODO: rename Connector
|
||||
|
||||
export default class AbstractConnector {
|
||||
constructor (y, opts) {
|
||||
this.y = y
|
||||
this.opts = opts
|
||||
if (opts.role == null || opts.role === 'master') {
|
||||
this.role = 'master'
|
||||
} else if (opts.role === 'slave') {
|
||||
this.role = 'slave'
|
||||
} else {
|
||||
throw new Error("Role must be either 'master' or 'slave'!")
|
||||
}
|
||||
this.log = debug('y:connector')
|
||||
this.logMessage = debug('y:connector-message')
|
||||
this._forwardAppliedStructs = opts.forwardAppliedOperations || false // TODO: rename
|
||||
this.role = opts.role
|
||||
this.connections = new Map()
|
||||
this.isSynced = false
|
||||
this.userEventListeners = []
|
||||
this.whenSyncedListeners = []
|
||||
this.currentSyncTarget = null
|
||||
this.debug = opts.debug === true
|
||||
this.broadcastBuffer = new BinaryEncoder()
|
||||
this.broadcastBufferSize = 0
|
||||
this.protocolVersion = 11
|
||||
this.authInfo = opts.auth || null
|
||||
this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') } // default is everyone has write access
|
||||
if (opts.maxBufferLength == null) {
|
||||
this.maxBufferLength = -1
|
||||
} else {
|
||||
this.maxBufferLength = opts.maxBufferLength
|
||||
}
|
||||
}
|
||||
|
||||
reconnect () {
|
||||
this.log('reconnecting..')
|
||||
}
|
||||
|
||||
disconnect () {
|
||||
this.log('discronnecting..')
|
||||
this.connections = new Map()
|
||||
this.isSynced = false
|
||||
this.currentSyncTarget = null
|
||||
this.whenSyncedListeners = []
|
||||
return Promise.resolve()
|
||||
}
|
||||
|
||||
onUserEvent (f) {
|
||||
this.userEventListeners.push(f)
|
||||
}
|
||||
|
||||
removeUserEventListener (f) {
|
||||
this.userEventListeners = this.userEventListeners.filter(g => f !== g)
|
||||
}
|
||||
|
||||
userLeft (user) {
|
||||
if (this.connections.has(user)) {
|
||||
this.log('%s: User left %s', this.y.userID, user)
|
||||
this.connections.delete(user)
|
||||
// check if isSynced event can be sent now
|
||||
this._setSyncedWith(null)
|
||||
for (var f of this.userEventListeners) {
|
||||
f({
|
||||
action: 'userLeft',
|
||||
user: user
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
userJoined (user, role, auth) {
|
||||
if (role == null) {
|
||||
throw new Error('You must specify the role of the joined user!')
|
||||
}
|
||||
if (this.connections.has(user)) {
|
||||
throw new Error('This user already joined!')
|
||||
}
|
||||
this.log('%s: User joined %s', this.y.userID, user)
|
||||
this.connections.set(user, {
|
||||
uid: user,
|
||||
isSynced: false,
|
||||
role: role,
|
||||
processAfterAuth: [],
|
||||
processAfterSync: [],
|
||||
auth: auth || null,
|
||||
receivedSyncStep2: false
|
||||
})
|
||||
let defer = {}
|
||||
defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
|
||||
this.connections.get(user).syncStep2 = defer
|
||||
for (var f of this.userEventListeners) {
|
||||
f({
|
||||
action: 'userJoined',
|
||||
user: user,
|
||||
role: role
|
||||
})
|
||||
}
|
||||
this._syncWithUser(user)
|
||||
}
|
||||
|
||||
// Execute a function _when_ we are connected.
|
||||
// If not connected, wait until connected
|
||||
whenSynced (f) {
|
||||
if (this.isSynced) {
|
||||
f()
|
||||
} else {
|
||||
this.whenSyncedListeners.push(f)
|
||||
}
|
||||
}
|
||||
|
||||
_syncWithUser (userID) {
|
||||
if (this.role === 'slave') {
|
||||
return // "The current sync has not finished or this is controlled by a master!"
|
||||
}
|
||||
sendSyncStep1(this, userID)
|
||||
}
|
||||
|
||||
_fireIsSyncedListeners () {
|
||||
if (!this.isSynced) {
|
||||
this.isSynced = true
|
||||
// It is safer to remove this!
|
||||
// call whensynced listeners
|
||||
for (var f of this.whenSyncedListeners) {
|
||||
f()
|
||||
}
|
||||
this.whenSyncedListeners = []
|
||||
this.y._setContentReady()
|
||||
this.y.emit('synced')
|
||||
}
|
||||
}
|
||||
|
||||
send (uid, buffer) {
|
||||
const y = this.y
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - don\'t use this method to send custom messages')
|
||||
}
|
||||
this.log('User%s to User%s: Send \'%y\'', y.userID, uid, buffer)
|
||||
this.logMessage('User%s to User%s: Send %Y', y.userID, uid, [y, buffer])
|
||||
}
|
||||
|
||||
broadcast (buffer) {
|
||||
const y = this.y
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - don\'t use this method to send custom messages')
|
||||
}
|
||||
this.log('User%s: Broadcast \'%y\'', y.userID, buffer)
|
||||
this.logMessage('User%s: Broadcast: %Y', y.userID, [y, buffer])
|
||||
}
|
||||
|
||||
/*
|
||||
Buffer operations, and broadcast them when ready.
|
||||
*/
|
||||
broadcastStruct (struct) {
|
||||
const firstContent = this.broadcastBuffer.length === 0
|
||||
if (firstContent) {
|
||||
this.broadcastBuffer.writeVarString(this.y.room)
|
||||
this.broadcastBuffer.writeVarString('update')
|
||||
this.broadcastBufferSize = 0
|
||||
this.broadcastBufferSizePos = this.broadcastBuffer.pos
|
||||
this.broadcastBuffer.writeUint32(0)
|
||||
}
|
||||
this.broadcastBufferSize++
|
||||
struct._toBinary(this.broadcastBuffer)
|
||||
if (this.maxBufferLength > 0 && this.broadcastBuffer.length > this.maxBufferLength) {
|
||||
// it is necessary to send the buffer now
|
||||
// cache the buffer and check if server is responsive
|
||||
const buffer = this.broadcastBuffer
|
||||
buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize)
|
||||
this.broadcastBuffer = new BinaryEncoder()
|
||||
this.whenRemoteResponsive().then(() => {
|
||||
this.broadcast(buffer.createBuffer())
|
||||
})
|
||||
} else if (firstContent) {
|
||||
// send the buffer when all transactions are finished
|
||||
// (or buffer exceeds maxBufferLength)
|
||||
setTimeout(() => {
|
||||
if (this.broadcastBuffer.length > 0) {
|
||||
const buffer = this.broadcastBuffer
|
||||
buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize)
|
||||
this.broadcast(buffer.createBuffer())
|
||||
this.broadcastBuffer = new BinaryEncoder()
|
||||
}
|
||||
}, 0)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Somehow check the responsiveness of the remote clients/server
|
||||
* Default behavior:
|
||||
* Wait 100ms before broadcasting the next batch of operations
|
||||
*
|
||||
* Only used when maxBufferLength is set
|
||||
*
|
||||
*/
|
||||
whenRemoteResponsive () {
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(resolve, 100)
|
||||
})
|
||||
}
|
||||
|
||||
/*
|
||||
You received a raw message, and you know that it is intended for Yjs. Then call this function.
|
||||
*/
|
||||
receiveMessage (sender, buffer, skipAuth) {
|
||||
const y = this.y
|
||||
const userID = y.userID
|
||||
skipAuth = skipAuth || false
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
|
||||
}
|
||||
if (sender === userID) {
|
||||
return Promise.resolve()
|
||||
}
|
||||
let decoder = new BinaryDecoder(buffer)
|
||||
let encoder = new BinaryEncoder()
|
||||
let roomname = decoder.readVarString() // read room name
|
||||
encoder.writeVarString(roomname)
|
||||
let messageType = decoder.readVarString()
|
||||
let senderConn = this.connections.get(sender)
|
||||
this.log('User%s from User%s: Receive \'%s\'', userID, sender, messageType)
|
||||
this.logMessage('User%s from User%s: Receive %Y', userID, sender, [y, buffer])
|
||||
if (senderConn == null && !skipAuth) {
|
||||
throw new Error('Received message from unknown peer!')
|
||||
}
|
||||
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
||||
let auth = decoder.readVarUint()
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
|
||||
// check auth
|
||||
return this.checkAuth(auth, y, sender).then(authPermissions => {
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.auth = authPermissions
|
||||
y.emit('userAuthenticated', {
|
||||
user: senderConn.uid,
|
||||
auth: authPermissions
|
||||
})
|
||||
}
|
||||
let messages = senderConn.processAfterAuth
|
||||
senderConn.processAfterAuth = []
|
||||
|
||||
messages.forEach(m =>
|
||||
this.computeMessage(m[0], m[1], m[2], m[3], m[4])
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
||||
if ((skipAuth || senderConn.auth != null) && (messageType !== 'update' || senderConn.isSynced)) {
|
||||
this.computeMessage(messageType, senderConn, decoder, encoder, sender, skipAuth)
|
||||
} else {
|
||||
senderConn.processAfterSync.push([messageType, senderConn, decoder, encoder, sender, false])
|
||||
}
|
||||
}
|
||||
|
||||
computeMessage (messageType, senderConn, decoder, encoder, sender, skipAuth) {
|
||||
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
||||
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
||||
readSyncStep1(decoder, encoder, this.y, senderConn, sender)
|
||||
} else {
|
||||
const y = this.y
|
||||
y.transact(function () {
|
||||
if (messageType === 'sync step 2' && senderConn.auth === 'write') {
|
||||
readSyncStep2(decoder, encoder, y, senderConn, sender)
|
||||
} else if (messageType === 'update' && (skipAuth || senderConn.auth === 'write')) {
|
||||
integrateRemoteStructs(y, decoder)
|
||||
} else {
|
||||
throw new Error('Unable to receive message')
|
||||
}
|
||||
}, true)
|
||||
}
|
||||
}
|
||||
|
||||
_setSyncedWith (user) {
|
||||
if (user != null) {
|
||||
const userConn = this.connections.get(user)
|
||||
userConn.isSynced = true
|
||||
const messages = userConn.processAfterSync
|
||||
userConn.processAfterSync = []
|
||||
messages.forEach(m => {
|
||||
this.computeMessage(m[0], m[1], m[2], m[3], m[4])
|
||||
})
|
||||
}
|
||||
const conns = Array.from(this.connections.values())
|
||||
if (conns.length > 0 && conns.every(u => u.isSynced)) {
|
||||
this._fireIsSyncedListeners()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,140 +0,0 @@
|
||||
import BinaryEncoder from '../../Util/Binary/Encoder.js'
|
||||
/* global WebSocket */
|
||||
import NamedEventHandler from '../../Util/NamedEventHandler.js'
|
||||
import decodeMessage, { messageSS, messageSubscribe, messageStructs } from './decodeMessage.js'
|
||||
import { createMutualExclude } from '../../Util/mutualExclude.js'
|
||||
import { messageCheckUpdateCounter } from './decodeMessage.js'
|
||||
|
||||
export const STATE_DISCONNECTED = 0
|
||||
export const STATE_CONNECTED = 1
|
||||
|
||||
export default class WebsocketsConnector extends NamedEventHandler {
|
||||
constructor (url = 'ws://localhost:1234') {
|
||||
super()
|
||||
this.url = url
|
||||
this._state = STATE_DISCONNECTED
|
||||
this._socket = null
|
||||
this._rooms = new Map()
|
||||
this._connectToServer = true
|
||||
this._reconnectTimeout = 300
|
||||
this._mutualExclude = createMutualExclude()
|
||||
this._persistence = null
|
||||
this.connect()
|
||||
}
|
||||
|
||||
getRoom (roomName) {
|
||||
return this._rooms.get(roomName) || { y: null, roomName, localUpdateCounter: 1 }
|
||||
}
|
||||
|
||||
syncPersistence (persistence) {
|
||||
this._persistence = persistence
|
||||
if (this._state === STATE_CONNECTED) {
|
||||
persistence.getAllDocuments().then(docs => {
|
||||
const encoder = new BinaryEncoder()
|
||||
docs.forEach(doc => {
|
||||
messageCheckUpdateCounter(doc.roomName, encoder, doc.remoteUpdateCounter)
|
||||
});
|
||||
this.send(encoder)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
connectY (roomName, y) {
|
||||
let room = this._rooms.get(roomName)
|
||||
if (room !== undefined) {
|
||||
throw new Error('Room is already taken! There can be only one Yjs instance per roomName!')
|
||||
}
|
||||
this._rooms.set(roomName, {
|
||||
roomName,
|
||||
y,
|
||||
localUpdateCounter: 1
|
||||
})
|
||||
y.on('afterTransaction', (y, transaction) => {
|
||||
this._mutualExclude(() => {
|
||||
if (transaction.encodedStructsLen > 0) {
|
||||
const encoder = new BinaryEncoder()
|
||||
const room = this._rooms.get(roomName)
|
||||
messageStructs(roomName, y, encoder, transaction.encodedStructs, ++room.localUpdateCounter)
|
||||
this.send(encoder)
|
||||
}
|
||||
})
|
||||
})
|
||||
if (this._state === STATE_CONNECTED) {
|
||||
const encoder = new BinaryEncoder()
|
||||
messageSS(roomName, y, encoder)
|
||||
messageSubscribe(roomName, y, encoder)
|
||||
this.send(encoder)
|
||||
}
|
||||
}
|
||||
|
||||
_setState (state) {
|
||||
this._state = state
|
||||
this.emit('stateChanged', {
|
||||
state: this.state
|
||||
})
|
||||
}
|
||||
|
||||
get state () {
|
||||
return this._state === STATE_DISCONNECTED ? 'disconnected' : 'connected'
|
||||
}
|
||||
|
||||
_onOpen () {
|
||||
this._setState(STATE_CONNECTED)
|
||||
if (this._persistence === null) {
|
||||
const encoder = new BinaryEncoder()
|
||||
for (const [roomName, room] of this._rooms) {
|
||||
const y = room.y
|
||||
messageSS(roomName, y, encoder)
|
||||
messageSubscribe(roomName, y, encoder)
|
||||
}
|
||||
this.send(encoder)
|
||||
} else {
|
||||
this.syncPersistence(this._persistence)
|
||||
}
|
||||
}
|
||||
|
||||
send (encoder) {
|
||||
if (encoder.length > 0 && this._socket.readyState === WebSocket.OPEN) {
|
||||
this._socket.send(encoder.createBuffer())
|
||||
}
|
||||
}
|
||||
|
||||
_onClose () {
|
||||
this._setState(STATE_DISCONNECTED)
|
||||
this._socket = null
|
||||
if (this._connectToServer) {
|
||||
setTimeout(() => {
|
||||
if (this._connectToServer) {
|
||||
this.connect()
|
||||
}
|
||||
}, this._reconnectTimeout)
|
||||
this.connect()
|
||||
}
|
||||
}
|
||||
|
||||
_onMessage (message) {
|
||||
if (message.data.byteLength > 0) {
|
||||
const reply = decodeMessage(this, message.data, null, false, this._persistence)
|
||||
this.send(reply)
|
||||
}
|
||||
}
|
||||
|
||||
disconnect (code = 1000, reason = 'Client manually disconnected') {
|
||||
const socket = this._socket
|
||||
this._connectToServer = false
|
||||
socket.close(code, reason)
|
||||
}
|
||||
|
||||
connect () {
|
||||
if (this._socket === null) {
|
||||
const socket = new WebSocket(this.url)
|
||||
socket.binaryType = 'arraybuffer'
|
||||
this._socket = socket
|
||||
this._connectToServer = true
|
||||
// Connection opened
|
||||
socket.addEventListener('open', this._onOpen.bind(this))
|
||||
socket.addEventListener('close', this._onClose.bind(this))
|
||||
socket.addEventListener('message', this._onMessage.bind(this))
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,159 +0,0 @@
|
||||
import BinaryDecoder from '../../Util/Binary/Decoder.js'
|
||||
import BinaryEncoder from '../../Util/Binary/Encoder.js'
|
||||
import { readStateSet, writeStateSet } from '../../MessageHandler/stateSet.js'
|
||||
import { writeStructs } from '../../MessageHandler/syncStep1.js'
|
||||
import { writeDeleteSet, readDeleteSet } from '../../MessageHandler/deleteSet.js'
|
||||
import { integrateRemoteStructs } from '../../MessageHandler/integrateRemoteStructs.js'
|
||||
|
||||
const CONTENT_GET_SS = 4
|
||||
export function messageGetSS (roomName, y, encoder) {
|
||||
encoder.writeVarString(roomName)
|
||||
encoder.writeVarUint(CONTENT_GET_SS)
|
||||
}
|
||||
|
||||
const CONTENT_SUBSCRIBE = 3
|
||||
export function messageSubscribe (roomName, y, encoder) {
|
||||
encoder.writeVarString(roomName)
|
||||
encoder.writeVarUint(CONTENT_SUBSCRIBE)
|
||||
}
|
||||
|
||||
const CONTENT_SS = 0
|
||||
export function messageSS (roomName, y, encoder) {
|
||||
encoder.writeVarString(roomName)
|
||||
encoder.writeVarUint(CONTENT_SS)
|
||||
writeStateSet(y, encoder)
|
||||
}
|
||||
|
||||
const CONTENT_STRUCTS_DSS = 2
|
||||
export function messageStructsDSS (roomName, y, encoder, ss, updateCounter) {
|
||||
encoder.writeVarString(roomName)
|
||||
encoder.writeVarUint(CONTENT_STRUCTS_DSS)
|
||||
encoder.writeVarUint(updateCounter)
|
||||
const structsDS = new BinaryEncoder()
|
||||
writeStructs(y, structsDS, ss)
|
||||
writeDeleteSet(y, structsDS)
|
||||
encoder.writeVarUint(structsDS.length)
|
||||
encoder.writeBinaryEncoder(structsDS)
|
||||
}
|
||||
|
||||
const CONTENT_STRUCTS = 5
|
||||
export function messageStructs (roomName, y, encoder, structsBinaryEncoder, updateCounter) {
|
||||
encoder.writeVarString(roomName)
|
||||
encoder.writeVarUint(CONTENT_STRUCTS)
|
||||
encoder.writeVarUint(updateCounter)
|
||||
encoder.writeVarUint(structsBinaryEncoder.length)
|
||||
encoder.writeBinaryEncoder(structsBinaryEncoder)
|
||||
}
|
||||
|
||||
const CONTENT_CHECK_COUNTER = 6
|
||||
export function messageCheckUpdateCounter (roomName, encoder, updateCounter = 0) {
|
||||
encoder.writeVarString(roomName)
|
||||
encoder.writeVarUint(CONTENT_CHECK_COUNTER)
|
||||
encoder.writeVarUint(updateCounter)
|
||||
}
|
||||
|
||||
/**
|
||||
* Decodes a client-message.
|
||||
*
|
||||
* A client-message consists of multiple message-elements that are concatenated without delimiter.
|
||||
* Each has the following structure:
|
||||
* - roomName
|
||||
* - content_type
|
||||
* - content (additional info that is encoded based on the value of content_type)
|
||||
*
|
||||
* The message is encoded until no more message-elements are available.
|
||||
*
|
||||
* @param {*} connector The connector that handles the connections
|
||||
* @param {*} message The binary encoded message
|
||||
* @param {*} ws The connection object
|
||||
*/
|
||||
export default function decodeMessage (connector, message, ws, isServer = false, persistence) {
|
||||
const decoder = new BinaryDecoder(message)
|
||||
const encoder = new BinaryEncoder()
|
||||
while (decoder.hasContent()) {
|
||||
const roomName = decoder.readVarString()
|
||||
const contentType = decoder.readVarUint()
|
||||
const room = connector.getRoom(roomName)
|
||||
const y = room.y
|
||||
switch (contentType) {
|
||||
case CONTENT_CHECK_COUNTER:
|
||||
const updateCounter = decoder.readVarUint()
|
||||
if (room.localUpdateCounter !== updateCounter) {
|
||||
messageGetSS(roomName, y, encoder)
|
||||
}
|
||||
connector.subscribe(roomName, ws)
|
||||
break
|
||||
case CONTENT_STRUCTS:
|
||||
console.log(`${roomName}: received update`)
|
||||
connector._mutualExclude(() => {
|
||||
const remoteUpdateCounter = decoder.readVarUint()
|
||||
persistence.setRemoteUpdateCounter(roomName, remoteUpdateCounter)
|
||||
const messageLen = decoder.readVarUint()
|
||||
if (y === null) {
|
||||
persistence._persistStructs(roomName, decoder.readArrayBuffer(messageLen))
|
||||
} else {
|
||||
y.transact(() => {
|
||||
integrateRemoteStructs(y, decoder)
|
||||
}, true)
|
||||
}
|
||||
})
|
||||
break
|
||||
case CONTENT_GET_SS:
|
||||
if (y !== null) {
|
||||
messageSS(roomName, y, encoder)
|
||||
} else {
|
||||
persistence._createYInstance(roomName).then(y => {
|
||||
const encoder = new BinaryEncoder()
|
||||
messageSS(roomName, y, encoder)
|
||||
connector.send(encoder, ws)
|
||||
})
|
||||
}
|
||||
break
|
||||
case CONTENT_SUBSCRIBE:
|
||||
connector.subscribe(roomName, ws)
|
||||
break
|
||||
case CONTENT_SS:
|
||||
// received state set
|
||||
// reply with missing content
|
||||
const ss = readStateSet(decoder)
|
||||
const sendStructsDSS = () => {
|
||||
if (y !== null) { // TODO: how to sync local content?
|
||||
const encoder = new BinaryEncoder()
|
||||
messageStructsDSS(roomName, y, encoder, ss, room.localUpdateCounter) // room.localUpdateHandler in case it changes
|
||||
if (isServer) {
|
||||
messageSS(roomName, y, encoder)
|
||||
}
|
||||
connector.send(encoder, ws)
|
||||
}
|
||||
}
|
||||
if (room.persistenceLoaded !== undefined) {
|
||||
room.persistenceLoaded.then(sendStructsDSS)
|
||||
} else {
|
||||
sendStructsDSS()
|
||||
}
|
||||
break
|
||||
case CONTENT_STRUCTS_DSS:
|
||||
console.log(`${roomName}: synced`)
|
||||
connector._mutualExclude(() => {
|
||||
const remoteUpdateCounter = decoder.readVarUint()
|
||||
persistence.setRemoteUpdateCounter(roomName, remoteUpdateCounter)
|
||||
const messageLen = decoder.readVarUint()
|
||||
if (y === null) {
|
||||
persistence._persistStructsDS(roomName, decoder.readArrayBuffer(messageLen))
|
||||
} else {
|
||||
y.transact(() => {
|
||||
integrateRemoteStructs(y, decoder)
|
||||
readDeleteSet(y, decoder)
|
||||
}, true)
|
||||
}
|
||||
})
|
||||
break
|
||||
default:
|
||||
console.error('Unexpected content type!')
|
||||
if (ws !== null) {
|
||||
ws.close() // TODO: specify reason
|
||||
}
|
||||
}
|
||||
}
|
||||
return encoder
|
||||
}
|
||||
@@ -1,124 +0,0 @@
|
||||
import Y from '../../Y.js'
|
||||
import uws from 'uws'
|
||||
import BinaryEncoder from '../../Util/Binary/Encoder.js'
|
||||
import decodeMessage, { messageStructs } from './decodeMessage.js'
|
||||
import FilePersistence from '../../Persistences/FilePersistence.js'
|
||||
|
||||
const WebsocketsServer = uws.Server
|
||||
const persistence = new FilePersistence('.yjsPersisted')
|
||||
/**
|
||||
* Maps from room-name to ..
|
||||
* {
|
||||
* connections, // Set of ws-clients that listen to the room
|
||||
* y // Yjs instance that handles the room
|
||||
* }
|
||||
*/
|
||||
const rooms = new Map()
|
||||
/**
|
||||
* Maps from ws-connection to Set<roomName> - the set of connected roomNames
|
||||
*/
|
||||
const connections = new Map()
|
||||
const port = process.env.PORT || 1234
|
||||
const wss = new WebsocketsServer({
|
||||
port,
|
||||
perMessageDeflate: {}
|
||||
})
|
||||
|
||||
/**
|
||||
* Set of room names that are scheduled to be sweeped (destroyed because they don't have a connection anymore)
|
||||
*/
|
||||
const scheduledSweeps = new Set()
|
||||
/* TODO: enable sweeping
|
||||
setInterval(function sweepRoomes () {
|
||||
scheduledSweeps.forEach(roomName => {
|
||||
const room = rooms.get(roomName)
|
||||
if (room !== undefined) {
|
||||
if (room.connections.size === 0) {
|
||||
persistence.saveState(roomName, room.y).then(() => {
|
||||
if (room.connections.size === 0) {
|
||||
room.y.destroy()
|
||||
rooms.delete(roomName)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
})
|
||||
scheduledSweeps.clear()
|
||||
}, 5000) */
|
||||
|
||||
const wsConnector = {
|
||||
send: (encoder, ws) => {
|
||||
const message = encoder.createBuffer()
|
||||
ws.send(message, null, null, true)
|
||||
},
|
||||
_mutualExclude: f => { f() },
|
||||
subscribe: function subscribe (roomName, ws) {
|
||||
let roomNames = connections.get(ws)
|
||||
if (roomNames === undefined) {
|
||||
roomNames = new Set()
|
||||
connections.set(ws, roomNames)
|
||||
}
|
||||
roomNames.add(roomName)
|
||||
const room = this.getRoom(roomName)
|
||||
room.connections.add(ws)
|
||||
},
|
||||
getRoom: function getRoom (roomName) {
|
||||
let room = rooms.get(roomName)
|
||||
if (room === undefined) {
|
||||
const y = new Y(roomName, null, null, { gc: true })
|
||||
const persistenceLoaded = persistence.readState(roomName, y)
|
||||
room = {
|
||||
name: roomName,
|
||||
connections: new Set(),
|
||||
y,
|
||||
persistenceLoaded,
|
||||
localUpdateCounter: 1
|
||||
}
|
||||
y.on('afterTransaction', (y, transaction) => {
|
||||
if (transaction.encodedStructsLen > 0) {
|
||||
// save to persistence
|
||||
persistence.saveUpdate(roomName, y, transaction.encodedStructs)
|
||||
// forward update to clients
|
||||
persistence._mutex(() => { // do not broadcast if persistence.readState is called
|
||||
const encoder = new BinaryEncoder()
|
||||
messageStructs(roomName, y, encoder, transaction.encodedStructs, ++room.localUpdateCounter)
|
||||
const message = encoder.createBuffer()
|
||||
// when changed, broakcast update to all connections
|
||||
room.connections.forEach(conn => {
|
||||
conn.send(message, null, null, true)
|
||||
})
|
||||
})
|
||||
}
|
||||
})
|
||||
rooms.set(roomName, room)
|
||||
}
|
||||
return room
|
||||
}
|
||||
}
|
||||
|
||||
wss.on('connection', (ws) => {
|
||||
ws.on('message', function onWSMessage (message) {
|
||||
if (message.byteLength > 0) {
|
||||
const reply = decodeMessage(wsConnector, message, ws, true, persistence)
|
||||
if (reply.length > 0) {
|
||||
ws.send(reply.createBuffer(), null, null, true)
|
||||
}
|
||||
}
|
||||
})
|
||||
ws.on('close', function onWSClose () {
|
||||
const roomNames = connections.get(ws)
|
||||
if (roomNames !== undefined) {
|
||||
roomNames.forEach(roomName => {
|
||||
const room = rooms.get(roomName)
|
||||
if (room !== undefined) {
|
||||
const connections = room.connections
|
||||
connections.delete(ws)
|
||||
if (connections.size === 0) {
|
||||
scheduledSweeps.add(roomName)
|
||||
}
|
||||
}
|
||||
})
|
||||
connections.delete(ws)
|
||||
}
|
||||
})
|
||||
})
|
||||
@@ -1,122 +0,0 @@
|
||||
import BinaryEncoder from './Util/Binary/Encoder.js'
|
||||
import BinaryDecoder from './Util/Binary/Decoder.js'
|
||||
import { toBinary, fromBinary } from './MessageHandler/binaryEncode.js'
|
||||
import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'
|
||||
import { createMutualExclude } from './Util/mutualExclude.js'
|
||||
|
||||
function getFreshCnf () {
|
||||
let buffer = new BinaryEncoder()
|
||||
buffer.writeUint32(0)
|
||||
return {
|
||||
len: 0,
|
||||
buffer
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract persistence class.
|
||||
*/
|
||||
export default class AbstractPersistence {
|
||||
constructor (opts) {
|
||||
this.opts = opts
|
||||
this.ys = new Map()
|
||||
}
|
||||
|
||||
_init (y) {
|
||||
let cnf = this.ys.get(y)
|
||||
if (cnf === undefined) {
|
||||
cnf = getFreshCnf()
|
||||
cnf.mutualExclude = createMutualExclude()
|
||||
this.ys.set(y, cnf)
|
||||
return this.init(y).then(() => {
|
||||
y.on('afterTransaction', (y, transaction) => {
|
||||
let cnf = this.ys.get(y)
|
||||
if (cnf.len > 0) {
|
||||
cnf.buffer.setUint32(0, cnf.len)
|
||||
this.saveUpdate(y, cnf.buffer.createBuffer(), transaction)
|
||||
let _cnf = getFreshCnf()
|
||||
for (let key in _cnf) {
|
||||
cnf[key] = _cnf[key]
|
||||
}
|
||||
}
|
||||
})
|
||||
return this.retrieve(y)
|
||||
}).then(function () {
|
||||
return Promise.resolve(cnf)
|
||||
})
|
||||
} else {
|
||||
return Promise.resolve(cnf)
|
||||
}
|
||||
}
|
||||
deinit (y) {
|
||||
this.ys.delete(y)
|
||||
y.persistence = null
|
||||
}
|
||||
|
||||
destroy () {
|
||||
this.ys = null
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove all persisted data that belongs to a room.
|
||||
* Automatically destroys all Yjs all Yjs instances that persist to
|
||||
* the room. If `destroyYjsInstances = false` the persistence functionality
|
||||
* will be removed from the Yjs instances.
|
||||
*
|
||||
* ** Must be overwritten! **
|
||||
*/
|
||||
removePersistedData (room, destroyYjsInstances = true) {
|
||||
this.ys.forEach((cnf, y) => {
|
||||
if (y.room === room) {
|
||||
if (destroyYjsInstances) {
|
||||
y.destroy()
|
||||
} else {
|
||||
this.deinit(y)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/* overwrite */
|
||||
saveUpdate (buffer) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Save struct to update buffer.
|
||||
* saveUpdate is called when transaction ends
|
||||
*/
|
||||
saveStruct (y, struct) {
|
||||
let cnf = this.ys.get(y)
|
||||
if (cnf !== undefined) {
|
||||
cnf.mutualExclude(function () {
|
||||
struct._toBinary(cnf.buffer)
|
||||
cnf.len++
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/* overwrite */
|
||||
retrieve (y, model, updates) {
|
||||
let cnf = this.ys.get(y)
|
||||
if (cnf !== undefined) {
|
||||
cnf.mutualExclude(function () {
|
||||
y.transact(function () {
|
||||
if (model != null) {
|
||||
fromBinary(y, new BinaryDecoder(new Uint8Array(model)))
|
||||
}
|
||||
if (updates != null) {
|
||||
for (let i = 0; i < updates.length; i++) {
|
||||
integrateRemoteStructs(y, new BinaryDecoder(new Uint8Array(updates[i])))
|
||||
}
|
||||
}
|
||||
})
|
||||
y.emit('persistenceReady')
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/* overwrite */
|
||||
persist (y) {
|
||||
return toBinary(y).createBuffer()
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,5 @@
|
||||
|
||||
import Tree from '../Util/Tree.js'
|
||||
import Tree from '../../lib/Tree.js'
|
||||
import ID from '../Util/ID/ID.js'
|
||||
|
||||
class DSNode {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import Tree from '../Util/Tree.js'
|
||||
import Tree from '../../lib/Tree.js'
|
||||
import RootID from '../Util/ID/RootID.js'
|
||||
import { getStruct } from '../Util/structReferences.js'
|
||||
import { logID } from '../MessageHandler/messageToString.js'
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { getStructReference } from '../Util/structReferences.js'
|
||||
import ID from '../Util/ID/ID.js'
|
||||
import { logID } from '../MessageHandler/messageToString.js'
|
||||
import { writeStructToTransaction } from '../Transaction.js'
|
||||
import { writeStructToTransaction } from '../Util/Transaction.js'
|
||||
|
||||
/**
|
||||
* @private
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { getStructReference } from '../Util/structReferences.js'
|
||||
import { RootFakeUserID } from '../Util/ID/RootID.js'
|
||||
import ID from '../Util/ID/ID.js'
|
||||
import { writeStructToTransaction } from '../Transaction.js'
|
||||
import { writeStructToTransaction } from '../Util/Transaction.js'
|
||||
|
||||
// TODO should have the same base class as Item
|
||||
export default class GC {
|
||||
|
||||
@@ -2,7 +2,7 @@ import { getStructReference } from '../Util/structReferences.js'
|
||||
import ID from '../Util/ID/ID.js'
|
||||
import { default as RootID, RootFakeUserID } from '../Util/ID/RootID.js'
|
||||
import Delete from './Delete.js'
|
||||
import { transactionTypeChanged, writeStructToTransaction } from '../Transaction.js'
|
||||
import { transactionTypeChanged, writeStructToTransaction } from '../Util/Transaction.js'
|
||||
import GC from './GC.js'
|
||||
|
||||
/**
|
||||
|
||||
@@ -1,187 +0,0 @@
|
||||
import ID from '../ID/ID.js'
|
||||
import { default as RootID, RootFakeUserID } from '../ID/RootID.js'
|
||||
|
||||
/**
|
||||
* A BinaryDecoder handles the decoding of an ArrayBuffer.
|
||||
*/
|
||||
export default class BinaryDecoder {
|
||||
/**
|
||||
* @param {Uint8Array|Buffer} buffer The binary data that this instance
|
||||
* decodes.
|
||||
*/
|
||||
constructor (buffer) {
|
||||
if (buffer instanceof ArrayBuffer) {
|
||||
this.uint8arr = new Uint8Array(buffer)
|
||||
} else if (
|
||||
buffer instanceof Uint8Array ||
|
||||
(
|
||||
typeof Buffer !== 'undefined' && buffer instanceof Buffer
|
||||
)
|
||||
) {
|
||||
this.uint8arr = buffer
|
||||
} else {
|
||||
throw new Error('Expected an ArrayBuffer or Uint8Array!')
|
||||
}
|
||||
this.pos = 0
|
||||
}
|
||||
|
||||
hasContent () {
|
||||
return this.pos !== this.uint8arr.length
|
||||
}
|
||||
|
||||
/**
|
||||
* Clone this decoder instance.
|
||||
* Optionally set a new position parameter.
|
||||
*/
|
||||
clone (newPos = this.pos) {
|
||||
let decoder = new BinaryDecoder(this.uint8arr)
|
||||
decoder.pos = newPos
|
||||
return decoder
|
||||
}
|
||||
|
||||
/**
|
||||
* Number of bytes.
|
||||
*/
|
||||
get length () {
|
||||
return this.uint8arr.length
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Read `len` bytes as an ArrayBuffer.
|
||||
*/
|
||||
readArrayBuffer (len) {
|
||||
const arrayBuffer = new Uint8Array(len)
|
||||
const view = new Uint8Array(this.uint8arr.buffer, this.pos, len)
|
||||
arrayBuffer.set(view)
|
||||
this.pos += len
|
||||
return arrayBuffer.buffer
|
||||
}
|
||||
|
||||
/**
|
||||
* Skip one byte, jump to the next position.
|
||||
*/
|
||||
skip8 () {
|
||||
this.pos++
|
||||
}
|
||||
|
||||
/**
|
||||
* Read one byte as unsigned integer.
|
||||
*/
|
||||
readUint8 () {
|
||||
return this.uint8arr[this.pos++]
|
||||
}
|
||||
|
||||
/**
|
||||
* Read 4 bytes as unsigned integer.
|
||||
*
|
||||
* @return {number} An unsigned integer.
|
||||
*/
|
||||
readUint32 () {
|
||||
let uint =
|
||||
this.uint8arr[this.pos] +
|
||||
(this.uint8arr[this.pos + 1] << 8) +
|
||||
(this.uint8arr[this.pos + 2] << 16) +
|
||||
(this.uint8arr[this.pos + 3] << 24)
|
||||
this.pos += 4
|
||||
return uint
|
||||
}
|
||||
|
||||
/**
|
||||
* Look ahead without incrementing position.
|
||||
* to the next byte and read it as unsigned integer.
|
||||
*
|
||||
* @return {number} An unsigned integer.
|
||||
*/
|
||||
peekUint8 () {
|
||||
return this.uint8arr[this.pos]
|
||||
}
|
||||
|
||||
/**
|
||||
* Read unsigned integer (32bit) with variable length.
|
||||
* 1/8th of the storage is used as encoding overhead.
|
||||
* * numbers < 2^7 is stored in one byte.
|
||||
* * numbers < 2^14 is stored in two bytes.
|
||||
*
|
||||
* @return {number} An unsigned integer.
|
||||
*/
|
||||
readVarUint () {
|
||||
let num = 0
|
||||
let len = 0
|
||||
while (true) {
|
||||
let r = this.uint8arr[this.pos++]
|
||||
num = num | ((r & 0b1111111) << len)
|
||||
len += 7
|
||||
if (r < 1 << 7) {
|
||||
return num >>> 0 // return unsigned number!
|
||||
}
|
||||
if (len > 35) {
|
||||
throw new Error('Integer out of range!')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read string of variable length
|
||||
* * varUint is used to store the length of the string
|
||||
*
|
||||
* Transforming utf8 to a string is pretty expensive. The code performs 10x better
|
||||
* when String.fromCodePoint is fed with all characters as arguments.
|
||||
* But most environments have a maximum number of arguments per functions.
|
||||
* For effiency reasons we apply a maximum of 10000 characters at once.
|
||||
*
|
||||
* @return {String} The read String.
|
||||
*/
|
||||
readVarString () {
|
||||
let remainingLen = this.readVarUint()
|
||||
let encodedString = ''
|
||||
let i = 0
|
||||
while (remainingLen > 0) {
|
||||
const nextLen = Math.min(remainingLen, 10000)
|
||||
const bytes = new Array(nextLen)
|
||||
for (let i = 0; i < nextLen; i++) {
|
||||
bytes[i] = this.uint8arr[this.pos++]
|
||||
}
|
||||
encodedString += String.fromCodePoint.apply(null, bytes)
|
||||
remainingLen -= nextLen
|
||||
}
|
||||
/*
|
||||
//let bytes = new Array(len)
|
||||
for (let i = 0; i < len; i++) {
|
||||
//bytes[i] = this.uint8arr[this.pos++]
|
||||
encodedString += String.fromCodePoint(this.uint8arr[this.pos++])
|
||||
// encodedString += String(this.uint8arr[this.pos++])
|
||||
}
|
||||
*/
|
||||
//let encodedString = String.fromCodePoint.apply(null, bytes)
|
||||
return decodeURIComponent(escape(encodedString))
|
||||
}
|
||||
|
||||
/**
|
||||
* Look ahead and read varString without incrementing position
|
||||
*/
|
||||
peekVarString () {
|
||||
let pos = this.pos
|
||||
let s = this.readVarString()
|
||||
this.pos = pos
|
||||
return s
|
||||
}
|
||||
|
||||
/**
|
||||
* Read ID.
|
||||
* * If first varUint read is 0xFFFFFF a RootID is returned.
|
||||
* * Otherwise an ID is returned.
|
||||
*
|
||||
* @return ID
|
||||
*/
|
||||
readID () {
|
||||
let user = this.readVarUint()
|
||||
if (user === RootFakeUserID) {
|
||||
// read property name and type id
|
||||
const rid = new RootID(this.readVarString(), null)
|
||||
rid.type = this.readVarUint()
|
||||
return rid
|
||||
}
|
||||
return new ID(user, this.readVarUint())
|
||||
}
|
||||
}
|
||||
@@ -1,210 +0,0 @@
|
||||
import { RootFakeUserID } from '../ID/RootID.js'
|
||||
|
||||
const bits7 = 0b1111111
|
||||
const bits8 = 0b11111111
|
||||
|
||||
/**
|
||||
* A BinaryEncoder handles the encoding to an ArrayBuffer.
|
||||
*/
|
||||
export default class BinaryEncoder {
|
||||
constructor () {
|
||||
// TODO: implement chained Uint8Array buffers instead of Array buffer
|
||||
// TODO: Rewrite all methods as functions!
|
||||
this._currentPos = 0
|
||||
this._currentBuffer = new Uint8Array(1000)
|
||||
this._data = []
|
||||
}
|
||||
|
||||
/**
|
||||
* The current length of the encoded data.
|
||||
*/
|
||||
get length () {
|
||||
let len = 0
|
||||
for (let i = 0; i < this._data.length; i++) {
|
||||
len += this._data[i].length
|
||||
}
|
||||
len += this._currentPos
|
||||
return len
|
||||
}
|
||||
|
||||
/**
|
||||
* The current write pointer (the same as {@link length}).
|
||||
*/
|
||||
get pos () {
|
||||
return this.length
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform to ArrayBuffer.
|
||||
*
|
||||
* @return {ArrayBuffer} The created ArrayBuffer.
|
||||
*/
|
||||
createBuffer () {
|
||||
const len = this.length
|
||||
const uint8array = new Uint8Array(len)
|
||||
let curPos = 0
|
||||
for (let i = 0; i < this._data.length; i++) {
|
||||
let d = this._data[i]
|
||||
uint8array.set(d, curPos)
|
||||
curPos += d.length
|
||||
}
|
||||
uint8array.set(new Uint8Array(this._currentBuffer.buffer, 0, this._currentPos), curPos)
|
||||
return uint8array.buffer
|
||||
}
|
||||
|
||||
/**
|
||||
* Write one byte to the encoder.
|
||||
*
|
||||
* @param {number} num The byte that is to be encoded.
|
||||
*/
|
||||
write (num) {
|
||||
if (this._currentPos === this._currentBuffer.length) {
|
||||
this._data.push(this._currentBuffer)
|
||||
this._currentBuffer = new Uint8Array(this._currentBuffer.length * 2)
|
||||
this._currentPos = 0
|
||||
}
|
||||
this._currentBuffer[this._currentPos++] = num
|
||||
}
|
||||
|
||||
set (pos, num) {
|
||||
let buffer = null
|
||||
// iterate all buffers and adjust position
|
||||
for (let i = 0; i < this._data.length && buffer === null; i++) {
|
||||
const b = this._data[i]
|
||||
if (pos < b.length) {
|
||||
buffer = b // found buffer
|
||||
} else {
|
||||
pos -= b.length
|
||||
}
|
||||
}
|
||||
if (buffer === null) {
|
||||
// use current buffer
|
||||
buffer = this._currentBuffer
|
||||
}
|
||||
buffer[pos] = num
|
||||
}
|
||||
|
||||
/**
|
||||
* Write one byte as an unsigned integer.
|
||||
*
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
writeUint8 (num) {
|
||||
this.write(num & bits8)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write one byte as an unsigned Integer at a specific location.
|
||||
*
|
||||
* @param {number} pos The location where the data will be written.
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
setUint8 (pos, num) {
|
||||
this.set(pos, num & bits8)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write two bytes as an unsigned integer.
|
||||
*
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
writeUint16 (num) {
|
||||
this.write(num & bits8)
|
||||
this.write((num >>> 8) & bits8)
|
||||
}
|
||||
/**
|
||||
* Write two bytes as an unsigned integer at a specific location.
|
||||
*
|
||||
* @param {number} pos The location where the data will be written.
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
setUint16 (pos, num) {
|
||||
this.set(pos, num & bits8)
|
||||
this.set(pos + 1, (num >>> 8) & bits8)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write two bytes as an unsigned integer
|
||||
*
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
writeUint32 (num) {
|
||||
for (let i = 0; i < 4; i++) {
|
||||
this.write(num & bits8)
|
||||
num >>>= 8
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write two bytes as an unsigned integer at a specific location.
|
||||
*
|
||||
* @param {number} pos The location where the data will be written.
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
setUint32 (pos, num) {
|
||||
for (let i = 0; i < 4; i++) {
|
||||
this.set(pos + i, num & bits8)
|
||||
num >>>= 8
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a variable length unsigned integer.
|
||||
*
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
writeVarUint (num) {
|
||||
while (num >= 0b10000000) {
|
||||
this.write(0b10000000 | (bits7 & num))
|
||||
num >>>= 7
|
||||
}
|
||||
this.write(bits7 & num)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a variable length string.
|
||||
*
|
||||
* @param {String} str The string that is to be encoded.
|
||||
*/
|
||||
writeVarString (str) {
|
||||
const encodedString = unescape(encodeURIComponent(str))
|
||||
const len = encodedString.length
|
||||
this.writeVarUint(len)
|
||||
for (let i = 0; i < len; i++) {
|
||||
this.write(encodedString.codePointAt(i))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the content of another binary encoder.
|
||||
*
|
||||
* @param encoder The BinaryEncoder to be written.
|
||||
*/
|
||||
writeBinaryEncoder (encoder) {
|
||||
this.writeArrayBuffer(encoder.createBuffer())
|
||||
}
|
||||
|
||||
writeArrayBuffer (arrayBuffer) {
|
||||
const prevBufferLen = this._currentBuffer.length
|
||||
this._data.push(new Uint8Array(this._currentBuffer.buffer, 0, this._currentPos))
|
||||
this._data.push(new Uint8Array(arrayBuffer))
|
||||
this._currentBuffer = new Uint8Array(prevBufferLen)
|
||||
this._currentPos = 0
|
||||
}
|
||||
|
||||
/**
|
||||
* Write an ID at the current position.
|
||||
*
|
||||
* @param {ID} id The ID that is to be written.
|
||||
*/
|
||||
writeID (id) {
|
||||
const user = id.user
|
||||
this.writeVarUint(user)
|
||||
if (user !== RootFakeUserID) {
|
||||
this.writeVarUint(id.clock)
|
||||
} else {
|
||||
this.writeVarString(id.name)
|
||||
this.writeVarUint(id.type)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,113 +0,0 @@
|
||||
|
||||
/**
|
||||
* Handles named events.
|
||||
*/
|
||||
export default class NamedEventHandler {
|
||||
constructor () {
|
||||
this._eventListener = new Map()
|
||||
this._stateListener = new Map()
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
* Returns all listeners that listen to a specified name.
|
||||
*
|
||||
* @param {String} name The query event name.
|
||||
*/
|
||||
_getListener (name) {
|
||||
let listeners = this._eventListener.get(name)
|
||||
if (listeners === undefined) {
|
||||
listeners = {
|
||||
once: new Set(),
|
||||
on: new Set()
|
||||
}
|
||||
this._eventListener.set(name, listeners)
|
||||
}
|
||||
return listeners
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a named event listener. The listener is removed after it has been
|
||||
* called once.
|
||||
*
|
||||
* @param {String} name The event name to listen to.
|
||||
* @param {Function} f The function that is executed when the event is fired.
|
||||
*/
|
||||
once (name, f) {
|
||||
let listeners = this._getListener(name)
|
||||
listeners.once.add(f)
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a named event listener.
|
||||
*
|
||||
* @param {String} name The event name to listen to.
|
||||
* @param {Function} f The function that is executed when the event is fired.
|
||||
*/
|
||||
on (name, f) {
|
||||
let listeners = this._getListener(name)
|
||||
listeners.on.add(f)
|
||||
}
|
||||
|
||||
/**
|
||||
* @private
|
||||
* Init the saved state for an event name.
|
||||
*/
|
||||
_initStateListener (name) {
|
||||
let state = this._stateListener.get(name)
|
||||
if (state === undefined) {
|
||||
state = {}
|
||||
state.promise = new Promise(function (resolve) {
|
||||
state.resolve = resolve
|
||||
})
|
||||
this._stateListener.set(name, state)
|
||||
}
|
||||
return state
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a Promise that is resolved when the event name is called.
|
||||
* The Promise is immediately resolved when the event name was called in the
|
||||
* past.
|
||||
*/
|
||||
when (name) {
|
||||
return this._initStateListener(name).promise
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove an event listener that was registered with either
|
||||
* {@link EventHandler#on} or {@link EventHandler#once}.
|
||||
*/
|
||||
off (name, f) {
|
||||
if (name == null || f == null) {
|
||||
throw new Error('You must specify event name and function!')
|
||||
}
|
||||
const listener = this._eventListener.get(name)
|
||||
if (listener !== undefined) {
|
||||
listener.on.delete(f)
|
||||
listener.once.delete(f)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Emit a named event. All registered event listeners that listen to the
|
||||
* specified name will receive the event.
|
||||
*
|
||||
* @param {String} name The event name.
|
||||
* @param {Array} args The arguments that are applied to the event listener.
|
||||
*/
|
||||
emit (name, ...args) {
|
||||
this._initStateListener(name).resolve()
|
||||
const listener = this._eventListener.get(name)
|
||||
if (listener !== undefined) {
|
||||
listener.on.forEach(f => f.apply(null, args))
|
||||
listener.once.forEach(f => f.apply(null, args))
|
||||
listener.once = new Set()
|
||||
} else if (name === 'error') {
|
||||
console.error(args[0])
|
||||
}
|
||||
}
|
||||
destroy () {
|
||||
this._eventListener = null
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import BinaryEncoder from './Util/Binary/Encoder.js'
|
||||
import BinaryEncoder from './Binary/Encoder.js'
|
||||
|
||||
/**
|
||||
* A transaction is created for every change on the Yjs model. It is possible
|
||||
465
src/Util/Tree.js
465
src/Util/Tree.js
@@ -1,465 +0,0 @@
|
||||
|
||||
function rotate (tree, parent, newParent, n) {
|
||||
if (parent === null) {
|
||||
tree.root = newParent
|
||||
newParent._parent = null
|
||||
} else if (parent.left === n) {
|
||||
parent.left = newParent
|
||||
} else if (parent.right === n) {
|
||||
parent.right = newParent
|
||||
} else {
|
||||
throw new Error('The elements are wrongly connected!')
|
||||
}
|
||||
}
|
||||
|
||||
class N {
|
||||
// A created node is always red!
|
||||
constructor (val) {
|
||||
this.val = val
|
||||
this.color = true
|
||||
this._left = null
|
||||
this._right = null
|
||||
this._parent = null
|
||||
}
|
||||
isRed () { return this.color }
|
||||
isBlack () { return !this.color }
|
||||
redden () { this.color = true; return this }
|
||||
blacken () { this.color = false; return this }
|
||||
get grandparent () {
|
||||
return this.parent.parent
|
||||
}
|
||||
get parent () {
|
||||
return this._parent
|
||||
}
|
||||
get sibling () {
|
||||
return (this === this.parent.left)
|
||||
? this.parent.right : this.parent.left
|
||||
}
|
||||
get left () {
|
||||
return this._left
|
||||
}
|
||||
get right () {
|
||||
return this._right
|
||||
}
|
||||
set left (n) {
|
||||
if (n !== null) {
|
||||
n._parent = this
|
||||
}
|
||||
this._left = n
|
||||
}
|
||||
set right (n) {
|
||||
if (n !== null) {
|
||||
n._parent = this
|
||||
}
|
||||
this._right = n
|
||||
}
|
||||
rotateLeft (tree) {
|
||||
const parent = this.parent
|
||||
const newParent = this.right
|
||||
const newRight = this.right.left
|
||||
newParent.left = this
|
||||
this.right = newRight
|
||||
rotate(tree, parent, newParent, this)
|
||||
}
|
||||
next () {
|
||||
if (this.right !== null) {
|
||||
// search the most left node in the right tree
|
||||
var o = this.right
|
||||
while (o.left !== null) {
|
||||
o = o.left
|
||||
}
|
||||
return o
|
||||
} else {
|
||||
var p = this
|
||||
while (p.parent !== null && p !== p.parent.left) {
|
||||
p = p.parent
|
||||
}
|
||||
return p.parent
|
||||
}
|
||||
}
|
||||
prev () {
|
||||
if (this.left !== null) {
|
||||
// search the most right node in the left tree
|
||||
var o = this.left
|
||||
while (o.right !== null) {
|
||||
o = o.right
|
||||
}
|
||||
return o
|
||||
} else {
|
||||
var p = this
|
||||
while (p.parent !== null && p !== p.parent.right) {
|
||||
p = p.parent
|
||||
}
|
||||
return p.parent
|
||||
}
|
||||
}
|
||||
rotateRight (tree) {
|
||||
const parent = this.parent
|
||||
const newParent = this.left
|
||||
const newLeft = this.left.right
|
||||
newParent.right = this
|
||||
this.left = newLeft
|
||||
rotate(tree, parent, newParent, this)
|
||||
}
|
||||
getUncle () {
|
||||
// we can assume that grandparent exists when this is called!
|
||||
if (this.parent === this.parent.parent.left) {
|
||||
return this.parent.parent.right
|
||||
} else {
|
||||
return this.parent.parent.left
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* This is a Red Black Tree implementation
|
||||
*/
|
||||
export default class Tree {
|
||||
constructor () {
|
||||
this.root = null
|
||||
this.length = 0
|
||||
}
|
||||
findNext (id) {
|
||||
var nextID = id.clone()
|
||||
nextID.clock += 1
|
||||
return this.findWithLowerBound(nextID)
|
||||
}
|
||||
findPrev (id) {
|
||||
let prevID = id.clone()
|
||||
prevID.clock -= 1
|
||||
return this.findWithUpperBound(prevID)
|
||||
}
|
||||
findNodeWithLowerBound (from) {
|
||||
var o = this.root
|
||||
if (o === null) {
|
||||
return null
|
||||
} else {
|
||||
while (true) {
|
||||
if (from === null || (from.lessThan(o.val._id) && o.left !== null)) {
|
||||
// o is included in the bound
|
||||
// try to find an element that is closer to the bound
|
||||
o = o.left
|
||||
} else if (from !== null && o.val._id.lessThan(from)) {
|
||||
// o is not within the bound, maybe one of the right elements is..
|
||||
if (o.right !== null) {
|
||||
o = o.right
|
||||
} else {
|
||||
// there is no right element. Search for the next bigger element,
|
||||
// this should be within the bounds
|
||||
return o.next()
|
||||
}
|
||||
} else {
|
||||
return o
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
findNodeWithUpperBound (to) {
|
||||
if (to === void 0) {
|
||||
throw new Error('You must define from!')
|
||||
}
|
||||
var o = this.root
|
||||
if (o === null) {
|
||||
return null
|
||||
} else {
|
||||
while (true) {
|
||||
if ((to === null || o.val._id.lessThan(to)) && o.right !== null) {
|
||||
// o is included in the bound
|
||||
// try to find an element that is closer to the bound
|
||||
o = o.right
|
||||
} else if (to !== null && to.lessThan(o.val._id)) {
|
||||
// o is not within the bound, maybe one of the left elements is..
|
||||
if (o.left !== null) {
|
||||
o = o.left
|
||||
} else {
|
||||
// there is no left element. Search for the prev smaller element,
|
||||
// this should be within the bounds
|
||||
return o.prev()
|
||||
}
|
||||
} else {
|
||||
return o
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
findSmallestNode () {
|
||||
var o = this.root
|
||||
while (o != null && o.left != null) {
|
||||
o = o.left
|
||||
}
|
||||
return o
|
||||
}
|
||||
findWithLowerBound (from) {
|
||||
var n = this.findNodeWithLowerBound(from)
|
||||
return n == null ? null : n.val
|
||||
}
|
||||
findWithUpperBound (to) {
|
||||
var n = this.findNodeWithUpperBound(to)
|
||||
return n == null ? null : n.val
|
||||
}
|
||||
iterate (from, to, f) {
|
||||
var o
|
||||
if (from === null) {
|
||||
o = this.findSmallestNode()
|
||||
} else {
|
||||
o = this.findNodeWithLowerBound(from)
|
||||
}
|
||||
while (
|
||||
o !== null &&
|
||||
(
|
||||
to === null || // eslint-disable-line no-unmodified-loop-condition
|
||||
o.val._id.lessThan(to) ||
|
||||
o.val._id.equals(to)
|
||||
)
|
||||
) {
|
||||
f(o.val)
|
||||
o = o.next()
|
||||
}
|
||||
}
|
||||
find (id) {
|
||||
let n = this.findNode(id)
|
||||
if (n !== null) {
|
||||
return n.val
|
||||
} else {
|
||||
return null
|
||||
}
|
||||
}
|
||||
findNode (id) {
|
||||
var o = this.root
|
||||
if (o === null) {
|
||||
return null
|
||||
} else {
|
||||
while (true) {
|
||||
if (o === null) {
|
||||
return null
|
||||
}
|
||||
if (id.lessThan(o.val._id)) {
|
||||
o = o.left
|
||||
} else if (o.val._id.lessThan(id)) {
|
||||
o = o.right
|
||||
} else {
|
||||
return o
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
delete (id) {
|
||||
var d = this.findNode(id)
|
||||
if (d == null) {
|
||||
// throw new Error('Element does not exist!')
|
||||
return
|
||||
}
|
||||
this.length--
|
||||
if (d.left !== null && d.right !== null) {
|
||||
// switch d with the greates element in the left subtree.
|
||||
// o should have at most one child.
|
||||
var o = d.left
|
||||
// find
|
||||
while (o.right !== null) {
|
||||
o = o.right
|
||||
}
|
||||
// switch
|
||||
d.val = o.val
|
||||
d = o
|
||||
}
|
||||
// d has at most one child
|
||||
// let n be the node that replaces d
|
||||
var isFakeChild
|
||||
var child = d.left || d.right
|
||||
if (child === null) {
|
||||
isFakeChild = true
|
||||
child = new N(null)
|
||||
child.blacken()
|
||||
d.right = child
|
||||
} else {
|
||||
isFakeChild = false
|
||||
}
|
||||
|
||||
if (d.parent === null) {
|
||||
if (!isFakeChild) {
|
||||
this.root = child
|
||||
child.blacken()
|
||||
child._parent = null
|
||||
} else {
|
||||
this.root = null
|
||||
}
|
||||
return
|
||||
} else if (d.parent.left === d) {
|
||||
d.parent.left = child
|
||||
} else if (d.parent.right === d) {
|
||||
d.parent.right = child
|
||||
} else {
|
||||
throw new Error('Impossible!')
|
||||
}
|
||||
if (d.isBlack()) {
|
||||
if (child.isRed()) {
|
||||
child.blacken()
|
||||
} else {
|
||||
this._fixDelete(child)
|
||||
}
|
||||
}
|
||||
this.root.blacken()
|
||||
if (isFakeChild) {
|
||||
if (child.parent.left === child) {
|
||||
child.parent.left = null
|
||||
} else if (child.parent.right === child) {
|
||||
child.parent.right = null
|
||||
} else {
|
||||
throw new Error('Impossible #3')
|
||||
}
|
||||
}
|
||||
}
|
||||
_fixDelete (n) {
|
||||
function isBlack (node) {
|
||||
return node !== null ? node.isBlack() : true
|
||||
}
|
||||
function isRed (node) {
|
||||
return node !== null ? node.isRed() : false
|
||||
}
|
||||
if (n.parent === null) {
|
||||
// this can only be called after the first iteration of fixDelete.
|
||||
return
|
||||
}
|
||||
// d was already replaced by the child
|
||||
// d is not the root
|
||||
// d and child are black
|
||||
var sibling = n.sibling
|
||||
if (isRed(sibling)) {
|
||||
// make sibling the grandfather
|
||||
n.parent.redden()
|
||||
sibling.blacken()
|
||||
if (n === n.parent.left) {
|
||||
n.parent.rotateLeft(this)
|
||||
} else if (n === n.parent.right) {
|
||||
n.parent.rotateRight(this)
|
||||
} else {
|
||||
throw new Error('Impossible #2')
|
||||
}
|
||||
sibling = n.sibling
|
||||
}
|
||||
// parent, sibling, and children of n are black
|
||||
if (n.parent.isBlack() &&
|
||||
sibling.isBlack() &&
|
||||
isBlack(sibling.left) &&
|
||||
isBlack(sibling.right)
|
||||
) {
|
||||
sibling.redden()
|
||||
this._fixDelete(n.parent)
|
||||
} else if (n.parent.isRed() &&
|
||||
sibling.isBlack() &&
|
||||
isBlack(sibling.left) &&
|
||||
isBlack(sibling.right)
|
||||
) {
|
||||
sibling.redden()
|
||||
n.parent.blacken()
|
||||
} else {
|
||||
if (n === n.parent.left &&
|
||||
sibling.isBlack() &&
|
||||
isRed(sibling.left) &&
|
||||
isBlack(sibling.right)
|
||||
) {
|
||||
sibling.redden()
|
||||
sibling.left.blacken()
|
||||
sibling.rotateRight(this)
|
||||
sibling = n.sibling
|
||||
} else if (n === n.parent.right &&
|
||||
sibling.isBlack() &&
|
||||
isRed(sibling.right) &&
|
||||
isBlack(sibling.left)
|
||||
) {
|
||||
sibling.redden()
|
||||
sibling.right.blacken()
|
||||
sibling.rotateLeft(this)
|
||||
sibling = n.sibling
|
||||
}
|
||||
sibling.color = n.parent.color
|
||||
n.parent.blacken()
|
||||
if (n === n.parent.left) {
|
||||
sibling.right.blacken()
|
||||
n.parent.rotateLeft(this)
|
||||
} else {
|
||||
sibling.left.blacken()
|
||||
n.parent.rotateRight(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
put (v) {
|
||||
var node = new N(v)
|
||||
if (this.root !== null) {
|
||||
var p = this.root // p abbrev. parent
|
||||
while (true) {
|
||||
if (node.val._id.lessThan(p.val._id)) {
|
||||
if (p.left === null) {
|
||||
p.left = node
|
||||
break
|
||||
} else {
|
||||
p = p.left
|
||||
}
|
||||
} else if (p.val._id.lessThan(node.val._id)) {
|
||||
if (p.right === null) {
|
||||
p.right = node
|
||||
break
|
||||
} else {
|
||||
p = p.right
|
||||
}
|
||||
} else {
|
||||
p.val = node.val
|
||||
return p
|
||||
}
|
||||
}
|
||||
this._fixInsert(node)
|
||||
} else {
|
||||
this.root = node
|
||||
}
|
||||
this.length++
|
||||
this.root.blacken()
|
||||
return node
|
||||
}
|
||||
_fixInsert (n) {
|
||||
if (n.parent === null) {
|
||||
n.blacken()
|
||||
return
|
||||
} else if (n.parent.isBlack()) {
|
||||
return
|
||||
}
|
||||
var uncle = n.getUncle()
|
||||
if (uncle !== null && uncle.isRed()) {
|
||||
// Note: parent: red, uncle: red
|
||||
n.parent.blacken()
|
||||
uncle.blacken()
|
||||
n.grandparent.redden()
|
||||
this._fixInsert(n.grandparent)
|
||||
} else {
|
||||
// Note: parent: red, uncle: black or null
|
||||
// Now we transform the tree in such a way that
|
||||
// either of these holds:
|
||||
// 1) grandparent.left.isRed
|
||||
// and grandparent.left.left.isRed
|
||||
// 2) grandparent.right.isRed
|
||||
// and grandparent.right.right.isRed
|
||||
if (n === n.parent.right && n.parent === n.grandparent.left) {
|
||||
n.parent.rotateLeft(this)
|
||||
// Since we rotated and want to use the previous
|
||||
// cases, we need to set n in such a way that
|
||||
// n.parent.isRed again
|
||||
n = n.left
|
||||
} else if (n === n.parent.left && n.parent === n.grandparent.right) {
|
||||
n.parent.rotateRight(this)
|
||||
// see above
|
||||
n = n.right
|
||||
}
|
||||
// Case 1) or 2) hold from here on.
|
||||
// Now traverse grandparent, make parent a black node
|
||||
// on the highest level which holds two red nodes.
|
||||
n.parent.blacken()
|
||||
n.grandparent.redden()
|
||||
if (n === n.parent.left) {
|
||||
// Case 1
|
||||
n.grandparent.rotateRight(this)
|
||||
} else {
|
||||
// Case 2
|
||||
n.grandparent.rotateLeft(this)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,34 +0,0 @@
|
||||
// TODO: rename mutex
|
||||
|
||||
/**
|
||||
* Creates a mutual exclude function with the following property:
|
||||
*
|
||||
* @example
|
||||
* const mutualExclude = createMutualExclude()
|
||||
* mutualExclude(function () {
|
||||
* // This function is immediately executed
|
||||
* mutualExclude(function () {
|
||||
* // This function is never executed, as it is called with the same
|
||||
* // mutualExclude
|
||||
* })
|
||||
* })
|
||||
*
|
||||
* @return {Function} A mutual exclude function
|
||||
* @public
|
||||
*/
|
||||
export function createMutualExclude () {
|
||||
var token = true
|
||||
return function mutualExclude (f, g) {
|
||||
if (token) {
|
||||
token = false
|
||||
try {
|
||||
f()
|
||||
} catch (e) {
|
||||
console.error(e)
|
||||
}
|
||||
token = true
|
||||
} else if (g !== undefined) {
|
||||
g()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,47 +0,0 @@
|
||||
|
||||
/**
|
||||
* A SimpleDiff describes a change on a String.
|
||||
*
|
||||
* @example
|
||||
* console.log(a) // the old value
|
||||
* console.log(b) // the updated value
|
||||
* // Apply changes of diff (pseudocode)
|
||||
* a.remove(diff.pos, diff.remove) // Remove `diff.remove` characters
|
||||
* a.insert(diff.pos, diff.insert) // Insert `diff.insert`
|
||||
* a === b // values match
|
||||
*
|
||||
* @typedef {Object} SimpleDiff
|
||||
* @property {Number} pos The index where changes were applied
|
||||
* @property {Number} delete The number of characters to delete starting
|
||||
* at `index`.
|
||||
* @property {String} insert The new text to insert at `index` after applying
|
||||
* `delete`
|
||||
*/
|
||||
|
||||
/**
|
||||
* Create a diff between two strings. This diff implementation is highly
|
||||
* efficient, but not very sophisticated.
|
||||
*
|
||||
* @public
|
||||
* @param {String} a The old version of the string
|
||||
* @param {String} b The updated version of the string
|
||||
* @return {SimpleDiff} The diff description.
|
||||
*/
|
||||
export default function simpleDiff (a, b) {
|
||||
let left = 0 // number of same characters counting from left
|
||||
let right = 0 // number of same characters counting from right
|
||||
while (left < a.length && left < b.length && a[left] === b[left]) {
|
||||
left++
|
||||
}
|
||||
if (left !== a.length || left !== b.length) {
|
||||
// Only check right if a !== b
|
||||
while (right + left < a.length && right + left < b.length && a[a.length - right - 1] === b[b.length - right - 1]) {
|
||||
right++
|
||||
}
|
||||
}
|
||||
return {
|
||||
pos: left, // TODO: rename to index (also in type above)
|
||||
remove: a.length - left - right,
|
||||
insert: b.slice(left, b.length - right)
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
|
||||
import Y from './Y.js'
|
||||
import UndoManager from './Util/UndoManager.js'
|
||||
import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'
|
||||
export { default as Y } from './Y.js'
|
||||
export { default as UndoManager } from './Util/UndoManager.js'
|
||||
export { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'
|
||||
|
||||
import Connector from './Connector.js'
|
||||
import Persistence from './Persistence.js'
|
||||
@@ -52,5 +52,3 @@ Y.utils = {
|
||||
toBinary,
|
||||
fromBinary
|
||||
}
|
||||
|
||||
export default Y
|
||||
|
||||
4
src/Y.js
4
src/Y.js
@@ -3,8 +3,8 @@ import OperationStore from './Store/OperationStore.js'
|
||||
import StateStore from './Store/StateStore.js'
|
||||
import { generateRandomUint32 } from './Util/generateRandomUint32.js'
|
||||
import RootID from './Util/ID/RootID.js'
|
||||
import NamedEventHandler from './Util/NamedEventHandler.js'
|
||||
import Transaction from './Transaction.js'
|
||||
import NamedEventHandler from '../lib/NamedEventHandler.js'
|
||||
import Transaction from './Util/Transaction.js'
|
||||
|
||||
export { default as DomBinding } from './Bindings/DomBinding/DomBinding.js'
|
||||
|
||||
|
||||
1
src/YdbClient/README.md
Normal file
1
src/YdbClient/README.md
Normal file
@@ -0,0 +1 @@
|
||||
* Host should discard message when confNumber is older than expected
|
||||
56
src/YdbClient/TODO.md
Normal file
56
src/YdbClient/TODO.md
Normal file
@@ -0,0 +1,56 @@
|
||||
Implement default dom filter..
|
||||
|
||||
But requires more explicit filtering of src attributes
|
||||
|
||||
e.g. src="java\nscript:alert(0)"
|
||||
|
||||
function domFilter (nodeName, attributes) {
|
||||
// Filter all attributes that start with on*. E.g. onclick does execute code
|
||||
// If key is 'href' or 'src', filter everything but 'http*', 'blob*', or 'data:image*' urls
|
||||
attributes.forEach(function (value, key) {
|
||||
key = key.toLowerCase();
|
||||
value = value.toLowerCase();
|
||||
if (key != null && (
|
||||
// filter all attributes starting with 'on'
|
||||
key.substr(0, 2) === 'on' ||
|
||||
// if key is 'href' or 'src', filter everything but http, blob, or data:image
|
||||
(
|
||||
(key === 'href' || key === 'src') &&
|
||||
value.substr(0, 4) !== 'http' &&
|
||||
value.substr(0, 4) !== 'blob' &&
|
||||
value.substr(0, 10) !== 'data:image'
|
||||
)
|
||||
)) {
|
||||
attributes.delete(key);
|
||||
}
|
||||
});
|
||||
switch (nodeName) {
|
||||
case 'SCRIPT':
|
||||
return null;
|
||||
case 'EN-ADORNMENTS':
|
||||
// TODO: Remove EN-ADORNMENTS check when merged into master branch!
|
||||
return null;
|
||||
case 'EN-TABLE':
|
||||
attributes.delete('class');
|
||||
return attributes;
|
||||
case 'EN-COMMENT':
|
||||
attributes.delete('style');
|
||||
attributes.delete('class');
|
||||
return attributes;
|
||||
case 'SPAN':
|
||||
return (attributes.get('id') || '').substr(0, 5) === 'goog_' ? null : attributes;
|
||||
case 'TD':
|
||||
attributes.delete('class');
|
||||
return attributes;
|
||||
case 'EMBED':
|
||||
attributes.delete('src');
|
||||
attributes.delete('style');
|
||||
attributes.delete('data-reference');
|
||||
return attributes;
|
||||
case 'FORM':
|
||||
attributes.delete('action');
|
||||
return attributes;
|
||||
default:
|
||||
return (nodeName || '').substr(0, 3) === 'UI-' ? null : attributes;
|
||||
}
|
||||
}
|
||||
165
src/YdbClient/YdbClient.js
Normal file
165
src/YdbClient/YdbClient.js
Normal file
@@ -0,0 +1,165 @@
|
||||
/* eslint-env browser */
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as globals from './globals.js'
|
||||
import * as message from './message.js'
|
||||
import * as bc from './broadcastchannel.js'
|
||||
import * as encoding from './encoding.js'
|
||||
import * as logging from './logging.js'
|
||||
import * as idb from './idb.js'
|
||||
import Y from '../src/Y.js'
|
||||
import BinaryDecoder from '../src/Util/Binary/Decoder.js'
|
||||
import { integrateRemoteStruct } from '../src/MessageHandler/integrateRemoteStructs.js'
|
||||
import { createMutualExclude } from '../src/Util/mutualExclude.js'
|
||||
|
||||
export class YdbClient {
|
||||
constructor (url, db) {
|
||||
this.url = url
|
||||
this.ws = new WebSocket(url)
|
||||
this.rooms = new Map()
|
||||
this.db = db
|
||||
this.connected = false
|
||||
initWS(this, this.ws)
|
||||
}
|
||||
/**
|
||||
* Open a Yjs instance that connects to `roomname`.
|
||||
* @param {string} roomname
|
||||
* @return {Y}
|
||||
*/
|
||||
getY (roomname) {
|
||||
const y = new Y(roomname)
|
||||
const mutex = createMutualExclude()
|
||||
y.on('afterTransaction', (y, transaction) => mutex(() => {
|
||||
if (transaction.encodedStructsLen > 0) {
|
||||
update(this, roomname, transaction.encodedStructs.createBuffer())
|
||||
}
|
||||
}))
|
||||
subscribe(this, roomname, update => mutex(() => {
|
||||
y.transact(() => {
|
||||
const decoder = new BinaryDecoder(update)
|
||||
while (decoder.hasContent()) {
|
||||
integrateRemoteStruct(y, decoder)
|
||||
}
|
||||
}, true)
|
||||
}))
|
||||
return y
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize WebSocket connection. Try to reconnect on error/disconnect.
|
||||
* @param {YdbClient} ydb
|
||||
* @param {WebSocket} ws
|
||||
*/
|
||||
const initWS = (ydb, ws) => {
|
||||
ws.binaryType = 'arraybuffer'
|
||||
ws.onclose = () => {
|
||||
ydb.connected = false
|
||||
logging.log('Disconnected from ydb. Reconnecting..')
|
||||
ydb.ws = new WebSocket(ydb.url)
|
||||
initWS(ydb, ws)
|
||||
}
|
||||
ws.onopen = () => {
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
globals.pall([idbactions.getRoomMetas(t), idbactions.getUnconfirmedSubscriptions(t), idbactions.getUnconfirmedUpdates(t)]).then(([metas, us, unconfirmedUpdates]) => {
|
||||
const subs = []
|
||||
metas.forEach(meta => {
|
||||
subs.push({
|
||||
room: meta.room,
|
||||
offset: meta.offset
|
||||
})
|
||||
})
|
||||
us.forEach(room => {
|
||||
subs.push({
|
||||
room, offset: 0
|
||||
})
|
||||
})
|
||||
ydb.connected = true
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeArrayBuffer(encoder, message.createSub(subs))
|
||||
encoding.writeArrayBuffer(encoder, unconfirmedUpdates)
|
||||
send(ydb, encoding.toBuffer(encoder))
|
||||
})
|
||||
}
|
||||
ws.onmessage = event => message.readMessage(ydb, event.data)
|
||||
}
|
||||
|
||||
// maps from dbNamespace to db
|
||||
const dbPromises = new Map()
|
||||
|
||||
/**
|
||||
* Factory function. Get a ydb instance that connects to url, and uses dbNamespace as indexeddb namespace.
|
||||
* Create if it does not exist yet.
|
||||
*
|
||||
* @param {string} url
|
||||
* @param {string} dbNamespace
|
||||
* @return {Promise<YdbClient>}
|
||||
*/
|
||||
export const get = (url, dbNamespace = 'ydb') => {
|
||||
if (!dbPromises.has(dbNamespace)) {
|
||||
dbPromises.set(dbNamespace, idbactions.openDB(dbNamespace))
|
||||
}
|
||||
return dbPromises.get(dbNamespace).then(db => globals.presolve(new YdbClient(url, db)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a db namespace. Call this to remove any persisted data. Make sure to close active sessions.
|
||||
* TODO: destroy active ydbClient sessions / throw if a session is still active
|
||||
* @param {string} dbNamespace
|
||||
* @return {Promise}
|
||||
*/
|
||||
export const clear = (dbNamespace = 'ydb') => idb.deleteDB(dbNamespace)
|
||||
|
||||
/**
|
||||
* @param {YdbClient} ydb
|
||||
* @param {ArrayBuffer} m
|
||||
*/
|
||||
export const send = (ydb, m) => ydb.connected && ydb.ws.send(m)
|
||||
|
||||
/**
|
||||
* @param {YdbClient} ydb
|
||||
* @param {string} room
|
||||
* @param {ArrayBuffer} update
|
||||
*/
|
||||
export const update = (ydb, room, update) => {
|
||||
bc.publish(room, update)
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
logging.log(`Write Unconfirmed Update. room "${room}", ${JSON.stringify(update)}`)
|
||||
return idbactions.writeClientUnconfirmed(t, room, update).then(clientConf => {
|
||||
logging.log(`Send Unconfirmed Update. connected ${ydb.connected} room "${room}", clientConf ${clientConf}`)
|
||||
send(ydb, message.createUpdate(room, update, clientConf))
|
||||
})
|
||||
}
|
||||
|
||||
export const subscribe = (ydb, room, f) => {
|
||||
bc.subscribe(room, f)
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
idbactions.getRoomData(t, room).then(data => {
|
||||
if (data.byteLength > 0) {
|
||||
f(data)
|
||||
}
|
||||
})
|
||||
idbactions.getRoomMeta(t, room).then(meta => {
|
||||
if (meta === undefined) {
|
||||
logging.log(`Send Subscribe. room "${room}", offset ${0}`)
|
||||
// TODO: maybe set prelim meta value so we don't sub twice
|
||||
send(ydb, message.createSub([{ room, offset: 0 }]))
|
||||
idbactions.writeUnconfirmedSubscription(t, room)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export const subscribeRooms = (ydb, rooms) => {
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
const subs = []
|
||||
return globals.pall(rooms.map(room => idbactions.getRoomMeta(t, room).then(meta => {
|
||||
if (meta === undefined) {
|
||||
subs.push(room)
|
||||
return idbactions.writeUnconfirmedSubscription(t, room)
|
||||
}
|
||||
}))).then(() => {
|
||||
// write all sub messages when all unconfirmed subs are writted to idb
|
||||
if (subs.length > 0) {
|
||||
send(ydb, message.createSub(rooms.map(room => ({room, offset: 0}))))
|
||||
}
|
||||
})
|
||||
}
|
||||
85
src/YdbClient/YdbClient.test.js
Normal file
85
src/YdbClient/YdbClient.test.js
Normal file
@@ -0,0 +1,85 @@
|
||||
/* eslint-env browser */
|
||||
|
||||
import * as test from './test.js'
|
||||
import * as ydbClient from './YdbClient.js'
|
||||
import * as globals from './globals.js'
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as logging from './logging.js'
|
||||
|
||||
const wsUrl = 'ws://127.0.0.1:8899/ws'
|
||||
const testRoom = 'testroom'
|
||||
|
||||
class YdbTestClient {
|
||||
constructor (ydb) {
|
||||
this.ydb = ydb
|
||||
this.createdUpdates = new Set()
|
||||
this.data = []
|
||||
this.checked = new Set()
|
||||
}
|
||||
}
|
||||
|
||||
const clearAllYdbContent = () => fetch('http://127.0.0.1:8899/clearAll', { mode: 'no-cors' })
|
||||
|
||||
/**
|
||||
* @param {string} name
|
||||
* @return {Promise<YdbTestClient>}
|
||||
*/
|
||||
const getTestClient = async name => {
|
||||
await ydbClient.clear('ydb-' + name)
|
||||
const ydb = await ydbClient.get(wsUrl, 'ydb-' + name)
|
||||
const testClient = new YdbTestClient(ydb)
|
||||
ydbClient.subscribe(ydb, testRoom, data => {
|
||||
testClient.data.push(data)
|
||||
globals.createArrayFromArrayBuffer(data).forEach(d => {
|
||||
if (d < nextUpdateNumber) {
|
||||
testClient.checked.add(d)
|
||||
}
|
||||
})
|
||||
console.log(name, 'received data', data, testClient.data)
|
||||
})
|
||||
return testClient
|
||||
}
|
||||
|
||||
// TODO: does only work for 8bit numbers..
|
||||
let nextUpdateNumber = 0
|
||||
|
||||
/**
|
||||
* Create an update. We use an increasing message counter so each update is unique.
|
||||
* @param {YdbTestClient} client
|
||||
*/
|
||||
const update = (client) => {
|
||||
ydbClient.update(client.ydb, testRoom, globals.createArrayBufferFromArray([nextUpdateNumber++]))
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if tsclient has all data in dataset
|
||||
* @param {...YdbTestClient} testClients
|
||||
*/
|
||||
const checkTestClients = (...testClients) => globals.until(100000, () => testClients.every(testClient =>
|
||||
testClient.checked.size === nextUpdateNumber
|
||||
)).then(() =>
|
||||
globals.wait(150) // wait 150 for all conf messages to come in..
|
||||
// TODO: do the below check in the until handler
|
||||
).then(() => globals.pall(testClients.map(testClient => idbactions.getRoomData(idbactions.createTransaction(testClient.ydb.db), testRoom)))).then(testClientsData => {
|
||||
testClientsData.forEach((testClientData, i) => {
|
||||
const checked = new Set()
|
||||
globals.createArrayFromArrayBuffer(testClientData).forEach(d => {
|
||||
if (checked.has(d)) {
|
||||
logging.fail('duplicate content')
|
||||
}
|
||||
checked.add(d)
|
||||
})
|
||||
if (checked.size !== nextUpdateNumber) {
|
||||
logging.fail(`Not all data is available in idb in client ${i}`)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
clearAllYdbContent().then(() => {
|
||||
test.run('ydb-client', async testname => {
|
||||
const c1 = await getTestClient('1')
|
||||
const c2 = await getTestClient('2')
|
||||
update(c1)
|
||||
await checkTestClients(c1, c2)
|
||||
})
|
||||
})
|
||||
45
src/YdbClient/broadcastchannel.js
Normal file
45
src/YdbClient/broadcastchannel.js
Normal file
@@ -0,0 +1,45 @@
|
||||
/* eslint-env browser */
|
||||
|
||||
import * as decoding from './decoding.js'
|
||||
import * as encoding from './encoding.js'
|
||||
|
||||
const bc = new BroadcastChannel('ydb-client')
|
||||
const subs = new Map()
|
||||
|
||||
bc.onmessage = event => {
|
||||
const decoder = decoding.createDecoder(event.data)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readTail(decoder)
|
||||
const rsubs = subs.get(room)
|
||||
if (rsubs !== undefined) {
|
||||
rsubs.forEach(f => f(update))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {function(ArrayBuffer)} f
|
||||
*/
|
||||
export const subscribe = (room, f) => {
|
||||
let rsubs = subs.get(room)
|
||||
if (rsubs === undefined) {
|
||||
rsubs = new Set()
|
||||
subs.set(room, rsubs)
|
||||
}
|
||||
rsubs.add(f)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {ArrayBuffer} update
|
||||
*/
|
||||
export const publish = (room, update) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarString(encoder, room)
|
||||
encoding.writeArrayBuffer(encoder, update)
|
||||
bc.postMessage(encoding.toBuffer(encoder))
|
||||
const rsubs = subs.get(room)
|
||||
if (rsubs !== undefined) {
|
||||
rsubs.forEach(f => f(update))
|
||||
}
|
||||
}
|
||||
315
src/YdbClient/idbactions.js
Normal file
315
src/YdbClient/idbactions.js
Normal file
@@ -0,0 +1,315 @@
|
||||
/* eslint-env browser */
|
||||
|
||||
/**
|
||||
* Naming conventions:
|
||||
* * ydb: Think of ydb as a federated set of servers. This is not yet true, but we will eventually get there. With this assumption come some challenges with the client
|
||||
* * ydb instance: A single ydb instance that this ydb-client connects to
|
||||
* * (room) host: Exactly one ydb instance controls a room at any time. The ownership may change over time. The host of a room is the ydb instance that owns it. This is not necessarily the instance we connect to.
|
||||
* * room session id: An random id that is assigned to a room. When the server dies unexpectedly, we can conclude which data is missing and send it to the server (or delete it and prevent duplicate content)
|
||||
* * update: An ArrayBuffer of binary data. Neither Ydb nor Ydb-client care about the content of update. Updates may be appended to each other.
|
||||
*
|
||||
* The database has four tables:
|
||||
*
|
||||
* CU "client-unconfirmed" confid -> room, update
|
||||
* - The client writes to this table when it creates an update.
|
||||
* - Then it sends an update to the host with the generated confid
|
||||
* - In case the host doesn't confirm that it received this update, it is sent again on next sync
|
||||
* HU "host-unconfirmed" room, offset -> update
|
||||
* - Updates from the host are written to this table
|
||||
* - When host confirms that an unconfirmed update was persisted, the update is written to the Co table
|
||||
* - When client sync to host and the room session ids don't match, all host-unconfirmed messages are sent to host
|
||||
* Co "confirmed":
|
||||
* data:{room} -> update
|
||||
* - this field holds confirmed room updates
|
||||
* meta:{room} -> room session id, confirmed offset
|
||||
* - this field holds metadata about the room
|
||||
* US "unconfirmed-subscriptions" room -> _
|
||||
* - Subscriptions sent to the server, but didn't receive confirmation yet
|
||||
* - Either a room is in US or in Co
|
||||
* - A client may update a room when the room is in either US or Co
|
||||
*/
|
||||
|
||||
import * as encoding from './encoding.js'
|
||||
import * as decoding from './decoding.js'
|
||||
import * as idb from './idb.js'
|
||||
import * as globals from './globals.js'
|
||||
import * as message from './message.js'
|
||||
|
||||
/**
|
||||
* Get 'client-unconfirmed' store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreCU = t => idb.getStore(t, STORE_CU)
|
||||
/**
|
||||
* Get 'host-unconfirmed' store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreHU = t => idb.getStore(t, STORE_HU)
|
||||
/**
|
||||
* Get 'confirmed' store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreCo = t => idb.getStore(t, STORE_CO)
|
||||
|
||||
/**
|
||||
* Get `unconfirmed-subscriptions` store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreUS = t => idb.getStore(t, STORE_US)
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {number} offset
|
||||
* @return {[string, number]}
|
||||
*/
|
||||
const encodeHUKey = (room, offset) => [room, offset]
|
||||
|
||||
/**
|
||||
* @typedef RoomAndOffset
|
||||
* @type {Object}
|
||||
* @property {string} room
|
||||
* @property {number} offset Received offsets (including offsets that are not yet confirmed)
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {[string, number]} key
|
||||
* @return {RoomAndOffset}
|
||||
*/
|
||||
const decodeHUKey = key => {
|
||||
return {
|
||||
room: key[0],
|
||||
offset: key[1]
|
||||
}
|
||||
}
|
||||
|
||||
const getCoMetaKey = room => 'meta:' + room
|
||||
const getCoDataKey = room => 'data:' + room
|
||||
|
||||
const STORE_CU = 'client-unconfirmed'
|
||||
const STORE_US = 'unconfirmed-subscriptions'
|
||||
const STORE_CO = 'confirmed'
|
||||
const STORE_HU = 'host-unconfirmed'
|
||||
|
||||
/**
|
||||
* @param {string} dbNamespace
|
||||
* @return {Promise<IDBDatabase>}
|
||||
*/
|
||||
export const openDB = dbNamespace => idb.openDB(dbNamespace, db => idb.createStores(db, [
|
||||
[STORE_CU, { autoIncrement: true }],
|
||||
[STORE_HU],
|
||||
[STORE_CO],
|
||||
[STORE_US]
|
||||
]))
|
||||
|
||||
export const deleteDB = name => idb.deleteDB(name)
|
||||
|
||||
/**
|
||||
* Create a new IDBTransaction accessing all object stores. Normally we should care that we can access object stores in parallel.
|
||||
* But this is not possible in ydb-client since at least two object stores are requested in every IDB change.
|
||||
* @param {IDBDatabase} db
|
||||
* @return {IDBTransaction}
|
||||
*/
|
||||
export const createTransaction = db => db.transaction([STORE_CU, STORE_HU, STORE_CO, STORE_US], 'readwrite')
|
||||
|
||||
/**
|
||||
* Write an update to the db after the client created it. This update is not yet received by the host.
|
||||
* This function returns a client confirmation number. The confirmation number must be send to the host so it can identify the update,
|
||||
* and we can move the update to HU when it is confirmed (@see writeHostUnconfirmedByClient)
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} room
|
||||
* @param {ArrayBuffer} update
|
||||
* @return {Promise<number>} client confirmation number
|
||||
*/
|
||||
export const writeClientUnconfirmed = (t, room, update) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarString(encoder, room)
|
||||
encoding.writeArrayBuffer(encoder, update)
|
||||
return idb.addAutoKey(getStoreCU(t), encoding.toBuffer(encoder))
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all updates that are not yet confirmed by host.
|
||||
* @param {IDBTransaction} t
|
||||
* @return {Promise<ArrayBuffer>} All update messages as a single ArrayBuffer
|
||||
*/
|
||||
export const getUnconfirmedUpdates = t => {
|
||||
const encoder = encoding.createEncoder()
|
||||
return idb.iterate(getStoreCU(t), null, (value, clientConf) => {
|
||||
const decoder = decoding.createDecoder(value)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readTail(decoder)
|
||||
encoding.writeArrayBuffer(encoder, message.createUpdate(room, update, clientConf))
|
||||
}).then(() => encoding.toBuffer(encoder))
|
||||
}
|
||||
|
||||
/**
|
||||
* The host confirms that it received and persisted an update. The update can be safely removed from CU.
|
||||
* It is necessary to call this function in case that the client disconnected before the host could send `writeHostUnconfirmedByClient`.
|
||||
* @param {IDBTransaction} t
|
||||
* @param {number} clientConf
|
||||
* @return {Promise}
|
||||
*/
|
||||
export const confirmClient = (t, clientConf) => idb.del(getStoreCU(t), idb.createIDBKeyRangeUpperBound(clientConf, false))
|
||||
|
||||
/**
|
||||
* The host confirms that it received and broadcasted an update sent from this client.
|
||||
* Calling this method does not confirm that the update has been persisted by the server.
|
||||
*
|
||||
* Other clients will receive an update with `writeHostUnconfirmed`. Since this client created the update, it only receives a confirmation. So
|
||||
* we can simply move the update from CU to HU.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @param {number} clientConf The client confirmation number that identifies the update
|
||||
* @param {number} offset The offset with wich the server will store the information
|
||||
*/
|
||||
export const writeHostUnconfirmedByClient = (t, clientConf, offset) => idb.get(getStoreCU(t), clientConf).then(roomAndUpdate => {
|
||||
const decoder = decoding.createDecoder(roomAndUpdate)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readTail(decoder)
|
||||
return writeHostUnconfirmed(t, room, offset, update).then(() =>
|
||||
idb.del(getStoreCU(t), clientConf)
|
||||
)
|
||||
})
|
||||
|
||||
/**
|
||||
* The host broadcasts an update created by another client. It assures that the update will eventually be persisted with
|
||||
* `offset`. Calling this function does not imply that the update was persisted by the host. In case of mismatching room session ids
|
||||
* the updates in HU will be sent to the server.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} room
|
||||
* @param {number} offset
|
||||
* @param {ArrayBuffer} update
|
||||
* @return {Promise}
|
||||
*/
|
||||
export const writeHostUnconfirmed = (t, room, offset, update) => idb.put(getStoreHU(t), update, encodeHUKey(room, offset))
|
||||
|
||||
/**
|
||||
* The host confirms that it persisted updates up until (including) offset. updates may be moved from HU to Co.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} room
|
||||
* @param {number} offset Inclusive range [0, offset - 1] has been stored to host
|
||||
*/
|
||||
export const writeConfirmedByHost = (t, room, offset) => {
|
||||
const co = getStoreCo(t)
|
||||
return globals.pall([idb.get(co, getCoDataKey(room)), idb.get(co, getCoMetaKey(room))]).then(async arr => {
|
||||
const data = arr[0]
|
||||
const meta = decodeMetaValue(arr[1])
|
||||
const dataEncoder = encoding.createEncoder()
|
||||
if (meta.offset >= offset) {
|
||||
return // nothing to do
|
||||
}
|
||||
encoding.writeArrayBuffer(dataEncoder, data)
|
||||
const hu = getStoreHU(t)
|
||||
const huKeyRange = idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, offset), false, false)
|
||||
return idb.iterate(hu, huKeyRange, (value, _key) => {
|
||||
const key = decodeHUKey(_key) // @kevin _key is an array. remove decodeHUKey functions
|
||||
if (key.room === room && key.offset <= offset) {
|
||||
encoding.writeArrayBuffer(dataEncoder, value)
|
||||
}
|
||||
}).then(() => {
|
||||
globals.pall([idb.put(co, encodeMetaValue(meta.roomsid, offset), getCoMetaKey(room)), idb.put(co, encoding.toBuffer(dataEncoder), getCoDataKey(room)), idb.del(hu, huKeyRange)])
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef RoomMeta
|
||||
* @type {Object}
|
||||
* @property {string} room
|
||||
* @property {number} roomsid Room session id
|
||||
* @property {number} offset Received offsets (including offsets that are not yet confirmed)
|
||||
*/
|
||||
|
||||
/**
|
||||
* Get all meta information for all rooms.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @return {Promise<Array<RoomMeta>>}
|
||||
*/
|
||||
export const getRoomMetas = t => {
|
||||
const hu = getStoreHU(t)
|
||||
const result = []
|
||||
return idb.iterate(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false), (metavalue, metakey) =>
|
||||
idb.getAllKeys(hu, idb.createIDBKeyRangeBound(encodeHUKey(metakey.slice(5), 0), encodeHUKey(metakey.slice(5), 2 ** 32), false, false)).then(keys => {
|
||||
const { roomsid, offset } = decodeMetaValue(metavalue)
|
||||
result.push({
|
||||
room: metakey.slice(5),
|
||||
roomsid,
|
||||
offset: keys.reduce((cur, key) => globals.max(decodeHUKey(key).offset, cur), offset)
|
||||
})
|
||||
})
|
||||
).then(() => globals.presolve(result))
|
||||
}
|
||||
|
||||
export const getRoomMeta = (t, room) =>
|
||||
idb.get(getStoreCo(t), getCoMetaKey(room))
|
||||
|
||||
/**
|
||||
* Get all data from idb, including unconfirmed updates.
|
||||
* TODO: include updates in CU
|
||||
* @param {IDBTransaction} t
|
||||
* @param {string} room
|
||||
* @return {Promise<ArrayBuffer>}
|
||||
*/
|
||||
export const getRoomData = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false))]).then(([data, updates]) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeArrayBuffer(encoder, data || new Uint8Array(0))
|
||||
updates.forEach(update => encoding.writeArrayBuffer(encoder, update))
|
||||
return encoding.toBuffer(encoder)
|
||||
})
|
||||
|
||||
const decodeMetaValue = buffer => {
|
||||
const decoder = decoding.createDecoder(buffer)
|
||||
const roomsid = decoding.readVarUint(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
return {
|
||||
roomsid, offset
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param {number} roomsid room session id
|
||||
* @param {number} offset
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
const encodeMetaValue = (roomsid, offset) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, roomsid)
|
||||
encoding.writeVarUint(encoder, offset)
|
||||
return encoding.toBuffer(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the initial room data. Overwrites initial data if there is any!
|
||||
* @param {IDBTransaction} t
|
||||
* @param {string} room
|
||||
* @param {number} roomsessionid
|
||||
* @param {number} offset
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
export const confirmSubscription = (t, room, roomsessionid, offset) => idb.get(getStoreCo(t), getCoMetaKey(room)).then(metaval => {
|
||||
if (metaval === undefined) {
|
||||
return globals.pall([
|
||||
idb.put(getStoreCo(t), encodeMetaValue(roomsessionid, offset), getCoMetaKey(room)),
|
||||
idb.put(getStoreCo(t), globals.createArrayBufferFromArray([]), getCoDataKey(room))
|
||||
]).then(() => idb.del(getStoreUS(t), room))
|
||||
}
|
||||
const meta = decodeMetaValue(metaval)
|
||||
if (meta.roomsid !== roomsessionid) {
|
||||
// TODO: upload all unconfirmed updates
|
||||
// or do a Yjs sync with server
|
||||
} else if (meta.roomsid < offset) {
|
||||
return writeConfirmedByHost(t, room, offset)
|
||||
} else {
|
||||
// nothing needs to happen
|
||||
}
|
||||
})
|
||||
|
||||
export const writeUnconfirmedSubscription = (t, room) => idb.put(getStoreUS(t), true, room)
|
||||
|
||||
export const getUnconfirmedSubscriptions = t => idb.getAllKeys(getStoreUS(t))
|
||||
23
src/YdbClient/idbactions.test.js
Normal file
23
src/YdbClient/idbactions.test.js
Normal file
@@ -0,0 +1,23 @@
|
||||
import * as globals from './globals.js'
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as test from './test.js'
|
||||
|
||||
idbactions.deleteDB().then(() => idbactions.openDB()).then(db => {
|
||||
test.run('update lifetime 1', async (testname) => {
|
||||
const update = new Uint8Array([1, 2, 3]).buffer
|
||||
const t = idbactions.createTransaction(db)
|
||||
idbactions.writeInitialRoomData(t, testname, 42, 1, new Uint8Array([0]).buffer)
|
||||
const clientConf = await idbactions.writeClientUnconfirmed(t, testname, update)
|
||||
await idbactions.writeHostUnconfirmedByClient(t, clientConf, 0)
|
||||
await idbactions.writeConfirmedByHost(t, testname, 4)
|
||||
const metas = await idbactions.getRoomMetas(t)
|
||||
const roommeta = metas.find(meta => meta.room === testname)
|
||||
if (roommeta == null || roommeta.offset !== 4 || roommeta.roomsid !== 42) {
|
||||
throw globals.error()
|
||||
}
|
||||
const data = await idbactions.getRoomData(t, testname)
|
||||
if (!test.compareArrays(new Uint8Array(data), new Uint8Array([0, 1, 2, 3]))) {
|
||||
throw globals.error()
|
||||
}
|
||||
})
|
||||
})
|
||||
7
src/YdbClient/index.js
Normal file
7
src/YdbClient/index.js
Normal file
@@ -0,0 +1,7 @@
|
||||
import * as ydbclient from './YdbClient.js'
|
||||
|
||||
/**
|
||||
* @param {string} url
|
||||
* @return {Promise<ydbclient.YdbClient>}
|
||||
*/
|
||||
export const createYdbClient = url => ydbclient.get(url)
|
||||
106
src/YdbClient/message.js
Normal file
106
src/YdbClient/message.js
Normal file
@@ -0,0 +1,106 @@
|
||||
import * as encoding from './encoding.js'
|
||||
import * as decoding from './decoding.js'
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as logging from './logging.js'
|
||||
import * as bc from './broadcastchannel.js'
|
||||
|
||||
/* make sure to update message.go in ydb when updating these values.. */
|
||||
export const MESSAGE_UPDATE = 0 // TODO: rename host_unconfirmed?
|
||||
export const MESSAGE_SUB = 1
|
||||
export const MESSAGE_CONFIRMATION = 2
|
||||
export const MESSAGE_SUB_CONF = 3
|
||||
export const MESSAGE_HOST_UNCONFIRMED_BY_CLIENT = 4
|
||||
export const MESSAGE_CONFIRMED_BY_HOST = 5
|
||||
|
||||
/**
|
||||
* @param {any} ydb YdbClient instance
|
||||
* @param {ArrayBuffer} message
|
||||
*/
|
||||
export const readMessage = (ydb, message) => {
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
const decoder = decoding.createDecoder(message)
|
||||
while (decoding.hasContent(decoder)) {
|
||||
switch (decoding.readVarUint(decoder)) {
|
||||
case MESSAGE_UPDATE: {
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readPayload(decoder)
|
||||
logging.log(`Received Update. room "${room}", offset ${offset}`)
|
||||
idbactions.writeHostUnconfirmed(t, room, offset, update)
|
||||
bc.publish(room, update)
|
||||
break
|
||||
}
|
||||
case MESSAGE_SUB_CONF: {
|
||||
const nSubs = decoding.readVarUint(decoder)
|
||||
for (let i = 0; i < nSubs; i++) {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
const roomsid = decoding.readVarUint(decoder) // TODO: SID
|
||||
// logging.log(`Received Sub Conf. room "${room}", offset ${offset}, roomsid ${roomsid}`)
|
||||
idbactions.confirmSubscription(t, room, roomsid, offset)
|
||||
}
|
||||
break
|
||||
}
|
||||
case MESSAGE_CONFIRMATION: {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
logging.log(`Received Confirmation. room "${room}", offset ${offset}`)
|
||||
idbactions.writeConfirmedByHost(t, room, offset)
|
||||
break
|
||||
}
|
||||
case MESSAGE_HOST_UNCONFIRMED_BY_CLIENT: {
|
||||
const clientConf = decoding.readVarUint(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
logging.log(`Received HostUnconfirmedByClient. clientConf "${clientConf}", offset ${offset}`)
|
||||
idbactions.writeHostUnconfirmedByClient(t, clientConf, offset)
|
||||
break
|
||||
}
|
||||
case MESSAGE_CONFIRMED_BY_HOST: {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
logging.log(`Received Confirmation By Host. room "${room}", offset ${offset}`)
|
||||
idbactions.writeConfirmedByHost(t, room, offset)
|
||||
break
|
||||
}
|
||||
default:
|
||||
logging.fail(`Unexpected message type`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {ArrayBuffer} update
|
||||
* @param {number} clientConf
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const createUpdate = (room, update, clientConf) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, MESSAGE_UPDATE)
|
||||
encoding.writeVarUint(encoder, clientConf)
|
||||
encoding.writeVarString(encoder, room)
|
||||
encoding.writePayload(encoder, update)
|
||||
return encoding.toBuffer(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef SubDef
|
||||
* @type {Object}
|
||||
* @property {string} room
|
||||
* @property {number} offset
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {Array<SubDef>} rooms
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const createSub = rooms => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, MESSAGE_SUB)
|
||||
encoding.writeVarUint(encoder, rooms.length)
|
||||
for (let i = 0; i < rooms.length; i++) {
|
||||
encoding.writeVarString(encoder, rooms[i].room)
|
||||
encoding.writeVarUint(encoder, rooms[i].offset)
|
||||
}
|
||||
return encoding.toBuffer(encoder)
|
||||
}
|
||||
Reference in New Issue
Block a user