add clock vector to awareness protocol

This commit is contained in:
Kevin Jahns 2018-12-22 15:50:41 +01:00
parent f1eb66655b
commit ec58a99748
3 changed files with 26 additions and 8 deletions

View File

@ -11,6 +11,7 @@ const messageUsersStateChanged = 0
/** /**
* @typedef {Object} UserStateUpdate * @typedef {Object} UserStateUpdate
* @property {number} UserStateUpdate.userID * @property {number} UserStateUpdate.userID
* @property {number} UserStateUpdate.clock
* @property {Object} UserStateUpdate.state * @property {Object} UserStateUpdate.state
*/ */
@ -23,8 +24,9 @@ export const writeUsersStateChange = (encoder, stateUpdates) => {
encoding.writeVarUint(encoder, messageUsersStateChanged) encoding.writeVarUint(encoder, messageUsersStateChanged)
encoding.writeVarUint(encoder, len) encoding.writeVarUint(encoder, len)
for (let i = 0; i < len; i++) { for (let i = 0; i < len; i++) {
const {userID, state} = stateUpdates[i] const {userID, state, clock} = stateUpdates[i]
encoding.writeVarUint(encoder, userID) encoding.writeVarUint(encoder, userID)
encoding.writeVarUint(encoder, clock)
encoding.writeVarString(encoder, JSON.stringify(state)) encoding.writeVarString(encoder, JSON.stringify(state))
} }
} }
@ -36,20 +38,25 @@ export const readUsersStateChange = (decoder, y) => {
const len = decoding.readVarUint(decoder) const len = decoding.readVarUint(decoder)
for (let i = 0; i < len; i++) { for (let i = 0; i < len; i++) {
const userID = decoding.readVarUint(decoder) const userID = decoding.readVarUint(decoder)
const clock = decoding.readVarUint(decoder)
const state = JSON.parse(decoding.readVarString(decoder)) const state = JSON.parse(decoding.readVarString(decoder))
if (userID !== y.userID) { if (userID !== y.userID) {
const uClock = y.awarenessClock.get(userID) || 0
y.awarenessClock.set(userID, clock)
if (state === null) { if (state === null) {
if (y.awareness.has(userID)) { // only write if clock increases. cannot overwrite
if (y.awareness.has(userID) && uClock < clock) {
y.awareness.delete(userID) y.awareness.delete(userID)
removed.push(userID) removed.push(userID)
} }
} else { } else if (uClock <= clock) { // allow to overwrite (e.g. when client was on, then offline)
if (y.awareness.has(userID)) { if (y.awareness.has(userID)) {
updated.push(userID) updated.push(userID)
} else { } else {
added.push(userID) added.push(userID)
} }
y.awareness.set(userID, state) y.awareness.set(userID, state)
y.awarenessClock.set(userID, clock)
} }
} }
} }
@ -63,6 +70,7 @@ export const readUsersStateChange = (decoder, y) => {
/** /**
* @param {decoding.Decoder} decoder * @param {decoding.Decoder} decoder
* @param {encoding.Encoder} encoder * @param {encoding.Encoder} encoder
* @return {Array<UserStateUpdate>}
*/ */
export const forwardUsersStateChange = (decoder, encoder) => { export const forwardUsersStateChange = (decoder, encoder) => {
const len = decoding.readVarUint(decoder) const len = decoding.readVarUint(decoder)
@ -71,10 +79,12 @@ export const forwardUsersStateChange = (decoder, encoder) => {
encoding.writeVarUint(encoder, len) encoding.writeVarUint(encoder, len)
for (let i = 0; i < len; i++) { for (let i = 0; i < len; i++) {
const userID = decoding.readVarUint(decoder) const userID = decoding.readVarUint(decoder)
const clock = decoding.readVarUint(decoder)
const state = decoding.readVarString(decoder) const state = decoding.readVarString(decoder)
encoding.writeVarUint(encoder, userID) encoding.writeVarUint(encoder, userID)
encoding.writeVarUint(encoder, clock)
encoding.writeVarString(encoder, state) encoding.writeVarString(encoder, state)
updates.push({userID, state: JSON.parse(state)}) updates.push({userID, state: JSON.parse(state), clock})
} }
return updates return updates
} }
@ -95,6 +105,7 @@ export const readAwarenessMessage = (decoder, y) => {
* @typedef {Object} UserState * @typedef {Object} UserState
* @property {number} UserState.userID * @property {number} UserState.userID
* @property {any} UserState.state * @property {any} UserState.state
* @property {number} UserState.clock
*/ */
/** /**

View File

@ -105,6 +105,7 @@ class WebsocketsSharedDocument extends Y.Y {
this.ws = null this.ws = null
this._localAwarenessState = {} this._localAwarenessState = {}
this.awareness = new Map() this.awareness = new Map()
this.awarenessClock = new Map()
setupWS(this, url) setupWS(this, url)
this.on('afterTransaction', broadcastUpdate) this.on('afterTransaction', broadcastUpdate)
this._bcSubscriber = data => { this._bcSubscriber = data => {
@ -135,9 +136,11 @@ class WebsocketsSharedDocument extends Y.Y {
this._localAwarenessState[field] = value this._localAwarenessState[field] = value
} }
if (this.wsconnected) { if (this.wsconnected) {
const clock = (this.awarenessClock.get(this.userID) || 0) + 1
this.awarenessClock.set(this.userID, clock)
const encoder = Y.encoding.createEncoder() const encoder = Y.encoding.createEncoder()
Y.encoding.writeVarUint(encoder, messageAwareness) Y.encoding.writeVarUint(encoder, messageAwareness)
Y.awarenessProtocol.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState }]) Y.awarenessProtocol.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState, clock }])
const buf = Y.encoding.toBuffer(encoder) const buf = Y.encoding.toBuffer(encoder)
this.ws.send(buf) this.ws.send(buf)
} }

View File

@ -14,7 +14,7 @@ const port = process.env.PORT || 1234
const persistenceDir = process.env.YPERSISTENCE const persistenceDir = process.env.YPERSISTENCE
let persistence = null let persistence = null
if (typeof persistenceDir) { if (typeof persistenceDir === 'string') {
const LevelDbPersistence = require('../../persistence/leveldb.js').LevelDbPersistence const LevelDbPersistence = require('../../persistence/leveldb.js').LevelDbPersistence
persistence = new LevelDbPersistence(persistenceDir) persistence = new LevelDbPersistence(persistenceDir)
} }
@ -52,6 +52,7 @@ class WSSharedDoc extends Y.Y {
*/ */
this.conns = new Map() this.conns = new Map()
this.awareness = new Map() this.awareness = new Map()
this.awarenessClock = new Map()
this.on('afterTransaction', afterTransaction) this.on('afterTransaction', afterTransaction)
} }
} }
@ -73,6 +74,7 @@ const messageListener = (conn, doc, message) => {
const updates = Y.awarenessProtocol.forwardAwarenessMessage(decoder, encoder) const updates = Y.awarenessProtocol.forwardAwarenessMessage(decoder, encoder)
updates.forEach(update => { updates.forEach(update => {
doc.awareness.set(update.userID, update.state) doc.awareness.set(update.userID, update.state)
doc.awarenessClock.set(update.userID, update.clock)
doc.conns.get(conn).add(update.userID) doc.conns.get(conn).add(update.userID)
}) })
const buff = Y.encoding.toBuffer(encoder) const buff = Y.encoding.toBuffer(encoder)
@ -105,8 +107,10 @@ const setupConnection = (conn, req) => {
const encoder = Y.encoding.createEncoder() const encoder = Y.encoding.createEncoder()
Y.encoding.writeVarUint(encoder, messageAwareness) Y.encoding.writeVarUint(encoder, messageAwareness)
Y.awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(userID => { Y.awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(userID => {
const clock = (doc.awarenessClock.get(userID) || 0) + 1
doc.awareness.delete(userID) doc.awareness.delete(userID)
return { userID, state: null } doc.awarenessClock.delete(userID)
return { userID, state: null, clock }
})) }))
const buf = Y.encoding.toBuffer(encoder) const buf = Y.encoding.toBuffer(encoder)
doc.conns.forEach((_, conn) => conn.send(buf)) doc.conns.forEach((_, conn) => conn.send(buf))
@ -127,7 +131,7 @@ const setupConnection = (conn, req) => {
const encoder = Y.encoding.createEncoder() const encoder = Y.encoding.createEncoder()
const userStates = [] const userStates = []
doc.awareness.forEach((state, userID) => { doc.awareness.forEach((state, userID) => {
userStates.push({ state, userID }) userStates.push({ state, userID, clock: (doc.awarenessClock.get(userID) || 0) })
}) })
Y.encoding.writeVarUint(encoder, messageAwareness) Y.encoding.writeVarUint(encoder, messageAwareness)
Y.awarenessProtocol.writeUsersStateChange(encoder, userStates) Y.awarenessProtocol.writeUsersStateChange(encoder, userStates)