add client-server updateCounter support to sync all persisted rooms

This commit is contained in:
Kevin Jahns 2018-06-04 17:35:39 +02:00
parent 6f9ae0c4fc
commit fb2f9bc493
11 changed files with 280 additions and 73 deletions

View File

@ -3,6 +3,9 @@
</head>
<script src="./index.mjs" type="module"></script>
</head>
<body contenteditable="true">
<body>
<label for="room">Room: </label>
<input type="text" id="room" name="room">
<div id="content" contenteditable style="position:absolute;top:35px;left:0;right:0;bottom:0;outline: 0px solid transparent;"></div>
</body>
</html>

View File

@ -8,34 +8,52 @@ import YIndexdDBPersistence from '../../src/Persistences/IndexedDBPersistence.mj
const connector = new YWebsocketsConnector()
const persistence = new YIndexdDBPersistence()
const y = new Y('html-editor', null, null, { gc: true })
persistence.connectY('html-editor', y).then(() => {
// connect after persisted content was applied to y
// If we don't wait for persistence, the other peer will send all data, waisting
// network bandwidth..
connector.connectY('html-editor', y)
})
const roomInput = document.querySelector('#room')
let currentRoomName = null
let y = null
let domBinding = null
function setRoomName (roomName) {
if (currentRoomName !== roomName) {
console.log(`change room: "${roomName}"`)
roomInput.value = roomName
currentRoomName = roomName
location.hash = '#' + roomName
if (y !== null) {
domBinding.destroy()
}
const room = connector._rooms.get(roomName)
if (room !== undefined) {
y = room.y
} else {
y = new Y(roomName, null, null, { gc: true })
persistence.connectY(roomName, y).then(() => {
// connect after persisted content was applied to y
// If we don't wait for persistence, the other peer will send all data, waisting
// network bandwidth..
connector.connectY(roomName, y)
})
}
window.y = y
window.yXmlType = y.define('xml', YXmlFragment)
domBinding = new DomBinding(window.yXmlType, document.querySelector('#content'), { scrollingElement: document.scrollingElement })
}
}
connector.syncPersistence(persistence)
window.connector = connector
window.persistence = persistence
window.onload = function () {
window.domBinding = new DomBinding(window.yXmlType, document.body, { scrollingElement: document.scrollingElement })
}
window.y = y
window.yXmlType = y.define('xml', YXmlFragment)
window.undoManager = new UndoManager(window.yXmlType, {
captureTimeout: 500
})
document.onkeydown = function interceptUndoRedo (e) {
if (e.keyCode === 90 && (e.metaKey || e.ctrlKey)) {
if (!e.shiftKey) {
window.undoManager.undo()
} else {
window.undoManager.redo()
}
e.preventDefault()
}
setRoomName((location.hash || '#default').slice(1))
roomInput.addEventListener('input', e => {
const roomName = e.target.value
setRoomName(roomName)
})
}

View File

