refactor the whole damn thing
This commit is contained in:
117
src/MessageHandler/deleteSet.js
Normal file
117
src/MessageHandler/deleteSet.js
Normal file
@@ -0,0 +1,117 @@
|
||||
import deleteItemRange from 'deleteItemRange'
|
||||
|
||||
export function stringifyDeleteSet (y, decoder, strBuilder) {
|
||||
let dsLength = decoder.readUint32()
|
||||
for (let i = 0; i < dsLength; i++) {
|
||||
let user = decoder.readVarUint()
|
||||
strBuilder.push(' -' + user + ':')
|
||||
let dvLength = decoder.readVarUint()
|
||||
for (let j = 0; j < dvLength; j++) {
|
||||
let from = decoder.readVarUint()
|
||||
let len = decoder.readVarUint()
|
||||
let gc = decoder.readUint8() === 1
|
||||
strBuilder.push(`clock: ${from}, length: ${len}, gc: ${gc}`)
|
||||
}
|
||||
}
|
||||
return strBuilder
|
||||
}
|
||||
|
||||
export function writeDeleteSet (y, encoder) {
|
||||
let currentUser = null
|
||||
let currentLength = 0
|
||||
let lastLenPos
|
||||
|
||||
let numberOfUsers = 0
|
||||
let laterDSLenPus = encoder.pos
|
||||
encoder.writeUint32(0)
|
||||
|
||||
y.ds.iterate(null, null, function (n) {
|
||||
var user = n._id.user
|
||||
var clock = n._id.clock
|
||||
var len = n.len
|
||||
var gc = n.gc
|
||||
if (currentUser !== user) {
|
||||
numberOfUsers++
|
||||
// a new user was found
|
||||
if (currentUser !== null) { // happens on first iteration
|
||||
encoder.setUint32(lastLenPos, currentLength)
|
||||
}
|
||||
encoder.writeVarUint(user)
|
||||
// pseudo-fill pos
|
||||
lastLenPos = encoder.pos
|
||||
encoder.writeUint32(0)
|
||||
}
|
||||
encoder.writeVarUint(clock)
|
||||
encoder.writeVarUint(len)
|
||||
encoder.writeUint8(gc ? 1 : 0)
|
||||
})
|
||||
if (currentUser !== null) { // happens on first iteration
|
||||
encoder.setUint32(lastLenPos, currentLength)
|
||||
}
|
||||
encoder.writeUint32(laterDSLenPus, numberOfUsers)
|
||||
}
|
||||
|
||||
export function readDeleteSet (y, decoder) {
|
||||
let dsLength = decoder.readUint32()
|
||||
for (let i = 0; i < dsLength; i++) {
|
||||
let user = decoder.readVarUint()
|
||||
let dv = []
|
||||
let dvLength = decoder.readVarUint()
|
||||
for (let j = 0; j < dvLength; j++) {
|
||||
let from = decoder.readVarUint()
|
||||
let len = decoder.readVarUint()
|
||||
let gc = decoder.readUint8() === 1
|
||||
dv.push([from, len, gc])
|
||||
}
|
||||
var pos = 0
|
||||
var d = dv[pos]
|
||||
y.ds.iterate(this, [user, 0], [user, Number.MAX_VALUE], function (n) {
|
||||
// cases:
|
||||
// 1. d deletes something to the right of n
|
||||
// => go to next n (break)
|
||||
// 2. d deletes something to the left of n
|
||||
// => create deletions
|
||||
// => reset d accordingly
|
||||
// *)=> if d doesn't delete anything anymore, go to next d (continue)
|
||||
// 3. not 2) and d deletes something that also n deletes
|
||||
// => reset d so that it doesn't contain n's deletion
|
||||
// *)=> if d does not delete anything anymore, go to next d (continue)
|
||||
while (d != null) {
|
||||
var diff = 0 // describe the diff of length in 1) and 2)
|
||||
if (n.id[1] + n.len <= d[0]) {
|
||||
// 1)
|
||||
break
|
||||
} else if (d[0] < n.id[1]) {
|
||||
// 2)
|
||||
// delete maximum the len of d
|
||||
// else delete as much as possible
|
||||
diff = Math.min(n.id[1] - d[0], d[1])
|
||||
deleteItemRange(y, user, d[0], diff)
|
||||
// deletions.push([user, d[0], diff, d[2]])
|
||||
} else {
|
||||
// 3)
|
||||
diff = n.id[1] + n.len - d[0] // never null (see 1)
|
||||
if (d[2] && !n.gc) {
|
||||
// d marks as gc'd but n does not
|
||||
// then delete either way
|
||||
deleteItemRange(y, user, d[0], Math.min(diff, d[1]))
|
||||
// deletions.push([user, d[0], Math.min(diff, d[1]), d[2]])
|
||||
}
|
||||
}
|
||||
if (d[1] <= diff) {
|
||||
// d doesn't delete anything anymore
|
||||
d = dv[++pos]
|
||||
} else {
|
||||
d[0] = d[0] + diff // reset pos
|
||||
d[1] = d[1] - diff // reset length
|
||||
}
|
||||
}
|
||||
})
|
||||
// for the rest.. just apply it
|
||||
for (; pos < dv.length; pos++) {
|
||||
d = dv[pos]
|
||||
deleteItemRange(y, user, d[0], d[1])
|
||||
// deletions.push([user, d[0], d[1], d[2]])
|
||||
}
|
||||
}
|
||||
}
|
||||
73
src/MessageHandler/integrateRemoteStructs.js
Normal file
73
src/MessageHandler/integrateRemoteStructs.js
Normal file
@@ -0,0 +1,73 @@
|
||||
import { getStruct } from '../Util/StructReferences'
|
||||
import BinaryDecoder from '../Util/Binary/Decoder'
|
||||
|
||||
class MissingEntry {
|
||||
constructor (decoder, missing, struct) {
|
||||
this.decoder = decoder
|
||||
this.missing = missing.length
|
||||
this.struct = struct
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Integrate remote struct
|
||||
* When a remote struct is integrated, other structs might be ready to ready to
|
||||
* integrate.
|
||||
*/
|
||||
function _integrateRemoteStructHelper (y, struct) {
|
||||
struct._integrate(y)
|
||||
let msu = y._missingStructs.get(struct._id.user)
|
||||
if (msu != null) {
|
||||
let len = struct._length
|
||||
for (let i = 0; i < len; i++) {
|
||||
if (msu.has(struct._id.clock + i)) {
|
||||
let msuc = msu.get(struct._id.clock + i)
|
||||
msuc.forEach(missingDef => {
|
||||
missingDef.missing--
|
||||
if (missingDef.missing === 0) {
|
||||
let missing = missingDef.struct._fromBinary(y, missingDef.decoder)
|
||||
if (missing.length > 0) {
|
||||
console.error('Missing should be empty!')
|
||||
} else {
|
||||
y._readyToIntegrate.push(missingDef.struct)
|
||||
}
|
||||
}
|
||||
})
|
||||
msu.delete(struct._id.clock)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default function integrateRemoteStructs (decoder, encoder, y) {
|
||||
while (decoder.length !== decoder.pos) {
|
||||
let decoderPos = decoder.pos
|
||||
let reference = decoder.readVarUint()
|
||||
let Constr = getStruct(reference)
|
||||
let struct = new Constr()
|
||||
let missing = struct._fromBinary(decoder)
|
||||
if (missing.length === 0) {
|
||||
while (struct != null) {
|
||||
_integrateRemoteStructHelper(y, struct)
|
||||
struct = y._readyToIntegrate.shift()
|
||||
}
|
||||
} else {
|
||||
let _decoder = new BinaryDecoder(decoder.uint8arr)
|
||||
_decoder.pos = decoderPos
|
||||
let missingEntry = new MissingEntry(_decoder, missing, struct)
|
||||
let missingStructs = y._missingStructs
|
||||
for (let i = missing.length - 1; i >= 0; i--) {
|
||||
let m = missing[i]
|
||||
if (!missingStructs.has(m.user)) {
|
||||
missingStructs.set(m.user, new Map())
|
||||
}
|
||||
let msu = missingStructs.get(m.user)
|
||||
if (!msu.has(m.clock)) {
|
||||
msu.set(m.clock, [])
|
||||
}
|
||||
let mArray = msu = msu.get(m.clock)
|
||||
mArray.push(missingEntry)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
28
src/MessageHandler/messageToString.js
Normal file
28
src/MessageHandler/messageToString.js
Normal file
@@ -0,0 +1,28 @@
|
||||
import BinaryDecoder from '../Utily/Binary/Decoder'
|
||||
import { stringifyUpdate } from './update'
|
||||
import { stringifySyncStep1 } from './syncStep1'
|
||||
import { stringifySyncStep2 } from './syncStep2'
|
||||
|
||||
export function messageToString (buffer) {
|
||||
let decoder = new BinaryDecoder(buffer)
|
||||
decoder.readVarString() // read roomname
|
||||
let type = decoder.readVarString()
|
||||
let strBuilder = []
|
||||
strBuilder.push('\n === ' + type + ' ===\n')
|
||||
if (type === 'update') {
|
||||
stringifyUpdate(decoder, strBuilder)
|
||||
} else if (type === 'sync step 1') {
|
||||
stringifySyncStep1(decoder, strBuilder)
|
||||
} else if (type === 'sync step 2') {
|
||||
stringifySyncStep2(decoder, strBuilder)
|
||||
} else {
|
||||
strBuilder.push('-- Unknown message type - probably an encoding issue!!!')
|
||||
}
|
||||
return strBuilder.join('\n')
|
||||
}
|
||||
|
||||
export function messageToRoomname (buffer) {
|
||||
let decoder = new BinaryDecoder(buffer)
|
||||
decoder.readVarString() // roomname
|
||||
return decoder.readVarString() // messageType
|
||||
}
|
||||
24
src/MessageHandler/stateSet.js
Normal file
24
src/MessageHandler/stateSet.js
Normal file
@@ -0,0 +1,24 @@
|
||||
|
||||
export function readStateSet (decoder) {
|
||||
let ss = new Map()
|
||||
let ssLength = decoder.readUint32()
|
||||
for (let i = 0; i < ssLength; i++) {
|
||||
let user = decoder.readVarUint()
|
||||
let clock = decoder.readVarUint()
|
||||
ss.set(user, clock)
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
export function writeStateSet (encoder) {
|
||||
let lenPosition = encoder.pos
|
||||
let len = 0
|
||||
encoder.writeUint32(0)
|
||||
this.ss.iterate(this, null, null, function (n) {
|
||||
encoder.writeVarUint(n.id[0])
|
||||
encoder.writeVarUint(n.clock)
|
||||
len++
|
||||
})
|
||||
encoder.setUint32(lenPosition, len)
|
||||
return len === 0
|
||||
}
|
||||
53
src/MessageHandler/syncStep1.js
Normal file
53
src/MessageHandler/syncStep1.js
Normal file
@@ -0,0 +1,53 @@
|
||||
import BinaryEncoder from './Util/Binary/Encoder.js'
|
||||
|
||||
export function stringifySyncStep1 (decoder, strBuilder) {
|
||||
let auth = decoder.readVarString()
|
||||
let protocolVersion = decoder.readVarUint()
|
||||
strBuilder.push(`
|
||||
- auth: "${auth}"
|
||||
- protocolVersion: ${protocolVersion}
|
||||
`)
|
||||
// write SS
|
||||
strBuilder.push(' == SS: \n')
|
||||
let len = decoder.readUint32()
|
||||
for (let i = 0; i < len; i++) {
|
||||
let user = decoder.readVarUint()
|
||||
let clock = decoder.readVarUint()
|
||||
strBuilder.push(` ${user}: ${clock}\n`)
|
||||
}
|
||||
}
|
||||
|
||||
export function sendSyncStep1 (y, syncUser) {
|
||||
let encoder = new BinaryEncoder()
|
||||
encoder.writeVarString(y.room)
|
||||
encoder.writeVarString('sync step 1')
|
||||
encoder.writeVarString(y.connector.authInfo || '')
|
||||
encoder.writeVarUint(y.connector.protocolVersion)
|
||||
y.ss.writeStateSet(encoder)
|
||||
y.connector.send(syncUser, encoder.createBuffer())
|
||||
}
|
||||
|
||||
export function readSyncStep1 (decoder, encoder, y, senderConn, sender) {
|
||||
let protocolVersion = decoder.readVarUint()
|
||||
// check protocol version
|
||||
if (protocolVersion !== y.connector.protocolVersion) {
|
||||
console.warn(
|
||||
`You tried to sync with a yjs instance that has a different protocol version
|
||||
(You: ${protocolVersion}, Client: ${protocolVersion}).
|
||||
The sync was stopped. You need to upgrade your dependencies (especially Yjs & the Connector)!
|
||||
`)
|
||||
y.destroy()
|
||||
}
|
||||
|
||||
// send sync step 2
|
||||
encoder.writeVarString('sync step 2')
|
||||
encoder.writeVarString(y.connector.authInfo || '')
|
||||
writeDeleteSet(encoder)
|
||||
// reads ss and writes os
|
||||
writeOperations(encoder, decoder)
|
||||
y.connector.send(senderConn.uid, encoder.createBuffer())
|
||||
senderConn.receivedSyncStep2 = true
|
||||
if (y.connector.role === 'slave') {
|
||||
sendSyncStep1(y, sender)
|
||||
}
|
||||
}
|
||||
48
src/MessageHandler/syncStep2.js
Normal file
48
src/MessageHandler/syncStep2.js
Normal file
@@ -0,0 +1,48 @@
|
||||
import integrateRemoteStructs from './integrateRemoteStructs'
|
||||
import { stringifyUpdate } from './update.js'
|
||||
import ID from '../Util/ID'
|
||||
|
||||
export function stringifySyncStep2 (decoder, strBuilder) {
|
||||
strBuilder.push(' - auth: ' + decoder.readVarString() + '\n')
|
||||
strBuilder.push(' == OS: \n')
|
||||
stringifyUpdate(decoder, strBuilder)
|
||||
// write DS to string
|
||||
strBuilder.push(' == DS: \n')
|
||||
let len = decoder.readUint32()
|
||||
for (let i = 0; i < len; i++) {
|
||||
let user = decoder.readVarUint()
|
||||
strBuilder.push(` User: ${user}: `)
|
||||
let len2 = decoder.readVarUint()
|
||||
for (let j = 0; j < len2; j++) {
|
||||
let from = decoder.readVarUint()
|
||||
let to = decoder.readVarUint()
|
||||
let gc = decoder.readUint8() === 1
|
||||
strBuilder.push(`[${from}, ${to}, ${gc}]`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function writeSyncStep2 () {
|
||||
// TODO
|
||||
}
|
||||
|
||||
export default function writeStructs (encoder, decoder, y, ss) {
|
||||
let lenPos = encoder.pos
|
||||
let len = 0
|
||||
encoder.writeUint32(0)
|
||||
for (let [user, clock] of ss) {
|
||||
y.os.iterate(new ID(user, clock), null, function (struct) {
|
||||
struct._toBinary(y, encoder)
|
||||
len++
|
||||
})
|
||||
}
|
||||
encoder.setUint32(lenPos, len)
|
||||
}
|
||||
|
||||
export function readSyncStep2 (decoder, encoder, y, senderConn, sender) {
|
||||
// apply operations first
|
||||
applyDeleteSet(decoder)
|
||||
integrateRemoteStructs(decoder, encoder, y)
|
||||
// then apply ds
|
||||
y.connector._setSyncedWith(sender)
|
||||
}
|
||||
19
src/MessageHandler/update.js
Normal file
19
src/MessageHandler/update.js
Normal file
@@ -0,0 +1,19 @@
|
||||
|
||||
import { getStruct } from '../Util/StructReferences'
|
||||
|
||||
export function stringifyUpdate (decoder, strBuilder) {
|
||||
while (decoder.length !== decoder.pos) {
|
||||
let reference = decoder.readVarUint()
|
||||
let Constr = getStruct(reference)
|
||||
let struct = new Constr()
|
||||
let missing = struct._fromBinary(decoder)
|
||||
let logMessage = struct._logString()
|
||||
if (missing.length > 0) {
|
||||
logMessage += missing.map(m => m._logString()).join(', ')
|
||||
}
|
||||
logMessage += '\n'
|
||||
strBuilder.push(logMessage)
|
||||
}
|
||||
}
|
||||
|
||||
export { integrateRemoteStructs as readUpdate } from './integrateRemoteStructs'
|
||||
Reference in New Issue
Block a user