diff --git a/examples/exampleConfig.js b/examples/exampleConfig.js new file mode 100644 index 00000000..566ed9b5 --- /dev/null +++ b/examples/exampleConfig.js @@ -0,0 +1,9 @@ +/* eslint-env browser */ + +const isDeployed = location.hostname === 'yjs.website' + +if (!isDeployed) { + console.log('%cYjs: Start your local websocket server by running %c`npm run websocket-server`', 'color:blue', 'color: grey; font-weight: bold') +} + +export const serverAddress = isDeployed ? 'wss://api.yjs.website' : 'ws://localhost:1234' diff --git a/lib/binary.js b/lib/binary.js index d554ba65..645f5f87 100644 --- a/lib/binary.js +++ b/lib/binary.js @@ -1,10 +1,40 @@ +/* eslint-env browser */ + /** * @module binary */ +import * as string from './string.js' +import * as globals from './globals.js' + export const BITS32 = 0xFFFFFFFF export const BITS21 = (1 << 21) - 1 export const BITS16 = (1 << 16) - 1 export const BIT26 = 1 << 26 export const BIT32 = 1 << 32 + +/** + * @param {Uint8Array} bytes + * @return {string} + */ +export const toBase64 = bytes => { + let s = '' + for (let i = 0; i < bytes.byteLength; i++) { + s += string.fromCharCode(bytes[i]) + } + return btoa(s) +} + +/** + * @param {string} s + * @return {Uint8Array} + */ +export const fromBase64 = s => { + const a = atob(s) + const bytes = globals.createUint8ArrayFromLen(a.length) + for (let i = 0; i < a.length; i++) { + bytes[i] = a.charCodeAt(i) + } + return bytes +} diff --git a/lib/broadcastchannel.js b/lib/broadcastchannel.js new file mode 100644 index 00000000..2cee4448 --- /dev/null +++ b/lib/broadcastchannel.js @@ -0,0 +1,72 @@ +/* eslint-env browser */ + +import * as binary from './binary.js' +import * as globals from './globals.js' + +/** + * @typedef {Object} Channel + * @property {Set} Channel.subs + * @property {BC} Channel.bc + */ + +/** + * @type {Map} + */ +const channels = new Map() + +class LocalStoragePolyfill { + constructor (room) { + this.room = room + this.onmessage = null + addEventListener('storage', e => e.key === room && this.onmessage !== null && this.onmessage({ data: binary.fromBase64(e.newValue) })) + } + /** + * @param {ArrayBuffer} data + */ + postMessage (buf) { + if (typeof localStorage !== 'undefined') { + localStorage.setItem(this.room, binary.toBase64(globals.createUint8ArrayFromArrayBuffer(buf))) + } + } +} + +// Use BroadcastChannel or Polyfill +const BC = typeof BroadcastChannel === 'undefined' ? LocalStoragePolyfill : BroadcastChannel + +/** + * @param {string} room + * @return {Channel} + */ +const getChannel = room => { + let c = channels.get(room) + if (c === undefined) { + const subs = new Set() + const bc = new BC(room) + bc.onmessage = e => subs.forEach(sub => sub(e.data)) + c = { + bc, subs + } + channels.set(room, c) + } + return c +} + +/** + * @function + * @param {string} room + * @param {Function} f + */ +export const subscribe = (room, f) => getChannel(room).subs.add(f) + +/** + * Publish data to all subscribers (including subscribers on this tab) + * + * @function + * @param {string} room + * @param {ArrayBuffer} data + */ +export const publish = (room, data) => { + const c = getChannel(room) + c.bc.postMessage(data) + c.subs.forEach(sub => sub(data)) +} diff --git a/persistences/indexeddb.js b/persistences/indexeddb.js index 5b40c481..b5d8490a 100644 --- a/persistences/indexeddb.js +++ b/persistences/indexeddb.js @@ -1 +1,5 @@ import * as idb from '../lib/idb.js' + +const bc = new BroadcastChannel('ydb-client') + +idb.openDB() \ No newline at end of file diff --git a/provider/websocket/WebSocketProvider.js b/provider/websocket/WebSocketProvider.js index 68f94e7a..676fbbfa 100644 --- a/provider/websocket/WebSocketProvider.js +++ b/provider/websocket/WebSocketProvider.js @@ -5,32 +5,42 @@ /* eslint-env browser */ import * as Y from '../../index.js' -export * from '../../index.js' +import * as bc from '../../lib/broadcastchannel.js' const messageSync = 0 const messageAwareness = 1 const reconnectTimeout = 100 +/** + * @param {WebsocketsSharedDocument} doc + * @param {ArrayBuffer} buf + * @return {Y.Encoder} + */ +const readMessage = (doc, buf) => { + const decoder = Y.createDecoder(buf) + const encoder = Y.createEncoder() + const messageType = Y.readVarUint(decoder) + switch (messageType) { + case messageSync: + Y.writeVarUint(encoder, messageSync) + doc.mux(() => + Y.readSyncMessage(decoder, encoder, doc) + ) + break + case messageAwareness: + Y.readAwarenessMessage(decoder, doc) + break + } + return encoder +} + const setupWS = (doc, url) => { const websocket = new WebSocket(url) websocket.binaryType = 'arraybuffer' doc.ws = websocket websocket.onmessage = event => { - const decoder = Y.createDecoder(event.data) - const encoder = Y.createEncoder() - const messageType = Y.readVarUint(decoder) - switch (messageType) { - case messageSync: - Y.writeVarUint(encoder, messageSync) - doc.mux(() => - Y.readSyncMessage(decoder, encoder, doc) - ) - break - case messageAwareness: - Y.readAwarenessMessage(decoder, doc) - break - } + const encoder = readMessage(doc, event.data) if (Y.length(encoder) > 1) { websocket.send(Y.toBuffer(encoder)) } @@ -59,12 +69,16 @@ const setupWS = (doc, url) => { } const broadcastUpdate = (y, transaction) => { - if (y.wsconnected && transaction.encodedStructsLen > 0) { + if (transaction.encodedStructsLen > 0) { y.mux(() => { const encoder = Y.createEncoder() Y.writeVarUint(encoder, messageSync) Y.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs) - y.ws.send(Y.toBuffer(encoder)) + const buf = Y.toBuffer(encoder) + if (y.wsconnected) { + y.ws.send(buf) + } + bc.publish(y.url, buf) }) } } @@ -72,6 +86,7 @@ const broadcastUpdate = (y, transaction) => { class WebsocketsSharedDocument extends Y.Y { constructor (url) { super() + this.url = url this.wsconnected = false this.mux = Y.createMutex() this.ws = null @@ -79,6 +94,22 @@ class WebsocketsSharedDocument extends Y.Y { this.awareness = new Map() setupWS(this, url) this.on('afterTransaction', broadcastUpdate) + this._bcSubscriber = data => { + const encoder = readMessage(this, data) + if (Y.length(encoder) > 1) { + this.mux(() => { + bc.publish(url, Y.toBuffer(encoder)) + }) + } + } + bc.subscribe(url, this._bcSubscriber) + // send sync step1 to bc + this.mux(() => { + const encoder = Y.createEncoder() + Y.writeVarUint(encoder, messageSync) + Y.writeSyncStep1(encoder, this) + bc.publish(url, Y.toBuffer(encoder)) + }) } getLocalAwarenessInfo () { return this._localAwarenessState @@ -94,7 +125,8 @@ class WebsocketsSharedDocument extends Y.Y { const encoder = Y.createEncoder() Y.writeVarUint(encoder, messageAwareness) Y.writeUsersStateChange(encoder, [{ userID: this.userID, state: this._localAwarenessState }]) - this.ws.send(Y.toBuffer(encoder)) + const buf = Y.toBuffer(encoder) + this.ws.send(buf) } } } diff --git a/provider/ydb/broadcastchannel.js b/provider/ydb/broadcastchannel.js index f25354e4..38ec88b9 100644 --- a/provider/ydb/broadcastchannel.js +++ b/provider/ydb/broadcastchannel.js @@ -10,6 +10,7 @@ import * as globals from '../../lib/globals.js' import * as NamedEventHandler from './NamedEventHandler.js' const bc = new BroadcastChannel('ydb-client') + /** * @type {Map>} */