@ -121,12 +121,11 @@ export default class DomBinding extends Binding {
destroy () {
this.domToType = null
this.typeToDom = null
this.type.unobserve(this._typeObserver)
this.type.unobserveDeep(this._typeObserver)
this._mutationObserver.disconnect()
const y = this.type._y
y.off('beforeTransaction', this._beforeTransactionHandler)
y.off('beforeObserverCalls', this._beforeObserverCallsHandler)
y.off('afterObserverCalls', this._afterObserverCallsHandler)
y.off('afterTransaction', this._afterTransactionHandler)
super.destroy()
}

View File

@ -3,6 +3,7 @@ import BinaryEncoder from '../../Util/Binary/Encoder.mjs'
import NamedEventHandler from '../../Util/NamedEventHandler.mjs'
import decodeMessage, { messageSS, messageSubscribe, messageStructs } from './decodeMessage.mjs'
import { createMutualExclude } from '../../Util/mutualExclude.mjs'
import { messageCheckUpdateCounter } from './decodeMessage.mjs'
export const STATE_DISCONNECTED = 0
export const STATE_CONNECTED = 1
@ -17,11 +18,25 @@ export default class WebsocketsConnector extends NamedEventHandler {
this._connectToServer = true
this._reconnectTimeout = 300
this._mutualExclude = createMutualExclude()
this._persistence = null
this.connect()
}
getRoom (roomName) {
return this._rooms.get(roomName)
return this._rooms.get(roomName) || { y: null, roomName, localUpdateCounter: 1 }
}
syncPersistence (persistence) {
this._persistence = persistence
if (this._state === STATE_CONNECTED) {
persistence.getAllDocuments().then(docs => {
const encoder = new BinaryEncoder()
docs.forEach(doc => {
messageCheckUpdateCounter(doc.roomName, encoder, doc.remoteUpdateCounter)
});
this.send(encoder)
})
}
}
connectY (roomName, y) {
@ -31,13 +46,15 @@ export default class WebsocketsConnector extends NamedEventHandler {
}
this._rooms.set(roomName, {
roomName,
y
y,
localUpdateCounter: 1
})
y.on('afterTransaction', (y, transaction) => {
this._mutualExclude(() => {
if (transaction.encodedStructsLen > 0) {
const encoder = new BinaryEncoder()
messageStructs(roomName, y, encoder, transaction.encodedStructs)
const room = this._rooms.get(roomName)
messageStructs(roomName, y, encoder, transaction.encodedStructs, ++room.localUpdateCounter)
this.send(encoder)
}
})
@ -63,13 +80,17 @@ export default class WebsocketsConnector extends NamedEventHandler {
_onOpen () {
this._setState(STATE_CONNECTED)
const encoder = new BinaryEncoder()
for (const [roomName, room] of this._rooms) {
const y = room.y
messageSS(roomName, y, encoder)
messageSubscribe(roomName, y, encoder)
if (this._persistence === null) {
const encoder = new BinaryEncoder()
for (const [roomName, room] of this._rooms) {
const y = room.y
messageSS(roomName, y, encoder)
messageSubscribe(roomName, y, encoder)
}
this.send(encoder)
} else {
this.syncPersistence(this._persistence)
}
this.send(encoder)
}
send (encoder) {
@ -93,7 +114,7 @@ export default class WebsocketsConnector extends NamedEventHandler {
_onMessage (message) {
if (message.data.byteLength > 0) {
const reply = decodeMessage(this, message.data, null)
const reply = decodeMessage(this, message.data, null, false, this._persistence)
this.send(reply)
}
}

View File

@ -18,9 +18,6 @@ export function messageSubscribe (roomName, y, encoder) {
}
const CONTENT_SS = 0
/**
* Message the current state set. The other side must respond with CONTENT_STRUCTS_DSS
*/
export function messageSS (roomName, y, encoder) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_SS)
@ -28,20 +25,33 @@ export function messageSS (roomName, y, encoder) {
}
const CONTENT_STRUCTS_DSS = 2
export function messageStructsDSS (roomName, y, encoder, ss) {
export function messageStructsDSS (roomName, y, encoder, ss, updateCounter) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_STRUCTS_DSS)
writeStructs(y, encoder, ss)
writeDeleteSet(y, encoder)
encoder.writeVarUint(updateCounter)
const structsDS = new BinaryEncoder()
writeStructs(y, structsDS, ss)
writeDeleteSet(y, structsDS)
encoder.writeVarUint(structsDS.length)
encoder.writeBinaryEncoder(structsDS)
}
const CONTENT_STRUCTS = 5
export function messageStructs (roomName, y, encoder, structsBinaryEncoder) {
export function messageStructs (roomName, y, encoder, structsBinaryEncoder, updateCounter) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_STRUCTS)
encoder.writeVarUint(updateCounter)
encoder.writeVarUint(structsBinaryEncoder.length)
encoder.writeBinaryEncoder(structsBinaryEncoder)
}
const CONTENT_CHECK_COUNTER = 6
export function messageCheckUpdateCounter (roomName, encoder, updateCounter = 0) {
encoder.writeVarString(roomName)
encoder.writeVarUint(CONTENT_CHECK_COUNTER)
encoder.writeVarUint(updateCounter)
}
/**
* Decodes a client-message.
*
@ -57,7 +67,7 @@ export function messageStructs (roomName, y, encoder, structsBinaryEncoder) {
* @param {*} message The binary encoded message
* @param {*} ws The connection object
*/
export default function decodeMessage (connector, message, ws, isServer = false) {
export default function decodeMessage (connector, message, ws, isServer = false, persistence) {
const decoder = new BinaryDecoder(message)
const encoder = new BinaryEncoder()
while (decoder.hasContent()) {
@ -66,15 +76,38 @@ export default function decodeMessage (connector, message, ws, isServer = false)
const room = connector.getRoom(roomName)
const y = room.y
switch (contentType) {
case CONTENT_CHECK_COUNTER:
const updateCounter = decoder.readVarUint()
if (room.localUpdateCounter !== updateCounter) {
messageGetSS(roomName, y, encoder)
}
connector.subscribe(roomName, ws)
break
case CONTENT_STRUCTS:
console.log(`${roomName}: received update`)
connector._mutualExclude(() => {
y.transact(() => {
integrateRemoteStructs(y, decoder)
}, true)
const remoteUpdateCounter = decoder.readVarUint()
persistence.setRemoteUpdateCounter(roomName, remoteUpdateCounter)
const messageLen = decoder.readVarUint()
if (y === null) {
persistence._persistStructs(roomName, decoder.readArrayBuffer(messageLen))
} else {
y.transact(() => {
integrateRemoteStructs(y, decoder)
}, true)
}
})
break
case CONTENT_GET_SS:
messageSS(roomName, y, encoder)
if (y !== null) {
messageSS(roomName, y, encoder)
} else {
persistence._createYInstance(roomName).then(y => {
const encoder = new BinaryEncoder()
messageSS(roomName, y, encoder)
connector.send(encoder, ws)
})
}
break
case CONTENT_SUBSCRIBE:
connector.subscribe(roomName, ws)
@ -84,12 +117,14 @@ export default function decodeMessage (connector, message, ws, isServer = false)
// reply with missing content
const ss = readStateSet(decoder)
const sendStructsDSS = () => {
const encoder = new BinaryEncoder()
messageStructsDSS(roomName, y, encoder, ss)
if (isServer) {
messageSS(roomName, y, encoder)
if (y !== null) { // TODO: how to sync local content?
const encoder = new BinaryEncoder()
messageStructsDSS(roomName, y, encoder, ss, room.localUpdateCounter) // room.localUpdateHandler in case it changes
if (isServer) {
messageSS(roomName, y, encoder)
}
connector.send(encoder, ws)
}
connector.send(encoder, ws)
}
if (room.persistenceLoaded !== undefined) {
room.persistenceLoaded.then(sendStructsDSS)
@ -98,11 +133,19 @@ export default function decodeMessage (connector, message, ws, isServer = false)
}
break
case CONTENT_STRUCTS_DSS:
console.log(`${roomName}: synced`)
connector._mutualExclude(() => {
y.transact(() => {
integrateRemoteStructs(y, decoder)
readDeleteSet(y, decoder)
}, true)
const remoteUpdateCounter = decoder.readVarUint()
persistence.setRemoteUpdateCounter(roomName, remoteUpdateCounter)
const messageLen = decoder.readVarUint()
if (y === null) {
persistence._persistStructsDS(roomName, decoder.readArrayBuffer(messageLen))
} else {
y.transact(() => {
integrateRemoteStructs(y, decoder)
readDeleteSet(y, decoder)
}, true)
}
})
break
default:

View File

@ -28,6 +28,7 @@ const wss = new WebsocketsServer({
* Set of room names that are scheduled to be sweeped (destroyed because they don't have a connection anymore)
*/
const scheduledSweeps = new Set()
/* TODO: enable sweeping
setInterval(function sweepRoomes () {
scheduledSweeps.forEach(roomName => {
const room = rooms.get(roomName)
@ -43,7 +44,7 @@ setInterval(function sweepRoomes () {
}
})
scheduledSweeps.clear()
}, 5000)
}, 5000) */
const wsConnector = {
send: (encoder, ws) => {
@ -66,6 +67,13 @@ const wsConnector = {
if (room === undefined) {
const y = new Y(roomName, null, null, { gc: true })
const persistenceLoaded = persistence.readState(roomName, y)
room = {
name: roomName,
connections: new Set(),
y,
persistenceLoaded,
localUpdateCounter: 1
}
y.on('afterTransaction', (y, transaction) => {
if (transaction.encodedStructsLen > 0) {
// save to persistence
@ -73,7 +81,7 @@ const wsConnector = {
// forward update to clients
persistence._mutex(() => { // do not broadcast if persistence.readState is called
const encoder = new BinaryEncoder()
messageStructs(roomName, y, encoder, transaction.encodedStructs)
messageStructs(roomName, y, encoder, transaction.encodedStructs, ++room.localUpdateCounter)
const message = encoder.createBuffer()
// when changed, broakcast update to all connections
room.connections.forEach(conn => {
@ -82,12 +90,6 @@ const wsConnector = {
})
}
})
room = {
name: roomName,
connections: new Set(),
y,
persistenceLoaded
}
rooms.set(roomName, room)
}
return room
@ -97,7 +99,7 @@ const wsConnector = {
wss.on('connection', (ws) => {
ws.on('message', function onWSMessage (message) {
if (message.byteLength > 0) {
const reply = decodeMessage(wsConnector, message, ws, true)
const reply = decodeMessage(wsConnector, message, ws, true, persistence)
if (reply.length > 0) {
ws.send(reply.createBuffer(), null, null, true)
}

View File

@ -6,6 +6,7 @@ import { createMutualExclude } from '../Util/mutualExclude.mjs'
import { encodeUpdate, encodeStructsDS, decodePersisted } from './decodePersisted.mjs'
function createFilePath (persistence, roomName) {
// TODO: filename checking!
return path.join(persistence.dir, roomName)
}
@ -14,6 +15,10 @@ export default class FilePersistence {
this.dir = dir
this._mutex = createMutualExclude()
}
setRemoteUpdateCounter (roomName, remoteUpdateCounter) {
// TODO: implement
// nop
}
saveUpdate (room, y, encodedStructs) {
return new Promise((resolve, reject) => {
this._mutex(() => {

View File

@ -5,6 +5,8 @@ import { createMutualExclude } from '../Util/mutualExclude.mjs'
import { decodePersisted, encodeStructsDS, encodeUpdate } from './decodePersisted.mjs'
import BinaryDecoder from '../Util/Binary/Decoder.mjs'
import BinaryEncoder from '../Util/Binary/Encoder.mjs'
import { PERSIST_STRUCTS_DS } from './decodePersisted.mjs';
import { PERSIST_UPDATE } from './decodePersisted.mjs';
/*
* Request to Promise transformer
*/
@ -82,21 +84,110 @@ function saveUpdate (room, updateBuffer) {
}
}
function registerRoomInPersistence (documentsDB, roomName) {
return documentsDB.then(
db => Promise.all([
db,
rtop(db.transaction(['documents'], 'readonly').objectStore('documents').get(roomName))
])
).then(
([db, doc]) => {
if (doc === undefined) {
return rtop(db.transaction(['documents'], 'readwrite').objectStore('documents').add({ roomName, serverUpdateCounter: 0 }))
}
}
)
}
const PREFERRED_TRIM_SIZE = 400
export default class IndexedDBPersistence {
constructor () {
this._rooms = new Map()
this._documentsDB = new Promise(function (resolve, reject) {
let request = indexedDB.open('_yjs_documents')
request.onupgradeneeded = function (event) {
const db = event.target.result
if (db.objectStoreNames.contains('documents')) {
db.deleteObjectStore('documents')
}
db.createObjectStore('documents', { keyPath: "roomName" })
}
request.onerror = function (event) {
reject(new Error(event.target.error))
}
request.onblocked = function () {
location.reload()
}
request.onsuccess = function (event) {
const db = event.target.result
db.onversionchange = function () { db.close() }
resolve(db)
}
})
addEventListener('unload', () => {
// close everything when page unloads
this._rooms.forEach(room => {
if (room.db !== null) {
room.db.close()
} else {
room._db.then(db => db.close())
room.dbPromise.then(db => db.close())
}
})
this._documentsDB.then(db => db.close())
})
}
getAllDocuments () {
return this._documentsDB.then(
db => rtop(db.transaction(['documents'], 'readonly').objectStore('documents').getAll())
)
}
setRemoteUpdateCounter (roomName, remoteUpdateCounter) {
this._documentsDB.then(
db => rtop(db.transaction(['documents'], 'readwrite').objectStore('documents').put({ roomName, remoteUpdateCounter }))
)
}
_createYInstance (roomName) {
const room = this._rooms.get(roomName)
if (room !== undefined) {
return room.y
}
const y = new Y()
return openDB(roomName).then(
db => rtop(db.transaction(['updates'], 'readonly').objectStore('updates').getAll())
).then(
updates =>
y.transact(() => {
updates.forEach(update => {
decodePersisted(y, new BinaryDecoder(update))
})
}, true)
).then(() => Promise.resolve(y))
}
_persistStructsDS (roomName, structsDS) {
const encoder = new BinaryEncoder()
encoder.writeVarUint(PERSIST_STRUCTS_DS)
encoder.writeArrayBuffer(structsDS)
return openDB(roomName).then(db => {
const t = db.transaction(['updates'], 'readwrite')
const updatesStore = t.objectStore('updates')
return rtop(updatesStore.put(encoder.createBuffer()))
})
}
_persistStructs (roomName, structs) {
const encoder = new BinaryEncoder()
encoder.writeVarUint(PERSIST_UPDATE)
encoder.writeArrayBuffer(structs)
return openDB(roomName).then(db => {
const t = db.transaction(['updates'], 'readwrite')
const updatesStore = t.objectStore('updates')
return rtop(updatesStore.put(encoder.createBuffer()))
})
}
connectY (roomName, y) {
if (this._rooms.has(roomName)) {
throw new Error('A Y instance is already bound to this room!')
@ -109,7 +200,7 @@ export default class IndexedDBPersistence {
y
}
if (typeof BroadcastChannel !== 'undefined') {
room.channel = new BroadcastChannel('__yjs__' + room)
room.channel = new BroadcastChannel('__yjs__' + roomName)
room.channel.addEventListener('message', e => {
room.mutex(function () {
decodePersisted(y, new BinaryDecoder(e.data))
@ -137,6 +228,15 @@ export default class IndexedDBPersistence {
}
})
})
// register document in documentsDB
this._documentsDB.then(
db =>
rtop(db.transaction(['documents'], 'readonly').objectStore('documents').get(roomName))
.then(
doc => doc === undefined && rtop(db.transaction(['documents'], 'readwrite').objectStore('documents').add({ roomName, serverUpdateCounter: -1 }))
)
)
// open room db and read existing data
return room.dbPromise = openDB(roomName)
.then(db => {
room.db = db

View File

@ -2,7 +2,7 @@ import { integrateRemoteStructs } from '../MessageHandler/integrateRemoteStructs
import { writeStructs } from '../MessageHandler/syncStep1.mjs'
import { writeDeleteSet, readDeleteSet } from '../MessageHandler/deleteSet.mjs'
const PERSIST_UPDATE = 0
export const PERSIST_UPDATE = 0
/**
* Write an update to an encoder.
*
@ -14,7 +14,7 @@ export function encodeUpdate (y, updateEncoder, encoder) {
encoder.writeBinaryEncoder(updateEncoder)
}
const PERSIST_STRUCTS_DS = 1
export const PERSIST_STRUCTS_DS = 1
/**
* Write the current Yjs data model to an encoder.

View File

@ -46,6 +46,18 @@ export default class BinaryDecoder {
return this.uint8arr.length
}
/**
* Read `len` bytes as an ArrayBuffer.
*/
readArrayBuffer (len) {
const arrayBuffer = new Uint8Array(len)
const view = new Uint8Array(this.uint8arr.buffer, this.pos, len)
arrayBuffer.set(view)
this.pos += len
return arrayBuffer.buffer
}
/**
* Skip one byte, jump to the next position.
*/

View File

@ -181,9 +181,13 @@ export default class BinaryEncoder {
* @param encoder The BinaryEncoder to be written.
*/
writeBinaryEncoder (encoder) {
this.writeArrayBuffer(encoder.createBuffer())
}
writeArrayBuffer (arrayBuffer) {
const prevBufferLen = this._currentBuffer.length
this._data.push(new Uint8Array(this._currentBuffer.buffer, 0, this._currentPos))
this._data.push(new Uint8Array(encoder.createBuffer()))
this._data.push(new Uint8Array(arrayBuffer))
this._currentBuffer = new Uint8Array(prevBufferLen)
this._currentPos = 0
}