merge experimental-connectors

This commit is contained in:
Kevin Jahns
2018-10-08 17:08:20 +02:00
committed by Kevin
41 changed files with 2047 additions and 2929 deletions

View File

@@ -0,0 +1,56 @@
import Binding from '../Binding.js'
import simpleDiff from '../../Util/simpleDiff.js'
import { getRelativePosition, fromRelativePosition } from '../../Util/relativePosition.js'
function typeObserver () {
this._mutualExclude(() => {
const textarea = this.target
const textType = this.type
const relativeStart = getRelativePosition(textType, textarea.selectionStart)
const relativeEnd = getRelativePosition(textType, textarea.selectionEnd)
textarea.value = textType.toString()
const start = fromRelativePosition(textType._y, relativeStart)
const end = fromRelativePosition(textType._y, relativeEnd)
textarea.setSelectionRange(start, end)
})
}
function domObserver () {
this._mutualExclude(() => {
let diff = simpleDiff(this.type.toString(), this.target.value)
this.type.delete(diff.pos, diff.remove)
this.type.insert(diff.pos, diff.insert)
})
}
/**
* A binding that binds a YText to a dom textarea.
*
* This binding is automatically destroyed when its parent is deleted.
*
* @example
* const textare = document.createElement('textarea')
* const type = y.define('textarea', Y.Text)
* const binding = new Y.QuillBinding(type, textarea)
*
*/
export default class TextareaBinding extends Binding {
constructor (textType, domTextarea) {
// Binding handles textType as this.type and domTextarea as this.target
super(textType, domTextarea)
// set initial value
domTextarea.value = textType.toString()
// Observers are handled by this class
this._typeObserver = typeObserver.bind(this)
this._domObserver = domObserver.bind(this)
textType.observe(this._typeObserver)
domTextarea.addEventListener('input', this._domObserver)
}
destroy () {
// Remove everything that is handled by this class
this.type.unobserve(this._typeObserver)
this.target.unobserve(this._domObserver)
super.destroy()
}
}

View File

@@ -0,0 +1,140 @@
import BinaryEncoder from '../../Util/Binary/Encoder.js'
/* global WebSocket */
import NamedEventHandler from '../../Util/NamedEventHandler.js'
import decodeMessage, { messageSS, messageSubscribe, messageStructs } from './decodeMessage.js'
import { createMutualExclude } from '../../Util/mutualExclude.js'
import { messageCheckUpdateCounter } from './decodeMessage.js'
export const STATE_DISCONNECTED = 0
export const STATE_CONNECTED = 1
export default class WebsocketsConnector extends NamedEventHandler {
constructor (url = 'ws://localhost:1234') {
super()
this.url = url
this._state = STATE_DISCONNECTED
this._socket = null
this._rooms = new Map()
this._connectToServer = true
this._reconnectTimeout = 300
this._mutualExclude = createMutualExclude()
this._persistence = null
this.connect()
}
getRoom (roomName) {
return this._rooms.get(roomName) || { y: null, roomName, localUpdateCounter: 1 }
}
syncPersistence (persistence) {
this._persistence = persistence
if (this._state === STATE_CONNECTED) {
persistence.getAllDocuments().then(docs => {
const encoder = new BinaryEncoder()
docs.forEach(doc => {
messageCheckUpdateCounter(doc.roomName, encoder, doc.remoteUpdateCounter)
});
this.send(encoder)
})
}
}
connectY (roomName, y) {
let room = this._rooms.get(roomName)
if (room !== undefined) {
throw new Error('Room is already taken! There can be only one Yjs instance per roomName!')
}
this._rooms.set(roomName, {
roomName,
y,
localUpdateCounter: 1
})
y.on('afterTransaction', (y, transaction) => {
this._mutualExclude(() => {
if (transaction.encodedStructsLen > 0) {
const encoder = new BinaryEncoder()
const room = this._rooms.get(roomName)
messageStructs(roomName, y, encoder, transaction.encodedStructs, ++room.localUpdateCounter)
this.send(encoder)
}
})
})
if (this._state === STATE_CONNECTED) {
const encoder = new BinaryEncoder()
messageSS(roomName, y, encoder)
messageSubscribe(roomName, y, encoder)
this.send(encoder)
}
}
_setState (state) {
this._state = state
this.emit('stateChanged', {
state: this.state
})
}
get state () {
return this._state === STATE_DISCONNECTED ? 'disconnected' : 'connected'
}
_onOpen () {
this._setState(STATE_CONNECTED)
if (this._persistence === null) {
const encoder = new BinaryEncoder()
for (const [roomName, room] of this._rooms) {
const y = room.y
messageSS(roomName, y, encoder)
messageSubscribe(roomName, y, encoder)
}
this.send(encoder)
} else {
this.syncPersistence(this._persistence)
}
}
send (encoder) {
if (encoder.length > 0 && this._socket.readyState === WebSocket.OPEN) {
this._socket.send(encoder.createBuffer())
}
}
_onClose () {
this._setState(STATE_DISCONNECTED)
this._socket = null
if (this._connectToServer) {
setTimeout(() => {
if (this._connectToServer) {
this.connect()
}
}, this._reconnectTimeout)
this.connect()
}
}
_onMessage (message) {
if (message.data.byteLength > 0) {
const reply = decodeMessage(this, message.data, null, false, this._persistence)
this.send(reply)
}
}
disconnect (code = 1000, reason = 'Client manually disconnected') {
const socket = this._socket
this._connectToServer = false
socket.close(code, reason)
}
connect () {
if (this._socket === null) {
const socket = new WebSocket(this.url)
socket.binaryType = 'arraybuffer'
this._socket = socket
this._connectToServer = true
// Connection opened
socket.addEventListener('open', this._onOpen.bind(this))
socket.addEventListener('close', this._onClose.bind(this))
socket.addEventListener('message', this._onMessage.bind(this))
}
}
}

