diff --git a/src/Connector.js b/src/Connector.js index b0f6c65c..b6d1f3f5 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -29,6 +29,7 @@ export default class AbstractConnector { this.currentSyncTarget = null this.debug = opts.debug === true this.broadcastBuffer = new BinaryEncoder() + this.broadcastBufferSize = 0 this.protocolVersion = 11 this.authInfo = opts.auth || null this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') } // default is everyone has write access @@ -160,22 +161,29 @@ export default class AbstractConnector { if (firstContent) { this.broadcastBuffer.writeVarString(this.y.room) this.broadcastBuffer.writeVarString('update') + this.broadcastBufferSize = 0 + this.broadcastBufferSizePos = this.broadcastBuffer.pos + this.broadcastBuffer.writeUint32(0) } + this.broadcastBufferSize++ struct._toBinary(this.broadcastBuffer) if (this.maxBufferLength > 0 && this.broadcastBuffer.length > this.maxBufferLength) { // it is necessary to send the buffer now // cache the buffer and check if server is responsive - let buffer = this.broadcastBuffer + const buffer = this.broadcastBuffer + buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize) this.broadcastBuffer = new BinaryEncoder() this.whenRemoteResponsive().then(() => { - this.broadcast(buffer) + this.broadcast(buffer.createBuffer) }) } else if (firstContent) { // send the buffer when all transactions are finished // (or buffer exceeds maxBufferLength) setTimeout(() => { if (this.broadcastBuffer.length > 0) { - this.broadcast(this.broadcastBuffer.createBuffer()) + const buffer = this.broadcastBuffer + buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize) + this.broadcast(buffer.createBuffer()) this.broadcastBuffer = new BinaryEncoder() } }, 0) diff --git a/src/MessageHandler/integrateRemoteStructs.js b/src/MessageHandler/integrateRemoteStructs.js index 8f6a7df8..d561b65e 100644 --- a/src/MessageHandler/integrateRemoteStructs.js +++ b/src/MessageHandler/integrateRemoteStructs.js @@ -47,7 +47,8 @@ function _integrateRemoteStructHelper (y, struct) { } export function stringifyStructs (y, decoder, strBuilder) { - while (decoder.length !== decoder.pos) { + const len = decoder.readUint32() + for (let i = 0; i < len; i++) { let reference = decoder.readVarUint() let Constr = getStruct(reference) let struct = new Constr() @@ -61,7 +62,8 @@ export function stringifyStructs (y, decoder, strBuilder) { } export function integrateRemoteStructs (decoder, encoder, y) { - while (decoder.length !== decoder.pos) { + const len = decoder.readUint32() + for (let i = 0; i < len; i++) { let reference = decoder.readVarUint() let Constr = getStruct(reference) let struct = new Constr() diff --git a/src/MessageHandler/syncStep1.js b/src/MessageHandler/syncStep1.js index 3992862a..4f081339 100644 --- a/src/MessageHandler/syncStep1.js +++ b/src/MessageHandler/syncStep1.js @@ -31,14 +31,19 @@ export function sendSyncStep1 (connector, syncUser) { } export default function writeStructs (encoder, decoder, y, ss) { + const lenPos = encoder.pos + encoder.writeUint32(0) + let len = 0 for (let user of y.ss.state.keys()) { let clock = ss.get(user) || 0 if (user !== RootFakeUserID) { y.os.iterate(new ID(user, clock), new ID(user, Number.MAX_VALUE), function (struct) { struct._toBinary(encoder) + len++ }) } } + encoder.setUint32(lenPos, len) } export function readSyncStep1 (decoder, encoder, y, senderConn, sender) { @@ -54,9 +59,9 @@ export function readSyncStep1 (decoder, encoder, y, senderConn, sender) { // write sync step 2 encoder.writeVarString('sync step 2') encoder.writeVarString(y.connector.authInfo || '') - writeDeleteSet(y, encoder) const ss = readStateSet(decoder) writeStructs(encoder, decoder, y, ss) + writeDeleteSet(y, encoder) y.connector.send(senderConn.uid, encoder.createBuffer()) senderConn.receivedSyncStep2 = true if (y.connector.role === 'slave') { diff --git a/src/MessageHandler/syncStep2.js b/src/MessageHandler/syncStep2.js index a8dc4208..a169073b 100644 --- a/src/MessageHandler/syncStep2.js +++ b/src/MessageHandler/syncStep2.js @@ -3,6 +3,8 @@ import { readDeleteSet } from './deleteSet.js' export function stringifySyncStep2 (y, decoder, strBuilder) { strBuilder.push(' - auth: ' + decoder.readVarString()) + strBuilder.push(' == OS:') + stringifyStructs(y, decoder, strBuilder) // write DS to string strBuilder.push(' == DS:') let len = decoder.readUint32() @@ -17,12 +19,10 @@ export function stringifySyncStep2 (y, decoder, strBuilder) { strBuilder.push(`[${from}, ${to}, ${gc}]`) } } - strBuilder.push(' == OS:') - stringifyStructs(y, decoder, strBuilder) } export function readSyncStep2 (decoder, encoder, y, senderConn, sender) { - readDeleteSet(y, decoder) integrateRemoteStructs(decoder, encoder, y) + readDeleteSet(y, decoder) y.connector._setSyncedWith(sender) } diff --git a/src/Struct/Delete.js b/src/Struct/Delete.js index ecfc717b..e750c04c 100644 --- a/src/Struct/Delete.js +++ b/src/Struct/Delete.js @@ -1,5 +1,6 @@ import { getReference } from '../Util/structReferences.js' import ID from '../Util/ID.js' +import { logID } from '../MessageHandler/messageToString.js' /** * Delete all items in an ID-range @@ -42,6 +43,8 @@ export default class Delete { this._length = null } _fromBinary (y, decoder) { + // TODO: set target, and add it to missing if not found + // There is an edge case in p2p networks! this._targetID = decoder.readID() this._length = decoder.readVarUint() return [] @@ -71,6 +74,6 @@ export default class Delete { } } _logString () { - return `Delete - target: ${this._target}, len: ${this._length}` + return `Delete - target: ${logID(this._targetID)}, len: ${this._length}` } } diff --git a/src/Struct/Item.js b/src/Struct/Item.js index 49249579..a430b5ef 100644 --- a/src/Struct/Item.js +++ b/src/Struct/Item.js @@ -297,8 +297,6 @@ export default class Item { this._parent = this._origin._parent } else if (this._right_origin !== null) { this._parent = this._right_origin._parent - } else if (missing.length === 0) { - debugger } } if (info & 0b1000) { diff --git a/test/y-array.tests.js b/test/y-array.tests.js index 2d036842..26647338 100644 --- a/test/y-array.tests.js +++ b/test/y-array.tests.js @@ -228,7 +228,7 @@ var arrayTransactions = [ var pos = chance.integer({ min: 0, max: yarray.length }) yarray.insert(pos, content) }, - /*function insertTypeArray (t, user, chance) { + function insertTypeArray (t, user, chance) { const yarray = user.get('array', Y.Array) var pos = chance.integer({ min: 0, max: yarray.length }) yarray.insert(pos, [Y.Array]) @@ -243,7 +243,7 @@ var arrayTransactions = [ map.set('someprop', 42) map.set('someprop', 43) map.set('someprop', 44) - },*/ + }, function _delete (t, user, chance) { const yarray = user.get('array', Y.Array) var length = yarray.length diff --git a/tests-lib/helper.js b/tests-lib/helper.js index cd5b26dc..e70a396f 100644 --- a/tests-lib/helper.js +++ b/tests-lib/helper.js @@ -103,7 +103,7 @@ export async function compareUsers (t, users) { u.os.iterate(null, null, function (op) { const json = { id: op._id, - left: op._left === null ? null : op._left._id, + left: op._left === null ? null : op._left._lastId, right: op._right === null ? null : op._right._id, length: op._length, deleted: op._deleted