reimplement persistence approach

This commit is contained in:
Kevin Jahns
2017-12-24 03:18:00 +01:00
parent 08f37a86e3
commit f2debc150c
27 changed files with 880 additions and 1275 deletions

View File

@@ -1,47 +1,81 @@
// import BinaryEncoder from './Binary/Encoder.js'
import BinaryEncoder from './Binary/Encoder.js'
import BinaryDecoder from './Binary/Decoder.js'
import { toBinary, fromBinary } from './MessageHandler/binaryEncode.js'
import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'
export default function extendPersistence (Y) {
class AbstractPersistence {
constructor (y, opts) {
this.y = y
this.opts = opts
this.saveOperationsBuffer = []
this.log = Y.debug('y:persistence')
}
function getFreshCnf () {
let buffer = new BinaryEncoder()
buffer.writeUint32(0)
return {
len: 0,
buffer
}
}
saveToMessageQueue (binary) {
this.log('Room %s: Save message to message queue', this.y.options.connector.room)
}
export default class AbstractPersistence {
constructor (opts) {
this.opts = opts
this.ys = new Map()
}
saveOperations (ops) {
ops = ops.map(function (op) {
return Y.Struct[op.struct].encode(op)
})
/*
const saveOperations = () => {
if (this.saveOperationsBuffer.length > 0) {
let encoder = new BinaryEncoder()
encoder.writeVarString(this.opts.room)
encoder.writeVarString('update')
let ops = this.saveOperationsBuffer
this.saveOperationsBuffer = []
let length = ops.length
encoder.writeUint32(length)
for (var i = 0; i < length; i++) {
let op = ops[i]
Y.Struct[op.struct].binaryEncode(encoder, op)
_init (y) {
let cnf = this.ys.get(y)
if (cnf === undefined) {
cnf = getFreshCnf()
this.ys.set(y, cnf)
this.init(y)
y.on('afterTransaction', () => {
let cnf = this.ys.get(y)
if (cnf.len > 0) {
cnf.buffer.setUint32(0, cnf.len)
this.saveUpdate(y, cnf.buffer.createBuffer())
let _cnf = getFreshCnf()
for (let key in _cnf) {
cnf[key] = _cnf[key]
}
this.saveToMessageQueue(encoder.createBuffer())
}
}
*/
if (this.saveOperationsBuffer.length === 0) {
this.saveOperationsBuffer = ops
} else {
this.saveOperationsBuffer = this.saveOperationsBuffer.concat(ops)
}
})
}
return this.retrieve(y).then(function () {
return Promise.resolve(cnf)
})
}
deinit (y) {
this.ys.delete(y)
}
/* overwrite */
saveUpdate (buffer) {
}
/**
* Save struct to update buffer.
* saveUpdate is called when transaction ends
*/
saveStruct (y, struct) {
let cnf = this.ys.get(y)
if (cnf !== undefined) {
struct._toBinary(cnf.buffer)
cnf.len++
}
}
Y.AbstractPersistence = AbstractPersistence
/* overwrite */
retrieve (y, model, updates) {
y.transact(function () {
if (model != null) {
fromBinary(y, new BinaryDecoder(new Uint8Array(model)))
}
if (updates != null) {
for (let i = 0; i < updates.length; i++) {
integrateRemoteStructs(y, new BinaryDecoder(new Uint8Array(updates[i])))
}
}
})
}
/* overwrite */
persist (y) {
return toBinary(y).createBuffer()
}
}