View File

@@ -0,0 +1,159 @@
import BinaryDecoder from '../../Util/Binary/Decoder.js'
import BinaryEncoder from '../../Util/Binary/Encoder.js'
import { readStateSet, writeStateSet } from '../../MessageHandler/stateSet.js'
import { writeStructs } from '../../MessageHandler/syncStep1.js'
import { writeDeleteSet, readDeleteSet } from '../../MessageHandler/deleteSet.js'
import { integrateRemoteStructs } from '../../MessageHandler/integrateRemoteStructs.js'
const CONTENT_GET_SS = 4
export function messageGetSS (roomName, y, encoder) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_GET_SS)
}
const CONTENT_SUBSCRIBE = 3
export function messageSubscribe (roomName, y, encoder) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_SUBSCRIBE)
}
const CONTENT_SS = 0
export function messageSS (roomName, y, encoder) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_SS)
writeStateSet(y, encoder)
}
const CONTENT_STRUCTS_DSS = 2
export function messageStructsDSS (roomName, y, encoder, ss, updateCounter) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_STRUCTS_DSS)
encoder.writeVarUint(updateCounter)
const structsDS = new BinaryEncoder()
writeStructs(y, structsDS, ss)
writeDeleteSet(y, structsDS)
encoder.writeVarUint(structsDS.length)
encoder.writeBinaryEncoder(structsDS)
}
const CONTENT_STRUCTS = 5
export function messageStructs (roomName, y, encoder, structsBinaryEncoder, updateCounter) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_STRUCTS)
encoder.writeVarUint(updateCounter)
encoder.writeVarUint(structsBinaryEncoder.length)
encoder.writeBinaryEncoder(structsBinaryEncoder)
}
const CONTENT_CHECK_COUNTER = 6
export function messageCheckUpdateCounter (roomName, encoder, updateCounter = 0) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_CHECK_COUNTER)
encoder.writeVarUint(updateCounter)
}
/**
* Decodes a client-message.
*
* A client-message consists of multiple message-elements that are concatenated without delimiter.
* Each has the following structure:
* - roomName
* - content_type
* - content (additional info that is encoded based on the value of content_type)
*
* The message is encoded until no more message-elements are available.
*
* @param {*} connector The connector that handles the connections
* @param {*} message The binary encoded message
* @param {*} ws The connection object
*/
export default function decodeMessage (connector, message, ws, isServer = false, persistence) {
const decoder = new BinaryDecoder(message)
const encoder = new BinaryEncoder()
while (decoder.hasContent()) {
const roomName = decoder.readVarString()
const contentType = decoder.readVarUint()
const room = connector.getRoom(roomName)
const y = room.y
switch (contentType) {
case CONTENT_CHECK_COUNTER:
const updateCounter = decoder.readVarUint()
if (room.localUpdateCounter !== updateCounter) {
messageGetSS(roomName, y, encoder)
}
connector.subscribe(roomName, ws)
break
case CONTENT_STRUCTS:
console.log(`${roomName}: received update`)
connector._mutualExclude(() => {
const remoteUpdateCounter = decoder.readVarUint()
persistence.setRemoteUpdateCounter(roomName, remoteUpdateCounter)
const messageLen = decoder.readVarUint()
if (y === null) {
persistence._persistStructs(roomName, decoder.readArrayBuffer(messageLen))
} else {
y.transact(() => {
integrateRemoteStructs(y, decoder)
}, true)
}
})
break
case CONTENT_GET_SS:
if (y !== null) {
messageSS(roomName, y, encoder)
} else {
persistence._createYInstance(roomName).then(y => {
const encoder = new BinaryEncoder()
messageSS(roomName, y, encoder)
connector.send(encoder, ws)
})
}
break
case CONTENT_SUBSCRIBE:
connector.subscribe(roomName, ws)
break
case CONTENT_SS:
// received state set
// reply with missing content
const ss = readStateSet(decoder)
const sendStructsDSS = () => {
if (y !== null) { // TODO: how to sync local content?
const encoder = new BinaryEncoder()
messageStructsDSS(roomName, y, encoder, ss, room.localUpdateCounter) // room.localUpdateHandler in case it changes
if (isServer) {
messageSS(roomName, y, encoder)
}
connector.send(encoder, ws)
}
}
if (room.persistenceLoaded !== undefined) {
room.persistenceLoaded.then(sendStructsDSS)
} else {
sendStructsDSS()
}
break
case CONTENT_STRUCTS_DSS:
console.log(`${roomName}: synced`)
connector._mutualExclude(() => {
const remoteUpdateCounter = decoder.readVarUint()
persistence.setRemoteUpdateCounter(roomName, remoteUpdateCounter)
const messageLen = decoder.readVarUint()
if (y === null) {
persistence._persistStructsDS(roomName, decoder.readArrayBuffer(messageLen))
} else {
y.transact(() => {
integrateRemoteStructs(y, decoder)
readDeleteSet(y, decoder)
}, true)
}
})
break
default:
console.error('Unexpected content type!')
if (ws !== null) {
ws.close() // TODO: specify reason
}
}
}
return encoder
}

