implemented awareness protocol and added cursor support

This commit is contained in:
Kevin Jahns
2018-11-09 00:13:30 +01:00
parent 31d6ef6296
commit aafe15757f
19 changed files with 391 additions and 100 deletions

View File

@@ -3,6 +3,9 @@
import * as Y from '../../src/index.js'
export * from '../../src/index.js'
const messageSync = 0
const messageAwareness = 1
const reconnectTimeout = 100
const setupWS = (doc, url) => {
@@ -12,10 +15,19 @@ const setupWS = (doc, url) => {
websocket.onmessage = event => {
const decoder = Y.createDecoder(event.data)
const encoder = Y.createEncoder()
doc.mux(() =>
Y.readMessage(decoder, encoder, doc)
)
if (Y.length(encoder) > 0) {
const messageType = Y.readVarUint(decoder)
switch (messageType) {
case messageSync:
Y.writeVarUint(encoder, messageSync)
doc.mux(() =>
Y.readSyncMessage(decoder, encoder, doc)
)
break
case messageAwareness:
Y.readAwarenessMessage(decoder, doc)
break
}
if (Y.length(encoder) > 1) {
websocket.send(Y.toBuffer(encoder))
}
}
@@ -34,8 +46,11 @@ const setupWS = (doc, url) => {
})
// always send sync step 1 when connected
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageSync)
Y.writeSyncStep1(encoder, doc)
websocket.send(Y.toBuffer(encoder))
// force send stored awareness info
doc.setAwarenessField(null, null)
}
}
@@ -43,6 +58,7 @@ const broadcastUpdate = (y, transaction) => {
if (y.wsconnected && transaction.encodedStructsLen > 0) {
y.mux(() => {
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageSync)
Y.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
y.ws.send(Y.toBuffer(encoder))
})
@@ -54,9 +70,29 @@ class WebsocketsSharedDocument extends Y.Y {
super()
this.wsconnected = false
this.mux = Y.createMutex()
this.ws = null
this._localAwarenessState = {}
this.awareness = new Map()
setupWS(this, url)
this.on('afterTransaction', broadcastUpdate)
}
getLocalAwarenessInfo () {
return this._localAwarenessState
}
getAwarenessInfo () {
return this.awareness
}
setAwarenessField (field, value) {
if (field !== null) {
this._localAwarenessState[field] = value
}
if (this.ws !== null) {
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageAwareness)
Y.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState }])
this.ws.send(Y.toBuffer(encoder))
}
}
}
export default class WebsocketProvider {

View File

@@ -3,12 +3,16 @@ const WebSocket = require('ws')
const wss = new WebSocket.Server({ port: 1234 })
const docs = new Map()
const messageSync = 0
const messageAwareness = 1
const afterTransaction = (doc, transaction) => {
if (transaction.encodedStructsLen > 0) {
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageSync)
Y.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
const message = Y.toBuffer(encoder)
doc.conns.forEach(conn => conn.send(message))
doc.conns.forEach((_, conn) => conn.send(message))
}
}
@@ -16,7 +20,12 @@ class WSSharedDoc extends Y.Y {
constructor () {
super()
this.mux = Y.createMutex()
this.conns = new Set()
/**
* Maps from conn to set of controlled user ids. Delete all user ids from awareness when this conn is closed
* @type {Map<Object, Set<number>>}
*/
this.conns = new Map()
this.awareness = new Map()
this.on('afterTransaction', afterTransaction)
}
}
@@ -24,9 +33,28 @@ class WSSharedDoc extends Y.Y {
const messageListener = (conn, doc, message) => {
const encoder = Y.createEncoder()
const decoder = Y.createDecoder(message)
Y.readMessage(decoder, encoder, doc)
if (Y.length(encoder) > 0) {
conn.send(Y.toBuffer(encoder))
const messageType = Y.readVarUint(decoder)
switch (messageType) {
case messageSync:
Y.writeVarUint(encoder, messageSync)
Y.readSyncMessage(decoder, encoder, doc)
if (Y.length(encoder) > 1) {
conn.send(Y.toBuffer(encoder))
}
break
case messageAwareness: {
Y.writeVarUint(encoder, messageAwareness)
const updates = Y.forwardAwarenessMessage(decoder, encoder)
updates.forEach(update => {
doc.awareness.set(update.userID, update.state)
doc.conns.get(conn).add(update.userID)
})
const buff = Y.toBuffer(encoder)
doc.conns.forEach((_, c) => {
c.send(buff)
})
break
}
}
}
@@ -38,16 +66,36 @@ const setupConnection = (conn, req) => {
doc = new WSSharedDoc()
docs.set(req.url.slice(1), doc)
}
doc.conns.add(conn)
doc.conns.set(conn, new Set())
// listen and reply to events
conn.on('message', message => messageListener(conn, doc, message))
conn.on('close', () =>
conn.on('close', () => {
const controlledIds = doc.conns.get(conn)
doc.conns.delete(conn)
)
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageAwareness)
Y.writeUsersStateChange(encoder, Array.from(controlledIds).map(userID => {
doc.awareness.delete(userID)
return { userID, state: null }
}))
const buf = Y.toBuffer(encoder)
doc.conns.forEach((_, conn) => conn.send(buf))
})
// send sync step 1
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageSync)
Y.writeSyncStep1(encoder, doc)
conn.send(Y.toBuffer(encoder))
if (doc.awareness.size > 0) {
const encoder = Y.createEncoder()
const userStates = []
doc.awareness.forEach((state, userID) => {
userStates.push({ state, userID })
})
Y.writeVarUint(encoder, messageAwareness)
Y.writeUsersStateChange(encoder, userStates)
conn.send(Y.toBuffer(encoder))
}
}
wss.on('connection', setupConnection)