fixed ds syncing bug

This commit is contained in:
Kevin Jahns 2017-10-26 19:12:33 +02:00
parent e6b5e258fb
commit 96c6aa2751
8 changed files with 31 additions and 15 deletions

View File

@ -29,6 +29,7 @@ export default class AbstractConnector {
this.currentSyncTarget = null this.currentSyncTarget = null
this.debug = opts.debug === true this.debug = opts.debug === true
this.broadcastBuffer = new BinaryEncoder() this.broadcastBuffer = new BinaryEncoder()
this.broadcastBufferSize = 0
this.protocolVersion = 11 this.protocolVersion = 11
this.authInfo = opts.auth || null this.authInfo = opts.auth || null
this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') } // default is everyone has write access this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') } // default is everyone has write access
@ -160,22 +161,29 @@ export default class AbstractConnector {
if (firstContent) { if (firstContent) {
this.broadcastBuffer.writeVarString(this.y.room) this.broadcastBuffer.writeVarString(this.y.room)
this.broadcastBuffer.writeVarString('update') this.broadcastBuffer.writeVarString('update')
this.broadcastBufferSize = 0
this.broadcastBufferSizePos = this.broadcastBuffer.pos
this.broadcastBuffer.writeUint32(0)
} }
this.broadcastBufferSize++
struct._toBinary(this.broadcastBuffer) struct._toBinary(this.broadcastBuffer)
if (this.maxBufferLength > 0 && this.broadcastBuffer.length > this.maxBufferLength) { if (this.maxBufferLength > 0 && this.broadcastBuffer.length > this.maxBufferLength) {
// it is necessary to send the buffer now // it is necessary to send the buffer now
// cache the buffer and check if server is responsive // cache the buffer and check if server is responsive
let buffer = this.broadcastBuffer const buffer = this.broadcastBuffer
buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize)
this.broadcastBuffer = new BinaryEncoder() this.broadcastBuffer = new BinaryEncoder()
this.whenRemoteResponsive().then(() => { this.whenRemoteResponsive().then(() => {
this.broadcast(buffer) this.broadcast(buffer.createBuffer)
}) })
} else if (firstContent) { } else if (firstContent) {
// send the buffer when all transactions are finished // send the buffer when all transactions are finished
// (or buffer exceeds maxBufferLength) // (or buffer exceeds maxBufferLength)
setTimeout(() => { setTimeout(() => {
if (this.broadcastBuffer.length > 0) { if (this.broadcastBuffer.length > 0) {
this.broadcast(this.broadcastBuffer.createBuffer()) const buffer = this.broadcastBuffer
buffer.setUint32(this.broadcastBufferSizePos, this.broadcastBufferSize)
this.broadcast(buffer.createBuffer())
this.broadcastBuffer = new BinaryEncoder() this.broadcastBuffer = new BinaryEncoder()
} }
}, 0) }, 0)

View File

@ -47,7 +47,8 @@ function _integrateRemoteStructHelper (y, struct) {
} }
export function stringifyStructs (y, decoder, strBuilder) { export function stringifyStructs (y, decoder, strBuilder) {
while (decoder.length !== decoder.pos) { const len = decoder.readUint32()
for (let i = 0; i < len; i++) {
let reference = decoder.readVarUint() let reference = decoder.readVarUint()
let Constr = getStruct(reference) let Constr = getStruct(reference)
let struct = new Constr() let struct = new Constr()
@ -61,7 +62,8 @@ export function stringifyStructs (y, decoder, strBuilder) {
} }
export function integrateRemoteStructs (decoder, encoder, y) { export function integrateRemoteStructs (decoder, encoder, y) {
while (decoder.length !== decoder.pos) { const len = decoder.readUint32()
for (let i = 0; i < len; i++) {
let reference = decoder.readVarUint() let reference = decoder.readVarUint()
let Constr = getStruct(reference) let Constr = getStruct(reference)
let struct = new Constr() let struct = new Constr()

View File

@ -31,14 +31,19 @@ export function sendSyncStep1 (connector, syncUser) {
} }
export default function writeStructs (encoder, decoder, y, ss) { export default function writeStructs (encoder, decoder, y, ss) {
const lenPos = encoder.pos
encoder.writeUint32(0)
let len = 0
for (let user of y.ss.state.keys()) { for (let user of y.ss.state.keys()) {
let clock = ss.get(user) || 0 let clock = ss.get(user) || 0
if (user !== RootFakeUserID) { if (user !== RootFakeUserID) {
y.os.iterate(new ID(user, clock), new ID(user, Number.MAX_VALUE), function (struct) { y.os.iterate(new ID(user, clock), new ID(user, Number.MAX_VALUE), function (struct) {
struct._toBinary(encoder) struct._toBinary(encoder)
len++
}) })
} }
} }
encoder.setUint32(lenPos, len)
} }
export function readSyncStep1 (decoder, encoder, y, senderConn, sender) { export function readSyncStep1 (decoder, encoder, y, senderConn, sender) {
@ -54,9 +59,9 @@ export function readSyncStep1 (decoder, encoder, y, senderConn, sender) {
// write sync step 2 // write sync step 2
encoder.writeVarString('sync step 2') encoder.writeVarString('sync step 2')
encoder.writeVarString(y.connector.authInfo || '') encoder.writeVarString(y.connector.authInfo || '')
writeDeleteSet(y, encoder)
const ss = readStateSet(decoder) const ss = readStateSet(decoder)
writeStructs(encoder, decoder, y, ss) writeStructs(encoder, decoder, y, ss)
writeDeleteSet(y, encoder)
y.connector.send(senderConn.uid, encoder.createBuffer()) y.connector.send(senderConn.uid, encoder.createBuffer())
senderConn.receivedSyncStep2 = true senderConn.receivedSyncStep2 = true
if (y.connector.role === 'slave') { if (y.connector.role === 'slave') {

View File

@ -3,6 +3,8 @@ import { readDeleteSet } from './deleteSet.js'
export function stringifySyncStep2 (y, decoder, strBuilder) { export function stringifySyncStep2 (y, decoder, strBuilder) {
strBuilder.push(' - auth: ' + decoder.readVarString()) strBuilder.push(' - auth: ' + decoder.readVarString())
strBuilder.push(' == OS:')
stringifyStructs(y, decoder, strBuilder)
// write DS to string // write DS to string
strBuilder.push(' == DS:') strBuilder.push(' == DS:')
let len = decoder.readUint32() let len = decoder.readUint32()
@ -17,12 +19,10 @@ export function stringifySyncStep2 (y, decoder, strBuilder) {
strBuilder.push(`[${from}, ${to}, ${gc}]`) strBuilder.push(`[${from}, ${to}, ${gc}]`)
} }
} }
strBuilder.push(' == OS:')
stringifyStructs(y, decoder, strBuilder)
} }
export function readSyncStep2 (decoder, encoder, y, senderConn, sender) { export function readSyncStep2 (decoder, encoder, y, senderConn, sender) {
readDeleteSet(y, decoder)
integrateRemoteStructs(decoder, encoder, y) integrateRemoteStructs(decoder, encoder, y)
readDeleteSet(y, decoder)
y.connector._setSyncedWith(sender) y.connector._setSyncedWith(sender)
} }

View File

@ -1,5 +1,6 @@
import { getReference } from '../Util/structReferences.js' import { getReference } from '../Util/structReferences.js'
import ID from '../Util/ID.js' import ID from '../Util/ID.js'
import { logID } from '../MessageHandler/messageToString.js'
/** /**
* Delete all items in an ID-range * Delete all items in an ID-range
@ -42,6 +43,8 @@ export default class Delete {
this._length = null this._length = null
} }
_fromBinary (y, decoder) { _fromBinary (y, decoder) {
// TODO: set target, and add it to missing if not found
// There is an edge case in p2p networks!
this._targetID = decoder.readID() this._targetID = decoder.readID()
this._length = decoder.readVarUint() this._length = decoder.readVarUint()
return [] return []
@ -71,6 +74,6 @@ export default class Delete {
} }
} }
_logString () { _logString () {
return `Delete - target: ${this._target}, len: ${this._length}` return `Delete - target: ${logID(this._targetID)}, len: ${this._length}`
} }
} }

View File

@ -297,8 +297,6 @@ export default class Item {
this._parent = this._origin._parent this._parent = this._origin._parent
} else if (this._right_origin !== null) { } else if (this._right_origin !== null) {
this._parent = this._right_origin._parent this._parent = this._right_origin._parent
} else if (missing.length === 0) {
debugger
} }
} }
if (info & 0b1000) { if (info & 0b1000) {

View File

@ -228,7 +228,7 @@ var arrayTransactions = [
var pos = chance.integer({ min: 0, max: yarray.length }) var pos = chance.integer({ min: 0, max: yarray.length })
yarray.insert(pos, content) yarray.insert(pos, content)
}, },
/*function insertTypeArray (t, user, chance) { function insertTypeArray (t, user, chance) {
const yarray = user.get('array', Y.Array) const yarray = user.get('array', Y.Array)
var pos = chance.integer({ min: 0, max: yarray.length }) var pos = chance.integer({ min: 0, max: yarray.length })
yarray.insert(pos, [Y.Array]) yarray.insert(pos, [Y.Array])
@ -243,7 +243,7 @@ var arrayTransactions = [
map.set('someprop', 42) map.set('someprop', 42)
map.set('someprop', 43) map.set('someprop', 43)
map.set('someprop', 44) map.set('someprop', 44)
},*/ },
function _delete (t, user, chance) { function _delete (t, user, chance) {
const yarray = user.get('array', Y.Array) const yarray = user.get('array', Y.Array)
var length = yarray.length var length = yarray.length

View File

@ -103,7 +103,7 @@ export async function compareUsers (t, users) {
u.os.iterate(null, null, function (op) { u.os.iterate(null, null, function (op) {
const json = { const json = {
id: op._id, id: op._id,
left: op._left === null ? null : op._left._id, left: op._left === null ? null : op._left._lastId,
right: op._right === null ? null : op._right._id, right: op._right === null ? null : op._right._id,
length: op._length, length: op._length,
deleted: op._deleted deleted: op._deleted