From 67bbc0a3fe3f03e4a526c6d0c42f578fde7a89cb Mon Sep 17 00:00:00 2001 From: Kevin Jahns Date: Tue, 30 Oct 2018 00:51:09 +0100 Subject: [PATCH] implemented websocket provider --- .gitignore | 1 + examples/textarea/index.js | 34 ++++------ package-lock.json | 17 ++++- package.json | 3 + provider/websocket/WebSocketProvider.js | 85 +++++++++++++++++++++++++ provider/websocket/server.js | 53 +++++++++++++++ rollup.node.js | 26 +++----- src/index.js | 4 ++ 8 files changed, 180 insertions(+), 43 deletions(-) create mode 100644 provider/websocket/WebSocketProvider.js create mode 100644 provider/websocket/server.js diff --git a/.gitignore b/.gitignore index 379570ed..ef6b38b4 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ docs /examples/yjs-dist.js* .vscode .yjsPersisted +build \ No newline at end of file diff --git a/examples/textarea/index.js b/examples/textarea/index.js index db3eedea..99a949cc 100644 --- a/examples/textarea/index.js +++ b/examples/textarea/index.js @@ -1,25 +1,13 @@ -import { createYdbClient } from '../../YdbClient/index.js' -import Y from '../../src/Y.dist.js' +/* eslint-env browser */ +import * as Y from '../../src/index.js' +import WebsocketProvider from '../../provider/websocket/WebSocketProvider.js' -createYdbClient('ws://localhost:8899/ws').then(ydbclient => { - const y = ydbclient.getY('textarea') - let type = y.define('textarea', Y.Text) - let textarea = document.querySelector('textarea') - window.binding = new Y.TextareaBinding(type, textarea) -}) +const provider = new WebsocketProvider('ws://localhost:1234/') +const ydocument = provider.get('textarea') +const type = ydocument.define('textarea', Y.Text) +const textarea = document.querySelector('textarea') +const binding = new Y.TextareaBinding(type, textarea) -/* -let y = new Y('textarea-example', { - connector: { - name: 'websockets-client', - url: 'http://127.0.0.1:1234' - } -}) - -window.yTextarea = y - -// bind the textarea to a shared text element -let type = y.define('textarea', Y.Text) -let textarea = document.querySelector('textarea') -window.binding = new Y.TextareaBinding(type, textarea) -*/ \ No newline at end of file +window.textareaExample = { + provider, ydocument, type, textarea, binding +} diff --git a/package-lock.json b/package-lock.json index 1872bda9..9635963b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -271,6 +271,11 @@ "integrity": "sha1-GdOGodntxufByF04iu28xW0zYC0=", "dev": true }, + "async-limiter": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz", + "integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg==" + }, "asynckit": { "version": "0.4.0", "resolved": "https://registry.npmjs.org/asynckit/-/asynckit-0.4.0.tgz", @@ -2597,7 +2602,7 @@ "dependencies": { "babel-generator": { "version": "6.11.4", - "resolved": "http://registry.npmjs.org/babel-generator/-/babel-generator-6.11.4.tgz", + "resolved": "https://registry.npmjs.org/babel-generator/-/babel-generator-6.11.4.tgz", "integrity": "sha1-FPaTOrsgxiZm0n47e59bncBxKpo=", "dev": true, "requires": { @@ -2628,7 +2633,7 @@ }, "minimist": { "version": "1.2.0", - "resolved": "http://registry.npmjs.org/minimist/-/minimist-1.2.0.tgz", + "resolved": "https://registry.npmjs.org/minimist/-/minimist-1.2.0.tgz", "integrity": "sha1-o1AIsg9BOD7sH7kU9M1d95omQoQ=", "dev": true }, @@ -7019,6 +7024,14 @@ "mkdirp": "^0.5.1" } }, + "ws": { + "version": "6.1.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-6.1.0.tgz", + "integrity": "sha512-H3dGVdGvW2H8bnYpIDc3u3LH8Wue3Qh+Zto6aXXFzvESkTVT6rAfKR6tR/+coaUvxs8yHtmNV0uioBF62ZGSTg==", + "requires": { + "async-limiter": "~1.0.0" + } + }, "xml-name-validator": { "version": "2.0.1", "resolved": "https://registry.npmjs.org/xml-name-validator/-/xml-name-validator-2.0.1.tgz", diff --git a/package.json b/package.json index 148c5a13..db142b69 100644 --- a/package.json +++ b/package.json @@ -69,5 +69,8 @@ "rollup-regenerator-runtime": "^6.23.1", "rollup-watch": "^3.2.2", "standard": "^11.0.1" + }, + "dependencies": { + "ws": "^6.1.0" } } diff --git a/provider/websocket/WebSocketProvider.js b/provider/websocket/WebSocketProvider.js new file mode 100644 index 00000000..ee1c36c8 --- /dev/null +++ b/provider/websocket/WebSocketProvider.js @@ -0,0 +1,85 @@ +/* eslint-env browser */ + +import * as Y from '../../src/index.js' +export * from '../../src/index.js' + +const reconnectTimeout = 100 + +const setupWS = (doc, url) => { + const websocket = new WebSocket(url) + websocket.binaryType = 'arraybuffer' + doc.ws = websocket + websocket.onmessage = event => { + const decoder = Y.createDecoder(event.data) + const encoder = Y.createEncoder() + doc.mux(() => + Y.readMessage(decoder, encoder, doc) + ) + if (Y.length(encoder) > 0) { + websocket.send(Y.toBuffer(encoder)) + } + } + websocket.onclose = () => { + doc.ws = null + doc.wsconnected = false + doc.emit('status', { + status: 'connected' + }) + setTimeout(setupWS, reconnectTimeout, doc, url) + } + websocket.onopen = () => { + doc.wsconnected = true + doc.emit('status', { + status: 'disconnected' + }) + // always send sync step 1 when connected + const encoder = Y.createEncoder() + Y.writeSyncStep1(encoder, doc) + websocket.send(Y.toBuffer(encoder)) + } +} + +const broadcastUpdate = (y, transaction) => { + if (y.wsconnected && transaction.encodedStructsLen > 0) { + y.mux(() => { + const encoder = Y.createEncoder() + Y.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs) + y.ws.send(Y.toBuffer(encoder)) + }) + } +} + +class WebsocketsSharedDocument extends Y.Y { + constructor (url) { + super() + this.wsconnected = false + this.mux = Y.createMutex() + setupWS(this, url) + this.on('afterTransaction', broadcastUpdate) + } +} + +export default class WebsocketProvider { + constructor (url) { + // ensure that url is always ends with / + while (url[url.length - 1] === '/') { + url = url.slice(0, url.length - 1) + } + this.url = url + '/' + /** + * @type {Map} + */ + this.docs = new Map() + } + /** + * @param {string} name + * @return {WebsocketsSharedDocument} + */ + get (name) { + let doc = this.docs.get(name) + if (doc === undefined) { + doc = new WebsocketsSharedDocument(this.url + name) + } + return doc + } +} diff --git a/provider/websocket/server.js b/provider/websocket/server.js new file mode 100644 index 00000000..b23ed34d --- /dev/null +++ b/provider/websocket/server.js @@ -0,0 +1,53 @@ +const Y = require('../../build/node/index.js') +const WebSocket = require('ws') +const wss = new WebSocket.Server({ port: 1234 }) +const docs = new Map() + +const afterTransaction = (doc, transaction) => { + if (transaction.encodedStructsLen > 0) { + const encoder = Y.createEncoder() + Y.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs) + const message = Y.toBuffer(encoder) + doc.conns.forEach(conn => conn.send(message)) + } +} + +class WSSharedDoc extends Y.Y { + constructor () { + super() + this.mux = Y.createMutex() + this.conns = new Set() + this.on('afterTransaction', afterTransaction) + } +} + +const messageListener = (conn, doc, message) => { + const encoder = Y.createEncoder() + const decoder = Y.createDecoder(message) + Y.readMessage(decoder, encoder, doc) + if (Y.length(encoder) > 0) { + conn.send(Y.toBuffer(encoder)) + } +} + +const setupConnection = (conn, req) => { + conn.binaryType = 'arraybuffer' + // get doc, create if it does not exist yet + let doc = docs.get(req.url.slice(1)) + if (doc === undefined) { + doc = new WSSharedDoc() + docs.set(req.url.slice(1), doc) + } + doc.conns.add(conn) + // listen and reply to events + conn.on('message', message => messageListener(conn, doc, message)) + conn.on('close', () => + doc.conns.delete(conn) + ) + // send sync step 1 + const encoder = Y.createEncoder() + Y.writeSyncStep1(encoder, doc) + conn.send(Y.toBuffer(encoder)) +} + +wss.on('connection', setupConnection) diff --git a/rollup.node.js b/rollup.node.js index d80070a0..46f1bdcf 100644 --- a/rollup.node.js +++ b/rollup.node.js @@ -1,28 +1,18 @@ -import nodeResolve from 'rollup-plugin-node-resolve' -import commonjs from 'rollup-plugin-commonjs' -var pkg = require('./package.json') +const pkg = require('./package.json') export default { - input: 'src/Y.dist.js', - nameame: 'Y', - sourcemap: true, + input: 'src/index.js', output: { - file: 'y.node.js', - format: 'cjs' - }, - plugins: [ - nodeResolve({ - main: true, - module: true, - browser: true - }), - commonjs() - ], - banner: ` + name: 'Y', + file: 'build/node/index.js', + format: 'cjs', + sourcemap: true, + banner: ` /** * ${pkg.name} - ${pkg.description} * @version v${pkg.version} * @license ${pkg.license} */ ` + } } diff --git a/src/index.js b/src/index.js index 78202a1b..f4d65e51 100644 --- a/src/index.js +++ b/src/index.js @@ -36,6 +36,10 @@ export { default as DomBinding } from './Bindings/DomBinding/DomBinding.js' export { default as domToType } from './Bindings/DomBinding/domToType.js' export { domsToTypes, switchAssociation } from './Bindings/DomBinding/util.js' +export * from './message.js' +export * from '../lib/encoding.js' +export * from '../lib/decoding.js' +export * from '../lib/mutex.js' // TODO: reorder (Item* should have low numbers) registerStruct(0, ItemJSON)