View File

@@ -0,0 +1,124 @@
import Y from '../../Y.js'
import uws from 'uws'
import BinaryEncoder from '../../Util/Binary/Encoder.js'
import decodeMessage, { messageStructs } from './decodeMessage.js'
import FilePersistence from '../../Persistences/FilePersistence.js'
const WebsocketsServer = uws.Server
const persistence = new FilePersistence('.yjsPersisted')
/**
* Maps from room-name to ..
* {
* connections, // Set of ws-clients that listen to the room
* y // Yjs instance that handles the room
* }
*/
const rooms = new Map()
/**
* Maps from ws-connection to Set<roomName> - the set of connected roomNames
*/
const connections = new Map()
const port = process.env.PORT || 1234
const wss = new WebsocketsServer({
port,
perMessageDeflate: {}
})
/**
* Set of room names that are scheduled to be sweeped (destroyed because they don't have a connection anymore)
*/
const scheduledSweeps = new Set()
/* TODO: enable sweeping
setInterval(function sweepRoomes () {
scheduledSweeps.forEach(roomName => {
const room = rooms.get(roomName)
if (room !== undefined) {
if (room.connections.size === 0) {
persistence.saveState(roomName, room.y).then(() => {
if (room.connections.size === 0) {
room.y.destroy()
rooms.delete(roomName)
}
})
}
}
})
scheduledSweeps.clear()
}, 5000) */
const wsConnector = {
send: (encoder, ws) => {
const message = encoder.createBuffer()
ws.send(message, null, null, true)
},
_mutualExclude: f => { f() },
subscribe: function subscribe (roomName, ws) {
let roomNames = connections.get(ws)
if (roomNames === undefined) {
roomNames = new Set()
connections.set(ws, roomNames)
}
roomNames.add(roomName)
const room = this.getRoom(roomName)
room.connections.add(ws)
},
getRoom: function getRoom (roomName) {
let room = rooms.get(roomName)
if (room === undefined) {
const y = new Y(roomName, null, null, { gc: true })
const persistenceLoaded = persistence.readState(roomName, y)
room = {
name: roomName,
connections: new Set(),
y,
persistenceLoaded,
localUpdateCounter: 1
}
y.on('afterTransaction', (y, transaction) => {
if (transaction.encodedStructsLen > 0) {
// save to persistence
persistence.saveUpdate(roomName, y, transaction.encodedStructs)
// forward update to clients
persistence._mutex(() => { // do not broadcast if persistence.readState is called
const encoder = new BinaryEncoder()
messageStructs(roomName, y, encoder, transaction.encodedStructs, ++room.localUpdateCounter)
const message = encoder.createBuffer()
// when changed, broakcast update to all connections
room.connections.forEach(conn => {
conn.send(message, null, null, true)
})
})
}
})
rooms.set(roomName, room)
}
return room
}
}
wss.on('connection', (ws) => {
ws.on('message', function onWSMessage (message) {
if (message.byteLength > 0) {
const reply = decodeMessage(wsConnector, message, ws, true, persistence)
if (reply.length > 0) {
ws.send(reply.createBuffer(), null, null, true)
}
}
})
ws.on('close', function onWSClose () {
const roomNames = connections.get(ws)
if (roomNames !== undefined) {
roomNames.forEach(roomName => {
const room = rooms.get(roomName)
if (room !== undefined) {
const connections = room.connections
connections.delete(ws)
if (connections.size === 0) {
scheduledSweeps.add(roomName)
}
}
})
connections.delete(ws)
}
})
})

View File

@@ -0,0 +1,2 @@
export default class AbstractPersistence {}

View File

@@ -0,0 +1,72 @@
import fs from 'fs'
import path from 'path'
import BinaryDecoder from '../Util/Binary/Decoder.js'
import BinaryEncoder from '../Util/Binary/Encoder.js'
import { createMutualExclude } from '../Util/mutualExclude.js'
import { encodeUpdate, encodeStructsDS, decodePersisted } from './decodePersisted.js'
function createFilePath (persistence, roomName) {
// TODO: filename checking!
return path.join(persistence.dir, roomName)
}
export default class FilePersistence {
constructor (dir) {
this.dir = dir
this._mutex = createMutualExclude()
}
setRemoteUpdateCounter (roomName, remoteUpdateCounter) {
// TODO: implement
// nop
}
saveUpdate (room, y, encodedStructs) {
return new Promise((resolve, reject) => {
this._mutex(() => {
const filePath = createFilePath(this, room)
const updateMessage = new BinaryEncoder()
encodeUpdate(y, encodedStructs, updateMessage)
fs.appendFile(filePath, Buffer.from(updateMessage.createBuffer()), (err) => {
if (err !== null) {
reject(err)
} else {
resolve()
}
})
}, resolve)
})
}
saveState (roomName, y) {
return new Promise((resolve, reject) => {
const encoder = new BinaryEncoder()
encodeStructsDS(y, encoder)
const filePath = createFilePath(this, roomName)
fs.writeFile(filePath, Buffer.from(encoder.createBuffer()), (err) => {
if (err !== null) {
reject(err)
} else {
resolve()
}
})
})
}
readState (roomName, y) {
// Check if the file exists in the current directory.
return new Promise((resolve, reject) => {
const filePath = path.join(this.dir, roomName)
fs.readFile(filePath, (err, data) => {
if (err !== null) {
resolve()
// reject(err)
} else {
this._mutex(() => {
console.info(`unpacking data (${data.length})`)
console.time('unpacking')
decodePersisted(y, new BinaryDecoder(data))
console.timeEnd('unpacking')
})
resolve()
}
})
})
}
}

