redesigned connector protocol - enabled binary compression

This commit is contained in:
Kevin Jahns 2017-07-22 18:06:00 +02:00
parent 6c37bd4463
commit a19cfa1465
10 changed files with 498 additions and 272 deletions

View File

@ -1,32 +1,11 @@
/* @flow */
'use strict'
function canRead (auth) { return auth === 'read' || auth === 'write' }
function canWrite (auth) { return auth === 'write' }
import { BinaryEncoder, BinaryDecoder } from './Encoding.js'
import { computeMessageSyncStep1, computeMessageSyncStep2, computeMessageUpdate } from './MessageHandler.js'
export default function extendConnector (Y/* :any */) {
class AbstractConnector {
/* ::
y: YConfig;
role: SyncRole;
connections: Object;
isSynced: boolean;
userEventListeners: Array<Function>;
whenSyncedListeners: Array<Function>;
currentSyncTarget: ?UserId;
debug: boolean;
syncStep2: Promise;
userId: UserId;
send: Function;
broadcast: Function;
broadcastOpBuffer: Array<Operation>;
protocolVersion: number;
*/
/*
opts contains the following information:
role : String Role of this client ("master" or "slave")
userId : String Uniquely defines the user.
debug: Boolean Whether to print debug messages (optional)
*/
constructor (y, opts) {
this.y = y
@ -63,15 +42,6 @@ export default function extendConnector (Y/* :any */) {
this.setUserId(Y.utils.generateUserId())
}
}
resetAuth (auth) {
if (this.authInfo !== auth) {
this.authInfo = auth
this.broadcast({
type: 'auth',
auth: this.authInfo
})
}
}
reconnect () {
this.log('reconnecting..')
return this.y.db.startGarbageCollector()
@ -137,8 +107,10 @@ export default function extendConnector (Y/* :any */) {
}
this.log('%s: User joined %s', this.userId, user)
this.connections.set(user, {
uid: user,
isSynced: false,
role: role
role: role,
processAfterAuth: []
})
let defer = {}
defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
@ -179,19 +151,14 @@ export default function extendConnector (Y/* :any */) {
if (syncUser != null) {
this.currentSyncTarget = syncUser
this.y.db.requestTransaction(function * () {
var stateSet = yield * this.getStateSet()
// var deleteSet = yield * this.getDeleteSet()
var answer = {
type: 'sync step 1',
stateSet: stateSet,
// deleteSet: deleteSet,
protocolVersion: conn.protocolVersion,
auth: conn.authInfo
}
if (conn.preferUntransformed && Object.keys(stateSet).length === 0) {
answer.preferUntransformed = true
}
conn.send(syncUser, answer)
let encoder = new BinaryEncoder()
encoder.writeVarString('sync step 1')
encoder.writeVarString(conn.authInfo || '')
encoder.writeVarUint(conn.protocolVersion)
let preferUntransformed = conn.preferUntransformed && this.os.length === 0 // TODO: length may not be defined
encoder.writeUint8(preferUntransformed ? 1 : 0)
yield * this.writeStateSet(encoder)
conn.send(syncUser, encoder.createBuffer())
})
} else {
if (!conn.isSynced) {
@ -211,13 +178,13 @@ export default function extendConnector (Y/* :any */) {
}
}
}
send (uid, message) {
this.log('%s: Send \'%s\' to %s', this.userId, message.type, uid)
this.logMessage('Message: %j', message)
send (uid, buffer) {
this.log('%s: Send \'%y\' to %s', this.userId, buffer, uid)
this.logMessage('Message: %Y', buffer)
}
broadcast (message) {
this.log('%s: Broadcast \'%s\'', this.userId, message.type)
this.logMessage('Message: %j', message)
broadcast (buffer) {
this.log('%s: Broadcast \'%y\'', this.userId, buffer)
this.logMessage('Message: %Y', buffer)
}
/*
Buffer operations, and broadcast them when ready.
@ -229,11 +196,17 @@ export default function extendConnector (Y/* :any */) {
var self = this
function broadcastOperations () {
if (self.broadcastOpBuffer.length > 0) {
self.broadcast({
type: 'update',
ops: self.broadcastOpBuffer
})
let encoder = new BinaryEncoder()
encoder.writeVarString('update')
let ops = self.broadcastOpBuffer
self.broadcastOpBuffer = []
let length = ops.length
encoder.writeUint32(length)
for (var i = 0; i < length; i++) {
let op = ops[i]
Y.Struct[op.struct].binaryEncode(encoder, op)
}
self.broadcast(encoder.createBuffer())
}
}
if (this.broadcastOpBuffer.length === 0) {
@ -246,119 +219,60 @@ export default function extendConnector (Y/* :any */) {
/*
You received a raw message, and you know that it is intended for Yjs. Then call this function.
*/
receiveMessage (sender/* :UserId */, message/* :Message */) {
async receiveMessage (sender, buffer) {
if (sender === this.userId) {
return Promise.resolve()
return
}
this.log('%s: Receive \'%s\' from %s', this.userId, message.type, sender)
this.logMessage('Message: %j', message)
if (message.protocolVersion != null && message.protocolVersion !== this.protocolVersion) {
console.warn(
`You tried to sync with a yjs instance that has a different protocol version
(You: ${this.protocolVersion}, Client: ${message.protocolVersion}).
The sync was stopped. You need to upgrade your dependencies (especially Yjs & the Connector)!
`)
this.send(sender, {
type: 'sync stop',
protocolVersion: this.protocolVersion
})
return Promise.reject(new Error('Incompatible protocol version'))
}
if (message.auth != null && this.connections.has(sender)) {
// authenticate using auth in message
var auth = this.checkAuth(message.auth, this.y)
this.connections.get(sender).auth = auth
auth.then(auth => {
for (var f of this.userEventListeners) {
f({
action: 'userAuthenticated',
user: sender,
auth: auth
})
}
})
} else if (this.connections.has(sender) && this.connections.get(sender).auth == null) {
// authenticate without otherwise
this.connections.get(sender).auth = this.checkAuth(null, this.y)
}
if (this.connections.has(sender) && this.connections.get(sender).auth != null) {
return this.connections.get(sender).auth.then(auth => {
if (message.type === 'sync step 1' && canRead(auth)) {
let conn = this
let m = message
let wait // wait for sync step 2 to complete
if (this.role === 'slave') {
wait = Promise.all(Array.from(this.connections.values())
.filter(conn => conn.role === 'master')
.map(conn => conn.syncStep2.promise)
)
} else {
wait = Promise.resolve()
}
wait.then(() => {
this.y.db.requestTransaction(function * () {
var currentStateSet = yield * this.getStateSet()
// TODO: remove
// if (canWrite(auth)) {
// yield * this.applyDeleteSet(m.deleteSet)
// }
let decoder = new BinaryDecoder(buffer)
let encoder = new BinaryEncoder()
let messageType = decoder.readVarString()
let senderConn = this.connections.get(sender)
var ds = yield * this.getDeleteSet()
var answer = {
type: 'sync step 2',
stateSet: currentStateSet,
deleteSet: ds,
protocolVersion: conn.protocolVersion,
auth: conn.authInfo
}
if (message.preferUntransformed === true && Object.keys(m.stateSet).length === 0) {
answer.osUntransformed = yield * this.getOperationsUntransformed()
} else {
answer.os = yield * this.getOperations(m.stateSet)
}
conn.send(sender, answer)
})
})
} else if (message.type === 'sync step 2' && canWrite(auth)) {
var db = this.y.db
let defer = this.connections.get(sender).syncStep2
let m = message
// apply operations first
db.requestTransaction(function * () {
// yield * this.applyDeleteSet(m.deleteSet)
if (m.osUntransformed != null) {
yield * this.applyOperationsUntransformed(m.osUntransformed, m.stateSet)
} else {
this.store.apply(m.os)
}
// defer.resolve()
})
// then apply ds
db.whenTransactionsFinished().then(() => {
db.requestTransaction(function * () {
yield * this.applyDeleteSet(m.deleteSet)
})
defer.resolve()
})
var self = this
this.connections.get(sender).syncStep2.promise.then(function () {
self._setSyncedWith(sender)
})
return defer.promise
} else if (message.type === 'update' && canWrite(auth)) {
if (this.y.db.forwardAppliedOperations) {
var delops = message.ops.filter(function (o) {
return o.struct === 'Delete'
})
if (delops.length > 0) {
this.broadcastOps(delops)
}
if (senderConn == null) {
throw new Error('Received message from unknown peer!')
}
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
let auth = decoder.readVarUint()
if (senderConn.auth == null) {
// check auth
let authPermissions = await this.checkAuth(auth, this.y, sender)
senderConn.auth = authPermissions
this.y.emit('userAuthenticated', {
user: senderConn.uid,
auth: authPermissions
})
senderConn.syncStep2.promise.then(() => {
if (senderConn.processAfterAuth == null) {
return
}
this.y.db.apply(message.ops)
}
})
for (let i = 0; i < senderConn.processAfterAuth.length; i++) {
let m = senderConn.processAfterAuth[i]
this.receiveMessage(m[0], m[1])
}
senderConn.processAfterAuth = null
})
}
}
if (senderConn.auth == null) {
senderConn.processAfterAuth.push([sender, buffer])
return
}
this.log('%s: Receive \'%s\' from %s', this.userId, messageType, sender)
this.logMessage('Message: %Y', buffer)
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
return this.y.db.whenTransactionsFinished()
} else if (messageType === 'sync step 2' && senderConn.auth === 'write') {
return computeMessageSyncStep2(decoder, encoder, this, senderConn, sender)
} else if (messageType === 'update' && senderConn.auth === 'write') {
return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
} else {
return Promise.reject(new Error('Unable to deliver message'))
console.error('Unable to receive message')
}
}
_setSyncedWith (user) {

View File

@ -306,10 +306,12 @@ export default function extendDatabase (Y /* :any */) {
* check if it is an expected op (otherwise wait for it)
* check if was deleted, apply a delete operation after op was applied
*/
apply (ops) {
applyOperations (decoder) {
this.opsReceivedTimestamp = new Date()
for (var i = 0; i < ops.length; i++) {
var o = ops[i]
let length = decoder.readUint32()
for (var i = 0; i < length; i++) {
let o = Y.Struct.binaryDecodeOperation(decoder)
if (o.id == null || o.id[0] !== this.y.connector.userId) {
var required = Y.Struct[o.struct].requiredOps(o)
if (o.requires != null) {
@ -590,7 +592,7 @@ export default function extendDatabase (Y /* :any */) {
op.type = typedefinition[0].name
this.requestTransaction(function * () {
if (op.id[0] === -1) {
if (op.id[0] === 0xFFFFFF) {
yield * this.setOperation(op)
} else {
yield * this.applyCreatedOperations([op])

View File

@ -1,102 +1,127 @@
import utf8 from 'utf-8'
const bits7 = 0b1111111
export class BinaryLength {
constructor () {
this.length = 0
}
writeUint8 (num) {
this.length++
}
writeVarUint (num) {
while (num >= 0b10000000) {
this.length++
num >>= 7
}
this.length++
}
writeVarString (str) {
let len = utf8.setBytesFromString(str).length
this.writeVarUint(len)
this.length += len
}
writeOpID (id) {
this.writeVarUint(id[0])
this.writeVarUint(id[1])
}
}
const bits8 = 0b11111111
export class BinaryEncoder {
constructor (binaryLength) {
this.dataview = new DataView(new ArrayBuffer(binaryLength.length))
this.pos = 0
constructor () {
this.data = []
}
get pos () {
return this.data.length
}
createBuffer () {
return Uint8Array.from(this.data).buffer
}
writeUint8 (num) {
this.dataview.setUint8(this.pos++, num)
this.data.push(num & bits8)
}
setUint8 (pos, num) {
this.data[pos] = num & bits8
}
writeUint16 (num) {
this.data.push(num & bits8, (num >> 8) & bits8)
}
setUint16 (pos, num) {
this.data[pos] = num & bits8
this.data[pos + 1] = (num >> 8) & bits8
}
writeUint32 (num) {
for (let i = 0; i < 4; i++) {
this.data.push(num & bits8)
num >>= 8
}
}
setUint32 (pos, num) {
for (let i = 0; i < 4; i++) {
this.data[pos + i] = num & bits8
num >>= 8
}
}
writeVarUint (num) {
while (num >= 0b10000000) {
this.dataview.setUint8(this.pos++, 0b10000000 | (bits7 & num))
this.data.push(0b10000000 | (bits7 & num))
num >>= 7
}
this.dataview.setUint8(this.pos++, bits7 & num)
this.data.push(bits7 & num)
}
writeVarString (str) {
let bytes = utf8.setBytesFromString(str)
let len = bytes.length
this.writeVarUint(len)
for (let i = 0; i < len; i++) {
this.dataview.setUint8(this.pos++, bytes[i])
this.data.push(bytes[i])
}
}
writeOpID (id) {
this.writeVarUint(id[0])
this.writeVarUint(id[1])
let user = id[0]
this.writeVarUint(user)
if (user !== 0xFFFFFF) {
this.writeVarUint(id[1])
} else {
this.writeVarString(id[1])
}
}
}
export class BinaryDecoder {
constructor (dataview) {
this.dataview = dataview
constructor (buffer) {
this.uint8arr = new Uint8Array(buffer)
this.pos = 0
}
skip8 () {
this.pos++
}
skip16 () {
this.pos += 2
}
skip32 () {
this.pos += 4
}
skipVar () {
while (this.dataview.getUint8(this.pos++) >= 1 << 7) { }
}
readUint8 () {
return this.dataview.getUint8(this.pos++)
return this.uint8arr[this.pos++]
}
readUint32 () {
let uint =
this.uint8arr[this.pos] +
(this.uint8arr[this.pos + 1] << 8) +
(this.uint8arr[this.pos + 2] << 16) +
(this.uint8arr[this.pos + 3] << 24)
this.pos += 4
return uint
}
peekUint8 () {
return this.uint8arr[this.pos]
}
readVarUint () {
let num = 0
let len = 0
while (true) {
let r = this.dataview.getUint8(this.pos++)
let r = this.uint8arr[this.pos++]
num = num | ((r & bits7) << len)
len += 7
if (r < 1 << 7) {
return num
}
if (len > 35) {
throw new Error('Integer out of range!')
}
}
}
readVarString () {
let len = this.readVarUint()
let bytes = new Array(len)
for (let i = 0; i < len; i++) {
bytes[i] = this.dataview.getUint8(this.pos++)
bytes[i] = this.uint8arr[this.pos++]
}
return utf8.getStringFromBytes(bytes)
}
peekVarString () {
let pos = this.pos
let s = this.readVarString()
this.pos = pos
return s
}
readOpID () {
return [this.readVarUint(), this.readVarUint()]
let user = this.readVarUint()
if (user !== 0xFFFFFF) {
return [user, this.readVarUint()]
} else {
return [user, this.readVarString()]
}
}
}

172
src/MessageHandler.js Normal file
View File

@ -0,0 +1,172 @@
import Y from './y.js'
import { BinaryDecoder } from './Encoding.js'
export function formatYjsMessage (buffer) {
let decoder = new BinaryDecoder(buffer)
let type = decoder.readVarString()
let strBuilder = []
strBuilder.push('\n === ' + type + ' ===\n')
if (type === 'update') {
logMessageUpdate(decoder, strBuilder)
} else if (type === 'sync step 1') {
logMessageSyncStep1(decoder, strBuilder)
} else if (type === 'sync step 2') {
logMessageSyncStep2(decoder, strBuilder)
} else {
strBuilder.push('-- Unknown message type - probably an encoding issue!!!')
}
return strBuilder.join('')
}
export function formatYjsMessageType (buffer) {
let decoder = new BinaryDecoder(buffer)
return decoder.readVarString()
}
export async function logMessageUpdate (decoder, strBuilder) {
let len = decoder.readUint32()
for (let i = 0; i < len; i++) {
strBuilder.push(JSON.stringify(Y.Struct.binaryDecodeOperation(decoder)) + '\n')
}
}
export async function computeMessageUpdate (decoder, encoder, conn) {
if (conn.y.db.forwardAppliedOperations) {
let messagePosition = decoder.pos
let len = decoder.readUint32()
let delops = []
for (let i = 0; i < len; i++) {
let op = Y.Struct.binaryDecodeOperation(decoder)
if (op.struct === 'Delete') {
delops.push(op)
}
}
if (delops.length > 0) {
conn.broadcastOps(delops)
}
decoder.pos = messagePosition
}
conn.y.db.applyOperations(decoder)
}
export function logMessageSyncStep1 (decoder, strBuilder) {
let auth = decoder.readVarString()
let protocolVersion = decoder.readVarUint()
let preferUntransformed = decoder.readUint8() === 1
strBuilder.push(`
- auth: "${auth}"
- protocolVersion: ${protocolVersion}
- preferUntransformed: ${preferUntransformed}
`)
logSS(decoder, strBuilder)
}
export async function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
let protocolVersion = decoder.readVarUint()
let preferUntransformed = decoder.readUint8() === 1
// check protocol version
if (protocolVersion !== conn.protocolVersion) {
console.warn(
`You tried to sync with a yjs instance that has a different protocol version
(You: ${protocolVersion}, Client: ${protocolVersion}).
The sync was stopped. You need to upgrade your dependencies (especially Yjs & the Connector)!
`)
conn.y.destroy()
}
if (conn.role === 'slave') {
// wait for sync step 2 to complete
await Promise.all(Array.from(conn.connections.values())
.filter(conn => conn.role === 'master')
.map(conn => conn.syncStep2.promise)
)
}
conn.y.db.requestTransaction(function * () {
encoder.writeVarString('sync step 2')
encoder.writeVarString(conn.authInfo || '')
let emptyStateSet = this.ds.length === 0 // TODO: length may not always be available
if (preferUntransformed && emptyStateSet) {
encoder.writeUint8(1)
yield * this.writeOperationsUntransformed(encoder)
} else {
encoder.writeUint8(0)
yield * this.writeOperations(encoder, decoder)
}
yield * this.writeDeleteSet(encoder)
conn.send(senderConn.uid, encoder.createBuffer())
})
await conn.y.db.whenTransactionsFinished()
}
export function logSS (decoder, strBuilder) {
strBuilder.push(' == SS: \n')
let len = decoder.readUint32()
for (let i = 0; i < len; i++) {
let user = decoder.readVarUint()
let clock = decoder.readVarUint()
strBuilder.push(` - ${user}: ${clock}`)
}
}
export function logOS (decoder, strBuilder) {
strBuilder.push(' == OS: \n')
let len = decoder.readUint32()
for (let i = 0; i < len; i++) {
let op = Y.Struct.binaryDecodeOperation(decoder)
strBuilder.push(JSON.stringify(op) + '\n')
}
}
export function logDS (decoder, strBuilder) {
strBuilder.push(' == DS: \n')
let len = decoder.readUint32()
for (let i = 0; i < len; i++) {
let user = decoder.readVarUint()
strBuilder.push(` User: ${user}: `)
let len2 = decoder.readVarUint()
for (let j = 0; j < len2; j++) {
let from = decoder.readVarUint()
let to = decoder.readVarUint()
let gc = decoder.readUint8() === 1
strBuilder.push(`[${from}, ${to}, ${gc}]`)
}
}
}
export function logMessageSyncStep2 (decoder, strBuilder) {
strBuilder.push(' - auth: ' + decoder.readVarString() + '\n')
let osTransformed = decoder.readUint8() === 1
strBuilder.push(' - osUntransformed: ' + osTransformed + '\n')
logOS(decoder, strBuilder)
if (osTransformed) {
logSS(decoder, strBuilder)
}
logDS(decoder, strBuilder)
}
export async function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
var db = conn.y.db
let defer = senderConn.syncStep2
// apply operations first
db.requestTransaction(function * () {
let osUntransformed = decoder.readUint8()
if (osUntransformed === 1) {
yield * this.applyOperationsUntransformed(decoder)
} else {
this.store.applyOperations(decoder)
}
})
// then apply ds
await db.whenTransactionsFinished()
db.requestTransaction(function * () {
yield * this.applyDeleteSet(decoder)
})
await db.whenTransactionsFinished()
conn._setSyncedWith(sender)
defer.resolve()
}

View File

@ -23,6 +23,20 @@ const CMAP = 3
*/
export default function extendStruct (Y) {
var Struct = {
binaryDecodeOperation: function (decoder) {
let code = decoder.peekUint8()
if (code === CDELETE) {
return Y.Struct.Delete.binaryDecode(decoder)
} else if (code === CINSERT) {
return Y.Struct.Insert.binaryDecode(decoder)
} else if (code === CLIST) {
return Y.Struct.List.binaryDecode(decoder)
} else if (code === CMAP) {
return Y.Struct.Map.binaryDecode(decoder)
} else {
throw new Error('Unable to decode operation!')
}
},
/* This is the only operation that is actually not a structure, because
it is not stored in the OS. This is why it _does not_ have an id

View File

@ -1,5 +1,3 @@
/* @flow */
'use strict'
/*
Partial definition of a transaction
@ -96,7 +94,7 @@ export default function extendTransaction (Y) {
send.push(Y.Struct[op.struct].encode(op))
}
}
if (this.store.y.connector.isSynced && send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops)
if (send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops)
// is connected, and this is not going to be send in addOperation
this.store.y.connector.broadcastOps(send)
}
@ -588,12 +586,20 @@ export default function extendTransaction (Y) {
apply a delete set in order to get
the state of the supplied ds
*/
* applyDeleteSet (ds) {
* applyDeleteSet (decoder) {
var deletions = []
for (var user in ds) {
var dv = ds[user]
user = Number.parseInt(user, 10)
let dsLength = decoder.readUint32()
for (let i = 0; i < dsLength; i++) {
let user = decoder.readVarUint()
let dv = []
let dvLength = decoder.readVarUint()
for (let j = 0; j < dvLength; j++) {
let from = decoder.readVarUint()
let len = decoder.readVarUint()
let gc = decoder.readUint8() === 1
dv.push([from, len, gc])
}
var pos = 0
var d = dv[pos]
yield * this.ds.iterate(this, [user, 0], [user, Number.MAX_VALUE], function * (n) {
@ -687,21 +693,34 @@ export default function extendTransaction (Y) {
/*
A DeleteSet (ds) describes all the deleted ops in the OS
*/
* getDeleteSet () {
var ds = {}
* writeDeleteSet (encoder) {
var ds = new Map()
yield * this.ds.iterate(this, null, null, function * (n) {
var user = n.id[0]
var counter = n.id[1]
var len = n.len
var gc = n.gc
var dv = ds[user]
var dv = ds.get(user)
if (dv === void 0) {
dv = []
ds[user] = dv
ds.set(user, dv)
}
dv.push([counter, len, gc])
})
return ds
let keys = Array.from(ds.keys())
encoder.writeUint32(keys.length)
for (var i = 0; i < keys.length; i++) {
let user = keys[i]
let deletions = ds.get(user)
encoder.writeVarUint(user)
encoder.writeVarUint(deletions.length)
for (var j = 0; j < deletions.length; j++) {
let del = deletions[j]
encoder.writeVarUint(del[0])
encoder.writeVarUint(del[1])
encoder.writeUint8(del[2] ? 1 : 0)
}
}
}
* isDeleted (id) {
var n = yield * this.ds.findWithUpperBound(id)
@ -713,7 +732,8 @@ export default function extendTransaction (Y) {
}
* addOperation (op) {
yield * this.os.put(op)
if (this.store.y.connector.isSynced && this.store.forwardAppliedOperations && typeof op.id[1] !== 'string') {
// case op is created by this user, op is already broadcasted in applyCreatedOperations
if (op.id[0] !== this.store.userId && this.store.forwardAppliedOperations && typeof op.id[1] !== 'string') {
// is connected, and this is not going to be send in addOperation
this.store.y.connector.broadcastOps([op])
}
@ -822,11 +842,11 @@ export default function extendTransaction (Y) {
}
* getOperation (id/* :any */)/* :Transaction<any> */ {
var o = yield * this.os.find(id)
if (id[0] !== -1 || o != null) {
if (id[0] !== 0xFFFFFF || o != null) {
return o
} else { // type is string
// generate this operation?
var comp = id[1].split(-1)
var comp = id[1].split('_')
if (comp.length > 1) {
var struct = comp[0]
var op = Y.Struct[struct].create(id)
@ -879,6 +899,18 @@ export default function extendTransaction (Y) {
})
return ss
}
* writeStateSet (encoder) {
let lenPosition = encoder.pos
let len = 0
encoder.writeUint32(0)
yield * this.ss.iterate(this, null, null, function * (n) {
encoder.writeVarUint(n.id[0])
encoder.writeVarUint(n.clock)
len++
})
encoder.setUint32(lenPosition, len)
return len === 0
}
/*
Here, we make all missing operations executable for the receiving user.
@ -928,17 +960,17 @@ export default function extendTransaction (Y) {
* getOperations (startSS) {
// TODO: use bounds here!
if (startSS == null) {
startSS = {}
startSS = new Map()
}
var send = []
var endSV = yield * this.getStateVector()
for (let endState of endSV) {
let user = endState.user
if (user === -1) {
if (user === 0xFFFFFF) {
continue
}
let startPos = startSS[user] || 0
let startPos = startSS.get(user) || 0
if (startPos > 0) {
// There is a change that [user, startPos] is in a composed Insertion (with a smaller counter)
// find out if that is the case
@ -948,19 +980,19 @@ export default function extendTransaction (Y) {
startPos = firstMissing.id[1]
}
}
startSS[user] = startPos
startSS.set(user, startPos)
}
for (let endState of endSV) {
let user = endState.user
let startPos = startSS[user]
if (user === -1) {
let startPos = startSS.get(user)
if (user === 0xFFFFFF) {
continue
}
yield * this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) {
op = Y.Struct[op.struct].encode(op)
if (op.struct !== 'Insert') {
send.push(op)
} else if (op.right == null || op.right[1] < (startSS[op.right[0]] || 0)) {
} else if (op.right == null || op.right[1] < (startSS.get(op.right[0]) || 0)) {
// case 1. op.right is known
// this case is only reached if op.right is known.
// => this is not called for op.left, as op.right is unknown
@ -978,7 +1010,7 @@ export default function extendTransaction (Y) {
op.left = null
send.push(op)
/* not necessary, as o is already sent..
if (!Y.utils.compareIds(o.id, op.id) && o.id[1] >= (startSS[o.id[0]] || 0)) {
if (!Y.utils.compareIds(o.id, op.id) && o.id[1] >= (startSS.get(o.id[0]) || 0)) {
// o is not op && o is unknown
o = Y.Struct[op.struct].encode(o)
o.right = missingOrigins[missingOrigins.length - 1].id
@ -992,7 +1024,7 @@ export default function extendTransaction (Y) {
while (missingOrigins.length > 0 && Y.utils.matchesId(o, missingOrigins[missingOrigins.length - 1].origin)) {
missingOrigins.pop()
}
if (o.id[1] < (startSS[o.id[0]] || 0)) {
if (o.id[1] < (startSS.get(o.id[0]) || 0)) {
// case 2. o is known
op.left = Y.utils.getLastId(o)
send.push(op)
@ -1024,28 +1056,48 @@ export default function extendTransaction (Y) {
}
return send.reverse()
}
* writeOperations (encoder, decoder) {
let ss = new Map()
let ssLength = decoder.readUint32()
for (let i = 0; i < ssLength; i++) {
let user = decoder.readUint32()
let clock = decoder.readUint32()
ss.set(user, clock)
}
let ops = yield * this.getOperations(ss)
encoder.writeUint32(ops.length)
for (let i = 0; i < ops.length; i++) {
let op = ops[i]
Y.Struct[op.struct].binaryEncode(encoder, Y.Struct[op.struct].encode(op))
}
}
/*
* Get the plain untransformed operations from the database.
* You can apply these operations using .applyOperationsUntransformed(ops)
*
*/
* getOperationsUntransformed () {
var ops = []
* writeOperationsUntransformed (encoder) {
let lenPosition = encoder.pos
let len = 0
encoder.writeUint32(0) // placeholder
yield * this.os.iterate(this, null, null, function * (op) {
if (op.id[0] !== -1) {
ops.push(op)
if (op.id[0] !== 0xFFFFFF) {
len++
Y.Struct[op.struct].binaryEncode(encoder, Y.Struct[op.struct].encode(op))
}
})
return {
untransformed: ops
}
encoder.setUint32(lenPosition, len)
yield * this.writeStateSet(encoder)
}
* applyOperationsUntransformed (m, stateSet) {
var ops = m.untransformed
for (var i = 0; i < ops.length; i++) {
var op = ops[i]
// create, and modify parent, if it is created implicitly
if (op.parent != null && op.parent[0] === -1) {
* applyOperationsUntransformed (decoder) {
let len = decoder.readUint32()
for (let i = 0; i < len; i++) {
let op = decoder.readOperation()
yield * this.os.put(op)
}
yield * this.os.iterate(this, null, null, function * (op) {
if (op.parent != null && op.parent[0] === 0xFFFFFF) {
if (op.struct === 'Insert') {
// update parents .map/start/end properties
if (op.parentSub != null && op.left == null) {
@ -1065,12 +1117,14 @@ export default function extendTransaction (Y) {
}
}
}
yield * this.os.put(op)
}
for (var user in stateSet) {
})
let stateSetLength = decoder.readUint32()
for (let i = 0; i < stateSetLength; i++) {
let user = decoder.readVarUint()
let clock = decoder.readVarUint()
yield * this.ss.put({
id: [user],
clock: stateSet[user]
clock: clock
})
}
}

View File

@ -1,9 +1,10 @@
import debug from 'debug'
import extendConnector from './Connector.js'
import extendDatabase from './Database.js'
import extendTransaction from './Transaction.js'
import extendStruct from './Struct.js'
import extendUtils from './Utils.js'
import debug from 'debug'
import { formatYjsMessage, formatYjsMessageType } from './MessageHandler.js'
extendConnector(Y)
extendDatabase(Y)
@ -12,6 +13,8 @@ extendStruct(Y)
extendUtils(Y)
Y.debug = debug
debug.formatters.Y = formatYjsMessage
debug.formatters.y = formatYjsMessageType
var requiringModules = {}
@ -169,7 +172,7 @@ class YConfig extends Y.utils.NamedEventHandler {
var typeName = typeConstructor.splice(0, 1)
var type = Y[typeName]
var typedef = type.typeDefinition
var id = [-1, typedef.struct + -1 + typeName + -1 + propertyname + -1 + typeConstructor]
var id = [0xFFFFFF, typedef.struct + '_' + typeName + '_' + propertyname + '_' + typeConstructor]
var args = []
if (typeConstructor.length === 1) {
try {

View File

@ -1,16 +1,15 @@
import { test } from 'cutest'
import Chance from 'chance'
import Y from '../src/y.js'
import { BinaryLength, BinaryEncoder, BinaryDecoder } from '../src/Encoding.js'
import { BinaryEncoder, BinaryDecoder } from '../src/Encoding.js'
import { applyRandomTests } from '../../y-array/test/testGeneration.js'
function testEncoding (t, write, read, val) {
let binLength = new BinaryLength()
write(binLength, val)
let encoder = new BinaryEncoder(binLength)
let encoder = new BinaryEncoder()
write(encoder, val)
let reader = new BinaryDecoder(encoder.dataview)
let reader = new BinaryDecoder(encoder.createBuffer())
let result = read(reader)
t.log(`string encode: ${JSON.stringify(val).length} bytes / binary encode: ${encoder.dataview.buffer.byteLength} bytes`)
t.log(`string encode: ${JSON.stringify(val).length} bytes / binary encode: ${encoder.data.length} bytes`)
t.compare(val, result, 'Compare results')
}
@ -219,3 +218,4 @@ test('encode/decode Map operations', async function binMap (t) {
info: 400
})
})

View File

@ -12,6 +12,33 @@ export let Y = _Y
Y.extend(yMemory, yArray, yMap, yTest)
function * getStateSet () {
var ss = {}
yield * this.ss.iterate(this, null, null, function * (n) {
var user = n.id[0]
var clock = n.clock
ss[user] = clock
})
return ss
}
function * getDeleteSet () {
var ds = {}
yield * this.ds.iterate(this, null, null, function * (n) {
var user = n.id[0]
var counter = n.id[1]
var len = n.len
var gc = n.gc
var dv = ds[user]
if (dv === void 0) {
dv = []
ds[user] = dv
}
dv.push([counter, len, gc])
})
return ds
}
export async function garbageCollectUsers (t, users) {
await flushAll(t, users)
await Promise.all(users.map(u => u.db.emptyGarbageCollector()))
@ -60,10 +87,14 @@ export async function compareUsers (t, users) {
var data = await Promise.all(users.map(async (u) => {
var data = {}
u.db.requestTransaction(function * () {
var os = yield * this.getOperationsUntransformed()
let ops = []
yield * this.os.iterate(this, null, null, function * (op) {
ops.push(Y.Struct[op.struct].encode(op))
})
data.os = {}
for (let i = 0; i < os.untransformed.length; i++) {
let op = os.untransformed[i]
for (let i = 0; i < ops.length; i++) {
let op = ops[i]
op = Y.Struct[op.struct].encode(op)
delete op.origin
/*
@ -79,8 +110,8 @@ export async function compareUsers (t, users) {
data.os[JSON.stringify(op.id)] = op
}
}
data.ds = yield * this.getDeleteSet()
data.ss = yield * this.getStateSet()
data.ds = yield * getDeleteSet.apply(this)
data.ss = yield * getStateSet.apply(this)
})
await u.db.whenTransactionsFinished()
return data

View File

@ -1,5 +1,6 @@
/* global Y */
import { wait } from './helper.js'
import { formatYjsMessage } from '../src/MessageHandler.js'
var rooms = {}
@ -28,7 +29,6 @@ export class TestRoom {
})
}
send (sender, receiver, m) {
m = JSON.parse(JSON.stringify(m))
var user = this.users.get(receiver)
if (user != null) {
user.receiveMessage(sender, m)
@ -81,14 +81,25 @@ export default function extendTestConnector (Y) {
this.testRoom.leave(this)
return super.disconnect()
}
logBufferParsed () {
console.log(' === Logging buffer of user ' + this.userId + ' === ')
for (let [user, conn] of this.connections) {
console.log(` ${user}:`)
for (let i = 0; i < conn.buffer.length; i++) {
console.log(formatYjsMessage(conn.buffer[i]))
}
}
}
reconnect () {
this.testRoom.join(this)
return super.reconnect()
}
send (uid, message) {
super.send(uid, message)
this.testRoom.send(this.userId, uid, message)
}
broadcast (message) {
super.broadcast(message)
this.testRoom.broadcast(this.userId, message)
}
async whenSynced (f) {