diff --git a/examples/html-editor/index.mjs b/examples/html-editor/index.mjs index d3ef5f23..cfce7e7d 100644 --- a/examples/html-editor/index.mjs +++ b/examples/html-editor/index.mjs @@ -4,11 +4,20 @@ import Y from '../../src/Y.mjs' import DomBinding from '../../src/Bindings/DomBinding/DomBinding.mjs' import UndoManager from '../../src/Util/UndoManager.mjs' import YXmlFragment from '../../src/Types/YXml/YXmlFragment.mjs' +import YIndexdDBPersistence from '../../src/Persistences/IndexedDBPersistence.mjs' const connector = new YWebsocketsConnector() +const persistence = new YIndexdDBPersistence() const y = new Y('html-editor', null, null, { gc: true }) -connector.connectY('html-editor2', y) +persistence.connectY('html-editor', y).then(() => { + // connect after persisted content was applied to y + // If we don't wait for persistence, the other peer will send all data, waisting + // network bandwidth.. + connector.connectY('html-editor', y) +}) + window.connector = connector +window.persistence = persistence window.onload = function () { window.domBinding = new DomBinding(window.yXmlType, document.body, { scrollingElement: document.scrollingElement }) diff --git a/examples/notes/index.html b/examples/notes/index.html new file mode 100644 index 00000000..759cbdf4 --- /dev/null +++ b/examples/notes/index.html @@ -0,0 +1,8 @@ + + + + + + + + diff --git a/examples/notes/index.mjs b/examples/notes/index.mjs new file mode 100644 index 00000000..09064ba1 --- /dev/null +++ b/examples/notes/index.mjs @@ -0,0 +1,48 @@ + +import IndexedDBPersistence from '../../src/Persistences/IndexeddbPersistence.mjs' +import YWebsocketsConnector from '../../src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs' +import Y from '../../src/Y.mjs' +import YXmlFragment from '../../src/Types/YXml/YXmlFragment.mjs' + +const yCollection = new YCollection(new YWebsocketsConnector(), new IndexedDBPersistence()) + +const y = yCollection.getDocument('my-notes') + + + + + +persistence.addConnector(persistence) + +const y = new Y() +await persistence.persistY(y) + + +connector.connectY('html-editor', y) +persistence.connectY('html-editor', y) + + + + +window.connector = connector + +window.onload = function () { + window.domBinding = new DomBinding(window.yXmlType, document.body, { scrollingElement: document.scrollingElement }) +} + +window.y = y +window.yXmlType = y.define('xml', YXmlFragment) +window.undoManager = new UndoManager(window.yXmlType, { + captureTimeout: 500 +}) + +document.onkeydown = function interceptUndoRedo (e) { + if (e.keyCode === 90 && (e.metaKey || e.ctrlKey)) { + if (!e.shiftKey) { + window.undoManager.undo() + } else { + window.undoManager.redo() + } + e.preventDefault() + } +} diff --git a/src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs b/src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs index e075fc4d..62651b90 100644 --- a/src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs +++ b/src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs @@ -4,10 +4,8 @@ import NamedEventHandler from '../../Util/NamedEventHandler.mjs' import decodeMessage, { messageSS, messageSubscribe, messageStructs } from './decodeMessage.mjs' import { createMutualExclude } from '../../Util/mutualExclude.mjs' -export const STATE_CONNECTING = 0 -export const STATE_SYNCING = 1 -export const STATE_SYNCED = 2 -export const STATE_DISCONNECTED = 3 +export const STATE_DISCONNECTED = 0 +export const STATE_CONNECTED = 1 export default class WebsocketsConnector extends NamedEventHandler { constructor (url = 'ws://localhost:1234') { @@ -44,20 +42,27 @@ export default class WebsocketsConnector extends NamedEventHandler { } }) }) + if (this._state === STATE_CONNECTED) { + const encoder = new BinaryEncoder() + messageSS(roomName, y, encoder) + messageSubscribe(roomName, y, encoder) + this.send(encoder) + } } _setState (state) { - this.emit('stateChanged', { - state - }) this._state = state + this.emit('stateChanged', { + state: this.state + }) } get state () { - return this._state + return this._state === STATE_DISCONNECTED ? 'disconnected' : 'connected' } _onOpen () { + this._setState(STATE_CONNECTED) const encoder = new BinaryEncoder() for (const [roomName, room] of this._rooms) { const y = room.y @@ -74,6 +79,7 @@ export default class WebsocketsConnector extends NamedEventHandler { } _onClose () { + this._setState(STATE_DISCONNECTED) this._socket = null if (this._connectToServer) { setTimeout(() => { diff --git a/src/Persistences/IndexedDBPersistence.mjs b/src/Persistences/IndexedDBPersistence.mjs new file mode 100644 index 00000000..3bc5cab8 --- /dev/null +++ b/src/Persistences/IndexedDBPersistence.mjs @@ -0,0 +1,189 @@ +/* global indexedDB, location, BroadcastChannel */ + +import Y from '../Y.mjs' +import { createMutualExclude } from '../Util/mutualExclude.mjs' +import { decodePersisted, encodeStructsDS, encodeUpdate } from './decodePersisted.mjs' +import BinaryDecoder from '../Util/Binary/Decoder.mjs' +import BinaryEncoder from '../Util/Binary/Encoder.mjs' +/* + * Request to Promise transformer + */ +function rtop (request) { + return new Promise(function (resolve, reject) { + request.onerror = function (event) { + reject(new Error(event.target.error)) + } + request.onblocked = function () { + location.reload() + } + request.onsuccess = function (event) { + resolve(event.target.result) + } + }) +} + +function openDB (room) { + return new Promise(function (resolve, reject) { + let request = indexedDB.open(room) + request.onupgradeneeded = function (event) { + const db = event.target.result + if (db.objectStoreNames.contains('updates')) { + db.deleteObjectStore('updates') + } + db.createObjectStore('updates', {autoIncrement: true}) + } + request.onerror = function (event) { + reject(new Error(event.target.error)) + } + request.onblocked = function () { + location.reload() + } + request.onsuccess = function (event) { + const db = event.target.result + db.onversionchange = function () { db.close() } + resolve(db) + } + }) +} + +function persist (room) { + let t = room.db.transaction(['updates'], 'readwrite') + let updatesStore = t.objectStore('updates') + return rtop(updatesStore.getAll()) + .then(updates => { + // apply all previous updates before deleting them + room.mutex(() => { + updates.forEach(update => { + decodePersisted(y, new BinaryDecoder(update)) + }) + }) + const encoder = new BinaryEncoder() + encodeStructsDS(y, encoder) + // delete all pending updates + rtop(updatesStore.clear()).then(() => { + // write current model + updatesStore.put(encoder.createBuffer()) + }) + }) +} + +function saveUpdate (room, updateBuffer) { + const db = room.db + if (db !== null) { + const t = db.transaction(['updates'], 'readwrite') + const updatesStore = t.objectStore('updates') + const updatePut = rtop(updatesStore.put(updateBuffer)) + rtop(updatesStore.count()).then(cnt => { + if (cnt >= PREFERRED_TRIM_SIZE) { + persist(room) + } + }) + return updatePut + } else { + room.createdStructs.push(update) + return room.dbPromise + } +} + +const PREFERRED_TRIM_SIZE = 400 + +export default class IndexedDBPersistence { + constructor () { + this._rooms = new Map() + addEventListener('unload', () => { + this._rooms.forEach(room => { + if (room.db !== null) { + room.db.close() + } else { + room._db.then(db => db.close()) + } + }) + }) + } + connectY (roomName, y) { + if (this._rooms.has(roomName)) { + throw new Error('A Y instance is already bound to this room!') + } + let room = { + db: null, + dbPromise: null, + channel: null, + mutex: createMutualExclude(), + y, + createdStructs: [] // document updates before db created + } + if (typeof BroadcastChannel !== 'undefined') { + room.channel = new BroadcastChannel('__yjs__' + room) + room.channel.addEventListener('message', e => { + room.mutex(function () { + decodePersisted(y, new BinaryDecoder(e.data)) + }) + }) + } + y.on('destroyed', () => { + this.disconnectY(roomName, y) + }) + y.on('afterTransaction', (y, transaction) => { + room.mutex(() => { + if (transaction.encodedStructsLen > 0) { + const encoder = new BinaryEncoder() + const update = new BinaryEncoder() + encodeUpdate(y, transaction.encodedStructs, update) + const updateBuffer = update.createBuffer() + if (room.channel !== null) { + room.channel.postMessage(updateBuffer) + } + if (transaction.encodedStructsLen > 0) { + if (room.db === null) { + room.createdStructs.push(updateBuffer) + } else { + saveUpdate(room, updateBuffer) + } + } + } + }) + }) + return room.dbPromise = openDB(roomName) + .then(db => { + room.db = db + const t = room.db.transaction(['updates'], 'readwrite') + const updatesStore = t.objectStore('updates') + return rtop(updatesStore.getAll()).then(updates => { + // apply all previous updates before deleting them + room.mutex(() => { + y.transact(() => { + updates.forEach(update => { + decodePersisted(y, new BinaryDecoder(update)) + }) + }, true) + }) + return Promise.all(room.createdStructs.map(update => { + return saveUpdate(room, update) + })).then(() => { + room.createdStructs = [] + }) + }) + }) + } + disconnectY (roomName) { + const { + db, channel + } = this._rooms.get(roomName) + db.close() + if (channel !== null) { + channel.close() + } + this._rooms.delete(roomName) + } + + /** + * Remove all persisted data that belongs to a room. + * Automatically destroys all Yjs all Yjs instances that persist to + * the room. If `destroyYjsInstances = false` the persistence functionality + * will be removed from the Yjs instances. + */ + removePersistedData (roomName, destroyYjsInstances = true) { + this.disconnectY(roomName) + return rtop(indexedDB.deleteDatabase(roomName)) + } +} diff --git a/src/Persistences/IndexeddbPersistence.mjs b/src/Persistences/IndexeddbPersistence.mjs deleted file mode 100644 index 1798b54c..00000000 --- a/src/Persistences/IndexeddbPersistence.mjs +++ /dev/null @@ -1,174 +0,0 @@ -/* global indexedDB, location, BroadcastChannel */ - -import Y from '../Y.mjs' - -/* - * Request to Promise transformer - */ -function rtop (request) { - return new Promise(function (resolve, reject) { - request.onerror = function (event) { - reject(new Error(event.target.error)) - } - request.onblocked = function () { - location.reload() - } - request.onsuccess = function (event) { - resolve(event.target.result) - } - }) -} - -function openDB (room) { - return new Promise(function (resolve, reject) { - let request = indexedDB.open(room) - window.r1 = request - request.onupgradeneeded = function (event) { - const db = event.target.result - if (db.objectStoreNames.contains('model')) { - db.deleteObjectStore('updates') - db.deleteObjectStore('model') - db.deleteObjectStore('custom') - } - db.createObjectStore('updates', {autoIncrement: true}) - db.createObjectStore('model') - db.createObjectStore('custom') - } - request.onerror = function (event) { - reject(new Error(event.target.error)) - } - request.onblocked = function () { - location.reload() - } - request.onsuccess = function (event) { - const db = event.target.result - db.onversionchange = function () { db.close() } - resolve(db) - } - }) -} - -const PREFERRED_TRIM_SIZE = 500 - -export default class IndexedDBPersistence extends Y.AbstractPersistence { - constructor (opts) { - super(opts) - window.addEventListener('unload', () => { - this.ys.forEach(function (cnf, y) { - if (cnf.db !== null) { - cnf.db.close() - } else { - cnf._db.then(db => db.close()) - } - }) - }) - } - init (y) { - let cnf = this.ys.get(y) - let room = y.room - cnf.db = null - const dbOpened = openDB(room) - dbOpened.then(db => { - cnf.db = db - }) - if (typeof BroadcastChannel !== 'undefined') { - cnf.channel = new BroadcastChannel('__yjs__' + room) - cnf.channel.addEventListener('message', e => { - cnf.mutualExclude(function () { - y.transact(function () { - Y.utils.integrateRemoteStructs(y, new Y.utils.BinaryDecoder(e.data)) - }) - }) - }) - } else { - cnf.channel = null - } - return dbOpened - } - - deinit (y) { - let cnf = this.ys.get(y) - cnf.db.close() - super.deinit(y) - } - - set (y, key, value) { - const cnf = this.ys.get(y) - const t = cnf.db.transaction(['custom'], 'readwrite') - const customStore = t.objectStore('custom') - return rtop(customStore.put(value, key)) - } - - get (y, key) { - const cnf = this.ys.get(y) - const t = cnf.db.transaction(['custom'], 'readwrite') - const customStore = t.objectStore('custom') - return rtop(customStore.get(key)) - } - - /** - * Remove all persisted data that belongs to a room. - * Automatically destroys all Yjs all Yjs instances that persist to - * the room. If `destroyYjsInstances = false` the persistence functionality - * will be removed from the Yjs instances. - */ - removePersistedData (room, destroyYjsInstances = true) { - super.removePersistedData(room, destroyYjsInstances) - return rtop(indexedDB.deleteDatabase(room)) - } - - saveUpdate (y, update) { - let cnf = this.ys.get(y) - if (cnf.channel !== null) { - cnf.channel.postMessage(update) - } - let t = cnf.db.transaction(['updates'], 'readwrite') - let updatesStore = t.objectStore('updates') - updatesStore.put(update) - let cntP = rtop(updatesStore.count()) - cntP.then(cnt => { - if (cnt >= PREFERRED_TRIM_SIZE) { - this.persist(y) - } - }) - } - - saveStruct (y, struct) { - super.saveStruct(y, struct) - } - - retrieve (y) { - let cnf = this.ys.get(y) - let t = cnf.db.transaction(['updates', 'model'], 'readonly') - let modelStore = t.objectStore('model') - let updatesStore = t.objectStore('updates') - return Promise.all([rtop(modelStore.get(0)), rtop(updatesStore.getAll())]) - .then(([model, updates]) => { - super.retrieve(y, model, updates) - }) - } - - persist (y) { - let cnf = this.ys.get(y) - let db = cnf.db - let t = db.transaction(['updates', 'model'], 'readwrite') - let updatesStore = t.objectStore('updates') - return rtop(updatesStore.getAll()) - .then(updates => { - // apply pending updates before deleting them - Y.AbstractPersistence.prototype.retrieve.call(this, y, null, updates) - // get binary model - let binaryModel = Y.AbstractPersistence.prototype.persist.call(this, y) - // delete all pending updates - if (updates.length > 0) { - let modelStore = t.objectStore('model') - modelStore.put(binaryModel, 0) - updatesStore.clear() - } - }) - } -} - -if (typeof Y !== 'undefined') { - extendYIndexedDBPersistence(Y) // eslint-disable-line -} diff --git a/src/Persistences/decodePersisted.mjs b/src/Persistences/decodePersisted.mjs index cda8c3f6..eb89637c 100644 --- a/src/Persistences/decodePersisted.mjs +++ b/src/Persistences/decodePersisted.mjs @@ -29,7 +29,7 @@ export function encodeStructsDS (y, encoder) { } /** - *Feed the Yjs instance with the persisted state + * Feed the Yjs instance with the persisted state * @param {Yjs} y A Yjs instance. * @param {BinaryDecoder} decoder A Decoder instance that holds the file content. */ @@ -39,17 +39,13 @@ export function decodePersisted (y, decoder) { const contentType = decoder.readVarUint() switch (contentType) { case PERSIST_UPDATE: - y.transact(() => { - integrateRemoteStructs(y, decoder) - }) + integrateRemoteStructs(y, decoder) break case PERSIST_STRUCTS_DS: - y.transact(() => { - integrateRemoteStructs(y, decoder) - readDeleteSet(y, decoder) - }) + integrateRemoteStructs(y, decoder) + readDeleteSet(y, decoder) break } } - }) + }, true) } diff --git a/src/Util/Binary/Decoder.mjs b/src/Util/Binary/Decoder.mjs index 17a6f26a..9776e0d2 100644 --- a/src/Util/Binary/Decoder.mjs +++ b/src/Util/Binary/Decoder.mjs @@ -113,20 +113,36 @@ export default class BinaryDecoder { * Read string of variable length * * varUint is used to store the length of the string * + * Transforming utf8 to a string is pretty expensive. The code performs 10x better + * when String.fromCodePoint is fed with all characters as arguments. + * But most environments have a maximum number of arguments per functions. + * For effiency reasons we apply a maximum of 10000 characters at once. + * * @return {String} The read String. */ readVarString () { - let len = this.readVarUint() + let remainingLen = this.readVarUint() let encodedString = '' + let i = 0 + while (remainingLen > 0) { + const nextLen = Math.min(remainingLen, 10000) + const bytes = new Array(nextLen) + for (let i = 0; i < nextLen; i++) { + bytes[i] = this.uint8arr[this.pos++] + } + encodedString += String.fromCodePoint.apply(null, bytes) + remainingLen -= nextLen + } + /* //let bytes = new Array(len) for (let i = 0; i < len; i++) { //bytes[i] = this.uint8arr[this.pos++] - // encodedString += String.fromCodePoint(this.uint8arr[this.pos++]) - encodedString += String(this.uint8arr[this.pos++]) + encodedString += String.fromCodePoint(this.uint8arr[this.pos++]) + // encodedString += String(this.uint8arr[this.pos++]) } + */ //let encodedString = String.fromCodePoint.apply(null, bytes) - //return decodeURIComponent(escape(encodedString)) - return encodedString + return decodeURIComponent(escape(encodedString)) } /** diff --git a/src/Util/Binary/Encoder.mjs b/src/Util/Binary/Encoder.mjs index b8fb7bf8..26cd67cd 100644 --- a/src/Util/Binary/Encoder.mjs +++ b/src/Util/Binary/Encoder.mjs @@ -167,12 +167,11 @@ export default class BinaryEncoder { * @param {String} str The string that is to be encoded. */ writeVarString (str) { - let encodedString = unescape(encodeURIComponent(str)) - let bytes = encodedString.split('').map(c => c.codePointAt()) - let len = bytes.length + const encodedString = unescape(encodeURIComponent(str)) + const len = encodedString.length this.writeVarUint(len) for (let i = 0; i < len; i++) { - this.write(bytes[i]) + this.write(encodedString.codePointAt(i)) } } diff --git a/src/Y.mjs b/src/Y.mjs index 6f74c237..1e2b30c6 100644 --- a/src/Y.mjs +++ b/src/Y.mjs @@ -246,6 +246,7 @@ export default class Y extends NamedEventHandler { * Persisted data will remain until removed by the persistence adapter. */ destroy () { + this.emit('destroyed', true) super.destroy() this.share = null if (this.connector != null) {