ydb client
This commit is contained in:
parent
9d5bf50676
commit
b98ebddb69
@ -2,8 +2,6 @@
|
||||
<html>
|
||||
<body>
|
||||
<textarea style="width:80%;" rows=40 autocomplete="off" autocorrect="off" autocapitalize="off" spellcheck="false"></textarea>
|
||||
<script src="../../y.js"></script>
|
||||
<script src='../../../y-websockets-client/y-websockets-client.js'></script>
|
||||
<script src="./index.js"></script>
|
||||
<script type="module" src="./index.js"></script>
|
||||
</body>
|
||||
</html>
|
||||
|
@ -1,5 +1,14 @@
|
||||
/* global Y */
|
||||
import { createYdbClient } from '../../ydb/index.js'
|
||||
import Y from '../../src/Y.dist.js'
|
||||
|
||||
createYdbClient('ws://localhost:8899/ws').then(ydbclient => {
|
||||
const y = ydbclient.getY('textarea')
|
||||
let type = y.define('textarea', Y.Text)
|
||||
let textarea = document.querySelector('textarea')
|
||||
window.binding = new Y.TextareaBinding(type, textarea)
|
||||
})
|
||||
|
||||
/*
|
||||
let y = new Y('textarea-example', {
|
||||
connector: {
|
||||
name: 'websockets-client',
|
||||
@ -13,3 +22,4 @@ window.yTextarea = y
|
||||
let type = y.define('textarea', Y.Text)
|
||||
let textarea = document.querySelector('textarea')
|
||||
window.binding = new Y.TextareaBinding(type, textarea)
|
||||
*/
|
@ -5,7 +5,8 @@ import { sendSyncStep1, readSyncStep1 } from './MessageHandler/syncStep1.js'
|
||||
import { readSyncStep2 } from './MessageHandler/syncStep2.js'
|
||||
import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js'
|
||||
|
||||
import debug from 'debug'
|
||||
// TODO: reintroduce or remove
|
||||
// import debug from 'debug'
|
||||
|
||||
// TODO: rename Connector
|
||||
|
||||
|
@ -22,7 +22,6 @@ import QuillBinding from './Bindings/QuillBinding/QuillBinding.js'
|
||||
import DomBinding from './Bindings/DomBinding/DomBinding.js'
|
||||
import { toBinary, fromBinary } from './MessageHandler/binaryEncode.js'
|
||||
|
||||
import debug from 'debug'
|
||||
import domToType from './Bindings/DomBinding/domToType.js'
|
||||
import { domsToTypes, switchAssociation } from './Bindings/DomBinding/util.js'
|
||||
|
||||
@ -56,7 +55,4 @@ Y.utils = {
|
||||
fromBinary
|
||||
}
|
||||
|
||||
Y.debug = debug
|
||||
debug.formatters.Y = messageToString
|
||||
debug.formatters.y = messageToRoomname
|
||||
export default Y
|
||||
|
4
src/Y.js
4
src/Y.js
@ -36,12 +36,12 @@ export default class Y extends NamedEventHandler {
|
||||
* @type {String}
|
||||
*/
|
||||
this.room = room
|
||||
if (opts != null) {
|
||||
if (opts != null && opts.connector != null) {
|
||||
opts.connector.room = room
|
||||
}
|
||||
this._contentReady = false
|
||||
this._opts = opts
|
||||
if (typeof opts.userID !== 'number') {
|
||||
if (opts == null || typeof opts.userID !== 'number') {
|
||||
this.userID = generateRandomUint32()
|
||||
} else {
|
||||
this.userID = opts.userID
|
||||
|
1
ydb/README.md
Normal file
1
ydb/README.md
Normal file
@ -0,0 +1 @@
|
||||
* Host should discard message when confNumber is older than expected
|
45
ydb/broadcastchannel.js
Normal file
45
ydb/broadcastchannel.js
Normal file
@ -0,0 +1,45 @@
|
||||
/* eslint-env browser */
|
||||
|
||||
import * as decoding from './decoding.js'
|
||||
import * as encoding from './encoding.js'
|
||||
|
||||
const bc = new BroadcastChannel('ydb-client')
|
||||
const subs = new Map()
|
||||
|
||||
bc.onmessage = event => {
|
||||
const decoder = decoding.createDecoder(event.data)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readTail(decoder)
|
||||
const rsubs = subs.get(room)
|
||||
if (rsubs !== undefined) {
|
||||
rsubs.forEach(f => f(update))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {function(ArrayBuffer)} f
|
||||
*/
|
||||
export const subscribe = (room, f) => {
|
||||
let rsubs = subs.get(room)
|
||||
if (rsubs === undefined) {
|
||||
rsubs = new Set()
|
||||
subs.set(room, rsubs)
|
||||
}
|
||||
rsubs.add(f)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {ArrayBuffer} update
|
||||
*/
|
||||
export const publish = (room, update) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarString(encoder, room)
|
||||
encoding.writeArrayBuffer(encoder, update)
|
||||
bc.postMessage(encoding.toBuffer(encoder))
|
||||
const rsubs = subs.get(room)
|
||||
if (rsubs !== undefined) {
|
||||
rsubs.forEach(f => f(update))
|
||||
}
|
||||
}
|
188
ydb/decoding.js
Normal file
188
ydb/decoding.js
Normal file
@ -0,0 +1,188 @@
|
||||
|
||||
/* global Buffer */
|
||||
|
||||
import * as globals from './globals.js'
|
||||
|
||||
/**
|
||||
* A Decoder handles the decoding of an ArrayBuffer.
|
||||
*/
|
||||
class Decoder {
|
||||
/**
|
||||
* @param {ArrayBuffer} buffer Binary data to decode
|
||||
*/
|
||||
constructor (buffer) {
|
||||
this.arr = new Uint8Array(buffer)
|
||||
this.pos = 0
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {ArrayBuffer} buffer
|
||||
* @return {Decoder}
|
||||
*/
|
||||
export const createDecoder = buffer => new Decoder(buffer)
|
||||
|
||||
export const hasContent = decoder => decoder.pos !== decoder.arr.length
|
||||
|
||||
/**
|
||||
* Clone a decoder instance.
|
||||
* Optionally set a new position parameter.
|
||||
* @param {Decoder} decoder The decoder instance
|
||||
* @return {Decoder} A clone of `decoder`
|
||||
*/
|
||||
export const clone = (decoder, newPos = decoder.pos) => {
|
||||
let _decoder = createDecoder(decoder.arr.buffer)
|
||||
_decoder.pos = newPos
|
||||
return _decoder
|
||||
}
|
||||
|
||||
/**
|
||||
* Read `len` bytes as an ArrayBuffer.
|
||||
* @param {Decoder} decoder The decoder instance
|
||||
* @param {number} len The length of bytes to read
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const readArrayBuffer = (decoder, len) => {
|
||||
const arrayBuffer = globals.createUint8ArrayFromLen(len)
|
||||
const view = globals.createUint8ArrayFromBuffer(decoder.arr.buffer, decoder.pos, len)
|
||||
arrayBuffer.set(view)
|
||||
decoder.pos += len
|
||||
return arrayBuffer.buffer
|
||||
}
|
||||
|
||||
/**
|
||||
* Read variable length payload as ArrayBuffer
|
||||
* @param {Decoder} decoder
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const readPayload = decoder => readArrayBuffer(decoder, readVarUint(decoder))
|
||||
|
||||
/**
|
||||
* Read the rest of the content as an ArrayBuffer
|
||||
* @param {Decoder} decoder
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const readTail = decoder => readArrayBuffer(decoder, decoder.arr.length - decoder.pos)
|
||||
|
||||
/**
|
||||
* Skip one byte, jump to the next position.
|
||||
* @param {Decoder} decoder The decoder instance
|
||||
* @return {number} The next position
|
||||
*/
|
||||
export const skip8 = decoder => decoder.pos++
|
||||
|
||||
/**
|
||||
* Read one byte as unsigned integer.
|
||||
* @param {Decoder} decoder The decoder instance
|
||||
* @return {number} Unsigned 8-bit integer
|
||||
*/
|
||||
export const readUint8 = decoder => decoder.arr[decoder.pos++]
|
||||
|
||||
/**
|
||||
* Read 4 bytes as unsigned integer.
|
||||
*
|
||||
* @param {Decoder} decoder
|
||||
* @return {number} An unsigned integer.
|
||||
*/
|
||||
export const readUint32 = decoder => {
|
||||
let uint =
|
||||
decoder.arr[decoder.pos] +
|
||||
(decoder.arr[decoder.pos + 1] << 8) +
|
||||
(decoder.arr[decoder.pos + 2] << 16) +
|
||||
(decoder.arr[decoder.pos + 3] << 24)
|
||||
decoder.pos += 4
|
||||
return uint
|
||||
}
|
||||
|
||||
/**
|
||||
* Look ahead without incrementing position.
|
||||
* to the next byte and read it as unsigned integer.
|
||||
*
|
||||
* @param {Decoder} decoder
|
||||
* @return {number} An unsigned integer.
|
||||
*/
|
||||
export const peekUint8 = decoder => decoder.arr[decoder.pos]
|
||||
|
||||
/**
|
||||
* Read unsigned integer (32bit) with variable length.
|
||||
* 1/8th of the storage is used as encoding overhead.
|
||||
* * numbers < 2^7 is stored in one bytlength
|
||||
* * numbers < 2^14 is stored in two bylength
|
||||
*
|
||||
* @param {Decoder} decoder
|
||||
* @return {number} An unsigned integer.length
|
||||
*/
|
||||
export const readVarUint = decoder => {
|
||||
let num = 0
|
||||
let len = 0
|
||||
while (true) {
|
||||
let r = decoder.arr[decoder.pos++]
|
||||
num = num | ((r & 0b1111111) << len)
|
||||
len += 7
|
||||
if (r < 1 << 7) {
|
||||
return num >>> 0 // return unsigned number!
|
||||
}
|
||||
if (len > 35) {
|
||||
throw new Error('Integer out of range!')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read string of variable length
|
||||
* * varUint is used to store the length of the string
|
||||
*
|
||||
* Transforming utf8 to a string is pretty expensive. The code performs 10x better
|
||||
* when String.fromCodePoint is fed with all characters as arguments.
|
||||
* But most environments have a maximum number of arguments per functions.
|
||||
* For effiency reasons we apply a maximum of 10000 characters at once.
|
||||
*
|
||||
* @param {Decoder} decoder
|
||||
* @return {String} The read String.
|
||||
*/
|
||||
export const readVarString = decoder => {
|
||||
let remainingLen = readVarUint(decoder)
|
||||
let encodedString = ''
|
||||
while (remainingLen > 0) {
|
||||
const nextLen = remainingLen < 10000 ? remainingLen : 10000
|
||||
const bytes = new Array(nextLen)
|
||||
for (let i = 0; i < nextLen; i++) {
|
||||
bytes[i] = decoder.arr[decoder.pos++]
|
||||
}
|
||||
encodedString += String.fromCodePoint.apply(null, bytes)
|
||||
remainingLen -= nextLen
|
||||
}
|
||||
return decodeURIComponent(escape(encodedString))
|
||||
}
|
||||
|
||||
/**
|
||||
* Look ahead and read varString without incrementing position
|
||||
* @param {Decoder} decoder
|
||||
* @return {string}
|
||||
*/
|
||||
export const peekVarString = decoder => {
|
||||
let pos = decoder.pos
|
||||
let s = readVarString(decoder)
|
||||
decoder.pos = pos
|
||||
return s
|
||||
}
|
||||
|
||||
/**
|
||||
* Read ID.
|
||||
* * If first varUint read is 0xFFFFFF a RootID is returned.
|
||||
* * Otherwise an ID is returned
|
||||
*
|
||||
* @param {Decoder} decoder
|
||||
* @return {ID}
|
||||
*
|
||||
export const readID = decoder => {
|
||||
let user = decoder.readVarUint()
|
||||
if (user === RootFakeUserID) {
|
||||
// read property name and type id
|
||||
const rid = new RootID(decoder.readVarString(), null)
|
||||
rid.type = decoder.readVarUint()
|
||||
return rid
|
||||
}
|
||||
return new ID(user, decoder.readVarUint())
|
||||
}
|
||||
*/
|
234
ydb/encoding.js
Normal file
234
ydb/encoding.js
Normal file
@ -0,0 +1,234 @@
|
||||
|
||||
import * as globals from './globals.js'
|
||||
|
||||
const bits7 = 0b1111111
|
||||
const bits8 = 0b11111111
|
||||
|
||||
/**
|
||||
* A BinaryEncoder handles the encoding to an ArrayBuffer.
|
||||
*/
|
||||
class Encoder {
|
||||
constructor () {
|
||||
this.cpos = 0
|
||||
this.cbuf = globals.createUint8ArrayFromLen(1000)
|
||||
this.bufs = []
|
||||
}
|
||||
}
|
||||
|
||||
export const createEncoder = () => new Encoder()
|
||||
|
||||
/**
|
||||
* The current length of the encoded data.
|
||||
*/
|
||||
export const length = encoder => {
|
||||
let len = 0
|
||||
for (let i = 0; i < encoder.bufs.length; i++) {
|
||||
len += encoder.bufs[i].length
|
||||
}
|
||||
len += encoder.cpos
|
||||
return len
|
||||
}
|
||||
|
||||
/**
|
||||
* Transform to ArrayBuffer.
|
||||
* @param {Encoder} encoder
|
||||
* @return {ArrayBuffer} The created ArrayBuffer.
|
||||
*/
|
||||
export const toBuffer = encoder => {
|
||||
const uint8arr = globals.createUint8ArrayFromLen(length(encoder))
|
||||
let curPos = 0
|
||||
for (let i = 0; i < encoder.bufs.length; i++) {
|
||||
let d = encoder.bufs[i]
|
||||
uint8arr.set(d, curPos)
|
||||
curPos += d.length
|
||||
}
|
||||
uint8arr.set(globals.createUint8ArrayFromBuffer(encoder.cbuf.buffer, 0, encoder.cpos), curPos)
|
||||
return uint8arr.buffer
|
||||
}
|
||||
|
||||
/**
|
||||
* Write one byte to the encoder.
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {number} num The byte that is to be encoded.
|
||||
*/
|
||||
export const write = (encoder, num) => {
|
||||
if (encoder.cpos === encoder.cbuf.length) {
|
||||
encoder.bufs.push(encoder.cbuf)
|
||||
encoder.cbuf = globals.createUint8ArrayFromLen(encoder.cbuf.length * 2)
|
||||
encoder.cpos = 0
|
||||
}
|
||||
encoder.cbuf[encoder.cpos++] = num
|
||||
}
|
||||
|
||||
/**
|
||||
* Write one byte at a specific position.
|
||||
* Position must already be written (i.e. encoder.length > pos)
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {number} pos Position to which to write data
|
||||
* @param {number} num Unsigned 8-bit integer
|
||||
*/
|
||||
export const set = (encoder, pos, num) => {
|
||||
let buffer = null
|
||||
// iterate all buffers and adjust position
|
||||
for (let i = 0; i < encoder.bufs.length && buffer === null; i++) {
|
||||
const b = encoder.bufs[i]
|
||||
if (pos < b.length) {
|
||||
buffer = b // found buffer
|
||||
} else {
|
||||
pos -= b.length
|
||||
}
|
||||
}
|
||||
if (buffer === null) {
|
||||
// use current buffer
|
||||
buffer = encoder.cbuf
|
||||
}
|
||||
buffer[pos] = num
|
||||
}
|
||||
|
||||
/**
|
||||
* Write one byte as an unsigned integer.
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
export const writeUint8 = (encoder, num) => write(encoder, num & bits8)
|
||||
|
||||
/**
|
||||
* Write one byte as an unsigned Integer at a specific location.
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {number} pos The location where the data will be written.
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
export const setUint8 = (encoder, pos, num) => set(encoder, pos, num & bits8)
|
||||
|
||||
/**
|
||||
* Write two bytes as an unsigned integer.
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
export const writeUint16 = (encoder, num) => {
|
||||
write(encoder, num & bits8)
|
||||
write(encoder, (num >>> 8) & bits8)
|
||||
}
|
||||
/**
|
||||
* Write two bytes as an unsigned integer at a specific location.
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {number} pos The location where the data will be written.
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
export const setUint16 = (encoder, pos, num) => {
|
||||
set(encoder, pos, num & bits8)
|
||||
set(encoder, pos + 1, (num >>> 8) & bits8)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write two bytes as an unsigned integer
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
export const writeUint32 = (encoder, num) => {
|
||||
for (let i = 0; i < 4; i++) {
|
||||
write(encoder, num & bits8)
|
||||
num >>>= 8
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write two bytes as an unsigned integer at a specific location.
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {number} pos The location where the data will be written.
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
export const setUint32 = (encoder, pos, num) => {
|
||||
for (let i = 0; i < 4; i++) {
|
||||
set(encoder, pos + i, num & bits8)
|
||||
num >>>= 8
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a variable length unsigned integer.
|
||||
*
|
||||
* Encodes integers in the range from [0, 4294967295] / [0, 0xffffffff]. (max 32 bit unsigned integer)
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {number} num The number that is to be encoded.
|
||||
*/
|
||||
export const writeVarUint = (encoder, num) => {
|
||||
while (num >= 0b10000000) {
|
||||
write(encoder, 0b10000000 | (bits7 & num))
|
||||
num >>>= 7
|
||||
}
|
||||
write(encoder, bits7 & num)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a variable length string.
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {String} str The string that is to be encoded.
|
||||
*/
|
||||
export const writeVarString = (encoder, str) => {
|
||||
const encodedString = unescape(encodeURIComponent(str))
|
||||
const len = encodedString.length
|
||||
writeVarUint(encoder, len)
|
||||
for (let i = 0; i < len; i++) {
|
||||
write(encoder, encodedString.codePointAt(i))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Write the content of another biUint8Arr
|
||||
*
|
||||
* @param {Encoder} encoder The enUint8Arr
|
||||
* @param encoderToAppend The BinaryEncoder to be written.
|
||||
*/
|
||||
export const writeBinaryEncoder = (encoder, encoderToAppend) => writeArrayBuffer(encoder, toBuffer(encoder))
|
||||
|
||||
/**
|
||||
* Append an arrayBuffer to the encoder.
|
||||
*
|
||||
* @param {Encoder} encoder
|
||||
* @param {ArrayBuffer} arrayBuffer
|
||||
*/
|
||||
export const writeArrayBuffer = (encoder, arrayBuffer) => {
|
||||
const prevBufferLen = encoder.cbuf.length
|
||||
// TODO: Append to cbuf if possible
|
||||
encoder.bufs.push(globals.createUint8ArrayFromBuffer(encoder.cbuf.buffer, 0, encoder.cpos))
|
||||
encoder.bufs.push(globals.createUint8ArrayFromArrayBuffer(arrayBuffer))
|
||||
encoder.cbuf = globals.createUint8ArrayFromLen(prevBufferLen)
|
||||
encoder.cpos = 0
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Encoder} encoder
|
||||
* @param {ArrayBuffer} arrayBuffer
|
||||
*/
|
||||
export const writePayload = (encoder, arrayBuffer) => {
|
||||
writeVarUint(encoder, arrayBuffer.byteLength)
|
||||
writeArrayBuffer(encoder, arrayBuffer)
|
||||
}
|
||||
|
||||
/**
|
||||
* Write an ID at the current position.
|
||||
*
|
||||
* @param {ID} id The ID that is to be written.
|
||||
*
|
||||
export const writeID = (encoder, id) => {
|
||||
const user = id.user
|
||||
writeVarUint(encoder, user)
|
||||
if (user !== RootFakeUserID) {
|
||||
writeVarUint(encoder, id.clock)
|
||||
} else {
|
||||
writeVarString(encoder, id.name)
|
||||
writeVarUint(encoder, id.type)
|
||||
}
|
||||
}
|
||||
*/
|
49
ydb/encoding.test.js
Normal file
49
ydb/encoding.test.js
Normal file
@ -0,0 +1,49 @@
|
||||
import * as encoding from './encoding.js'
|
||||
|
||||
/**
|
||||
* Check if binary encoding is compatible with golang binary encoding - binary.PutVarUint.
|
||||
*
|
||||
* Result: is compatible up to 32 bit: [0, 4294967295] / [0, 0xffffffff]. (max 32 bit unsigned integer)
|
||||
*/
|
||||
let err = null
|
||||
try {
|
||||
const tests = [
|
||||
{ in: 0, out: [0] },
|
||||
{ in: 1, out: [1] },
|
||||
{ in: 128, out: [128, 1] },
|
||||
{ in: 200, out: [200, 1] },
|
||||
{ in: 32, out: [32] },
|
||||
{ in: 500, out: [244, 3] },
|
||||
{ in: 256, out: [128, 2] },
|
||||
{ in: 700, out: [188, 5] },
|
||||
{ in: 1024, out: [128, 8] },
|
||||
{ in: 1025, out: [129, 8] },
|
||||
{ in: 4048, out: [208, 31] },
|
||||
{ in: 5050, out: [186, 39] },
|
||||
{ in: 1000000, out: [192, 132, 61] },
|
||||
{ in: 34951959, out: [151, 166, 213, 16] },
|
||||
{ in: 2147483646, out: [254, 255, 255, 255, 7] },
|
||||
{ in: 2147483647, out: [255, 255, 255, 255, 7] },
|
||||
{ in: 2147483648, out: [128, 128, 128, 128, 8] },
|
||||
{ in: 2147483700, out: [180, 128, 128, 128, 8] },
|
||||
{ in: 4294967294, out: [254, 255, 255, 255, 15] },
|
||||
{ in: 4294967295, out: [255, 255, 255, 255, 15] }
|
||||
]
|
||||
tests.forEach(test => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, test.in)
|
||||
const buffer = new Uint8Array(encoding.toBuffer(encoder))
|
||||
if (buffer.byteLength !== test.out.length) {
|
||||
throw new Error('Length don\'t match!')
|
||||
}
|
||||
for (let j = 0; j < buffer.length; j++) {
|
||||
if (buffer[j] !== test[1][j]) {
|
||||
throw new Error('values don\'t match!')
|
||||
}
|
||||
}
|
||||
})
|
||||
} catch (error) {
|
||||
err = error
|
||||
} finally {
|
||||
console.log('YDB Client: Encoding varUint compatiblity test: ', err || 'success!')
|
||||
}
|
59
ydb/globals.js
Normal file
59
ydb/globals.js
Normal file
@ -0,0 +1,59 @@
|
||||
/* eslint-env browser */
|
||||
|
||||
export const Uint8Array_ = Uint8Array
|
||||
|
||||
/**
|
||||
* @param {Array<number>} arr
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const createArrayBufferFromArray = arr => new Uint8Array_(arr).buffer
|
||||
|
||||
export const createUint8ArrayFromLen = len => new Uint8Array_(len)
|
||||
|
||||
/**
|
||||
* Create Uint8Array with initial content from buffer
|
||||
*/
|
||||
export const createUint8ArrayFromBuffer = (buffer, byteOffset, length) => new Uint8Array_(buffer, byteOffset, length)
|
||||
|
||||
/**
|
||||
* Create Uint8Array with initial content from buffer
|
||||
*/
|
||||
export const createUint8ArrayFromArrayBuffer = arraybuffer => new Uint8Array_(arraybuffer)
|
||||
export const createArrayFromArrayBuffer = arraybuffer => Array.from(createUint8ArrayFromArrayBuffer(arraybuffer))
|
||||
|
||||
export const createPromise = f => new Promise(f)
|
||||
/**
|
||||
* `Promise.all` wait for all promises in the array to resolve and return the result
|
||||
* @param {Array<Promise<any>>} arrp
|
||||
* @return {any}
|
||||
*/
|
||||
export const pall = arrp => Promise.all(arrp)
|
||||
export const preject = reason => Promise.reject(reason)
|
||||
export const presolve = res => Promise.resolve(res)
|
||||
|
||||
export const until = (timeout, check) => createPromise((resolve, reject) => {
|
||||
const hasTimeout = timeout > 0
|
||||
const untilInterval = () => {
|
||||
if (check()) {
|
||||
clearInterval(intervalHandle)
|
||||
resolve()
|
||||
} else if (hasTimeout) {
|
||||
timeout -= 10
|
||||
if (timeout < 0) {
|
||||
clearInterval(intervalHandle)
|
||||
reject(error('Timeout'))
|
||||
}
|
||||
}
|
||||
}
|
||||
const intervalHandle = setInterval(untilInterval, 10)
|
||||
})
|
||||
|
||||
export const error = description => new Error(description)
|
||||
|
||||
export const max = (a, b) => a > b ? a : b
|
||||
|
||||
/**
|
||||
* @param {number} t Time to wait
|
||||
* @return {Promise} Promise that is resolved after t ms
|
||||
*/
|
||||
export const wait = t => createPromise(r => setTimeout(r, t))
|
144
ydb/idb.js
Normal file
144
ydb/idb.js
Normal file
@ -0,0 +1,144 @@
|
||||
/* eslint-env browser */
|
||||
|
||||
import * as globals from './globals.js'
|
||||
|
||||
/*
|
||||
* IDB Request to Promise transformer
|
||||
*/
|
||||
export const rtop = request => globals.createPromise((resolve, reject) => {
|
||||
request.onerror = event => reject(new Error(event.target.error))
|
||||
request.onblocked = () => location.reload()
|
||||
request.onsuccess = event => resolve(event.target.result)
|
||||
})
|
||||
|
||||
/**
|
||||
* @return {Promise<IDBDatabase>}
|
||||
*/
|
||||
export const openDB = (name, initDB) => globals.createPromise((resolve, reject) => {
|
||||
let request = indexedDB.open(name)
|
||||
/**
|
||||
* @param {any} event
|
||||
*/
|
||||
request.onupgradeneeded = event => initDB(event.target.result)
|
||||
/**
|
||||
* @param {any} event
|
||||
*/
|
||||
request.onerror = event => reject(new Error(event.target.error))
|
||||
request.onblocked = () => location.reload()
|
||||
/**
|
||||
* @param {any} event
|
||||
*/
|
||||
request.onsuccess = event => {
|
||||
const db = event.target.result
|
||||
db.onversionchange = () => { db.close() }
|
||||
addEventListener('unload', () => db.close())
|
||||
resolve(db)
|
||||
}
|
||||
})
|
||||
|
||||
export const deleteDB = name => rtop(indexedDB.deleteDatabase(name))
|
||||
|
||||
export const createStores = (db, definitions) => definitions.forEach(d =>
|
||||
db.createObjectStore.apply(db, d)
|
||||
)
|
||||
|
||||
/**
|
||||
* @param {IDBObjectStore} store
|
||||
* @param {String | number | ArrayBuffer | Date | Array } key
|
||||
* @return {Promise<ArrayBuffer>}
|
||||
*/
|
||||
export const get = (store, key) =>
|
||||
rtop(store.get(key))
|
||||
|
||||
/**
|
||||
* @param {IDBObjectStore} store
|
||||
* @param {String | number | ArrayBuffer | Date | IDBKeyRange | Array } key
|
||||
*/
|
||||
export const del = (store, key) =>
|
||||
rtop(store.delete(key))
|
||||
|
||||
/**
|
||||
* @param {IDBObjectStore} store
|
||||
* @param {String | number | ArrayBuffer | Date | boolean} item
|
||||
* @param {String | number | ArrayBuffer | Date | Array} [key]
|
||||
*/
|
||||
export const put = (store, item, key) =>
|
||||
rtop(store.put(item, key))
|
||||
|
||||
/**
|
||||
* @param {IDBObjectStore} store
|
||||
* @param {String | number | ArrayBuffer | Date | boolean} item
|
||||
* @param {String | number | ArrayBuffer | Date | Array} [key]
|
||||
* @return {Promise<ArrayBuffer>}
|
||||
*/
|
||||
export const add = (store, item, key) =>
|
||||
rtop(store.add(item, key))
|
||||
|
||||
/**
|
||||
* @param {IDBObjectStore} store
|
||||
* @param {String | number | ArrayBuffer | Date} item
|
||||
* @return {Promise<number>}
|
||||
*/
|
||||
export const addAutoKey = (store, item) =>
|
||||
rtop(store.add(item))
|
||||
|
||||
/**
|
||||
* @param {IDBObjectStore} store
|
||||
* @param {IDBKeyRange} [range]
|
||||
*/
|
||||
export const getAll = (store, range) =>
|
||||
rtop(store.getAll(range))
|
||||
|
||||
/**
|
||||
* @param {IDBObjectStore} store
|
||||
* @param {IDBKeyRange} [range]
|
||||
*/
|
||||
export const getAllKeys = (store, range) =>
|
||||
rtop(store.getAllKeys(range))
|
||||
|
||||
/**
|
||||
* Iterate on keys and values
|
||||
* @param {IDBObjectStore} store
|
||||
* @param {IDBKeyRange?} keyrange
|
||||
* @param {function(any, any)} f Return true in order to continue the cursor
|
||||
*/
|
||||
export const iterate = (store, keyrange, f) => globals.createPromise((resolve, reject) => {
|
||||
const request = store.openCursor(keyrange)
|
||||
request.onerror = reject
|
||||
/**
|
||||
* @param {any} event
|
||||
*/
|
||||
request.onsuccess = event => {
|
||||
const cursor = event.target.result
|
||||
if (cursor === null) {
|
||||
return resolve()
|
||||
}
|
||||
f(cursor.value, cursor.key)
|
||||
cursor.continue()
|
||||
}
|
||||
})
|
||||
|
||||
/**
|
||||
* Iterate on the keys (no values)
|
||||
* @param {IDBObjectStore} store
|
||||
* @param {IDBKeyRange} keyrange
|
||||
* @param {function(IDBCursor)} f Call `idbcursor.continue()` to iterate further
|
||||
*/
|
||||
export const iterateKeys = (store, keyrange, f) => {
|
||||
/**
|
||||
* @param {any} event
|
||||
*/
|
||||
store.openKeyCursor(keyrange).onsuccess = event => f(event.target.result)
|
||||
}
|
||||
|
||||
/**
|
||||
* Open store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} store
|
||||
* @returns {IDBObjectStore}
|
||||
*/
|
||||
export const getStore = (t, store) => t.objectStore(store)
|
||||
|
||||
export const createIDBKeyRangeBound = (lower, upper, lowerOpen, upperOpen) => IDBKeyRange.bound(lower, upper, lowerOpen, upperOpen)
|
||||
export const createIDBKeyRangeUpperBound = (upper, upperOpen) => IDBKeyRange.upperBound(upper, upperOpen)
|
||||
export const createIDBKeyRangeLowerBound = (lower, lowerOpen) => IDBKeyRange.lowerBound(lower, lowerOpen)
|
34
ydb/idb.test.js
Normal file
34
ydb/idb.test.js
Normal file
@ -0,0 +1,34 @@
|
||||
import * as test from './test.js'
|
||||
import * as idb from './idb.js'
|
||||
import * as logging from './logging.js'
|
||||
|
||||
const initTestDB = db => idb.createStores(db, [['test']])
|
||||
const testDBName = 'idb-test'
|
||||
|
||||
const createTransaction = db => db.transaction(['test'], 'readwrite')
|
||||
/**
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStore = t => idb.getStore(t, 'test')
|
||||
|
||||
idb.deleteDB(testDBName).then(() => idb.openDB(testDBName, initTestDB)).then(db => {
|
||||
test.run('idb iteration', async testname => {
|
||||
const t = createTransaction(db)
|
||||
await idb.put(getStore(t), 0, ['t', 0])
|
||||
await idb.put(getStore(t), 1, ['t', 1])
|
||||
const valsGetAll = await idb.getAll(getStore(t))
|
||||
if (valsGetAll.length !== 2) {
|
||||
logging.fail('getAll does not return two values')
|
||||
}
|
||||
const valsIterate = []
|
||||
const keyrange = idb.createIDBKeyRangeBound(['t', 0], ['t', 1], false, false)
|
||||
await idb.put(getStore(t), 2, ['t', 2])
|
||||
await idb.iterate(getStore(t), keyrange, (val, key) => {
|
||||
valsIterate.push(val)
|
||||
})
|
||||
if (valsIterate.length !== 2) {
|
||||
logging.fail('iterate does not return two values')
|
||||
}
|
||||
})
|
||||
})
|
300
ydb/idbactions.js
Normal file
300
ydb/idbactions.js
Normal file
@ -0,0 +1,300 @@
|
||||
/* eslint-env browser */
|
||||
|
||||
/**
|
||||
* Naming conventions:
|
||||
* * ydb: Think of ydb as a federated set of servers. This is not yet true, but we will eventually get there. With this assumption come some challenges with the client
|
||||
* * ydb instance: A single ydb instance that this ydb-client connects to
|
||||
* * (room) host: Exactly one ydb instance controls a room at any time. The ownership may change over time. The host of a room is the ydb instance that owns it. This is not necessarily the instance we connect to.
|
||||
* * room session id: An random id that is assigned to a room. When the server dies unexpectedly, we can conclude which data is missing and send it to the server (or delete it and prevent duplicate content)
|
||||
* * update: An ArrayBuffer of binary data. Neither Ydb nor Ydb-client care about the content of update. Updates may be appended to each other.
|
||||
*
|
||||
* The database has four tables:
|
||||
*
|
||||
* CU "client-unconfirmed" confid -> room, update
|
||||
* - The client writes to this table when it creates an update.
|
||||
* - Then it sends an update to the host with the generated confid
|
||||
* - In case the host doesn't confirm that it received this update, it is sent again on next sync
|
||||
* HU "host-unconfirmed" room, offset -> update
|
||||
* - Updates from the host are written to this table
|
||||
* - When host confirms that an unconfirmed update was persisted, the update is written to the Co table
|
||||
* - When client sync to host and the room session ids don't match, all host-unconfirmed messages are sent to host
|
||||
* Co "confirmed":
|
||||
* data:{room} -> update
|
||||
* - this field holds confirmed room updates
|
||||
* meta:{room} -> room session id, confirmed offset
|
||||
* - this field holds metadata about the room
|
||||
* US "unconfirmed-subscriptions" room -> _
|
||||
* - Subscriptions sent to the server, but didn't receive confirmation yet
|
||||
* - Either a room is in US or in Co
|
||||
* - A client may update a room when the room is in either US or Co
|
||||
*/
|
||||
|
||||
import * as encoding from './encoding.js'
|
||||
import * as decoding from './decoding.js'
|
||||
import * as idb from './idb.js'
|
||||
import * as globals from './globals.js'
|
||||
import * as message from './message.js'
|
||||
|
||||
/**
|
||||
* Get 'client-unconfirmed' store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreCU = t => idb.getStore(t, STORE_CU)
|
||||
/**
|
||||
* Get 'host-unconfirmed' store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreHU = t => idb.getStore(t, STORE_HU)
|
||||
/**
|
||||
* Get 'confirmed' store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreCo = t => idb.getStore(t, STORE_CO)
|
||||
|
||||
/**
|
||||
* Get `unconfirmed-subscriptions` store from transaction
|
||||
* @param {IDBTransaction} t
|
||||
* @return {IDBObjectStore}
|
||||
*/
|
||||
const getStoreUS = t => idb.getStore(t, STORE_US)
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {number} offset
|
||||
* @return {[string, number]}
|
||||
*/
|
||||
const encodeHUKey = (room, offset) => [room, offset]
|
||||
|
||||
/**
|
||||
* @typedef RoomAndOffset
|
||||
* @type {Object}
|
||||
* @property {string} room
|
||||
* @property {number} offset Received offsets (including offsets that are not yet confirmed)
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {[string, number]} key
|
||||
* @return {RoomAndOffset}
|
||||
*/
|
||||
const decodeHUKey = key => {
|
||||
return {
|
||||
room: key[0],
|
||||
offset: key[1]
|
||||
}
|
||||
}
|
||||
|
||||
const getCoMetaKey = room => 'meta:' + room
|
||||
const getCoDataKey = room => 'data:' + room
|
||||
|
||||
const STORE_CU = 'client-unconfirmed'
|
||||
const STORE_US = 'unconfirmed-subscriptions'
|
||||
const STORE_CO = 'confirmed'
|
||||
const STORE_HU = 'host-unconfirmed'
|
||||
|
||||
/**
|
||||
* @param {string} dbNamespace
|
||||
* @return {Promise<IDBDatabase>}
|
||||
*/
|
||||
export const openDB = dbNamespace => idb.openDB(dbNamespace, db => idb.createStores(db, [
|
||||
[STORE_CU, { autoIncrement: true }],
|
||||
[STORE_HU],
|
||||
[STORE_CO],
|
||||
[STORE_US]
|
||||
]))
|
||||
|
||||
export const deleteDB = name => idb.deleteDB(name)
|
||||
|
||||
/**
|
||||
* Create a new IDBTransaction accessing all object stores. Normally we should care that we can access object stores in parallel.
|
||||
* But this is not possible in ydb-client since at least two object stores are requested in every IDB change.
|
||||
* @param {IDBDatabase} db
|
||||
* @return {IDBTransaction}
|
||||
*/
|
||||
export const createTransaction = db => db.transaction([STORE_CU, STORE_HU, STORE_CO, STORE_US], 'readwrite')
|
||||
|
||||
/**
|
||||
* Write an update to the db after the client created it. This update is not yet received by the host.
|
||||
* This function returns a client confirmation number. The confirmation number must be send to the host so it can identify the update,
|
||||
* and we can move the update to HU when it is confirmed (@see writeHostUnconfirmedByClient)
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} room
|
||||
* @param {ArrayBuffer} update
|
||||
* @return {Promise<number>} client confirmation number
|
||||
*/
|
||||
export const writeClientUnconfirmed = (t, room, update) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarString(encoder, room)
|
||||
encoding.writeArrayBuffer(encoder, update)
|
||||
return idb.addAutoKey(getStoreCU(t), encoding.toBuffer(encoder))
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all updates that are not yet confirmed by host.
|
||||
* @param {IDBTransaction} t
|
||||
* @return {Promise<ArrayBuffer>} All update messages as a single ArrayBuffer
|
||||
*/
|
||||
export const getUnconfirmedUpdates = t => {
|
||||
const encoder = encoding.createEncoder()
|
||||
return idb.iterate(getStoreCU(t), null, (value, clientConf) => {
|
||||
const decoder = decoding.createDecoder(value)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readTail(decoder)
|
||||
encoding.writeArrayBuffer(encoder, message.createUpdate(room, update, clientConf))
|
||||
}).then(() => encoding.toBuffer(encoder))
|
||||
}
|
||||
|
||||
/**
|
||||
* The host confirms that it received and persisted an update. The update can be safely removed from CU.
|
||||
* It is necessary to call this function in case that the client disconnected before the host could send `writeHostUnconfirmedByClient`.
|
||||
* @param {IDBTransaction} t
|
||||
* @param {number} clientConf
|
||||
* @return {Promise}
|
||||
*/
|
||||
export const confirmClient = (t, clientConf) => idb.del(getStoreCU(t), idb.createIDBKeyRangeUpperBound(clientConf, false))
|
||||
|
||||
/**
|
||||
* The host confirms that it received and broadcasted an update sent from this client.
|
||||
* Calling this method does not confirm that the update has been persisted by the server.
|
||||
*
|
||||
* Other clients will receive an update with `writeHostUnconfirmed`. Since this client created the update, it only receives a confirmation. So
|
||||
* we can simply move the update from CU to HU.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @param {number} clientConf The client confirmation number that identifies the update
|
||||
* @param {number} offset The offset with wich the server will store the information
|
||||
*/
|
||||
export const writeHostUnconfirmedByClient = (t, clientConf, offset) => idb.get(getStoreCU(t), clientConf).then(roomAndUpdate => {
|
||||
const decoder = decoding.createDecoder(roomAndUpdate)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readTail(decoder)
|
||||
return writeHostUnconfirmed(t, room, offset, update).then(() =>
|
||||
idb.del(getStoreCU(t), clientConf)
|
||||
)
|
||||
})
|
||||
|
||||
/**
|
||||
* The host broadcasts an update created by another client. It assures that the update will eventually be persisted with
|
||||
* `offset`. Calling this function does not imply that the update was persisted by the host. In case of mismatching room session ids
|
||||
* the updates in HU will be sent to the server.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} room
|
||||
* @param {number} offset
|
||||
* @param {ArrayBuffer} update
|
||||
* @return {Promise}
|
||||
*/
|
||||
export const writeHostUnconfirmed = (t, room, offset, update) => idb.add(getStoreHU(t), update, encodeHUKey(room, offset))
|
||||
|
||||
/**
|
||||
* The host confirms that it persisted updates up until (including) offset. updates may be moved from HU to Co.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @param {String} room
|
||||
* @param {number} offset Inclusive range [0, offset - 1] has been stored to host
|
||||
*/
|
||||
export const writeConfirmedByHost = (t, room, offset) => {
|
||||
const co = getStoreCo(t)
|
||||
return globals.pall([idb.get(co, getCoDataKey(room)), idb.get(co, getCoMetaKey(room))]).then(async arr => {
|
||||
const data = arr[0]
|
||||
const meta = arr[1]
|
||||
const metaSessionId = decodeMetaValue(meta).roomsid
|
||||
const dataEncoder = encoding.createEncoder()
|
||||
encoding.writeArrayBuffer(dataEncoder, data)
|
||||
const hu = getStoreHU(t)
|
||||
const huKeyRange = idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, offset), false, false)
|
||||
return idb.iterate(hu, huKeyRange, (value, _key) => {
|
||||
const key = decodeHUKey(_key) // @kevin _key is an array. remove decodeHUKey functions
|
||||
if (key.room === room && key.offset <= offset) {
|
||||
encoding.writeArrayBuffer(dataEncoder, value)
|
||||
}
|
||||
}).then(() =>
|
||||
globals.pall([idb.put(co, encodeMetaValue(metaSessionId, offset), getCoMetaKey(room)), idb.put(co, encoding.toBuffer(dataEncoder), getCoDataKey(room)), idb.del(hu, huKeyRange)])
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef RoomMeta
|
||||
* @type {Object}
|
||||
* @property {string} room
|
||||
* @property {number} roomsid Room session id
|
||||
* @property {number} offset Received offsets (including offsets that are not yet confirmed)
|
||||
*/
|
||||
|
||||
/**
|
||||
* Get all meta information for all rooms.
|
||||
*
|
||||
* @param {IDBTransaction} t
|
||||
* @return {Promise<Array<RoomMeta>>}
|
||||
*/
|
||||
export const getRoomMetas = t => {
|
||||
const hu = getStoreHU(t)
|
||||
const result = []
|
||||
return idb.iterate(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false), (metavalue, metakey) =>
|
||||
idb.getAllKeys(hu, idb.createIDBKeyRangeBound(encodeHUKey(metakey.slice(5), 0), encodeHUKey(metakey.slice(5), 2 ** 32), false, false)).then(keys => {
|
||||
const { roomsid, offset } = decodeMetaValue(metavalue)
|
||||
result.push({
|
||||
room: metakey.slice(5),
|
||||
roomsid,
|
||||
offset: keys.reduce((cur, key) => globals.max(decodeHUKey(key).offset, cur), offset)
|
||||
})
|
||||
})
|
||||
).then(() => globals.presolve(result))
|
||||
}
|
||||
|
||||
export const getRoomMeta = (t, room) =>
|
||||
idb.get(getStoreCo(t), getCoMetaKey(room))
|
||||
|
||||
/**
|
||||
* Get all data from idb, including unconfirmed updates.
|
||||
* TODO: include updates in CU
|
||||
* @param {IDBTransaction} t
|
||||
* @param {string} room
|
||||
* @return {Promise<ArrayBuffer>}
|
||||
*/
|
||||
export const getRoomData = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false))]).then(([data, updates]) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeArrayBuffer(encoder, data || new Uint8Array(0))
|
||||
updates.forEach(update => encoding.writeArrayBuffer(encoder, update))
|
||||
return encoding.toBuffer(encoder)
|
||||
})
|
||||
|
||||
const decodeMetaValue = buffer => {
|
||||
const decoder = decoding.createDecoder(buffer)
|
||||
const roomsid = decoding.readVarUint(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
return {
|
||||
roomsid, offset
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param {number} roomsid room session id
|
||||
* @param {number} offset
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
const encodeMetaValue = (roomsid, offset) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, roomsid)
|
||||
encoding.writeVarUint(encoder, offset)
|
||||
return encoding.toBuffer(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the initial room data. Overwrites initial data if there is any!
|
||||
* @param {IDBTransaction} t
|
||||
* @param {string} room
|
||||
* @param {number} roomsessionid
|
||||
* @param {number} offset
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
export const confirmSubscription = (t, room, roomsessionid, offset) => globals.pall([
|
||||
idb.put(getStoreCo(t), encodeMetaValue(roomsessionid, offset), getCoMetaKey(room)),
|
||||
idb.put(getStoreCo(t), globals.createArrayBufferFromArray([]), getCoDataKey(room))
|
||||
]).then(() => idb.del(getStoreUS(t), room))
|
||||
|
||||
export const writeUnconfirmedSubscription = (t, room) => idb.put(getStoreUS(t), true, room)
|
||||
|
||||
export const getUnconfirmedSubscriptions = t => idb.getAllKeys(getStoreUS(t))
|
23
ydb/idbactions.test.js
Normal file
23
ydb/idbactions.test.js
Normal file
@ -0,0 +1,23 @@
|
||||
import * as globals from './globals.js'
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as test from './test.js'
|
||||
|
||||
idbactions.deleteDB().then(() => idbactions.openDB()).then(db => {
|
||||
test.run('update lifetime 1', async (testname) => {
|
||||
const update = new Uint8Array([1, 2, 3]).buffer
|
||||
const t = idbactions.createTransaction(db)
|
||||
idbactions.writeInitialRoomData(t, testname, 42, 1, new Uint8Array([0]).buffer)
|
||||
const clientConf = await idbactions.writeClientUnconfirmed(t, testname, update)
|
||||
await idbactions.writeHostUnconfirmedByClient(t, clientConf, 0)
|
||||
await idbactions.writeConfirmedByHost(t, testname, 4)
|
||||
const metas = await idbactions.getRoomMetas(t)
|
||||
const roommeta = metas.find(meta => meta.room === testname)
|
||||
if (roommeta == null || roommeta.offset !== 4 || roommeta.roomsid !== 42) {
|
||||
throw globals.error()
|
||||
}
|
||||
const data = await idbactions.getRoomData(t, testname)
|
||||
if (!test.compareArrays(new Uint8Array(data), new Uint8Array([0, 1, 2, 3]))) {
|
||||
throw globals.error()
|
||||
}
|
||||
})
|
||||
})
|
7
ydb/index.js
Normal file
7
ydb/index.js
Normal file
@ -0,0 +1,7 @@
|
||||
import * as ydbclient from './ydb-client.js'
|
||||
|
||||
/**
|
||||
* @param {string} url
|
||||
* @return {Promise<ydbclient.YdbClient>}
|
||||
*/
|
||||
export const createYdbClient = url => ydbclient.get(url)
|
23
ydb/logging.js
Normal file
23
ydb/logging.js
Normal file
@ -0,0 +1,23 @@
|
||||
|
||||
import * as globals from './globals.js'
|
||||
|
||||
let date = new Date().getTime()
|
||||
|
||||
const writeDate = () => {
|
||||
const oldDate = date
|
||||
date = new Date().getTime()
|
||||
return date - oldDate
|
||||
}
|
||||
|
||||
export const print = (...args) => console.log(...args)
|
||||
export const log = m => print(`%cydb-client: %c${m} %c+${writeDate()}ms`, 'color: blue;', '', 'color: blue')
|
||||
|
||||
export const fail = m => {
|
||||
throw new Error(m)
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {ArrayBuffer} buffer
|
||||
* @return {string}
|
||||
*/
|
||||
export const arrayBufferToString = buffer => JSON.stringify(Array.from(globals.createUint8ArrayFromBuffer(buffer)))
|
106
ydb/message.js
Normal file
106
ydb/message.js
Normal file
@ -0,0 +1,106 @@
|
||||
import * as encoding from './encoding.js'
|
||||
import * as decoding from './decoding.js'
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as logging from './logging.js'
|
||||
import * as bc from './broadcastchannel.js'
|
||||
|
||||
/* make sure to update message.go in ydb when updating these values.. */
|
||||
export const MESSAGE_UPDATE = 0 // TODO: rename host_unconfirmed?
|
||||
export const MESSAGE_SUB = 1
|
||||
export const MESSAGE_CONFIRMATION = 2
|
||||
export const MESSAGE_SUB_CONF = 3
|
||||
export const MESSAGE_HOST_UNCONFIRMED_BY_CLIENT = 4
|
||||
export const MESSAGE_CONFIRMED_BY_HOST = 5
|
||||
|
||||
/**
|
||||
* @param {any} ydb YdbClient instance
|
||||
* @param {ArrayBuffer} message
|
||||
*/
|
||||
export const readMessage = (ydb, message) => {
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
const decoder = decoding.createDecoder(message)
|
||||
while (decoding.hasContent(decoder)) {
|
||||
switch (decoding.readVarUint(decoder)) {
|
||||
case MESSAGE_UPDATE: {
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
const room = decoding.readVarString(decoder)
|
||||
const update = decoding.readPayload(decoder)
|
||||
logging.log(`Received Update. room "${room}", offset ${offset}, ${logging.arrayBufferToString(update)}`)
|
||||
idbactions.writeHostUnconfirmed(t, room, offset, update)
|
||||
bc.publish(room, update)
|
||||
break
|
||||
}
|
||||
case MESSAGE_SUB_CONF: {
|
||||
const nSubs = decoding.readVarUint(decoder)
|
||||
for (let i = 0; i < nSubs; i++) {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
const roomsid = decoding.readVarUint(decoder) // TODO: SID
|
||||
logging.log(`Received Sub Conf. room "${room}", offset ${offset}, roomsid ${roomsid}`)
|
||||
idbactions.confirmSubscription(t, room, roomsid, offset)
|
||||
}
|
||||
break
|
||||
}
|
||||
case MESSAGE_CONFIRMATION: {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
logging.log(`Received Confirmation. room "${room}", offset ${offset}`)
|
||||
idbactions.writeConfirmedByHost(t, room, offset)
|
||||
break
|
||||
}
|
||||
case MESSAGE_HOST_UNCONFIRMED_BY_CLIENT: {
|
||||
const clientConf = decoding.readVarUint(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
logging.log(`Received HostUnconfirmedByClient. clientConf "${clientConf}", offset ${offset}`)
|
||||
idbactions.writeHostUnconfirmedByClient(t, clientConf, offset)
|
||||
break
|
||||
}
|
||||
case MESSAGE_CONFIRMED_BY_HOST: {
|
||||
const room = decoding.readVarString(decoder)
|
||||
const offset = decoding.readVarUint(decoder)
|
||||
logging.log(`Received Confirmation By Host. room "${room}", offset ${offset}`)
|
||||
idbactions.writeConfirmedByHost(t, room, offset)
|
||||
break
|
||||
}
|
||||
default:
|
||||
logging.fail(`Unexpected message type`)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {string} room
|
||||
* @param {ArrayBuffer} update
|
||||
* @param {number} clientConf
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const createUpdate = (room, update, clientConf) => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, MESSAGE_UPDATE)
|
||||
encoding.writeVarUint(encoder, clientConf)
|
||||
encoding.writeVarString(encoder, room)
|
||||
encoding.writePayload(encoder, update)
|
||||
return encoding.toBuffer(encoder)
|
||||
}
|
||||
|
||||
/**
|
||||
* @typedef SubDef
|
||||
* @type {Object}
|
||||
* @property {string} room
|
||||
* @property {number} offset
|
||||
*/
|
||||
|
||||
/**
|
||||
* @param {Array<SubDef>} rooms
|
||||
* @return {ArrayBuffer}
|
||||
*/
|
||||
export const createSub = rooms => {
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeVarUint(encoder, MESSAGE_SUB)
|
||||
encoding.writeVarUint(encoder, rooms.length)
|
||||
for (let i = 0; i < rooms.length; i++) {
|
||||
encoding.writeVarString(encoder, rooms[i].room)
|
||||
encoding.writeVarUint(encoder, rooms[i].offset)
|
||||
}
|
||||
return encoding.toBuffer(encoder)
|
||||
}
|
10
ydb/test.html
Normal file
10
ydb/test.html
Normal file
@ -0,0 +1,10 @@
|
||||
<!DOCTYPE html>
|
||||
<html>
|
||||
<head>
|
||||
</head>
|
||||
<body>
|
||||
<!--script type="module" src="./encoding.test.js"></script-->
|
||||
<script type="module" src="./idb.test.js"></script>
|
||||
<script type="module" src="./ydb-client.test.js"></script>
|
||||
</body>
|
||||
</html>
|
25
ydb/test.js
Normal file
25
ydb/test.js
Normal file
@ -0,0 +1,25 @@
|
||||
import * as logging from './logging.js'
|
||||
|
||||
export const run = async (name, f) => {
|
||||
console.log(`%cStart:%c ${name}`, 'color:blue;', '')
|
||||
const start = new Date()
|
||||
try {
|
||||
await f(name)
|
||||
} catch (e) {
|
||||
logging.print(`%cFailure:%c ${name} in %c${new Date().getTime() - start.getTime()}ms`, 'color:red;font-weight:bold', '', 'color:grey')
|
||||
throw e
|
||||
}
|
||||
logging.print(`%cSuccess:%c ${name} in %c${new Date().getTime() - start.getTime()}ms`, 'color:green;font-weight:bold', '', 'color:grey')
|
||||
}
|
||||
|
||||
export const compareArrays = (as, bs) => {
|
||||
if (as.length !== bs.length) {
|
||||
return false
|
||||
}
|
||||
for (let i = 0; i < as.length; i++) {
|
||||
if (as[i] !== bs[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
135
ydb/ydb-client.js
Normal file
135
ydb/ydb-client.js
Normal file
@ -0,0 +1,135 @@
|
||||
/* eslint-env browser */
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as globals from './globals.js'
|
||||
import * as message from './message.js'
|
||||
import * as bc from './broadcastchannel.js'
|
||||
import * as encoding from './encoding.js'
|
||||
import * as logging from './logging.js'
|
||||
import * as idb from './idb.js'
|
||||
import Y from '../src/Y.js'
|
||||
|
||||
export class YdbClient {
|
||||
constructor (url, db) {
|
||||
this.url = url
|
||||
this.ws = new WebSocket(url)
|
||||
this.rooms = new Map()
|
||||
this.db = db
|
||||
this.connected = false
|
||||
initWS(this, this.ws)
|
||||
}
|
||||
/**
|
||||
* Open a Yjs instance that connects to `roomname`.
|
||||
* @param {string} roomname
|
||||
* @return {Y}
|
||||
*/
|
||||
getY (roomname) {
|
||||
const y = new Y(roomname)
|
||||
y.on('afterTransaction', function () {
|
||||
debugger
|
||||
})
|
||||
return y
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize WebSocket connection. Try to reconnect on error/disconnect.
|
||||
* @param {YdbClient} ydb
|
||||
* @param {WebSocket} ws
|
||||
*/
|
||||
const initWS = (ydb, ws) => {
|
||||
ws.binaryType = 'arraybuffer'
|
||||
ws.onclose = () => {
|
||||
ydb.connected = false
|
||||
logging.log('Disconnected from ydb. Reconnecting..')
|
||||
ydb.ws = new WebSocket(ydb.url)
|
||||
initWS(ydb, ws)
|
||||
}
|
||||
ws.onopen = () => {
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
globals.pall([idbactions.getRoomMetas(t), idbactions.getUnconfirmedSubscriptions(t), idbactions.getUnconfirmedUpdates(t)]).then(([metas, us, unconfirmedUpdates]) => {
|
||||
const subs = []
|
||||
metas.forEach(meta => {
|
||||
subs.push({
|
||||
room: meta.room,
|
||||
offset: meta.offset
|
||||
})
|
||||
})
|
||||
us.forEach(room => {
|
||||
subs.push({
|
||||
room, offset: 0
|
||||
})
|
||||
})
|
||||
ydb.connected = true
|
||||
const encoder = encoding.createEncoder()
|
||||
encoding.writeArrayBuffer(encoder, message.createSub(subs))
|
||||
encoding.writeArrayBuffer(encoder, unconfirmedUpdates)
|
||||
send(ydb, encoding.toBuffer(encoder))
|
||||
})
|
||||
}
|
||||
ws.onmessage = event => message.readMessage(ydb, event.data)
|
||||
}
|
||||
|
||||
// maps from dbNamespace to db
|
||||
const dbPromises = new Map()
|
||||
|
||||
/**
|
||||
* Factory function. Get a ydb instance that connects to url, and uses dbNamespace as indexeddb namespace.
|
||||
* Create if it does not exist yet.
|
||||
*
|
||||
* @param {string} url
|
||||
* @param {string} dbNamespace
|
||||
* @return {Promise<YdbClient>}
|
||||
*/
|
||||
export const get = (url, dbNamespace = 'ydb') => {
|
||||
if (!dbPromises.has(dbNamespace)) {
|
||||
dbPromises.set(dbNamespace, idbactions.openDB(dbNamespace))
|
||||
}
|
||||
return dbPromises.get(dbNamespace).then(db => globals.presolve(new YdbClient(url, db)))
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a db namespace. Call this to remove any persisted data. Make sure to close active sessions.
|
||||
* TODO: destroy active ydbClient sessions / throw if a session is still active
|
||||
* @param {string} dbNamespace
|
||||
* @return {Promise}
|
||||
*/
|
||||
export const clear = (dbNamespace = 'ydb') => idb.deleteDB(dbNamespace)
|
||||
|
||||
/**
|
||||
* @param {YdbClient} ydb
|
||||
* @param {ArrayBuffer} m
|
||||
*/
|
||||
export const send = (ydb, m) => ydb.connected && ydb.ws.send(m)
|
||||
|
||||
/**
|
||||
* @param {YdbClient} ydb
|
||||
* @param {string} room
|
||||
* @param {ArrayBuffer} update
|
||||
*/
|
||||
export const update = (ydb, room, update) => {
|
||||
bc.publish(room, update)
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
logging.log(`Write Unconfirmed Update. room "${room}", ${JSON.stringify(update)}`)
|
||||
return idbactions.writeClientUnconfirmed(t, room, update).then(clientConf => {
|
||||
logging.log(`Send Unconfirmed Update. connected ${ydb.connected} room "${room}", clientConf ${clientConf}, ${logging.arrayBufferToString(update)}`)
|
||||
send(ydb, message.createUpdate(room, update, clientConf))
|
||||
})
|
||||
}
|
||||
|
||||
export const subscribe = (ydb, room, f) => {
|
||||
bc.subscribe(room, f)
|
||||
const t = idbactions.createTransaction(ydb.db)
|
||||
idbactions.getRoomData(t, room).then(data => {
|
||||
if (data.byteLength > 0) {
|
||||
f(data)
|
||||
}
|
||||
})
|
||||
idbactions.getRoomMeta(t, room).then(meta => {
|
||||
if (meta === undefined) {
|
||||
logging.log(`Send Subscribe. room "${room}", offset ${0}`)
|
||||
// TODO: maybe set prelim meta value so we don't sub twice
|
||||
send(ydb, message.createSub([{ room, offset: 0 }]))
|
||||
idbactions.writeUnconfirmedSubscription(t, room)
|
||||
}
|
||||
})
|
||||
}
|
85
ydb/ydb-client.test.js
Normal file
85
ydb/ydb-client.test.js
Normal file
@ -0,0 +1,85 @@
|
||||
/* eslint-env browser */
|
||||
|
||||
import * as test from './test.js'
|
||||
import * as ydbClient from './ydb-client.js'
|
||||
import * as globals from './globals.js'
|
||||
import * as idbactions from './idbactions.js'
|
||||
import * as logging from './logging.js'
|
||||
|
||||
const wsUrl = 'ws://127.0.0.1:8899/ws'
|
||||
const testRoom = 'testroom'
|
||||
|
||||
class YdbTestClient {
|
||||
constructor (ydb) {
|
||||
this.ydb = ydb
|
||||
this.createdUpdates = new Set()
|
||||
this.data = []
|
||||
this.checked = new Set()
|
||||
}
|
||||
}
|
||||
|
||||
const clearAllYdbContent = () => fetch('http://127.0.0.1:8899/clearAll', { mode: 'no-cors' })
|
||||
|
||||
/**
|
||||
* @param {string} name
|
||||
* @return {Promise<YdbTestClient>}
|
||||
*/
|
||||
const getTestClient = async name => {
|
||||
await ydbClient.clear('ydb-' + name)
|
||||
const ydb = await ydbClient.get(wsUrl, 'ydb-' + name)
|
||||
const testClient = new YdbTestClient(ydb)
|
||||
ydbClient.subscribe(ydb, testRoom, data => {
|
||||
testClient.data.push(data)
|
||||
globals.createArrayFromArrayBuffer(data).forEach(d => {
|
||||
if (d < nextUpdateNumber) {
|
||||
testClient.checked.add(d)
|
||||
}
|
||||
})
|
||||
console.log(name, 'received data', data, testClient.data)
|
||||
})
|
||||
return testClient
|
||||
}
|
||||
|
||||
// TODO: does only work for 8bit numbers..
|
||||
let nextUpdateNumber = 0
|
||||
|
||||
/**
|
||||
* Create an update. We use an increasing message counter so each update is unique.
|
||||
* @param {YdbTestClient} client
|
||||
*/
|
||||
const update = (client) => {
|
||||
ydbClient.update(client.ydb, testRoom, globals.createArrayBufferFromArray([nextUpdateNumber++]))
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if tsclient has all data in dataset
|
||||
* @param {...YdbTestClient} testClients
|
||||
*/
|
||||
const checkTestClients = (...testClients) => globals.until(100000, () => testClients.every(testClient =>
|
||||
testClient.checked.size === nextUpdateNumber
|
||||
)).then(() =>
|
||||
globals.wait(150) // wait 150 for all conf messages to come in..
|
||||
// TODO: do the below check in the until handler
|
||||
).then(() => globals.pall(testClients.map(testClient => idbactions.getRoomData(idbactions.createTransaction(testClient.ydb.db), testRoom)))).then(testClientsData => {
|
||||
testClientsData.forEach((testClientData, i) => {
|
||||
const checked = new Set()
|
||||
globals.createArrayFromArrayBuffer(testClientData).forEach(d => {
|
||||
if (checked.has(d)) {
|
||||
logging.fail('duplicate content')
|
||||
}
|
||||
checked.add(d)
|
||||
})
|
||||
if (checked.size !== nextUpdateNumber) {
|
||||
logging.fail(`Not all data is available in idb in client ${i}`)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
clearAllYdbContent().then(() => {
|
||||
test.run('ydb-client', async testname => {
|
||||
const c1 = await getTestClient('1')
|
||||
const c2 = await getTestClient('2')
|
||||
update(c1)
|
||||
await checkTestClients(c1, c2)
|
||||
})
|
||||
})
|
Loading…
x
Reference in New Issue
Block a user