From fb2f9bc493132c54311cdb8328c2d9f9551a02c5 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Mon, 4 Jun 2018 17:35:39 +0200 Subject: [PATCH] add client-server updateCounter support to sync all persisted rooms --- examples/html-editor/index.html | 5 +- examples/html-editor/index.mjs | 68 +++++++----- src/Bindings/DomBinding/DomBinding.mjs | 3 +- .../WebsocketsConnector.mjs | 41 +++++-- .../WebsocketsConnector/decodeMessage.mjs | 85 ++++++++++---- src/Connectors/WebsocketsConnector/server.mjs | 20 ++-- src/Persistences/FilePersistence.mjs | 5 + src/Persistences/IndexedDBPersistence.mjs | 104 +++++++++++++++++- src/Persistences/decodePersisted.mjs | 4 +- src/Util/Binary/Decoder.mjs | 12 ++ src/Util/Binary/Encoder.mjs | 6 +- 11 files changed, 280 insertions(+), 73 deletions(-) diff --git a/examples/html-editor/index.html b/examples/html-editor/index.html index 759cbdf4..0f754006 100644 --- a/examples/html-editor/index.html +++ b/examples/html-editor/index.html @@ -3,6 +3,9 @@ - + + + +
diff --git a/examples/html-editor/index.mjs b/examples/html-editor/index.mjs index cfce7e7d..fde50997 100644 --- a/examples/html-editor/index.mjs +++ b/examples/html-editor/index.mjs @@ -8,34 +8,52 @@ import YIndexdDBPersistence from '../../src/Persistences/IndexedDBPersistence.mj const connector = new YWebsocketsConnector() const persistence = new YIndexdDBPersistence() -const y = new Y('html-editor', null, null, { gc: true }) -persistence.connectY('html-editor', y).then(() => { - // connect after persisted content was applied to y - // If we don't wait for persistence, the other peer will send all data, waisting - // network bandwidth.. - connector.connectY('html-editor', y) -}) + +const roomInput = document.querySelector('#room') + +let currentRoomName = null +let y = null +let domBinding = null + +function setRoomName (roomName) { + if (currentRoomName !== roomName) { + console.log(`change room: "${roomName}"`) + roomInput.value = roomName + currentRoomName = roomName + location.hash = '#' + roomName + if (y !== null) { + domBinding.destroy() + } + + const room = connector._rooms.get(roomName) + if (room !== undefined) { + y = room.y + } else { + y = new Y(roomName, null, null, { gc: true }) + persistence.connectY(roomName, y).then(() => { + // connect after persisted content was applied to y + // If we don't wait for persistence, the other peer will send all data, waisting + // network bandwidth.. + connector.connectY(roomName, y) + }) + } + + window.y = y + window.yXmlType = y.define('xml', YXmlFragment) + + domBinding = new DomBinding(window.yXmlType, document.querySelector('#content'), { scrollingElement: document.scrollingElement }) + } +} + +connector.syncPersistence(persistence) window.connector = connector window.persistence = persistence window.onload = function () { - window.domBinding = new DomBinding(window.yXmlType, document.body, { scrollingElement: document.scrollingElement }) -} - -window.y = y -window.yXmlType = y.define('xml', YXmlFragment) -window.undoManager = new UndoManager(window.yXmlType, { - captureTimeout: 500 -}) - -document.onkeydown = function interceptUndoRedo (e) { - if (e.keyCode === 90 && (e.metaKey || e.ctrlKey)) { - if (!e.shiftKey) { - window.undoManager.undo() - } else { - window.undoManager.redo() - } - e.preventDefault() - } + setRoomName((location.hash || '#default').slice(1)) + roomInput.addEventListener('input', e => { + const roomName = e.target.value + setRoomName(roomName) + }) } diff --git a/src/Bindings/DomBinding/DomBinding.mjs b/src/Bindings/DomBinding/DomBinding.mjs index 42643111..78ff9f1b 100644 --- a/src/Bindings/DomBinding/DomBinding.mjs +++ b/src/Bindings/DomBinding/DomBinding.mjs @@ -121,12 +121,11 @@ export default class DomBinding extends Binding { destroy () { this.domToType = null this.typeToDom = null - this.type.unobserve(this._typeObserver) + this.type.unobserveDeep(this._typeObserver) this._mutationObserver.disconnect() const y = this.type._y y.off('beforeTransaction', this._beforeTransactionHandler) y.off('beforeObserverCalls', this._beforeObserverCallsHandler) - y.off('afterObserverCalls', this._afterObserverCallsHandler) y.off('afterTransaction', this._afterTransactionHandler) super.destroy() } diff --git a/src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs b/src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs index 62651b90..1189e0e3 100644 --- a/src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs +++ b/src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs @@ -3,6 +3,7 @@ import BinaryEncoder from '../../Util/Binary/Encoder.mjs' import NamedEventHandler from '../../Util/NamedEventHandler.mjs' import decodeMessage, { messageSS, messageSubscribe, messageStructs } from './decodeMessage.mjs' import { createMutualExclude } from '../../Util/mutualExclude.mjs' +import { messageCheckUpdateCounter } from './decodeMessage.mjs' export const STATE_DISCONNECTED = 0 export const STATE_CONNECTED = 1 @@ -17,11 +18,25 @@ export default class WebsocketsConnector extends NamedEventHandler { this._connectToServer = true this._reconnectTimeout = 300 this._mutualExclude = createMutualExclude() + this._persistence = null this.connect() } getRoom (roomName) { - return this._rooms.get(roomName) + return this._rooms.get(roomName) || { y: null, roomName, localUpdateCounter: 1 } + } + + syncPersistence (persistence) { + this._persistence = persistence + if (this._state === STATE_CONNECTED) { + persistence.getAllDocuments().then(docs => { + const encoder = new BinaryEncoder() + docs.forEach(doc => { + messageCheckUpdateCounter(doc.roomName, encoder, doc.remoteUpdateCounter) + }); + this.send(encoder) + }) + } } connectY (roomName, y) { @@ -31,13 +46,15 @@ export default class WebsocketsConnector extends NamedEventHandler { } this._rooms.set(roomName, { roomName, - y + y, + localUpdateCounter: 1 }) y.on('afterTransaction', (y, transaction) => { this._mutualExclude(() => { if (transaction.encodedStructsLen > 0) { const encoder = new BinaryEncoder() - messageStructs(roomName, y, encoder, transaction.encodedStructs) + const room = this._rooms.get(roomName) + messageStructs(roomName, y, encoder, transaction.encodedStructs, ++room.localUpdateCounter) this.send(encoder) } }) @@ -63,13 +80,17 @@ export default class WebsocketsConnector extends NamedEventHandler { _onOpen () { this._setState(STATE_CONNECTED) - const encoder = new BinaryEncoder() - for (const [roomName, room] of this._rooms) { - const y = room.y - messageSS(roomName, y, encoder) - messageSubscribe(roomName, y, encoder) + if (this._persistence === null) { + const encoder = new BinaryEncoder() + for (const [roomName, room] of this._rooms) { + const y = room.y + messageSS(roomName, y, encoder) + messageSubscribe(roomName, y, encoder) + } + this.send(encoder) + } else { + this.syncPersistence(this._persistence) } - this.send(encoder) } send (encoder) { @@ -93,7 +114,7 @@ export default class WebsocketsConnector extends NamedEventHandler { _onMessage (message) { if (message.data.byteLength > 0) { - const reply = decodeMessage(this, message.data, null) + const reply = decodeMessage(this, message.data, null, false, this._persistence) this.send(reply) } } diff --git a/src/Connectors/WebsocketsConnector/decodeMessage.mjs b/src/Connectors/WebsocketsConnector/decodeMessage.mjs index 696a18e5..b7cbaac4 100644 --- a/src/Connectors/WebsocketsConnector/decodeMessage.mjs +++ b/src/Connectors/WebsocketsConnector/decodeMessage.mjs @@ -18,9 +18,6 @@ export function messageSubscribe (roomName, y, encoder) { } const CONTENT_SS = 0 -/** - * Message the current state set. The other side must respond with CONTENT_STRUCTS_DSS - */ export function messageSS (roomName, y, encoder) { encoder.writeVarString(roomName) encoder.writeVarUint(CONTENT_SS) @@ -28,20 +25,33 @@ export function messageSS (roomName, y, encoder) { } const CONTENT_STRUCTS_DSS = 2 -export function messageStructsDSS (roomName, y, encoder, ss) { +export function messageStructsDSS (roomName, y, encoder, ss, updateCounter) { encoder.writeVarString(roomName) encoder.writeVarUint(CONTENT_STRUCTS_DSS) - writeStructs(y, encoder, ss) - writeDeleteSet(y, encoder) + encoder.writeVarUint(updateCounter) + const structsDS = new BinaryEncoder() + writeStructs(y, structsDS, ss) + writeDeleteSet(y, structsDS) + encoder.writeVarUint(structsDS.length) + encoder.writeBinaryEncoder(structsDS) } const CONTENT_STRUCTS = 5 -export function messageStructs (roomName, y, encoder, structsBinaryEncoder) { +export function messageStructs (roomName, y, encoder, structsBinaryEncoder, updateCounter) { encoder.writeVarString(roomName) encoder.writeVarUint(CONTENT_STRUCTS) + encoder.writeVarUint(updateCounter) + encoder.writeVarUint(structsBinaryEncoder.length) encoder.writeBinaryEncoder(structsBinaryEncoder) } +const CONTENT_CHECK_COUNTER = 6 +export function messageCheckUpdateCounter (roomName, encoder, updateCounter = 0) { + encoder.writeVarString(roomName) + encoder.writeVarUint(CONTENT_CHECK_COUNTER) + encoder.writeVarUint(updateCounter) +} + /** * Decodes a client-message. * @@ -57,7 +67,7 @@ export function messageStructs (roomName, y, encoder, structsBinaryEncoder) { * @param {*} message The binary encoded message * @param {*} ws The connection object */ -export default function decodeMessage (connector, message, ws, isServer = false) { +export default function decodeMessage (connector, message, ws, isServer = false, persistence) { const decoder = new BinaryDecoder(message) const encoder = new BinaryEncoder() while (decoder.hasContent()) { @@ -66,15 +76,38 @@ export default function decodeMessage (connector, message, ws, isServer = false) const room = connector.getRoom(roomName) const y = room.y switch (contentType) { + case CONTENT_CHECK_COUNTER: + const updateCounter = decoder.readVarUint() + if (room.localUpdateCounter !== updateCounter) { + messageGetSS(roomName, y, encoder) + } + connector.subscribe(roomName, ws) + break case CONTENT_STRUCTS: + console.log(`${roomName}: received update`) connector._mutualExclude(() => { - y.transact(() => { - integrateRemoteStructs(y, decoder) - }, true) + const remoteUpdateCounter = decoder.readVarUint() + persistence.setRemoteUpdateCounter(roomName, remoteUpdateCounter) + const messageLen = decoder.readVarUint() + if (y === null) { + persistence._persistStructs(roomName, decoder.readArrayBuffer(messageLen)) + } else { + y.transact(() => { + integrateRemoteStructs(y, decoder) + }, true) + } }) break case CONTENT_GET_SS: - messageSS(roomName, y, encoder) + if (y !== null) { + messageSS(roomName, y, encoder) + } else { + persistence._createYInstance(roomName).then(y => { + const encoder = new BinaryEncoder() + messageSS(roomName, y, encoder) + connector.send(encoder, ws) + }) + } break case CONTENT_SUBSCRIBE: connector.subscribe(roomName, ws) @@ -84,12 +117,14 @@ export default function decodeMessage (connector, message, ws, isServer = false) // reply with missing content const ss = readStateSet(decoder) const sendStructsDSS = () => { - const encoder = new BinaryEncoder() - messageStructsDSS(roomName, y, encoder, ss) - if (isServer) { - messageSS(roomName, y, encoder) + if (y !== null) { // TODO: how to sync local content? + const encoder = new BinaryEncoder() + messageStructsDSS(roomName, y, encoder, ss, room.localUpdateCounter) // room.localUpdateHandler in case it changes + if (isServer) { + messageSS(roomName, y, encoder) + } + connector.send(encoder, ws) } - connector.send(encoder, ws) } if (room.persistenceLoaded !== undefined) { room.persistenceLoaded.then(sendStructsDSS) @@ -98,11 +133,19 @@ export default function decodeMessage (connector, message, ws, isServer = false) } break case CONTENT_STRUCTS_DSS: + console.log(`${roomName}: synced`) connector._mutualExclude(() => { - y.transact(() => { - integrateRemoteStructs(y, decoder) - readDeleteSet(y, decoder) - }, true) + const remoteUpdateCounter = decoder.readVarUint() + persistence.setRemoteUpdateCounter(roomName, remoteUpdateCounter) + const messageLen = decoder.readVarUint() + if (y === null) { + persistence._persistStructsDS(roomName, decoder.readArrayBuffer(messageLen)) + } else { + y.transact(() => { + integrateRemoteStructs(y, decoder) + readDeleteSet(y, decoder) + }, true) + } }) break default: diff --git a/src/Connectors/WebsocketsConnector/server.mjs b/src/Connectors/WebsocketsConnector/server.mjs index 63dc4dc4..c41eea4e 100644 --- a/src/Connectors/WebsocketsConnector/server.mjs +++ b/src/Connectors/WebsocketsConnector/server.mjs @@ -28,6 +28,7 @@ const wss = new WebsocketsServer({ * Set of room names that are scheduled to be sweeped (destroyed because they don't have a connection anymore) */ const scheduledSweeps = new Set() +/* TODO: enable sweeping setInterval(function sweepRoomes () { scheduledSweeps.forEach(roomName => { const room = rooms.get(roomName) @@ -43,7 +44,7 @@ setInterval(function sweepRoomes () { } }) scheduledSweeps.clear() -}, 5000) +}, 5000) */ const wsConnector = { send: (encoder, ws) => { @@ -66,6 +67,13 @@ const wsConnector = { if (room === undefined) { const y = new Y(roomName, null, null, { gc: true }) const persistenceLoaded = persistence.readState(roomName, y) + room = { + name: roomName, + connections: new Set(), + y, + persistenceLoaded, + localUpdateCounter: 1 + } y.on('afterTransaction', (y, transaction) => { if (transaction.encodedStructsLen > 0) { // save to persistence @@ -73,7 +81,7 @@ const wsConnector = { // forward update to clients persistence._mutex(() => { // do not broadcast if persistence.readState is called const encoder = new BinaryEncoder() - messageStructs(roomName, y, encoder, transaction.encodedStructs) + messageStructs(roomName, y, encoder, transaction.encodedStructs, ++room.localUpdateCounter) const message = encoder.createBuffer() // when changed, broakcast update to all connections room.connections.forEach(conn => { @@ -82,12 +90,6 @@ const wsConnector = { }) } }) - room = { - name: roomName, - connections: new Set(), - y, - persistenceLoaded - } rooms.set(roomName, room) } return room @@ -97,7 +99,7 @@ const wsConnector = { wss.on('connection', (ws) => { ws.on('message', function onWSMessage (message) { if (message.byteLength > 0) { - const reply = decodeMessage(wsConnector, message, ws, true) + const reply = decodeMessage(wsConnector, message, ws, true, persistence) if (reply.length > 0) { ws.send(reply.createBuffer(), null, null, true) } diff --git a/src/Persistences/FilePersistence.mjs b/src/Persistences/FilePersistence.mjs index 83d7a2af..7bbafe71 100644 --- a/src/Persistences/FilePersistence.mjs +++ b/src/Persistences/FilePersistence.mjs @@ -6,6 +6,7 @@ import { createMutualExclude } from '../Util/mutualExclude.mjs' import { encodeUpdate, encodeStructsDS, decodePersisted } from './decodePersisted.mjs' function createFilePath (persistence, roomName) { + // TODO: filename checking! return path.join(persistence.dir, roomName) } @@ -14,6 +15,10 @@ export default class FilePersistence { this.dir = dir this._mutex = createMutualExclude() } + setRemoteUpdateCounter (roomName, remoteUpdateCounter) { + // TODO: implement + // nop + } saveUpdate (room, y, encodedStructs) { return new Promise((resolve, reject) => { this._mutex(() => { diff --git a/src/Persistences/IndexedDBPersistence.mjs b/src/Persistences/IndexedDBPersistence.mjs index 4f3ddc08..e4b2e83a 100644 --- a/src/Persistences/IndexedDBPersistence.mjs +++ b/src/Persistences/IndexedDBPersistence.mjs @@ -5,6 +5,8 @@ import { createMutualExclude } from '../Util/mutualExclude.mjs' import { decodePersisted, encodeStructsDS, encodeUpdate } from './decodePersisted.mjs' import BinaryDecoder from '../Util/Binary/Decoder.mjs' import BinaryEncoder from '../Util/Binary/Encoder.mjs' +import { PERSIST_STRUCTS_DS } from './decodePersisted.mjs'; +import { PERSIST_UPDATE } from './decodePersisted.mjs'; /* * Request to Promise transformer */ @@ -82,21 +84,110 @@ function saveUpdate (room, updateBuffer) { } } +function registerRoomInPersistence (documentsDB, roomName) { + return documentsDB.then( + db => Promise.all([ + db, + rtop(db.transaction(['documents'], 'readonly').objectStore('documents').get(roomName)) + ]) + ).then( + ([db, doc]) => { + if (doc === undefined) { + return rtop(db.transaction(['documents'], 'readwrite').objectStore('documents').add({ roomName, serverUpdateCounter: 0 })) + } + } + ) +} + const PREFERRED_TRIM_SIZE = 400 export default class IndexedDBPersistence { constructor () { this._rooms = new Map() + this._documentsDB = new Promise(function (resolve, reject) { + let request = indexedDB.open('_yjs_documents') + request.onupgradeneeded = function (event) { + const db = event.target.result + if (db.objectStoreNames.contains('documents')) { + db.deleteObjectStore('documents') + } + db.createObjectStore('documents', { keyPath: "roomName" }) + } + request.onerror = function (event) { + reject(new Error(event.target.error)) + } + request.onblocked = function () { + location.reload() + } + request.onsuccess = function (event) { + const db = event.target.result + db.onversionchange = function () { db.close() } + resolve(db) + } + }) addEventListener('unload', () => { + // close everything when page unloads this._rooms.forEach(room => { if (room.db !== null) { room.db.close() } else { - room._db.then(db => db.close()) + room.dbPromise.then(db => db.close()) } }) + this._documentsDB.then(db => db.close()) }) } + getAllDocuments () { + return this._documentsDB.then( + db => rtop(db.transaction(['documents'], 'readonly').objectStore('documents').getAll()) + ) + } + setRemoteUpdateCounter (roomName, remoteUpdateCounter) { + this._documentsDB.then( + db => rtop(db.transaction(['documents'], 'readwrite').objectStore('documents').put({ roomName, remoteUpdateCounter })) + ) + } + + _createYInstance (roomName) { + const room = this._rooms.get(roomName) + if (room !== undefined) { + return room.y + } + const y = new Y() + return openDB(roomName).then( + db => rtop(db.transaction(['updates'], 'readonly').objectStore('updates').getAll()) + ).then( + updates => + y.transact(() => { + updates.forEach(update => { + decodePersisted(y, new BinaryDecoder(update)) + }) + }, true) + ).then(() => Promise.resolve(y)) + } + + _persistStructsDS (roomName, structsDS) { + const encoder = new BinaryEncoder() + encoder.writeVarUint(PERSIST_STRUCTS_DS) + encoder.writeArrayBuffer(structsDS) + return openDB(roomName).then(db => { + const t = db.transaction(['updates'], 'readwrite') + const updatesStore = t.objectStore('updates') + return rtop(updatesStore.put(encoder.createBuffer())) + }) + } + + _persistStructs (roomName, structs) { + const encoder = new BinaryEncoder() + encoder.writeVarUint(PERSIST_UPDATE) + encoder.writeArrayBuffer(structs) + return openDB(roomName).then(db => { + const t = db.transaction(['updates'], 'readwrite') + const updatesStore = t.objectStore('updates') + return rtop(updatesStore.put(encoder.createBuffer())) + }) + } + connectY (roomName, y) { if (this._rooms.has(roomName)) { throw new Error('A Y instance is already bound to this room!') @@ -109,7 +200,7 @@ export default class IndexedDBPersistence { y } if (typeof BroadcastChannel !== 'undefined') { - room.channel = new BroadcastChannel('__yjs__' + room) + room.channel = new BroadcastChannel('__yjs__' + roomName) room.channel.addEventListener('message', e => { room.mutex(function () { decodePersisted(y, new BinaryDecoder(e.data)) @@ -137,6 +228,15 @@ export default class IndexedDBPersistence { } }) }) + // register document in documentsDB + this._documentsDB.then( + db => + rtop(db.transaction(['documents'], 'readonly').objectStore('documents').get(roomName)) + .then( + doc => doc === undefined && rtop(db.transaction(['documents'], 'readwrite').objectStore('documents').add({ roomName, serverUpdateCounter: -1 })) + ) + ) + // open room db and read existing data return room.dbPromise = openDB(roomName) .then(db => { room.db = db diff --git a/src/Persistences/decodePersisted.mjs b/src/Persistences/decodePersisted.mjs index eb89637c..1936b1fd 100644 --- a/src/Persistences/decodePersisted.mjs +++ b/src/Persistences/decodePersisted.mjs @@ -2,7 +2,7 @@ import { integrateRemoteStructs } from '../MessageHandler/integrateRemoteStructs import { writeStructs } from '../MessageHandler/syncStep1.mjs' import { writeDeleteSet, readDeleteSet } from '../MessageHandler/deleteSet.mjs' -const PERSIST_UPDATE = 0 +export const PERSIST_UPDATE = 0 /** * Write an update to an encoder. * @@ -14,7 +14,7 @@ export function encodeUpdate (y, updateEncoder, encoder) { encoder.writeBinaryEncoder(updateEncoder) } -const PERSIST_STRUCTS_DS = 1 +export const PERSIST_STRUCTS_DS = 1 /** * Write the current Yjs data model to an encoder. diff --git a/src/Util/Binary/Decoder.mjs b/src/Util/Binary/Decoder.mjs index 9776e0d2..7e69ce26 100644 --- a/src/Util/Binary/Decoder.mjs +++ b/src/Util/Binary/Decoder.mjs @@ -46,6 +46,18 @@ export default class BinaryDecoder { return this.uint8arr.length } + + /** + * Read `len` bytes as an ArrayBuffer. + */ + readArrayBuffer (len) { + const arrayBuffer = new Uint8Array(len) + const view = new Uint8Array(this.uint8arr.buffer, this.pos, len) + arrayBuffer.set(view) + this.pos += len + return arrayBuffer.buffer + } + /** * Skip one byte, jump to the next position. */ diff --git a/src/Util/Binary/Encoder.mjs b/src/Util/Binary/Encoder.mjs index 26cd67cd..101e317d 100644 --- a/src/Util/Binary/Encoder.mjs +++ b/src/Util/Binary/Encoder.mjs @@ -181,9 +181,13 @@ export default class BinaryEncoder { * @param encoder The BinaryEncoder to be written. */ writeBinaryEncoder (encoder) { + this.writeArrayBuffer(encoder.createBuffer()) + } + + writeArrayBuffer (arrayBuffer) { const prevBufferLen = this._currentBuffer.length this._data.push(new Uint8Array(this._currentBuffer.buffer, 0, this._currentPos)) - this._data.push(new Uint8Array(encoder.createBuffer())) + this._data.push(new Uint8Array(arrayBuffer)) this._currentBuffer = new Uint8Array(prevBufferLen) this._currentPos = 0 }