permission protocol + reduce circular dependencies
This commit is contained in:
@@ -9,28 +9,37 @@ import * as bc from '../../lib/broadcastchannel.js'
|
||||
|
||||
const messageSync = 0
|
||||
const messageAwareness = 1
|
||||
const messageAuth = 2
|
||||
|
||||
const reconnectTimeout = 3000
|
||||
|
||||
/**
|
||||
* @param {WebsocketsSharedDocument} doc
|
||||
* @param {string} reason
|
||||
*/
|
||||
const permissionDeniedHandler = (doc, reason) => console.warn(`Permission denied to access ${doc.url}.\n${reason}`)
|
||||
|
||||
/**
|
||||
* @param {WebsocketsSharedDocument} doc
|
||||
* @param {ArrayBuffer} buf
|
||||
* @return {Y.Encoder}
|
||||
* @return {Y.encoding.Encoder}
|
||||
*/
|
||||
const readMessage = (doc, buf) => {
|
||||
const decoder = Y.createDecoder(buf)
|
||||
const encoder = Y.createEncoder()
|
||||
const messageType = Y.readVarUint(decoder)
|
||||
const decoder = Y.decoding.createDecoder(buf)
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
const messageType = Y.decoding.readVarUint(decoder)
|
||||
switch (messageType) {
|
||||
case messageSync:
|
||||
Y.writeVarUint(encoder, messageSync)
|
||||
Y.encoding.writeVarUint(encoder, messageSync)
|
||||
doc.mux(() =>
|
||||
Y.readSyncMessage(decoder, encoder, doc)
|
||||
Y.syncProtocol.readSyncMessage(decoder, encoder, doc)
|
||||
)
|
||||
break
|
||||
case messageAwareness:
|
||||
Y.readAwarenessMessage(decoder, doc)
|
||||
Y.awarenessProtocol.readAwarenessMessage(decoder, doc)
|
||||
break
|
||||
case messageAuth:
|
||||
Y.authProtocol.readAuthMessage(decoder, doc, permissionDeniedHandler)
|
||||
}
|
||||
return encoder
|
||||
}
|
||||
@@ -41,8 +50,8 @@ const setupWS = (doc, url) => {
|
||||
doc.ws = websocket
|
||||
websocket.onmessage = event => {
|
||||
const encoder = readMessage(doc, event.data)
|
||||
if (Y.length(encoder) > 1) {
|
||||
websocket.send(Y.toBuffer(encoder))
|
||||
if (Y.encoding.length(encoder) > 1) {
|
||||
websocket.send(Y.encoding.toBuffer(encoder))
|
||||
}
|
||||
}
|
||||
websocket.onclose = () => {
|
||||
@@ -59,10 +68,10 @@ const setupWS = (doc, url) => {
|
||||
status: 'disconnected'
|
||||
})
|
||||
// always send sync step 1 when connected
|
||||
const encoder = Y.createEncoder()
|
||||
Y.writeVarUint(encoder, messageSync)
|
||||
Y.writeSyncStep1(encoder, doc)
|
||||
websocket.send(Y.toBuffer(encoder))
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
Y.encoding.writeVarUint(encoder, messageSync)
|
||||
Y.syncProtocol.writeSyncStep1(encoder, doc)
|
||||
websocket.send(Y.encoding.toBuffer(encoder))
|
||||
// force send stored awareness info
|
||||
doc.setAwarenessField(null, null)
|
||||
}
|
||||
@@ -71,10 +80,10 @@ const setupWS = (doc, url) => {
|
||||
const broadcastUpdate = (y, transaction) => {
|
||||
if (transaction.encodedStructsLen > 0) {
|
||||
y.mux(() => {
|
||||
const encoder = Y.createEncoder()
|
||||
Y.writeVarUint(encoder, messageSync)
|
||||
Y.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
|
||||
const buf = Y.toBuffer(encoder)
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
Y.encoding.writeVarUint(encoder, messageSync)
|
||||
Y.syncProtocol.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
|
||||
const buf = Y.encoding.toBuffer(encoder)
|
||||
if (y.wsconnected) {
|
||||
y.ws.send(buf)
|
||||
}
|
||||
@@ -95,20 +104,20 @@ class WebsocketsSharedDocument extends Y.Y {
|
||||
setupWS(this, url)
|
||||
this.on('afterTransaction', broadcastUpdate)
|
||||
this._bcSubscriber = data => {
|
||||
const encoder = readMessage(this, data)
|
||||
if (Y.length(encoder) > 1) {
|
||||
this.mux(() => {
|
||||
bc.publish(url, Y.toBuffer(encoder))
|
||||
})
|
||||
}
|
||||
this.mux(() => {
|
||||
const encoder = readMessage(this, data)
|
||||
if (Y.encoding.length(encoder) > 1) {
|
||||
bc.publish(url, Y.encoding.toBuffer(encoder))
|
||||
}
|
||||
})
|
||||
}
|
||||
bc.subscribe(url, this._bcSubscriber)
|
||||
// send sync step1 to bc
|
||||
this.mux(() => {
|
||||
const encoder = Y.createEncoder()
|
||||
Y.writeVarUint(encoder, messageSync)
|
||||
Y.writeSyncStep1(encoder, this)
|
||||
bc.publish(url, Y.toBuffer(encoder))
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
Y.encoding.writeVarUint(encoder, messageSync)
|
||||
Y.syncProtocol.writeSyncStep1(encoder, this)
|
||||
bc.publish(url, Y.encoding.toBuffer(encoder))
|
||||
})
|
||||
}
|
||||
getLocalAwarenessInfo () {
|
||||
@@ -122,10 +131,10 @@ class WebsocketsSharedDocument extends Y.Y {
|
||||
this._localAwarenessState[field] = value
|
||||
}
|
||||
if (this.wsconnected) {
|
||||
const encoder = Y.createEncoder()
|
||||
Y.writeVarUint(encoder, messageAwareness)
|
||||
Y.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState }])
|
||||
const buf = Y.toBuffer(encoder)
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
Y.encoding.writeVarUint(encoder, messageAwareness)
|
||||
Y.awarenessProtocol.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState }])
|
||||
const buf = Y.encoding.toBuffer(encoder)
|
||||
this.ws.send(buf)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,13 +19,14 @@ const docs = new Map()
|
||||
|
||||
const messageSync = 0
|
||||
const messageAwareness = 1
|
||||
const messageAuth = 2
|
||||
|
||||
const afterTransaction = (doc, transaction) => {
|
||||
if (transaction.encodedStructsLen > 0) {
|
||||
const encoder = Y.createEncoder()
|
||||
Y.writeVarUint(encoder, messageSync)
|
||||
Y.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
|
||||
const message = Y.toBuffer(encoder)
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
Y.encoding.writeVarUint(encoder, messageSync)
|
||||
Y.syncProtocol.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
|
||||
const message = Y.encoding.toBuffer(encoder)
|
||||
doc.conns.forEach((_, conn) => conn.send(message))
|
||||
}
|
||||
}
|
||||
@@ -45,25 +46,25 @@ class WSSharedDoc extends Y.Y {
|
||||
}
|
||||
|
||||
const messageListener = (conn, doc, message) => {
|
||||
const encoder = Y.createEncoder()
|
||||
const decoder = Y.createDecoder(message)
|
||||
const messageType = Y.readVarUint(decoder)
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
const decoder = Y.decoding.createDecoder(message)
|
||||
const messageType = Y.decoding.readVarUint(decoder)
|
||||
switch (messageType) {
|
||||
case messageSync:
|
||||
Y.writeVarUint(encoder, messageSync)
|
||||
Y.readSyncMessage(decoder, encoder, doc)
|
||||
if (Y.length(encoder) > 1) {
|
||||
conn.send(Y.toBuffer(encoder))
|
||||
Y.encoding.writeVarUint(encoder, messageSync)
|
||||
Y.syncProtocol.readSyncMessage(decoder, encoder, doc)
|
||||
if (Y.encoding.length(encoder) > 1) {
|
||||
conn.send(Y.encoding.toBuffer(encoder))
|
||||
}
|
||||
break
|
||||
case messageAwareness: {
|
||||
Y.writeVarUint(encoder, messageAwareness)
|
||||
const updates = Y.forwardAwarenessMessage(decoder, encoder)
|
||||
Y.encoding.writeVarUint(encoder, messageAwareness)
|
||||
const updates = Y.awarenessProtocol.forwardAwarenessMessage(decoder, encoder)
|
||||
updates.forEach(update => {
|
||||
doc.awareness.set(update.userID, update.state)
|
||||
doc.conns.get(conn).add(update.userID)
|
||||
})
|
||||
const buff = Y.toBuffer(encoder)
|
||||
const buff = Y.encoding.toBuffer(encoder)
|
||||
doc.conns.forEach((_, c) => {
|
||||
c.send(buff)
|
||||
})
|
||||
@@ -86,29 +87,29 @@ const setupConnection = (conn, req) => {
|
||||
conn.on('close', () => {
|
||||
const controlledIds = doc.conns.get(conn)
|
||||
doc.conns.delete(conn)
|
||||
const encoder = Y.createEncoder()
|
||||
Y.writeVarUint(encoder, messageAwareness)
|
||||
Y.writeUsersStateChange(encoder, Array.from(controlledIds).map(userID => {
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
Y.encoding.writeVarUint(encoder, messageAwareness)
|
||||
Y.awarenessProtocol.writeUsersStateChange(encoder, Array.from(controlledIds).map(userID => {
|
||||
doc.awareness.delete(userID)
|
||||
return { userID, state: null }
|
||||
}))
|
||||
const buf = Y.toBuffer(encoder)
|
||||
const buf = Y.encoding.toBuffer(encoder)
|
||||
doc.conns.forEach((_, conn) => conn.send(buf))
|
||||
})
|
||||
// send sync step 1
|
||||
const encoder = Y.createEncoder()
|
||||
Y.writeVarUint(encoder, messageSync)
|
||||
Y.writeSyncStep1(encoder, doc)
|
||||
conn.send(Y.toBuffer(encoder))
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
Y.encoding.writeVarUint(encoder, messageSync)
|
||||
Y.syncProtocol.writeSyncStep1(encoder, doc)
|
||||
conn.send(Y.encoding.toBuffer(encoder))
|
||||
if (doc.awareness.size > 0) {
|
||||
const encoder = Y.createEncoder()
|
||||
const encoder = Y.encoding.createEncoder()
|
||||
const userStates = []
|
||||
doc.awareness.forEach((state, userID) => {
|
||||
userStates.push({ state, userID })
|
||||
})
|
||||
Y.writeVarUint(encoder, messageAwareness)
|
||||
Y.writeUsersStateChange(encoder, userStates)
|
||||
conn.send(Y.toBuffer(encoder))
|
||||
Y.encoding.writeVarUint(encoder, messageAwareness)
|
||||
Y.awarenessProtocol.writeUsersStateChange(encoder, userStates)
|
||||
conn.send(Y.encoding.toBuffer(encoder))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user