yjs/persistence/leveldb.js

125 lines
3.8 KiB
JavaScript

/**
* @module persistence/leveldb
* This module re-uses the encoding of syncProtocol to store and read updates from leveldb.
*/
const level = require('level')
const Y = require('../build/yjs.js')
// Mutex shared by this module: readEntry and the afterTransaction listener registered in
// bindState both run their work inside it.
// NOTE(review): presumably Y.createMutex skips calls made while the mutex is already held,
// so updates applied while reading from the database are not written back - confirm against
// the Yjs build.
const mux = Y.createMutex()
/*
 * Counter that is appended to the millisecond clock so that keys generated within the
 * same millisecond stay distinct.
 * It wraps around after 9999, so uniqueness relies on fewer than 10000 keys being
 * generated within one millisecond (and on the clock working correctly).
 */
let timestampIterator = 0
/**
 * Advance the counter and build a timestamp string.
 *
 * @return {string} The current `Date.now()` value immediately followed by the
 *   counter, zero-padded to four digits.
 */
const getNextTimestamp = () => {
  timestampIterator += 1
  timestampIterator %= 10000
  const millis = Date.now()
  const counter = String(timestampIterator).padStart(4, '0')
  return `${millis}${counter}`
}
/**
 * Build the database key for a new entry of a document.
 *
 * @param {string} docName
 * @return {string} `docName`, a `#` separator and a fresh value from getNextTimestamp.
 */
const generateEntryKey = docName => {
  const stamp = getNextTimestamp()
  return `${docName}#${stamp}`
}
/**
 * Store one encoded message of a document under a freshly generated key.
 *
 * @param {any} db Store exposing `put(key, value)`.
 * @param {string} docName
 * @param {Uint8Array | ArrayBuffer} buf
 * @return {any} Whatever `db.put` returns.
 */
const writeEntry = (db, docName, buf) => {
  const key = generateEntryKey(docName)
  return db.put(key, buf)
}
/**
 * Hand one stored entry to `Y.syncProtocol.readSyncMessage` for the given document.
 * The work is executed through the module mutex.
 *
 * @param {Uint8Array} arr
 * @param {Y.Y} ydocument
 */
const readEntry = (arr, ydocument) => {
  const applyStoredMessage = () => {
    const decoder = Y.decoding.createDecoder(arr)
    // readSyncMessage takes an encoder argument; a fresh one is passed and not used afterwards.
    const encoder = Y.encoding.createEncoder()
    return Y.syncProtocol.readSyncMessage(decoder, encoder, ydocument)
  }
  return mux(applyStoredMessage)
}
/**
 * Stream every stored entry of a document out of the database and pass each value,
 * together with the document, to readEntry.
 *
 * @param {any} db Store exposing `createReadStream(options)`.
 * @param {string} docName
 * @param {Y.Y} ydocument
 * @return {Promise<any>} Resolves on the stream's `end` or `close` event and rejects on `error`.
 */
const loadFromPersistence = (db, docName, ydocument) => {
  const applyValue = value => readEntry(value, ydocument)
  return new Promise((resolve, reject) => {
    const keyPrefix = `${docName}#`
    // Values only: the range covers every key that starts with the prefix and sorts up to `Z`.
    const range = {
      gte: keyPrefix,
      lte: `${keyPrefix}Z`,
      keys: false,
      values: true
    }
    db.createReadStream(range)
      .on('data', applyValue)
      .on('error', reject)
      .on('end', resolve)
      .on('close', resolve)
  })
}
/**
 * Write the current document state as one new entry and afterwards delete every
 * entry of the document whose key sorts before that new entry.
 *
 * @param {any} db Store exposing `put`, `createKeyStream` and `batch`.
 * @param {string} docName
 * @param {Y.Y} ydocument
 * @return {Promise<any>} Settles with the result of the batched delete. Rejects when the
 *   state write or the key stream fails; in that case nothing is deleted.
 */
const persistState = (db, docName, ydocument) => {
  const encoder = Y.encoding.createEncoder()
  // NOTE(review): the empty Map is presumably an empty state vector, so that the
  // complete state is encoded - confirm against Y.syncProtocol.writeSyncStep2.
  Y.syncProtocol.writeSyncStep2(encoder, ydocument, new Map())
  const entryKey = generateEntryKey(docName)
  const entryPromise = db.put(entryKey, Y.encoding.toBuffer(encoder))
  const delOps = []
  // Collect the keys of all entries of this document that sort before the new one.
  const keysCollected = new Promise((resolve, reject) => db.createKeyStream({
    gte: `${docName}#`,
    lt: entryKey
  })
    .on('data', key => delOps.push({ type: 'del', key }))
    .on('error', reject)
    .on('end', resolve)
    .on('close', resolve)
  )
  // Wait for both at once so that a failing write is always observed, even when the key
  // stream fails as well. Old entries are only removed once the new state is stored.
  return Promise.all([entryPromise, keysCollected]).then(() => db.batch(delOps))
}
/**
* Persistence layer for Leveldb.
*/
exports.LevelDbPersistence = class LevelDbPersistence {
/**
* @param {string} fpath Path to leveldb database
*/
constructor (fpath) {
this.db = level(fpath, { valueEncoding: 'binary' })
}
/**
* Retrieve all data from LevelDB and automatically persist all document updates to leveldb.
*
* @param {string} docName
* @param {Y.Y} ydocument
*/
bindState (docName, ydocument) {
// write all updates received from other clients
// - unless it is created by this persistence layer (e.g. loadFromPersistence, we we mux).
ydocument.on('afterTransaction', (y, transaction) => {
if (transaction.encodedStructsLen > 0) {
mux(() => {
const encoder = Y.encoding.createEncoder()
Y.syncProtocol.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
writeEntry(this.db, docName, Y.encoding.toBuffer(encoder))
})
}
})
// read all data from persistence
return loadFromPersistence(this.db, docName, ydocument).then(() =>
// write current state (just in case anything was added before state was bound)
this.writeState(docName, ydocument)
)
}
/**
* Write current state to persistence layer. Deletes all entries that were made before.
* Call this method at any time - the recommended time to call this method is before the ydocument is destroyed.
*
* @param {string} docName
* @param {Y.Y} ydocument
*/
writeState (docName, ydocument) {
return persistState(this.db, docName, ydocument)
}
}