large scale refactoring
This commit is contained in:
5
provider/websocket.js
Normal file
5
provider/websocket.js
Normal file
@@ -0,0 +1,5 @@
|
||||
/**
|
||||
* @module provider/websocket
|
||||
*/
|
||||
|
||||
export * from './websocket/WebSocketProvider.js'
|
||||
@@ -1,7 +1,11 @@
|
||||
/**
|
||||
* @module provider/websocket
|
||||
*/
|
||||
|
||||
/* eslint-env browser */
|
||||
|
||||
import * as Y from '../../src/index.js'
|
||||
export * from '../../src/index.js'
|
||||
import * as Y from '../../index.js'
|
||||
export * from '../../index.js'
|
||||
|
||||
const messageSync = 0
|
||||
const messageAwareness = 1
|
||||
@@ -95,7 +99,7 @@ class WebsocketsSharedDocument extends Y.Y {
|
||||
}
|
||||
}
|
||||
|
||||
export default class WebsocketProvider {
|
||||
export class WebsocketProvider {
|
||||
constructor (url) {
|
||||
// ensure that url is always ends with /
|
||||
while (url[url.length - 1] === '/') {
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
const Y = require('../../build/node/index.js')
|
||||
/**
|
||||
* @module provider/websocket
|
||||
*/
|
||||
|
||||
const Y = require('../../build/yjs.umd.js')
|
||||
const WebSocket = require('ws')
|
||||
const wss = new WebSocket.Server({ port: 1234 })
|
||||
const docs = new Map()
|
||||
|
||||
23
provider/ydb/NamedEventHandler.js
Normal file
23
provider/ydb/NamedEventHandler.js
Normal file
@@ -0,0 +1,23 @@
|
||||
/**
|
||||
* @module provider/ydb
|
||||
*/
|
||||
|
||||
import * as globals from './globals.js'
|
||||
|
||||
export const Class = class NamedEventHandler {
|
||||
constructor () {
|
||||
this.l = globals.createMap()
|
||||
}
|
||||
on (eventname, f) {
|
||||
const l = this.l
|
||||
let h = l.get(eventname)
|
||||
if (h === undefined) {
|
||||
h = globals.createSet()
|
||||
l.set(eventname, h)
|
||||
}
|
||||
h.add(f)
|
||||
}
|
||||
}
|
||||
|
||||
export const fire = (handler, eventname, event) =>
|
||||
handler.l.get(eventname).forEach(f => f(event))
|
||||
1
provider/ydb/README.md
Normal file
1
provider/ydb/README.md
Normal file
@@ -0,0 +1 @@
|
||||
* Host should discard message when confNumber is older than expected
|
||||
56
provider/ydb/TODO.md
Normal file
56
provider/ydb/TODO.md
Normal file
@@ -0,0 +1,56 @@
|
||||
Implement default dom filter..
|
||||
|
||||
But requires more explicit filtering of src attributes
|
||||
|
||||
e.g. src="java\nscript:alert(0)"
|
||||
|
||||
function domFilter (nodeName, attributes) {
|
||||
// Filter all attributes that start with on*. E.g. onclick does execute code
|
||||
// If key is 'href' or 'src', filter everything but 'http*', 'blob*', or 'data:image*' urls
|
||||
attributes.forEach(function (value, key) {
|
||||
key = key.toLowerCase();
|
||||
value = value.toLowerCase();
|
||||
if (key != null && (
|
||||
// filter all attributes starting with 'on'
|
||||
key.substr(0, 2) === 'on' ||
|
||||
// if key is 'href' or 'src', filter everything but http, blob, or data:image
|
||||
(
|
||||
(key === 'href' || key === 'src') &&
|
||||
value.substr(0, 4) !== 'http' &&
|
||||
value.substr(0, 4) !== 'blob' &&
|
||||
value.substr(0, 10) !== 'data:image'
|
||||
)
|
||||
)) {
|
||||
attributes.delete(key);
|
||||
}
|
||||
});
|
||||
switch (nodeName) {
|
||||
case 'SCRIPT':
|
||||
return null;
|
||||
case 'EN-ADORNMENTS':
|
||||
// TODO: Remove EN-ADORNMENTS check when merged into master branch!
|
||||
return null;
|
||||
case 'EN-TABLE':
|
||||
attributes.delete('class');
|
||||
return attributes;
|
||||
case 'EN-COMMENT':
|
||||
attributes.delete('style');
|
||||
attributes.delete('class');
|
||||
return attributes;
|
||||
case 'SPAN':
|
||||
return (attributes.get('id') || '').substr(0, 5) === 'goog_' ? null : attributes;
|
||||
case 'TD':
|
||||
attributes.delete('class');
|
||||
return attributes;
|
||||
case 'EMBED':
|
||||
attributes.delete('src');
|
||||
attributes.delete('style');
|
||||
attributes.delete('data-reference');
|
||||
return attributes;
|
||||
case 'FORM':
|
||||
attributes.delete('action');
|
||||
return attributes;
|
||||
default:
|
||||
return (nodeName || '').substr(0, 3) === 'UI-' ? null : attributes;
|
||||
}
|
||||
}
|
||||
218
provider/ydb/YdbClient.js
Normal file
218
provider/ydb/YdbClient.js
Normal file
@@ -0,0 +1,218 @@
|
||||
/**
|
||||
* @module provider/ydb
|
||||
*/
|
||||
|
||||
/* eslint-env browser */
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as globals from '../../lib/globals.js'
|
||||
import * as message from './message.js'
|
||||
import * as bc from './broadcastchannel.js'
|
||||
import * as encoding from '../../lib/encoding.js'
|
||||
import * as logging from '../../lib/logging.js'
|
||||
import * as idb from '../../lib/idb.js'
|
||||
import * as decoding from '../../lib/decoding.js'
|
||||
import { Y } from '../../utils/Y.js'
|
||||
import { integrateRemoteStruct } from '../MessageHandler/integrateRemoteStructs.js'
|
||||
import { createMutualExclude } from '../../lib/mutualExclude.js'
|
||||
|
||||
import * as NamedEventHandler from './NamedEventHandler.js'
|
||||
|
||||
/**
|
||||
* @typedef RoomState
|
||||
* @type {Object}
|
||||
* @property {number} rsid room session id, -1 if unknown (created locally)
|
||||
* @property {number} offset By server, -1 if unknown
|
||||
* @property {number} cOffset current offset by client
|
||||
*/
|
||||
|
||||
/**
|
||||
* @typedef SyncState
|
||||
* @type {Object}
|
||||
* @property {boolean} upsynced True if all local updates have been sent to the server and the server confirmed that it received the update
|
||||
* @property {boolean} downsynced True if the current session subscribed to the room, the server confirmed the subscription, and the initial data was received
|
||||
* @property {boolean} persisted True if the server confirmed that it persisted all published data
|
||||
*/
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
export class YdbClient extends NamedEventHandler.Class {
|
||||
constructor (url, db) {
|
||||
super()
|
||||
this.url = url
|
||||
this.ws = new WebSocket(url)
|
||||
this.rooms = globals.createMap()
|
||||
this.db = db
|
||||
this.connected = false
|
||||
/**
|
||||
* Set of room states. We try to keep it up in sync with idb, but this may fail due to concurrency with other windows.
|
||||
* TODO: implement tests for this
|
||||
* @type Map<string, RoomState>
|
||||
*/
|
||||
this.roomStates = globals.createMap()
|
||||
/**
|
||||
* Meta information about unconfirmed updates created by this client.
|
||||
* Maps from confid to roomname
|
||||
* @type Map<number, string>
|
||||
*/
|
||||
this.clientUnconfirmedStates = globals.createMap()
|
||||
bc.subscribeYdbEvents(this)
|
||||
initWS(this, this.ws)
|
||||
}
|
||||
/**
|
||||
* Open a Yjs instance that connects to `roomname`.
|
||||
* @param {string} roomname
|
||||
* @return {Y}
|
||||
*/
|
||||
getY (roomname) {
|
||||
const y = new Y(roomname)
|
||||
const mutex = createMutualExclude()
|
||||
y.on('afterTransaction', (y, transaction) => mutex(() => {
|
||||
if (transaction.encodedStructsLen > 0) {
|
||||
update(this, roomname, transaction.encodedStructs.createBuffer())
|
||||
}
|
||||
}))
|
||||
subscribe(this, roomname, update => mutex(() => {
|
||||
y.transact(() => {
|
||||
const decoder = decoding.createDecoder(update)
|
||||
while (decoding.hasContent(decoder)) {
|
||||
integrateRemoteStruct(y, decoder)
|
||||
}
|
||||
}, true)
|
||||
}))
|
||||
return y
|
||||
}
|
||||
getRoomState (roomname) {
|
||||
return bc.computeRoomState(this, bc.getUnconfirmedRooms(this), roomname)
|
||||
}
|
||||
getRoomStates () {
|
||||
const unconfirmedRooms = bc.getUnconfirmedRooms(this)
|
||||
const states = globals.createMap()
|
||||
this.roomStates.forEach((rstate, roomname) => states.set(roomname, bc.computeRoomState(this, unconfirmedRooms, roomname)))
|
||||
return states
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize WebSocket connection. Try to reconnect on error/disconnect.
|
||||
* @param {YdbClient} ydb
|
||||
* @param {WebSocket} ws
|
||||
*/
|
||||
const initWS = (ydb, ws) => {
|
||||
ws.binaryType = 'arraybuffer'
|
||||
ws.onclose = () => {
|
||||
ydb.connected = false
|
||||
logging.log('Disconnected from ydb. Reconnecting..')
|
||||
ydb.ws = new WebSocket(ydb.url)
|
||||
initWS(ydb, ws)
|
||||
}
|
||||
ws.onopen = () => {
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
globals.pall([idbactions.getRoomMetas(t), idbactions.getUnconfirmedSubscriptions(t), idbactions.getUnconfirmedUpdates(t)]).then(([metas, us, unconfirmedUpdates]) => {
|
||||
let subs = []
|
||||
metas.forEach(meta => {
|
||||
subs.push({
|
||||
room: meta.room,
|
||||
offset: meta.offset,
|
||||
rsid: meta.rsid
|
||||
})
|
||||
})
|
||||
us.forEach(room => {
|
||||
subs.push({
|
||||
room, offset: 0, rsid: 0
|
||||
})
|
||||
})
|
||||
subs = subs.filter(subdev => !ydb.roomStates.has(subdev.room)) // filter already subbed rooms
|
||||
ydb.connected = true
|
||||
const encoder = encoding.createEncoder()
|
||||
if (subs.length > 0) {
|
||||
encoding.writeArrayBuffer(encoder, message.createSub(subs))
|
||||
bc._broadcastYdbSyncingRoomsToServer(subs.map(subdev => subdev.room))
|
||||
}
|
||||
encoding.writeArrayBuffer(encoder, unconfirmedUpdates)
|
||||
send(ydb, encoding.toBuffer(encoder))
|
||||
})
|
||||
}
|
||||
ws.onmessage = event => message.readMessage(ydb, event.data)
|
||||
}
|
||||
|
||||
// maps from dbNamespace to db
|
||||
const dbPromises = new Map()
|
||||
|
||||
/**
|
||||
* Factory function. Get a ydb instance that connects to url, and uses dbNamespace as indexeddb namespace.
|
||||
* Create if it does not exist yet.
|
||||
*
|
||||
* @param {string} url
|
||||
* @param {string} dbNamespace
|
||||
* @return {Promise<YdbClient>}
|
||||
*/
|
||||
export const get = (url, dbNamespace = 'ydb') => {
|
||||
if (!dbPromises.has(dbNamespace)) {
|
||||
dbPromises.set(dbNamespace, idbactions.openDB(dbNamespace))
|
||||
}
|
||||
return dbPromises.get(dbNamespace).then(db => globals.presolve(new YdbClient(url, db)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a db namespace. Call this to remove any persisted data. Make sure to close active sessions.
|
||||
* TODO: destroy active ydbClient sessions / throw if a session is still active
|
||||
* @param {string} dbNamespace
|
||||
* @return {Promise}
|
||||
*/
|
||||
export const clear = (dbNamespace = 'ydb') => idb.deleteDB(dbNamespace)
|
||||
|
||||
/**
|
||||
* @param {YdbClient} ydb
|
||||
* @param {ArrayBuffer} m
|
||||
*/
|
||||
export const send = (ydb, m) => ydb.connected && m.byteLength !== 0 && ydb.ws.send(m)
|
||||
|
||||
/**
|
||||
* @param {YdbClient} ydb
|
||||
* @param {string} room
|
||||
* @param {ArrayBuffer} update
|
||||
*/
|
||||
export const update = (ydb, room, update) => {
|
||||
bc.publishRoomData(room, update)
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
logging.log(`Write Unconfirmed Update. room "${room}", ${JSON.stringify(update)}`)
|
||||
return idbactions.writeClientUnconfirmed(t, room, update).then(clientConf => {
|
||||
logging.log(`Send Unconfirmed Update. connected ${ydb.connected} room "${room}", clientConf ${clientConf}`)
|
||||
bc._broadcastYdbCUConfCreated(clientConf, room)
|
||||
send(ydb, message.createUpdate(room, update, clientConf))
|
||||
})
|
||||
}
|
||||
|
||||
export const subscribe = (ydb, room, f) => {
|
||||
bc.subscribeRoomData(room, f)
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
if (!ydb.roomStates.has(room)) {
|
||||
subscribeRooms(ydb, [room])
|
||||
}
|
||||
idbactions.getRoomData(t, room).then(data => {
|
||||
if (data.byteLength > 0) {
|
||||
f(data)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
export const subscribeRooms = (ydb, rooms) => {
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
let subs = []
|
||||
// TODO: try not to do too many single calls. Implement getRoomMetas(t, rooms) or retrieve all metas once and store them on ydb
|
||||
// TODO: find out performance of getRoomMetas with all metas
|
||||
return globals.pall(rooms.map(room => idbactions.getRoomMeta(t, room).then(meta => {
|
||||
if (meta === undefined) {
|
||||
subs.push(room)
|
||||
return idbactions.writeUnconfirmedSubscription(t, room)
|
||||
}
|
||||
}))).then(() => {
|
||||
subs = subs.filter(room => !ydb.roomStates.has(room))
|
||||
// write all sub messages when all unconfirmed subs are writted to idb
|
||||
if (subs.length > 0) {
|
||||
send(ydb, message.createSub(subs.map(room => ({room, offset: 0, rsid: 0}))))
|
||||
bc._broadcastYdbSyncingRoomsToServer(subs)
|
||||
}
|
||||
})
|
||||
}
|
||||
89
provider/ydb/YdbClient.test.js
Normal file
89
provider/ydb/YdbClient.test.js
Normal file
@@ -0,0 +1,89 @@
|
||||
/**
|
||||
* @module provider/ydb
|
||||
*/
|
||||
|
||||
/* eslint-env browser */
|
||||
|
||||
import * as test from './test.js'
|
||||
import * as ydbClient from './YdbClient.js'
|
||||
import * as globals from './globals.js'
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as logging from './logging.js'
|
||||
|
||||
const wsUrl = 'ws://127.0.0.1:8899/ws'
|
||||
const testRoom = 'testroom'
|
||||
|
||||
class YdbTestClient {
|
||||
constructor (ydb) {
|
||||
this.ydb = ydb
|
||||
this.createdUpdates = new Set()
|
||||
this.data = []
|
||||
this.checked = new Set()
|
||||
}
|
||||
}
|
||||
|
||||
const clearAllYdbContent = () => fetch('http://127.0.0.1:8899/clearAll', { mode: 'no-cors' })
|
||||
|
||||
/**
|
||||
* @param {string} name
|
||||
* @return {Promise<YdbTestClient>}
|
||||
*/
|
||||
const getTestClient = async name => {
|
||||
await ydbClient.clear('ydb-' + name)
|
||||
const ydb = await ydbClient.get(wsUrl, 'ydb-' + name)
|
||||
const testClient = new YdbTestClient(ydb)
|
||||
ydbClient.subscribe(ydb, testRoom, data => {
|
||||
testClient.data.push(data)
|
||||
globals.createArrayFromArrayBuffer(data).forEach(d => {
|
||||
if (d < nextUpdateNumber) {
|
||||
testClient.checked.add(d)
|
||||
}
|
||||
})
|
||||
console.log(name, 'received data', data, testClient.data)
|
||||
})
|
||||
return testClient
|
||||
}
|
||||
|
||||
// TODO: does only work for 8bit numbers..
|
||||
let nextUpdateNumber = 0
|
||||
|
||||
/**
|
||||
* Create an update. We use an increasing message counter so each update is unique.
|
||||
* @param {YdbTestClient} client
|
||||
*/
|
||||
const update = (client) => {
|
||||
ydbClient.update(client.ydb, testRoom, globals.createArrayBufferFromArray([nextUpdateNumber++]))
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if tsclient has all data in dataset
|
||||
* @param {...YdbTestClient} testClients
|
||||
*/
|
||||
const checkTestClients = (...testClients) => globals.until(100000, () => testClients.every(testClient =>
|
||||
testClient.checked.size === nextUpdateNumber
|
||||
)).then(() =>
|
||||
globals.wait(150) // wait 150 for all conf messages to come in..
|
||||
// TODO: do the below check in the until handler
|
||||
).then(() => globals.pall(testClients.map(testClient => idbactions.getRoomData(idbactions.createTransaction(testClient.ydb.db), testRoom)))).then(testClientsData => {
|
||||
testClientsData.forEach((testClientData, i) => {
|
||||
const checked = new Set()
|
||||
globals.createArrayFromArrayBuffer(testClientData).forEach(d => {
|
||||
if (checked.has(d)) {
|
||||
logging.fail('duplicate content')
|
||||
}
|
||||
checked.add(d)
|
||||
})
|
||||
if (checked.size !== nextUpdateNumber) {
|
||||
logging.fail(`Not all data is available in idb in client ${i}`)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
clearAllYdbContent().then(() => {
|
||||
test.run('ydb-client', async testname => {
|
||||
const c1 = await getTestClient('1')
|
||||
const c2 = await getTestClient('2')
|
||||
update(c1)
|
||||
await checkTestClients(c1, c2)
|
||||
})
|
||||
})
|
||||
309
provider/ydb/broadcastchannel.js
Normal file
309
provider/ydb/broadcastchannel.js
Normal file
@@ -0,0 +1,309 @@
|
||||
/**
|
||||
* @module provider/ydb
|
||||
*/
|
||||
|
||||
/* eslint-env browser */
|
||||
|
||||
import * as decoding from '../../lib/decoding.js'
|
||||
import * as encoding from '../../lib/encoding.js'
|
||||
import * as globals from '../../lib/globals.js'
|
||||
import * as NamedEventHandler from './NamedEventHandler.js'
|
||||
|
||||
const bc = new BroadcastChannel('ydb-client')
|
||||
/**
|
||||
* @type {Map<string, Set<Function>>}
|
||||
*/
|
||||
const datasubs = globals.createMap()
|
||||
/**
|
||||
* @type {Set<any>} Set of Ydb instances
|
||||
*/
|
||||
const ydbinstances = globals.createSet()
|
||||
|
||||
const bcRoomDataMessage = 0
|
||||
const bcYdbCUConfCreated = 1
|
||||
const bcYdbCUConfConfirmed = 2
|
||||
const bcYdbRemoteOffsetReceived = 3
|
||||
const bcYdbRemoteOffsetConfirmed = 4
|
||||
const bcYdbSyncingRoomsToServer = 5
|
||||
const bcYdbSyncFromServer = 6
|
||||
|
||||
export const getUnconfirmedRooms = ydb => {
|
||||
const unconfirmedRooms = globals.createSet()
|
||||
ydb.clientUnconfirmedStates.forEach(room => unconfirmedRooms.add(room))
|
||||
return unconfirmedRooms
|
||||
}
|
||||
|
||||
export const computeRoomState = (ydb, unconfirmedRooms, room) => {
|
||||
// state is a RoomState, defined in YdbClient.js
|
||||
const state = ydb.roomStates.get(room)
|
||||
if (state === undefined) {
|
||||
return {
|
||||
upsynced: false,
|
||||
downsynced: false,
|
||||
persisted: false
|
||||
}
|
||||
}
|
||||
return {
|
||||
upsynced: !unconfirmedRooms.has(room),
|
||||
downsynced: state.offset >= 0 && state.coffset >= state.offset,
|
||||
persisted: state.coffset === state.offset && state.offset >= 0 && !unconfirmedRooms.has(room)
|
||||
}
|
||||
}
|
||||
|
||||
let roomStatesUpdating = []
|
||||
const fireRoomStateUpdate = (ydb, room) => {
|
||||
roomStatesUpdating.push(room)
|
||||
if (roomStatesUpdating.length === 1) {
|
||||
// first time this is called, trigger actual publisher
|
||||
// setTimeout(() => {
|
||||
const updated = new Map()
|
||||
const unconfirmedRooms = getUnconfirmedRooms(ydb)
|
||||
roomStatesUpdating.forEach(room => {
|
||||
if (!updated.has(room)) {
|
||||
updated.set(room, computeRoomState(ydb, unconfirmedRooms, room))
|
||||
}
|
||||
})
|
||||
NamedEventHandler.fire(ydb, 'syncstate', {
|
||||
updated
|
||||
})
|
||||
roomStatesUpdating = []
|
||||
// }, 0)
|
||||
}
|
||||
}
|
||||
|
||||
const receiveBCData = data => {
|
||||
const decoder = decoding.createDecoder(data)
|
||||
while (decoding.hasContent(decoder)) {
|
||||
const messageType = decoding.readVarUint(decoder)
|
||||
switch (messageType) {
|
||||
case bcRoomDataMessage: {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readTail(decoder)
|
||||
const rsubs = datasubs.get(room)
|
||||
if (rsubs !== undefined) {
|
||||
rsubs.forEach(f => f(update))
|
||||
}
|
||||
break
|
||||
}
|
||||
case bcYdbCUConfCreated: {
|
||||
const confid = decoding.readVarUint(decoder)
|
||||
const room = decoding.readVarString(decoder)
|
||||
ydbinstances.forEach(ydb => {
|
||||
ydb.clientUnconfirmedStates.set(confid, room)
|
||||
fireRoomStateUpdate(ydb, room)
|
||||
})
|
||||
break
|
||||
}
|
||||
case bcYdbCUConfConfirmed: {
|
||||
const confid = decoding.readVarUint(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
ydbinstances.forEach(ydb => {
|
||||
const room = ydb.clientUnconfirmedStates.get(confid)
|
||||
if (room !== undefined) {
|
||||
ydb.clientUnconfirmedStates.delete(confid)
|
||||
const state = ydb.roomStates.get(room)
|
||||
if (state.coffset < offset) {
|
||||
state.coffset = offset
|
||||
}
|
||||
fireRoomStateUpdate(ydb, room)
|
||||
}
|
||||
})
|
||||
break
|
||||
}
|
||||
case bcYdbRemoteOffsetReceived: {
|
||||
const len = decoding.readVarUint(decoder)
|
||||
for (let i = 0; i < len; i++) {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
ydbinstances.forEach(ydb => {
|
||||
// this is only called when an update is received
|
||||
// so roomState.get(room) should exist
|
||||
const state = ydb.roomStates.get(room)
|
||||
if (state.coffset < offset) {
|
||||
state.coffset = offset
|
||||
}
|
||||
fireRoomStateUpdate(ydb, room)
|
||||
})
|
||||
}
|
||||
break
|
||||
}
|
||||
case bcYdbRemoteOffsetConfirmed: {
|
||||
const len = decoding.readVarUint(decoder)
|
||||
for (let i = 0; i < len; i++) {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
ydbinstances.forEach(ydb => {
|
||||
const state = ydb.roomStates.get(room)
|
||||
state.offset = offset
|
||||
fireRoomStateUpdate(ydb, room)
|
||||
})
|
||||
}
|
||||
break
|
||||
}
|
||||
case bcYdbSyncingRoomsToServer: {
|
||||
const len = decoding.readVarUint(decoder)
|
||||
for (let i = 0; i < len; i++) {
|
||||
const room = decoding.readVarString(decoder)
|
||||
ydbinstances.forEach(ydb => {
|
||||
const state = ydb.roomStates.get(room)
|
||||
if (state === undefined) {
|
||||
ydb.roomStates.set(room, {
|
||||
rsid: -1,
|
||||
offset: -1,
|
||||
coffset: 0
|
||||
})
|
||||
fireRoomStateUpdate(ydb, room)
|
||||
}
|
||||
})
|
||||
}
|
||||
break
|
||||
}
|
||||
case bcYdbSyncFromServer: {
|
||||
const len = decoding.readVarUint(decoder)
|
||||
for (let i = 0; i < len; i++) {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
const rsid = decoding.readVarUint(decoder)
|
||||
ydbinstances.forEach(ydb => {
|
||||
const state = ydb.roomStates.get(room)
|
||||
state.offset = offset
|
||||
state.rsid = rsid
|
||||
fireRoomStateUpdate(ydb, room)
|
||||
})
|
||||
}
|
||||
break
|
||||
}
|
||||
default:
|
||||
globals.error('Unexpected bc message type')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bc.onmessage = event => receiveBCData(event.data)
|
||||
|
||||
/**
|
||||
* Publish to all, including self
|
||||
* @param {encoding.Encoder} encoder
|
||||
*/
|
||||
export const publishAll = encoder => {
|
||||
const buffer = encoding.toBuffer(encoder)
|
||||
bc.postMessage(buffer)
|
||||
receiveBCData(buffer)
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this when update was created by this user and confid was created
|
||||
* @param {number} cconf
|
||||
* @param {string} roomname
|
||||
*/
|
||||
export const _broadcastYdbCUConfCreated = (cconf, roomname) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, bcYdbCUConfCreated)
|
||||
encoding.writeVarUint(encoder, cconf)
|
||||
encoding.writeVarString(encoder, roomname)
|
||||
publishAll(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this when user confid was confirmed by host
|
||||
* @param {number} cconf
|
||||
* @param {number} offset The conf-offset of the client-created offset
|
||||
*/
|
||||
export const _broadcastYdbCUConfConfirmed = (cconf, offset) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, bcYdbCUConfConfirmed)
|
||||
encoding.writeVarUint(encoder, cconf)
|
||||
encoding.writeVarUint(encoder, offset)
|
||||
publishAll(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this when remote update is received (thus host has increased, but not confirmed, the offset)
|
||||
* @param {Array<Object>} subs sub is { room, offset }
|
||||
*/
|
||||
export const _broadcastYdbRemoteOffsetReceived = subs => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, bcYdbRemoteOffsetReceived)
|
||||
encoding.writeVarUint(encoder, subs.length)
|
||||
subs.forEach(sub => {
|
||||
encoding.writeVarString(encoder, sub.room)
|
||||
encoding.writeVarUint(encoder, sub.offset)
|
||||
})
|
||||
publishAll(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Array<Object>} subs sub is { room, offset }
|
||||
*/
|
||||
export const _broadcastYdbRemoteOffsetConfirmed = subs => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, bcYdbRemoteOffsetConfirmed)
|
||||
encoding.writeVarUint(encoder, subs.length)
|
||||
subs.forEach(sub => {
|
||||
encoding.writeVarString(encoder, sub.room)
|
||||
encoding.writeVarUint(encoder, sub.offset)
|
||||
})
|
||||
publishAll(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this when a subscription is created
|
||||
* @param {Array<string>} rooms
|
||||
*/
|
||||
export const _broadcastYdbSyncingRoomsToServer = rooms => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, bcYdbSyncingRoomsToServer)
|
||||
encoding.writeVarUint(encoder, rooms.length)
|
||||
rooms.forEach(room => {
|
||||
encoding.writeVarString(encoder, room)
|
||||
})
|
||||
publishAll(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* Call this when sync confirmed by host
|
||||
* @param {Array<Object>} subs sub is {room, offset, rsid}
|
||||
*/
|
||||
export const _broadcastYdbSyncFromServer = subs => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, bcYdbSyncFromServer)
|
||||
encoding.writeVarUint(encoder, subs.length)
|
||||
subs.forEach(sub => {
|
||||
encoding.writeVarString(encoder, sub.room)
|
||||
encoding.writeVarUint(encoder, sub.offset)
|
||||
encoding.writeVarUint(encoder, sub.rsid)
|
||||
})
|
||||
publishAll(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {Function} f
|
||||
*/
|
||||
export const subscribeRoomData = (room, f) => {
|
||||
let rsubs = datasubs.get(room)
|
||||
if (rsubs === undefined) {
|
||||
rsubs = new Set()
|
||||
datasubs.set(room, rsubs)
|
||||
}
|
||||
rsubs.add(f)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {ArrayBuffer} update
|
||||
*/
|
||||
export const publishRoomData = (room, update) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarString(encoder, room)
|
||||
encoding.writeArrayBuffer(encoder, update)
|
||||
bc.postMessage(encoding.toBuffer(encoder))
|
||||
// call subs directly here instead of calling receivedBCData
|
||||
const rsubs = datasubs.get(room)
|
||||
if (rsubs !== undefined) {
|
||||
rsubs.forEach(f => f(update))
|
||||
}
|
||||
}
|
||||
|
||||
export const subscribeYdbEvents = ydb =>
|
||||
ydbinstances.add(ydb)
|
||||
388
provider/ydb/idbactions.js
Normal file
388
provider/ydb/idbactions.js
Normal file
@@ -0,0 +1,388 @@
|
||||
/**
|
||||
* @module provider/ydb
|
||||
*/
|
||||
|
||||
/* eslint-env browser */
|
||||
|
||||
/**
|
||||
* Naming conventions:
|
||||
* * ydb: Think of ydb as a federated set of servers. This is not yet true, but we will eventually get there. With this assumption come some challenges with the client
|
||||
* * ydb instance: A single ydb instance that this ydb-client connects to
|
||||
* * (room) host: Exactly one ydb instance controls a room at any time. The ownership may change over time. The host of a room is the ydb instance that owns it. This is not necessarily the instance we connect to.
|
||||
* * room session id: An random id that is assigned to a room. When the server dies unexpectedly, we can conclude which data is missing and send it to the server (or delete it and prevent duplicate content)
|
||||
* * update: An ArrayBuffer of binary data. Neither Ydb nor Ydb-client care about the content of update. Updates may be appended to each other.
|
||||
*
|
||||
* The database has four tables:
|
||||
*
|
||||
* CU "client-unconfirmed" confid -> room, update
|
||||
* - The client writes to this table when it creates an update.
|
||||
* - Then it sends an update to the host with the generated confid
|
||||
* - In case the host doesn't confirm that it received this update, it is sent again on next sync
|
||||
* HU "host-unconfirmed" room, offset -> update
|
||||
* - Updates from the host are written to this table
|
||||
* - When host confirms that an unconfirmed update was persisted, the update is written to the Co table
|
||||
* - When client sync to host and the room session ids don't match, all host-unconfirmed messages are sent to host
|
||||
* Co "confirmed":
|
||||
* data:{room} -> update
|
||||
* - this field holds confirmed room updates
|
||||
* meta:{room} -> room session id, confirmed offset
|
||||
* - this field holds metadata about the room
|
||||
* US "unconfirmed-subscriptions" room -> _
|
||||
* - Subscriptions sent to the server, but didn't receive confirmation yet
|
||||
* - Either a room is in US or in Co
|
||||
* - A client may update a room when the room is in either US or Co
|
||||
*/
|
||||
|
||||
import * as encoding from '../../lib/encoding.js'
|
||||
import * as decoding from '../../lib/decoding.js'
|
||||
import * as idb from '../../lib/idb.js'
|
||||
import * as globals from '../../lib/globals.js'
|
||||
import * as message from './message.js'
|
||||
|
||||
/**
|
||||
* Get 'client-unconfirmed' store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreCU = t => idb.getStore(t, STORE_CU)
|
||||
/**
|
||||
* Get 'host-unconfirmed' store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreHU = t => idb.getStore(t, STORE_HU)
|
||||
/**
|
||||
* Get 'confirmed' store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreCo = t => idb.getStore(t, STORE_CO)
|
||||
|
||||
/**
|
||||
* Get `unconfirmed-subscriptions` store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreUS = t => idb.getStore(t, STORE_US)
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {number} offset
|
||||
* @return {[string, number]}
|
||||
*/
|
||||
const encodeHUKey = (room, offset) => [room, offset]
|
||||
|
||||
/**
|
||||
* @typedef RoomAndOffset
|
||||
* @type {Object}
|
||||
* @property {string} room
|
||||
* @property {number} offset Received offsets (including offsets that are not yet confirmed)
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {[string, number]} key
|
||||
* @return {RoomAndOffset}
|
||||
*/
|
||||
const decodeHUKey = key => {
|
||||
return {
|
||||
room: key[0],
|
||||
offset: key[1]
|
||||
}
|
||||
}
|
||||
|
||||
const getCoMetaKey = room => 'meta:' + room
|
||||
const getCoDataKey = room => 'data:' + room
|
||||
|
||||
const STORE_CU = 'client-unconfirmed'
|
||||
const STORE_US = 'unconfirmed-subscriptions'
|
||||
const STORE_CO = 'confirmed'
|
||||
const STORE_HU = 'host-unconfirmed'
|
||||
|
||||
/**
|
||||
* @param {string} dbNamespace
|
||||
* @return {Promise<IDBDatabase>}
|
||||
*/
|
||||
export const openDB = dbNamespace => idb.openDB(dbNamespace, db => idb.createStores(db, [
|
||||
[STORE_CU, { autoIncrement: true }],
|
||||
[STORE_HU],
|
||||
[STORE_CO],
|
||||
[STORE_US]
|
||||
]))
|
||||
|
||||
export const deleteDB = name => idb.deleteDB(name)
|
||||
|
||||
/**
|
||||
* Create a new IDBTransaction accessing all object stores. Normally we should care that we can access object stores in parallel.
|
||||
* But this is not possible in ydb-client since at least two object stores are requested in every IDB change.
|
||||
* @param {IDBDatabase} db
|
||||
* @return {IDBTransaction}
|
||||
*/
|
||||
export const createTransaction = db => db.transaction([STORE_CU, STORE_HU, STORE_CO, STORE_US], 'readwrite')
|
||||
|
||||
/**
|
||||
* Write an update to the db after the client created it. This update is not yet received by the host.
|
||||
* This function returns a client confirmation number. The confirmation number must be send to the host so it can identify the update,
|
||||
* and we can move the update to HU when it is confirmed (@see writeHostUnconfirmedByClient)
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} room
|
||||
* @param {ArrayBuffer} update
|
||||
* @return {Promise<number>} client confirmation number
|
||||
*/
|
||||
export const writeClientUnconfirmed = (t, room, update) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarString(encoder, room)
|
||||
encoding.writeArrayBuffer(encoder, update)
|
||||
return idb.addAutoKey(getStoreCU(t), encoding.toBuffer(encoder))
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all updates that are not yet confirmed by host.
|
||||
* @param {IDBTransaction} t
|
||||
* @return {Promise<ArrayBuffer>} All update messages as a single ArrayBuffer
|
||||
*/
|
||||
export const getUnconfirmedUpdates = t => {
|
||||
const encoder = encoding.createEncoder()
|
||||
return idb.iterate(getStoreCU(t), null, (value, clientConf) => {
|
||||
const decoder = decoding.createDecoder(value)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readTail(decoder)
|
||||
encoding.writeArrayBuffer(encoder, message.createUpdate(room, update, clientConf))
|
||||
}).then(() => encoding.toBuffer(encoder))
|
||||
}
|
||||
|
||||
/**
|
||||
* The host confirms that it received and persisted an update. The update can be safely removed from CU.
|
||||
* It is necessary to call this function in case that the client disconnected before the host could send `writeHostUnconfirmedByClient`.
|
||||
* @param {IDBTransaction} t
|
||||
* @param {number} clientConf
|
||||
* @return {Promise}
|
||||
*/
|
||||
export const confirmClient = (t, clientConf) => idb.del(getStoreCU(t), idb.createIDBKeyRangeUpperBound(clientConf, false))
|
||||
|
||||
/**
|
||||
* The host confirms that it received and broadcasted an update sent from this client.
|
||||
* Calling this method does not confirm that the update has been persisted by the server.
|
||||
*
|
||||
* Other clients will receive an update with `writeHostUnconfirmed`. Since this client created the update, it only receives a confirmation. So
|
||||
* we can simply move the update from CU to HU.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @param {number} clientConf The client confirmation number that identifies the update
|
||||
* @param {number} offset The offset with wich the server will store the information
|
||||
*/
|
||||
export const writeHostUnconfirmedByClient = (t, clientConf, offset) => idb.get(getStoreCU(t), clientConf).then(roomAndUpdate => {
|
||||
const decoder = decoding.createDecoder(roomAndUpdate)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readTail(decoder)
|
||||
return writeHostUnconfirmed(t, room, offset, update).then(() =>
|
||||
idb.del(getStoreCU(t), clientConf)
|
||||
)
|
||||
})
|
||||
|
||||
/**
|
||||
* The host broadcasts an update created by another client. It assures that the update will eventually be persisted with
|
||||
* `offset`. Calling this function does not imply that the update was persisted by the host. In case of mismatching room session ids
|
||||
* the updates in HU will be sent to the server.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} room
|
||||
* @param {number} offset
|
||||
* @param {ArrayBuffer} update
|
||||
* @return {Promise}
|
||||
*/
|
||||
export const writeHostUnconfirmed = (t, room, offset, update) => idb.put(getStoreHU(t), update, encodeHUKey(room, offset))
|
||||
|
||||
/**
|
||||
* The host confirms that it persisted updates up until (including) offset. updates may be moved from HU to Co.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} room
|
||||
* @param {number} offset Inclusive range [0, offset - 1] has been stored to host
|
||||
*/
|
||||
export const writeConfirmedByHost = (t, room, offset) => {
|
||||
const co = getStoreCo(t)
|
||||
return globals.pall([idb.get(co, getCoDataKey(room)), idb.get(co, getCoMetaKey(room))]).then(async arr => {
|
||||
const data = arr[0]
|
||||
const meta = decodeMetaValue(arr[1])
|
||||
const dataEncoder = encoding.createEncoder()
|
||||
if (meta.offset >= offset) {
|
||||
return // nothing to do
|
||||
}
|
||||
encoding.writeArrayBuffer(dataEncoder, data)
|
||||
const hu = getStoreHU(t)
|
||||
const huKeyRange = idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, offset), false, false)
|
||||
return idb.iterate(hu, huKeyRange, (value, _key) => {
|
||||
const key = decodeHUKey(_key) // @kevin _key is an array. remove decodeHUKey functions
|
||||
if (key.room === room && key.offset <= offset) {
|
||||
encoding.writeArrayBuffer(dataEncoder, value)
|
||||
}
|
||||
}).then(() => {
|
||||
globals.pall([idb.put(co, encodeMetaValue(meta.rsid, offset), getCoMetaKey(room)), idb.put(co, encoding.toBuffer(dataEncoder), getCoDataKey(room)), idb.del(hu, huKeyRange)])
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef RoomMeta
|
||||
* @type {Object}
|
||||
* @property {string} room
|
||||
* @property {number} rsid Room session id
|
||||
* @property {number} offset Received offsets (including offsets that are not yet confirmed)
|
||||
*/
|
||||
|
||||
/**
|
||||
* Get all meta information for all rooms.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @return {Promise<Array<RoomMeta>>}
|
||||
*/
|
||||
export const getRoomMetas = t => {
|
||||
// const result = []
|
||||
const storeCo = getStoreCo(t)
|
||||
const coQuery = idb.createIDBKeyRangeLowerBound('meta:', false)
|
||||
return globals.pall([idb.getAll(storeCo, coQuery), idb.getAllKeys(storeCo, coQuery)]).then(([metaValues, metaKeys]) => globals.pall(metaValues.map((metavalue, i) => {
|
||||
const room = metaKeys[i].slice(5)
|
||||
const { rsid, offset } = decodeMetaValue(metavalue)
|
||||
return {
|
||||
room,
|
||||
rsid,
|
||||
offset: offset
|
||||
}
|
||||
})))
|
||||
/*
|
||||
return idb.iterate(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false), (metavalue, metakey) =>
|
||||
idb.getAllKeys(hu, idb.createIDBKeyRangeBound(encodeHUKey(metakey.slice(5), 0), encodeHUKey(metakey.slice(5), 2 ** 32), false, false)).then(keys => {
|
||||
const { rsid, offset } = decodeMetaValue(metavalue)
|
||||
result.push({
|
||||
room: metakey.slice(5),
|
||||
rsid,
|
||||
offset: keys.reduce((cur, key) => globals.max(decodeHUKey(key).offset, cur), offset)
|
||||
})
|
||||
})
|
||||
).then(() => globals.presolve(result))
|
||||
*/
|
||||
}
|
||||
|
||||
export const getRoomMeta = (t, room) =>
|
||||
idb.get(getStoreCo(t), getCoMetaKey(room))
|
||||
|
||||
/**
|
||||
* Get all data from idb, excluding unconfirmed updates.
|
||||
* TODO: include updates in CU
|
||||
* @param {IDBTransaction} t
|
||||
* @param {string} room
|
||||
* @return {Promise<ArrayBuffer>}
|
||||
*/
|
||||
export const getRoomDataWithoutCU = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false))]).then(([data, updates]) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeArrayBuffer(encoder, data || new Uint8Array(0))
|
||||
updates.forEach(update => encoding.writeArrayBuffer(encoder, update))
|
||||
return encoding.toBuffer(encoder)
|
||||
})
|
||||
|
||||
/**
|
||||
* Get all data from idb, including unconfirmed updates.
|
||||
* TODO: include updates in CU
|
||||
* @param {IDBTransaction} t
|
||||
* @param {string} room
|
||||
* @return {Promise<ArrayBuffer>}
|
||||
*/
|
||||
export const getRoomData = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false)), idb.getAll(getStoreCU(t))]).then(([data, updates, cuUpdates]) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeArrayBuffer(encoder, data || new Uint8Array(0))
|
||||
updates.forEach(update => encoding.writeArrayBuffer(encoder, update))
|
||||
cuUpdates.forEach(roomAndUpdate => {
|
||||
const decoder = decoding.createDecoder(roomAndUpdate)
|
||||
if (decoding.readVarString(decoder) === room) {
|
||||
encoding.writeArrayBuffer(encoder, decoding.readTail(decoder))
|
||||
}
|
||||
})
|
||||
return encoding.toBuffer(encoder)
|
||||
})
|
||||
|
||||
const decodeMetaValue = buffer => {
|
||||
const decoder = decoding.createDecoder(buffer)
|
||||
const rsid = decoding.readVarUint(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
return {
|
||||
rsid, offset
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param {number} rsid room session id
|
||||
* @param {number} offset
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
const encodeMetaValue = (rsid, offset) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, rsid)
|
||||
encoding.writeVarUint(encoder, offset)
|
||||
return encoding.toBuffer(encoder)
|
||||
}
|
||||
|
||||
const writeInitialCoEntry = (t, room, roomsessionid, offset) => globals.pall([
|
||||
idb.put(getStoreCo(t), encodeMetaValue(roomsessionid, offset), getCoMetaKey(room)),
|
||||
idb.put(getStoreCo(t), globals.createArrayBufferFromArray([]), getCoDataKey(room))
|
||||
])
|
||||
|
||||
const _confirmSub = (t, metaval, sub) => {
|
||||
if (metaval === undefined) {
|
||||
return writeInitialCoEntry(t, sub.room, sub.rsid, sub.offset).then(() => idb.del(getStoreUS(t), sub.room)).then(() => null)
|
||||
}
|
||||
const meta = decodeMetaValue(metaval)
|
||||
if (meta.rsid !== sub.rsid) {
|
||||
// TODO: Yjs sync with server here
|
||||
// get all room data (without CU) and save it as a client update. Then remove all data
|
||||
return getRoomDataWithoutCU(t, sub.room)
|
||||
.then(roomdata =>
|
||||
writeClientUnconfirmed(t, sub.room, roomdata)
|
||||
.then(clientConf => message.createUpdate(sub.room, roomdata, clientConf))
|
||||
.then(update =>
|
||||
writeInitialCoEntry(t, sub.room, sub.rsid, sub.offset).then(() => update)
|
||||
)
|
||||
)
|
||||
} else if (meta.offset < sub.offset) {
|
||||
return writeConfirmedByHost(t, sub.room, sub.offset).then(() => null)
|
||||
} else {
|
||||
// nothing needs to happen
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef Sub
|
||||
* @type {Object}
|
||||
* @property {string} room room name
|
||||
* @property {number} rsid room session id
|
||||
* @property {number} offset
|
||||
*/
|
||||
|
||||
/**
|
||||
* Set the initial room data. Overwrites initial data if there is any!
|
||||
* @param {IDBTransaction} t
|
||||
* @param {Sub} sub
|
||||
* @return {Promise<ArrayBuffer?>} Message to send to server
|
||||
*/
|
||||
export const confirmSubscription = (t, sub) => idb.get(getStoreCo(t), getCoMetaKey(sub.room)).then(metaval => _confirmSub(t, metaval, sub))
|
||||
|
||||
export const confirmSubscriptions = (t, subs) => idb.getAllKeysValues(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false)).then(kvs => {
|
||||
const ps = []
|
||||
const subMap = new Map()
|
||||
subs.forEach(sub => subMap.set(sub.room, sub))
|
||||
for (let i = 0, len = kvs.length; i < len; i++) {
|
||||
const kv = kvs[i]
|
||||
const kvroom = kv.k.slice(5)
|
||||
const exSub = subMap.get(kvroom)
|
||||
if (exSub !== undefined) {
|
||||
subMap.delete(kvroom)
|
||||
ps.push(_confirmSub(t, kv.v, exSub))
|
||||
}
|
||||
}
|
||||
// all remaining elements in subMap do not exist yet in Co.
|
||||
subMap.forEach(nonexSub => ps.push(_confirmSub(t, undefined, nonexSub)))
|
||||
return ps
|
||||
})
|
||||
|
||||
export const writeUnconfirmedSubscription = (t, room) => idb.put(getStoreUS(t), true, room)
|
||||
|
||||
export const getUnconfirmedSubscriptions = t => idb.getAllKeys(getStoreUS(t))
|
||||
23
provider/ydb/idbactions.test.js
Normal file
23
provider/ydb/idbactions.test.js
Normal file
@@ -0,0 +1,23 @@
|
||||
import * as globals from '../../lib/globals.js'
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as test from '../../lib/testing.js'
|
||||
|
||||
idbactions.deleteDB().then(() => idbactions.openDB()).then(db => {
|
||||
test.run('update lifetime 1', async (testname) => {
|
||||
const update = new Uint8Array([1, 2, 3]).buffer
|
||||
const t = idbactions.createTransaction(db)
|
||||
idbactions.writeInitialRoomData(t, testname, 42, 1, new Uint8Array([0]).buffer)
|
||||
const clientConf = await idbactions.writeClientUnconfirmed(t, testname, update)
|
||||
await idbactions.writeHostUnconfirmedByClient(t, clientConf, 0)
|
||||
await idbactions.writeConfirmedByHost(t, testname, 4)
|
||||
const metas = await idbactions.getRoomMetas(t)
|
||||
const roommeta = metas.find(meta => meta.room === testname)
|
||||
if (roommeta == null || roommeta.offset !== 4 || roommeta.rsid !== 42) {
|
||||
throw globals.error()
|
||||
}
|
||||
const data = await idbactions.getRoomData(t, testname)
|
||||
if (!test.compareArrays(new Uint8Array(data), new Uint8Array([0, 1, 2, 3]))) {
|
||||
throw globals.error()
|
||||
}
|
||||
})
|
||||
})
|
||||
11
provider/ydb/index.js
Normal file
11
provider/ydb/index.js
Normal file
@@ -0,0 +1,11 @@
|
||||
/**
|
||||
* @module provider/ydb
|
||||
*/
|
||||
|
||||
import * as ydbclient from './YdbClient.js'
|
||||
|
||||
/**
|
||||
* @param {string} url
|
||||
* @return {Promise<ydbclient.YdbClient>}
|
||||
*/
|
||||
export const createYdbClient = url => ydbclient.get(url)
|
||||
124
provider/ydb/message.js
Normal file
124
provider/ydb/message.js
Normal file
@@ -0,0 +1,124 @@
|
||||
/**
|
||||
* @module provider/ydb
|
||||
*/
|
||||
|
||||
import * as encoding from './encoding.js'
|
||||
import * as decoding from './decoding.js'
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as logging from './logging.js'
|
||||
import * as bc from './broadcastchannel.js'
|
||||
|
||||
/* make sure to update message.go in ydb when updating these values.. */
|
||||
export const MESSAGE_UPDATE = 0 // TODO: rename host_unconfirmed?
|
||||
export const MESSAGE_SUB = 1
|
||||
export const MESSAGE_CONFIRMATION = 2
|
||||
export const MESSAGE_SUB_CONF = 3
|
||||
export const MESSAGE_HOST_UNCONFIRMED_BY_CLIENT = 4
|
||||
export const MESSAGE_CONFIRMED_BY_HOST = 5
|
||||
|
||||
/**
|
||||
* @param {any} ydb YdbClient instance
|
||||
* @param {ArrayBuffer} message
|
||||
*/
|
||||
export const readMessage = (ydb, message) => {
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
const decoder = decoding.createDecoder(message)
|
||||
while (decoding.hasContent(decoder)) {
|
||||
switch (decoding.readVarUint(decoder)) {
|
||||
case MESSAGE_UPDATE: {
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readPayload(decoder)
|
||||
logging.log(`Received Update. room "${room}", offset ${offset}`)
|
||||
idbactions.writeHostUnconfirmed(t, room, offset, update)
|
||||
bc.publishRoomData(room, update)
|
||||
bc._broadcastYdbRemoteOffsetReceived([{ room, offset }])
|
||||
break
|
||||
}
|
||||
case MESSAGE_SUB_CONF: {
|
||||
const nSubs = decoding.readVarUint(decoder)
|
||||
const subs = []
|
||||
for (let i = 0; i < nSubs; i++) {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
const rsid = decoding.readVarUint(decoder)
|
||||
subs.push({
|
||||
room, offset, rsid
|
||||
})
|
||||
}
|
||||
bc._broadcastYdbSyncFromServer(subs)
|
||||
if (nSubs < 500) {
|
||||
subs.map(sub => idbactions.confirmSubscription(t, sub))
|
||||
} else {
|
||||
idbactions.confirmSubscriptions(t, subs)
|
||||
}
|
||||
break
|
||||
}
|
||||
case MESSAGE_CONFIRMATION: { // TODO: duplicate with MESSAGE_CONFIRMED_BY_HOST!
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
logging.log(`Received Confirmation. room "${room}", offset ${offset}`)
|
||||
idbactions.writeConfirmedByHost(t, room, offset)
|
||||
bc._broadcastYdbRemoteOffsetConfirmed([{ room, offset }])
|
||||
break
|
||||
}
|
||||
case MESSAGE_HOST_UNCONFIRMED_BY_CLIENT: {
|
||||
const clientConf = decoding.readVarUint(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
logging.log(`Received HostUnconfirmedByClient. clientConf "${clientConf}", offset ${offset}`)
|
||||
idbactions.writeHostUnconfirmedByClient(t, clientConf, offset)
|
||||
bc._broadcastYdbCUConfConfirmed(clientConf, offset)
|
||||
break
|
||||
}
|
||||
case MESSAGE_CONFIRMED_BY_HOST: {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
logging.log(`Received Confirmation By Host. room "${room}", offset ${offset}`)
|
||||
idbactions.writeConfirmedByHost(t, room, offset)
|
||||
bc._broadcastYdbRemoteOffsetConfirmed([{ room, offset }])
|
||||
break
|
||||
}
|
||||
default:
|
||||
logging.fail(`Unexpected message type`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {ArrayBuffer} update
|
||||
* @param {number} clientConf
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const createUpdate = (room, update, clientConf) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, MESSAGE_UPDATE)
|
||||
encoding.writeVarUint(encoder, clientConf)
|
||||
encoding.writeVarString(encoder, room)
|
||||
encoding.writePayload(encoder, update)
|
||||
return encoding.toBuffer(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef SubDef
|
||||
* @type {Object}
|
||||
* @property {string} room
|
||||
* @property {number} offset
|
||||
* @property {number} rsid
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {Array<SubDef>} rooms
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const createSub = rooms => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, MESSAGE_SUB)
|
||||
encoding.writeVarUint(encoder, rooms.length)
|
||||
for (let i = 0; i < rooms.length; i++) {
|
||||
encoding.writeVarString(encoder, rooms[i].room)
|
||||
encoding.writeVarUint(encoder, rooms[i].offset)
|
||||
encoding.writeVarUint(encoder, rooms[i].rsid)
|
||||
}
|
||||
return encoding.toBuffer(encoder)
|
||||
}
|
||||
Reference in New Issue
Block a user