implement generic broadcastchannel and apply it to websocket provider

This commit is contained in:
Kevin Jahns 2018-11-27 18:29:18 +01:00
parent ab3dba5b06
commit a2c51c36e9
6 changed files with 166 additions and 18 deletions

View File

@ -0,0 +1,9 @@
/* eslint-env browser */
const isDeployed = location.hostname === 'yjs.website'
if (!isDeployed) {
console.log('%cYjs: Start your local websocket server by running %c`npm run websocket-server`', 'color:blue', 'color: grey; font-weight: bold')
}
export const serverAddress = isDeployed ? 'wss://api.yjs.website' : 'ws://localhost:1234'

View File

@ -1,10 +1,40 @@
/* eslint-env browser */
/**
* @module binary
*/
import * as string from './string.js'
import * as globals from './globals.js'
export const BITS32 = 0xFFFFFFFF
export const BITS21 = (1 << 21) - 1
export const BITS16 = (1 << 16) - 1
export const BIT26 = 1 << 26
export const BIT32 = 1 << 32
/**
* @param {Uint8Array} bytes
* @return {string}
*/
export const toBase64 = bytes => {
let s = ''
for (let i = 0; i < bytes.byteLength; i++) {
s += string.fromCharCode(bytes[i])
}
return btoa(s)
}
/**
* @param {string} s
* @return {Uint8Array}
*/
export const fromBase64 = s => {
const a = atob(s)
const bytes = globals.createUint8ArrayFromLen(a.length)
for (let i = 0; i < a.length; i++) {
bytes[i] = a.charCodeAt(i)
}
return bytes
}

72
lib/broadcastchannel.js Normal file
View File

@ -0,0 +1,72 @@
/* eslint-env browser */
import * as binary from './binary.js'
import * as globals from './globals.js'
/**
* @typedef {Object} Channel
* @property {Set<Function>} Channel.subs
* @property {BC} Channel.bc
*/
/**
* @type {Map<string, Channel>}
*/
const channels = new Map()
class LocalStoragePolyfill {
constructor (room) {
this.room = room
this.onmessage = null
addEventListener('storage', e => e.key === room && this.onmessage !== null && this.onmessage({ data: binary.fromBase64(e.newValue) }))
}
/**
* @param {ArrayBuffer} data
*/
postMessage (buf) {
if (typeof localStorage !== 'undefined') {
localStorage.setItem(this.room, binary.toBase64(globals.createUint8ArrayFromArrayBuffer(buf)))
}
}
}
// Use BroadcastChannel or Polyfill
const BC = typeof BroadcastChannel === 'undefined' ? LocalStoragePolyfill : BroadcastChannel
/**
* @param {string} room
* @return {Channel}
*/
const getChannel = room => {
let c = channels.get(room)
if (c === undefined) {
const subs = new Set()
const bc = new BC(room)
bc.onmessage = e => subs.forEach(sub => sub(e.data))
c = {
bc, subs
}
channels.set(room, c)
}
return c
}
/**
* @function
* @param {string} room
* @param {Function} f
*/
export const subscribe = (room, f) => getChannel(room).subs.add(f)
/**
* Publish data to all subscribers (including subscribers on this tab)
*
* @function
* @param {string} room
* @param {ArrayBuffer} data
*/
export const publish = (room, data) => {
const c = getChannel(room)
c.bc.postMessage(data)
c.subs.forEach(sub => sub(data))
}

View File

@ -1 +1,5 @@
import * as idb from '../lib/idb.js'
const bc = new BroadcastChannel('ydb-client')
idb.openDB()

View File

@ -5,32 +5,42 @@
/* eslint-env browser */
import * as Y from '../../index.js'
export * from '../../index.js'
import * as bc from '../../lib/broadcastchannel.js'
const messageSync = 0
const messageAwareness = 1
const reconnectTimeout = 100
/**
* @param {WebsocketsSharedDocument} doc
* @param {ArrayBuffer} buf
* @return {Y.Encoder}
*/
const readMessage = (doc, buf) => {
const decoder = Y.createDecoder(buf)
const encoder = Y.createEncoder()
const messageType = Y.readVarUint(decoder)
switch (messageType) {
case messageSync:
Y.writeVarUint(encoder, messageSync)
doc.mux(() =>
Y.readSyncMessage(decoder, encoder, doc)
)
break
case messageAwareness:
Y.readAwarenessMessage(decoder, doc)
break
}
return encoder
}
const setupWS = (doc, url) => {
const websocket = new WebSocket(url)
websocket.binaryType = 'arraybuffer'
doc.ws = websocket
websocket.onmessage = event => {
const decoder = Y.createDecoder(event.data)
const encoder = Y.createEncoder()
const messageType = Y.readVarUint(decoder)
switch (messageType) {
case messageSync:
Y.writeVarUint(encoder, messageSync)
doc.mux(() =>
Y.readSyncMessage(decoder, encoder, doc)
)
break
case messageAwareness:
Y.readAwarenessMessage(decoder, doc)
break
}
const encoder = readMessage(doc, event.data)
if (Y.length(encoder) > 1) {
websocket.send(Y.toBuffer(encoder))
}
@ -59,12 +69,16 @@ const setupWS = (doc, url) => {
}
const broadcastUpdate = (y, transaction) => {
if (y.wsconnected && transaction.encodedStructsLen > 0) {
if (transaction.encodedStructsLen > 0) {
y.mux(() => {
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageSync)
Y.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
y.ws.send(Y.toBuffer(encoder))
const buf = Y.toBuffer(encoder)
if (y.wsconnected) {
y.ws.send(buf)
}
bc.publish(y.url, buf)
})
}
}
@ -72,6 +86,7 @@ const broadcastUpdate = (y, transaction) => {
class WebsocketsSharedDocument extends Y.Y {
constructor (url) {
super()
this.url = url
this.wsconnected = false
this.mux = Y.createMutex()
this.ws = null
@ -79,6 +94,22 @@ class WebsocketsSharedDocument extends Y.Y {
this.awareness = new Map()
setupWS(this, url)
this.on('afterTransaction', broadcastUpdate)
this._bcSubscriber = data => {
const encoder = readMessage(this, data)
if (Y.length(encoder) > 1) {
this.mux(() => {
bc.publish(url, Y.toBuffer(encoder))
})
}
}
bc.subscribe(url, this._bcSubscriber)
// send sync step1 to bc
this.mux(() => {
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageSync)
Y.writeSyncStep1(encoder, this)
bc.publish(url, Y.toBuffer(encoder))
})
}
getLocalAwarenessInfo () {
return this._localAwarenessState
@ -94,7 +125,8 @@ class WebsocketsSharedDocument extends Y.Y {
const encoder = Y.createEncoder()
Y.writeVarUint(encoder, messageAwareness)
Y.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState }])
this.ws.send(Y.toBuffer(encoder))
const buf = Y.toBuffer(encoder)
this.ws.send(buf)
}
}
}

View File

@ -10,6 +10,7 @@ import * as globals from '../../lib/globals.js'
import * as NamedEventHandler from './NamedEventHandler.js'
const bc = new BroadcastChannel('ydb-client')
/**
* @type {Map<string, Set<Function>>}
*/