diff --git a/examples/notes/index.js b/examples/notes/index.js index 021a7d93..0872f4d9 100644 --- a/examples/notes/index.js +++ b/examples/notes/index.js @@ -13,6 +13,7 @@ const uuidv4 = () => 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c = createYdbClient('ws://localhost:8899/ws').then(ydbclient => { const y = ydbclient.getY('notelist') let ynotelist = y.define('notelist', Y.Array) + window.ynotelist = ynotelist const domNoteList = document.querySelector('.notelist') // utils @@ -63,24 +64,67 @@ createYdbClient('ws://localhost:8899/ws').then(ydbclient => { addEventListener(window, 'hashchange', updateEditor) updateEditor() + const styleSyncedState = (div, noteSyncedState) => { + let classes = [] + if (noteSyncedState.persisted) { + classes.push('persisted') + } else { + if (noteSyncedState.upsynced) { + classes.push('upsynced') + } else { + classes.push('noupsynced') + } + if (noteSyncedState.downsynced) { + classes.push('downsynced') + } else { + classes.push('nodownsynced') + } + } + div.setAttribute('class', classes.join(' ')) + } + + ydbclient.on('syncstate', event => event.updated.forEach((state, room) => { + const a = document.querySelector(`[href="#${room}"]`) + if (a !== null) { + styleSyncedState(a.firstChild, state) + } + })) + // render note list - const renderNoteList = addedElements => { + const renderNoteList = (elementList, insertRef = domNoteList.firstChild) => { const fragment = document.createDocumentFragment() - addedElements.forEach(note => { + const addNow = elementList.splice(0, 100) + addNow.forEach(note => { const a = document.createElement('a') + const div = document.createElement('div') + a.insertBefore(div, null) a.setAttribute('href', '#' + note.guid) - a.innerText = note.title + div.innerText = note.title + styleSyncedState(div, ydbclient.getRoomState(note.guid)) fragment.insertBefore(a, null) }) - domNoteList.insertBefore(fragment, domNoteList.firstChild) + if (domBinding == null) { + updateEditor() + } + domNoteList.insertBefore(fragment, insertRef) + if (elementList.length > 0) { + setTimeout(() => renderNoteList(elementList, insertRef), 100) + } + } + { + const notelist = ynotelist.toArray() + if (notelist.length > 0) { + renderNoteList(notelist) + ydb.subscribeRooms(ydbclient, notelist.map(note => note.guid)) + } } - renderNoteList(ynotelist.toArray()) - ydb.subscribeRooms(ydbclient, ynotelist.map(note => note.guid)) ynotelist.observe(event => { const addedNotes = [] event.addedElements.forEach(itemJson => itemJson._content.forEach(json => addedNotes.push(json))) - // const arr = ynotelist.toArray().filter(note => event.addedElements.has(note)) - renderNoteList(addedNotes.reverse()) + renderNoteList(addedNotes.slice().reverse()) // renderNoteList modifies addedNotes, so first make a copy of it + setTimeout(() => { + ydb.subscribeRooms(ydbclient, addedNotes.map(note => note.guid)) + }, 200) if (domBinding === null) { updateEditor() } diff --git a/examples/notes/style.css b/examples/notes/style.css index b58eb39f..c729f2ec 100644 --- a/examples/notes/style.css +++ b/examples/notes/style.css @@ -19,16 +19,21 @@ cursor: pointer; } -.sidebar a { - padding: 6px 8px 6px 16px; - text-decoration: none; - font-size: 13px; - color: #818181; - display: block; +.notelist > a { + padding: 6px 8px 6px 16px; + text-decoration: none; + font-size: 13px; + color: #818181; + display: block; } -.sidebar a.selected { - border-style: outset; +.notelist > a.selected { + border-style: outset; +} + +.notelist > a > div { + position: relative; + display: inline; } /* When you mouse over the navigation links, change their color */ @@ -54,4 +59,42 @@ [contenteditable]:focus { outline: 0px solid transparent; +} + +.persisted::before { + content: "✔"; + color: green; + position: absolute; + right: -14px; + top: 0px; +} + +.upsynced::before { + content: "↑"; + color: green; + position: absolute; + right: -14px; + top: 0px; +} + +.noupsynced::before { + content: "↑"; + color: red; + position: absolute; + right: -14px; + top: 0px; +} +.downsynced::after { + content: "↓"; + color: green; + position: absolute; + right: -22px; + top: 0px; +} +.nodownsynced::after { + content: "↓"; + color: red; + position: absolute; + right: -22px; + top: 0px; } \ No newline at end of file diff --git a/lib/encoding.js b/lib/encoding.js index e56197c2..74680fc2 100644 --- a/lib/encoding.js +++ b/lib/encoding.js @@ -7,7 +7,7 @@ const bits8 = 0b11111111 /** * A BinaryEncoder handles the encoding to an ArrayBuffer. */ -class Encoder { +export class Encoder { constructor () { this.cpos = 0 this.cbuf = globals.createUint8ArrayFromLen(1000) diff --git a/lib/globals.js b/lib/globals.js index 39ca5206..ee57b827 100644 --- a/lib/globals.js +++ b/lib/globals.js @@ -22,6 +22,10 @@ export const createUint8ArrayFromArrayBuffer = arraybuffer => new Uint8Array_(ar export const createArrayFromArrayBuffer = arraybuffer => Array.from(createUint8ArrayFromArrayBuffer(arraybuffer)) export const createPromise = f => new Promise(f) + +export const createMap = () => new Map() +export const createSet = () => new Set() + /** * `Promise.all` wait for all promises in the array to resolve and return the result * @param {Array>} arrp diff --git a/lib/idb.js b/lib/idb.js index 3a78261b..8136fd2c 100644 --- a/lib/idb.js +++ b/lib/idb.js @@ -96,6 +96,21 @@ export const getAll = (store, range) => export const getAllKeys = (store, range) => rtop(store.getAllKeys(range)) +/** + * @typedef KeyValuePair + * @type {Object} + * @property {any} k key + * @property {any} v Value + */ + +/** + * @param {IDBObjectStore} store + * @param {IDBKeyRange} [range] + * @return {Promise>} + */ +export const getAllKeysValues = (store, range) => + globals.pall([getAllKeys(store, range), getAll(store, range)]).then(([ks, vs]) => ks.map((k, i) => ({ k, v: vs[i] }))) + /** * Iterate on keys and values * @param {IDBObjectStore} store diff --git a/package-lock.json b/package-lock.json index 03cb332c..f5b8865d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2825,6 +2825,7 @@ "version": "0.0.9", "bundled": true, "dev": true, + "optional": true, "requires": { "inherits": "~2.0.0" } @@ -2849,7 +2850,8 @@ "buffer-shims": { "version": "1.0.0", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "caseless": { "version": "0.12.0", @@ -2866,12 +2868,14 @@ "code-point-at": { "version": "1.1.0", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "combined-stream": { "version": "1.0.5", "bundled": true, "dev": true, + "optional": true, "requires": { "delayed-stream": "~1.0.0" } @@ -2884,17 +2888,20 @@ "console-control-strings": { "version": "1.1.0", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "core-util-is": { "version": "1.0.2", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "cryptiles": { "version": "2.0.5", "bundled": true, "dev": true, + "optional": true, "requires": { "boom": "2.x.x" } @@ -2934,7 +2941,8 @@ "delayed-stream": { "version": "1.0.0", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "delegates": { "version": "1.0.0", @@ -2966,7 +2974,8 @@ "extsprintf": { "version": "1.0.2", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "forever-agent": { "version": "0.6.1", @@ -3089,6 +3098,7 @@ "version": "3.1.3", "bundled": true, "dev": true, + "optional": true, "requires": { "boom": "2.x.x", "cryptiles": "2.x.x", @@ -3136,6 +3146,7 @@ "version": "1.0.0", "bundled": true, "dev": true, + "optional": true, "requires": { "number-is-nan": "^1.0.0" } @@ -3149,7 +3160,8 @@ "isarray": { "version": "1.0.0", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "isstream": { "version": "0.1.2", @@ -3222,12 +3234,14 @@ "mime-db": { "version": "1.27.0", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "mime-types": { "version": "2.1.15", "bundled": true, "dev": true, + "optional": true, "requires": { "mime-db": "~1.27.0" } @@ -3303,7 +3317,8 @@ "number-is-nan": { "version": "1.0.1", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "oauth-sign": { "version": "0.8.2", @@ -3361,7 +3376,8 @@ "process-nextick-args": { "version": "1.0.7", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "punycode": { "version": "1.4.1", @@ -3399,6 +3415,7 @@ "version": "2.2.9", "bundled": true, "dev": true, + "optional": true, "requires": { "buffer-shims": "~1.0.0", "core-util-is": "~1.0.0", @@ -3450,7 +3467,8 @@ "safe-buffer": { "version": "5.0.1", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "semver": { "version": "5.3.0", @@ -3474,6 +3492,7 @@ "version": "1.0.9", "bundled": true, "dev": true, + "optional": true, "requires": { "hoek": "2.x.x" } @@ -3507,6 +3526,7 @@ "version": "1.0.2", "bundled": true, "dev": true, + "optional": true, "requires": { "code-point-at": "^1.0.0", "is-fullwidth-code-point": "^1.0.0", @@ -3517,6 +3537,7 @@ "version": "1.0.1", "bundled": true, "dev": true, + "optional": true, "requires": { "safe-buffer": "^5.0.1" } @@ -3545,6 +3566,7 @@ "version": "2.2.1", "bundled": true, "dev": true, + "optional": true, "requires": { "block-stream": "*", "fstream": "^1.0.2", @@ -3600,7 +3622,8 @@ "util-deprecate": { "version": "1.0.2", "bundled": true, - "dev": true + "dev": true, + "optional": true }, "uuid": { "version": "3.0.1", diff --git a/src/MessageHandler/integrateRemoteStructs.js b/src/MessageHandler/integrateRemoteStructs.js index e4038675..0259dc39 100644 --- a/src/MessageHandler/integrateRemoteStructs.js +++ b/src/MessageHandler/integrateRemoteStructs.js @@ -49,6 +49,8 @@ function _integrateRemoteStructHelper (y, struct) { decoder.pos = oldPos if (missing.length === 0) { y._readyToIntegrate.push(missingDef.struct) + } else { + // TODO: throw error here } } }) diff --git a/src/YdbClient/NamedEventHandler.js b/src/YdbClient/NamedEventHandler.js new file mode 100644 index 00000000..0fa4d085 --- /dev/null +++ b/src/YdbClient/NamedEventHandler.js @@ -0,0 +1,20 @@ + +import * as globals from './globals.js' + +export const Class = class NamedEventHandler { + constructor () { + this.l = globals.createMap() + } + on (eventname, f) { + const l = this.l + let h = l.get(eventname) + if (h === undefined) { + h = globals.createSet() + l.set(eventname, h) + } + h.add(f) + } +} + +export const fire = (handler, eventname, event) => + handler.l.get(eventname).forEach(f => f(event)) diff --git a/src/YdbClient/YdbClient.js b/src/YdbClient/YdbClient.js index 2a63f7fa..e540a3c5 100644 --- a/src/YdbClient/YdbClient.js +++ b/src/YdbClient/YdbClient.js @@ -11,13 +11,48 @@ import BinaryDecoder from '../src/Util/Binary/Decoder.js' import { integrateRemoteStruct } from '../src/MessageHandler/integrateRemoteStructs.js' import { createMutualExclude } from '../src/Util/mutualExclude.js' -export class YdbClient { +import * as NamedEventHandler from './NamedEventHandler.js' + +/** + * @typedef RoomState + * @type {Object} + * @property {number} rsid room session id, -1 if unknown (created locally) + * @property {number} offset By server, -1 if unknown + * @property {number} cOffset current offset by client + */ + +/** + * @typedef SyncState + * @type {Object} + * @property {boolean} upsynced True if all local updates have been sent to the server and the server confirmed that it received the update + * @property {boolean} downsynced True if the current session subscribed to the room, the server confirmed the subscription, and the initial data was received + * @property {boolean} persisted True if the server confirmed that it persisted all published data + */ + +/** + * + */ +export class YdbClient extends NamedEventHandler.Class { constructor (url, db) { + super() this.url = url this.ws = new WebSocket(url) - this.rooms = new Map() + this.rooms = globals.createMap() this.db = db this.connected = false + /** + * Set of room states. We try to keep it up in sync with idb, but this may fail due to concurrency with other windows. + * TODO: implement tests for this + * @type Map + */ + this.roomStates = globals.createMap() + /** + * Meta information about unconfirmed updates created by this client. + * Maps from confid to roomname + * @type Map + */ + this.clientUnconfirmedStates = globals.createMap() + bc.subscribeYdbEvents(this) initWS(this, this.ws) } /** @@ -43,6 +78,15 @@ export class YdbClient { })) return y } + getRoomState (roomname) { + return bc.computeRoomState(this, bc.getUnconfirmedRooms(this), roomname) + } + getRoomStates () { + const unconfirmedRooms = bc.getUnconfirmedRooms(this) + const states = globals.createMap() + this.roomStates.forEach((rstate, roomname) => states.set(roomname, bc.computeRoomState(this, unconfirmedRooms, roomname))) + return states + } } /** @@ -61,21 +105,26 @@ const initWS = (ydb, ws) => { ws.onopen = () => { const t = idbactions.createTransaction(ydb.db) globals.pall([idbactions.getRoomMetas(t), idbactions.getUnconfirmedSubscriptions(t), idbactions.getUnconfirmedUpdates(t)]).then(([metas, us, unconfirmedUpdates]) => { - const subs = [] + let subs = [] metas.forEach(meta => { subs.push({ room: meta.room, - offset: meta.offset + offset: meta.offset, + rsid: meta.rsid }) }) us.forEach(room => { subs.push({ - room, offset: 0 + room, offset: 0, rsid: 0 }) }) + subs = subs.filter(subdev => !ydb.roomStates.has(subdev.room)) // filter already subbed rooms ydb.connected = true const encoder = encoding.createEncoder() - encoding.writeArrayBuffer(encoder, message.createSub(subs)) + if (subs.length > 0) { + encoding.writeArrayBuffer(encoder, message.createSub(subs)) + bc._broadcastYdbSyncingRoomsToServer(subs.map(subdev => subdev.room)) + } encoding.writeArrayBuffer(encoder, unconfirmedUpdates) send(ydb, encoding.toBuffer(encoder)) }) @@ -113,7 +162,7 @@ export const clear = (dbNamespace = 'ydb') => idb.deleteDB(dbNamespace) * @param {YdbClient} ydb * @param {ArrayBuffer} m */ -export const send = (ydb, m) => ydb.connected && ydb.ws.send(m) +export const send = (ydb, m) => ydb.connected && m.byteLength !== 0 && ydb.ws.send(m) /** * @param {YdbClient} ydb @@ -121,45 +170,45 @@ export const send = (ydb, m) => ydb.connected && ydb.ws.send(m) * @param {ArrayBuffer} update */ export const update = (ydb, room, update) => { - bc.publish(room, update) + bc.publishRoomData(room, update) const t = idbactions.createTransaction(ydb.db) logging.log(`Write Unconfirmed Update. room "${room}", ${JSON.stringify(update)}`) return idbactions.writeClientUnconfirmed(t, room, update).then(clientConf => { logging.log(`Send Unconfirmed Update. connected ${ydb.connected} room "${room}", clientConf ${clientConf}`) + bc._broadcastYdbCUConfCreated(clientConf, room) send(ydb, message.createUpdate(room, update, clientConf)) }) } export const subscribe = (ydb, room, f) => { - bc.subscribe(room, f) + bc.subscribeRoomData(room, f) const t = idbactions.createTransaction(ydb.db) + if (!ydb.roomStates.has(room)) { + subscribeRooms(ydb, [room]) + } idbactions.getRoomData(t, room).then(data => { if (data.byteLength > 0) { f(data) } }) - idbactions.getRoomMeta(t, room).then(meta => { - if (meta === undefined) { - logging.log(`Send Subscribe. room "${room}", offset ${0}`) - // TODO: maybe set prelim meta value so we don't sub twice - send(ydb, message.createSub([{ room, offset: 0 }])) - idbactions.writeUnconfirmedSubscription(t, room) - } - }) } export const subscribeRooms = (ydb, rooms) => { const t = idbactions.createTransaction(ydb.db) - const subs = [] + let subs = [] + // TODO: try not to do too many single calls. Implement getRoomMetas(t, rooms) or retrieve all metas once and store them on ydb + // TODO: find out performance of getRoomMetas with all metas return globals.pall(rooms.map(room => idbactions.getRoomMeta(t, room).then(meta => { if (meta === undefined) { subs.push(room) return idbactions.writeUnconfirmedSubscription(t, room) } }))).then(() => { + subs = subs.filter(room => !ydb.roomStates.has(room)) // write all sub messages when all unconfirmed subs are writted to idb if (subs.length > 0) { - send(ydb, message.createSub(rooms.map(room => ({room, offset: 0})))) + send(ydb, message.createSub(subs.map(room => ({room, offset: 0, rsid: 0})))) + bc._broadcastYdbSyncingRoomsToServer(subs) } }) } diff --git a/src/YdbClient/broadcastchannel.js b/src/YdbClient/broadcastchannel.js index 99d205e2..be5dc0cf 100644 --- a/src/YdbClient/broadcastchannel.js +++ b/src/YdbClient/broadcastchannel.js @@ -2,29 +2,285 @@ import * as decoding from './decoding.js' import * as encoding from './encoding.js' +import * as globals from './globals.js' +import * as NamedEventHandler from './NamedEventHandler.js' const bc = new BroadcastChannel('ydb-client') -const subs = new Map() +/** + * @type {Map>} + */ +const datasubs = globals.createMap() +/** + * @type {Set} Set of Ydb instances + */ +const ydbinstances = globals.createSet() -bc.onmessage = event => { - const decoder = decoding.createDecoder(event.data) - const room = decoding.readVarString(decoder) - const update = decoding.readTail(decoder) - const rsubs = subs.get(room) - if (rsubs !== undefined) { - rsubs.forEach(f => f(update)) +const bcRoomDataMessage = 0 +const bcYdbCUConfCreated = 1 +const bcYdbCUConfConfirmed = 2 +const bcYdbRemoteOffsetReceived = 3 +const bcYdbRemoteOffsetConfirmed = 4 +const bcYdbSyncingRoomsToServer = 5 +const bcYdbSyncFromServer = 6 + +export const getUnconfirmedRooms = ydb => { + const unconfirmedRooms = globals.createSet() + ydb.clientUnconfirmedStates.forEach(room => unconfirmedRooms.add(room)) + return unconfirmedRooms +} + +export const computeRoomState = (ydb, unconfirmedRooms, room) => { + // state is a RoomState, defined in YdbClient.js + const state = ydb.roomStates.get(room) + if (state === undefined) { + return { + upsynced: false, + downsynced: false, + persisted: false + } } + return { + upsynced: !unconfirmedRooms.has(room), + downsynced: state.offset >= 0 && state.coffset >= state.offset, + persisted: state.coffset === state.offset && state.offset >= 0 && !unconfirmedRooms.has(room) + } +} + +let roomStatesUpdating = [] +const fireRoomStateUpdate = (ydb, room) => { + roomStatesUpdating.push(room) + if (roomStatesUpdating.length === 1) { + // first time this is called, trigger actual publisher + // setTimeout(() => { + const updated = new Map() + const unconfirmedRooms = getUnconfirmedRooms(ydb) + roomStatesUpdating.forEach(room => { + if (!updated.has(room)) { + updated.set(room, computeRoomState(ydb, unconfirmedRooms, room)) + } + }) + NamedEventHandler.fire(ydb, 'syncstate', { + updated + }) + roomStatesUpdating = [] + // }, 0) + } +} + +const receiveBCData = data => { + const decoder = decoding.createDecoder(data) + while (decoding.hasContent(decoder)) { + const messageType = decoding.readVarUint(decoder) + switch (messageType) { + case bcRoomDataMessage: { + const room = decoding.readVarString(decoder) + const update = decoding.readTail(decoder) + const rsubs = datasubs.get(room) + if (rsubs !== undefined) { + rsubs.forEach(f => f(update)) + } + break + } + case bcYdbCUConfCreated: { + const confid = decoding.readVarUint(decoder) + const room = decoding.readVarString(decoder) + ydbinstances.forEach(ydb => { + ydb.clientUnconfirmedStates.set(confid, room) + fireRoomStateUpdate(ydb, room) + }) + break + } + case bcYdbCUConfConfirmed: { + const confid = decoding.readVarUint(decoder) + const offset = decoding.readVarUint(decoder) + ydbinstances.forEach(ydb => { + const room = ydb.clientUnconfirmedStates.get(confid) + if (room !== undefined) { + ydb.clientUnconfirmedStates.delete(confid) + const state = ydb.roomStates.get(room) + if (state.coffset < offset) { + state.coffset = offset + } + fireRoomStateUpdate(ydb, room) + } + }) + break + } + case bcYdbRemoteOffsetReceived: { + const len = decoding.readVarUint(decoder) + for (let i = 0; i < len; i++) { + const room = decoding.readVarString(decoder) + const offset = decoding.readVarUint(decoder) + ydbinstances.forEach(ydb => { + // this is only called when an update is received + // so roomState.get(room) should exist + const state = ydb.roomStates.get(room) + if (state.coffset < offset) { + state.coffset = offset + } + fireRoomStateUpdate(ydb, room) + }) + } + break + } + case bcYdbRemoteOffsetConfirmed: { + const len = decoding.readVarUint(decoder) + for (let i = 0; i < len; i++) { + const room = decoding.readVarString(decoder) + const offset = decoding.readVarUint(decoder) + ydbinstances.forEach(ydb => { + const state = ydb.roomStates.get(room) + state.offset = offset + fireRoomStateUpdate(ydb, room) + }) + } + break + } + case bcYdbSyncingRoomsToServer: { + const len = decoding.readVarUint(decoder) + for (let i = 0; i < len; i++) { + const room = decoding.readVarString(decoder) + ydbinstances.forEach(ydb => { + const state = ydb.roomStates.get(room) + if (state === undefined) { + ydb.roomStates.set(room, { + rsid: -1, + offset: -1, + coffset: 0 + }) + fireRoomStateUpdate(ydb, room) + } + }) + } + break + } + case bcYdbSyncFromServer: { + const len = decoding.readVarUint(decoder) + for (let i = 0; i < len; i++) { + const room = decoding.readVarString(decoder) + const offset = decoding.readVarUint(decoder) + const rsid = decoding.readVarUint(decoder) + ydbinstances.forEach(ydb => { + const state = ydb.roomStates.get(room) + state.offset = offset + state.rsid = rsid + fireRoomStateUpdate(ydb, room) + }) + } + break + } + default: + globals.error('Unexpected bc message type') + } + } +} + +bc.onmessage = event => receiveBCData(event.data) + +/** + * Publish to all, including self + * @param {encoding.Encoder} encoder + */ +export const publishAll = encoder => { + const buffer = encoding.toBuffer(encoder) + bc.postMessage(buffer) + receiveBCData(buffer) +} + +/** + * Call this when update was created by this user and confid was created + * @param {number} cconf + * @param {string} roomname + */ +export const _broadcastYdbCUConfCreated = (cconf, roomname) => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, bcYdbCUConfCreated) + encoding.writeVarUint(encoder, cconf) + encoding.writeVarString(encoder, roomname) + publishAll(encoder) +} + +/** + * Call this when user confid was confirmed by host + * @param {number} cconf + * @param {number} offset The conf-offset of the client-created offset + */ +export const _broadcastYdbCUConfConfirmed = (cconf, offset) => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, bcYdbCUConfConfirmed) + encoding.writeVarUint(encoder, cconf) + encoding.writeVarUint(encoder, offset) + publishAll(encoder) +} + +/** + * Call this when remote update is received (thus host has increased, but not confirmed, the offset) + * @param {Array} subs sub is { room, offset } + */ +export const _broadcastYdbRemoteOffsetReceived = subs => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, bcYdbRemoteOffsetReceived) + encoding.writeVarUint(encoder, subs.length) + subs.forEach(sub => { + encoding.writeVarString(encoder, sub.room) + encoding.writeVarUint(encoder, sub.offset) + }) + publishAll(encoder) +} + +/** + * @param {Array} subs sub is { room, offset } + */ +export const _broadcastYdbRemoteOffsetConfirmed = subs => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, bcYdbRemoteOffsetConfirmed) + encoding.writeVarUint(encoder, subs.length) + subs.forEach(sub => { + encoding.writeVarString(encoder, sub.room) + encoding.writeVarUint(encoder, sub.offset) + }) + publishAll(encoder) +} + +/** + * Call this when a subscription is created + * @param {Array} rooms + */ +export const _broadcastYdbSyncingRoomsToServer = rooms => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, bcYdbSyncingRoomsToServer) + encoding.writeVarUint(encoder, rooms.length) + rooms.forEach(room => { + encoding.writeVarString(encoder, room) + }) + publishAll(encoder) +} + +/** + * Call this when sync confirmed by host + * @param {Array} subs sub is {room, offset, rsid} + */ +export const _broadcastYdbSyncFromServer = subs => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, bcYdbSyncFromServer) + encoding.writeVarUint(encoder, subs.length) + subs.forEach(sub => { + encoding.writeVarString(encoder, sub.room) + encoding.writeVarUint(encoder, sub.offset) + encoding.writeVarUint(encoder, sub.rsid) + }) + publishAll(encoder) } /** * @param {string} room * @param {function(ArrayBuffer)} f */ -export const subscribe = (room, f) => { - let rsubs = subs.get(room) +export const subscribeRoomData = (room, f) => { + let rsubs = datasubs.get(room) if (rsubs === undefined) { rsubs = new Set() - subs.set(room, rsubs) + datasubs.set(room, rsubs) } rsubs.add(f) } @@ -33,13 +289,17 @@ export const subscribe = (room, f) => { * @param {string} room * @param {ArrayBuffer} update */ -export const publish = (room, update) => { +export const publishRoomData = (room, update) => { const encoder = encoding.createEncoder() encoding.writeVarString(encoder, room) encoding.writeArrayBuffer(encoder, update) bc.postMessage(encoding.toBuffer(encoder)) - const rsubs = subs.get(room) + // call subs directly here instead of calling receivedBCData + const rsubs = datasubs.get(room) if (rsubs !== undefined) { rsubs.forEach(f => f(update)) } } + +export const subscribeYdbEvents = ydb => + ydbinstances.add(ydb) diff --git a/src/YdbClient/idbactions.js b/src/YdbClient/idbactions.js index 9f960677..b3e78b40 100644 --- a/src/YdbClient/idbactions.js +++ b/src/YdbClient/idbactions.js @@ -213,7 +213,7 @@ export const writeConfirmedByHost = (t, room, offset) => { encoding.writeArrayBuffer(dataEncoder, value) } }).then(() => { - globals.pall([idb.put(co, encodeMetaValue(meta.roomsid, offset), getCoMetaKey(room)), idb.put(co, encoding.toBuffer(dataEncoder), getCoDataKey(room)), idb.del(hu, huKeyRange)]) + globals.pall([idb.put(co, encodeMetaValue(meta.rsid, offset), getCoMetaKey(room)), idb.put(co, encoding.toBuffer(dataEncoder), getCoDataKey(room)), idb.del(hu, huKeyRange)]) }) }) } @@ -222,7 +222,7 @@ export const writeConfirmedByHost = (t, room, offset) => { * @typedef RoomMeta * @type {Object} * @property {string} room - * @property {number} roomsid Room session id + * @property {number} rsid Room session id * @property {number} offset Received offsets (including offsets that are not yet confirmed) */ @@ -233,23 +233,49 @@ export const writeConfirmedByHost = (t, room, offset) => { * @return {Promise>} */ export const getRoomMetas = t => { - const hu = getStoreHU(t) - const result = [] + // const result = [] + const storeCo = getStoreCo(t) + const coQuery = idb.createIDBKeyRangeLowerBound('meta:', false) + return globals.pall([idb.getAll(storeCo, coQuery), idb.getAllKeys(storeCo, coQuery)]).then(([metaValues, metaKeys]) => globals.pall(metaValues.map((metavalue, i) => { + const room = metaKeys[i].slice(5) + const { rsid, offset } = decodeMetaValue(metavalue) + return { + room, + rsid, + offset: offset + } + }))) + /* return idb.iterate(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false), (metavalue, metakey) => idb.getAllKeys(hu, idb.createIDBKeyRangeBound(encodeHUKey(metakey.slice(5), 0), encodeHUKey(metakey.slice(5), 2 ** 32), false, false)).then(keys => { - const { roomsid, offset } = decodeMetaValue(metavalue) + const { rsid, offset } = decodeMetaValue(metavalue) result.push({ room: metakey.slice(5), - roomsid, + rsid, offset: keys.reduce((cur, key) => globals.max(decodeHUKey(key).offset, cur), offset) }) }) ).then(() => globals.presolve(result)) + */ } export const getRoomMeta = (t, room) => idb.get(getStoreCo(t), getCoMetaKey(room)) +/** + * Get all data from idb, excluding unconfirmed updates. + * TODO: include updates in CU + * @param {IDBTransaction} t + * @param {string} room + * @return {Promise} + */ +export const getRoomDataWithoutCU = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false))]).then(([data, updates]) => { + const encoder = encoding.createEncoder() + encoding.writeArrayBuffer(encoder, data || new Uint8Array(0)) + updates.forEach(update => encoding.writeArrayBuffer(encoder, update)) + return encoding.toBuffer(encoder) +}) + /** * Get all data from idb, including unconfirmed updates. * TODO: include updates in CU @@ -257,57 +283,100 @@ export const getRoomMeta = (t, room) => * @param {string} room * @return {Promise} */ -export const getRoomData = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false))]).then(([data, updates]) => { +export const getRoomData = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false)), idb.getAll(getStoreCU(t))]).then(([data, updates, cuUpdates]) => { const encoder = encoding.createEncoder() encoding.writeArrayBuffer(encoder, data || new Uint8Array(0)) updates.forEach(update => encoding.writeArrayBuffer(encoder, update)) + cuUpdates.forEach(roomAndUpdate => { + const decoder = decoding.createDecoder(roomAndUpdate) + if (decoding.readVarString(decoder) === room) { + encoding.writeArrayBuffer(encoder, decoding.readTail(decoder)) + } + }) return encoding.toBuffer(encoder) }) const decodeMetaValue = buffer => { const decoder = decoding.createDecoder(buffer) - const roomsid = decoding.readVarUint(decoder) + const rsid = decoding.readVarUint(decoder) const offset = decoding.readVarUint(decoder) return { - roomsid, offset + rsid, offset } } /** - * @param {number} roomsid room session id + * @param {number} rsid room session id * @param {number} offset * @return {ArrayBuffer} */ -const encodeMetaValue = (roomsid, offset) => { +const encodeMetaValue = (rsid, offset) => { const encoder = encoding.createEncoder() - encoding.writeVarUint(encoder, roomsid) + encoding.writeVarUint(encoder, rsid) encoding.writeVarUint(encoder, offset) return encoding.toBuffer(encoder) } +const writeInitialCoEntry = (t, room, roomsessionid, offset) => globals.pall([ + idb.put(getStoreCo(t), encodeMetaValue(roomsessionid, offset), getCoMetaKey(room)), + idb.put(getStoreCo(t), globals.createArrayBufferFromArray([]), getCoDataKey(room)) +]) + +const _confirmSub = (t, metaval, sub) => { + if (metaval === undefined) { + return writeInitialCoEntry(t, sub.room, sub.rsid, sub.offset).then(() => idb.del(getStoreUS(t), sub.room)).then(() => null) + } + const meta = decodeMetaValue(metaval) + if (meta.rsid !== sub.rsid) { + // TODO: Yjs sync with server here + // get all room data (without CU) and save it as a client update. Then remove all data + return getRoomDataWithoutCU(t, sub.room) + .then(roomdata => + writeClientUnconfirmed(t, sub.room, roomdata) + .then(clientConf => message.createUpdate(sub.room, roomdata, clientConf)) + .then(update => + writeInitialCoEntry(t, sub.room, sub.rsid, sub.offset).then(() => update) + ) + ) + } else if (meta.offset < sub.offset) { + return writeConfirmedByHost(t, sub.room, sub.offset).then(() => null) + } else { + // nothing needs to happen + return null + } +} + +/** + * @typedef Sub + * @type {Object} + * @property {string} room room name + * @property {number} rsid room session id + * @property {number} offset + */ + /** * Set the initial room data. Overwrites initial data if there is any! * @param {IDBTransaction} t - * @param {string} room - * @param {number} roomsessionid - * @param {number} offset - * @return {Promise} + * @param {Sub} sub + * @return {Promise} Message to send to server */ -export const confirmSubscription = (t, room, roomsessionid, offset) => idb.get(getStoreCo(t), getCoMetaKey(room)).then(metaval => { - if (metaval === undefined) { - return globals.pall([ - idb.put(getStoreCo(t), encodeMetaValue(roomsessionid, offset), getCoMetaKey(room)), - idb.put(getStoreCo(t), globals.createArrayBufferFromArray([]), getCoDataKey(room)) - ]).then(() => idb.del(getStoreUS(t), room)) - } - const meta = decodeMetaValue(metaval) - if (meta.roomsid !== roomsessionid) { - // TODO: upload all unconfirmed updates - // or do a Yjs sync with server - } else if (meta.roomsid < offset) { - return writeConfirmedByHost(t, room, offset) - } else { - // nothing needs to happen +export const confirmSubscription = (t, sub) => idb.get(getStoreCo(t), getCoMetaKey(sub.room)).then(metaval => _confirmSub(t, metaval, sub)) + +export const confirmSubscriptions = (t, subs) => idb.getAllKeysValues(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false)).then(kvs => { + const ps = [] + const subMap = new Map() + subs.forEach(sub => subMap.set(sub.room, sub)) + for (let i = 0, len = kvs.length; i < len; i++) { + const kv = kvs[i] + const kvroom = kv.k.slice(5) + const exSub = subMap.get(kvroom) + if (exSub !== undefined) { + subMap.delete(kvroom) + ps.push(_confirmSub(t, kv.v, exSub)) + } } + // all remaining elements in subMap do not exist yet in Co. + subMap.forEach(nonexSub => ps.push(_confirmSub(t, undefined, nonexSub))) + return ps }) export const writeUnconfirmedSubscription = (t, room) => idb.put(getStoreUS(t), true, room) diff --git a/src/YdbClient/idbactions.test.js b/src/YdbClient/idbactions.test.js index 1d41445b..745b5d9b 100644 --- a/src/YdbClient/idbactions.test.js +++ b/src/YdbClient/idbactions.test.js @@ -12,7 +12,7 @@ idbactions.deleteDB().then(() => idbactions.openDB()).then(db => { await idbactions.writeConfirmedByHost(t, testname, 4) const metas = await idbactions.getRoomMetas(t) const roommeta = metas.find(meta => meta.room === testname) - if (roommeta == null || roommeta.offset !== 4 || roommeta.roomsid !== 42) { + if (roommeta == null || roommeta.offset !== 4 || roommeta.rsid !== 42) { throw globals.error() } const data = await idbactions.getRoomData(t, testname) diff --git a/src/YdbClient/message.js b/src/YdbClient/message.js index 5a205c83..a076c366 100644 --- a/src/YdbClient/message.js +++ b/src/YdbClient/message.js @@ -27,25 +27,35 @@ export const readMessage = (ydb, message) => { const update = decoding.readPayload(decoder) logging.log(`Received Update. room "${room}", offset ${offset}`) idbactions.writeHostUnconfirmed(t, room, offset, update) - bc.publish(room, update) + bc.publishRoomData(room, update) + bc._broadcastYdbRemoteOffsetReceived([{ room, offset }]) break } case MESSAGE_SUB_CONF: { const nSubs = decoding.readVarUint(decoder) + const subs = [] for (let i = 0; i < nSubs; i++) { const room = decoding.readVarString(decoder) const offset = decoding.readVarUint(decoder) - const roomsid = decoding.readVarUint(decoder) // TODO: SID - // logging.log(`Received Sub Conf. room "${room}", offset ${offset}, roomsid ${roomsid}`) - idbactions.confirmSubscription(t, room, roomsid, offset) + const rsid = decoding.readVarUint(decoder) + subs.push({ + room, offset, rsid + }) + } + bc._broadcastYdbSyncFromServer(subs) + if (nSubs < 500) { + subs.map(sub => idbactions.confirmSubscription(t, sub)) + } else { + idbactions.confirmSubscriptions(t, subs) } break } - case MESSAGE_CONFIRMATION: { + case MESSAGE_CONFIRMATION: { // TODO: duplicate with MESSAGE_CONFIRMED_BY_HOST! const room = decoding.readVarString(decoder) const offset = decoding.readVarUint(decoder) logging.log(`Received Confirmation. room "${room}", offset ${offset}`) idbactions.writeConfirmedByHost(t, room, offset) + bc._broadcastYdbRemoteOffsetConfirmed([{ room, offset }]) break } case MESSAGE_HOST_UNCONFIRMED_BY_CLIENT: { @@ -53,6 +63,7 @@ export const readMessage = (ydb, message) => { const offset = decoding.readVarUint(decoder) logging.log(`Received HostUnconfirmedByClient. clientConf "${clientConf}", offset ${offset}`) idbactions.writeHostUnconfirmedByClient(t, clientConf, offset) + bc._broadcastYdbCUConfConfirmed(clientConf, offset) break } case MESSAGE_CONFIRMED_BY_HOST: { @@ -60,6 +71,7 @@ export const readMessage = (ydb, message) => { const offset = decoding.readVarUint(decoder) logging.log(`Received Confirmation By Host. room "${room}", offset ${offset}`) idbactions.writeConfirmedByHost(t, room, offset) + bc._broadcastYdbRemoteOffsetConfirmed([{ room, offset }]) break } default: @@ -88,6 +100,7 @@ export const createUpdate = (room, update, clientConf) => { * @type {Object} * @property {string} room * @property {number} offset + * @property {number} rsid */ /** @@ -101,6 +114,7 @@ export const createSub = rooms => { for (let i = 0; i < rooms.length; i++) { encoding.writeVarString(encoder, rooms[i].room) encoding.writeVarUint(encoder, rooms[i].offset) + encoding.writeVarUint(encoder, rooms[i].rsid) } return encoding.toBuffer(encoder) }