View File

@@ -0,0 +1,283 @@
/* global indexedDB, location, BroadcastChannel */
import Y from '../Y.js'
import { createMutualExclude } from '../Util/mutualExclude.js'
import { decodePersisted, encodeStructsDS, encodeUpdate } from './decodePersisted.js'
import BinaryDecoder from '../Util/Binary/Decoder.js'
import BinaryEncoder from '../Util/Binary/Encoder.js'
import { PERSIST_STRUCTS_DS } from './decodePersisted.js';
import { PERSIST_UPDATE } from './decodePersisted.js';
/*
* Request to Promise transformer
*/
function rtop (request) {
return new Promise(function (resolve, reject) {
request.onerror = function (event) {
reject(new Error(event.target.error))
}
request.onblocked = function () {
location.reload()
}
request.onsuccess = function (event) {
resolve(event.target.result)
}
})
}
function openDB (room) {
return new Promise(function (resolve, reject) {
let request = indexedDB.open(room)
request.onupgradeneeded = function (event) {
const db = event.target.result
if (db.objectStoreNames.contains('updates')) {
db.deleteObjectStore('updates')
}
db.createObjectStore('updates', {autoIncrement: true})
}
request.onerror = function (event) {
reject(new Error(event.target.error))
}
request.onblocked = function () {
location.reload()
}
request.onsuccess = function (event) {
const db = event.target.result
db.onversionchange = function () { db.close() }
resolve(db)
}
})
}
function persist (room) {
let t = room.db.transaction(['updates'], 'readwrite')
let updatesStore = t.objectStore('updates')
return rtop(updatesStore.getAll())
.then(updates => {
// apply all previous updates before deleting them
room.mutex(() => {
updates.forEach(update => {
decodePersisted(y, new BinaryDecoder(update))
})
})
const encoder = new BinaryEncoder()
encodeStructsDS(y, encoder)
// delete all pending updates
rtop(updatesStore.clear()).then(() => {
// write current model
updatesStore.put(encoder.createBuffer())
})
})
}
function saveUpdate (room, updateBuffer) {
const db = room.db
if (db !== null) {
const t = db.transaction(['updates'], 'readwrite')
const updatesStore = t.objectStore('updates')
const updatePut = rtop(updatesStore.put(updateBuffer))
rtop(updatesStore.count()).then(cnt => {
if (cnt >= PREFERRED_TRIM_SIZE) {
persist(room)
}
})
return updatePut
}
}
function registerRoomInPersistence (documentsDB, roomName) {
return documentsDB.then(
db => Promise.all([
db,
rtop(db.transaction(['documents'], 'readonly').objectStore('documents').get(roomName))
])
).then(
([db, doc]) => {
if (doc === undefined) {
return rtop(db.transaction(['documents'], 'readwrite').objectStore('documents').add({ roomName, serverUpdateCounter: 0 }))
}
}
)
}
const PREFERRED_TRIM_SIZE = 400
export default class IndexedDBPersistence {
constructor () {
this._rooms = new Map()
this._documentsDB = new Promise(function (resolve, reject) {
let request = indexedDB.open('_yjs_documents')
request.onupgradeneeded = function (event) {
const db = event.target.result
if (db.objectStoreNames.contains('documents')) {
db.deleteObjectStore('documents')
}
db.createObjectStore('documents', { keyPath: "roomName" })
}
request.onerror = function (event) {
reject(new Error(event.target.error))
}
request.onblocked = function () {
location.reload()
}
request.onsuccess = function (event) {
const db = event.target.result
db.onversionchange = function () { db.close() }
resolve(db)
}
})
addEventListener('unload', () => {
// close everything when page unloads
this._rooms.forEach(room => {
if (room.db !== null) {
room.db.close()
} else {
room.dbPromise.then(db => db.close())
}
})
this._documentsDB.then(db => db.close())
})
}
getAllDocuments () {
return this._documentsDB.then(
db => rtop(db.transaction(['documents'], 'readonly').objectStore('documents').getAll())
)
}
setRemoteUpdateCounter (roomName, remoteUpdateCounter) {
this._documentsDB.then(
db => rtop(db.transaction(['documents'], 'readwrite').objectStore('documents').put({ roomName, remoteUpdateCounter }))
)
}
_createYInstance (roomName) {
const room = this._rooms.get(roomName)
if (room !== undefined) {
return room.y
}
const y = new Y()
return openDB(roomName).then(
db => rtop(db.transaction(['updates'], 'readonly').objectStore('updates').getAll())
).then(
updates =>
y.transact(() => {
updates.forEach(update => {
decodePersisted(y, new BinaryDecoder(update))
})
}, true)
).then(() => Promise.resolve(y))
}
_persistStructsDS (roomName, structsDS) {
const encoder = new BinaryEncoder()
encoder.writeVarUint(PERSIST_STRUCTS_DS)
encoder.writeArrayBuffer(structsDS)
return openDB(roomName).then(db => {
const t = db.transaction(['updates'], 'readwrite')
const updatesStore = t.objectStore('updates')
return rtop(updatesStore.put(encoder.createBuffer()))
})
}
_persistStructs (roomName, structs) {
const encoder = new BinaryEncoder()
encoder.writeVarUint(PERSIST_UPDATE)
encoder.writeArrayBuffer(structs)
return openDB(roomName).then(db => {
const t = db.transaction(['updates'], 'readwrite')
const updatesStore = t.objectStore('updates')
return rtop(updatesStore.put(encoder.createBuffer()))
})
}
connectY (roomName, y) {
if (this._rooms.has(roomName)) {
throw new Error('A Y instance is already bound to this room!')
}
let room = {
db: null,
dbPromise: null,
channel: null,
mutex: createMutualExclude(),
y
}
if (typeof BroadcastChannel !== 'undefined') {
room.channel = new BroadcastChannel('__yjs__' + roomName)
room.channel.addEventListener('message', e => {
room.mutex(function () {
decodePersisted(y, new BinaryDecoder(e.data))
})
})
}
y.on('destroyed', () => {
this.disconnectY(roomName, y)
})
y.on('afterTransaction', (y, transaction) => {
room.mutex(() => {
if (transaction.encodedStructsLen > 0) {
const encoder = new BinaryEncoder()
const update = new BinaryEncoder()
encodeUpdate(y, transaction.encodedStructs, update)
const updateBuffer = update.createBuffer()
if (room.channel !== null) {
room.channel.postMessage(updateBuffer)
}
if (transaction.encodedStructsLen > 0) {
if (room.db !== null) {
saveUpdate(room, updateBuffer)
}
}
}
})
})
// register document in documentsDB
this._documentsDB.then(
db =>
rtop(db.transaction(['documents'], 'readonly').objectStore('documents').get(roomName))
.then(
doc => doc === undefined && rtop(db.transaction(['documents'], 'readwrite').objectStore('documents').add({ roomName, serverUpdateCounter: -1 }))
)
)
// open room db and read existing data
return room.dbPromise = openDB(roomName)
.then(db => {
room.db = db
const t = room.db.transaction(['updates'], 'readwrite')
const updatesStore = t.objectStore('updates')
// write current state as update
const encoder = new BinaryEncoder()
encodeStructsDS(y, encoder)
return rtop(updatesStore.put(encoder.createBuffer())).then(() => {
// read persisted state
return rtop(updatesStore.getAll()).then(updates => {
room.mutex(() => {
y.transact(() => {
updates.forEach(update => {
decodePersisted(y, new BinaryDecoder(update))
})
}, true)
})
})
})
})
}
disconnectY (roomName) {
const {
db, channel
} = this._rooms.get(roomName)
db.close()
if (channel !== null) {
channel.close()
}
this._rooms.delete(roomName)
}
/**
* Remove all persisted data that belongs to a room.
* Automatically destroys all Yjs all Yjs instances that persist to
* the room. If `destroyYjsInstances = false` the persistence functionality
* will be removed from the Yjs instances.
*/
removePersistedData (roomName, destroyYjsInstances = true) {
this.disconnectY(roomName)
return rtop(indexedDB.deleteDatabase(roomName))
}
}

