Implement persistence layer

This commit is contained in:
Kevin Jahns 2017-08-13 00:35:35 +02:00
parent 7e12ea2db5
commit 8770c8e934
8 changed files with 132 additions and 19 deletions

View File

@ -1,5 +1,9 @@
/* global Y */ /* global Y */
// eslint-disable-next-line
let search = new URLSearchParams(location.search)
let url = search.get('url')
// initialize a shared object. This function call returns a promise! // initialize a shared object. This function call returns a promise!
Y({ Y({
db: { db: {
@ -8,7 +12,7 @@ Y({
connector: { connector: {
name: 'websockets-client', name: 'websockets-client',
room: 'Textarea-example', room: 'Textarea-example',
url: 'http://127.0.0.1:1234' url: url || 'http://127.0.0.1:1234'
}, },
sourceDir: '/bower_components', sourceDir: '/bower_components',
share: { share: {

View File

@ -210,7 +210,8 @@ export default function extendConnector (Y/* :any */) {
/* /*
You received a raw message, and you know that it is intended for Yjs. Then call this function. You received a raw message, and you know that it is intended for Yjs. Then call this function.
*/ */
receiveMessage (sender, buffer) { receiveMessage (sender, buffer, skipAuth) {
skipAuth = skipAuth || false
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) { if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!')) return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
} }
@ -227,7 +228,7 @@ export default function extendConnector (Y/* :any */) {
this.log('%s: Receive \'%s\' from %s', this.userId, messageType, sender) this.log('%s: Receive \'%s\' from %s', this.userId, messageType, sender)
this.logMessage('Message: %Y', buffer) this.logMessage('Message: %Y', buffer)
if (senderConn == null) { if (senderConn == null && !skipAuth) {
throw new Error('Received message from unknown peer!') throw new Error('Received message from unknown peer!')
} }
@ -253,21 +254,21 @@ export default function extendConnector (Y/* :any */) {
}) })
} }
} }
if (senderConn.auth != null) { if (skipAuth || senderConn.auth != null) {
return this.computeMessage(messageType, senderConn, decoder, encoder, sender) return this.computeMessage(messageType, senderConn, decoder, encoder, sender, skipAuth)
} else { } else {
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]) senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender, false])
} }
} }
computeMessage (messageType, senderConn, decoder, encoder, sender) { computeMessage (messageType, senderConn, decoder, encoder, sender, skipAuth) {
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) { if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock) // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender) computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
return this.y.db.whenTransactionsFinished() return this.y.db.whenTransactionsFinished()
} else if (messageType === 'sync step 2' && senderConn.auth === 'write') { } else if (messageType === 'sync step 2' && senderConn.auth === 'write') {
return computeMessageSyncStep2(decoder, encoder, this, senderConn, sender) return computeMessageSyncStep2(decoder, encoder, this, senderConn, sender)
} else if (messageType === 'update' && senderConn.auth === 'write') { } else if (messageType === 'update' && (skipAuth || senderConn.auth === 'write')) {
return computeMessageUpdate(decoder, encoder, this, senderConn, sender) return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
} else { } else {
return Promise.reject(new Error('Unable to receive message')) return Promise.reject(new Error('Unable to receive message'))

View File

@ -120,7 +120,7 @@ export default function extendDatabase (Y /* :any */) {
startGarbageCollector () { startGarbageCollector () {
this.gc = this.dbOpts.gc this.gc = this.dbOpts.gc
if (this.gc) { if (this.gc) {
this.gcTimeout = !this.dbOpts.gcTimeout ? 100000 : this.dbOpts.gcTimeout this.gcTimeout = !this.dbOpts.gcTimeout ? 30000 : this.dbOpts.gcTimeout
} else { } else {
this.gcTimeout = -1 this.gcTimeout = -1
} }

View File

@ -7,37 +7,46 @@ export class BinaryEncoder {
constructor () { constructor () {
this.data = [] this.data = []
} }
get pos () { get pos () {
return this.data.length return this.data.length
} }
createBuffer () { createBuffer () {
return Uint8Array.from(this.data).buffer return Uint8Array.from(this.data).buffer
} }
writeUint8 (num) { writeUint8 (num) {
this.data.push(num & bits8) this.data.push(num & bits8)
} }
setUint8 (pos, num) { setUint8 (pos, num) {
this.data[pos] = num & bits8 this.data[pos] = num & bits8
} }
writeUint16 (num) { writeUint16 (num) {
this.data.push(num & bits8, (num >>> 8) & bits8) this.data.push(num & bits8, (num >>> 8) & bits8)
} }
setUint16 (pos, num) { setUint16 (pos, num) {
this.data[pos] = num & bits8 this.data[pos] = num & bits8
this.data[pos + 1] = (num >>> 8) & bits8 this.data[pos + 1] = (num >>> 8) & bits8
} }
writeUint32 (num) { writeUint32 (num) {
for (let i = 0; i < 4; i++) { for (let i = 0; i < 4; i++) {
this.data.push(num & bits8) this.data.push(num & bits8)
num >>>= 8 num >>>= 8
} }
} }
setUint32 (pos, num) { setUint32 (pos, num) {
for (let i = 0; i < 4; i++) { for (let i = 0; i < 4; i++) {
this.data[pos + i] = num & bits8 this.data[pos + i] = num & bits8
num >>>= 8 num >>>= 8
} }
} }
writeVarUint (num) { writeVarUint (num) {
while (num >= 0b10000000) { while (num >= 0b10000000) {
this.data.push(0b10000000 | (bits7 & num)) this.data.push(0b10000000 | (bits7 & num))
@ -45,6 +54,7 @@ export class BinaryEncoder {
} }
this.data.push(bits7 & num) this.data.push(bits7 & num)
} }
writeVarString (str) { writeVarString (str) {
let bytes = utf8.setBytesFromString(str) let bytes = utf8.setBytesFromString(str)
let len = bytes.length let len = bytes.length
@ -53,6 +63,7 @@ export class BinaryEncoder {
this.data.push(bytes[i]) this.data.push(bytes[i])
} }
} }
writeOpID (id) { writeOpID (id) {
let user = id[0] let user = id[0]
this.writeVarUint(user) this.writeVarUint(user)
@ -68,19 +79,22 @@ export class BinaryDecoder {
constructor (buffer) { constructor (buffer) {
if (buffer instanceof ArrayBuffer) { if (buffer instanceof ArrayBuffer) {
this.uint8arr = new Uint8Array(buffer) this.uint8arr = new Uint8Array(buffer)
} else if (buffer instanceof Uint8Array) { } else if (buffer instanceof Uint8Array || (typeof Buffer !== 'undefined' && buffer instanceof Buffer)) {
this.uint8arr = buffer this.uint8arr = buffer
} else { } else {
throw new Error('Expected an ArrayBuffer or Uint8Array!') throw new Error('Expected an ArrayBuffer or Uint8Array!')
} }
this.pos = 0 this.pos = 0
} }
skip8 () { skip8 () {
this.pos++ this.pos++
} }
readUint8 () { readUint8 () {
return this.uint8arr[this.pos++] return this.uint8arr[this.pos++]
} }
readUint32 () { readUint32 () {
let uint = let uint =
this.uint8arr[this.pos] + this.uint8arr[this.pos] +
@ -90,9 +104,11 @@ export class BinaryDecoder {
this.pos += 4 this.pos += 4
return uint return uint
} }
peekUint8 () { peekUint8 () {
return this.uint8arr[this.pos] return this.uint8arr[this.pos]
} }
readVarUint () { readVarUint () {
let num = 0 let num = 0
let len = 0 let len = 0
@ -108,6 +124,7 @@ export class BinaryDecoder {
} }
} }
} }
readVarString () { readVarString () {
let len = this.readVarUint() let len = this.readVarUint()
let bytes = new Array(len) let bytes = new Array(len)
@ -116,12 +133,14 @@ export class BinaryDecoder {
} }
return utf8.getStringFromBytes(bytes) return utf8.getStringFromBytes(bytes)
} }
peekVarString () { peekVarString () {
let pos = this.pos let pos = this.pos
let s = this.readVarString() let s = this.readVarString()
this.pos = pos this.pos = pos
return s return s
} }
readOpID () { readOpID () {
let user = this.readVarUint() let user = this.readVarUint()
if (user !== 0xFFFFFF) { if (user !== 0xFFFFFF) {

View File

@ -34,7 +34,7 @@ export function logMessageUpdate (decoder, strBuilder) {
} }
export function computeMessageUpdate (decoder, encoder, conn) { export function computeMessageUpdate (decoder, encoder, conn) {
if (conn.y.db.forwardAppliedOperations) { if (conn.y.db.forwardAppliedOperations || conn.y.persistence != null) {
let messagePosition = decoder.pos let messagePosition = decoder.pos
let len = decoder.readUint32() let len = decoder.readUint32()
let delops = [] let delops = []
@ -45,7 +45,12 @@ export function computeMessageUpdate (decoder, encoder, conn) {
} }
} }
if (delops.length > 0) { if (delops.length > 0) {
conn.broadcastOps(delops) if (conn.y.db.forwardAppliedOperations) {
conn.broadcastOps(delops)
}
if (conn.y.persistence) {
conn.y.persistence.saveOperations(delops)
}
} }
decoder.pos = messagePosition decoder.pos = messagePosition
} }

43
src/Persistence.js Normal file
View File

@ -0,0 +1,43 @@
import { BinaryEncoder } from './Encoding.js'
export default function extendPersistence (Y) {
class AbstractPersistence {
constructor (y, opts) {
this.y = y
this.opts = opts
this.saveOperationsBuffer = []
this.log = Y.debug('y:persistence')
}
saveToMessageQueue (binary) {
this.log('Room %s: Save message to message queue', this.y.options.connector.room)
}
saveOperations (ops) {
ops = ops.map(function (op) {
return Y.Struct[op.struct].encode(op)
})
const saveOperations = () => {
if (this.saveOperationsBuffer.length > 0) {
let encoder = new BinaryEncoder()
encoder.writeVarString(this.opts.room)
encoder.writeVarString('update')
let ops = this.saveOperationsBuffer
this.saveOperationsBuffer = []
let length = ops.length
encoder.writeUint32(length)
for (var i = 0; i < length; i++) {
let op = ops[i]
Y.Struct[op.struct].binaryEncode(encoder, op)
}
this.saveToMessageQueue(encoder.createBuffer())
}
}
if (this.saveOperationsBuffer.length === 0) {
this.saveOperationsBuffer = ops
this.y.db.whenTransactionsFinished().then(saveOperations)
} else {
this.saveOperationsBuffer = this.saveOperationsBuffer.concat(ops)
}
}
}
Y.AbstractPersistence = AbstractPersistence
}

View File

@ -1,3 +1,4 @@
import { BinaryEncoder, BinaryDecoder } from './Encoding.js'
/* /*
Partial definition of a transaction Partial definition of a transaction
@ -97,6 +98,9 @@ export default function extendTransaction (Y) {
if (send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops) if (send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops)
// is connected, and this is not going to be send in addOperation // is connected, and this is not going to be send in addOperation
this.store.y.connector.broadcastOps(send) this.store.y.connector.broadcastOps(send)
if (this.store.y.persistence != null) {
this.store.y.persistence.saveOperations(send)
}
} }
} }
@ -679,10 +683,15 @@ export default function extendTransaction (Y) {
yield * this.garbageCollectOperation(o.id) yield * this.garbageCollectOperation(o.id)
} }
} }
if (this.store.forwardAppliedOperations) { if (this.store.forwardAppliedOperations || this.store.y.persistence != null) {
var ops = [] var ops = []
ops.push({struct: 'Delete', target: [del[0], del[1]], length: del[2]}) ops.push({struct: 'Delete', target: [del[0], del[1]], length: del[2]})
this.store.y.connector.broadcastOps(ops) if (this.store.forwardAppliedOperations) {
this.store.y.connector.broadcastOps(ops)
}
if (this.store.y.persistence != null) {
this.store.y.persistence.saveOperations(ops)
}
} }
} }
} }
@ -733,9 +742,14 @@ export default function extendTransaction (Y) {
* addOperation (op) { * addOperation (op) {
yield * this.os.put(op) yield * this.os.put(op)
// case op is created by this user, op is already broadcasted in applyCreatedOperations // case op is created by this user, op is already broadcasted in applyCreatedOperations
if (op.id[0] !== this.store.userId && this.store.forwardAppliedOperations && typeof op.id[1] !== 'string') { if (op.id[0] !== this.store.userId && typeof op.id[1] !== 'string') {
// is connected, and this is not going to be send in addOperation if (this.store.forwardAppliedOperations) {
this.store.y.connector.broadcastOps([op]) // is connected, and this is not going to be send in addOperation
this.store.y.connector.broadcastOps([op])
}
if (this.store.y.persistence != null) {
this.store.y.persistence.saveOperations([op])
}
} }
} }
// if insertion, try to combine with left insertion (if both have content property) // if insertion, try to combine with left insertion (if both have content property)
@ -1072,6 +1086,20 @@ export default function extendTransaction (Y) {
Y.Struct[op.struct].binaryEncode(encoder, Y.Struct[op.struct].encode(op)) Y.Struct[op.struct].binaryEncode(encoder, Y.Struct[op.struct].encode(op))
} }
} }
* toBinary () {
let encoder = new BinaryEncoder()
yield * this.writeOperationsUntransformed(encoder)
yield * this.writeDeleteSet(encoder)
return encoder.createBuffer()
}
* fromBinary (buffer) {
let decoder = new BinaryDecoder(buffer)
yield * this.applyOperationsUntransformed(decoder)
yield * this.applyDeleteSet(decoder)
}
/* /*
* Get the plain untransformed operations from the database. * Get the plain untransformed operations from the database.
* You can apply these operations using .applyOperationsUntransformed(ops) * You can apply these operations using .applyOperationsUntransformed(ops)

View File

@ -1,4 +1,5 @@
import extendConnector from './Connector.js' import extendConnector from './Connector.js'
import extendPersistence from './Persistence.js'
import extendDatabase from './Database.js' import extendDatabase from './Database.js'
import extendTransaction from './Transaction.js' import extendTransaction from './Transaction.js'
import extendStruct from './Struct.js' import extendStruct from './Struct.js'
@ -7,6 +8,7 @@ import debug from 'debug'
import { formatYjsMessage, formatYjsMessageType } from './MessageHandler.js' import { formatYjsMessage, formatYjsMessageType } from './MessageHandler.js'
extendConnector(Y) extendConnector(Y)
extendPersistence(Y)
extendDatabase(Y) extendDatabase(Y)
extendTransaction(Y) extendTransaction(Y)
extendStruct(Y) extendStruct(Y)
@ -159,6 +161,11 @@ class YConfig extends Y.utils.NamedEventHandler {
this.options = opts this.options = opts
this.db = new Y[opts.db.name](this, opts.db) this.db = new Y[opts.db.name](this, opts.db)
this.connector = new Y[opts.connector.name](this, opts.connector) this.connector = new Y[opts.connector.name](this, opts.connector)
if (opts.persistence != null) {
this.persistence = new Y[opts.persistence.name](this, opts.persistence)
} else {
this.persistence = null
}
this.connected = true this.connected = true
} }
init (callback) { init (callback) {
@ -188,9 +195,15 @@ class YConfig extends Y.utils.NamedEventHandler {
} }
share[propertyname] = yield * this.store.initType.call(this, id, args) share[propertyname] = yield * this.store.initType.call(this, id, args)
} }
this.store.whenTransactionsFinished()
.then(callback)
}) })
if (this.persistence != null) {
this.persistence.retrieveContent()
.then(() => this.db.whenTransactionsFinished())
.then(callback)
} else {
this.db.whenTransactionsFinished()
.then(callback)
}
} }
isConnected () { isConnected () {
return this.connector.isSynced return this.connector.isSynced