diff --git a/examples/textarea/index.html b/examples/textarea/index.html index 0aaf8cc4..6a811663 100644 --- a/examples/textarea/index.html +++ b/examples/textarea/index.html @@ -2,8 +2,6 @@ - - - + diff --git a/examples/textarea/index.js b/examples/textarea/index.js index 77e7a729..15ecb028 100644 --- a/examples/textarea/index.js +++ b/examples/textarea/index.js @@ -1,5 +1,14 @@ -/* global Y */ +import { createYdbClient } from '../../ydb/index.js' +import Y from '../../src/Y.dist.js' +createYdbClient('ws://localhost:8899/ws').then(ydbclient => { + const y = ydbclient.getY('textarea') + let type = y.define('textarea', Y.Text) + let textarea = document.querySelector('textarea') + window.binding = new Y.TextareaBinding(type, textarea) +}) + +/* let y = new Y('textarea-example', { connector: { name: 'websockets-client', @@ -13,3 +22,4 @@ window.yTextarea = y let type = y.define('textarea', Y.Text) let textarea = document.querySelector('textarea') window.binding = new Y.TextareaBinding(type, textarea) +*/ \ No newline at end of file diff --git a/src/Connector.js b/src/Connector.js index 654a2e9f..99f6ad0a 100644 --- a/src/Connector.js +++ b/src/Connector.js @@ -5,7 +5,8 @@ import { sendSyncStep1, readSyncStep1 } from './MessageHandler/syncStep1.js' import { readSyncStep2 } from './MessageHandler/syncStep2.js' import { integrateRemoteStructs } from './MessageHandler/integrateRemoteStructs.js' -import debug from 'debug' +// TODO: reintroduce or remove +// import debug from 'debug' // TODO: rename Connector diff --git a/src/Y.dist.js b/src/Y.dist.js index fbf2445a..50b0c9be 100644 --- a/src/Y.dist.js +++ b/src/Y.dist.js @@ -22,7 +22,6 @@ import QuillBinding from './Bindings/QuillBinding/QuillBinding.js' import DomBinding from './Bindings/DomBinding/DomBinding.js' import { toBinary, fromBinary } from './MessageHandler/binaryEncode.js' -import debug from 'debug' import domToType from './Bindings/DomBinding/domToType.js' import { domsToTypes, switchAssociation } from './Bindings/DomBinding/util.js' @@ -56,7 +55,4 @@ Y.utils = { fromBinary } -Y.debug = debug -debug.formatters.Y = messageToString -debug.formatters.y = messageToRoomname export default Y diff --git a/src/Y.js b/src/Y.js index 0279cf17..ba87d929 100644 --- a/src/Y.js +++ b/src/Y.js @@ -36,12 +36,12 @@ export default class Y extends NamedEventHandler { * @type {String} */ this.room = room - if (opts != null) { + if (opts != null && opts.connector != null) { opts.connector.room = room } this._contentReady = false this._opts = opts - if (typeof opts.userID !== 'number') { + if (opts == null || typeof opts.userID !== 'number') { this.userID = generateRandomUint32() } else { this.userID = opts.userID diff --git a/ydb/README.md b/ydb/README.md new file mode 100644 index 00000000..3e740cee --- /dev/null +++ b/ydb/README.md @@ -0,0 +1 @@ +* Host should discard message when confNumber is older than expected \ No newline at end of file diff --git a/ydb/broadcastchannel.js b/ydb/broadcastchannel.js new file mode 100644 index 00000000..99d205e2 --- /dev/null +++ b/ydb/broadcastchannel.js @@ -0,0 +1,45 @@ +/* eslint-env browser */ + +import * as decoding from './decoding.js' +import * as encoding from './encoding.js' + +const bc = new BroadcastChannel('ydb-client') +const subs = new Map() + +bc.onmessage = event => { + const decoder = decoding.createDecoder(event.data) + const room = decoding.readVarString(decoder) + const update = decoding.readTail(decoder) + const rsubs = subs.get(room) + if (rsubs !== undefined) { + rsubs.forEach(f => f(update)) + } +} + +/** + * @param {string} room + * @param {function(ArrayBuffer)} f + */ +export const subscribe = (room, f) => { + let rsubs = subs.get(room) + if (rsubs === undefined) { + rsubs = new Set() + subs.set(room, rsubs) + } + rsubs.add(f) +} + +/** + * @param {string} room + * @param {ArrayBuffer} update + */ +export const publish = (room, update) => { + const encoder = encoding.createEncoder() + encoding.writeVarString(encoder, room) + encoding.writeArrayBuffer(encoder, update) + bc.postMessage(encoding.toBuffer(encoder)) + const rsubs = subs.get(room) + if (rsubs !== undefined) { + rsubs.forEach(f => f(update)) + } +} diff --git a/ydb/decoding.js b/ydb/decoding.js new file mode 100644 index 00000000..eba0220b --- /dev/null +++ b/ydb/decoding.js @@ -0,0 +1,188 @@ + +/* global Buffer */ + +import * as globals from './globals.js' + +/** + * A Decoder handles the decoding of an ArrayBuffer. + */ +class Decoder { + /** + * @param {ArrayBuffer} buffer Binary data to decode + */ + constructor (buffer) { + this.arr = new Uint8Array(buffer) + this.pos = 0 + } +} + +/** + * @param {ArrayBuffer} buffer + * @return {Decoder} + */ +export const createDecoder = buffer => new Decoder(buffer) + +export const hasContent = decoder => decoder.pos !== decoder.arr.length + +/** + * Clone a decoder instance. + * Optionally set a new position parameter. + * @param {Decoder} decoder The decoder instance + * @return {Decoder} A clone of `decoder` + */ +export const clone = (decoder, newPos = decoder.pos) => { + let _decoder = createDecoder(decoder.arr.buffer) + _decoder.pos = newPos + return _decoder +} + +/** + * Read `len` bytes as an ArrayBuffer. + * @param {Decoder} decoder The decoder instance + * @param {number} len The length of bytes to read + * @return {ArrayBuffer} + */ +export const readArrayBuffer = (decoder, len) => { + const arrayBuffer = globals.createUint8ArrayFromLen(len) + const view = globals.createUint8ArrayFromBuffer(decoder.arr.buffer, decoder.pos, len) + arrayBuffer.set(view) + decoder.pos += len + return arrayBuffer.buffer +} + +/** + * Read variable length payload as ArrayBuffer + * @param {Decoder} decoder + * @return {ArrayBuffer} + */ +export const readPayload = decoder => readArrayBuffer(decoder, readVarUint(decoder)) + +/** + * Read the rest of the content as an ArrayBuffer + * @param {Decoder} decoder + * @return {ArrayBuffer} + */ +export const readTail = decoder => readArrayBuffer(decoder, decoder.arr.length - decoder.pos) + +/** + * Skip one byte, jump to the next position. + * @param {Decoder} decoder The decoder instance + * @return {number} The next position + */ +export const skip8 = decoder => decoder.pos++ + +/** + * Read one byte as unsigned integer. + * @param {Decoder} decoder The decoder instance + * @return {number} Unsigned 8-bit integer + */ +export const readUint8 = decoder => decoder.arr[decoder.pos++] + +/** + * Read 4 bytes as unsigned integer. + * + * @param {Decoder} decoder + * @return {number} An unsigned integer. + */ +export const readUint32 = decoder => { + let uint = + decoder.arr[decoder.pos] + + (decoder.arr[decoder.pos + 1] << 8) + + (decoder.arr[decoder.pos + 2] << 16) + + (decoder.arr[decoder.pos + 3] << 24) + decoder.pos += 4 + return uint +} + +/** + * Look ahead without incrementing position. + * to the next byte and read it as unsigned integer. + * + * @param {Decoder} decoder + * @return {number} An unsigned integer. + */ +export const peekUint8 = decoder => decoder.arr[decoder.pos] + +/** + * Read unsigned integer (32bit) with variable length. + * 1/8th of the storage is used as encoding overhead. + * * numbers < 2^7 is stored in one bytlength + * * numbers < 2^14 is stored in two bylength + * + * @param {Decoder} decoder + * @return {number} An unsigned integer.length + */ +export const readVarUint = decoder => { + let num = 0 + let len = 0 + while (true) { + let r = decoder.arr[decoder.pos++] + num = num | ((r & 0b1111111) << len) + len += 7 + if (r < 1 << 7) { + return num >>> 0 // return unsigned number! + } + if (len > 35) { + throw new Error('Integer out of range!') + } + } +} + +/** + * Read string of variable length + * * varUint is used to store the length of the string + * + * Transforming utf8 to a string is pretty expensive. The code performs 10x better + * when String.fromCodePoint is fed with all characters as arguments. + * But most environments have a maximum number of arguments per functions. + * For effiency reasons we apply a maximum of 10000 characters at once. + * + * @param {Decoder} decoder + * @return {String} The read String. + */ +export const readVarString = decoder => { + let remainingLen = readVarUint(decoder) + let encodedString = '' + while (remainingLen > 0) { + const nextLen = remainingLen < 10000 ? remainingLen : 10000 + const bytes = new Array(nextLen) + for (let i = 0; i < nextLen; i++) { + bytes[i] = decoder.arr[decoder.pos++] + } + encodedString += String.fromCodePoint.apply(null, bytes) + remainingLen -= nextLen + } + return decodeURIComponent(escape(encodedString)) +} + +/** + * Look ahead and read varString without incrementing position + * @param {Decoder} decoder + * @return {string} + */ +export const peekVarString = decoder => { + let pos = decoder.pos + let s = readVarString(decoder) + decoder.pos = pos + return s +} + +/** + * Read ID. + * * If first varUint read is 0xFFFFFF a RootID is returned. + * * Otherwise an ID is returned + * + * @param {Decoder} decoder + * @return {ID} + * +export const readID = decoder => { + let user = decoder.readVarUint() + if (user === RootFakeUserID) { + // read property name and type id + const rid = new RootID(decoder.readVarString(), null) + rid.type = decoder.readVarUint() + return rid + } + return new ID(user, decoder.readVarUint()) +} +*/ diff --git a/ydb/encoding.js b/ydb/encoding.js new file mode 100644 index 00000000..e56197c2 --- /dev/null +++ b/ydb/encoding.js @@ -0,0 +1,234 @@ + +import * as globals from './globals.js' + +const bits7 = 0b1111111 +const bits8 = 0b11111111 + +/** + * A BinaryEncoder handles the encoding to an ArrayBuffer. + */ +class Encoder { + constructor () { + this.cpos = 0 + this.cbuf = globals.createUint8ArrayFromLen(1000) + this.bufs = [] + } +} + +export const createEncoder = () => new Encoder() + +/** + * The current length of the encoded data. + */ +export const length = encoder => { + let len = 0 + for (let i = 0; i < encoder.bufs.length; i++) { + len += encoder.bufs[i].length + } + len += encoder.cpos + return len +} + +/** + * Transform to ArrayBuffer. + * @param {Encoder} encoder + * @return {ArrayBuffer} The created ArrayBuffer. + */ +export const toBuffer = encoder => { + const uint8arr = globals.createUint8ArrayFromLen(length(encoder)) + let curPos = 0 + for (let i = 0; i < encoder.bufs.length; i++) { + let d = encoder.bufs[i] + uint8arr.set(d, curPos) + curPos += d.length + } + uint8arr.set(globals.createUint8ArrayFromBuffer(encoder.cbuf.buffer, 0, encoder.cpos), curPos) + return uint8arr.buffer +} + +/** + * Write one byte to the encoder. + * + * @param {Encoder} encoder + * @param {number} num The byte that is to be encoded. + */ +export const write = (encoder, num) => { + if (encoder.cpos === encoder.cbuf.length) { + encoder.bufs.push(encoder.cbuf) + encoder.cbuf = globals.createUint8ArrayFromLen(encoder.cbuf.length * 2) + encoder.cpos = 0 + } + encoder.cbuf[encoder.cpos++] = num +} + +/** + * Write one byte at a specific position. + * Position must already be written (i.e. encoder.length > pos) + * + * @param {Encoder} encoder + * @param {number} pos Position to which to write data + * @param {number} num Unsigned 8-bit integer + */ +export const set = (encoder, pos, num) => { + let buffer = null + // iterate all buffers and adjust position + for (let i = 0; i < encoder.bufs.length && buffer === null; i++) { + const b = encoder.bufs[i] + if (pos < b.length) { + buffer = b // found buffer + } else { + pos -= b.length + } + } + if (buffer === null) { + // use current buffer + buffer = encoder.cbuf + } + buffer[pos] = num +} + +/** + * Write one byte as an unsigned integer. + * + * @param {Encoder} encoder + * @param {number} num The number that is to be encoded. + */ +export const writeUint8 = (encoder, num) => write(encoder, num & bits8) + +/** + * Write one byte as an unsigned Integer at a specific location. + * + * @param {Encoder} encoder + * @param {number} pos The location where the data will be written. + * @param {number} num The number that is to be encoded. + */ +export const setUint8 = (encoder, pos, num) => set(encoder, pos, num & bits8) + +/** + * Write two bytes as an unsigned integer. + * + * @param {Encoder} encoder + * @param {number} num The number that is to be encoded. + */ +export const writeUint16 = (encoder, num) => { + write(encoder, num & bits8) + write(encoder, (num >>> 8) & bits8) +} +/** + * Write two bytes as an unsigned integer at a specific location. + * + * @param {Encoder} encoder + * @param {number} pos The location where the data will be written. + * @param {number} num The number that is to be encoded. + */ +export const setUint16 = (encoder, pos, num) => { + set(encoder, pos, num & bits8) + set(encoder, pos + 1, (num >>> 8) & bits8) +} + +/** + * Write two bytes as an unsigned integer + * + * @param {Encoder} encoder + * @param {number} num The number that is to be encoded. + */ +export const writeUint32 = (encoder, num) => { + for (let i = 0; i < 4; i++) { + write(encoder, num & bits8) + num >>>= 8 + } +} + +/** + * Write two bytes as an unsigned integer at a specific location. + * + * @param {Encoder} encoder + * @param {number} pos The location where the data will be written. + * @param {number} num The number that is to be encoded. + */ +export const setUint32 = (encoder, pos, num) => { + for (let i = 0; i < 4; i++) { + set(encoder, pos + i, num & bits8) + num >>>= 8 + } +} + +/** + * Write a variable length unsigned integer. + * + * Encodes integers in the range from [0, 4294967295] / [0, 0xffffffff]. (max 32 bit unsigned integer) + * + * @param {Encoder} encoder + * @param {number} num The number that is to be encoded. + */ +export const writeVarUint = (encoder, num) => { + while (num >= 0b10000000) { + write(encoder, 0b10000000 | (bits7 & num)) + num >>>= 7 + } + write(encoder, bits7 & num) +} + +/** + * Write a variable length string. + * + * @param {Encoder} encoder + * @param {String} str The string that is to be encoded. + */ +export const writeVarString = (encoder, str) => { + const encodedString = unescape(encodeURIComponent(str)) + const len = encodedString.length + writeVarUint(encoder, len) + for (let i = 0; i < len; i++) { + write(encoder, encodedString.codePointAt(i)) + } +} + +/** + * Write the content of another biUint8Arr + * + * @param {Encoder} encoder The enUint8Arr + * @param encoderToAppend The BinaryEncoder to be written. + */ +export const writeBinaryEncoder = (encoder, encoderToAppend) => writeArrayBuffer(encoder, toBuffer(encoder)) + +/** + * Append an arrayBuffer to the encoder. + * + * @param {Encoder} encoder + * @param {ArrayBuffer} arrayBuffer + */ +export const writeArrayBuffer = (encoder, arrayBuffer) => { + const prevBufferLen = encoder.cbuf.length + // TODO: Append to cbuf if possible + encoder.bufs.push(globals.createUint8ArrayFromBuffer(encoder.cbuf.buffer, 0, encoder.cpos)) + encoder.bufs.push(globals.createUint8ArrayFromArrayBuffer(arrayBuffer)) + encoder.cbuf = globals.createUint8ArrayFromLen(prevBufferLen) + encoder.cpos = 0 +} + +/** + * @param {Encoder} encoder + * @param {ArrayBuffer} arrayBuffer + */ +export const writePayload = (encoder, arrayBuffer) => { + writeVarUint(encoder, arrayBuffer.byteLength) + writeArrayBuffer(encoder, arrayBuffer) +} + +/** + * Write an ID at the current position. + * + * @param {ID} id The ID that is to be written. + * +export const writeID = (encoder, id) => { + const user = id.user + writeVarUint(encoder, user) + if (user !== RootFakeUserID) { + writeVarUint(encoder, id.clock) + } else { + writeVarString(encoder, id.name) + writeVarUint(encoder, id.type) + } +} +*/ diff --git a/ydb/encoding.test.js b/ydb/encoding.test.js new file mode 100644 index 00000000..b957707b --- /dev/null +++ b/ydb/encoding.test.js @@ -0,0 +1,49 @@ +import * as encoding from './encoding.js' + +/** + * Check if binary encoding is compatible with golang binary encoding - binary.PutVarUint. + * + * Result: is compatible up to 32 bit: [0, 4294967295] / [0, 0xffffffff]. (max 32 bit unsigned integer) + */ +let err = null +try { + const tests = [ + { in: 0, out: [0] }, + { in: 1, out: [1] }, + { in: 128, out: [128, 1] }, + { in: 200, out: [200, 1] }, + { in: 32, out: [32] }, + { in: 500, out: [244, 3] }, + { in: 256, out: [128, 2] }, + { in: 700, out: [188, 5] }, + { in: 1024, out: [128, 8] }, + { in: 1025, out: [129, 8] }, + { in: 4048, out: [208, 31] }, + { in: 5050, out: [186, 39] }, + { in: 1000000, out: [192, 132, 61] }, + { in: 34951959, out: [151, 166, 213, 16] }, + { in: 2147483646, out: [254, 255, 255, 255, 7] }, + { in: 2147483647, out: [255, 255, 255, 255, 7] }, + { in: 2147483648, out: [128, 128, 128, 128, 8] }, + { in: 2147483700, out: [180, 128, 128, 128, 8] }, + { in: 4294967294, out: [254, 255, 255, 255, 15] }, + { in: 4294967295, out: [255, 255, 255, 255, 15] } + ] + tests.forEach(test => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, test.in) + const buffer = new Uint8Array(encoding.toBuffer(encoder)) + if (buffer.byteLength !== test.out.length) { + throw new Error('Length don\'t match!') + } + for (let j = 0; j < buffer.length; j++) { + if (buffer[j] !== test[1][j]) { + throw new Error('values don\'t match!') + } + } + }) +} catch (error) { + err = error +} finally { + console.log('YDB Client: Encoding varUint compatiblity test: ', err || 'success!') +} diff --git a/ydb/globals.js b/ydb/globals.js new file mode 100644 index 00000000..39ca5206 --- /dev/null +++ b/ydb/globals.js @@ -0,0 +1,59 @@ +/* eslint-env browser */ + +export const Uint8Array_ = Uint8Array + +/** + * @param {Array} arr + * @return {ArrayBuffer} + */ +export const createArrayBufferFromArray = arr => new Uint8Array_(arr).buffer + +export const createUint8ArrayFromLen = len => new Uint8Array_(len) + +/** + * Create Uint8Array with initial content from buffer + */ +export const createUint8ArrayFromBuffer = (buffer, byteOffset, length) => new Uint8Array_(buffer, byteOffset, length) + +/** + * Create Uint8Array with initial content from buffer + */ +export const createUint8ArrayFromArrayBuffer = arraybuffer => new Uint8Array_(arraybuffer) +export const createArrayFromArrayBuffer = arraybuffer => Array.from(createUint8ArrayFromArrayBuffer(arraybuffer)) + +export const createPromise = f => new Promise(f) +/** + * `Promise.all` wait for all promises in the array to resolve and return the result + * @param {Array>} arrp + * @return {any} + */ +export const pall = arrp => Promise.all(arrp) +export const preject = reason => Promise.reject(reason) +export const presolve = res => Promise.resolve(res) + +export const until = (timeout, check) => createPromise((resolve, reject) => { + const hasTimeout = timeout > 0 + const untilInterval = () => { + if (check()) { + clearInterval(intervalHandle) + resolve() + } else if (hasTimeout) { + timeout -= 10 + if (timeout < 0) { + clearInterval(intervalHandle) + reject(error('Timeout')) + } + } + } + const intervalHandle = setInterval(untilInterval, 10) +}) + +export const error = description => new Error(description) + +export const max = (a, b) => a > b ? a : b + +/** + * @param {number} t Time to wait + * @return {Promise} Promise that is resolved after t ms + */ +export const wait = t => createPromise(r => setTimeout(r, t)) diff --git a/ydb/idb.js b/ydb/idb.js new file mode 100644 index 00000000..3a78261b --- /dev/null +++ b/ydb/idb.js @@ -0,0 +1,144 @@ +/* eslint-env browser */ + +import * as globals from './globals.js' + +/* + * IDB Request to Promise transformer + */ +export const rtop = request => globals.createPromise((resolve, reject) => { + request.onerror = event => reject(new Error(event.target.error)) + request.onblocked = () => location.reload() + request.onsuccess = event => resolve(event.target.result) +}) + +/** + * @return {Promise} + */ +export const openDB = (name, initDB) => globals.createPromise((resolve, reject) => { + let request = indexedDB.open(name) + /** + * @param {any} event + */ + request.onupgradeneeded = event => initDB(event.target.result) + /** + * @param {any} event + */ + request.onerror = event => reject(new Error(event.target.error)) + request.onblocked = () => location.reload() + /** + * @param {any} event + */ + request.onsuccess = event => { + const db = event.target.result + db.onversionchange = () => { db.close() } + addEventListener('unload', () => db.close()) + resolve(db) + } +}) + +export const deleteDB = name => rtop(indexedDB.deleteDatabase(name)) + +export const createStores = (db, definitions) => definitions.forEach(d => + db.createObjectStore.apply(db, d) +) + +/** + * @param {IDBObjectStore} store + * @param {String | number | ArrayBuffer | Date | Array } key + * @return {Promise} + */ +export const get = (store, key) => + rtop(store.get(key)) + +/** + * @param {IDBObjectStore} store + * @param {String | number | ArrayBuffer | Date | IDBKeyRange | Array } key + */ +export const del = (store, key) => + rtop(store.delete(key)) + +/** + * @param {IDBObjectStore} store + * @param {String | number | ArrayBuffer | Date | boolean} item + * @param {String | number | ArrayBuffer | Date | Array} [key] + */ +export const put = (store, item, key) => + rtop(store.put(item, key)) + +/** + * @param {IDBObjectStore} store + * @param {String | number | ArrayBuffer | Date | boolean} item + * @param {String | number | ArrayBuffer | Date | Array} [key] + * @return {Promise} + */ +export const add = (store, item, key) => + rtop(store.add(item, key)) + +/** + * @param {IDBObjectStore} store + * @param {String | number | ArrayBuffer | Date} item + * @return {Promise} + */ +export const addAutoKey = (store, item) => + rtop(store.add(item)) + +/** + * @param {IDBObjectStore} store + * @param {IDBKeyRange} [range] + */ +export const getAll = (store, range) => + rtop(store.getAll(range)) + +/** + * @param {IDBObjectStore} store + * @param {IDBKeyRange} [range] + */ +export const getAllKeys = (store, range) => + rtop(store.getAllKeys(range)) + +/** + * Iterate on keys and values + * @param {IDBObjectStore} store + * @param {IDBKeyRange?} keyrange + * @param {function(any, any)} f Return true in order to continue the cursor + */ +export const iterate = (store, keyrange, f) => globals.createPromise((resolve, reject) => { + const request = store.openCursor(keyrange) + request.onerror = reject + /** + * @param {any} event + */ + request.onsuccess = event => { + const cursor = event.target.result + if (cursor === null) { + return resolve() + } + f(cursor.value, cursor.key) + cursor.continue() + } +}) + +/** + * Iterate on the keys (no values) + * @param {IDBObjectStore} store + * @param {IDBKeyRange} keyrange + * @param {function(IDBCursor)} f Call `idbcursor.continue()` to iterate further + */ +export const iterateKeys = (store, keyrange, f) => { + /** + * @param {any} event + */ + store.openKeyCursor(keyrange).onsuccess = event => f(event.target.result) +} + +/** + * Open store from transaction + * @param {IDBTransaction} t + * @param {String} store + * @returns {IDBObjectStore} + */ +export const getStore = (t, store) => t.objectStore(store) + +export const createIDBKeyRangeBound = (lower, upper, lowerOpen, upperOpen) => IDBKeyRange.bound(lower, upper, lowerOpen, upperOpen) +export const createIDBKeyRangeUpperBound = (upper, upperOpen) => IDBKeyRange.upperBound(upper, upperOpen) +export const createIDBKeyRangeLowerBound = (lower, lowerOpen) => IDBKeyRange.lowerBound(lower, lowerOpen) diff --git a/ydb/idb.test.js b/ydb/idb.test.js new file mode 100644 index 00000000..ac9e82a1 --- /dev/null +++ b/ydb/idb.test.js @@ -0,0 +1,34 @@ +import * as test from './test.js' +import * as idb from './idb.js' +import * as logging from './logging.js' + +const initTestDB = db => idb.createStores(db, [['test']]) +const testDBName = 'idb-test' + +const createTransaction = db => db.transaction(['test'], 'readwrite') +/** + * @param {IDBTransaction} t + * @return {IDBObjectStore} + */ +const getStore = t => idb.getStore(t, 'test') + +idb.deleteDB(testDBName).then(() => idb.openDB(testDBName, initTestDB)).then(db => { + test.run('idb iteration', async testname => { + const t = createTransaction(db) + await idb.put(getStore(t), 0, ['t', 0]) + await idb.put(getStore(t), 1, ['t', 1]) + const valsGetAll = await idb.getAll(getStore(t)) + if (valsGetAll.length !== 2) { + logging.fail('getAll does not return two values') + } + const valsIterate = [] + const keyrange = idb.createIDBKeyRangeBound(['t', 0], ['t', 1], false, false) + await idb.put(getStore(t), 2, ['t', 2]) + await idb.iterate(getStore(t), keyrange, (val, key) => { + valsIterate.push(val) + }) + if (valsIterate.length !== 2) { + logging.fail('iterate does not return two values') + } + }) +}) diff --git a/ydb/idbactions.js b/ydb/idbactions.js new file mode 100644 index 00000000..a7fd0e8b --- /dev/null +++ b/ydb/idbactions.js @@ -0,0 +1,300 @@ +/* eslint-env browser */ + +/** + * Naming conventions: + * * ydb: Think of ydb as a federated set of servers. This is not yet true, but we will eventually get there. With this assumption come some challenges with the client + * * ydb instance: A single ydb instance that this ydb-client connects to + * * (room) host: Exactly one ydb instance controls a room at any time. The ownership may change over time. The host of a room is the ydb instance that owns it. This is not necessarily the instance we connect to. + * * room session id: An random id that is assigned to a room. When the server dies unexpectedly, we can conclude which data is missing and send it to the server (or delete it and prevent duplicate content) + * * update: An ArrayBuffer of binary data. Neither Ydb nor Ydb-client care about the content of update. Updates may be appended to each other. + * + * The database has four tables: + * + * CU "client-unconfirmed" confid -> room, update + * - The client writes to this table when it creates an update. + * - Then it sends an update to the host with the generated confid + * - In case the host doesn't confirm that it received this update, it is sent again on next sync + * HU "host-unconfirmed" room, offset -> update + * - Updates from the host are written to this table + * - When host confirms that an unconfirmed update was persisted, the update is written to the Co table + * - When client sync to host and the room session ids don't match, all host-unconfirmed messages are sent to host + * Co "confirmed": + * data:{room} -> update + * - this field holds confirmed room updates + * meta:{room} -> room session id, confirmed offset + * - this field holds metadata about the room + * US "unconfirmed-subscriptions" room -> _ + * - Subscriptions sent to the server, but didn't receive confirmation yet + * - Either a room is in US or in Co + * - A client may update a room when the room is in either US or Co + */ + +import * as encoding from './encoding.js' +import * as decoding from './decoding.js' +import * as idb from './idb.js' +import * as globals from './globals.js' +import * as message from './message.js' + +/** + * Get 'client-unconfirmed' store from transaction + * @param {IDBTransaction} t + * @return {IDBObjectStore} + */ +const getStoreCU = t => idb.getStore(t, STORE_CU) +/** + * Get 'host-unconfirmed' store from transaction + * @param {IDBTransaction} t + * @return {IDBObjectStore} + */ +const getStoreHU = t => idb.getStore(t, STORE_HU) +/** + * Get 'confirmed' store from transaction + * @param {IDBTransaction} t + * @return {IDBObjectStore} + */ +const getStoreCo = t => idb.getStore(t, STORE_CO) + +/** + * Get `unconfirmed-subscriptions` store from transaction + * @param {IDBTransaction} t + * @return {IDBObjectStore} + */ +const getStoreUS = t => idb.getStore(t, STORE_US) + +/** + * @param {string} room + * @param {number} offset + * @return {[string, number]} + */ +const encodeHUKey = (room, offset) => [room, offset] + +/** + * @typedef RoomAndOffset + * @type {Object} + * @property {string} room + * @property {number} offset Received offsets (including offsets that are not yet confirmed) + */ + +/** + * @param {[string, number]} key + * @return {RoomAndOffset} + */ +const decodeHUKey = key => { + return { + room: key[0], + offset: key[1] + } +} + +const getCoMetaKey = room => 'meta:' + room +const getCoDataKey = room => 'data:' + room + +const STORE_CU = 'client-unconfirmed' +const STORE_US = 'unconfirmed-subscriptions' +const STORE_CO = 'confirmed' +const STORE_HU = 'host-unconfirmed' + +/** + * @param {string} dbNamespace + * @return {Promise} + */ +export const openDB = dbNamespace => idb.openDB(dbNamespace, db => idb.createStores(db, [ + [STORE_CU, { autoIncrement: true }], + [STORE_HU], + [STORE_CO], + [STORE_US] +])) + +export const deleteDB = name => idb.deleteDB(name) + +/** + * Create a new IDBTransaction accessing all object stores. Normally we should care that we can access object stores in parallel. + * But this is not possible in ydb-client since at least two object stores are requested in every IDB change. + * @param {IDBDatabase} db + * @return {IDBTransaction} + */ +export const createTransaction = db => db.transaction([STORE_CU, STORE_HU, STORE_CO, STORE_US], 'readwrite') + +/** + * Write an update to the db after the client created it. This update is not yet received by the host. + * This function returns a client confirmation number. The confirmation number must be send to the host so it can identify the update, + * and we can move the update to HU when it is confirmed (@see writeHostUnconfirmedByClient) + * @param {IDBTransaction} t + * @param {String} room + * @param {ArrayBuffer} update + * @return {Promise} client confirmation number + */ +export const writeClientUnconfirmed = (t, room, update) => { + const encoder = encoding.createEncoder() + encoding.writeVarString(encoder, room) + encoding.writeArrayBuffer(encoder, update) + return idb.addAutoKey(getStoreCU(t), encoding.toBuffer(encoder)) +} + +/** + * Get all updates that are not yet confirmed by host. + * @param {IDBTransaction} t + * @return {Promise} All update messages as a single ArrayBuffer + */ +export const getUnconfirmedUpdates = t => { + const encoder = encoding.createEncoder() + return idb.iterate(getStoreCU(t), null, (value, clientConf) => { + const decoder = decoding.createDecoder(value) + const room = decoding.readVarString(decoder) + const update = decoding.readTail(decoder) + encoding.writeArrayBuffer(encoder, message.createUpdate(room, update, clientConf)) + }).then(() => encoding.toBuffer(encoder)) +} + +/** + * The host confirms that it received and persisted an update. The update can be safely removed from CU. + * It is necessary to call this function in case that the client disconnected before the host could send `writeHostUnconfirmedByClient`. + * @param {IDBTransaction} t + * @param {number} clientConf + * @return {Promise} + */ +export const confirmClient = (t, clientConf) => idb.del(getStoreCU(t), idb.createIDBKeyRangeUpperBound(clientConf, false)) + +/** + * The host confirms that it received and broadcasted an update sent from this client. + * Calling this method does not confirm that the update has been persisted by the server. + * + * Other clients will receive an update with `writeHostUnconfirmed`. Since this client created the update, it only receives a confirmation. So + * we can simply move the update from CU to HU. + * + * @param {IDBTransaction} t + * @param {number} clientConf The client confirmation number that identifies the update + * @param {number} offset The offset with wich the server will store the information + */ +export const writeHostUnconfirmedByClient = (t, clientConf, offset) => idb.get(getStoreCU(t), clientConf).then(roomAndUpdate => { + const decoder = decoding.createDecoder(roomAndUpdate) + const room = decoding.readVarString(decoder) + const update = decoding.readTail(decoder) + return writeHostUnconfirmed(t, room, offset, update).then(() => + idb.del(getStoreCU(t), clientConf) + ) +}) + +/** + * The host broadcasts an update created by another client. It assures that the update will eventually be persisted with + * `offset`. Calling this function does not imply that the update was persisted by the host. In case of mismatching room session ids + * the updates in HU will be sent to the server. + * + * @param {IDBTransaction} t + * @param {String} room + * @param {number} offset + * @param {ArrayBuffer} update + * @return {Promise} + */ +export const writeHostUnconfirmed = (t, room, offset, update) => idb.add(getStoreHU(t), update, encodeHUKey(room, offset)) + +/** + * The host confirms that it persisted updates up until (including) offset. updates may be moved from HU to Co. + * + * @param {IDBTransaction} t + * @param {String} room + * @param {number} offset Inclusive range [0, offset - 1] has been stored to host + */ +export const writeConfirmedByHost = (t, room, offset) => { + const co = getStoreCo(t) + return globals.pall([idb.get(co, getCoDataKey(room)), idb.get(co, getCoMetaKey(room))]).then(async arr => { + const data = arr[0] + const meta = arr[1] + const metaSessionId = decodeMetaValue(meta).roomsid + const dataEncoder = encoding.createEncoder() + encoding.writeArrayBuffer(dataEncoder, data) + const hu = getStoreHU(t) + const huKeyRange = idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, offset), false, false) + return idb.iterate(hu, huKeyRange, (value, _key) => { + const key = decodeHUKey(_key) // @kevin _key is an array. remove decodeHUKey functions + if (key.room === room && key.offset <= offset) { + encoding.writeArrayBuffer(dataEncoder, value) + } + }).then(() => + globals.pall([idb.put(co, encodeMetaValue(metaSessionId, offset), getCoMetaKey(room)), idb.put(co, encoding.toBuffer(dataEncoder), getCoDataKey(room)), idb.del(hu, huKeyRange)]) + ) + }) +} + +/** + * @typedef RoomMeta + * @type {Object} + * @property {string} room + * @property {number} roomsid Room session id + * @property {number} offset Received offsets (including offsets that are not yet confirmed) + */ + +/** + * Get all meta information for all rooms. + * + * @param {IDBTransaction} t + * @return {Promise>} + */ +export const getRoomMetas = t => { + const hu = getStoreHU(t) + const result = [] + return idb.iterate(getStoreCo(t), idb.createIDBKeyRangeLowerBound('meta:', false), (metavalue, metakey) => + idb.getAllKeys(hu, idb.createIDBKeyRangeBound(encodeHUKey(metakey.slice(5), 0), encodeHUKey(metakey.slice(5), 2 ** 32), false, false)).then(keys => { + const { roomsid, offset } = decodeMetaValue(metavalue) + result.push({ + room: metakey.slice(5), + roomsid, + offset: keys.reduce((cur, key) => globals.max(decodeHUKey(key).offset, cur), offset) + }) + }) + ).then(() => globals.presolve(result)) +} + +export const getRoomMeta = (t, room) => + idb.get(getStoreCo(t), getCoMetaKey(room)) + +/** + * Get all data from idb, including unconfirmed updates. + * TODO: include updates in CU + * @param {IDBTransaction} t + * @param {string} room + * @return {Promise} + */ +export const getRoomData = (t, room) => globals.pall([idb.get(getStoreCo(t), 'data:' + room), idb.getAll(getStoreHU(t), idb.createIDBKeyRangeBound(encodeHUKey(room, 0), encodeHUKey(room, 2 ** 32), false, false))]).then(([data, updates]) => { + const encoder = encoding.createEncoder() + encoding.writeArrayBuffer(encoder, data || new Uint8Array(0)) + updates.forEach(update => encoding.writeArrayBuffer(encoder, update)) + return encoding.toBuffer(encoder) +}) + +const decodeMetaValue = buffer => { + const decoder = decoding.createDecoder(buffer) + const roomsid = decoding.readVarUint(decoder) + const offset = decoding.readVarUint(decoder) + return { + roomsid, offset + } +} +/** + * @param {number} roomsid room session id + * @param {number} offset + * @return {ArrayBuffer} + */ +const encodeMetaValue = (roomsid, offset) => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, roomsid) + encoding.writeVarUint(encoder, offset) + return encoding.toBuffer(encoder) +} + +/** + * Set the initial room data. Overwrites initial data if there is any! + * @param {IDBTransaction} t + * @param {string} room + * @param {number} roomsessionid + * @param {number} offset + * @return {Promise} + */ +export const confirmSubscription = (t, room, roomsessionid, offset) => globals.pall([ + idb.put(getStoreCo(t), encodeMetaValue(roomsessionid, offset), getCoMetaKey(room)), + idb.put(getStoreCo(t), globals.createArrayBufferFromArray([]), getCoDataKey(room)) +]).then(() => idb.del(getStoreUS(t), room)) + +export const writeUnconfirmedSubscription = (t, room) => idb.put(getStoreUS(t), true, room) + +export const getUnconfirmedSubscriptions = t => idb.getAllKeys(getStoreUS(t)) diff --git a/ydb/idbactions.test.js b/ydb/idbactions.test.js new file mode 100644 index 00000000..1d41445b --- /dev/null +++ b/ydb/idbactions.test.js @@ -0,0 +1,23 @@ +import * as globals from './globals.js' +import * as idbactions from './idbactions.js' +import * as test from './test.js' + +idbactions.deleteDB().then(() => idbactions.openDB()).then(db => { + test.run('update lifetime 1', async (testname) => { + const update = new Uint8Array([1, 2, 3]).buffer + const t = idbactions.createTransaction(db) + idbactions.writeInitialRoomData(t, testname, 42, 1, new Uint8Array([0]).buffer) + const clientConf = await idbactions.writeClientUnconfirmed(t, testname, update) + await idbactions.writeHostUnconfirmedByClient(t, clientConf, 0) + await idbactions.writeConfirmedByHost(t, testname, 4) + const metas = await idbactions.getRoomMetas(t) + const roommeta = metas.find(meta => meta.room === testname) + if (roommeta == null || roommeta.offset !== 4 || roommeta.roomsid !== 42) { + throw globals.error() + } + const data = await idbactions.getRoomData(t, testname) + if (!test.compareArrays(new Uint8Array(data), new Uint8Array([0, 1, 2, 3]))) { + throw globals.error() + } + }) +}) diff --git a/ydb/index.js b/ydb/index.js new file mode 100644 index 00000000..578f545b --- /dev/null +++ b/ydb/index.js @@ -0,0 +1,7 @@ +import * as ydbclient from './ydb-client.js' + +/** + * @param {string} url + * @return {Promise} + */ +export const createYdbClient = url => ydbclient.get(url) diff --git a/ydb/logging.js b/ydb/logging.js new file mode 100644 index 00000000..e06d4dad --- /dev/null +++ b/ydb/logging.js @@ -0,0 +1,23 @@ + +import * as globals from './globals.js' + +let date = new Date().getTime() + +const writeDate = () => { + const oldDate = date + date = new Date().getTime() + return date - oldDate +} + +export const print = (...args) => console.log(...args) +export const log = m => print(`%cydb-client: %c${m} %c+${writeDate()}ms`, 'color: blue;', '', 'color: blue') + +export const fail = m => { + throw new Error(m) +} + +/** + * @param {ArrayBuffer} buffer + * @return {string} + */ +export const arrayBufferToString = buffer => JSON.stringify(Array.from(globals.createUint8ArrayFromBuffer(buffer))) diff --git a/ydb/message.js b/ydb/message.js new file mode 100644 index 00000000..4affde14 --- /dev/null +++ b/ydb/message.js @@ -0,0 +1,106 @@ +import * as encoding from './encoding.js' +import * as decoding from './decoding.js' +import * as idbactions from './idbactions.js' +import * as logging from './logging.js' +import * as bc from './broadcastchannel.js' + +/* make sure to update message.go in ydb when updating these values.. */ +export const MESSAGE_UPDATE = 0 // TODO: rename host_unconfirmed? +export const MESSAGE_SUB = 1 +export const MESSAGE_CONFIRMATION = 2 +export const MESSAGE_SUB_CONF = 3 +export const MESSAGE_HOST_UNCONFIRMED_BY_CLIENT = 4 +export const MESSAGE_CONFIRMED_BY_HOST = 5 + +/** + * @param {any} ydb YdbClient instance + * @param {ArrayBuffer} message + */ +export const readMessage = (ydb, message) => { + const t = idbactions.createTransaction(ydb.db) + const decoder = decoding.createDecoder(message) + while (decoding.hasContent(decoder)) { + switch (decoding.readVarUint(decoder)) { + case MESSAGE_UPDATE: { + const offset = decoding.readVarUint(decoder) + const room = decoding.readVarString(decoder) + const update = decoding.readPayload(decoder) + logging.log(`Received Update. room "${room}", offset ${offset}, ${logging.arrayBufferToString(update)}`) + idbactions.writeHostUnconfirmed(t, room, offset, update) + bc.publish(room, update) + break + } + case MESSAGE_SUB_CONF: { + const nSubs = decoding.readVarUint(decoder) + for (let i = 0; i < nSubs; i++) { + const room = decoding.readVarString(decoder) + const offset = decoding.readVarUint(decoder) + const roomsid = decoding.readVarUint(decoder) // TODO: SID + logging.log(`Received Sub Conf. room "${room}", offset ${offset}, roomsid ${roomsid}`) + idbactions.confirmSubscription(t, room, roomsid, offset) + } + break + } + case MESSAGE_CONFIRMATION: { + const room = decoding.readVarString(decoder) + const offset = decoding.readVarUint(decoder) + logging.log(`Received Confirmation. room "${room}", offset ${offset}`) + idbactions.writeConfirmedByHost(t, room, offset) + break + } + case MESSAGE_HOST_UNCONFIRMED_BY_CLIENT: { + const clientConf = decoding.readVarUint(decoder) + const offset = decoding.readVarUint(decoder) + logging.log(`Received HostUnconfirmedByClient. clientConf "${clientConf}", offset ${offset}`) + idbactions.writeHostUnconfirmedByClient(t, clientConf, offset) + break + } + case MESSAGE_CONFIRMED_BY_HOST: { + const room = decoding.readVarString(decoder) + const offset = decoding.readVarUint(decoder) + logging.log(`Received Confirmation By Host. room "${room}", offset ${offset}`) + idbactions.writeConfirmedByHost(t, room, offset) + break + } + default: + logging.fail(`Unexpected message type`) + } + } +} + +/** + * @param {string} room + * @param {ArrayBuffer} update + * @param {number} clientConf + * @return {ArrayBuffer} + */ +export const createUpdate = (room, update, clientConf) => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, MESSAGE_UPDATE) + encoding.writeVarUint(encoder, clientConf) + encoding.writeVarString(encoder, room) + encoding.writePayload(encoder, update) + return encoding.toBuffer(encoder) +} + +/** + * @typedef SubDef + * @type {Object} + * @property {string} room + * @property {number} offset + */ + +/** + * @param {Array} rooms + * @return {ArrayBuffer} + */ +export const createSub = rooms => { + const encoder = encoding.createEncoder() + encoding.writeVarUint(encoder, MESSAGE_SUB) + encoding.writeVarUint(encoder, rooms.length) + for (let i = 0; i < rooms.length; i++) { + encoding.writeVarString(encoder, rooms[i].room) + encoding.writeVarUint(encoder, rooms[i].offset) + } + return encoding.toBuffer(encoder) +} diff --git a/ydb/test.html b/ydb/test.html new file mode 100644 index 00000000..cd578f12 --- /dev/null +++ b/ydb/test.html @@ -0,0 +1,10 @@ + + + + + + + + + + diff --git a/ydb/test.js b/ydb/test.js new file mode 100644 index 00000000..ecbf023c --- /dev/null +++ b/ydb/test.js @@ -0,0 +1,25 @@ +import * as logging from './logging.js' + +export const run = async (name, f) => { + console.log(`%cStart:%c ${name}`, 'color:blue;', '') + const start = new Date() + try { + await f(name) + } catch (e) { + logging.print(`%cFailure:%c ${name} in %c${new Date().getTime() - start.getTime()}ms`, 'color:red;font-weight:bold', '', 'color:grey') + throw e + } + logging.print(`%cSuccess:%c ${name} in %c${new Date().getTime() - start.getTime()}ms`, 'color:green;font-weight:bold', '', 'color:grey') +} + +export const compareArrays = (as, bs) => { + if (as.length !== bs.length) { + return false + } + for (let i = 0; i < as.length; i++) { + if (as[i] !== bs[i]) { + return false + } + } + return true +} diff --git a/ydb/ydb-client.js b/ydb/ydb-client.js new file mode 100644 index 00000000..9d40ea87 --- /dev/null +++ b/ydb/ydb-client.js @@ -0,0 +1,135 @@ +/* eslint-env browser */ +import * as idbactions from './idbactions.js' +import * as globals from './globals.js' +import * as message from './message.js' +import * as bc from './broadcastchannel.js' +import * as encoding from './encoding.js' +import * as logging from './logging.js' +import * as idb from './idb.js' +import Y from '../src/Y.js' + +export class YdbClient { + constructor (url, db) { + this.url = url + this.ws = new WebSocket(url) + this.rooms = new Map() + this.db = db + this.connected = false + initWS(this, this.ws) + } + /** + * Open a Yjs instance that connects to `roomname`. + * @param {string} roomname + * @return {Y} + */ + getY (roomname) { + const y = new Y(roomname) + y.on('afterTransaction', function () { + debugger + }) + return y + } +} + +/** + * Initialize WebSocket connection. Try to reconnect on error/disconnect. + * @param {YdbClient} ydb + * @param {WebSocket} ws + */ +const initWS = (ydb, ws) => { + ws.binaryType = 'arraybuffer' + ws.onclose = () => { + ydb.connected = false + logging.log('Disconnected from ydb. Reconnecting..') + ydb.ws = new WebSocket(ydb.url) + initWS(ydb, ws) + } + ws.onopen = () => { + const t = idbactions.createTransaction(ydb.db) + globals.pall([idbactions.getRoomMetas(t), idbactions.getUnconfirmedSubscriptions(t), idbactions.getUnconfirmedUpdates(t)]).then(([metas, us, unconfirmedUpdates]) => { + const subs = [] + metas.forEach(meta => { + subs.push({ + room: meta.room, + offset: meta.offset + }) + }) + us.forEach(room => { + subs.push({ + room, offset: 0 + }) + }) + ydb.connected = true + const encoder = encoding.createEncoder() + encoding.writeArrayBuffer(encoder, message.createSub(subs)) + encoding.writeArrayBuffer(encoder, unconfirmedUpdates) + send(ydb, encoding.toBuffer(encoder)) + }) + } + ws.onmessage = event => message.readMessage(ydb, event.data) +} + +// maps from dbNamespace to db +const dbPromises = new Map() + +/** + * Factory function. Get a ydb instance that connects to url, and uses dbNamespace as indexeddb namespace. + * Create if it does not exist yet. + * + * @param {string} url + * @param {string} dbNamespace + * @return {Promise} + */ +export const get = (url, dbNamespace = 'ydb') => { + if (!dbPromises.has(dbNamespace)) { + dbPromises.set(dbNamespace, idbactions.openDB(dbNamespace)) + } + return dbPromises.get(dbNamespace).then(db => globals.presolve(new YdbClient(url, db))) +} + +/** + * Remove a db namespace. Call this to remove any persisted data. Make sure to close active sessions. + * TODO: destroy active ydbClient sessions / throw if a session is still active + * @param {string} dbNamespace + * @return {Promise} + */ +export const clear = (dbNamespace = 'ydb') => idb.deleteDB(dbNamespace) + +/** + * @param {YdbClient} ydb + * @param {ArrayBuffer} m + */ +export const send = (ydb, m) => ydb.connected && ydb.ws.send(m) + +/** + * @param {YdbClient} ydb + * @param {string} room + * @param {ArrayBuffer} update + */ +export const update = (ydb, room, update) => { + bc.publish(room, update) + const t = idbactions.createTransaction(ydb.db) + logging.log(`Write Unconfirmed Update. room "${room}", ${JSON.stringify(update)}`) + return idbactions.writeClientUnconfirmed(t, room, update).then(clientConf => { + logging.log(`Send Unconfirmed Update. connected ${ydb.connected} room "${room}", clientConf ${clientConf}, ${logging.arrayBufferToString(update)}`) + send(ydb, message.createUpdate(room, update, clientConf)) + }) +} + +export const subscribe = (ydb, room, f) => { + bc.subscribe(room, f) + const t = idbactions.createTransaction(ydb.db) + idbactions.getRoomData(t, room).then(data => { + if (data.byteLength > 0) { + f(data) + } + }) + idbactions.getRoomMeta(t, room).then(meta => { + if (meta === undefined) { + logging.log(`Send Subscribe. room "${room}", offset ${0}`) + // TODO: maybe set prelim meta value so we don't sub twice + send(ydb, message.createSub([{ room, offset: 0 }])) + idbactions.writeUnconfirmedSubscription(t, room) + } + }) +} diff --git a/ydb/ydb-client.test.js b/ydb/ydb-client.test.js new file mode 100644 index 00000000..828c1ee6 --- /dev/null +++ b/ydb/ydb-client.test.js @@ -0,0 +1,85 @@ +/* eslint-env browser */ + +import * as test from './test.js' +import * as ydbClient from './ydb-client.js' +import * as globals from './globals.js' +import * as idbactions from './idbactions.js' +import * as logging from './logging.js' + +const wsUrl = 'ws://127.0.0.1:8899/ws' +const testRoom = 'testroom' + +class YdbTestClient { + constructor (ydb) { + this.ydb = ydb + this.createdUpdates = new Set() + this.data = [] + this.checked = new Set() + } +} + +const clearAllYdbContent = () => fetch('http://127.0.0.1:8899/clearAll', { mode: 'no-cors' }) + +/** + * @param {string} name + * @return {Promise} + */ +const getTestClient = async name => { + await ydbClient.clear('ydb-' + name) + const ydb = await ydbClient.get(wsUrl, 'ydb-' + name) + const testClient = new YdbTestClient(ydb) + ydbClient.subscribe(ydb, testRoom, data => { + testClient.data.push(data) + globals.createArrayFromArrayBuffer(data).forEach(d => { + if (d < nextUpdateNumber) { + testClient.checked.add(d) + } + }) + console.log(name, 'received data', data, testClient.data) + }) + return testClient +} + +// TODO: does only work for 8bit numbers.. +let nextUpdateNumber = 0 + +/** + * Create an update. We use an increasing message counter so each update is unique. + * @param {YdbTestClient} client + */ +const update = (client) => { + ydbClient.update(client.ydb, testRoom, globals.createArrayBufferFromArray([nextUpdateNumber++])) +} + +/** + * Check if tsclient has all data in dataset + * @param {...YdbTestClient} testClients + */ +const checkTestClients = (...testClients) => globals.until(100000, () => testClients.every(testClient => + testClient.checked.size === nextUpdateNumber +)).then(() => + globals.wait(150) // wait 150 for all conf messages to come in.. + // TODO: do the below check in the until handler +).then(() => globals.pall(testClients.map(testClient => idbactions.getRoomData(idbactions.createTransaction(testClient.ydb.db), testRoom)))).then(testClientsData => { + testClientsData.forEach((testClientData, i) => { + const checked = new Set() + globals.createArrayFromArrayBuffer(testClientData).forEach(d => { + if (checked.has(d)) { + logging.fail('duplicate content') + } + checked.add(d) + }) + if (checked.size !== nextUpdateNumber) { + logging.fail(`Not all data is available in idb in client ${i}`) + } + }) +}) + +clearAllYdbContent().then(() => { + test.run('ydb-client', async testname => { + const c1 = await getTestClient('1') + const c2 = await getTestClient('2') + update(c1) + await checkTestClients(c1, c2) + }) +})