diff --git a/examples/textarea/index.js b/examples/textarea/index.js index 9e7403bd..d58ab9d1 100644 --- a/examples/textarea/index.js +++ b/examples/textarea/index.js @@ -1,5 +1,9 @@ /* global Y */ +// eslint-disable-next-line +let search = new URLSearchParams(location.search) +let url = search.get('url') + // initialize a shared object. This function call returns a promise! Y({ db: { @@ -8,7 +12,7 @@ Y({ connector: { name: 'websockets-client', room: 'Textarea-example', - url: 'http://127.0.0.1:1234' + url: url || 'http://127.0.0.1:1234' }, sourceDir: '/bower_components', share: { diff --git a/src/Connector.js b/src/Connector.js index 94bcf14f..46fc263f 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -210,7 +210,8 @@ export default function extendConnector (Y/* :any */) { /* You received a raw message, and you know that it is intended for Yjs. Then call this function. */ - receiveMessage (sender, buffer) { + receiveMessage (sender, buffer, skipAuth) { + skipAuth = skipAuth || false if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) { return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!')) } @@ -227,7 +228,7 @@ export default function extendConnector (Y/* :any */) { this.log('%s: Receive \'%s\' from %s', this.userId, messageType, sender) this.logMessage('Message: %Y', buffer) - if (senderConn == null) { + if (senderConn == null && !skipAuth) { throw new Error('Received message from unknown peer!') } @@ -253,21 +254,21 @@ export default function extendConnector (Y/* :any */) { }) } } - if (senderConn.auth != null) { - return this.computeMessage(messageType, senderConn, decoder, encoder, sender) + if (skipAuth || senderConn.auth != null) { + return this.computeMessage(messageType, senderConn, decoder, encoder, sender, skipAuth) } else { - senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]) + senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender, false]) } } - computeMessage (messageType, senderConn, decoder, encoder, sender) { + computeMessage (messageType, senderConn, decoder, encoder, sender, skipAuth) { if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) { // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock) computeMessageSyncStep1(decoder, encoder, this, senderConn, sender) return this.y.db.whenTransactionsFinished() } else if (messageType === 'sync step 2' && senderConn.auth === 'write') { return computeMessageSyncStep2(decoder, encoder, this, senderConn, sender) - } else if (messageType === 'update' && senderConn.auth === 'write') { + } else if (messageType === 'update' && (skipAuth || senderConn.auth === 'write')) { return computeMessageUpdate(decoder, encoder, this, senderConn, sender) } else { return Promise.reject(new Error('Unable to receive message')) diff --git a/src/Database.js b/src/Database.js index e3656c7d..7f904628 100644 --- a/src/Database.js +++ b/src/Database.js @@ -120,7 +120,7 @@ export default function extendDatabase (Y /* :any */) { startGarbageCollector () { this.gc = this.dbOpts.gc if (this.gc) { - this.gcTimeout = !this.dbOpts.gcTimeout ? 100000 : this.dbOpts.gcTimeout + this.gcTimeout = !this.dbOpts.gcTimeout ? 30000 : this.dbOpts.gcTimeout } else { this.gcTimeout = -1 } diff --git a/src/Encoding.js b/src/Encoding.js index 2308d22e..bceb3431 100644 --- a/src/Encoding.js +++ b/src/Encoding.js @@ -7,37 +7,46 @@ export class BinaryEncoder { constructor () { this.data = [] } + get pos () { return this.data.length } + createBuffer () { return Uint8Array.from(this.data).buffer } + writeUint8 (num) { this.data.push(num & bits8) } + setUint8 (pos, num) { this.data[pos] = num & bits8 } + writeUint16 (num) { this.data.push(num & bits8, (num >>> 8) & bits8) } + setUint16 (pos, num) { this.data[pos] = num & bits8 this.data[pos + 1] = (num >>> 8) & bits8 } + writeUint32 (num) { for (let i = 0; i < 4; i++) { this.data.push(num & bits8) num >>>= 8 } } + setUint32 (pos, num) { for (let i = 0; i < 4; i++) { this.data[pos + i] = num & bits8 num >>>= 8 } } + writeVarUint (num) { while (num >= 0b10000000) { this.data.push(0b10000000 | (bits7 & num)) @@ -45,6 +54,7 @@ export class BinaryEncoder { } this.data.push(bits7 & num) } + writeVarString (str) { let bytes = utf8.setBytesFromString(str) let len = bytes.length @@ -53,6 +63,7 @@ export class BinaryEncoder { this.data.push(bytes[i]) } } + writeOpID (id) { let user = id[0] this.writeVarUint(user) @@ -68,19 +79,22 @@ export class BinaryDecoder { constructor (buffer) { if (buffer instanceof ArrayBuffer) { this.uint8arr = new Uint8Array(buffer) - } else if (buffer instanceof Uint8Array) { + } else if (buffer instanceof Uint8Array || (typeof Buffer !== 'undefined' && buffer instanceof Buffer)) { this.uint8arr = buffer } else { throw new Error('Expected an ArrayBuffer or Uint8Array!') } this.pos = 0 } + skip8 () { this.pos++ } + readUint8 () { return this.uint8arr[this.pos++] } + readUint32 () { let uint = this.uint8arr[this.pos] + @@ -90,9 +104,11 @@ export class BinaryDecoder { this.pos += 4 return uint } + peekUint8 () { return this.uint8arr[this.pos] } + readVarUint () { let num = 0 let len = 0 @@ -108,6 +124,7 @@ export class BinaryDecoder { } } } + readVarString () { let len = this.readVarUint() let bytes = new Array(len) @@ -116,12 +133,14 @@ export class BinaryDecoder { } return utf8.getStringFromBytes(bytes) } + peekVarString () { let pos = this.pos let s = this.readVarString() this.pos = pos return s } + readOpID () { let user = this.readVarUint() if (user !== 0xFFFFFF) { diff --git a/src/MessageHandler.js b/src/MessageHandler.js index 0fdecf74..c62280eb 100644 --- a/src/MessageHandler.js +++ b/src/MessageHandler.js @@ -34,7 +34,7 @@ export function logMessageUpdate (decoder, strBuilder) { } export function computeMessageUpdate (decoder, encoder, conn) { - if (conn.y.db.forwardAppliedOperations) { + if (conn.y.db.forwardAppliedOperations || conn.y.persistence != null) { let messagePosition = decoder.pos let len = decoder.readUint32() let delops = [] @@ -45,7 +45,12 @@ export function computeMessageUpdate (decoder, encoder, conn) { } } if (delops.length > 0) { - conn.broadcastOps(delops) + if (conn.y.db.forwardAppliedOperations) { + conn.broadcastOps(delops) + } + if (conn.y.persistence) { + conn.y.persistence.saveOperations(delops) + } } decoder.pos = messagePosition } diff --git a/src/Persistence.js b/src/Persistence.js new file mode 100644 index 00000000..c5dbd681 --- /dev/null +++ b/src/Persistence.js @@ -0,0 +1,43 @@ +import { BinaryEncoder } from './Encoding.js' + +export default function extendPersistence (Y) { + class AbstractPersistence { + constructor (y, opts) { + this.y = y + this.opts = opts + this.saveOperationsBuffer = [] + this.log = Y.debug('y:persistence') + } + saveToMessageQueue (binary) { + this.log('Room %s: Save message to message queue', this.y.options.connector.room) + } + saveOperations (ops) { + ops = ops.map(function (op) { + return Y.Struct[op.struct].encode(op) + }) + const saveOperations = () => { + if (this.saveOperationsBuffer.length > 0) { + let encoder = new BinaryEncoder() + encoder.writeVarString(this.opts.room) + encoder.writeVarString('update') + let ops = this.saveOperationsBuffer + this.saveOperationsBuffer = [] + let length = ops.length + encoder.writeUint32(length) + for (var i = 0; i < length; i++) { + let op = ops[i] + Y.Struct[op.struct].binaryEncode(encoder, op) + } + this.saveToMessageQueue(encoder.createBuffer()) + } + } + if (this.saveOperationsBuffer.length === 0) { + this.saveOperationsBuffer = ops + this.y.db.whenTransactionsFinished().then(saveOperations) + } else { + this.saveOperationsBuffer = this.saveOperationsBuffer.concat(ops) + } + } + } + Y.AbstractPersistence = AbstractPersistence +} diff --git a/src/Transaction.js b/src/Transaction.js index a708619f..4e6932cd 100644 --- a/src/Transaction.js +++ b/src/Transaction.js @@ -1,3 +1,4 @@ +import { BinaryEncoder, BinaryDecoder } from './Encoding.js' /* Partial definition of a transaction @@ -97,6 +98,9 @@ export default function extendTransaction (Y) { if (send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops) // is connected, and this is not going to be send in addOperation this.store.y.connector.broadcastOps(send) + if (this.store.y.persistence != null) { + this.store.y.persistence.saveOperations(send) + } } } @@ -679,10 +683,15 @@ export default function extendTransaction (Y) { yield * this.garbageCollectOperation(o.id) } } - if (this.store.forwardAppliedOperations) { + if (this.store.forwardAppliedOperations || this.store.y.persistence != null) { var ops = [] ops.push({struct: 'Delete', target: [del[0], del[1]], length: del[2]}) - this.store.y.connector.broadcastOps(ops) + if (this.store.forwardAppliedOperations) { + this.store.y.connector.broadcastOps(ops) + } + if (this.store.y.persistence != null) { + this.store.y.persistence.saveOperations(ops) + } } } } @@ -733,9 +742,14 @@ export default function extendTransaction (Y) { * addOperation (op) { yield * this.os.put(op) // case op is created by this user, op is already broadcasted in applyCreatedOperations - if (op.id[0] !== this.store.userId && this.store.forwardAppliedOperations && typeof op.id[1] !== 'string') { - // is connected, and this is not going to be send in addOperation - this.store.y.connector.broadcastOps([op]) + if (op.id[0] !== this.store.userId && typeof op.id[1] !== 'string') { + if (this.store.forwardAppliedOperations) { + // is connected, and this is not going to be send in addOperation + this.store.y.connector.broadcastOps([op]) + } + if (this.store.y.persistence != null) { + this.store.y.persistence.saveOperations([op]) + } } } // if insertion, try to combine with left insertion (if both have content property) @@ -1072,6 +1086,20 @@ export default function extendTransaction (Y) { Y.Struct[op.struct].binaryEncode(encoder, Y.Struct[op.struct].encode(op)) } } + + * toBinary () { + let encoder = new BinaryEncoder() + yield * this.writeOperationsUntransformed(encoder) + yield * this.writeDeleteSet(encoder) + return encoder.createBuffer() + } + + * fromBinary (buffer) { + let decoder = new BinaryDecoder(buffer) + yield * this.applyOperationsUntransformed(decoder) + yield * this.applyDeleteSet(decoder) + } + /* * Get the plain untransformed operations from the database. * You can apply these operations using .applyOperationsUntransformed(ops) diff --git a/src/y.js b/src/y.js index 7f43bd12..0dd86e43 100644 --- a/src/y.js +++ b/src/y.js @@ -1,4 +1,5 @@ import extendConnector from './Connector.js' +import extendPersistence from './Persistence.js' import extendDatabase from './Database.js' import extendTransaction from './Transaction.js' import extendStruct from './Struct.js' @@ -7,6 +8,7 @@ import debug from 'debug' import { formatYjsMessage, formatYjsMessageType } from './MessageHandler.js' extendConnector(Y) +extendPersistence(Y) extendDatabase(Y) extendTransaction(Y) extendStruct(Y) @@ -159,6 +161,11 @@ class YConfig extends Y.utils.NamedEventHandler { this.options = opts this.db = new Y[opts.db.name](this, opts.db) this.connector = new Y[opts.connector.name](this, opts.connector) + if (opts.persistence != null) { + this.persistence = new Y[opts.persistence.name](this, opts.persistence) + } else { + this.persistence = null + } this.connected = true } init (callback) { @@ -188,9 +195,15 @@ class YConfig extends Y.utils.NamedEventHandler { } share[propertyname] = yield * this.store.initType.call(this, id, args) } - this.store.whenTransactionsFinished() - .then(callback) }) + if (this.persistence != null) { + this.persistence.retrieveContent() + .then(() => this.db.whenTransactionsFinished()) + .then(callback) + } else { + this.db.whenTransactionsFinished() + .then(callback) + } } isConnected () { return this.connector.isSynced