From ec58a99748ae2755fb9ae2f892e75470178e99d8 Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Sat, 22 Dec 2018 15:50:41 +0100 Subject: [PATCH] add clock vector to awareness protocol --- protocols/awareness.js | 19 +++++++++++++++---- provider/websocket/WebSocketProvider.js | 5 ++++- provider/websocket/server.js | 10 +++++++--- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/protocols/awareness.js b/protocols/awareness.js index 3291d92b..4414d0f8 100644 --- a/protocols/awareness.js +++ b/protocols/awareness.js @@ -11,6 +11,7 @@ const messageUsersStateChanged = 0 /** * @typedef {Object} UserStateUpdate * @property {number} UserStateUpdate.userID + * @property {number} UserStateUpdate.clock * @property {Object} UserStateUpdate.state */ @@ -23,8 +24,9 @@ export const writeUsersStateChange = (encoder, stateUpdates) => { encoding.writeVarUint(encoder, messageUsersStateChanged) encoding.writeVarUint(encoder, len) for (let i = 0; i < len; i++) { - const {userID, state} = stateUpdates[i] + const {userID, state, clock} = stateUpdates[i] encoding.writeVarUint(encoder, userID) + encoding.writeVarUint(encoder, clock) encoding.writeVarString(encoder, JSON.stringify(state)) } } @@ -36,20 +38,25 @@ export const readUsersStateChange = (decoder, y) => { const len = decoding.readVarUint(decoder) for (let i = 0; i < len; i++) { const userID = decoding.readVarUint(decoder) + const clock = decoding.readVarUint(decoder) const state = JSON.parse(decoding.readVarString(decoder)) if (userID !== y.userID) { + const uClock = y.awarenessClock.get(userID) || 0 + y.awarenessClock.set(userID, clock) if (state === null) { - if (y.awareness.has(userID)) { + // only write if clock increases. cannot overwrite + if (y.awareness.has(userID) && uClock < clock) { y.awareness.delete(userID) removed.push(userID) } - } else { + } else if (uClock <= clock) { // allow to overwrite (e.g. when client was on, then offline) if (y.awareness.has(userID)) { updated.push(userID) } else { added.push(userID) } y.awareness.set(userID, state) + y.awarenessClock.set(userID, clock) } } } @@ -63,6 +70,7 @@ export const readUsersStateChange = (decoder, y) => { /** * @param {decoding.Decoder} decoder * @param {encoding.Encoder} encoder + * @return {Array} */ export const forwardUsersStateChange = (decoder, encoder) => { const len = decoding.readVarUint(decoder) @@ -71,10 +79,12 @@ export const forwardUsersStateChange = (decoder, encoder) => { encoding.writeVarUint(encoder, len) for (let i = 0; i < len; i++) { const userID = decoding.readVarUint(decoder) + const clock = decoding.readVarUint(decoder) const state = decoding.readVarString(decoder) encoding.writeVarUint(encoder, userID) + encoding.writeVarUint(encoder, clock) encoding.writeVarString(encoder, state) - updates.push({userID, state: JSON.parse(state)}) + updates.push({userID, state: JSON.parse(state), clock}) } return updates } @@ -95,6 +105,7 @@ export const readAwarenessMessage = (decoder, y) => { * @typedef {Object} UserState * @property {number} UserState.userID * @property {any} UserState.state + * @property {number} UserState.clock */ /** diff --git a/provider/websocket/WebSocketProvider.js b/provider/websocket/WebSocketProvider.js index 04ba6637..f10f1ce9 100644 --- a/provider/websocket/WebSocketProvider.js +++ b/provider/websocket/WebSocketProvider.js @@ -105,6 +105,7 @@ class WebsocketsSharedDocument extends Y.Y { this.ws = null this._localAwarenessState = {} this.awareness = new Map() + this.awarenessClock = new Map() setupWS(this, url) this.on('afterTransaction', broadcastUpdate) this._bcSubscriber = data => { @@ -135,9 +136,11 @@ class WebsocketsSharedDocument extends Y.Y { this._localAwarenessState[field] = value } if (this.wsconnected) { + const clock = (this.awarenessClock.get(this.userID) || 0) + 1 + this.awarenessClock.set(this.userID, clock) const encoder = Y.encoding.createEncoder() Y.encoding.writeVarUint(encoder, messageAwareness) - Y.awarenessProtocol.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState }]) + Y.awarenessProtocol.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState, clock }]) const buf = Y.encoding.toBuffer(encoder) this.ws.send(buf) } diff --git a/provider/websocket/server.js b/provider/websocket/server.js index cc601108..77e9842c 100644 --- a/provider/websocket/server.js +++ b/provider/websocket/server.js @@ -14,7 +14,7 @@ const port = process.env.PORT || 1234 const persistenceDir = process.env.YPERSISTENCE let persistence = null -if (typeof persistenceDir) { +if (typeof persistenceDir === 'string') { const LevelDbPersistence = require('../../persistence/leveldb.js').LevelDbPersistence persistence = new LevelDbPersistence(persistenceDir) } @@ -52,6 +52,7 @@ class WSSharedDoc extends Y.Y { */ this.conns = new Map() this.awareness = new Map() + this.awarenessClock = new Map() this.on('afterTransaction', afterTransaction) } } @@ -73,6 +74,7 @@ const messageListener = (conn, doc, message) => { const updates = Y.awarenessProtocol.forwardAwarenessMessage(decoder, encoder) updates.forEach(update => { doc.awareness.set(update.userID, update.state) + doc.awarenessClock.set(update.userID, update.clock) doc.conns.get(conn).add(update.userID) }) const buff = Y.encoding.toBuffer(encoder) @@ -105,8 +107,10 @@ const setupConnection = (conn, req) => { const encoder = Y.encoding.createEncoder() Y.encoding.writeVarUint(encoder, messageAwareness) Y.awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(userID => { + const clock = (doc.awarenessClock.get(userID) || 0) + 1 doc.awareness.delete(userID) - return { userID, state: null } + doc.awarenessClock.delete(userID) + return { userID, state: null, clock } })) const buf = Y.encoding.toBuffer(encoder) doc.conns.forEach((_, conn) => conn.send(buf)) @@ -127,7 +131,7 @@ const setupConnection = (conn, req) => { const encoder = Y.encoding.createEncoder() const userStates = [] doc.awareness.forEach((state, userID) => { - userStates.push({ state, userID }) + userStates.push({ state, userID, clock: (doc.awarenessClock.get(userID) || 0) }) }) Y.encoding.writeVarUint(encoder, messageAwareness) Y.awarenessProtocol.writeUsersStateChange(encoder, userStates)