From a19cfa1465c24b849d99dda74ef14007e30e2899 Mon Sep 17 00:00:00 2001
From: Kevin Jahns <kevin.jahns@rwth-aachen.de>
Date: Sat, 22 Jul 2017 18:06:00 +0200
Subject: [PATCH] redesigned connector protocol - enabled binary compression

---
 src/Connector.js            | 242 ++++++++++++------------------------
 src/Database.js             |  10 +-
 src/Encoding.js             | 123 ++++++++++--------
 src/MessageHandler.js       | 172 +++++++++++++++++++++++++
 src/Struct.js               |  14 +++
 src/Transaction.js          | 136 ++++++++++++++------
 src/y.js                    |   7 +-
 test/encode-decode.js       |  12 +-
 tests-lib/helper.js         |  41 +++++-
 tests-lib/test-connector.js |  13 +-
 10 files changed, 498 insertions(+), 272 deletions(-)
 create mode 100644 src/MessageHandler.js

diff --git a/src/Connector.js b/src/Connector.js
index 494e9552..8c0c05c8 100644
--- a/src/Connector.js
+++ b/src/Connector.js
@@ -1,32 +1,11 @@
-/* @flow */
-'use strict'
-
-function canRead (auth) { return auth === 'read' || auth === 'write' }
-function canWrite (auth) { return auth === 'write' }
+import { BinaryEncoder, BinaryDecoder } from './Encoding.js'
+import { computeMessageSyncStep1, computeMessageSyncStep2, computeMessageUpdate } from './MessageHandler.js'
 
 export default function extendConnector (Y/* :any */) {
   class AbstractConnector {
-    /* ::
-    y: YConfig;
-    role: SyncRole;
-    connections: Object;
-    isSynced: boolean;
-    userEventListeners: Array<Function>;
-    whenSyncedListeners: Array<Function>;
-    currentSyncTarget: ?UserId;
-    debug: boolean;
-    syncStep2: Promise;
-    userId: UserId;
-    send: Function;
-    broadcast: Function;
-    broadcastOpBuffer: Array<Operation>;
-    protocolVersion: number;
-    */
     /*
       opts contains the following information:
        role : String Role of this client ("master" or "slave")
-       userId : String Uniquely defines the user.
-       debug: Boolean Whether to print debug messages (optional)
     */
     constructor (y, opts) {
       this.y = y
@@ -63,15 +42,6 @@ export default function extendConnector (Y/* :any */) {
         this.setUserId(Y.utils.generateUserId())
       }
     }
-    resetAuth (auth) {
-      if (this.authInfo !== auth) {
-        this.authInfo = auth
-        this.broadcast({
-          type: 'auth',
-          auth: this.authInfo
-        })
-      }
-    }
     reconnect () {
       this.log('reconnecting..')
       return this.y.db.startGarbageCollector()
@@ -137,8 +107,10 @@ export default function extendConnector (Y/* :any */) {
       }
       this.log('%s: User joined %s', this.userId, user)
       this.connections.set(user, {
+        uid: user,
         isSynced: false,
-        role: role
+        role: role,
+        processAfterAuth: []
       })
       let defer = {}
       defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
@@ -179,19 +151,14 @@ export default function extendConnector (Y/* :any */) {
       if (syncUser != null) {
         this.currentSyncTarget = syncUser
         this.y.db.requestTransaction(function * () {
-          var stateSet = yield * this.getStateSet()
-          // var deleteSet = yield * this.getDeleteSet()
-          var answer = {
-            type: 'sync step 1',
-            stateSet: stateSet,
-            // deleteSet: deleteSet,
-            protocolVersion: conn.protocolVersion,
-            auth: conn.authInfo
-          }
-          if (conn.preferUntransformed && Object.keys(stateSet).length === 0) {
-            answer.preferUntransformed = true
-          }
-          conn.send(syncUser, answer)
+          let encoder = new BinaryEncoder()
+          encoder.writeVarString('sync step 1')
+          encoder.writeVarString(conn.authInfo || '')
+          encoder.writeVarUint(conn.protocolVersion)
+          let preferUntransformed = conn.preferUntransformed && this.os.length === 0 // TODO: length may not be defined
+          encoder.writeUint8(preferUntransformed ? 1 : 0)
+          yield * this.writeStateSet(encoder)
+          conn.send(syncUser, encoder.createBuffer())
         })
       } else {
         if (!conn.isSynced) {
@@ -211,13 +178,13 @@ export default function extendConnector (Y/* :any */) {
         }
       }
     }
-    send (uid, message) {
-      this.log('%s: Send \'%s\' to %s', this.userId, message.type, uid)
-      this.logMessage('Message: %j', message)
+    send (uid, buffer) {
+      this.log('%s: Send \'%y\' to %s', this.userId, buffer, uid)
+      this.logMessage('Message: %Y', buffer)
     }
-    broadcast (message) {
-      this.log('%s: Broadcast \'%s\'', this.userId, message.type)
-      this.logMessage('Message: %j', message)
+    broadcast (buffer) {
+      this.log('%s: Broadcast \'%y\'', this.userId, buffer)
+      this.logMessage('Message: %Y', buffer)
     }
     /*
       Buffer operations, and broadcast them when ready.
@@ -229,11 +196,17 @@ export default function extendConnector (Y/* :any */) {
       var self = this
       function broadcastOperations () {
         if (self.broadcastOpBuffer.length > 0) {
-          self.broadcast({
-            type: 'update',
-            ops: self.broadcastOpBuffer
-          })
+          let encoder = new BinaryEncoder()
+          encoder.writeVarString('update')
+          let ops = self.broadcastOpBuffer
           self.broadcastOpBuffer = []
+          let length = ops.length
+          encoder.writeUint32(length)
+          for (var i = 0; i < length; i++) {
+            let op = ops[i]
+            Y.Struct[op.struct].binaryEncode(encoder, op)
+          }
+          self.broadcast(encoder.createBuffer())
         }
       }
       if (this.broadcastOpBuffer.length === 0) {
@@ -246,119 +219,60 @@ export default function extendConnector (Y/* :any */) {
     /*
       You received a raw message, and you know that it is intended for Yjs. Then call this function.
     */
-    receiveMessage (sender/* :UserId */, message/* :Message */) {
+    async receiveMessage (sender, buffer) {
       if (sender === this.userId) {
-        return Promise.resolve()
+        return
       }
-      this.log('%s: Receive \'%s\' from %s', this.userId, message.type, sender)
-      this.logMessage('Message: %j', message)
-      if (message.protocolVersion != null && message.protocolVersion !== this.protocolVersion) {
-        console.warn(
-          `You tried to sync with a yjs instance that has a different protocol version
-          (You: ${this.protocolVersion}, Client: ${message.protocolVersion}).
-          The sync was stopped. You need to upgrade your dependencies (especially Yjs & the Connector)!
-          `)
-        this.send(sender, {
-          type: 'sync stop',
-          protocolVersion: this.protocolVersion
-        })
-        return Promise.reject(new Error('Incompatible protocol version'))
-      }
-      if (message.auth != null && this.connections.has(sender)) {
-        // authenticate using auth in message
-        var auth = this.checkAuth(message.auth, this.y)
-        this.connections.get(sender).auth = auth
-        auth.then(auth => {
-          for (var f of this.userEventListeners) {
-            f({
-              action: 'userAuthenticated',
-              user: sender,
-              auth: auth
-            })
-          }
-        })
-      } else if (this.connections.has(sender) && this.connections.get(sender).auth == null) {
-        // authenticate without otherwise
-        this.connections.get(sender).auth = this.checkAuth(null, this.y)
-      }
-      if (this.connections.has(sender) && this.connections.get(sender).auth != null) {
-        return this.connections.get(sender).auth.then(auth => {
-          if (message.type === 'sync step 1' && canRead(auth)) {
-            let conn = this
-            let m = message
-            let wait // wait for sync step 2 to complete
-            if (this.role === 'slave') {
-              wait = Promise.all(Array.from(this.connections.values())
-                .filter(conn => conn.role === 'master')
-                .map(conn => conn.syncStep2.promise)
-              )
-            } else {
-              wait = Promise.resolve()
-            }
-            wait.then(() => {
-              this.y.db.requestTransaction(function * () {
-                var currentStateSet = yield * this.getStateSet()
-                // TODO: remove
-                // if (canWrite(auth)) {
-                //  yield * this.applyDeleteSet(m.deleteSet)
-                // }
+      let decoder = new BinaryDecoder(buffer)
+      let encoder = new BinaryEncoder()
+      let messageType = decoder.readVarString()
+      let senderConn = this.connections.get(sender)
 
-                var ds = yield * this.getDeleteSet()
-                var answer = {
-                  type: 'sync step 2',
-                  stateSet: currentStateSet,
-                  deleteSet: ds,
-                  protocolVersion: conn.protocolVersion,
-                  auth: conn.authInfo
-                }
-                if (message.preferUntransformed === true && Object.keys(m.stateSet).length === 0) {
-                  answer.osUntransformed = yield * this.getOperationsUntransformed()
-                } else {
-                  answer.os = yield * this.getOperations(m.stateSet)
-                }
-                conn.send(sender, answer)
-              })
-            })
-          } else if (message.type === 'sync step 2' && canWrite(auth)) {
-            var db = this.y.db
-            let defer = this.connections.get(sender).syncStep2
-            let m = message
-            // apply operations first
-            db.requestTransaction(function * () {
-              // yield * this.applyDeleteSet(m.deleteSet)
-              if (m.osUntransformed != null) {
-                yield * this.applyOperationsUntransformed(m.osUntransformed, m.stateSet)
-              } else {
-                this.store.apply(m.os)
-              }
-              // defer.resolve()
-            })
-            // then apply ds
-            db.whenTransactionsFinished().then(() => {
-              db.requestTransaction(function * () {
-                yield * this.applyDeleteSet(m.deleteSet)
-              })
-              defer.resolve()
-            })
-            var self = this
-            this.connections.get(sender).syncStep2.promise.then(function () {
-              self._setSyncedWith(sender)
-            })
-            return defer.promise
-          } else if (message.type === 'update' && canWrite(auth)) {
-            if (this.y.db.forwardAppliedOperations) {
-              var delops = message.ops.filter(function (o) {
-                return o.struct === 'Delete'
-              })
-              if (delops.length > 0) {
-                this.broadcastOps(delops)
-              }
+      if (senderConn == null) {
+        throw new Error('Received message from unknown peer!')
+      }
+
+      if (messageType === 'sync step 1' || messageType === 'sync step 2') {
+        let auth = decoder.readVarUint()
+        if (senderConn.auth == null) {
+          // check auth
+          let authPermissions = await this.checkAuth(auth, this.y, sender)
+          senderConn.auth = authPermissions
+          this.y.emit('userAuthenticated', {
+            user: senderConn.uid,
+            auth: authPermissions
+          })
+          senderConn.syncStep2.promise.then(() => {
+            if (senderConn.processAfterAuth == null) {
+              return
             }
-            this.y.db.apply(message.ops)
-          }
-        })
+            for (let i = 0; i < senderConn.processAfterAuth.length; i++) {
+              let m = senderConn.processAfterAuth[i]
+              this.receiveMessage(m[0], m[1])
+            }
+            senderConn.processAfterAuth = null
+          })
+        }
+      }
+
+      if (senderConn.auth == null) {
+        senderConn.processAfterAuth.push([sender, buffer])
+        return
+      }
+
+      this.log('%s: Receive \'%s\' from %s', this.userId, messageType, sender)
+      this.logMessage('Message: %Y', buffer)
+
+      if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
+        // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
+        computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
+        return this.y.db.whenTransactionsFinished()
+      } else if (messageType === 'sync step 2' && senderConn.auth === 'write') {
+        return computeMessageSyncStep2(decoder, encoder, this, senderConn, sender)
+      } else if (messageType === 'update' && senderConn.auth === 'write') {
+        return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
       } else {
-        return Promise.reject(new Error('Unable to deliver message'))
+        console.error('Unable to receive message')
       }
     }
     _setSyncedWith (user) {
diff --git a/src/Database.js b/src/Database.js
index a5cf3976..e3656c7d 100644
--- a/src/Database.js
+++ b/src/Database.js
@@ -306,10 +306,12 @@ export default function extendDatabase (Y /* :any */) {
       * check if it is an expected op (otherwise wait for it)
       * check if was deleted, apply a delete operation after op was applied
     */
-    apply (ops) {
+    applyOperations (decoder) {
       this.opsReceivedTimestamp = new Date()
-      for (var i = 0; i < ops.length; i++) {
-        var o = ops[i]
+      let length = decoder.readUint32()
+
+      for (var i = 0; i < length; i++) {
+        let o = Y.Struct.binaryDecodeOperation(decoder)
         if (o.id == null || o.id[0] !== this.y.connector.userId) {
           var required = Y.Struct[o.struct].requiredOps(o)
           if (o.requires != null) {
@@ -590,7 +592,7 @@ export default function extendDatabase (Y /* :any */) {
       op.type = typedefinition[0].name
 
       this.requestTransaction(function * () {
-        if (op.id[0] === -1) {
+        if (op.id[0] === 0xFFFFFF) {
           yield * this.setOperation(op)
         } else {
           yield * this.applyCreatedOperations([op])
diff --git a/src/Encoding.js b/src/Encoding.js
index b1669c55..5faa5afe 100644
--- a/src/Encoding.js
+++ b/src/Encoding.js
@@ -1,102 +1,127 @@
 import utf8 from 'utf-8'
 
 const bits7 = 0b1111111
-
-export class BinaryLength {
-  constructor () {
-    this.length = 0
-  }
-  writeUint8 (num) {
-    this.length++
-  }
-  writeVarUint (num) {
-    while (num >= 0b10000000) {
-      this.length++
-      num >>= 7
-    }
-    this.length++
-  }
-  writeVarString (str) {
-    let len = utf8.setBytesFromString(str).length
-    this.writeVarUint(len)
-    this.length += len
-  }
-  writeOpID (id) {
-    this.writeVarUint(id[0])
-    this.writeVarUint(id[1])
-  }
-}
+const bits8 = 0b11111111
 
 export class BinaryEncoder {
-  constructor (binaryLength) {
-    this.dataview = new DataView(new ArrayBuffer(binaryLength.length))
-    this.pos = 0
+  constructor () {
+    this.data = []
+  }
+  get pos () {
+    return this.data.length
+  }
+  createBuffer () {
+    return Uint8Array.from(this.data).buffer
   }
   writeUint8 (num) {
-    this.dataview.setUint8(this.pos++, num)
+    this.data.push(num & bits8)
+  }
+  setUint8 (pos, num) {
+    this.data[pos] = num & bits8
+  }
+  writeUint16 (num) {
+    this.data.push(num & bits8, (num >> 8) & bits8)
+  }
+  setUint16 (pos, num) {
+    this.data[pos] = num & bits8
+    this.data[pos + 1] = (num >> 8) & bits8
+  }
+  writeUint32 (num) {
+    for (let i = 0; i < 4; i++) {
+      this.data.push(num & bits8)
+      num >>= 8
+    }
+  }
+  setUint32 (pos, num) {
+    for (let i = 0; i < 4; i++) {
+      this.data[pos + i] = num & bits8
+      num >>= 8
+    }
   }
   writeVarUint (num) {
     while (num >= 0b10000000) {
-      this.dataview.setUint8(this.pos++, 0b10000000 | (bits7 & num))
+      this.data.push(0b10000000 | (bits7 & num))
       num >>= 7
     }
-    this.dataview.setUint8(this.pos++, bits7 & num)
+    this.data.push(bits7 & num)
   }
   writeVarString (str) {
     let bytes = utf8.setBytesFromString(str)
     let len = bytes.length
     this.writeVarUint(len)
     for (let i = 0; i < len; i++) {
-      this.dataview.setUint8(this.pos++, bytes[i])
+      this.data.push(bytes[i])
     }
   }
   writeOpID (id) {
-    this.writeVarUint(id[0])
-    this.writeVarUint(id[1])
+    let user = id[0]
+    this.writeVarUint(user)
+    if (user !== 0xFFFFFF) {
+      this.writeVarUint(id[1])
+    } else {
+      this.writeVarString(id[1])
+    }
   }
 }
 
 export class BinaryDecoder {
-  constructor (dataview) {
-    this.dataview = dataview
+  constructor (buffer) {
+    this.uint8arr = new Uint8Array(buffer)
     this.pos = 0
   }
   skip8 () {
     this.pos++
   }
-  skip16 () {
-    this.pos += 2
-  }
-  skip32 () {
-    this.pos += 4
-  }
-  skipVar () {
-    while (this.dataview.getUint8(this.pos++) >= 1 << 7) { }
-  }
   readUint8 () {
-    return this.dataview.getUint8(this.pos++)
+    return this.uint8arr[this.pos++]
+  }
+  readUint32 () {
+    let uint =
+      this.uint8arr[this.pos] +
+      (this.uint8arr[this.pos + 1] << 8) +
+      (this.uint8arr[this.pos + 2] << 16) +
+      (this.uint8arr[this.pos + 3] << 24)
+    this.pos += 4
+    return uint
+  }
+  peekUint8 () {
+    return this.uint8arr[this.pos]
   }
   readVarUint () {
     let num = 0
     let len = 0
     while (true) {
-      let r = this.dataview.getUint8(this.pos++)
+      let r = this.uint8arr[this.pos++]
       num = num | ((r & bits7) << len)
       len += 7
       if (r < 1 << 7) {
         return num
       }
+      if (len > 35) {
+        throw new Error('Integer out of range!')
+      }
     }
   }
   readVarString () {
     let len = this.readVarUint()
     let bytes = new Array(len)
     for (let i = 0; i < len; i++) {
-      bytes[i] = this.dataview.getUint8(this.pos++)
+      bytes[i] = this.uint8arr[this.pos++]
     }
     return utf8.getStringFromBytes(bytes)
   }
+  peekVarString () {
+    let pos = this.pos
+    let s = this.readVarString()
+    this.pos = pos
+    return s
+  }
   readOpID () {
-    return [this.readVarUint(), this.readVarUint()]
+    let user = this.readVarUint()
+    if (user !== 0xFFFFFF) {
+      return [user, this.readVarUint()]
+    } else {
+      return [user, this.readVarString()]
+    }
   }
 }
diff --git a/src/MessageHandler.js b/src/MessageHandler.js
new file mode 100644
index 00000000..161f1985
--- /dev/null
+++ b/src/MessageHandler.js
@@ -0,0 +1,172 @@
+
+import Y from './y.js'
+import { BinaryDecoder } from './Encoding.js'
+
+export function formatYjsMessage (buffer) {
+  let decoder = new BinaryDecoder(buffer)
+  let type = decoder.readVarString()
+  let strBuilder = []
+  strBuilder.push('\n === ' + type + ' ===\n')
+  if (type === 'update') {
+    logMessageUpdate(decoder, strBuilder)
+  } else if (type === 'sync step 1') {
+    logMessageSyncStep1(decoder, strBuilder)
+  } else if (type === 'sync step 2') {
+    logMessageSyncStep2(decoder, strBuilder)
+  } else {
+    strBuilder.push('-- Unknown message type - probably an encoding issue!!!')
+  }
+  return strBuilder.join('')
+}
+
+export function formatYjsMessageType (buffer) {
+  let decoder = new BinaryDecoder(buffer)
+  return decoder.readVarString()
+}
+
+export async function logMessageUpdate (decoder, strBuilder) {
+  let len = decoder.readUint32()
+  for (let i = 0; i < len; i++) {
+    strBuilder.push(JSON.stringify(Y.Struct.binaryDecodeOperation(decoder)) + '\n')
+  }
+}
+
+export async function computeMessageUpdate (decoder, encoder, conn) {
+  if (conn.y.db.forwardAppliedOperations) {
+    let messagePosition = decoder.pos
+    let len = decoder.readUint32()
+    let delops = []
+    for (let i = 0; i < len; i++) {
+      let op = Y.Struct.binaryDecodeOperation(decoder)
+      if (op.struct === 'Delete') {
+        delops.push(op)
+      }
+    }
+    if (delops.length > 0) {
+      conn.broadcastOps(delops)
+    }
+    decoder.pos = messagePosition
+  }
+  conn.y.db.applyOperations(decoder)
+}
+
+export function logMessageSyncStep1 (decoder, strBuilder) {
+  let auth = decoder.readVarString()
+  let protocolVersion = decoder.readVarUint()
+  let preferUntransformed = decoder.readUint8() === 1
+  strBuilder.push(`
+  - auth: "${auth}"
+  - protocolVersion: ${protocolVersion}
+  - preferUntransformed: ${preferUntransformed}
+`)
+  logSS(decoder, strBuilder)
+}
+
+export async function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
+  let protocolVersion = decoder.readVarUint()
+  let preferUntransformed = decoder.readUint8() === 1
+
+  // check protocol version
+  if (protocolVersion !== conn.protocolVersion) {
+    console.warn(
+      `You tried to sync with a yjs instance that has a different protocol version
+      (You: ${protocolVersion}, Client: ${protocolVersion}).
+      The sync was stopped. You need to upgrade your dependencies (especially Yjs & the Connector)!
+      `)
+    conn.y.destroy()
+  }
+
+  if (conn.role === 'slave') {
+    // wait for sync step 2 to complete
+    await Promise.all(Array.from(conn.connections.values())
+      .filter(conn => conn.role === 'master')
+      .map(conn => conn.syncStep2.promise)
+    )
+  }
+  conn.y.db.requestTransaction(function * () {
+    encoder.writeVarString('sync step 2')
+    encoder.writeVarString(conn.authInfo || '')
+    let emptyStateSet = this.ds.length === 0 // TODO: length may not always be available
+
+    if (preferUntransformed && emptyStateSet) {
+      encoder.writeUint8(1)
+      yield * this.writeOperationsUntransformed(encoder)
+    } else {
+      encoder.writeUint8(0)
+      yield * this.writeOperations(encoder, decoder)
+    }
+
+    yield * this.writeDeleteSet(encoder)
+    conn.send(senderConn.uid, encoder.createBuffer())
+  })
+  await conn.y.db.whenTransactionsFinished()
+}
+
+export function logSS (decoder, strBuilder) {
+  strBuilder.push('  == SS: \n')
+  let len = decoder.readUint32()
+  for (let i = 0; i < len; i++) {
+    let user = decoder.readVarUint()
+    let clock = decoder.readVarUint()
+    strBuilder.push(`     - ${user}: ${clock}`)
+  }
+}
+
+export function logOS (decoder, strBuilder) {
+  strBuilder.push('  == OS: \n')
+  let len = decoder.readUint32()
+  for (let i = 0; i < len; i++) {
+    let op = Y.Struct.binaryDecodeOperation(decoder)
+    strBuilder.push(JSON.stringify(op) + '\n')
+  }
+}
+
+export function logDS (decoder, strBuilder) {
+  strBuilder.push('  == DS: \n')
+  let len = decoder.readUint32()
+  for (let i = 0; i < len; i++) {
+    let user = decoder.readVarUint()
+    strBuilder.push(`    User: ${user}: `)
+    let len2 = decoder.readVarUint()
+    for (let j = 0; j < len2; j++) {
+      let from = decoder.readVarUint()
+      let to = decoder.readVarUint()
+      let gc = decoder.readUint8() === 1
+      strBuilder.push(`[${from}, ${to}, ${gc}]`)
+    }
+  }
+}
+
+export function logMessageSyncStep2 (decoder, strBuilder) {
+  strBuilder.push('     - auth: ' + decoder.readVarString() + '\n')
+  let osTransformed = decoder.readUint8() === 1
+  strBuilder.push('     - osUntransformed: ' + osTransformed + '\n')
+  logOS(decoder, strBuilder)
+  if (osTransformed) {
+    logSS(decoder, strBuilder)
+  }
+  logDS(decoder, strBuilder)
+}
+
+export async function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
+  var db = conn.y.db
+  let defer = senderConn.syncStep2
+
+  // apply operations first
+  db.requestTransaction(function * () {
+    let osUntransformed = decoder.readUint8()
+    if (osUntransformed === 1) {
+      yield * this.applyOperationsUntransformed(decoder)
+    } else {
+      this.store.applyOperations(decoder)
+    }
+  })
+  // then apply ds
+  await db.whenTransactionsFinished()
+  db.requestTransaction(function * () {
+    yield * this.applyDeleteSet(decoder)
+  })
+  await db.whenTransactionsFinished()
+  conn._setSyncedWith(sender)
+  defer.resolve()
+}
diff --git a/src/Struct.js b/src/Struct.js
index a766e4de..6b2bbdb2 100644
--- a/src/Struct.js
+++ b/src/Struct.js
@@ -23,6 +23,20 @@ const CMAP = 3
 */
 export default function extendStruct (Y) {
   var Struct = {
+    binaryDecodeOperation: function (decoder) {
+      let code = decoder.peekUint8()
+      if (code === CDELETE) {
+        return Y.Struct.Delete.binaryDecode(decoder)
+      } else if (code === CINSERT) {
+        return Y.Struct.Insert.binaryDecode(decoder)
+      } else if (code === CLIST) {
+        return Y.Struct.List.binaryDecode(decoder)
+      } else if (code === CMAP) {
+        return Y.Struct.Map.binaryDecode(decoder)
+      } else {
+        throw new Error('Unable to decode operation!')
+      }
+    },
     /* This is the only operation that is actually not a structure, because
     it is not stored in the OS. This is why it _does not_ have an id
 
diff --git a/src/Transaction.js b/src/Transaction.js
index 5afe9e3d..37b13ecd 100644
--- a/src/Transaction.js
+++ b/src/Transaction.js
@@ -1,5 +1,3 @@
-/* @flow */
-'use strict'
 
 /*
   Partial definition of a transaction
@@ -96,7 +94,7 @@ export default function extendTransaction (Y) {
           send.push(Y.Struct[op.struct].encode(op))
         }
       }
-      if (this.store.y.connector.isSynced && send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops)
+      if (send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops)
         // is connected, and this is not going to be send in addOperation
         this.store.y.connector.broadcastOps(send)
       }
@@ -588,12 +586,20 @@ export default function extendTransaction (Y) {
       apply a delete set in order to get
       the state of the supplied ds
     */
-    * applyDeleteSet (ds) {
+    * applyDeleteSet (decoder) {
       var deletions = []
 
-      for (var user in ds) {
-        var dv = ds[user]
-        user = Number.parseInt(user, 10)
+      let dsLength = decoder.readUint32()
+      for (let i = 0; i < dsLength; i++) {
+        let user = decoder.readVarUint()
+        let dv = []
+        let dvLength = decoder.readVarUint()
+        for (let j = 0; j < dvLength; j++) {
+          let from = decoder.readVarUint()
+          let len = decoder.readVarUint()
+          let gc = decoder.readUint8() === 1
+          dv.push([from, len, gc])
+        }
         var pos = 0
         var d = dv[pos]
         yield * this.ds.iterate(this, [user, 0], [user, Number.MAX_VALUE], function * (n) {
@@ -687,21 +693,34 @@ export default function extendTransaction (Y) {
     /*
       A DeleteSet (ds) describes all the deleted ops in the OS
     */
-    * getDeleteSet () {
-      var ds = {}
+    * writeDeleteSet (encoder) {
+      var ds = new Map()
       yield * this.ds.iterate(this, null, null, function * (n) {
         var user = n.id[0]
         var counter = n.id[1]
         var len = n.len
         var gc = n.gc
-        var dv = ds[user]
+        var dv = ds.get(user)
         if (dv === void 0) {
           dv = []
-          ds[user] = dv
+          ds.set(user, dv)
         }
         dv.push([counter, len, gc])
       })
-      return ds
+      let keys = Array.from(ds.keys())
+      encoder.writeUint32(keys.length)
+      for (var i = 0; i < keys.length; i++) {
+        let user = keys[i]
+        let deletions = ds.get(user)
+        encoder.writeVarUint(user)
+        encoder.writeVarUint(deletions.length)
+        for (var j = 0; j < deletions.length; j++) {
+          let del = deletions[j]
+          encoder.writeVarUint(del[0])
+          encoder.writeVarUint(del[1])
+          encoder.writeUint8(del[2] ? 1 : 0)
+        }
+      }
     }
     * isDeleted (id) {
       var n = yield * this.ds.findWithUpperBound(id)
@@ -713,7 +732,8 @@ export default function extendTransaction (Y) {
     }
     * addOperation (op) {
       yield * this.os.put(op)
-      if (this.store.y.connector.isSynced && this.store.forwardAppliedOperations && typeof op.id[1] !== 'string') {
+      // case op is created by this user, op is already broadcasted in applyCreatedOperations
+      if (op.id[0] !== this.store.userId && this.store.forwardAppliedOperations && typeof op.id[1] !== 'string') {
         // is connected, and this is not going to be send in addOperation
         this.store.y.connector.broadcastOps([op])
       }
@@ -822,11 +842,11 @@ export default function extendTransaction (Y) {
     }
     * getOperation (id/* :any */)/* :Transaction<any> */ {
       var o = yield * this.os.find(id)
-      if (id[0] !== -1 || o != null) {
+      if (id[0] !== 0xFFFFFF || o != null) {
         return o
       } else { // type is string
         // generate this operation?
-        var comp = id[1].split(-1)
+        var comp = id[1].split('_')
         if (comp.length > 1) {
           var struct = comp[0]
           var op = Y.Struct[struct].create(id)
@@ -879,6 +899,18 @@ export default function extendTransaction (Y) {
       })
       return ss
     }
+    * writeStateSet (encoder) {
+      let lenPosition = encoder.pos
+      let len = 0
+      encoder.writeUint32(0)
+      yield * this.ss.iterate(this, null, null, function * (n) {
+        encoder.writeVarUint(n.id[0])
+        encoder.writeVarUint(n.clock)
+        len++
+      })
+      encoder.setUint32(lenPosition, len)
+      return len === 0
+    }
     /*
       Here, we make all missing operations executable for the receiving user.
 
@@ -928,17 +960,17 @@ export default function extendTransaction (Y) {
     * getOperations (startSS) {
       // TODO: use bounds here!
       if (startSS == null) {
-        startSS = {}
+        startSS = new Map()
       }
       var send = []
 
       var endSV = yield * this.getStateVector()
       for (let endState of endSV) {
         let user = endState.user
-        if (user === -1) {
+        if (user === 0xFFFFFF) {
           continue
         }
-        let startPos = startSS[user] || 0
+        let startPos = startSS.get(user) || 0
         if (startPos > 0) {
           // There is a change that [user, startPos] is in a composed Insertion (with a smaller counter)
           // find out if that is the case
@@ -948,19 +980,19 @@ export default function extendTransaction (Y) {
             startPos = firstMissing.id[1]
           }
         }
-        startSS[user] = startPos
+        startSS.set(user, startPos)
       }
       for (let endState of endSV) {
         let user = endState.user
-        let startPos = startSS[user]
-        if (user === -1) {
+        let startPos = startSS.get(user)
+        if (user === 0xFFFFFF) {
           continue
         }
         yield * this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) {
           op = Y.Struct[op.struct].encode(op)
           if (op.struct !== 'Insert') {
             send.push(op)
-          } else if (op.right == null || op.right[1] < (startSS[op.right[0]] || 0)) {
+          } else if (op.right == null || op.right[1] < (startSS.get(op.right[0]) || 0)) {
             // case 1. op.right is known
             // this case is only reached if op.right is known.
             // => this is not called for op.left, as op.right is unknown
@@ -978,7 +1010,7 @@ export default function extendTransaction (Y) {
                 op.left = null
                 send.push(op)
                 /* not necessary, as o is already sent..
-                if (!Y.utils.compareIds(o.id, op.id) && o.id[1] >= (startSS[o.id[0]] || 0)) {
+                if (!Y.utils.compareIds(o.id, op.id) && o.id[1] >= (startSS.get(o.id[0]) || 0)) {
                   // o is not op && o is unknown
                   o = Y.Struct[op.struct].encode(o)
                   o.right = missingOrigins[missingOrigins.length - 1].id
@@ -992,7 +1024,7 @@ export default function extendTransaction (Y) {
               while (missingOrigins.length > 0 && Y.utils.matchesId(o, missingOrigins[missingOrigins.length - 1].origin)) {
                 missingOrigins.pop()
               }
-              if (o.id[1] < (startSS[o.id[0]] || 0)) {
+              if (o.id[1] < (startSS.get(o.id[0]) || 0)) {
                 // case 2. o is known
                 op.left = Y.utils.getLastId(o)
                 send.push(op)
@@ -1024,28 +1056,48 @@ export default function extendTransaction (Y) {
       }
       return send.reverse()
     }
+
+    * writeOperations (encoder, decoder) {
+      let ss = new Map()
+      let ssLength = decoder.readUint32()
+      for (let i = 0; i < ssLength; i++) {
+        let user = decoder.readUint32()
+        let clock = decoder.readUint32()
+        ss.set(user, clock)
+      }
+      let ops = yield * this.getOperations(ss)
+      encoder.writeUint32(ops.length)
+      for (let i = 0; i < ops.length; i++) {
+        let op = ops[i]
+        Y.Struct[op.struct].binaryEncode(encoder, Y.Struct[op.struct].encode(op))
+      }
+    }
     /*
      * Get the plain untransformed operations from the database.
      * You can apply these operations using .applyOperationsUntransformed(ops)
      *
      */
-    * getOperationsUntransformed () {
-      var ops = []
+    * writeOperationsUntransformed (encoder) {
+      let lenPosition = encoder.pos
+      let len = 0
+      encoder.writeUint32(0) // placeholder
       yield * this.os.iterate(this, null, null, function * (op) {
-        if (op.id[0] !== -1) {
-          ops.push(op)
+        if (op.id[0] !== 0xFFFFFF) {
+          len++
+          Y.Struct[op.struct].binaryEncode(encoder, Y.Struct[op.struct].encode(op))
         }
       })
-      return {
-        untransformed: ops
-      }
+      encoder.setUint32(lenPosition, len)
+      yield * this.writeStateSet(encoder)
     }
-    * applyOperationsUntransformed (m, stateSet) {
-      var ops = m.untransformed
-      for (var i = 0; i < ops.length; i++) {
-        var op = ops[i]
-        // create, and modify parent, if it is created implicitly
-        if (op.parent != null && op.parent[0] === -1) {
+    * applyOperationsUntransformed (decoder) {
+      let len = decoder.readUint32()
+      for (let i = 0; i < len; i++) {
+        let op = decoder.readOperation()
+        yield * this.os.put(op)
+      }
+      yield * this.os.iterate(this, null, null, function * (op) {
+        if (op.parent != null && op.parent[0] === 0xFFFFFF) {
           if (op.struct === 'Insert') {
             // update parents .map/start/end properties
             if (op.parentSub != null && op.left == null) {
@@ -1065,12 +1117,14 @@ export default function extendTransaction (Y) {
             }
           }
         }
-        yield * this.os.put(op)
-      }
-      for (var user in stateSet) {
+      })
+      let stateSetLength = decoder.readUint32()
+      for (let i = 0; i < stateSetLength; i++) {
+        let user = decoder.readVarUint()
+        let clock = decoder.readVarUint()
         yield * this.ss.put({
           id: [user],
-          clock: stateSet[user]
+          clock: clock
         })
       }
     }
diff --git a/src/y.js b/src/y.js
index e464810c..7f43bd12 100644
--- a/src/y.js
+++ b/src/y.js
@@ -1,9 +1,10 @@
-import debug from 'debug'
 import extendConnector from './Connector.js'
 import extendDatabase from './Database.js'
 import extendTransaction from './Transaction.js'
 import extendStruct from './Struct.js'
 import extendUtils from './Utils.js'
+import debug from 'debug'
+import { formatYjsMessage, formatYjsMessageType } from './MessageHandler.js'
 
 extendConnector(Y)
 extendDatabase(Y)
@@ -12,6 +13,8 @@ extendStruct(Y)
 extendUtils(Y)
 
 Y.debug = debug
+debug.formatters.Y = formatYjsMessage
+debug.formatters.y = formatYjsMessageType
 
 var requiringModules = {}
 
@@ -169,7 +172,7 @@ class YConfig extends Y.utils.NamedEventHandler {
         var typeName = typeConstructor.splice(0, 1)
         var type = Y[typeName]
         var typedef = type.typeDefinition
-        var id = [-1, typedef.struct + -1 + typeName + -1 + propertyname + -1 + typeConstructor]
+        var id = [0xFFFFFF, typedef.struct + '_' + typeName + '_' + propertyname + '_' + typeConstructor]
         var args = []
         if (typeConstructor.length === 1) {
           try {
diff --git a/test/encode-decode.js b/test/encode-decode.js
index 2fe5f21c..c919e97f 100644
--- a/test/encode-decode.js
+++ b/test/encode-decode.js
@@ -1,16 +1,15 @@
 import { test } from 'cutest'
 import Chance from 'chance'
 import Y from '../src/y.js'
-import { BinaryLength, BinaryEncoder, BinaryDecoder } from '../src/Encoding.js'
+import { BinaryEncoder, BinaryDecoder } from '../src/Encoding.js'
+import { applyRandomTests } from '../../y-array/test/testGeneration.js'
 
 function testEncoding (t, write, read, val) {
-  let binLength = new BinaryLength()
-  write(binLength, val)
-  let encoder = new BinaryEncoder(binLength)
+  let encoder = new BinaryEncoder()
   write(encoder, val)
-  let reader = new BinaryDecoder(encoder.dataview)
+  let reader = new BinaryDecoder(encoder.createBuffer())
   let result = read(reader)
-  t.log(`string encode: ${JSON.stringify(val).length} bytes / binary encode: ${encoder.dataview.buffer.byteLength} bytes`)
+  t.log(`string encode: ${JSON.stringify(val).length} bytes / binary encode: ${encoder.data.length} bytes`)
   t.compare(val, result, 'Compare results')
 }
 
@@ -219,3 +218,4 @@ test('encode/decode Map operations', async function binMap (t) {
     info: 400
   })
 })
+
diff --git a/tests-lib/helper.js b/tests-lib/helper.js
index 1b4d8c9c..4a2d6376 100644
--- a/tests-lib/helper.js
+++ b/tests-lib/helper.js
@@ -12,6 +12,33 @@ export let Y = _Y
 
 Y.extend(yMemory, yArray, yMap, yTest)
 
+function * getStateSet () {
+  var ss = {}
+  yield * this.ss.iterate(this, null, null, function * (n) {
+    var user = n.id[0]
+    var clock = n.clock
+    ss[user] = clock
+  })
+  return ss
+}
+
+function * getDeleteSet () {
+  var ds = {}
+  yield * this.ds.iterate(this, null, null, function * (n) {
+    var user = n.id[0]
+    var counter = n.id[1]
+    var len = n.len
+    var gc = n.gc
+    var dv = ds[user]
+    if (dv === void 0) {
+      dv = []
+      ds[user] = dv
+    }
+    dv.push([counter, len, gc])
+  })
+  return ds
+}
+
 export async function garbageCollectUsers (t, users) {
   await flushAll(t, users)
   await Promise.all(users.map(u => u.db.emptyGarbageCollector()))
@@ -60,10 +87,14 @@ export async function compareUsers (t, users) {
   var data = await Promise.all(users.map(async (u) => {
     var data = {}
     u.db.requestTransaction(function * () {
-      var os = yield * this.getOperationsUntransformed()
+      let ops = []
+      yield * this.os.iterate(this, null, null, function * (op) {
+        ops.push(Y.Struct[op.struct].encode(op))
+      })
+
       data.os = {}
-      for (let i = 0; i < os.untransformed.length; i++) {
-        let op = os.untransformed[i]
+      for (let i = 0; i < ops.length; i++) {
+        let op = ops[i]
         op = Y.Struct[op.struct].encode(op)
         delete op.origin
         /*
@@ -79,8 +110,8 @@ export async function compareUsers (t, users) {
           data.os[JSON.stringify(op.id)] = op
         }
       }
-      data.ds = yield * this.getDeleteSet()
-      data.ss = yield * this.getStateSet()
+      data.ds = yield * getDeleteSet.apply(this)
+      data.ss = yield * getStateSet.apply(this)
     })
     await u.db.whenTransactionsFinished()
     return data
diff --git a/tests-lib/test-connector.js b/tests-lib/test-connector.js
index 0656ae93..42a87564 100644
--- a/tests-lib/test-connector.js
+++ b/tests-lib/test-connector.js
@@ -1,5 +1,6 @@
 /* global Y */
 import { wait } from './helper.js'
+import { formatYjsMessage } from '../src/MessageHandler.js'
 
 var rooms = {}
 
@@ -28,7 +29,6 @@ export class TestRoom {
     })
   }
   send (sender, receiver, m) {
-    m = JSON.parse(JSON.stringify(m))
     var user = this.users.get(receiver)
     if (user != null) {
       user.receiveMessage(sender, m)
@@ -81,14 +81,25 @@ export default function extendTestConnector (Y) {
       this.testRoom.leave(this)
       return super.disconnect()
     }
+    logBufferParsed () {
+      console.log(' === Logging buffer of user ' + this.userId + ' === ')
+      for (let [user, conn] of this.connections) {
+        console.log(` ${user}:`)
+        for (let i = 0; i < conn.buffer.length; i++) {
+          console.log(formatYjsMessage(conn.buffer[i]))
+        }
+      }
+    }
     reconnect () {
       this.testRoom.join(this)
       return super.reconnect()
     }
     send (uid, message) {
+      super.send(uid, message)
       this.testRoom.send(this.userId, uid, message)
     }
     broadcast (message) {
+      super.broadcast(message)
       this.testRoom.broadcast(this.userId, message)
     }
     async whenSynced (f) {