outsourced helper and test-connector
This commit is contained in:
parent
020dacdad4
commit
2ea163a5cf
140
tests-lib/helper.js
Normal file
140
tests-lib/helper.js
Normal file
@ -0,0 +1,140 @@
|
||||
|
||||
import _Y from '../../yjs/src/y.js'
|
||||
|
||||
import yMemory from '../../y-memory/src/Memory.js'
|
||||
import yArray from '../../y-array/src/y-array.js'
|
||||
import yMap from '../../y-map/src/Map.js'
|
||||
import yTest from './test-connector.js'
|
||||
|
||||
import Chance from 'chance'
|
||||
|
||||
export let Y = _Y
|
||||
|
||||
Y.extend(yMemory, yArray, yMap, yTest)
|
||||
|
||||
export async function garbageCollectAllUsers (t, users) {
|
||||
await flushAll(t, users)
|
||||
await Promise.all(users.map(u => u.db.emptyGarbageCollector()))
|
||||
}
|
||||
|
||||
export async function compareUsers (t, users) {
|
||||
var unsynced = users.filter(u => !u.connector.isSynced)
|
||||
unsynced.forEach(u => u.reconnect())
|
||||
if (users[0].connector.testRoom != null) {
|
||||
// flush for sync if test-connector
|
||||
await users[0].connector.testRoom.flushAll(users)
|
||||
}
|
||||
await Promise.all(unsynced.map(u => {
|
||||
return new Promise(function (resolve) {
|
||||
u.connector.whenSynced(resolve)
|
||||
})
|
||||
}))
|
||||
await flushAll(t, users)
|
||||
// types must be equal before garbage collect
|
||||
var userTypeContents = users.map(u => u.share.array._content.map(c => c.val || JSON.stringify(c.type)))
|
||||
var data = await Promise.all(users.map(async (u) => {
|
||||
var data = {}
|
||||
await u.db.garbageCollect()
|
||||
await u.db.garbageCollect()
|
||||
u.db.requestTransaction(function * () {
|
||||
data.os = yield * this.getOperationsUntransformed()
|
||||
data.os = data.os.untransformed.map((op) => {
|
||||
return Y.Struct[op.struct].encode(op)
|
||||
})
|
||||
data.ds = yield * this.getDeleteSet()
|
||||
data.ss = yield * this.getStateSet()
|
||||
})
|
||||
await u.db.whenTransactionsFinished()
|
||||
return data
|
||||
}))
|
||||
for (var i = 0; i < data.length - 1; i++) {
|
||||
await t.asyncGroup(async () => {
|
||||
t.compare(userTypeContents[i], userTypeContents[i + 1], 'types')
|
||||
t.compare(data[i].os, data[i + 1].os, 'os')
|
||||
t.compare(data[i].ds, data[i + 1].ds, 'ds')
|
||||
t.compare(data[i].ss, data[i + 1].ss, 'ss')
|
||||
}, `Compare user${i} with user${i + 1}`)
|
||||
}
|
||||
}
|
||||
|
||||
export async function initArrays (t, opts) {
|
||||
var result = {
|
||||
users: []
|
||||
}
|
||||
var share = Object.assign({ flushHelper: 'Map', array: 'Array' }, opts.share)
|
||||
var chance = opts.chance || new Chance(t.getSeed() * 1000000000)
|
||||
var connector = Object.assign({ room: 'debugging_' + t.name, testContext: t, chance }, opts.connector)
|
||||
for (let i = 0; i < opts.users; i++) {
|
||||
let y = await Y({
|
||||
connector: connector,
|
||||
db: opts.db,
|
||||
share: share
|
||||
})
|
||||
result.users.push(y)
|
||||
for (let name in share) {
|
||||
result[name + i] = y.share[name]
|
||||
}
|
||||
}
|
||||
result.array0.delete(0, result.array0.length)
|
||||
if (result.users[0].connector.testRoom != null) {
|
||||
// flush for sync if test-connector
|
||||
await result.users[0].connector.testRoom.flushAll(result.users)
|
||||
}
|
||||
await Promise.all(result.users.map(u => {
|
||||
return new Promise(function (resolve) {
|
||||
u.connector.whenSynced(resolve)
|
||||
})
|
||||
}))
|
||||
await flushAll(t, result.users)
|
||||
return result
|
||||
}
|
||||
|
||||
export async function flushAll (t, users) {
|
||||
users = users.filter(u => u.connector.isSynced)
|
||||
if (users.length === 0) {
|
||||
return
|
||||
}
|
||||
await wait(0)
|
||||
if (users[0].connector.testRoom != null) {
|
||||
// use flushAll method specified in Test Connector
|
||||
await users[0].connector.testRoom.flushAll(users)
|
||||
} else {
|
||||
// flush for any connector
|
||||
await Promise.all(users.map(u => { return u.db.whenTransactionsFinished() }))
|
||||
|
||||
var flushCounter = users[0].share.flushHelper.get('0') || 0
|
||||
flushCounter++
|
||||
await Promise.all(users.map(async (u, i) => {
|
||||
// wait for all users to set the flush counter to the same value
|
||||
await new Promise(resolve => {
|
||||
function observer () {
|
||||
var allUsersReceivedUpdate = true
|
||||
for (var i = 0; i < users.length; i++) {
|
||||
if (u.share.flushHelper.get(i + '') !== flushCounter) {
|
||||
allUsersReceivedUpdate = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if (allUsersReceivedUpdate) {
|
||||
resolve()
|
||||
}
|
||||
}
|
||||
u.share.flushHelper.observe(observer)
|
||||
u.share.flushHelper.set(i + '', flushCounter)
|
||||
})
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
export async function flushSome (t, users) {
|
||||
if (users[0].connector.testRoom == null) {
|
||||
// if not test-connector, wait for some time for operations to arrive
|
||||
await wait(100)
|
||||
}
|
||||
}
|
||||
|
||||
export function wait (t) {
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(resolve, t != null ? t : 100)
|
||||
})
|
||||
}
|
151
tests-lib/test-connector.js
Normal file
151
tests-lib/test-connector.js
Normal file
@ -0,0 +1,151 @@
|
||||
/* global Y */
|
||||
import { wait } from './helper.js'
|
||||
|
||||
var rooms = {}
|
||||
|
||||
export class TestRoom {
|
||||
constructor (roomname) {
|
||||
this.room = roomname
|
||||
this.users = {}
|
||||
this.nextUserId = 0
|
||||
}
|
||||
join (connector) {
|
||||
if (connector.userId == null) {
|
||||
connector.setUserId('' + (this.nextUserId++))
|
||||
}
|
||||
Object.keys(this.users).forEach(uid => {
|
||||
this.users[uid].userJoined(connector.userId, 'master')
|
||||
connector.userJoined(uid, 'master')
|
||||
})
|
||||
this.users[connector.userId] = connector
|
||||
}
|
||||
leave (connector) {
|
||||
delete this.users[connector.userId]
|
||||
Object.keys(this.users).forEach(uid => {
|
||||
this.users[uid].userLeft(connector.userId)
|
||||
})
|
||||
}
|
||||
send (sender, receiver, m) {
|
||||
m = JSON.parse(JSON.stringify(m))
|
||||
var user = this.users[receiver]
|
||||
if (user != null) {
|
||||
user.receiveMessage(sender, m)
|
||||
}
|
||||
}
|
||||
broadcast (sender, m) {
|
||||
Object.keys(this.users).forEach(receiver => {
|
||||
this.send(sender, receiver, m)
|
||||
})
|
||||
}
|
||||
async flushAll (users) {
|
||||
let flushing = true
|
||||
let allUserIds = Object.keys(this.users)
|
||||
if (users == null) {
|
||||
users = allUserIds.map(id => this.users[id].y)
|
||||
}
|
||||
while (flushing) {
|
||||
let res = await Promise.all(allUserIds.map(id => this.users[id]._flushAll(users)))
|
||||
flushing = res.some(status => status === 'flushing')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function getTestRoom (roomname) {
|
||||
if (rooms[roomname] == null) {
|
||||
rooms[roomname] = new TestRoom(roomname)
|
||||
}
|
||||
return rooms[roomname]
|
||||
}
|
||||
|
||||
export default function extendTestConnector (Y) {
|
||||
class TestConnector extends Y.AbstractConnector {
|
||||
constructor (y, options) {
|
||||
if (options === undefined) {
|
||||
throw new Error('Options must not be undefined!')
|
||||
}
|
||||
if (options.room == null) {
|
||||
throw new Error('You must define a room name!')
|
||||
}
|
||||
options.role = 'slave'
|
||||
super(y, options)
|
||||
this.options = options
|
||||
this.room = options.room
|
||||
this.chance = options.chance
|
||||
this.testRoom = getTestRoom(this.room)
|
||||
this.testRoom.join(this)
|
||||
}
|
||||
disconnect () {
|
||||
this.testRoom.leave(this)
|
||||
return super.disconnect()
|
||||
}
|
||||
reconnect () {
|
||||
this.testRoom.join(this)
|
||||
return super.reconnect()
|
||||
}
|
||||
send (uid, message) {
|
||||
this.testRoom.send(this.userId, uid, message)
|
||||
}
|
||||
broadcast (message) {
|
||||
this.testRoom.broadcast(this.userId, message)
|
||||
}
|
||||
async whenSynced (f) {
|
||||
var synced = false
|
||||
var periodicFlushTillSync = () => {
|
||||
if (synced) {
|
||||
f()
|
||||
} else {
|
||||
this.testRoom.flushAll([this.y]).then(function () {
|
||||
setTimeout(periodicFlushTillSync, 10)
|
||||
})
|
||||
}
|
||||
}
|
||||
periodicFlushTillSync()
|
||||
return super.whenSynced(function () {
|
||||
synced = true
|
||||
})
|
||||
}
|
||||
receiveMessage (sender, m) {
|
||||
if (this.userId !== sender && this.connections[sender] != null) {
|
||||
var buffer = this.connections[sender].buffer
|
||||
if (buffer == null) {
|
||||
buffer = this.connections[sender].buffer = []
|
||||
}
|
||||
buffer.push(m)
|
||||
if (this.chance.bool({likelihood: 30})) {
|
||||
// flush 1/2 with 30% chance
|
||||
var flushLength = Math.round(buffer.length / 2)
|
||||
buffer.splice(0, flushLength).forEach(m => {
|
||||
super.receiveMessage(sender, m)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
async _flushAll (flushUsers) {
|
||||
if (flushUsers.some(u => u.connector.userId === this.userId)) {
|
||||
// this one needs to sync with every other user
|
||||
flushUsers = Object.keys(this.connections).map(id => this.testRoom.users[id].y)
|
||||
}
|
||||
var finished = []
|
||||
for (let i = 0; i < flushUsers.length; i++) {
|
||||
let userId = flushUsers[i].connector.userId
|
||||
if (userId === this.userId) continue
|
||||
let buffer = this.connections[userId].buffer
|
||||
if (buffer != null) {
|
||||
var messages = buffer.splice(0)
|
||||
for (let j = 0; j < messages.length; j++) {
|
||||
let p = super.receiveMessage(userId, messages[j])
|
||||
finished.push(p)
|
||||
}
|
||||
}
|
||||
}
|
||||
await Promise.all(finished)
|
||||
await this.y.db.whenTransactionsFinished()
|
||||
return finished.length > 0 ? 'flushing' : 'done'
|
||||
}
|
||||
}
|
||||
Y.extend('test', TestConnector)
|
||||
}
|
||||
|
||||
if (typeof Y !== 'undefined') {
|
||||
extendTestConnector(Y)
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user