diff --git a/tests-lib/helper.js b/tests-lib/helper.js new file mode 100644 index 00000000..12c2f3aa --- /dev/null +++ b/tests-lib/helper.js @@ -0,0 +1,140 @@ + +import _Y from '../../yjs/src/y.js' + +import yMemory from '../../y-memory/src/Memory.js' +import yArray from '../../y-array/src/y-array.js' +import yMap from '../../y-map/src/Map.js' +import yTest from './test-connector.js' + +import Chance from 'chance' + +export let Y = _Y + +Y.extend(yMemory, yArray, yMap, yTest) + +export async function garbageCollectAllUsers (t, users) { + await flushAll(t, users) + await Promise.all(users.map(u => u.db.emptyGarbageCollector())) +} + +export async function compareUsers (t, users) { + var unsynced = users.filter(u => !u.connector.isSynced) + unsynced.forEach(u => u.reconnect()) + if (users[0].connector.testRoom != null) { + // flush for sync if test-connector + await users[0].connector.testRoom.flushAll(users) + } + await Promise.all(unsynced.map(u => { + return new Promise(function (resolve) { + u.connector.whenSynced(resolve) + }) + })) + await flushAll(t, users) + // types must be equal before garbage collect + var userTypeContents = users.map(u => u.share.array._content.map(c => c.val || JSON.stringify(c.type))) + var data = await Promise.all(users.map(async (u) => { + var data = {} + await u.db.garbageCollect() + await u.db.garbageCollect() + u.db.requestTransaction(function * () { + data.os = yield * this.getOperationsUntransformed() + data.os = data.os.untransformed.map((op) => { + return Y.Struct[op.struct].encode(op) + }) + data.ds = yield * this.getDeleteSet() + data.ss = yield * this.getStateSet() + }) + await u.db.whenTransactionsFinished() + return data + })) + for (var i = 0; i < data.length - 1; i++) { + await t.asyncGroup(async () => { + t.compare(userTypeContents[i], userTypeContents[i + 1], 'types') + t.compare(data[i].os, data[i + 1].os, 'os') + t.compare(data[i].ds, data[i + 1].ds, 'ds') + t.compare(data[i].ss, data[i + 1].ss, 'ss') + }, `Compare user${i} with user${i + 1}`) + } +} + +export async function initArrays (t, opts) { + var result = { + users: [] + } + var share = Object.assign({ flushHelper: 'Map', array: 'Array' }, opts.share) + var chance = opts.chance || new Chance(t.getSeed() * 1000000000) + var connector = Object.assign({ room: 'debugging_' + t.name, testContext: t, chance }, opts.connector) + for (let i = 0; i < opts.users; i++) { + let y = await Y({ + connector: connector, + db: opts.db, + share: share + }) + result.users.push(y) + for (let name in share) { + result[name + i] = y.share[name] + } + } + result.array0.delete(0, result.array0.length) + if (result.users[0].connector.testRoom != null) { + // flush for sync if test-connector + await result.users[0].connector.testRoom.flushAll(result.users) + } + await Promise.all(result.users.map(u => { + return new Promise(function (resolve) { + u.connector.whenSynced(resolve) + }) + })) + await flushAll(t, result.users) + return result +} + +export async function flushAll (t, users) { + users = users.filter(u => u.connector.isSynced) + if (users.length === 0) { + return + } + await wait(0) + if (users[0].connector.testRoom != null) { + // use flushAll method specified in Test Connector + await users[0].connector.testRoom.flushAll(users) + } else { + // flush for any connector + await Promise.all(users.map(u => { return u.db.whenTransactionsFinished() })) + + var flushCounter = users[0].share.flushHelper.get('0') || 0 + flushCounter++ + await Promise.all(users.map(async (u, i) => { + // wait for all users to set the flush counter to the same value + await new Promise(resolve => { + function observer () { + var allUsersReceivedUpdate = true + for (var i = 0; i < users.length; i++) { + if (u.share.flushHelper.get(i + '') !== flushCounter) { + allUsersReceivedUpdate = false + break + } + } + if (allUsersReceivedUpdate) { + resolve() + } + } + u.share.flushHelper.observe(observer) + u.share.flushHelper.set(i + '', flushCounter) + }) + })) + } +} + +export async function flushSome (t, users) { + if (users[0].connector.testRoom == null) { + // if not test-connector, wait for some time for operations to arrive + await wait(100) + } +} + +export function wait (t) { + return new Promise(function (resolve) { + setTimeout(resolve, t != null ? t : 100) + }) +} diff --git a/tests-lib/test-connector.js b/tests-lib/test-connector.js new file mode 100644 index 00000000..80289414 --- /dev/null +++ b/tests-lib/test-connector.js @@ -0,0 +1,151 @@ +/* global Y */ +import { wait } from './helper.js' + +var rooms = {} + +export class TestRoom { + constructor (roomname) { + this.room = roomname + this.users = {} + this.nextUserId = 0 + } + join (connector) { + if (connector.userId == null) { + connector.setUserId('' + (this.nextUserId++)) + } + Object.keys(this.users).forEach(uid => { + this.users[uid].userJoined(connector.userId, 'master') + connector.userJoined(uid, 'master') + }) + this.users[connector.userId] = connector + } + leave (connector) { + delete this.users[connector.userId] + Object.keys(this.users).forEach(uid => { + this.users[uid].userLeft(connector.userId) + }) + } + send (sender, receiver, m) { + m = JSON.parse(JSON.stringify(m)) + var user = this.users[receiver] + if (user != null) { + user.receiveMessage(sender, m) + } + } + broadcast (sender, m) { + Object.keys(this.users).forEach(receiver => { + this.send(sender, receiver, m) + }) + } + async flushAll (users) { + let flushing = true + let allUserIds = Object.keys(this.users) + if (users == null) { + users = allUserIds.map(id => this.users[id].y) + } + while (flushing) { + let res = await Promise.all(allUserIds.map(id => this.users[id]._flushAll(users))) + flushing = res.some(status => status === 'flushing') + } + } +} + +function getTestRoom (roomname) { + if (rooms[roomname] == null) { + rooms[roomname] = new TestRoom(roomname) + } + return rooms[roomname] +} + +export default function extendTestConnector (Y) { + class TestConnector extends Y.AbstractConnector { + constructor (y, options) { + if (options === undefined) { + throw new Error('Options must not be undefined!') + } + if (options.room == null) { + throw new Error('You must define a room name!') + } + options.role = 'slave' + super(y, options) + this.options = options + this.room = options.room + this.chance = options.chance + this.testRoom = getTestRoom(this.room) + this.testRoom.join(this) + } + disconnect () { + this.testRoom.leave(this) + return super.disconnect() + } + reconnect () { + this.testRoom.join(this) + return super.reconnect() + } + send (uid, message) { + this.testRoom.send(this.userId, uid, message) + } + broadcast (message) { + this.testRoom.broadcast(this.userId, message) + } + async whenSynced (f) { + var synced = false + var periodicFlushTillSync = () => { + if (synced) { + f() + } else { + this.testRoom.flushAll([this.y]).then(function () { + setTimeout(periodicFlushTillSync, 10) + }) + } + } + periodicFlushTillSync() + return super.whenSynced(function () { + synced = true + }) + } + receiveMessage (sender, m) { + if (this.userId !== sender && this.connections[sender] != null) { + var buffer = this.connections[sender].buffer + if (buffer == null) { + buffer = this.connections[sender].buffer = [] + } + buffer.push(m) + if (this.chance.bool({likelihood: 30})) { + // flush 1/2 with 30% chance + var flushLength = Math.round(buffer.length / 2) + buffer.splice(0, flushLength).forEach(m => { + super.receiveMessage(sender, m) + }) + } + } + } + async _flushAll (flushUsers) { + if (flushUsers.some(u => u.connector.userId === this.userId)) { + // this one needs to sync with every other user + flushUsers = Object.keys(this.connections).map(id => this.testRoom.users[id].y) + } + var finished = [] + for (let i = 0; i < flushUsers.length; i++) { + let userId = flushUsers[i].connector.userId + if (userId === this.userId) continue + let buffer = this.connections[userId].buffer + if (buffer != null) { + var messages = buffer.splice(0) + for (let j = 0; j < messages.length; j++) { + let p = super.receiveMessage(userId, messages[j]) + finished.push(p) + } + } + } + await Promise.all(finished) + await this.y.db.whenTransactionsFinished() + return finished.length > 0 ? 'flushing' : 'done' + } + } + Y.extend('test', TestConnector) +} + +if (typeof Y !== 'undefined') { + extendTestConnector(Y) +}