View File

@@ -0,0 +1,51 @@
import { integrateRemoteStructs } from '../MessageHandler/integrateRemoteStructs.js'
import { writeStructs } from '../MessageHandler/syncStep1.js'
import { writeDeleteSet, readDeleteSet } from '../MessageHandler/deleteSet.js'
export const PERSIST_UPDATE = 0
/**
* Write an update to an encoder.
*
* @param {Yjs} y A Yjs instance
* @param {BinaryEncoder} updateEncoder I.e. transaction.encodedStructs
*/
export function encodeUpdate (y, updateEncoder, encoder) {
encoder.writeVarUint(PERSIST_UPDATE)
encoder.writeBinaryEncoder(updateEncoder)
}
export const PERSIST_STRUCTS_DS = 1
/**
* Write the current Yjs data model to an encoder.
*
* @param {Yjs} y A Yjs instance
* @param {BinaryEncoder} encoder An encoder to write to
*/
export function encodeStructsDS (y, encoder) {
encoder.writeVarUint(PERSIST_STRUCTS_DS)
writeStructs(y, encoder, new Map())
writeDeleteSet(y, encoder)
}
/**
* Feed the Yjs instance with the persisted state
* @param {Yjs} y A Yjs instance.
* @param {BinaryDecoder} decoder A Decoder instance that holds the file content.
*/
export function decodePersisted (y, decoder) {
y.transact(() => {
while (decoder.hasContent()) {
const contentType = decoder.readVarUint()
switch (contentType) {
case PERSIST_UPDATE:
integrateRemoteStructs(y, decoder)
break
case PERSIST_STRUCTS_DS:
integrateRemoteStructs(y, decoder)
readDeleteSet(y, decoder)
break
}
}
}, true)
}

