diff --git a/src/Connector.js b/src/Connector.js index 494e9552..8c0c05c8 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -1,32 +1,11 @@ -/* @flow */ -'use strict' - -function canRead (auth) { return auth === 'read' || auth === 'write' } -function canWrite (auth) { return auth === 'write' } +import { BinaryEncoder, BinaryDecoder } from './Encoding.js' +import { computeMessageSyncStep1, computeMessageSyncStep2, computeMessageUpdate } from './MessageHandler.js' export default function extendConnector (Y/* :any */) { class AbstractConnector { - /* :: - y: YConfig; - role: SyncRole; - connections: Object; - isSynced: boolean; - userEventListeners: Array; - whenSyncedListeners: Array; - currentSyncTarget: ?UserId; - debug: boolean; - syncStep2: Promise; - userId: UserId; - send: Function; - broadcast: Function; - broadcastOpBuffer: Array; - protocolVersion: number; - */ /* opts contains the following information: role : String Role of this client ("master" or "slave") - userId : String Uniquely defines the user. - debug: Boolean Whether to print debug messages (optional) */ constructor (y, opts) { this.y = y @@ -63,15 +42,6 @@ export default function extendConnector (Y/* :any */) { this.setUserId(Y.utils.generateUserId()) } } - resetAuth (auth) { - if (this.authInfo !== auth) { - this.authInfo = auth - this.broadcast({ - type: 'auth', - auth: this.authInfo - }) - } - } reconnect () { this.log('reconnecting..') return this.y.db.startGarbageCollector() @@ -137,8 +107,10 @@ export default function extendConnector (Y/* :any */) { } this.log('%s: User joined %s', this.userId, user) this.connections.set(user, { + uid: user, isSynced: false, - role: role + role: role, + processAfterAuth: [] }) let defer = {} defer.promise = new Promise(function (resolve) { defer.resolve = resolve }) @@ -179,19 +151,14 @@ export default function extendConnector (Y/* :any */) { if (syncUser != null) { this.currentSyncTarget = syncUser this.y.db.requestTransaction(function * () { - var stateSet = yield * this.getStateSet() - // var deleteSet = yield * this.getDeleteSet() - var answer = { - type: 'sync step 1', - stateSet: stateSet, - // deleteSet: deleteSet, - protocolVersion: conn.protocolVersion, - auth: conn.authInfo - } - if (conn.preferUntransformed && Object.keys(stateSet).length === 0) { - answer.preferUntransformed = true - } - conn.send(syncUser, answer) + let encoder = new BinaryEncoder() + encoder.writeVarString('sync step 1') + encoder.writeVarString(conn.authInfo || '') + encoder.writeVarUint(conn.protocolVersion) + let preferUntransformed = conn.preferUntransformed && this.os.length === 0 // TODO: length may not be defined + encoder.writeUint8(preferUntransformed ? 1 : 0) + yield * this.writeStateSet(encoder) + conn.send(syncUser, encoder.createBuffer()) }) } else { if (!conn.isSynced) { @@ -211,13 +178,13 @@ export default function extendConnector (Y/* :any */) { } } } - send (uid, message) { - this.log('%s: Send \'%s\' to %s', this.userId, message.type, uid) - this.logMessage('Message: %j', message) + send (uid, buffer) { + this.log('%s: Send \'%y\' to %s', this.userId, buffer, uid) + this.logMessage('Message: %Y', buffer) } - broadcast (message) { - this.log('%s: Broadcast \'%s\'', this.userId, message.type) - this.logMessage('Message: %j', message) + broadcast (buffer) { + this.log('%s: Broadcast \'%y\'', this.userId, buffer) + this.logMessage('Message: %Y', buffer) } /* Buffer operations, and broadcast them when ready. @@ -229,11 +196,17 @@ export default function extendConnector (Y/* :any */) { var self = this function broadcastOperations () { if (self.broadcastOpBuffer.length > 0) { - self.broadcast({ - type: 'update', - ops: self.broadcastOpBuffer - }) + let encoder = new BinaryEncoder() + encoder.writeVarString('update') + let ops = self.broadcastOpBuffer self.broadcastOpBuffer = [] + let length = ops.length + encoder.writeUint32(length) + for (var i = 0; i < length; i++) { + let op = ops[i] + Y.Struct[op.struct].binaryEncode(encoder, op) + } + self.broadcast(encoder.createBuffer()) } } if (this.broadcastOpBuffer.length === 0) { @@ -246,119 +219,60 @@ export default function extendConnector (Y/* :any */) { /* You received a raw message, and you know that it is intended for Yjs. Then call this function. */ - receiveMessage (sender/* :UserId */, message/* :Message */) { + async receiveMessage (sender, buffer) { if (sender === this.userId) { - return Promise.resolve() + return } - this.log('%s: Receive \'%s\' from %s', this.userId, message.type, sender) - this.logMessage('Message: %j', message) - if (message.protocolVersion != null && message.protocolVersion !== this.protocolVersion) { - console.warn( - `You tried to sync with a yjs instance that has a different protocol version - (You: ${this.protocolVersion}, Client: ${message.protocolVersion}). - The sync was stopped. You need to upgrade your dependencies (especially Yjs & the Connector)! - `) - this.send(sender, { - type: 'sync stop', - protocolVersion: this.protocolVersion - }) - return Promise.reject(new Error('Incompatible protocol version')) - } - if (message.auth != null && this.connections.has(sender)) { - // authenticate using auth in message - var auth = this.checkAuth(message.auth, this.y) - this.connections.get(sender).auth = auth - auth.then(auth => { - for (var f of this.userEventListeners) { - f({ - action: 'userAuthenticated', - user: sender, - auth: auth - }) - } - }) - } else if (this.connections.has(sender) && this.connections.get(sender).auth == null) { - // authenticate without otherwise - this.connections.get(sender).auth = this.checkAuth(null, this.y) - } - if (this.connections.has(sender) && this.connections.get(sender).auth != null) { - return this.connections.get(sender).auth.then(auth => { - if (message.type === 'sync step 1' && canRead(auth)) { - let conn = this - let m = message - let wait // wait for sync step 2 to complete - if (this.role === 'slave') { - wait = Promise.all(Array.from(this.connections.values()) - .filter(conn => conn.role === 'master') - .map(conn => conn.syncStep2.promise) - ) - } else { - wait = Promise.resolve() - } - wait.then(() => { - this.y.db.requestTransaction(function * () { - var currentStateSet = yield * this.getStateSet() - // TODO: remove - // if (canWrite(auth)) { - // yield * this.applyDeleteSet(m.deleteSet) - // } + let decoder = new BinaryDecoder(buffer) + let encoder = new BinaryEncoder() + let messageType = decoder.readVarString() + let senderConn = this.connections.get(sender) - var ds = yield * this.getDeleteSet() - var answer = { - type: 'sync step 2', - stateSet: currentStateSet, - deleteSet: ds, - protocolVersion: conn.protocolVersion, - auth: conn.authInfo - } - if (message.preferUntransformed === true && Object.keys(m.stateSet).length === 0) { - answer.osUntransformed = yield * this.getOperationsUntransformed() - } else { - answer.os = yield * this.getOperations(m.stateSet) - } - conn.send(sender, answer) - }) - }) - } else if (message.type === 'sync step 2' && canWrite(auth)) { - var db = this.y.db - let defer = this.connections.get(sender).syncStep2 - let m = message - // apply operations first - db.requestTransaction(function * () { - // yield * this.applyDeleteSet(m.deleteSet) - if (m.osUntransformed != null) { - yield * this.applyOperationsUntransformed(m.osUntransformed, m.stateSet) - } else { - this.store.apply(m.os) - } - // defer.resolve() - }) - // then apply ds - db.whenTransactionsFinished().then(() => { - db.requestTransaction(function * () { - yield * this.applyDeleteSet(m.deleteSet) - }) - defer.resolve() - }) - var self = this - this.connections.get(sender).syncStep2.promise.then(function () { - self._setSyncedWith(sender) - }) - return defer.promise - } else if (message.type === 'update' && canWrite(auth)) { - if (this.y.db.forwardAppliedOperations) { - var delops = message.ops.filter(function (o) { - return o.struct === 'Delete' - }) - if (delops.length > 0) { - this.broadcastOps(delops) - } + if (senderConn == null) { + throw new Error('Received message from unknown peer!') + } + + if (messageType === 'sync step 1' || messageType === 'sync step 2') { + let auth = decoder.readVarUint() + if (senderConn.auth == null) { + // check auth + let authPermissions = await this.checkAuth(auth, this.y, sender) + senderConn.auth = authPermissions + this.y.emit('userAuthenticated', { + user: senderConn.uid, + auth: authPermissions + }) + senderConn.syncStep2.promise.then(() => { + if (senderConn.processAfterAuth == null) { + return } - this.y.db.apply(message.ops) - } - }) + for (let i = 0; i < senderConn.processAfterAuth.length; i++) { + let m = senderConn.processAfterAuth[i] + this.receiveMessage(m[0], m[1]) + } + senderConn.processAfterAuth = null + }) + } + } + + if (senderConn.auth == null) { + senderConn.processAfterAuth.push([sender, buffer]) + return + } + + this.log('%s: Receive \'%s\' from %s', this.userId, messageType, sender) + this.logMessage('Message: %Y', buffer) + + if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) { + // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock) + computeMessageSyncStep1(decoder, encoder, this, senderConn, sender) + return this.y.db.whenTransactionsFinished() + } else if (messageType === 'sync step 2' && senderConn.auth === 'write') { + return computeMessageSyncStep2(decoder, encoder, this, senderConn, sender) + } else if (messageType === 'update' && senderConn.auth === 'write') { + return computeMessageUpdate(decoder, encoder, this, senderConn, sender) } else { - return Promise.reject(new Error('Unable to deliver message')) + console.error('Unable to receive message') } } _setSyncedWith (user) { diff --git a/src/Database.js b/src/Database.js index a5cf3976..e3656c7d 100644 --- a/src/Database.js +++ b/src/Database.js @@ -306,10 +306,12 @@ export default function extendDatabase (Y /* :any */) { * check if it is an expected op (otherwise wait for it) * check if was deleted, apply a delete operation after op was applied */ - apply (ops) { + applyOperations (decoder) { this.opsReceivedTimestamp = new Date() - for (var i = 0; i < ops.length; i++) { - var o = ops[i] + let length = decoder.readUint32() + + for (var i = 0; i < length; i++) { + let o = Y.Struct.binaryDecodeOperation(decoder) if (o.id == null || o.id[0] !== this.y.connector.userId) { var required = Y.Struct[o.struct].requiredOps(o) if (o.requires != null) { @@ -590,7 +592,7 @@ export default function extendDatabase (Y /* :any */) { op.type = typedefinition[0].name this.requestTransaction(function * () { - if (op.id[0] === -1) { + if (op.id[0] === 0xFFFFFF) { yield * this.setOperation(op) } else { yield * this.applyCreatedOperations([op]) diff --git a/src/Encoding.js b/src/Encoding.js index b1669c55..5faa5afe 100644 --- a/src/Encoding.js +++ b/src/Encoding.js @@ -1,102 +1,127 @@ import utf8 from 'utf-8' const bits7 = 0b1111111 - -export class BinaryLength { - constructor () { - this.length = 0 - } - writeUint8 (num) { - this.length++ - } - writeVarUint (num) { - while (num >= 0b10000000) { - this.length++ - num >>= 7 - } - this.length++ - } - writeVarString (str) { - let len = utf8.setBytesFromString(str).length - this.writeVarUint(len) - this.length += len - } - writeOpID (id) { - this.writeVarUint(id[0]) - this.writeVarUint(id[1]) - } -} +const bits8 = 0b11111111 export class BinaryEncoder { - constructor (binaryLength) { - this.dataview = new DataView(new ArrayBuffer(binaryLength.length)) - this.pos = 0 + constructor () { + this.data = [] + } + get pos () { + return this.data.length + } + createBuffer () { + return Uint8Array.from(this.data).buffer } writeUint8 (num) { - this.dataview.setUint8(this.pos++, num) + this.data.push(num & bits8) + } + setUint8 (pos, num) { + this.data[pos] = num & bits8 + } + writeUint16 (num) { + this.data.push(num & bits8, (num >> 8) & bits8) + } + setUint16 (pos, num) { + this.data[pos] = num & bits8 + this.data[pos + 1] = (num >> 8) & bits8 + } + writeUint32 (num) { + for (let i = 0; i < 4; i++) { + this.data.push(num & bits8) + num >>= 8 + } + } + setUint32 (pos, num) { + for (let i = 0; i < 4; i++) { + this.data[pos + i] = num & bits8 + num >>= 8 + } } writeVarUint (num) { while (num >= 0b10000000) { - this.dataview.setUint8(this.pos++, 0b10000000 | (bits7 & num)) + this.data.push(0b10000000 | (bits7 & num)) num >>= 7 } - this.dataview.setUint8(this.pos++, bits7 & num) + this.data.push(bits7 & num) } writeVarString (str) { let bytes = utf8.setBytesFromString(str) let len = bytes.length this.writeVarUint(len) for (let i = 0; i < len; i++) { - this.dataview.setUint8(this.pos++, bytes[i]) + this.data.push(bytes[i]) } } writeOpID (id) { - this.writeVarUint(id[0]) - this.writeVarUint(id[1]) + let user = id[0] + this.writeVarUint(user) + if (user !== 0xFFFFFF) { + this.writeVarUint(id[1]) + } else { + this.writeVarString(id[1]) + } } } export class BinaryDecoder { - constructor (dataview) { - this.dataview = dataview + constructor (buffer) { + this.uint8arr = new Uint8Array(buffer) this.pos = 0 } skip8 () { this.pos++ } - skip16 () { - this.pos += 2 - } - skip32 () { - this.pos += 4 - } - skipVar () { - while (this.dataview.getUint8(this.pos++) >= 1 << 7) { } - } readUint8 () { - return this.dataview.getUint8(this.pos++) + return this.uint8arr[this.pos++] + } + readUint32 () { + let uint = + this.uint8arr[this.pos] + + (this.uint8arr[this.pos + 1] << 8) + + (this.uint8arr[this.pos + 2] << 16) + + (this.uint8arr[this.pos + 3] << 24) + this.pos += 4 + return uint + } + peekUint8 () { + return this.uint8arr[this.pos] } readVarUint () { let num = 0 let len = 0 while (true) { - let r = this.dataview.getUint8(this.pos++) + let r = this.uint8arr[this.pos++] num = num | ((r & bits7) << len) len += 7 if (r < 1 << 7) { return num } + if (len > 35) { + throw new Error('Integer out of range!') + } } } readVarString () { let len = this.readVarUint() let bytes = new Array(len) for (let i = 0; i < len; i++) { - bytes[i] = this.dataview.getUint8(this.pos++) + bytes[i] = this.uint8arr[this.pos++] } return utf8.getStringFromBytes(bytes) } + peekVarString () { + let pos = this.pos + let s = this.readVarString() + this.pos = pos + return s + } readOpID () { - return [this.readVarUint(), this.readVarUint()] + let user = this.readVarUint() + if (user !== 0xFFFFFF) { + return [user, this.readVarUint()] + } else { + return [user, this.readVarString()] + } } } diff --git a/src/MessageHandler.js b/src/MessageHandler.js new file mode 100644 index 00000000..161f1985 --- /dev/null +++ b/src/MessageHandler.js @@ -0,0 +1,172 @@ + +import Y from './y.js' +import { BinaryDecoder } from './Encoding.js' + +export function formatYjsMessage (buffer) { + let decoder = new BinaryDecoder(buffer) + let type = decoder.readVarString() + let strBuilder = [] + strBuilder.push('\n === ' + type + ' ===\n') + if (type === 'update') { + logMessageUpdate(decoder, strBuilder) + } else if (type === 'sync step 1') { + logMessageSyncStep1(decoder, strBuilder) + } else if (type === 'sync step 2') { + logMessageSyncStep2(decoder, strBuilder) + } else { + strBuilder.push('-- Unknown message type - probably an encoding issue!!!') + } + return strBuilder.join('') +} + +export function formatYjsMessageType (buffer) { + let decoder = new BinaryDecoder(buffer) + return decoder.readVarString() +} + +export async function logMessageUpdate (decoder, strBuilder) { + let len = decoder.readUint32() + for (let i = 0; i < len; i++) { + strBuilder.push(JSON.stringify(Y.Struct.binaryDecodeOperation(decoder)) + '\n') + } +} + +export async function computeMessageUpdate (decoder, encoder, conn) { + if (conn.y.db.forwardAppliedOperations) { + let messagePosition = decoder.pos + let len = decoder.readUint32() + let delops = [] + for (let i = 0; i < len; i++) { + let op = Y.Struct.binaryDecodeOperation(decoder) + if (op.struct === 'Delete') { + delops.push(op) + } + } + if (delops.length > 0) { + conn.broadcastOps(delops) + } + decoder.pos = messagePosition + } + conn.y.db.applyOperations(decoder) +} + +export function logMessageSyncStep1 (decoder, strBuilder) { + let auth = decoder.readVarString() + let protocolVersion = decoder.readVarUint() + let preferUntransformed = decoder.readUint8() === 1 + strBuilder.push(` + - auth: "${auth}" + - protocolVersion: ${protocolVersion} + - preferUntransformed: ${preferUntransformed} +`) + logSS(decoder, strBuilder) +} + +export async function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) { + let protocolVersion = decoder.readVarUint() + let preferUntransformed = decoder.readUint8() === 1 + + // check protocol version + if (protocolVersion !== conn.protocolVersion) { + console.warn( + `You tried to sync with a yjs instance that has a different protocol version + (You: ${protocolVersion}, Client: ${protocolVersion}). + The sync was stopped. You need to upgrade your dependencies (especially Yjs & the Connector)! + `) + conn.y.destroy() + } + + if (conn.role === 'slave') { + // wait for sync step 2 to complete + await Promise.all(Array.from(conn.connections.values()) + .filter(conn => conn.role === 'master') + .map(conn => conn.syncStep2.promise) + ) + } + conn.y.db.requestTransaction(function * () { + encoder.writeVarString('sync step 2') + encoder.writeVarString(conn.authInfo || '') + let emptyStateSet = this.ds.length === 0 // TODO: length may not always be available + + if (preferUntransformed && emptyStateSet) { + encoder.writeUint8(1) + yield * this.writeOperationsUntransformed(encoder) + } else { + encoder.writeUint8(0) + yield * this.writeOperations(encoder, decoder) + } + + yield * this.writeDeleteSet(encoder) + conn.send(senderConn.uid, encoder.createBuffer()) + }) + await conn.y.db.whenTransactionsFinished() +} + +export function logSS (decoder, strBuilder) { + strBuilder.push(' == SS: \n') + let len = decoder.readUint32() + for (let i = 0; i < len; i++) { + let user = decoder.readVarUint() + let clock = decoder.readVarUint() + strBuilder.push(` - ${user}: ${clock}`) + } +} + +export function logOS (decoder, strBuilder) { + strBuilder.push(' == OS: \n') + let len = decoder.readUint32() + for (let i = 0; i < len; i++) { + let op = Y.Struct.binaryDecodeOperation(decoder) + strBuilder.push(JSON.stringify(op) + '\n') + } +} + +export function logDS (decoder, strBuilder) { + strBuilder.push(' == DS: \n') + let len = decoder.readUint32() + for (let i = 0; i < len; i++) { + let user = decoder.readVarUint() + strBuilder.push(` User: ${user}: `) + let len2 = decoder.readVarUint() + for (let j = 0; j < len2; j++) { + let from = decoder.readVarUint() + let to = decoder.readVarUint() + let gc = decoder.readUint8() === 1 + strBuilder.push(`[${from}, ${to}, ${gc}]`) + } + } +} + +export function logMessageSyncStep2 (decoder, strBuilder) { + strBuilder.push(' - auth: ' + decoder.readVarString() + '\n') + let osTransformed = decoder.readUint8() === 1 + strBuilder.push(' - osUntransformed: ' + osTransformed + '\n') + logOS(decoder, strBuilder) + if (osTransformed) { + logSS(decoder, strBuilder) + } + logDS(decoder, strBuilder) +} + +export async function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) { + var db = conn.y.db + let defer = senderConn.syncStep2 + + // apply operations first + db.requestTransaction(function * () { + let osUntransformed = decoder.readUint8() + if (osUntransformed === 1) { + yield * this.applyOperationsUntransformed(decoder) + } else { + this.store.applyOperations(decoder) + } + }) + // then apply ds + await db.whenTransactionsFinished() + db.requestTransaction(function * () { + yield * this.applyDeleteSet(decoder) + }) + await db.whenTransactionsFinished() + conn._setSyncedWith(sender) + defer.resolve() +} diff --git a/src/Struct.js b/src/Struct.js index a766e4de..6b2bbdb2 100644 --- a/src/Struct.js +++ b/src/Struct.js @@ -23,6 +23,20 @@ const CMAP = 3 */ export default function extendStruct (Y) { var Struct = { + binaryDecodeOperation: function (decoder) { + let code = decoder.peekUint8() + if (code === CDELETE) { + return Y.Struct.Delete.binaryDecode(decoder) + } else if (code === CINSERT) { + return Y.Struct.Insert.binaryDecode(decoder) + } else if (code === CLIST) { + return Y.Struct.List.binaryDecode(decoder) + } else if (code === CMAP) { + return Y.Struct.Map.binaryDecode(decoder) + } else { + throw new Error('Unable to decode operation!') + } + }, /* This is the only operation that is actually not a structure, because it is not stored in the OS. This is why it _does not_ have an id diff --git a/src/Transaction.js b/src/Transaction.js index 5afe9e3d..37b13ecd 100644 --- a/src/Transaction.js +++ b/src/Transaction.js @@ -1,5 +1,3 @@ -/* @flow */ -'use strict' /* Partial definition of a transaction @@ -96,7 +94,7 @@ export default function extendTransaction (Y) { send.push(Y.Struct[op.struct].encode(op)) } } - if (this.store.y.connector.isSynced && send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops) + if (send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops) // is connected, and this is not going to be send in addOperation this.store.y.connector.broadcastOps(send) } @@ -588,12 +586,20 @@ export default function extendTransaction (Y) { apply a delete set in order to get the state of the supplied ds */ - * applyDeleteSet (ds) { + * applyDeleteSet (decoder) { var deletions = [] - for (var user in ds) { - var dv = ds[user] - user = Number.parseInt(user, 10) + let dsLength = decoder.readUint32() + for (let i = 0; i < dsLength; i++) { + let user = decoder.readVarUint() + let dv = [] + let dvLength = decoder.readVarUint() + for (let j = 0; j < dvLength; j++) { + let from = decoder.readVarUint() + let len = decoder.readVarUint() + let gc = decoder.readUint8() === 1 + dv.push([from, len, gc]) + } var pos = 0 var d = dv[pos] yield * this.ds.iterate(this, [user, 0], [user, Number.MAX_VALUE], function * (n) { @@ -687,21 +693,34 @@ export default function extendTransaction (Y) { /* A DeleteSet (ds) describes all the deleted ops in the OS */ - * getDeleteSet () { - var ds = {} + * writeDeleteSet (encoder) { + var ds = new Map() yield * this.ds.iterate(this, null, null, function * (n) { var user = n.id[0] var counter = n.id[1] var len = n.len var gc = n.gc - var dv = ds[user] + var dv = ds.get(user) if (dv === void 0) { dv = [] - ds[user] = dv + ds.set(user, dv) } dv.push([counter, len, gc]) }) - return ds + let keys = Array.from(ds.keys()) + encoder.writeUint32(keys.length) + for (var i = 0; i < keys.length; i++) { + let user = keys[i] + let deletions = ds.get(user) + encoder.writeVarUint(user) + encoder.writeVarUint(deletions.length) + for (var j = 0; j < deletions.length; j++) { + let del = deletions[j] + encoder.writeVarUint(del[0]) + encoder.writeVarUint(del[1]) + encoder.writeUint8(del[2] ? 1 : 0) + } + } } * isDeleted (id) { var n = yield * this.ds.findWithUpperBound(id) @@ -713,7 +732,8 @@ export default function extendTransaction (Y) { } * addOperation (op) { yield * this.os.put(op) - if (this.store.y.connector.isSynced && this.store.forwardAppliedOperations && typeof op.id[1] !== 'string') { + // case op is created by this user, op is already broadcasted in applyCreatedOperations + if (op.id[0] !== this.store.userId && this.store.forwardAppliedOperations && typeof op.id[1] !== 'string') { // is connected, and this is not going to be send in addOperation this.store.y.connector.broadcastOps([op]) } @@ -822,11 +842,11 @@ export default function extendTransaction (Y) { } * getOperation (id/* :any */)/* :Transaction */ { var o = yield * this.os.find(id) - if (id[0] !== -1 || o != null) { + if (id[0] !== 0xFFFFFF || o != null) { return o } else { // type is string // generate this operation? - var comp = id[1].split(-1) + var comp = id[1].split('_') if (comp.length > 1) { var struct = comp[0] var op = Y.Struct[struct].create(id) @@ -879,6 +899,18 @@ export default function extendTransaction (Y) { }) return ss } + * writeStateSet (encoder) { + let lenPosition = encoder.pos + let len = 0 + encoder.writeUint32(0) + yield * this.ss.iterate(this, null, null, function * (n) { + encoder.writeVarUint(n.id[0]) + encoder.writeVarUint(n.clock) + len++ + }) + encoder.setUint32(lenPosition, len) + return len === 0 + } /* Here, we make all missing operations executable for the receiving user. @@ -928,17 +960,17 @@ export default function extendTransaction (Y) { * getOperations (startSS) { // TODO: use bounds here! if (startSS == null) { - startSS = {} + startSS = new Map() } var send = [] var endSV = yield * this.getStateVector() for (let endState of endSV) { let user = endState.user - if (user === -1) { + if (user === 0xFFFFFF) { continue } - let startPos = startSS[user] || 0 + let startPos = startSS.get(user) || 0 if (startPos > 0) { // There is a change that [user, startPos] is in a composed Insertion (with a smaller counter) // find out if that is the case @@ -948,19 +980,19 @@ export default function extendTransaction (Y) { startPos = firstMissing.id[1] } } - startSS[user] = startPos + startSS.set(user, startPos) } for (let endState of endSV) { let user = endState.user - let startPos = startSS[user] - if (user === -1) { + let startPos = startSS.get(user) + if (user === 0xFFFFFF) { continue } yield * this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) { op = Y.Struct[op.struct].encode(op) if (op.struct !== 'Insert') { send.push(op) - } else if (op.right == null || op.right[1] < (startSS[op.right[0]] || 0)) { + } else if (op.right == null || op.right[1] < (startSS.get(op.right[0]) || 0)) { // case 1. op.right is known // this case is only reached if op.right is known. // => this is not called for op.left, as op.right is unknown @@ -978,7 +1010,7 @@ export default function extendTransaction (Y) { op.left = null send.push(op) /* not necessary, as o is already sent.. - if (!Y.utils.compareIds(o.id, op.id) && o.id[1] >= (startSS[o.id[0]] || 0)) { + if (!Y.utils.compareIds(o.id, op.id) && o.id[1] >= (startSS.get(o.id[0]) || 0)) { // o is not op && o is unknown o = Y.Struct[op.struct].encode(o) o.right = missingOrigins[missingOrigins.length - 1].id @@ -992,7 +1024,7 @@ export default function extendTransaction (Y) { while (missingOrigins.length > 0 && Y.utils.matchesId(o, missingOrigins[missingOrigins.length - 1].origin)) { missingOrigins.pop() } - if (o.id[1] < (startSS[o.id[0]] || 0)) { + if (o.id[1] < (startSS.get(o.id[0]) || 0)) { // case 2. o is known op.left = Y.utils.getLastId(o) send.push(op) @@ -1024,28 +1056,48 @@ export default function extendTransaction (Y) { } return send.reverse() } + + * writeOperations (encoder, decoder) { + let ss = new Map() + let ssLength = decoder.readUint32() + for (let i = 0; i < ssLength; i++) { + let user = decoder.readUint32() + let clock = decoder.readUint32() + ss.set(user, clock) + } + let ops = yield * this.getOperations(ss) + encoder.writeUint32(ops.length) + for (let i = 0; i < ops.length; i++) { + let op = ops[i] + Y.Struct[op.struct].binaryEncode(encoder, Y.Struct[op.struct].encode(op)) + } + } /* * Get the plain untransformed operations from the database. * You can apply these operations using .applyOperationsUntransformed(ops) * */ - * getOperationsUntransformed () { - var ops = [] + * writeOperationsUntransformed (encoder) { + let lenPosition = encoder.pos + let len = 0 + encoder.writeUint32(0) // placeholder yield * this.os.iterate(this, null, null, function * (op) { - if (op.id[0] !== -1) { - ops.push(op) + if (op.id[0] !== 0xFFFFFF) { + len++ + Y.Struct[op.struct].binaryEncode(encoder, Y.Struct[op.struct].encode(op)) } }) - return { - untransformed: ops - } + encoder.setUint32(lenPosition, len) + yield * this.writeStateSet(encoder) } - * applyOperationsUntransformed (m, stateSet) { - var ops = m.untransformed - for (var i = 0; i < ops.length; i++) { - var op = ops[i] - // create, and modify parent, if it is created implicitly - if (op.parent != null && op.parent[0] === -1) { + * applyOperationsUntransformed (decoder) { + let len = decoder.readUint32() + for (let i = 0; i < len; i++) { + let op = decoder.readOperation() + yield * this.os.put(op) + } + yield * this.os.iterate(this, null, null, function * (op) { + if (op.parent != null && op.parent[0] === 0xFFFFFF) { if (op.struct === 'Insert') { // update parents .map/start/end properties if (op.parentSub != null && op.left == null) { @@ -1065,12 +1117,14 @@ export default function extendTransaction (Y) { } } } - yield * this.os.put(op) - } - for (var user in stateSet) { + }) + let stateSetLength = decoder.readUint32() + for (let i = 0; i < stateSetLength; i++) { + let user = decoder.readVarUint() + let clock = decoder.readVarUint() yield * this.ss.put({ id: [user], - clock: stateSet[user] + clock: clock }) } } diff --git a/src/y.js b/src/y.js index e464810c..7f43bd12 100644 --- a/src/y.js +++ b/src/y.js @@ -1,9 +1,10 @@ -import debug from 'debug' import extendConnector from './Connector.js' import extendDatabase from './Database.js' import extendTransaction from './Transaction.js' import extendStruct from './Struct.js' import extendUtils from './Utils.js' +import debug from 'debug' +import { formatYjsMessage, formatYjsMessageType } from './MessageHandler.js' extendConnector(Y) extendDatabase(Y) @@ -12,6 +13,8 @@ extendStruct(Y) extendUtils(Y) Y.debug = debug +debug.formatters.Y = formatYjsMessage +debug.formatters.y = formatYjsMessageType var requiringModules = {} @@ -169,7 +172,7 @@ class YConfig extends Y.utils.NamedEventHandler { var typeName = typeConstructor.splice(0, 1) var type = Y[typeName] var typedef = type.typeDefinition - var id = [-1, typedef.struct + -1 + typeName + -1 + propertyname + -1 + typeConstructor] + var id = [0xFFFFFF, typedef.struct + '_' + typeName + '_' + propertyname + '_' + typeConstructor] var args = [] if (typeConstructor.length === 1) { try { diff --git a/test/encode-decode.js b/test/encode-decode.js index 2fe5f21c..c919e97f 100644 --- a/test/encode-decode.js +++ b/test/encode-decode.js @@ -1,16 +1,15 @@ import { test } from 'cutest' import Chance from 'chance' import Y from '../src/y.js' -import { BinaryLength, BinaryEncoder, BinaryDecoder } from '../src/Encoding.js' +import { BinaryEncoder, BinaryDecoder } from '../src/Encoding.js' +import { applyRandomTests } from '../../y-array/test/testGeneration.js' function testEncoding (t, write, read, val) { - let binLength = new BinaryLength() - write(binLength, val) - let encoder = new BinaryEncoder(binLength) + let encoder = new BinaryEncoder() write(encoder, val) - let reader = new BinaryDecoder(encoder.dataview) + let reader = new BinaryDecoder(encoder.createBuffer()) let result = read(reader) - t.log(`string encode: ${JSON.stringify(val).length} bytes / binary encode: ${encoder.dataview.buffer.byteLength} bytes`) + t.log(`string encode: ${JSON.stringify(val).length} bytes / binary encode: ${encoder.data.length} bytes`) t.compare(val, result, 'Compare results') } @@ -219,3 +218,4 @@ test('encode/decode Map operations', async function binMap (t) { info: 400 }) }) + diff --git a/tests-lib/helper.js b/tests-lib/helper.js index 1b4d8c9c..4a2d6376 100644 --- a/tests-lib/helper.js +++ b/tests-lib/helper.js @@ -12,6 +12,33 @@ export let Y = _Y Y.extend(yMemory, yArray, yMap, yTest) +function * getStateSet () { + var ss = {} + yield * this.ss.iterate(this, null, null, function * (n) { + var user = n.id[0] + var clock = n.clock + ss[user] = clock + }) + return ss +} + +function * getDeleteSet () { + var ds = {} + yield * this.ds.iterate(this, null, null, function * (n) { + var user = n.id[0] + var counter = n.id[1] + var len = n.len + var gc = n.gc + var dv = ds[user] + if (dv === void 0) { + dv = [] + ds[user] = dv + } + dv.push([counter, len, gc]) + }) + return ds +} + export async function garbageCollectUsers (t, users) { await flushAll(t, users) await Promise.all(users.map(u => u.db.emptyGarbageCollector())) @@ -60,10 +87,14 @@ export async function compareUsers (t, users) { var data = await Promise.all(users.map(async (u) => { var data = {} u.db.requestTransaction(function * () { - var os = yield * this.getOperationsUntransformed() + let ops = [] + yield * this.os.iterate(this, null, null, function * (op) { + ops.push(Y.Struct[op.struct].encode(op)) + }) + data.os = {} - for (let i = 0; i < os.untransformed.length; i++) { - let op = os.untransformed[i] + for (let i = 0; i < ops.length; i++) { + let op = ops[i] op = Y.Struct[op.struct].encode(op) delete op.origin /* @@ -79,8 +110,8 @@ export async function compareUsers (t, users) { data.os[JSON.stringify(op.id)] = op } } - data.ds = yield * this.getDeleteSet() - data.ss = yield * this.getStateSet() + data.ds = yield * getDeleteSet.apply(this) + data.ss = yield * getStateSet.apply(this) }) await u.db.whenTransactionsFinished() return data diff --git a/tests-lib/test-connector.js b/tests-lib/test-connector.js index 0656ae93..42a87564 100644 --- a/tests-lib/test-connector.js +++ b/tests-lib/test-connector.js @@ -1,5 +1,6 @@ /* global Y */ import { wait } from './helper.js' +import { formatYjsMessage } from '../src/MessageHandler.js' var rooms = {} @@ -28,7 +29,6 @@ export class TestRoom { }) } send (sender, receiver, m) { - m = JSON.parse(JSON.stringify(m)) var user = this.users.get(receiver) if (user != null) { user.receiveMessage(sender, m) @@ -81,14 +81,25 @@ export default function extendTestConnector (Y) { this.testRoom.leave(this) return super.disconnect() } + logBufferParsed () { + console.log(' === Logging buffer of user ' + this.userId + ' === ') + for (let [user, conn] of this.connections) { + console.log(` ${user}:`) + for (let i = 0; i < conn.buffer.length; i++) { + console.log(formatYjsMessage(conn.buffer[i])) + } + } + } reconnect () { this.testRoom.join(this) return super.reconnect() } send (uid, message) { + super.send(uid, message) this.testRoom.send(this.userId, uid, message) } broadcast (message) { + super.broadcast(message) this.testRoom.broadcast(this.userId, message) } async whenSynced (f) {