167 lines
4.8 KiB
JavaScript
167 lines
4.8 KiB
JavaScript
/* global Y */
|
|
import { wait } from './helper.js'
|
|
import { formatYjsMessage } from '../src/MessageHandler.js'
|
|
|
|
// Registry of test rooms, keyed by room name and shared by every connector
// created in this module. It has no prototype, so a room name such as
// 'constructor' or 'toString' cannot collide with an inherited property.
const rooms = Object.create(null)
|
|
|
|
/**
 * In-memory room used by the test connector: it keeps the connectors that
 * joined and hands messages directly from one connector to another.
 */
export class TestRoom {
  /**
   * @param {string} roomname Name of this room.
   */
  constructor (roomname) {
    this.room = roomname
    // userId -> connector
    this.users = new Map()
    // Id handed to the next connector that joins without one.
    this.nextUserId = 0
  }

  /**
   * Add a connector to the room. A connector without a user id gets the next
   * free one. The newcomer and an existing member are introduced to each
   * other (via `userJoined`) only when at least one of them has the role
   * 'master'.
   *
   * @param {Object} connector Connector to add.
   */
  join (connector) {
    if (connector.userId == null) {
      connector.setUserId(this.nextUserId++)
    }
    this.users.forEach((user, uid) => {
      if (user.role === 'master' || connector.role === 'master') {
        user.userJoined(connector.userId, connector.role)
        connector.userJoined(uid, user.role)
      }
    })
    this.users.set(connector.userId, connector)
  }

  /**
   * Remove a connector from the room and tell every remaining member
   * (via `userLeft`) that it is gone.
   *
   * @param {Object} connector Connector to remove.
   */
  leave (connector) {
    this.users.delete(connector.userId)
    this.users.forEach(user => {
      user.userLeft(connector.userId)
    })
  }

  /**
   * Deliver a message to a single member. Unknown receivers are ignored.
   *
   * @param {*} sender User id of the sending connector.
   * @param {*} receiver User id of the receiving connector.
   * @param {*} m The message.
   */
  send (sender, receiver, m) {
    const user = this.users.get(receiver)
    if (user != null) {
      user.receiveMessage(sender, m)
    }
  }

  /**
   * Deliver a message to every member of the room. The sender is not
   * filtered out here.
   *
   * @param {*} sender User id of the sending connector.
   * @param {*} m The message.
   */
  broadcast (sender, m) {
    this.users.forEach((user, receiver) => {
      this.send(sender, receiver, m)
    })
  }

  /**
   * Repeatedly ask every member to flush its buffered messages until a whole
   * round completes without any member reporting 'flushing'.
   *
   * @param {Array} [users] `y` instances whose messages should be flushed.
   *   Defaults to the `y` of every connector in the room at call time.
   * @returns {Promise<void>} Resolves once a round reports nothing to flush.
   */
  async flushAll (users) {
    if (users == null) {
      users = Array.from(this.users.values(), user => user.y)
    }
    let flushing = true
    while (flushing) {
      await wait(10)
      // Read the membership afresh on every round: a connector may leave
      // while we are waiting, and a stale id would resolve to `undefined`.
      const results = await Promise.all(
        Array.from(this.users.values(), user => user._flushAll(users))
      )
      flushing = results.some(status => status === 'flushing')
    }
  }
}
|
|
|
|
/**
 * Return the room registered under `roomname`, creating and registering it
 * on first use.
 *
 * @param {string} roomname Name of the room.
 * @returns {TestRoom} The one room shared by everyone using that name.
 */
function getTestRoom (roomname) {
  // Only own entries count: a name like 'constructor' would otherwise resolve
  // to a property inherited from Object.prototype and never get a room.
  const isRegistered = Object.prototype.hasOwnProperty.call(rooms, roomname)
  if (!isRegistered || rooms[roomname] == null) {
    rooms[roomname] = new TestRoom(roomname)
  }
  return rooms[roomname]
}
|
|
|
|
/**
 * Define `TestConnector` on top of `Y.AbstractConnector` and register it with
 * `Y.extend` under the name 'test'.
 *
 * The connector does not use a network. Messages travel through a shared
 * TestRoom and are buffered on the receiving side so that tests can control
 * when they are applied.
 *
 * @param {Object} Y Object providing `AbstractConnector` and `extend`.
 */