View File

@@ -1,6 +1,7 @@
import { getStructReference } from '../Util/structReferences.js'
import ID from '../Util/ID/ID.js'
import { logID } from '../MessageHandler/messageToString.js'
import { writeStructToTransaction } from '../Transaction.js'
/**
* @private
@@ -108,6 +109,7 @@ export default class Delete {
if (y.persistence !== null) {
y.persistence.saveStruct(y, this)
}
writeStructToTransaction(y._transaction, this)
}
/**

View File

@@ -1,6 +1,7 @@
import { getStructReference } from '../Util/structReferences.js'
import { RootFakeUserID } from '../Util/ID/RootID.js'
import ID from '../Util/ID/ID.js'
import { writeStructToTransaction } from '../Transaction.js'
// TODO should have the same base class as Item
export default class GC {
@@ -43,6 +44,7 @@ export default class GC {
if (y.persistence !== null) {
y.persistence.saveStruct(y, this)
}
writeStructToTransaction(y._transaction, this)
}
}

View File

@@ -2,7 +2,7 @@ import { getStructReference } from '../Util/structReferences.js'
import ID from '../Util/ID/ID.js'
import { default as RootID, RootFakeUserID } from '../Util/ID/RootID.js'
import Delete from './Delete.js'
import { transactionTypeChanged } from '../Transaction.js'
import { transactionTypeChanged, writeStructToTransaction } from '../Transaction.js'
import GC from './GC.js'
/**
@@ -397,6 +397,7 @@ export default class Item {
if (y.persistence !== null) {
y.persistence.saveStruct(y, this)
}
writeStructToTransaction(y._transaction, this)
}
}

View File

@@ -1,4 +1,4 @@
import { default as Item } from './Item.js'
import Item from './Item.js'
import { logItemHelper } from '../MessageHandler/messageToString.js'
export default class ItemEmbed extends Item {

View File

@@ -1,4 +1,4 @@
import { default as Item } from './Item.js'
import Item from './Item.js'
import { logItemHelper } from '../MessageHandler/messageToString.js'
export default class ItemFormat extends Item {

View File

@@ -1,4 +1,4 @@
import { splitHelper, default as Item } from './Item.js'
import Item, { splitHelper } from './Item.js'
import { logItemHelper } from '../MessageHandler/messageToString.js'
export default class ItemJSON extends Item {

View File

@@ -1,4 +1,4 @@
import { splitHelper, default as Item } from './Item.js'
import Item, { splitHelper } from './Item.js'
import { logItemHelper } from '../MessageHandler/messageToString.js'
export default class ItemString extends Item {

View File

@@ -1,3 +1,4 @@
import BinaryEncoder from './Util/Binary/Encoder.js'
/**
* A transaction is created for every change on the Yjs model. It is possible
@@ -58,7 +59,19 @@ export default class Transaction {
* @type {Map<YType,Array<YEvent>>}
*/
this.changedParentTypes = new Map()
this.encodedStructsLen = 0
this._encodedStructs = new BinaryEncoder()
this._encodedStructs.writeUint32(0)
}
get encodedStructs () {
this._encodedStructs.setUint32(0, this.encodedStructsLen)
return this._encodedStructs
}
}
export function writeStructToTransaction (transaction, struct) {
transaction.encodedStructsLen++
struct._toBinary(transaction._encodedStructs)
}
/**

View File

@@ -1,5 +1,5 @@
import Type from '../../Struct/Type.js'
import Item from '../../Struct/Item.js'
import Type from '../../Struct/Type.js'
import ItemJSON from '../../Struct/ItemJSON.js'
import { logItemHelper } from '../../MessageHandler/messageToString.js'
import YEvent from '../../Util/YEvent.js'

View File

@@ -1,5 +1,5 @@
import ItemString from '../../Struct/ItemString.js'
import ItemEmbed from '../../Struct/ItemEmbed.js'
import ItemString from '../../Struct/ItemString.js'
import ItemFormat from '../../Struct/ItemFormat.js'
import { logItemHelper } from '../../MessageHandler/messageToString.js'
import { YArrayEvent, default as YArray } from '../YArray/YArray.js'

View File

@@ -25,6 +25,10 @@ export default class BinaryDecoder {
this.pos = 0
}
hasContent () {
return this.pos !== this.uint8arr.length
}
/**
* Clone this decoder instance.
* Optionally set a new position parameter.
@@ -42,6 +46,18 @@ export default class BinaryDecoder {
return this.uint8arr.length
}
/**
* Read `len` bytes as an ArrayBuffer.
*/
readArrayBuffer (len) {
const arrayBuffer = new Uint8Array(len)
const view = new Uint8Array(this.uint8arr.buffer, this.pos, len)
arrayBuffer.set(view)
this.pos += len
return arrayBuffer.buffer
}
/**
* Skip one byte, jump to the next position.
*/
@@ -109,15 +125,35 @@ export default class BinaryDecoder {
* Read string of variable length
* * varUint is used to store the length of the string
*
* Transforming utf8 to a string is pretty expensive. The code performs 10x better
* when String.fromCodePoint is fed with all characters as arguments.
* But most environments have a maximum number of arguments per functions.
* For effiency reasons we apply a maximum of 10000 characters at once.
*
* @return {String} The read String.
*/
readVarString () {
let len = this.readVarUint()
let bytes = new Array(len)
for (let i = 0; i < len; i++) {
bytes[i] = this.uint8arr[this.pos++]
let remainingLen = this.readVarUint()
let encodedString = ''
let i = 0
while (remainingLen > 0) {
const nextLen = Math.min(remainingLen, 10000)
const bytes = new Array(nextLen)
for (let i = 0; i < nextLen; i++) {
bytes[i] = this.uint8arr[this.pos++]
}
encodedString += String.fromCodePoint.apply(null, bytes)
remainingLen -= nextLen
}
let encodedString = bytes.map(b => String.fromCodePoint(b)).join('')
/*
//let bytes = new Array(len)
for (let i = 0; i < len; i++) {
//bytes[i] = this.uint8arr[this.pos++]
encodedString += String.fromCodePoint(this.uint8arr[this.pos++])
// encodedString += String(this.uint8arr[this.pos++])
}
*/
//let encodedString = String.fromCodePoint.apply(null, bytes)
return decodeURIComponent(escape(encodedString))
}

View File

@@ -10,30 +10,78 @@ export default class BinaryEncoder {
constructor () {
// TODO: implement chained Uint8Array buffers instead of Array buffer
// TODO: Rewrite all methods as functions!
this.data = []
this._currentPos = 0
this._currentBuffer = new Uint8Array(1000)
this._data = []
}
/**
* The current length of the encoded data.
*/
get length () {
return this.data.length
let len = 0
for (let i = 0; i < this._data.length; i++) {
len += this._data[i].length
}
len += this._currentPos
return len
}
/**
* The current write pointer (the same as {@link length}).
*/
get pos () {
return this.data.length
return this.length
}
/**
* Create an ArrayBuffer.
* Transform to ArrayBuffer.
*
* @return {Uint8Array} A Uint8Array that represents the written data.
* @return {ArrayBuffer} The created ArrayBuffer.
*/
createBuffer () {
return Uint8Array.from(this.data).buffer
const len = this.length
const uint8array = new Uint8Array(len)
let curPos = 0
for (let i = 0; i < this._data.length; i++) {
let d = this._data[i]
uint8array.set(d, curPos)
curPos += d.length
}
uint8array.set(new Uint8Array(this._currentBuffer.buffer, 0, this._currentPos), curPos)
return uint8array.buffer
}
/**
* Write one byte to the encoder.
*
* @param {number} num The byte that is to be encoded.
*/
write (num) {
if (this._currentPos === this._currentBuffer.length) {
this._data.push(this._currentBuffer)
this._currentBuffer = new Uint8Array(this._currentBuffer.length * 2)
this._currentPos = 0
}
this._currentBuffer[this._currentPos++] = num
}
set (pos, num) {
let buffer = null
// iterate all buffers and adjust position
for (let i = 0; i < this._data.length && buffer === null; i++) {
const b = this._data[i]
if (pos < b.length) {
buffer = b // found buffer
} else {
pos -= b.length
}
}
if (buffer === null) {
// use current buffer
buffer = this._currentBuffer
}
buffer[pos] = num
}
/**
@@ -42,7 +90,7 @@ export default class BinaryEncoder {
* @param {number} num The number that is to be encoded.
*/
writeUint8 (num) {
this.data.push(num & bits8)
this.write(num & bits8)
}
/**
@@ -52,7 +100,7 @@ export default class BinaryEncoder {
* @param {number} num The number that is to be encoded.
*/
setUint8 (pos, num) {
this.data[pos] = num & bits8
this.set(pos, num & bits8)
}
/**
@@ -61,7 +109,8 @@ export default class BinaryEncoder {
* @param {number} num The number that is to be encoded.
*/
writeUint16 (num) {
this.data.push(num & bits8, (num >>> 8) & bits8)
this.write(num & bits8)
this.write((num >>> 8) & bits8)
}
/**
* Write two bytes as an unsigned integer at a specific location.
@@ -70,8 +119,8 @@ export default class BinaryEncoder {
* @param {number} num The number that is to be encoded.
*/
setUint16 (pos, num) {
this.data[pos] = num & bits8
this.data[pos + 1] = (num >>> 8) & bits8
this.set(pos, num & bits8)
this.set(pos + 1, (num >>> 8) & bits8)
}
/**
@@ -81,7 +130,7 @@ export default class BinaryEncoder {
*/
writeUint32 (num) {
for (let i = 0; i < 4; i++) {
this.data.push(num & bits8)
this.write(num & bits8)
num >>>= 8
}
}
@@ -94,7 +143,7 @@ export default class BinaryEncoder {
*/
setUint32 (pos, num) {
for (let i = 0; i < 4; i++) {
this.data[pos + i] = num & bits8
this.set(pos + i, num & bits8)
num >>>= 8
}
}
@@ -106,10 +155,10 @@ export default class BinaryEncoder {
*/
writeVarUint (num) {
while (num >= 0b10000000) {
this.data.push(0b10000000 | (bits7 & num))
this.write(0b10000000 | (bits7 & num))
num >>>= 7
}
this.data.push(bits7 & num)
this.write(bits7 & num)
}
/**
@@ -118,15 +167,31 @@ export default class BinaryEncoder {
* @param {String} str The string that is to be encoded.
*/
writeVarString (str) {
let encodedString = unescape(encodeURIComponent(str))
let bytes = encodedString.split('').map(c => c.codePointAt())
let len = bytes.length
const encodedString = unescape(encodeURIComponent(str))
const len = encodedString.length
this.writeVarUint(len)
for (let i = 0; i < len; i++) {
this.data.push(bytes[i])
this.write(encodedString.codePointAt(i))
}
}
/**
* Write the content of another binary encoder.
*
* @param encoder The BinaryEncoder to be written.
*/
writeBinaryEncoder (encoder) {
this.writeArrayBuffer(encoder.createBuffer())
}
writeArrayBuffer (arrayBuffer) {
const prevBufferLen = this._currentBuffer.length
this._data.push(new Uint8Array(this._currentBuffer.buffer, 0, this._currentPos))
this._data.push(new Uint8Array(arrayBuffer))
this._currentBuffer = new Uint8Array(prevBufferLen)
this._currentPos = 0
}
/**
* Write an ID at the current position.
*

View File

View File

@@ -1,4 +1,3 @@
// TODO: rename mutex
/**
@@ -19,7 +18,7 @@
*/
export function createMutualExclude () {
var token = true
return function mutualExclude (f) {
return function mutualExclude (f, g) {
if (token) {
token = false
try {
@@ -28,6 +27,8 @@ export function createMutualExclude () {
console.error(e)
}
token = true
} else if (g !== undefined) {
g()
}
}
}

View File

@@ -1,3 +1,10 @@
import Delete from '../Struct/Delete.js'
import ItemJSON from '../Struct/ItemJSON.js'
import ItemString from '../Struct/ItemString.js'
import ItemFormat from '../Struct/ItemFormat.js'
import ItemEmbed from '../Struct/ItemEmbed.js'
import GC from '../Struct/GC.js'
import YArray from '../Types/YArray/YArray.js'
import YMap from '../Types/YMap/YMap.js'
import YText from '../Types/YText/YText.js'
@@ -6,13 +13,6 @@ import YXmlHook from '../Types/YXml/YXmlHook.js'
import YXmlFragment from '../Types/YXml/YXmlFragment.js'
import YXmlElement from '../Types/YXml/YXmlElement.js'
import Delete from '../Struct/Delete.js'
import ItemJSON from '../Struct/ItemJSON.js'
import ItemString from '../Struct/ItemString.js'
import ItemFormat from '../Struct/ItemFormat.js'
import ItemEmbed from '../Struct/ItemEmbed.js'
import GC from '../Struct/GC.js'
const structs = new Map()
const references = new Map()

View File

@@ -3,8 +3,6 @@ import Y from './Y.js'
import UndoManager from './Util/UndoManager.js'
import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'
import { messageToString, messageToRoomname } from './MessageHandler/messageToString.js'
import Connector from './Connector.js'
import Persistence from './Persistence.js'
import YArray from './Types/YArray/YArray.js'

View File

@@ -28,7 +28,7 @@ export { default as DomBinding } from './Bindings/DomBinding/DomBinding.js'
* @param {AbstractPersistence} persistence Persistence adapter instance
*/
export default class Y extends NamedEventHandler {
constructor (room, opts, persistence, conf = {}) {
constructor (room, connector, persistence, conf = {}) {
super()
this.gcEnabled = conf.gc || false
/**
@@ -36,6 +36,7 @@ export default class Y extends NamedEventHandler {
* @type {String}
*/
this.room = room
<<<<<<< HEAD:src/Y.js
if (opts != null && opts.connector != null) {
opts.connector.room = room
}
@@ -46,6 +47,10 @@ export default class Y extends NamedEventHandler {
} else {
this.userID = opts.userID
}
=======
this._contentReady = false
this.userID = generateRandomUint32()
>>>>>>> experimental-connectors:src/Y.mjs
// TODO: This should be a Map so we can use encodables as keys
this.share = {}
this.ds = new DeleteStore(this)
@@ -61,10 +66,13 @@ export default class Y extends NamedEventHandler {
this.connector = null
this.connected = false
let initConnection = () => {
if (opts != null) {
this.connector = new Y[opts.connector.name](this, opts.connector)
this.connected = true
this.emit('connectorReady')
if (connector != null) {
if (connector.constructor === Object) {
connector.connector.room = room
this.connector = new Y[connector.connector.name](this, connector.connector)
this.connected = true
this.emit('connectorReady')
}
}
}
/**
@@ -251,6 +259,7 @@ export default class Y extends NamedEventHandler {
* Persisted data will remain until removed by the persistence adapter.
*/
destroy () {
this.emit('destroyed', true)
super.destroy()
this.share = null
if (this.connector != null) {