Merge branch 'ydb-integration' of https://github.com/y-js/yjs into ydb-integration
This commit is contained in:
commit
fe038822a3
@ -13,6 +13,7 @@ const uuidv4 = () => 'xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx'.replace(/[xy]/g, c =
|
|||||||
createYdbClient('ws://localhost:8899/ws').then(ydbclient => {
|
createYdbClient('ws://localhost:8899/ws').then(ydbclient => {
|
||||||
const y = ydbclient.getY('notelist')
|
const y = ydbclient.getY('notelist')
|
||||||
let ynotelist = y.define('notelist', Y.Array)
|
let ynotelist = y.define('notelist', Y.Array)
|
||||||
|
window.ynotelist = ynotelist
|
||||||
const domNoteList = document.querySelector('.notelist')
|
const domNoteList = document.querySelector('.notelist')
|
||||||
|
|
||||||
// utils
|
// utils
|
||||||
@ -63,24 +64,67 @@ createYdbClient('ws://localhost:8899/ws').then(ydbclient => {
|
|||||||
addEventListener(window, 'hashchange', updateEditor)
|
addEventListener(window, 'hashchange', updateEditor)
|
||||||
updateEditor()
|
updateEditor()
|
||||||
|
|
||||||
|
const styleSyncedState = (div, noteSyncedState) => {
|
||||||
|
let classes = []
|
||||||
|
if (noteSyncedState.persisted) {
|
||||||
|
classes.push('persisted')
|
||||||
|
} else {
|
||||||
|
if (noteSyncedState.upsynced) {
|
||||||
|
classes.push('upsynced')
|
||||||
|
} else {
|
||||||
|
classes.push('noupsynced')
|
||||||
|
}
|
||||||
|
if (noteSyncedState.downsynced) {
|
||||||
|
classes.push('downsynced')
|
||||||
|
} else {
|
||||||
|
classes.push('nodownsynced')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
div.setAttribute('class', classes.join(' '))
|
||||||
|
}
|
||||||
|
|
||||||
|
ydbclient.on('syncstate', event => event.updated.forEach((state, room) => {
|
||||||
|
const a = document.querySelector(`[href="#${room}"]`)
|
||||||
|
if (a !== null) {
|
||||||
|
styleSyncedState(a.firstChild, state)
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
|
||||||
// render note list
|
// render note list
|
||||||
const renderNoteList = addedElements => {
|
const renderNoteList = (elementList, insertRef = domNoteList.firstChild) => {
|
||||||
const fragment = document.createDocumentFragment()
|
const fragment = document.createDocumentFragment()
|
||||||
addedElements.forEach(note => {
|
const addNow = elementList.splice(0, 100)
|
||||||
|
addNow.forEach(note => {
|
||||||
const a = document.createElement('a')
|
const a = document.createElement('a')
|
||||||
|
const div = document.createElement('div')
|
||||||
|
a.insertBefore(div, null)
|
||||||
a.setAttribute('href', '#' + note.guid)
|
a.setAttribute('href', '#' + note.guid)
|
||||||
a.innerText = note.title
|
div.innerText = note.title
|
||||||
|
styleSyncedState(div, ydbclient.getRoomState(note.guid))
|
||||||
fragment.insertBefore(a, null)
|
fragment.insertBefore(a, null)
|
||||||
})
|
})
|
||||||
domNoteList.insertBefore(fragment, domNoteList.firstChild)
|
if (domBinding == null) {
|
||||||
|
updateEditor()
|
||||||
|
}
|
||||||
|
domNoteList.insertBefore(fragment, insertRef)
|
||||||
|
if (elementList.length > 0) {
|
||||||
|
setTimeout(() => renderNoteList(elementList, insertRef), 100)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
const notelist = ynotelist.toArray()
|
||||||
|
if (notelist.length > 0) {
|
||||||
|
renderNoteList(notelist)
|
||||||
|
ydb.subscribeRooms(ydbclient, notelist.map(note => note.guid))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
renderNoteList(ynotelist.toArray())
|
|
||||||
ydb.subscribeRooms(ydbclient, ynotelist.map(note => note.guid))
|
|
||||||
ynotelist.observe(event => {
|
ynotelist.observe(event => {
|
||||||
const addedNotes = []
|
const addedNotes = []
|
||||||
event.addedElements.forEach(itemJson => itemJson._content.forEach(json => addedNotes.push(json)))
|
event.addedElements.forEach(itemJson => itemJson._content.forEach(json => addedNotes.push(json)))
|
||||||
// const arr = ynotelist.toArray().filter(note => event.addedElements.has(note))
|
renderNoteList(addedNotes.slice().reverse()) // renderNoteList modifies addedNotes, so first make a copy of it
|
||||||
renderNoteList(addedNotes.reverse())
|
setTimeout(() => {
|
||||||
|
ydb.subscribeRooms(ydbclient, addedNotes.map(note => note.guid))
|
||||||
|
}, 200)
|
||||||
if (domBinding === null) {
|
if (domBinding === null) {
|
||||||
updateEditor()
|
updateEditor()
|
||||||
}
|
}
|
||||||
|
@ -19,16 +19,21 @@
|
|||||||
cursor: pointer;
|
cursor: pointer;
|
||||||
}
|
}
|
||||||
|
|
||||||
.sidebar a {
|
.notelist > a {
|
||||||
padding: 6px 8px 6px 16px;
|
padding: 6px 8px 6px 16px;
|
||||||
text-decoration: none;
|
text-decoration: none;
|
||||||
font-size: 13px;
|
font-size: 13px;
|
||||||
color: #818181;
|
color: #818181;
|
||||||
display: block;
|
display: block;
|
||||||
}
|
}
|
||||||
|
|
||||||
.sidebar a.selected {
|
.notelist > a.selected {
|
||||||
border-style: outset;
|
border-style: outset;
|
||||||
|
}
|
||||||
|
|
||||||
|
.notelist > a > div {
|
||||||
|
position: relative;
|
||||||
|
display: inline;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* When you mouse over the navigation links, change their color */
|
/* When you mouse over the navigation links, change their color */
|
||||||
@ -54,4 +59,42 @@
|
|||||||
|
|
||||||
[contenteditable]:focus {
|
[contenteditable]:focus {
|
||||||
outline: 0px solid transparent;
|
outline: 0px solid transparent;
|
||||||
|
}
|
||||||
|
|
||||||
|
.persisted::before {
|
||||||
|
content: "✔";
|
||||||
|
color: green;
|
||||||
|
position: absolute;
|
||||||
|
right: -14px;
|
||||||
|
top: 0px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.upsynced::before {
|
||||||
|
content: "↑";
|
||||||
|
color: green;
|
||||||
|
position: absolute;
|
||||||
|
right: -14px;
|
||||||
|
top: 0px;
|
||||||
|
}
|
||||||
|
|
||||||
|
.noupsynced::before {
|
||||||
|
content: "↑";
|
||||||
|
color: red;
|
||||||
|
position: absolute;
|
||||||
|
right: -14px;
|
||||||
|
top: 0px;
|
||||||
|
}
|
||||||
|
.downsynced::after {
|
||||||
|
content: "↓";
|
||||||
|
color: green;
|
||||||
|
position: absolute;
|
||||||
|
right: -22px;
|
||||||
|
top: 0px;
|
||||||
|
}
|
||||||
|
.nodownsynced::after {
|
||||||
|
content: "↓";
|
||||||
|
color: red;
|
||||||
|
position: absolute;
|
||||||
|
right: -22px;
|
||||||
|
top: 0px;
|
||||||
}
|
}
|
@ -7,7 +7,7 @@ const bits8 = 0b11111111
|
|||||||
/**
|
/**
|
||||||
* A BinaryEncoder handles the encoding to an ArrayBuffer.
|
* A BinaryEncoder handles the encoding to an ArrayBuffer.
|
||||||
*/
|
*/
|
||||||
class Encoder {
|
export class Encoder {
|
||||||
constructor () {
|
constructor () {
|
||||||
this.cpos = 0
|
this.cpos = 0
|
||||||
this.cbuf = globals.createUint8ArrayFromLen(1000)
|
this.cbuf = globals.createUint8ArrayFromLen(1000)
|
||||||
|
@ -22,6 +22,10 @@ export const createUint8ArrayFromArrayBuffer = arraybuffer => new Uint8Array_(ar
|
|||||||
export const createArrayFromArrayBuffer = arraybuffer => Array.from(createUint8ArrayFromArrayBuffer(arraybuffer))
|
export const createArrayFromArrayBuffer = arraybuffer => Array.from(createUint8ArrayFromArrayBuffer(arraybuffer))
|
||||||
|
|
||||||
export const createPromise = f => new Promise(f)
|
export const createPromise = f => new Promise(f)
|
||||||
|
|
||||||
|
export const createMap = () => new Map()
|
||||||
|
export const createSet = () => new Set()
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* `Promise.all` wait for all promises in the array to resolve and return the result
|
* `Promise.all` wait for all promises in the array to resolve and return the result
|
||||||
* @param {Array<Promise<any>>} arrp
|
* @param {Array<Promise<any>>} arrp
|
||||||
|
15
lib/idb.js
15
lib/idb.js
@ -96,6 +96,21 @@ export const getAll = (store, range) =>
|
|||||||
export const getAllKeys = (store, range) =>
|
export const getAllKeys = (store, range) =>
|
||||||
rtop(store.getAllKeys(range))
|
rtop(store.getAllKeys(range))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef KeyValuePair
|
||||||
|
* @type {Object}
|
||||||
|
* @property {any} k key
|
||||||
|
* @property {any} v Value
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {IDBObjectStore} store
|
||||||
|
* @param {IDBKeyRange} [range]
|
||||||
|
* @return {Promise<Array<KeyValuePair>>}
|
||||||
|
*/
|
||||||
|
export const getAllKeysValues = (store, range) =>
|
||||||
|
globals.pall([getAllKeys(store, range), getAll(store, range)]).then(([ks, vs]) => ks.map((k, i) => ({ k, v: vs[i] })))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Iterate on keys and values
|
* Iterate on keys and values
|
||||||
* @param {IDBObjectStore} store
|
* @param {IDBObjectStore} store
|
||||||
|
47
package-lock.json
generated
47
package-lock.json
generated
@ -2825,6 +2825,7 @@
|
|||||||
"version": "0.0.9",
|
"version": "0.0.9",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"inherits": "~2.0.0"
|
"inherits": "~2.0.0"
|
||||||
}
|
}
|
||||||
@ -2849,7 +2850,8 @@
|
|||||||
"buffer-shims": {
|
"buffer-shims": {
|
||||||
"version": "1.0.0",
|
"version": "1.0.0",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"caseless": {
|
"caseless": {
|
||||||
"version": "0.12.0",
|
"version": "0.12.0",
|
||||||
@ -2866,12 +2868,14 @@
|
|||||||
"code-point-at": {
|
"code-point-at": {
|
||||||
"version": "1.1.0",
|
"version": "1.1.0",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"combined-stream": {
|
"combined-stream": {
|
||||||
"version": "1.0.5",
|
"version": "1.0.5",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"delayed-stream": "~1.0.0"
|
"delayed-stream": "~1.0.0"
|
||||||
}
|
}
|
||||||
@ -2884,17 +2888,20 @@
|
|||||||
"console-control-strings": {
|
"console-control-strings": {
|
||||||
"version": "1.1.0",
|
"version": "1.1.0",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"core-util-is": {
|
"core-util-is": {
|
||||||
"version": "1.0.2",
|
"version": "1.0.2",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"cryptiles": {
|
"cryptiles": {
|
||||||
"version": "2.0.5",
|
"version": "2.0.5",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"boom": "2.x.x"
|
"boom": "2.x.x"
|
||||||
}
|
}
|
||||||
@ -2934,7 +2941,8 @@
|
|||||||
"delayed-stream": {
|
"delayed-stream": {
|
||||||
"version": "1.0.0",
|
"version": "1.0.0",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"delegates": {
|
"delegates": {
|
||||||
"version": "1.0.0",
|
"version": "1.0.0",
|
||||||
@ -2966,7 +2974,8 @@
|
|||||||
"extsprintf": {
|
"extsprintf": {
|
||||||
"version": "1.0.2",
|
"version": "1.0.2",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"forever-agent": {
|
"forever-agent": {
|
||||||
"version": "0.6.1",
|
"version": "0.6.1",
|
||||||
@ -3089,6 +3098,7 @@
|
|||||||
"version": "3.1.3",
|
"version": "3.1.3",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"boom": "2.x.x",
|
"boom": "2.x.x",
|
||||||
"cryptiles": "2.x.x",
|
"cryptiles": "2.x.x",
|
||||||
@ -3136,6 +3146,7 @@
|
|||||||
"version": "1.0.0",
|
"version": "1.0.0",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"number-is-nan": "^1.0.0"
|
"number-is-nan": "^1.0.0"
|
||||||
}
|
}
|
||||||
@ -3149,7 +3160,8 @@
|
|||||||
"isarray": {
|
"isarray": {
|
||||||
"version": "1.0.0",
|
"version": "1.0.0",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"isstream": {
|
"isstream": {
|
||||||
"version": "0.1.2",
|
"version": "0.1.2",
|
||||||
@ -3222,12 +3234,14 @@
|
|||||||
"mime-db": {
|
"mime-db": {
|
||||||
"version": "1.27.0",
|
"version": "1.27.0",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"mime-types": {
|
"mime-types": {
|
||||||
"version": "2.1.15",
|
"version": "2.1.15",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"mime-db": "~1.27.0"
|
"mime-db": "~1.27.0"
|
||||||
}
|
}
|
||||||
@ -3303,7 +3317,8 @@
|
|||||||
"number-is-nan": {
|
"number-is-nan": {
|
||||||
"version": "1.0.1",
|
"version": "1.0.1",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"oauth-sign": {
|
"oauth-sign": {
|
||||||
"version": "0.8.2",
|
"version": "0.8.2",
|
||||||
@ -3361,7 +3376,8 @@
|
|||||||
"process-nextick-args": {
|
"process-nextick-args": {
|
||||||
"version": "1.0.7",
|
"version": "1.0.7",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"punycode": {
|
"punycode": {
|
||||||
"version": "1.4.1",
|
"version": "1.4.1",
|
||||||
@ -3399,6 +3415,7 @@
|
|||||||
"version": "2.2.9",
|
"version": "2.2.9",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"buffer-shims": "~1.0.0",
|
"buffer-shims": "~1.0.0",
|
||||||
"core-util-is": "~1.0.0",
|
"core-util-is": "~1.0.0",
|
||||||
@ -3450,7 +3467,8 @@
|
|||||||
"safe-buffer": {
|
"safe-buffer": {
|
||||||
"version": "5.0.1",
|
"version": "5.0.1",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"semver": {
|
"semver": {
|
||||||
"version": "5.3.0",
|
"version": "5.3.0",
|
||||||
@ -3474,6 +3492,7 @@
|
|||||||
"version": "1.0.9",
|
"version": "1.0.9",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"hoek": "2.x.x"
|
"hoek": "2.x.x"
|
||||||
}
|
}
|
||||||
@ -3507,6 +3526,7 @@
|
|||||||
"version": "1.0.2",
|
"version": "1.0.2",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"code-point-at": "^1.0.0",
|
"code-point-at": "^1.0.0",
|
||||||
"is-fullwidth-code-point": "^1.0.0",
|
"is-fullwidth-code-point": "^1.0.0",
|
||||||
@ -3517,6 +3537,7 @@
|
|||||||
"version": "1.0.1",
|
"version": "1.0.1",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"safe-buffer": "^5.0.1"
|
"safe-buffer": "^5.0.1"
|
||||||
}
|
}
|
||||||
@ -3545,6 +3566,7 @@
|
|||||||
"version": "2.2.1",
|
"version": "2.2.1",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true,
|
"dev": true,
|
||||||
|
"optional": true,
|
||||||
"requires": {
|
"requires": {
|
||||||
"block-stream": "*",
|
"block-stream": "*",
|
||||||
"fstream": "^1.0.2",
|
"fstream": "^1.0.2",
|
||||||
@ -3600,7 +3622,8 @@
|
|||||||
"util-deprecate": {
|
"util-deprecate": {
|
||||||
"version": "1.0.2",
|
"version": "1.0.2",
|
||||||
"bundled": true,
|
"bundled": true,
|
||||||
"dev": true
|
"dev": true,
|
||||||
|
"optional": true
|
||||||
},
|
},
|
||||||
"uuid": {
|
"uuid": {
|
||||||
"version": "3.0.1",
|
"version": "3.0.1",
|
||||||
|
@ -49,6 +49,8 @@ function _integrateRemoteStructHelper (y, struct) {
|
|||||||
decoder.pos = oldPos
|
decoder.pos = oldPos
|
||||||
if (missing.length === 0) {
|
if (missing.length === 0) {
|
||||||
y._readyToIntegrate.push(missingDef.struct)
|
y._readyToIntegrate.push(missingDef.struct)
|
||||||
|
} else {
|
||||||
|
// TODO: throw error here
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
20
src/YdbClient/NamedEventHandler.js
Normal file
20
src/YdbClient/NamedEventHandler.js
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
|
||||||
|
import * as globals from './globals.js'
|
||||||
|
|
||||||
|
export const Class = class NamedEventHandler {
|
||||||
|
constructor () {
|
||||||
|
this.l = globals.createMap()
|
||||||
|
}
|
||||||
|
on (eventname, f) {
|
||||||
|
const l = this.l
|
||||||
|
let h = l.get(eventname)
|
||||||
|
if (h === undefined) {
|
||||||
|
h = globals.createSet()
|
||||||
|
l.set(eventname, h)
|
||||||
|
}
|
||||||
|
h.add(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export const fire = (handler, eventname, event) =>
|
||||||
|
handler.l.get(eventname).forEach(f => f(event))
|
@ -11,13 +11,48 @@ import BinaryDecoder from '../src/Util/Binary/Decoder.js'
|
|||||||
import { integrateRemoteStruct } from '../src/MessageHandler/integrateRemoteStructs.js'
|
import { integrateRemoteStruct } from '../src/MessageHandler/integrateRemoteStructs.js'
|
||||||
import { createMutualExclude } from '../src/Util/mutualExclude.js'
|
import { createMutualExclude } from '../src/Util/mutualExclude.js'
|
||||||
|
|
||||||
export class YdbClient {
|
import * as NamedEventHandler from './NamedEventHandler.js'
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef RoomState
|
||||||
|
* @type {Object}
|
||||||
|
* @property {number} rsid room session id, -1 if unknown (created locally)
|
||||||
|
* @property {number} offset By server, -1 if unknown
|
||||||
|
* @property {number} cOffset current offset by client
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef SyncState
|
||||||
|
* @type {Object}
|
||||||
|
* @property {boolean} upsynced True if all local updates have been sent to the server and the server confirmed that it received the update
|
||||||
|
* @property {boolean} downsynced True if the current session subscribed to the room, the server confirmed the subscription, and the initial data was received
|
||||||
|
* @property {boolean} persisted True if the server confirmed that it persisted all published data
|
||||||
|
*/
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
export class YdbClient extends NamedEventHandler.Class {
|
||||||
constructor (url, db) {
|
constructor (url, db) {
|
||||||
|
super()
|
||||||
this.url = url
|
this.url = url
|
||||||
this.ws = new WebSocket(url)
|
this.ws = new WebSocket(url)
|
||||||
this.rooms = new Map()
|
this.rooms = globals.createMap()
|
||||||
this.db = db
|
this.db = db
|
||||||
this.connected = false
|
this.connected = false
|
||||||
|
/**
|
||||||
|
* Set of room states. We try to keep it up in sync with idb, but this may fail due to concurrency with other windows.
|
||||||
|
* TODO: implement tests for this
|
||||||
|
* @type Map<string, RoomState>
|
||||||
|
*/
|
||||||
|
this.roomStates = globals.createMap()
|
||||||
|
/**
|
||||||
|
* Meta information about unconfirmed updates created by this client.
|
||||||
|
* Maps from confid to roomname
|
||||||
|
* @type Map<number, string>
|
||||||
|
*/
|
||||||
|
this.clientUnconfirmedStates = globals.createMap()
|
||||||
|
bc.subscribeYdbEvents(this)
|
||||||
initWS(this, this.ws)
|
initWS(this, this.ws)
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
@ -43,6 +78,15 @@ export class YdbClient {
|
|||||||
}))
|
}))
|
||||||
return y
|
return y
|
||||||
}
|
}
|
||||||
|
getRoomState (roomname) {
|
||||||
|
return bc.computeRoomState(this, bc.getUnconfirmedRooms(this), roomname)
|
||||||
|
}
|
||||||
|
getRoomStates () {
|
||||||
|
const unconfirmedRooms = bc.getUnconfirmedRooms(this)
|
||||||
|
const states = globals.createMap()
|
||||||
|
this.roomStates.forEach((rstate, roomname) => states.set(roomname, bc.computeRoomState(this, unconfirmedRooms, roomname)))
|
||||||
|
return states
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -61,21 +105,26 @@ const initWS = (ydb, ws) => {
|
|||||||
ws.onopen = () => {
|
ws.onopen = () => {
|
||||||
const t = idbactions.createTransaction(ydb.db)
|
const t = idbactions.createTransaction(ydb.db)
|
||||||
globals.pall([idbactions.getRoomMetas(t), idbactions.getUnconfirmedSubscriptions(t), idbactions.getUnconfirmedUpdates(t)]).then(([metas, us, unconfirmedUpdates]) => {
|
globals.pall([idbactions.getRoomMetas(t), idbactions.getUnconfirmedSubscriptions(t), idbactions.getUnconfirmedUpdates(t)]).then(([metas, us, unconfirmedUpdates]) => {
|
||||||
const subs = []
|
let subs = []
|
||||||
metas.forEach(meta => {
|
metas.forEach(meta => {
|
||||||
subs.push({
|
subs.push({
|
||||||
room: meta.room,
|
room: meta.room,
|
||||||
offset: meta.offset
|
offset: meta.offset,
|
||||||
|
rsid: meta.rsid
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
us.forEach(room => {
|
us.forEach(room => {
|
||||||
subs.push({
|
subs.push({
|
||||||
room, offset: 0
|
room, offset: 0, rsid: 0
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
subs = subs.filter(subdev => !ydb.roomStates.has(subdev.room)) // filter already subbed rooms
|
||||||
ydb.connected = true
|
ydb.connected = true
|
||||||
const encoder = encoding.createEncoder()
|
const encoder = encoding.createEncoder()
|
||||||
encoding.writeArrayBuffer(encoder, message.createSub(subs))
|
if (subs.length > 0) {
|
||||||
|
encoding.writeArrayBuffer(encoder, message.createSub(subs))
|
||||||
|
bc._broadcastYdbSyncingRoomsToServer(subs.map(subdev => subdev.room))
|
||||||
|
}
|
||||||
encoding.writeArrayBuffer(encoder, unconfirmedUpdates)
|
encoding.writeArrayBuffer(encoder, unconfirmedUpdates)
|
||||||
send(ydb, encoding.toBuffer(encoder))
|
send(ydb, encoding.toBuffer(encoder))
|
||||||
})
|
})
|
||||||
@ -113,7 +162,7 @@ export const clear = (dbNamespace = 'ydb') => idb.deleteDB(dbNamespace)
|
|||||||
* @param {YdbClient} ydb
|
* @param {YdbClient} ydb
|
||||||
* @param {ArrayBuffer} m
|
* @param {ArrayBuffer} m
|
||||||
*/
|
*/
|
||||||
export const send = (ydb, m) => ydb.connected && ydb.ws.send(m)
|
export const send = (ydb, m) => ydb.connected && m.byteLength !== 0 && ydb.ws.send(m)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {YdbClient} ydb
|
* @param {YdbClient} ydb
|
||||||
@ -121,45 +170,45 @@ export const send = (ydb, m) => ydb.connected && ydb.ws.send(m)
|
|||||||
* @param {ArrayBuffer} update
|
* @param {ArrayBuffer} update
|
||||||
*/
|
*/
|
||||||
export const update = (ydb, room, update) => {
|
export const update = (ydb, room, update) => {
|
||||||
bc.publish(room, update)
|
bc.publishRoomData(room, update)
|
||||||
const t = idbactions.createTransaction(ydb.db)
|
const t = idbactions.createTransaction(ydb.db)
|
||||||
logging.log(`Write Unconfirmed Update. room "${room}", ${JSON.stringify(update)}`)
|
logging.log(`Write Unconfirmed Update. room "${room}", ${JSON.stringify(update)}`)
|
||||||
return idbactions.writeClientUnconfirmed(t, room, update).then(clientConf => {
|
return idbactions.writeClientUnconfirmed(t, room, update).then(clientConf => {
|
||||||
logging.log(`Send Unconfirmed Update. connected ${ydb.connected} room "${room}", clientConf ${clientConf}`)
|
logging.log(`Send Unconfirmed Update. connected ${ydb.connected} room "${room}", clientConf ${clientConf}`)
|
||||||
|
bc._broadcastYdbCUConfCreated(clientConf, room)
|
||||||
send(ydb, message.createUpdate(room, update, clientConf))
|
send(ydb, message.createUpdate(room, update, clientConf))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
export const subscribe = (ydb, room, f) => {
|
export const subscribe = (ydb, room, f) => {
|
||||||
bc.subscribe(room, f)
|
bc.subscribeRoomData(room, f)
|
||||||
const t = idbactions.createTransaction(ydb.db)
|
const t = idbactions.createTransaction(ydb.db)
|
||||||
|
if (!ydb.roomStates.has(room)) {
|
||||||
|
subscribeRooms(ydb, [room])
|
||||||
|
}
|
||||||
idbactions.getRoomData(t, room).then(data => {
|
idbactions.getRoomData(t, room).then(data => {
|
||||||
if (data.byteLength > 0) {
|
if (data.byteLength > 0) {
|
||||||
f(data)
|
f(data)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
idbactions.getRoomMeta(t, room).then(meta => {
|
|
||||||
if (meta === undefined) {
|
|
||||||
logging.log(`Send Subscribe. room "${room}", offset ${0}`)
|
|
||||||
// TODO: maybe set prelim meta value so we don't sub twice
|
|
||||||
send(ydb, message.createSub([{ room, offset: 0 }]))
|
|
||||||
idbactions.writeUnconfirmedSubscription(t, room)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export const subscribeRooms = (ydb, rooms) => {
|
export const subscribeRooms = (ydb, rooms) => {
|
||||||
const t = idbactions.createTransaction(ydb.db)
|
const t = idbactions.createTransaction(ydb.db)
|
||||||
const subs = []
|
let subs = []
|
||||||
|
// TODO: try not to do too many single calls. Implement getRoomMetas(t, rooms) or retrieve all metas once and store them on ydb
|
||||||
|
// TODO: find out performance of getRoomMetas with all metas
|
||||||
return globals.pall(rooms.map(room => idbactions.getRoomMeta(t, room).then(meta => {
|
return globals.pall(rooms.map(room => idbactions.getRoomMeta(t, room).then(meta => {
|
||||||
if (meta === undefined) {
|
if (meta === undefined) {
|
||||||
subs.push(room)
|
subs.push(room)
|
||||||
return idbactions.writeUnconfirmedSubscription(t, room)
|
return idbactions.writeUnconfirmedSubscription(t, room)
|
||||||
}
|
}
|
||||||
}))).then(() => {
|
}))).then(() => {
|
||||||
|
subs = subs.filter(room => !ydb.roomStates.has(room))
|
||||||
// write all sub messages when all unconfirmed subs are writted to idb
|
// write all sub messages when all unconfirmed subs are writted to idb
|
||||||
if (subs.length > 0) {
|
if (subs.length > 0) {
|
||||||
send(ydb, message.createSub(rooms.map(room => ({room, offset: 0}))))
|
send(ydb, message.createSub(subs.map(room => ({room, offset: 0, rsid: 0}))))
|
||||||
|
bc._broadcastYdbSyncingRoomsToServer(subs)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -2,29 +2,285 @@
|
|||||||
|
|
||||||
import * as decoding from './decoding.js'
|
import * as decoding from './decoding.js'
|
||||||
import * as encoding from './encoding.js'
|
import * as encoding from './encoding.js'
|
||||||
|
import * as globals from './globals.js'
|
||||||
|
import * as NamedEventHandler from './NamedEventHandler.js'
|
||||||
|
|
||||||
const bc = new BroadcastChannel('ydb-client')
|
const bc = new BroadcastChannel('ydb-client')
|
||||||
const subs = new Map()
|
/**
|
||||||
|
* @type {Map<string, Set<Function>>}
|
||||||
|
*/
|
||||||
|
const datasubs = globals.createMap()
|
||||||
|
/**
|
||||||
|
* @type {Set<any>} Set of Ydb instances
|
||||||
|
*/
|
||||||
|
const ydbinstances = globals.createSet()
|
||||||
|
|
||||||
bc.onmessage = event => {
|
const bcRoomDataMessage = 0
|
||||||
const decoder = decoding.createDecoder(event.data)
|
const bcYdbCUConfCreated = 1
|
||||||
const room = decoding.readVarString(decoder)
|
const bcYdbCUConfConfirmed = 2
|
||||||
const update = decoding.readTail(decoder)
|
const bcYdbRemoteOffsetReceived = 3
|
||||||
const rsubs = subs.get(room)
|
const bcYdbRemoteOffsetConfirmed = 4
|
||||||
if (rsubs !== undefined) {
|
const bcYdbSyncingRoomsToServer = 5
|
||||||
rsubs.forEach(f => f(update))
|
const bcYdbSyncFromServer = 6
|
||||||
|
|
||||||
|
export const getUnconfirmedRooms = ydb => {
|
||||||
|
const unconfirmedRooms = globals.createSet()
|
||||||
|
ydb.clientUnconfirmedStates.forEach(room => unconfirmedRooms.add(room))
|
||||||
|
return unconfirmedRooms
|
||||||
|
}
|
||||||
|
|
||||||
|
export const computeRoomState = (ydb, unconfirmedRooms, room) => {
|
||||||
|
// state is a RoomState, defined in YdbClient.js
|
||||||
|
const state = ydb.roomStates.get(room)
|
||||||
|
if (state === undefined) {
|
||||||
|
return {
|
||||||
|
upsynced: false,
|
||||||
|
downsynced: false,
|
||||||
|
persisted: false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return {
|
||||||
|
upsynced: !unconfirmedRooms.has(room),
|
||||||
|
downsynced: state.offset >= 0 && state.coffset >= state.offset,
|
||||||
|
persisted: state.coffset === state.offset && state.offset >= 0 && !unconfirmedRooms.has(room)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let roomStatesUpdating = []
|
||||||
|
const fireRoomStateUpdate = (ydb, room) => {
|
||||||
|
roomStatesUpdating.push(room)
|
||||||
|
if (roomStatesUpdating.length === 1) {
|
||||||
|
// first time this is called, trigger actual publisher
|
||||||
|
// setTimeout(() => {
|
||||||
|
const updated = new Map()
|
||||||
|
const unconfirmedRooms = getUnconfirmedRooms(ydb)
|
||||||
|
roomStatesUpdating.forEach(room => {
|
||||||
|
if (!updated.has(room)) {
|
||||||
|
updated.set(room, computeRoomState(ydb, unconfirmedRooms, room))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
NamedEventHandler.fire(ydb, 'syncstate', {
|
||||||
|
updated
|
||||||
|
})
|
||||||
|
roomStatesUpdating = []
|
||||||
|
// }, 0)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const receiveBCData = data => {
|
||||||
|
const decoder = decoding.createDecoder(data)
|
||||||
|
while (decoding.hasContent(decoder)) {
|
||||||
|
const messageType = decoding.readVarUint(decoder)
|
||||||
|
switch (messageType) {
|
||||||
|
case bcRoomDataMessage: {
|
||||||
|
const room = decoding.readVarString(decoder)
|
||||||
|
const update = decoding.readTail(decoder)
|
||||||
|
const rsubs = datasubs.get(room)
|
||||||
|
if (rsubs !== undefined) {
|
||||||
|
rsubs.forEach(f => f(update))
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case bcYdbCUConfCreated: {
|
||||||
|
const confid = decoding.readVarUint(decoder)
|
||||||
|
const room = decoding.readVarString(decoder)
|
||||||
|
ydbinstances.forEach(ydb => {
|
||||||
|
ydb.clientUnconfirmedStates.set(confid, room)
|
||||||
|
fireRoomStateUpdate(ydb, room)
|
||||||
|
})
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case bcYdbCUConfConfirmed: {
|
||||||
|
const confid = decoding.readVarUint(decoder)
|
||||||
|
const offset = decoding.readVarUint(decoder)
|
||||||
|
ydbinstances.forEach(ydb => {
|
||||||
|
const room = ydb.clientUnconfirmedStates.get(confid)
|
||||||
|
if (room !== undefined) {
|
||||||
|
ydb.clientUnconfirmedStates.delete(confid)
|
||||||
|
const state = ydb.roomStates.get(room)
|
||||||
|
if (state.coffset < offset) {
|
||||||
|
state.coffset = offset
|
||||||
|
}
|
||||||
|
fireRoomStateUpdate(ydb, room)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case bcYdbRemoteOffsetReceived: {
|
||||||
|
const len = decoding.readVarUint(decoder)
|
||||||
|
for (let i = 0; i < len; i++) {
|
||||||
|
const room = decoding.readVarString(decoder)
|
||||||
|
const offset = decoding.readVarUint(decoder)
|
||||||
|
ydbinstances.forEach(ydb => {
|
||||||
|
// this is only called when an update is received
|
||||||
|
// so roomState.get(room) should exist
|
||||||
|
const state = ydb.roomStates.get(room)
|
||||||
|
if (state.coffset < offset) {
|
||||||
|
state.coffset = offset
|
||||||
|
}
|
||||||
|
fireRoomStateUpdate(ydb, room)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case bcYdbRemoteOffsetConfirmed: {
|
||||||
|
const len = decoding.readVarUint(decoder)
|
||||||
|
for (let i = 0; i < len; i++) {
|
||||||
|
const room = decoding.readVarString(decoder)
|
||||||
|
const offset = decoding.readVarUint(decoder)
|
||||||
|
ydbinstances.forEach(ydb => {
|
||||||
|
const state = ydb.roomStates.get(room)
|
||||||
|
state.offset = offset
|
||||||
|
fireRoomStateUpdate(ydb, room)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case bcYdbSyncingRoomsToServer: {
|
||||||
|
const len = decoding.readVarUint(decoder)
|
||||||
|
for (let i = 0; i < len; i++) {
|
||||||
|
const room = decoding.readVarString(decoder)
|
||||||
|
ydbinstances.forEach(ydb => {
|
||||||
|
const state = ydb.roomStates.get(room)
|
||||||
|
if (state === undefined) {
|
||||||
|
ydb.roomStates.set(room, {
|
||||||
|
rsid: -1,
|
||||||
|
offset: -1,
|
||||||
|
coffset: 0
|
||||||
|
})
|
||||||
|
fireRoomStateUpdate(ydb, room)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
case bcYdbSyncFromServer: {
|
||||||
|
const len = decoding.readVarUint(decoder)
|
||||||
|
for (let i = 0; i < len; i++) {
|
||||||
|
const room = decoding.readVarString(decoder)
|
||||||
|
const offset = decoding.readVarUint(decoder)
|
||||||
|
const rsid = decoding.readVarUint(decoder)
|
||||||
|
ydbinstances.forEach(ydb => {
|
||||||
|
const state = ydb.roomStates.get(room)
|
||||||
|
state.offset = offset
|
||||||
|
state.rsid = rsid
|
||||||
|
fireRoomStateUpdate(ydb, room)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
globals.error('Unexpected bc message type')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bc.onmessage = event => receiveBCData(event.data)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Publish to all, including self
|
||||||
|
* @param {encoding.Encoder} encoder
|
||||||
|
*/
|
||||||
|
export const publishAll = encoder => {
|
||||||
|
const buffer = encoding.toBuffer(encoder)
|
||||||
|
bc.postMessage(buffer)
|
||||||
|
receiveBCData(buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call this when update was created by this user and confid was created
|
||||||
|
* @param {number} cconf
|
||||||
|
* @param {string} roomname
|
||||||
|
*/
|
||||||
|
export const _broadcastYdbCUConfCreated = (cconf, roomname) => {
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoder, bcYdbCUConfCreated)
|
||||||
|
encoding.writeVarUint(encoder, cconf)
|
||||||
|
encoding.writeVarString(encoder, roomname)
|
||||||
|
publishAll(encoder)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call this when user confid was confirmed by host
|
||||||
|
* @param {number} cconf
|
||||||
|
* @param {number} offset The conf-offset of the client-created offset
|
||||||
|
*/
|
||||||
|
export const _broadcastYdbCUConfConfirmed = (cconf, offset) => {
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoder, bcYdbCUConfConfirmed)
|
||||||
|
encoding.writeVarUint(encoder, cconf)
|
||||||
|
encoding.writeVarUint(encoder, offset)
|
||||||
|
publishAll(encoder)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call this when remote update is received (thus host has increased, but not confirmed, the offset)
|
||||||
|
* @param {Array<Object>} subs sub is { room, offset }
|
||||||
|
*/
|
||||||
|
export const _broadcastYdbRemoteOffsetReceived = subs => {
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoder, bcYdbRemoteOffsetReceived)
|
||||||
|
encoding.writeVarUint(encoder, subs.length)
|
||||||
|
subs.forEach(sub => {
|
||||||
|
encoding.writeVarString(encoder, sub.room)
|
||||||
|
encoding.writeVarUint(encoder, sub.offset)
|
||||||
|
})
|
||||||
|
publishAll(encoder)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param {Array<Object>} subs sub is { room, offset }
|
||||||
|
*/
|
||||||
|
export const _broadcastYdbRemoteOffsetConfirmed = subs => {
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoder, bcYdbRemoteOffsetConfirmed)
|
||||||
|
encoding.writeVarUint(encoder, subs.length)
|
||||||
|
subs.forEach(sub => {
|
||||||
|
encoding.writeVarString(encoder, sub.room)
|
||||||
|
encoding.writeVarUint(encoder, sub.offset)
|
||||||
|
})
|
||||||
|
publishAll(encoder)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call this when a subscription is created
|
||||||
|
* @param {Array<string>} rooms
|
||||||
|
*/
|
||||||
|
export const _broadcastYdbSyncingRoomsToServer = rooms => {
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoder, bcYdbSyncingRoomsToServer)
|
||||||
|
encoding.writeVarUint(encoder, rooms.length)
|
||||||
|
rooms.forEach(room => {
|
||||||
|
encoding.writeVarString(encoder, room)
|
||||||
|
})
|
||||||
|
publishAll(encoder)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Call this when sync confirmed by host
|
||||||
|
* @param {Array<Object>} subs sub is {room, offset, rsid}
|
||||||
|
*/
|
||||||
|
export const _broadcastYdbSyncFromServer = subs => {
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeVarUint(encoder, bcYdbSyncFromServer)
|
||||||
|
encoding.writeVarUint(encoder, subs.length)
|
||||||
|
subs.forEach(sub => {
|
||||||
|
encoding.writeVarString(encoder, sub.room)
|
||||||
|
encoding.writeVarUint(encoder, sub.offset)
|
||||||
|
encoding.writeVarUint(encoder, sub.rsid)
|
||||||
|
})
|
||||||
|
publishAll(encoder)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {string} room
|
* @param {string} room
|
||||||
* @param {function(ArrayBuffer)} f
|
* @param {function(ArrayBuffer)} f
|
||||||
*/
|
*/
|
||||||
export const subscribe = (room, f) => {
|
export const subscribeRoomData = (room, f) => {
|
||||||
let rsubs = subs.get(room)
|
let rsubs = datasubs.get(room)
|
||||||
if (rsubs === undefined) {
|
if (rsubs === undefined) {
|
||||||
rsubs = new Set()
|
rsubs = new Set()
|
||||||
subs.set(room, rsubs)
|
datasubs.set(room, rsubs)
|
||||||
}
|
}
|
||||||
rsubs.add(f)
|
rsubs.add(f)
|
||||||
}
|
}
|
||||||
@ -33,13 +289,17 @@ export const subscribe = (room, f) => {
|
|||||||
* @param {string} room
|
* @param {string} room
|
||||||
* @param {ArrayBuffer} update
|
* @param {ArrayBuffer} update
|
||||||
*/
|
*/
|
||||||
export const publish = (room, update) => {
|
export const publishRoomData = (room, update) => {
|
||||||
const encoder = encoding.createEncoder()
|
const encoder = encoding.createEncoder()
|
||||||
encoding.writeVarString(encoder, room)
|
encoding.writeVarString(encoder, room)
|
||||||
encoding.writeArrayBuffer(encoder, update)
|
encoding.writeArrayBuffer(encoder, update)
|
||||||
bc.postMessage(encoding.toBuffer(encoder))
|
bc.postMessage(encoding.toBuffer(encoder))
|
||||||
const rsubs = subs.get(room)
|
// call subs directly here instead of calling receivedBCData
|
||||||
|
const rsubs = datasubs.get(room)
|
||||||
if (rsubs !== undefined) {
|
if (rsubs !== undefined) {
|
||||||
rsubs.forEach(f => f(update))
|
rsubs.forEach(f => f(update))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export const subscribeYdbEvents = ydb =>
|
||||||
|
ydbinstances.add(ydb)
|
||||||
|
@ -213,7 +213,7 @@ export const writeConfirmedByHost = (t, room, offset) => {
|
|||||||
encoding.writeArrayBuffer(dataEncoder, value)
|
encoding.writeArrayBuffer(dataEncoder, value)
|
||||||
}
|
}
|
||||||
}).then(() => {
|
}).then(() => {
|
||||||
globals.pall([idb.put(co, encodeMetaValue(meta.roomsid, offset), getCoMetaKey(room)), idb.put(co, encoding.toBuffer(dataEncoder), getCoDataKey(room)), idb.del(hu, huKeyRange)])
|
globals.pall([idb.put(co, encodeMetaValue(meta.rsid, offset), getCoMetaKey(room)), idb.put(co, encoding.toBuffer(dataEncoder), getCoDataKey(room)), idb.del(hu, huKeyRange)])
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -222,7 +222,7 @@ export const writeConfirmedByHost = (t, room, offset) => {
|
|||||||
* @typedef RoomMeta
|
* @typedef RoomMeta
|
||||||
* @type {Object}
|
* @type {Object}
|
||||||
* @property {string} room
|
* @property {string} room
|
||||||
* @property {number} roomsid Room session id
|
* @property {number} rsid Room session id
|
||||||
* @property {number} offset Received offsets (including offsets that are not yet confirmed)
|
* @property {number} offset Received offsets (including offsets that are not yet confirmed)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@ -233,23 +233,49 @@ export const writeConfirmedByHost = (t, room, offset) => {
|
|||||||
* @return {Promise<Array<RoomMeta>>}
|
* @return {Promise<Array<RoomMeta>>}
|
||||||
*/
|
*/
|
||||||
export const getRoomMetas = t => {
|
export const getRoomMetas = t => {
|
||||||
const hu = getStoreHU(t)
|
// const result = []
|
||||||
const result = []
|
const storeCo = getStoreCo(t)
|
||||||
|
const coQuery = idb.createIDBKeyRangeLowerBound('meta:', false)
|
||||||
|
return globals.pall([idb.getAll(storeCo, coQuery), idb.getAllKeys(storeCo, coQuery)]).then(([metaValues, metaKeys]) => globals.pall(metaValues.map((metavalue, i) => {
|
||||||
|
const room = metaKeys[i].slice(5)
|
||||||
|
const { rsid, offset } = decodeMetaValue(metavalue)
|
||||||
|
return {
|
||||||
|
room,
|
||||||
|
rsid,
|
||||||
|
offset: offset
|
||||||
|
}
|
||||||
|
})))
|
||||||
|
/*
|
||||||
return idb.iterate(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false), (metavalue, metakey) =>
|
return idb.iterate(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false), (metavalue, metakey) =>
|
||||||
idb.getAllKeys(hu, idb.createIDBKeyRangeBound(encodeHUKey(metakey.slice(5), 0), encodeHUKey(metakey.slice(5), 2 ** 32), false, false)).then(keys => {
|
idb.getAllKeys(hu, idb.createIDBKeyRangeBound(encodeHUKey(metakey.slice(5), 0), encodeHUKey(metakey.slice(5), 2 ** 32), false, false)).then(keys => {
|
||||||
const { roomsid, offset } = decodeMetaValue(metavalue)
|
const { rsid, offset } = decodeMetaValue(metavalue)
|
||||||
result.push({
|
result.push({
|
||||||
room: metakey.slice(5),
|
room: metakey.slice(5),
|
||||||
roomsid,
|
rsid,
|
||||||
offset: keys.reduce((cur, key) => globals.max(decodeHUKey(key).offset, cur), offset)
|
offset: keys.reduce((cur, key) => globals.max(decodeHUKey(key).offset, cur), offset)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
).then(() => globals.presolve(result))
|
).then(() => globals.presolve(result))
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
export const getRoomMeta = (t, room) =>
|
export const getRoomMeta = (t, room) =>
|
||||||
idb.get(getStoreCo(t), getCoMetaKey(room))
|
idb.get(getStoreCo(t), getCoMetaKey(room))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get all data from idb, excluding unconfirmed updates.
|
||||||
|
* TODO: include updates in CU
|
||||||
|
* @param {IDBTransaction} t
|
||||||
|
* @param {string} room
|
||||||
|
* @return {Promise<ArrayBuffer>}
|
||||||
|
*/
|
||||||
|
export const getRoomDataWithoutCU = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false))]).then(([data, updates]) => {
|
||||||
|
const encoder = encoding.createEncoder()
|
||||||
|
encoding.writeArrayBuffer(encoder, data || new Uint8Array(0))
|
||||||
|
updates.forEach(update => encoding.writeArrayBuffer(encoder, update))
|
||||||
|
return encoding.toBuffer(encoder)
|
||||||
|
})
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get all data from idb, including unconfirmed updates.
|
* Get all data from idb, including unconfirmed updates.
|
||||||
* TODO: include updates in CU
|
* TODO: include updates in CU
|
||||||
@ -257,57 +283,100 @@ export const getRoomMeta = (t, room) =>
|
|||||||
* @param {string} room
|
* @param {string} room
|
||||||
* @return {Promise<ArrayBuffer>}
|
* @return {Promise<ArrayBuffer>}
|
||||||
*/
|
*/
|
||||||
export const getRoomData = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false))]).then(([data, updates]) => {
|
export const getRoomData = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false)), idb.getAll(getStoreCU(t))]).then(([data, updates, cuUpdates]) => {
|
||||||
const encoder = encoding.createEncoder()
|
const encoder = encoding.createEncoder()
|
||||||
encoding.writeArrayBuffer(encoder, data || new Uint8Array(0))
|
encoding.writeArrayBuffer(encoder, data || new Uint8Array(0))
|
||||||
updates.forEach(update => encoding.writeArrayBuffer(encoder, update))
|
updates.forEach(update => encoding.writeArrayBuffer(encoder, update))
|
||||||
|
cuUpdates.forEach(roomAndUpdate => {
|
||||||
|
const decoder = decoding.createDecoder(roomAndUpdate)
|
||||||
|
if (decoding.readVarString(decoder) === room) {
|
||||||
|
encoding.writeArrayBuffer(encoder, decoding.readTail(decoder))
|
||||||
|
}
|
||||||
|
})
|
||||||
return encoding.toBuffer(encoder)
|
return encoding.toBuffer(encoder)
|
||||||
})
|
})
|
||||||
|
|
||||||
const decodeMetaValue = buffer => {
|
const decodeMetaValue = buffer => {
|
||||||
const decoder = decoding.createDecoder(buffer)
|
const decoder = decoding.createDecoder(buffer)
|
||||||
const roomsid = decoding.readVarUint(decoder)
|
const rsid = decoding.readVarUint(decoder)
|
||||||
const offset = decoding.readVarUint(decoder)
|
const offset = decoding.readVarUint(decoder)
|
||||||
return {
|
return {
|
||||||
roomsid, offset
|
rsid, offset
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/**
|
/**
|
||||||
* @param {number} roomsid room session id
|
* @param {number} rsid room session id
|
||||||
* @param {number} offset
|
* @param {number} offset
|
||||||
* @return {ArrayBuffer}
|
* @return {ArrayBuffer}
|
||||||
*/
|
*/
|
||||||
const encodeMetaValue = (roomsid, offset) => {
|
const encodeMetaValue = (rsid, offset) => {
|
||||||
const encoder = encoding.createEncoder()
|
const encoder = encoding.createEncoder()
|
||||||
encoding.writeVarUint(encoder, roomsid)
|
encoding.writeVarUint(encoder, rsid)
|
||||||
encoding.writeVarUint(encoder, offset)
|
encoding.writeVarUint(encoder, offset)
|
||||||
return encoding.toBuffer(encoder)
|
return encoding.toBuffer(encoder)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const writeInitialCoEntry = (t, room, roomsessionid, offset) => globals.pall([
|
||||||
|
idb.put(getStoreCo(t), encodeMetaValue(roomsessionid, offset), getCoMetaKey(room)),
|
||||||
|
idb.put(getStoreCo(t), globals.createArrayBufferFromArray([]), getCoDataKey(room))
|
||||||
|
])
|
||||||
|
|
||||||
|
const _confirmSub = (t, metaval, sub) => {
|
||||||
|
if (metaval === undefined) {
|
||||||
|
return writeInitialCoEntry(t, sub.room, sub.rsid, sub.offset).then(() => idb.del(getStoreUS(t), sub.room)).then(() => null)
|
||||||
|
}
|
||||||
|
const meta = decodeMetaValue(metaval)
|
||||||
|
if (meta.rsid !== sub.rsid) {
|
||||||
|
// TODO: Yjs sync with server here
|
||||||
|
// get all room data (without CU) and save it as a client update. Then remove all data
|
||||||
|
return getRoomDataWithoutCU(t, sub.room)
|
||||||
|
.then(roomdata =>
|
||||||
|
writeClientUnconfirmed(t, sub.room, roomdata)
|
||||||
|
.then(clientConf => message.createUpdate(sub.room, roomdata, clientConf))
|
||||||
|
.then(update =>
|
||||||
|
writeInitialCoEntry(t, sub.room, sub.rsid, sub.offset).then(() => update)
|
||||||
|
)
|
||||||
|
)
|
||||||
|
} else if (meta.offset < sub.offset) {
|
||||||
|
return writeConfirmedByHost(t, sub.room, sub.offset).then(() => null)
|
||||||
|
} else {
|
||||||
|
// nothing needs to happen
|
||||||
|
return null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @typedef Sub
|
||||||
|
* @type {Object}
|
||||||
|
* @property {string} room room name
|
||||||
|
* @property {number} rsid room session id
|
||||||
|
* @property {number} offset
|
||||||
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the initial room data. Overwrites initial data if there is any!
|
* Set the initial room data. Overwrites initial data if there is any!
|
||||||
* @param {IDBTransaction} t
|
* @param {IDBTransaction} t
|
||||||
* @param {string} room
|
* @param {Sub} sub
|
||||||
* @param {number} roomsessionid
|
* @return {Promise<ArrayBuffer?>} Message to send to server
|
||||||
* @param {number} offset
|
|
||||||
* @return {Promise<void>}
|
|
||||||
*/
|
*/
|
||||||
export const confirmSubscription = (t, room, roomsessionid, offset) => idb.get(getStoreCo(t), getCoMetaKey(room)).then(metaval => {
|
export const confirmSubscription = (t, sub) => idb.get(getStoreCo(t), getCoMetaKey(sub.room)).then(metaval => _confirmSub(t, metaval, sub))
|
||||||
if (metaval === undefined) {
|
|
||||||
return globals.pall([
|
export const confirmSubscriptions = (t, subs) => idb.getAllKeysValues(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false)).then(kvs => {
|
||||||
idb.put(getStoreCo(t), encodeMetaValue(roomsessionid, offset), getCoMetaKey(room)),
|
const ps = []
|
||||||
idb.put(getStoreCo(t), globals.createArrayBufferFromArray([]), getCoDataKey(room))
|
const subMap = new Map()
|
||||||
]).then(() => idb.del(getStoreUS(t), room))
|
subs.forEach(sub => subMap.set(sub.room, sub))
|
||||||
}
|
for (let i = 0, len = kvs.length; i < len; i++) {
|
||||||
const meta = decodeMetaValue(metaval)
|
const kv = kvs[i]
|
||||||
if (meta.roomsid !== roomsessionid) {
|
const kvroom = kv.k.slice(5)
|
||||||
// TODO: upload all unconfirmed updates
|
const exSub = subMap.get(kvroom)
|
||||||
// or do a Yjs sync with server
|
if (exSub !== undefined) {
|
||||||
} else if (meta.roomsid < offset) {
|
subMap.delete(kvroom)
|
||||||
return writeConfirmedByHost(t, room, offset)
|
ps.push(_confirmSub(t, kv.v, exSub))
|
||||||
} else {
|
}
|
||||||
// nothing needs to happen
|
|
||||||
}
|
}
|
||||||
|
// all remaining elements in subMap do not exist yet in Co.
|
||||||
|
subMap.forEach(nonexSub => ps.push(_confirmSub(t, undefined, nonexSub)))
|
||||||
|
return ps
|
||||||
})
|
})
|
||||||
|
|
||||||
export const writeUnconfirmedSubscription = (t, room) => idb.put(getStoreUS(t), true, room)
|
export const writeUnconfirmedSubscription = (t, room) => idb.put(getStoreUS(t), true, room)
|
||||||
|
@ -12,7 +12,7 @@ idbactions.deleteDB().then(() => idbactions.openDB()).then(db => {
|
|||||||
await idbactions.writeConfirmedByHost(t, testname, 4)
|
await idbactions.writeConfirmedByHost(t, testname, 4)
|
||||||
const metas = await idbactions.getRoomMetas(t)
|
const metas = await idbactions.getRoomMetas(t)
|
||||||
const roommeta = metas.find(meta => meta.room === testname)
|
const roommeta = metas.find(meta => meta.room === testname)
|
||||||
if (roommeta == null || roommeta.offset !== 4 || roommeta.roomsid !== 42) {
|
if (roommeta == null || roommeta.offset !== 4 || roommeta.rsid !== 42) {
|
||||||
throw globals.error()
|
throw globals.error()
|
||||||
}
|
}
|
||||||
const data = await idbactions.getRoomData(t, testname)
|
const data = await idbactions.getRoomData(t, testname)
|
||||||
|
@ -27,25 +27,35 @@ export const readMessage = (ydb, message) => {
|
|||||||
const update = decoding.readPayload(decoder)
|
const update = decoding.readPayload(decoder)
|
||||||
logging.log(`Received Update. room "${room}", offset ${offset}`)
|
logging.log(`Received Update. room "${room}", offset ${offset}`)
|
||||||
idbactions.writeHostUnconfirmed(t, room, offset, update)
|
idbactions.writeHostUnconfirmed(t, room, offset, update)
|
||||||
bc.publish(room, update)
|
bc.publishRoomData(room, update)
|
||||||
|
bc._broadcastYdbRemoteOffsetReceived([{ room, offset }])
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case MESSAGE_SUB_CONF: {
|
case MESSAGE_SUB_CONF: {
|
||||||
const nSubs = decoding.readVarUint(decoder)
|
const nSubs = decoding.readVarUint(decoder)
|
||||||
|
const subs = []
|
||||||
for (let i = 0; i < nSubs; i++) {
|
for (let i = 0; i < nSubs; i++) {
|
||||||
const room = decoding.readVarString(decoder)
|
const room = decoding.readVarString(decoder)
|
||||||
const offset = decoding.readVarUint(decoder)
|
const offset = decoding.readVarUint(decoder)
|
||||||
const roomsid = decoding.readVarUint(decoder) // TODO: SID
|
const rsid = decoding.readVarUint(decoder)
|
||||||
// logging.log(`Received Sub Conf. room "${room}", offset ${offset}, roomsid ${roomsid}`)
|
subs.push({
|
||||||
idbactions.confirmSubscription(t, room, roomsid, offset)
|
room, offset, rsid
|
||||||
|
})
|
||||||
|
}
|
||||||
|
bc._broadcastYdbSyncFromServer(subs)
|
||||||
|
if (nSubs < 500) {
|
||||||
|
subs.map(sub => idbactions.confirmSubscription(t, sub))
|
||||||
|
} else {
|
||||||
|
idbactions.confirmSubscriptions(t, subs)
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case MESSAGE_CONFIRMATION: {
|
case MESSAGE_CONFIRMATION: { // TODO: duplicate with MESSAGE_CONFIRMED_BY_HOST!
|
||||||
const room = decoding.readVarString(decoder)
|
const room = decoding.readVarString(decoder)
|
||||||
const offset = decoding.readVarUint(decoder)
|
const offset = decoding.readVarUint(decoder)
|
||||||
logging.log(`Received Confirmation. room "${room}", offset ${offset}`)
|
logging.log(`Received Confirmation. room "${room}", offset ${offset}`)
|
||||||
idbactions.writeConfirmedByHost(t, room, offset)
|
idbactions.writeConfirmedByHost(t, room, offset)
|
||||||
|
bc._broadcastYdbRemoteOffsetConfirmed([{ room, offset }])
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case MESSAGE_HOST_UNCONFIRMED_BY_CLIENT: {
|
case MESSAGE_HOST_UNCONFIRMED_BY_CLIENT: {
|
||||||
@ -53,6 +63,7 @@ export const readMessage = (ydb, message) => {
|
|||||||
const offset = decoding.readVarUint(decoder)
|
const offset = decoding.readVarUint(decoder)
|
||||||
logging.log(`Received HostUnconfirmedByClient. clientConf "${clientConf}", offset ${offset}`)
|
logging.log(`Received HostUnconfirmedByClient. clientConf "${clientConf}", offset ${offset}`)
|
||||||
idbactions.writeHostUnconfirmedByClient(t, clientConf, offset)
|
idbactions.writeHostUnconfirmedByClient(t, clientConf, offset)
|
||||||
|
bc._broadcastYdbCUConfConfirmed(clientConf, offset)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
case MESSAGE_CONFIRMED_BY_HOST: {
|
case MESSAGE_CONFIRMED_BY_HOST: {
|
||||||
@ -60,6 +71,7 @@ export const readMessage = (ydb, message) => {
|
|||||||
const offset = decoding.readVarUint(decoder)
|
const offset = decoding.readVarUint(decoder)
|
||||||
logging.log(`Received Confirmation By Host. room "${room}", offset ${offset}`)
|
logging.log(`Received Confirmation By Host. room "${room}", offset ${offset}`)
|
||||||
idbactions.writeConfirmedByHost(t, room, offset)
|
idbactions.writeConfirmedByHost(t, room, offset)
|
||||||
|
bc._broadcastYdbRemoteOffsetConfirmed([{ room, offset }])
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -88,6 +100,7 @@ export const createUpdate = (room, update, clientConf) => {
|
|||||||
* @type {Object}
|
* @type {Object}
|
||||||
* @property {string} room
|
* @property {string} room
|
||||||
* @property {number} offset
|
* @property {number} offset
|
||||||
|
* @property {number} rsid
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -101,6 +114,7 @@ export const createSub = rooms => {
|
|||||||
for (let i = 0; i < rooms.length; i++) {
|
for (let i = 0; i < rooms.length; i++) {
|
||||||
encoding.writeVarString(encoder, rooms[i].room)
|
encoding.writeVarString(encoder, rooms[i].room)
|
||||||
encoding.writeVarUint(encoder, rooms[i].offset)
|
encoding.writeVarUint(encoder, rooms[i].offset)
|
||||||
|
encoding.writeVarUint(encoder, rooms[i].rsid)
|
||||||
}
|
}
|
||||||
return encoding.toBuffer(encoder)
|
return encoding.toBuffer(encoder)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user