export default function extendTestConnector (Y) {
  class TestConnector extends Y.AbstractConnector {
    /**
     * @param {Object} y The instance this connector belongs to.
     * @param {Object} options Must contain `room`. `role` and `chance` are
     *   read as well. `forwardAppliedOperations` is overwritten on the passed
     *   object (true only for role 'master').
     * @throws {Error} If `options` or `options.room` is missing.
     */
    constructor (y, options) {
      // `== null` also rejects null, which would otherwise fail below with an
      // unrelated TypeError.
      if (options == null) {
        throw new Error('Options must not be undefined!')
      }
      if (options.room == null) {
        throw new Error('You must define a room name!')
      }
      options.forwardAppliedOperations = options.role === 'master'
      super(y, options)
      this.options = options
      this.room = options.room
      this.chance = options.chance
      this.testRoom = getTestRoom(this.room)
      this.testRoom.join(this)
    }

    /**
     * Leave the test room, then run the inherited disconnect.
     */
    disconnect () {
      this.testRoom.leave(this)
      return super.disconnect()
    }

    /**
     * Print every message still waiting in this user's receive buffers,
     * grouped by sending user.
     */
    logBufferParsed () {
      console.log(' === Logging buffer of user ' + this.userId + ' === ')
      for (const [user, conn] of this.connections) {
        console.log(` ${user}:`)
        // A connection has no buffer until its first message arrives.
        const buffer = conn.buffer == null ? [] : conn.buffer
        for (const message of buffer) {
          console.log(formatYjsMessage(message))
        }
      }
    }

    /**
     * Join the test room again, then run the inherited reconnect.
     */
    reconnect () {
      this.testRoom.join(this)
      return super.reconnect()
    }

    /**
     * Send a message to one user through the test room.
     *
     * @param {*} uid Id of the receiving user.
     * @param {*} message The message.
     */
    send (uid, message) {
      super.send(uid, message)
      this.testRoom.send(this.userId, uid, message)
    }

    /**
     * Send a message to everybody in the test room.
     *
     * @param {*} message The message.
     */
    broadcast (message) {
      super.broadcast(message)
      this.testRoom.broadcast(this.userId, message)
    }

    /**
     * Keep flushing this user's messages until the inherited `whenSynced`
     * fires its callback, then call `f`.
     *
     * @param {Function} [f] Called once the connector is synced.
     * @returns {Promise} Resolves with whatever the inherited `whenSynced`
     *   returns.
     */
    async whenSynced (f) {
      let synced = false
      const periodicFlushTillSync = () => {
        if (synced) {
          // Runs from a timer callback, where a TypeError from a missing
          // callback could not be caught by the caller.
          if (typeof f === 'function') {
            f()
          }
        } else {
          this.testRoom.flushAll([this.y]).then(function () {
            setTimeout(periodicFlushTillSync, 10)
          })
        }
      }
      periodicFlushTillSync()
      return super.whenSynced(function () {
        synced = true
      })
    }

    /**
     * Buffer an incoming message instead of applying it right away. With a
     * 30% chance, the older half of the sender's buffer is applied at once.
     * Messages from this user itself or from unconnected users are dropped.
     *
     * @param {*} sender Id of the sending user.
     * @param {*} m The message.
     */
    receiveMessage (sender, m) {
      if (this.userId !== sender && this.connections.has(sender)) {
        const connection = this.connections.get(sender)
        if (connection.buffer == null) {
          connection.buffer = []
        }
        const buffer = connection.buffer
        buffer.push(m)
        if (this.chance.bool({likelihood: 30})) {
          // flush 1/2 with 30% chance
          const flushLength = Math.round(buffer.length / 2)
          buffer.splice(0, flushLength).forEach(message => {
            super.receiveMessage(sender, message)
          })
        }
      }
    }

    /**
     * Apply all buffered messages that were sent by the given users. If this
     * user is among them, messages from every connected user are applied.
     *
     * @param {Array} flushUsers `y` instances; each is read through
     *   `connector.userId`.
     * @returns {Promise<string>} 'flushing' if at least one message was
     *   applied, otherwise 'done'.
     */
    async _flushAll (flushUsers) {
      // this one needs to sync with every other user
      const includesSelf = flushUsers.some(u => u.connector.userId === this.userId)
      // Use the connection ids directly rather than looking each peer up in
      // the room: a peer that already left would make that lookup fail.
      // NOTE(review): assumes a room member's `y.connector.userId` equals the
      // id it is connected under - confirm.
      const senderIds = includesSelf
        ? Array.from(this.connections.keys())
        : flushUsers.map(u => u.connector.userId)
      const finished = []
      for (const senderId of senderIds) {
        if (senderId === this.userId || !this.connections.has(senderId)) {
          continue
        }
        const buffer = this.connections.get(senderId).buffer
        if (buffer == null) {
          continue
        }
        // splice(0) empties the buffer and hands us its former content
        for (const message of buffer.splice(0)) {
          finished.push(super.receiveMessage(senderId, message))
        }
      }
      await Promise.all(finished)
      await this.y.db.whenTransactionsFinished()
      return finished.length > 0 ? 'flushing' : 'done'
    }
  }
  Y.extend('test', TestConnector)
}
|
|
|
|
// Register the test connector right away when a global `Y` is present.
// `typeof` keeps this from throwing when no such global is defined.
if (typeof Y !== 'undefined') extendTestConnector(Y)
|