added YIndexedDB

This commit is contained in:
Kevin Jahns 2018-06-03 21:58:51 +02:00
parent a1fb1a6258
commit 9df20fac8a
10 changed files with 299 additions and 201 deletions

View File

@ -4,11 +4,20 @@ import Y from '../../src/Y.mjs'
import DomBinding from '../../src/Bindings/DomBinding/DomBinding.mjs'
import UndoManager from '../../src/Util/UndoManager.mjs'
import YXmlFragment from '../../src/Types/YXml/YXmlFragment.mjs'
import YIndexdDBPersistence from '../../src/Persistences/IndexedDBPersistence.mjs'
const connector = new YWebsocketsConnector()
const persistence = new YIndexdDBPersistence()
const y = new Y('html-editor', null, null, { gc: true })
connector.connectY('html-editor2', y)
persistence.connectY('html-editor', y).then(() => {
// connect after persisted content was applied to y
// If we don't wait for persistence, the other peer will send all data, waisting
// network bandwidth..
connector.connectY('html-editor', y)
})
window.connector = connector
window.persistence = persistence
window.onload = function () {
window.domBinding = new DomBinding(window.yXmlType, document.body, { scrollingElement: document.scrollingElement })

View File

@ -0,0 +1,8 @@
<!DOCTYPE html>
<html>
</head>
<script src="./index.mjs" type="module"></script>
</head>
<body contenteditable="true">
</body>
</html>

48
examples/notes/index.mjs Normal file
View File

@ -0,0 +1,48 @@
import IndexedDBPersistence from '../../src/Persistences/IndexeddbPersistence.mjs'
import YWebsocketsConnector from '../../src/Connectors/WebsocketsConnector/WebsocketsConnector.mjs'
import Y from '../../src/Y.mjs'
import YXmlFragment from '../../src/Types/YXml/YXmlFragment.mjs'
const yCollection = new YCollection(new YWebsocketsConnector(), new IndexedDBPersistence())
const y = yCollection.getDocument('my-notes')
persistence.addConnector(persistence)
const y = new Y()
await persistence.persistY(y)
connector.connectY('html-editor', y)
persistence.connectY('html-editor', y)
window.connector = connector
window.onload = function () {
window.domBinding = new DomBinding(window.yXmlType, document.body, { scrollingElement: document.scrollingElement })
}
window.y = y
window.yXmlType = y.define('xml', YXmlFragment)
window.undoManager = new UndoManager(window.yXmlType, {
captureTimeout: 500
})
document.onkeydown = function interceptUndoRedo (e) {
if (e.keyCode === 90 && (e.metaKey || e.ctrlKey)) {
if (!e.shiftKey) {
window.undoManager.undo()
} else {
window.undoManager.redo()
}
e.preventDefault()
}
}

View File

@ -4,10 +4,8 @@ import NamedEventHandler from '../../Util/NamedEventHandler.mjs'
import decodeMessage, { messageSS, messageSubscribe, messageStructs } from './decodeMessage.mjs'
import { createMutualExclude } from '../../Util/mutualExclude.mjs'
export const STATE_CONNECTING = 0
export const STATE_SYNCING = 1
export const STATE_SYNCED = 2
export const STATE_DISCONNECTED = 3
export const STATE_DISCONNECTED = 0
export const STATE_CONNECTED = 1
export default class WebsocketsConnector extends NamedEventHandler {
constructor (url = 'ws://localhost:1234') {
@ -44,20 +42,27 @@ export default class WebsocketsConnector extends NamedEventHandler {
}
})
})
if (this._state === STATE_CONNECTED) {
const encoder = new BinaryEncoder()
messageSS(roomName, y, encoder)
messageSubscribe(roomName, y, encoder)
this.send(encoder)
}
}
_setState (state) {
this.emit('stateChanged', {
state
})
this._state = state
this.emit('stateChanged', {
state: this.state
})
}
get state () {
return this._state
return this._state === STATE_DISCONNECTED ? 'disconnected' : 'connected'
}
_onOpen () {
this._setState(STATE_CONNECTED)
const encoder = new BinaryEncoder()
for (const [roomName, room] of this._rooms) {
const y = room.y
@ -74,6 +79,7 @@ export default class WebsocketsConnector extends NamedEventHandler {
}
_onClose () {
this._setState(STATE_DISCONNECTED)
this._socket = null
if (this._connectToServer) {
setTimeout(() => {

View File

@ -0,0 +1,189 @@
/* global indexedDB, location, BroadcastChannel */
import Y from '../Y.mjs'
import { createMutualExclude } from '../Util/mutualExclude.mjs'
import { decodePersisted, encodeStructsDS, encodeUpdate } from './decodePersisted.mjs'
import BinaryDecoder from '../Util/Binary/Decoder.mjs'
import BinaryEncoder from '../Util/Binary/Encoder.mjs'
/*
* Request to Promise transformer
*/
function rtop (request) {
return new Promise(function (resolve, reject) {
request.onerror = function (event) {
reject(new Error(event.target.error))
}
request.onblocked = function () {
location.reload()
}
request.onsuccess = function (event) {
resolve(event.target.result)
}
})
}
function openDB (room) {
return new Promise(function (resolve, reject) {
let request = indexedDB.open(room)
request.onupgradeneeded = function (event) {
const db = event.target.result
if (db.objectStoreNames.contains('updates')) {
db.deleteObjectStore('updates')
}
db.createObjectStore('updates', {autoIncrement: true})
}
request.onerror = function (event) {
reject(new Error(event.target.error))
}
request.onblocked = function () {
location.reload()
}
request.onsuccess = function (event) {
const db = event.target.result
db.onversionchange = function () { db.close() }
resolve(db)
}
})
}
function persist (room) {
let t = room.db.transaction(['updates'], 'readwrite')
let updatesStore = t.objectStore('updates')
return rtop(updatesStore.getAll())
.then(updates => {
// apply all previous updates before deleting them
room.mutex(() => {
updates.forEach(update => {
decodePersisted(y, new BinaryDecoder(update))
})
})
const encoder = new BinaryEncoder()
encodeStructsDS(y, encoder)
// delete all pending updates
rtop(updatesStore.clear()).then(() => {
// write current model
updatesStore.put(encoder.createBuffer())
})
})
}
function saveUpdate (room, updateBuffer) {
const db = room.db
if (db !== null) {
const t = db.transaction(['updates'], 'readwrite')
const updatesStore = t.objectStore('updates')
const updatePut = rtop(updatesStore.put(updateBuffer))
rtop(updatesStore.count()).then(cnt => {
if (cnt >= PREFERRED_TRIM_SIZE) {
persist(room)
}
})
return updatePut
} else {
room.createdStructs.push(update)
return room.dbPromise
}
}
const PREFERRED_TRIM_SIZE = 400
export default class IndexedDBPersistence {
constructor () {
this._rooms = new Map()
addEventListener('unload', () => {
this._rooms.forEach(room => {
if (room.db !== null) {
room.db.close()
} else {
room._db.then(db => db.close())
}
})
})
}
connectY (roomName, y) {
if (this._rooms.has(roomName)) {
throw new Error('A Y instance is already bound to this room!')
}
let room = {
db: null,
dbPromise: null,
channel: null,
mutex: createMutualExclude(),
y,
createdStructs: [] // document updates before db created
}
if (typeof BroadcastChannel !== 'undefined') {
room.channel = new BroadcastChannel('__yjs__' + room)
room.channel.addEventListener('message', e => {
room.mutex(function () {
decodePersisted(y, new BinaryDecoder(e.data))
})
})
}
y.on('destroyed', () => {
this.disconnectY(roomName, y)
})
y.on('afterTransaction', (y, transaction) => {
room.mutex(() => {
if (transaction.encodedStructsLen > 0) {
const encoder = new BinaryEncoder()
const update = new BinaryEncoder()
encodeUpdate(y, transaction.encodedStructs, update)
const updateBuffer = update.createBuffer()
if (room.channel !== null) {
room.channel.postMessage(updateBuffer)
}
if (transaction.encodedStructsLen > 0) {
if (room.db === null) {
room.createdStructs.push(updateBuffer)
} else {
saveUpdate(room, updateBuffer)
}
}
}
})
})
return room.dbPromise = openDB(roomName)
.then(db => {
room.db = db
const t = room.db.transaction(['updates'], 'readwrite')
const updatesStore = t.objectStore('updates')
return rtop(updatesStore.getAll()).then(updates => {
// apply all previous updates before deleting them
room.mutex(() => {
y.transact(() => {
updates.forEach(update => {
decodePersisted(y, new BinaryDecoder(update))
})
}, true)
})
return Promise.all(room.createdStructs.map(update => {
return saveUpdate(room, update)
})).then(() => {
room.createdStructs = []
})
})
})
}
disconnectY (roomName) {
const {
db, channel
} = this._rooms.get(roomName)
db.close()
if (channel !== null) {
channel.close()
}
this._rooms.delete(roomName)
}
/**
* Remove all persisted data that belongs to a room.
* Automatically destroys all Yjs all Yjs instances that persist to
* the room. If `destroyYjsInstances = false` the persistence functionality
* will be removed from the Yjs instances.
*/
removePersistedData (roomName, destroyYjsInstances = true) {
this.disconnectY(roomName)
return rtop(indexedDB.deleteDatabase(roomName))
}
}

View File

@ -1,174 +0,0 @@
/* global indexedDB, location, BroadcastChannel */
import Y from '../Y.mjs'
/*
* Request to Promise transformer
*/
function rtop (request) {
return new Promise(function (resolve, reject) {
request.onerror = function (event) {
reject(new Error(event.target.error))
}
request.onblocked = function () {
location.reload()
}
request.onsuccess = function (event) {
resolve(event.target.result)
}
})
}
function openDB (room) {
return new Promise(function (resolve, reject) {
let request = indexedDB.open(room)
window.r1 = request
request.onupgradeneeded = function (event) {
const db = event.target.result
if (db.objectStoreNames.contains('model')) {
db.deleteObjectStore('updates')
db.deleteObjectStore('model')
db.deleteObjectStore('custom')
}
db.createObjectStore('updates', {autoIncrement: true})
db.createObjectStore('model')
db.createObjectStore('custom')
}
request.onerror = function (event) {
reject(new Error(event.target.error))
}
request.onblocked = function () {
location.reload()
}
request.onsuccess = function (event) {
const db = event.target.result
db.onversionchange = function () { db.close() }
resolve(db)
}
})
}
const PREFERRED_TRIM_SIZE = 500
export default class IndexedDBPersistence extends Y.AbstractPersistence {
constructor (opts) {
super(opts)
window.addEventListener('unload', () => {
this.ys.forEach(function (cnf, y) {
if (cnf.db !== null) {
cnf.db.close()
} else {
cnf._db.then(db => db.close())
}
})
})
}
init (y) {
let cnf = this.ys.get(y)
let room = y.room
cnf.db = null
const dbOpened = openDB(room)
dbOpened.then(db => {
cnf.db = db
})
if (typeof BroadcastChannel !== 'undefined') {
cnf.channel = new BroadcastChannel('__yjs__' + room)
cnf.channel.addEventListener('message', e => {
cnf.mutualExclude(function () {
y.transact(function () {
Y.utils.integrateRemoteStructs(y, new Y.utils.BinaryDecoder(e.data))
})
})
})
} else {
cnf.channel = null
}
return dbOpened
}
deinit (y) {
let cnf = this.ys.get(y)
cnf.db.close()
super.deinit(y)
}
set (y, key, value) {
const cnf = this.ys.get(y)
const t = cnf.db.transaction(['custom'], 'readwrite')
const customStore = t.objectStore('custom')
return rtop(customStore.put(value, key))
}
get (y, key) {
const cnf = this.ys.get(y)
const t = cnf.db.transaction(['custom'], 'readwrite')
const customStore = t.objectStore('custom')
return rtop(customStore.get(key))
}
/**
* Remove all persisted data that belongs to a room.
* Automatically destroys all Yjs all Yjs instances that persist to
* the room. If `destroyYjsInstances = false` the persistence functionality
* will be removed from the Yjs instances.
*/
removePersistedData (room, destroyYjsInstances = true) {
super.removePersistedData(room, destroyYjsInstances)
return rtop(indexedDB.deleteDatabase(room))
}
saveUpdate (y, update) {
let cnf = this.ys.get(y)
if (cnf.channel !== null) {
cnf.channel.postMessage(update)
}
let t = cnf.db.transaction(['updates'], 'readwrite')
let updatesStore = t.objectStore('updates')
updatesStore.put(update)
let cntP = rtop(updatesStore.count())
cntP.then(cnt => {
if (cnt >= PREFERRED_TRIM_SIZE) {
this.persist(y)
}
})
}
saveStruct (y, struct) {
super.saveStruct(y, struct)
}
retrieve (y) {
let cnf = this.ys.get(y)
let t = cnf.db.transaction(['updates', 'model'], 'readonly')
let modelStore = t.objectStore('model')
let updatesStore = t.objectStore('updates')
return Promise.all([rtop(modelStore.get(0)), rtop(updatesStore.getAll())])
.then(([model, updates]) => {
super.retrieve(y, model, updates)
})
}
persist (y) {
let cnf = this.ys.get(y)
let db = cnf.db
let t = db.transaction(['updates', 'model'], 'readwrite')
let updatesStore = t.objectStore('updates')
return rtop(updatesStore.getAll())
.then(updates => {
// apply pending updates before deleting them
Y.AbstractPersistence.prototype.retrieve.call(this, y, null, updates)
// get binary model
let binaryModel = Y.AbstractPersistence.prototype.persist.call(this, y)
// delete all pending updates
if (updates.length > 0) {
let modelStore = t.objectStore('model')
modelStore.put(binaryModel, 0)
updatesStore.clear()
}
})
}
}
if (typeof Y !== 'undefined') {
extendYIndexedDBPersistence(Y) // eslint-disable-line
}

View File

@ -29,7 +29,7 @@ export function encodeStructsDS (y, encoder) {
}
/**
*Feed the Yjs instance with the persisted state
* Feed the Yjs instance with the persisted state
* @param {Yjs} y A Yjs instance.
* @param {BinaryDecoder} decoder A Decoder instance that holds the file content.
*/
@ -39,17 +39,13 @@ export function decodePersisted (y, decoder) {
const contentType = decoder.readVarUint()
switch (contentType) {
case PERSIST_UPDATE:
y.transact(() => {
integrateRemoteStructs(y, decoder)
})
integrateRemoteStructs(y, decoder)
break
case PERSIST_STRUCTS_DS:
y.transact(() => {
integrateRemoteStructs(y, decoder)
readDeleteSet(y, decoder)
})
integrateRemoteStructs(y, decoder)
readDeleteSet(y, decoder)
break
}
}
})
}, true)
}

View File

@ -113,20 +113,36 @@ export default class BinaryDecoder {
* Read string of variable length
* * varUint is used to store the length of the string
*
* Transforming utf8 to a string is pretty expensive. The code performs 10x better
* when String.fromCodePoint is fed with all characters as arguments.
* But most environments have a maximum number of arguments per functions.
* For effiency reasons we apply a maximum of 10000 characters at once.
*
* @return {String} The read String.
*/
readVarString () {
let len = this.readVarUint()
let remainingLen = this.readVarUint()
let encodedString = ''
let i = 0
while (remainingLen > 0) {
const nextLen = Math.min(remainingLen, 10000)
const bytes = new Array(nextLen)
for (let i = 0; i < nextLen; i++) {
bytes[i] = this.uint8arr[this.pos++]
}
encodedString += String.fromCodePoint.apply(null, bytes)
remainingLen -= nextLen
}
/*
//let bytes = new Array(len)
for (let i = 0; i < len; i++) {
//bytes[i] = this.uint8arr[this.pos++]
// encodedString += String.fromCodePoint(this.uint8arr[this.pos++])
encodedString += String(this.uint8arr[this.pos++])
encodedString += String.fromCodePoint(this.uint8arr[this.pos++])
// encodedString += String(this.uint8arr[this.pos++])
}
*/
//let encodedString = String.fromCodePoint.apply(null, bytes)
//return decodeURIComponent(escape(encodedString))
return encodedString
return decodeURIComponent(escape(encodedString))
}
/**

View File

@ -167,12 +167,11 @@ export default class BinaryEncoder {
* @param {String} str The string that is to be encoded.
*/
writeVarString (str) {
let encodedString = unescape(encodeURIComponent(str))
let bytes = encodedString.split('').map(c => c.codePointAt())
let len = bytes.length
const encodedString = unescape(encodeURIComponent(str))
const len = encodedString.length
this.writeVarUint(len)
for (let i = 0; i < len; i++) {
this.write(bytes[i])
this.write(encodedString.codePointAt(i))
}
}

View File

@ -246,6 +246,7 @@ export default class Y extends NamedEventHandler {
* Persisted data will remain until removed by the persistence adapter.
*/
destroy () {
this.emit('destroyed', true)
super.destroy()
this.share = null
if (this.connector != null) {