refactoring: removed default connector and persistence, new code style, proper jsdocs, enabled typechecking
This commit is contained in:
@@ -1,31 +1,224 @@
|
||||
|
||||
import _Y from '../src/Y.dist.js'
|
||||
import { DomBinding } from '../src/Y.js'
|
||||
import TestConnector from './test-connector.js'
|
||||
|
||||
import Chance from 'chance'
|
||||
import * as Y from '../src/index.js'
|
||||
import ItemJSON from '../src/Struct/ItemJSON.js'
|
||||
import ItemString from '../src/Struct/ItemString.js'
|
||||
import { defragmentItemContent } from '../src/Util/defragmentItemContent.js'
|
||||
import Quill from 'quill'
|
||||
import GC from '../src/Struct/GC.js'
|
||||
import * as random from '../lib/random/random.js'
|
||||
import * as message from '../src/message.js'
|
||||
import * as encoding from '../lib/encoding.js'
|
||||
import * as decoding from '../lib/decoding.js'
|
||||
import { createMutex } from '../lib/mutex.js'
|
||||
|
||||
export const Y = _Y
|
||||
export * from '../src/index.js'
|
||||
|
||||
export const database = { name: 'memory' }
|
||||
export const connector = { name: 'test', url: 'http://localhost:1234' }
|
||||
|
||||
Y.test = TestConnector
|
||||
|
||||
function getStateSet (y) {
|
||||
let ss = {}
|
||||
for (let [user, clock] of y.ss.state) {
|
||||
ss[user] = clock
|
||||
}
|
||||
return ss
|
||||
/**
|
||||
* @param {TestYInstance} y
|
||||
* @param {Y.Transaction} transaction
|
||||
*/
|
||||
const afterTransaction = (y, transaction) => {
|
||||
y.mMux(() => {
|
||||
if (transaction.encodedStructsLen > 0) {
|
||||
const encoder = encoding.createEncoder()
|
||||
message.writeUpdate(encoder, transaction.encodedStructsLen, transaction.encodedStructs)
|
||||
broadcastMessage(y, encoding.toBuffer(encoder))
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
function getDeleteSet (y) {
|
||||
export class TestYInstance extends Y.Y {
|
||||
/**
|
||||
* @param {TestConnector} testConnector
|
||||
*/
|
||||
constructor (testConnector) {
|
||||
super()
|
||||
/**
|
||||
* @type {TestConnector}
|
||||
*/
|
||||
this.tc = testConnector
|
||||
/**
|
||||
* @type {Map<TestYInstance, Array<ArrayBuffer>>}
|
||||
*/
|
||||
this.receiving = new Map()
|
||||
/**
|
||||
* Message mutex
|
||||
* @type {Function}
|
||||
*/
|
||||
this.mMux = createMutex()
|
||||
testConnector.allConns.add(this)
|
||||
// set up observe on local model
|
||||
this.on('afterTransaction', afterTransaction)
|
||||
this.connect()
|
||||
}
|
||||
/**
|
||||
* Disconnect from TestConnector.
|
||||
*/
|
||||
disconnect () {
|
||||
this.receiving = new Map()
|
||||
this.tc.onlineConns.delete(this)
|
||||
}
|
||||
/**
|
||||
* Append yourself to the list of known Y instances in testconnector.
|
||||
* Also initiate sync with all clients.
|
||||
*/
|
||||
connect () {
|
||||
if (!this.tc.onlineConns.has(this)) {
|
||||
const encoder = encoding.createEncoder()
|
||||
message.writeSyncStep1(encoder, this)
|
||||
// publish SyncStep1
|
||||
broadcastMessage(this, encoding.toBuffer(encoder))
|
||||
this.tc.onlineConns.forEach(remoteYInstance => {
|
||||
// remote instance sends instance to this instance
|
||||
const encoder = encoding.createEncoder()
|
||||
message.writeSyncStep1(encoder, remoteYInstance)
|
||||
this._receive(encoding.toBuffer(encoder), remoteYInstance)
|
||||
})
|
||||
this.tc.onlineConns.add(this)
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Receive a message from another client. This message is only appended to the list of receiving messages.
|
||||
* TestConnector decides when this client actually reads this message.
|
||||
*
|
||||
* @param {ArrayBuffer} message
|
||||
* @param {TestYInstance} remoteClient
|
||||
*/
|
||||
_receive (message, remoteClient) {
|
||||
let messages = this.receiving.get(remoteClient)
|
||||
if (messages === undefined) {
|
||||
messages = []
|
||||
this.receiving.set(remoteClient, messages)
|
||||
}
|
||||
messages.push(message)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Keeps track of TestYInstances.
|
||||
*
|
||||
* The TestYInstances add/remove themselves from the list of connections maintained in this object.
|
||||
* I think it makes sense. Deal with it.
|
||||
*/
|
||||
export class TestConnector {
|
||||
constructor (prng) {
|
||||
/**
|
||||
* @type {Set<TestYInstance>}
|
||||
*/
|
||||
this.allConns = new Set()
|
||||
/**
|
||||
* @type {Set<TestYInstance>}
|
||||
*/
|
||||
this.onlineConns = new Set()
|
||||
/**
|
||||
* @type {random.PRNG}
|
||||
*/
|
||||
this.prng = prng
|
||||
}
|
||||
/**
|
||||
* Create a new Y instance and add it to the list of connections
|
||||
*/
|
||||
createY () {
|
||||
return new TestYInstance(this)
|
||||
}
|
||||
/**
|
||||
* Choose random connection and flush a random message from a random sender.
|
||||
*
|
||||
* If this function was unable to flush a message, because there are no more messages to flush, it returns false. true otherwise.
|
||||
* @return {boolean}
|
||||
*/
|
||||
flushRandomMessage () {
|
||||
const prng = this.prng
|
||||
const conns = Array.from(this.onlineConns).filter(conn => conn.receiving.size > 0)
|
||||
if (conns.length > 0) {
|
||||
const receiver = random.oneOf(prng, conns)
|
||||
const [sender, messages] = random.oneOf(prng, Array.from(receiver.receiving))
|
||||
const m = messages.shift()
|
||||
if (messages.length === 0) {
|
||||
receiver.receiving.delete(sender)
|
||||
}
|
||||
const encoder = encoding.createEncoder()
|
||||
receiver.mMux(() => {
|
||||
// do not publish data created when this function is executed (could be ss2 or update message)
|
||||
message.readMessage(decoding.createDecoder(m), encoder, receiver)
|
||||
})
|
||||
if (encoding.length(encoder) > 0) {
|
||||
// send reply message
|
||||
sender._receive(encoding.toBuffer(encoder), receiver)
|
||||
}
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
/**
|
||||
* @return {boolean} True iff this function actually flushed something
|
||||
*/
|
||||
flushAllMessages () {
|
||||
let didSomething = false
|
||||
while (this.flushRandomMessage()) {
|
||||
didSomething = true
|
||||
}
|
||||
return didSomething
|
||||
}
|
||||
reconnectAll () {
|
||||
this.allConns.forEach(conn => conn.connect())
|
||||
}
|
||||
disconnectAll () {
|
||||
this.allConns.forEach(conn => conn.disconnect())
|
||||
}
|
||||
syncAll () {
|
||||
this.reconnectAll()
|
||||
this.flushAllMessages()
|
||||
}
|
||||
/**
|
||||
* @return {boolean} Whether it was possible to disconnect a randon connection.
|
||||
*/
|
||||
disconnectRandom () {
|
||||
if (this.onlineConns.size === 0) {
|
||||
return false
|
||||
}
|
||||
random.oneOf(this.prng, Array.from(this.onlineConns)).disconnect()
|
||||
return true
|
||||
}
|
||||
/**
|
||||
* @return {boolean} Whether it was possible to reconnect a random connection.
|
||||
*/
|
||||
reconnectRandom () {
|
||||
const reconnectable = []
|
||||
this.allConns.forEach(conn => {
|
||||
if (!this.onlineConns.has(conn)) {
|
||||
reconnectable.push(conn)
|
||||
}
|
||||
})
|
||||
if (reconnectable.length === 0) {
|
||||
return false
|
||||
}
|
||||
random.oneOf(this.prng, reconnectable).connect()
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {TestYInstance} y // publish message created by `y` to all other online clients
|
||||
* @param {ArrayBuffer} m
|
||||
*/
|
||||
const broadcastMessage = (y, m) =>
|
||||
y.tc.onlineConns.forEach(remoteYInstance => {
|
||||
if (remoteYInstance !== y) {
|
||||
remoteYInstance._receive(m, y)
|
||||
}
|
||||
})
|
||||
|
||||
/**
|
||||
* Convert DS to a proper DeleteSet of Map.
|
||||
*
|
||||
* @param {Y.Y} y
|
||||
* @return {Object<number, Array<[number, number, boolean]>>}
|
||||
*/
|
||||
const getDeleteSet = y => {
|
||||
/**
|
||||
* @type {Object<number, Array<[number, number, boolean]>}
|
||||
*/
|
||||
var ds = {}
|
||||
y.ds.iterate(null, null, function (n) {
|
||||
var user = n._id.user
|
||||
@@ -42,29 +235,28 @@ function getDeleteSet (y) {
|
||||
return ds
|
||||
}
|
||||
|
||||
/*
|
||||
/**
|
||||
* 1. reconnect and flush all
|
||||
* 2. user 0 gc
|
||||
* 3. get type content
|
||||
* 4. disconnect & reconnect all (so gc is propagated)
|
||||
* 5. compare os, ds, ss
|
||||
*
|
||||
* @param {any} t
|
||||
* @param {Array<TestYInstance>} users
|
||||
*/
|
||||
export async function compareUsers (t, users) {
|
||||
await Promise.all(users.map(u => u.reconnect()))
|
||||
if (users[0].connector.testRoom == null) {
|
||||
await wait(100)
|
||||
}
|
||||
await flushAll(t, users)
|
||||
await wait()
|
||||
await flushAll(t, users)
|
||||
await wait()
|
||||
await flushAll(t, users)
|
||||
await wait()
|
||||
await flushAll(t, users)
|
||||
export function compareUsers (t, users) {
|
||||
users.forEach(u => u.connect())
|
||||
do {
|
||||
users.forEach(u => {
|
||||
// flush dom changes
|
||||
u.domBinding._beforeTransactionHandler(null, null, false)
|
||||
})
|
||||
} while (users[0].tc.flushAllMessages())
|
||||
|
||||
var userArrayValues = users.map(u => u.define('array', Y.Array).toJSON().map(val => JSON.stringify(val)))
|
||||
var userMapValues = users.map(u => u.define('map', Y.Map).toJSON())
|
||||
var userXmlValues = users.map(u => u.define('xml', Y.Xml).toString())
|
||||
var userXmlValues = users.map(u => u.define('xml', Y.XmlElement).toString())
|
||||
var userTextValues = users.map(u => u.define('text', Y.Text).toDelta())
|
||||
var userQuillValues = users.map(u => {
|
||||
u.quill.update('yjs') // get latest changes
|
||||
@@ -81,7 +273,8 @@ export async function compareUsers (t, users) {
|
||||
json = {
|
||||
type: 'GC',
|
||||
id: op._id,
|
||||
length: op._length
|
||||
length: op._length,
|
||||
content: null
|
||||
}
|
||||
} else {
|
||||
json = {
|
||||
@@ -90,7 +283,8 @@ export async function compareUsers (t, users) {
|
||||
right: op._right === null ? null : op._right._id,
|
||||
length: op._length,
|
||||
deleted: op._deleted,
|
||||
parent: op._parent._id
|
||||
parent: op._parent._id,
|
||||
content: null
|
||||
}
|
||||
}
|
||||
if (op instanceof ItemJSON || op instanceof ItemString) {
|
||||
@@ -100,11 +294,15 @@ export async function compareUsers (t, users) {
|
||||
})
|
||||
data.os = ops
|
||||
data.ds = getDeleteSet(u)
|
||||
data.ss = getStateSet(u)
|
||||
const ss = {}
|
||||
u.ss.state.forEach((user, clock) => {
|
||||
ss[user] = clock
|
||||
})
|
||||
data.ss = ss
|
||||
return data
|
||||
})
|
||||
for (var i = 0; i < data.length - 1; i++) {
|
||||
await t.asyncGroup(async () => {
|
||||
t.group(() => {
|
||||
t.compare(userArrayValues[i], userArrayValues[i + 1], 'array types')
|
||||
t.compare(userMapValues[i], userMapValues[i + 1], 'map types')
|
||||
t.compare(userXmlValues[i], userXmlValues[i + 1], 'xml types')
|
||||
@@ -115,10 +313,20 @@ export async function compareUsers (t, users) {
|
||||
t.compare(data[i].ss, data[i + 1].ss, 'ss')
|
||||
}, `Compare user${i} with user${i + 1}`)
|
||||
}
|
||||
users.forEach(user => {
|
||||
if (user._missingStructs.size !== 0) {
|
||||
t.fail('missing structs should mes empty!')
|
||||
}
|
||||
})
|
||||
users.map(u => u.destroy())
|
||||
}
|
||||
|
||||
function domFilter (nodeName, attrs) {
|
||||
/**
|
||||
* @param {string} nodeName
|
||||
* @param {Map<string, string>} attrs
|
||||
* @return {null|Map<string, string>}
|
||||
*/
|
||||
function filter (nodeName, attrs) {
|
||||
if (nodeName === 'HIDDEN') {
|
||||
return null
|
||||
}
|
||||
@@ -126,30 +334,27 @@ function domFilter (nodeName, attrs) {
|
||||
return attrs
|
||||
}
|
||||
|
||||
export async function initArrays (t, opts) {
|
||||
/**
|
||||
* @param {any} t
|
||||
* @param {any} opts
|
||||
* @return {any}
|
||||
*/
|
||||
export function initArrays (t, opts) {
|
||||
var result = {
|
||||
users: []
|
||||
}
|
||||
var chance = opts.chance || new Chance(t.getSeed() * 1000000000)
|
||||
var conn = Object.assign({ room: 'debugging_' + t.name, testContext: t, chance }, connector)
|
||||
var prng = opts.prng || random.createPRNG(t.getSeed())
|
||||
const testConnector = new TestConnector(prng)
|
||||
result.testConnector = testConnector
|
||||
for (let i = 0; i < opts.users; i++) {
|
||||
let connOpts
|
||||
if (i === 0) {
|
||||
connOpts = Object.assign({ role: 'master' }, conn)
|
||||
} else {
|
||||
connOpts = Object.assign({ role: 'slave' }, conn)
|
||||
}
|
||||
let y = new Y(connOpts.room, {
|
||||
userID: i, // evil hackery, don't try this at home
|
||||
connector: connOpts
|
||||
})
|
||||
let y = testConnector.createY()
|
||||
result.users.push(y)
|
||||
result['array' + i] = y.define('array', Y.Array)
|
||||
result['map' + i] = y.define('map', Y.Map)
|
||||
const yxml = y.define('xml', Y.XmlElement)
|
||||
result['xml' + i] = yxml
|
||||
const dom = document.createElement('my-dom')
|
||||
const domBinding = new DomBinding(yxml, dom, { domFilter })
|
||||
const domBinding = new Y.DomBinding(yxml, dom, { filter })
|
||||
result['domBinding' + i] = domBinding
|
||||
result['dom' + i] = dom
|
||||
const textType = y.define('text', Y.Text)
|
||||
@@ -159,116 +364,38 @@ export async function initArrays (t, opts) {
|
||||
result['quill' + i] = quill
|
||||
y.quill = quill // put quill on the y object (so we can use it later)
|
||||
y.dom = dom
|
||||
y.on('afterTransaction', function () {
|
||||
for (let missing of y._missingStructs.values()) {
|
||||
if (missing.size > 0) {
|
||||
console.error(new Error('Test check in "afterTransaction": missing should be empty!'))
|
||||
}
|
||||
}
|
||||
})
|
||||
y.domBinding = domBinding
|
||||
}
|
||||
result.array0.delete(0, result.array0.length)
|
||||
if (result.users[0].connector.testRoom != null) {
|
||||
// flush for sync if test-connector
|
||||
await result.users[0].connector.testRoom.flushAll(result.users)
|
||||
}
|
||||
await Promise.all(result.users.map(u => {
|
||||
return new Promise(function (resolve) {
|
||||
u.connector.whenSynced(resolve)
|
||||
})
|
||||
}))
|
||||
await flushAll(t, result.users)
|
||||
testConnector.syncAll()
|
||||
return result
|
||||
}
|
||||
|
||||
export async function flushAll (t, users) {
|
||||
// users = users.filter(u => u.connector.isSynced)
|
||||
if (users.length === 0) {
|
||||
return
|
||||
}
|
||||
await wait(10)
|
||||
if (users[0].connector.testRoom != null) {
|
||||
// use flushAll method specified in Test Connector
|
||||
await users[0].connector.testRoom.flushAll(users)
|
||||
} else {
|
||||
var flushCounter = users[0].get('flushHelper', Y.Map).get('0') || 0
|
||||
flushCounter++
|
||||
await Promise.all(users.map(async (u, i) => {
|
||||
// wait for all users to set the flush counter to the same value
|
||||
await new Promise(resolve => {
|
||||
function observer () {
|
||||
var allUsersReceivedUpdate = true
|
||||
for (var i = 0; i < users.length; i++) {
|
||||
if (u.get('flushHelper', Y.Map).get(i + '') !== flushCounter) {
|
||||
allUsersReceivedUpdate = false
|
||||
break
|
||||
}
|
||||
}
|
||||
if (allUsersReceivedUpdate) {
|
||||
resolve()
|
||||
}
|
||||
}
|
||||
u.get('flushHelper', Y.Map).observe(observer)
|
||||
u.get('flushHelper').set(i + '', flushCounter)
|
||||
})
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
export async function flushSome (t, users) {
|
||||
if (users[0].connector.testRoom == null) {
|
||||
// if not test-connector, wait for some time for operations to arrive
|
||||
await wait(100)
|
||||
}
|
||||
}
|
||||
|
||||
export function wait (t) {
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(resolve, t != null ? t : 100)
|
||||
})
|
||||
}
|
||||
|
||||
export async function applyRandomTests (t, mods, iterations) {
|
||||
const chance = new Chance(t.getSeed() * 1000000000)
|
||||
var initInformation = await initArrays(t, { users: 5, chance: chance })
|
||||
let { users } = initInformation
|
||||
export function applyRandomTests (t, mods, iterations) {
|
||||
const prng = random.createPRNG(t.getSeed())
|
||||
const result = initArrays(t, { users: 5, prng })
|
||||
const { testConnector, users } = result
|
||||
for (var i = 0; i < iterations; i++) {
|
||||
if (chance.bool({likelihood: 10})) {
|
||||
// 10% chance to disconnect/reconnect a user
|
||||
// we make sure that the first users always is connected
|
||||
let user = chance.pickone(users.slice(1))
|
||||
if (user.connector.isSynced) {
|
||||
if (users.filter(u => u.connector.isSynced).length > 1) {
|
||||
// make sure that at least one user remains in the room
|
||||
await user.disconnect()
|
||||
if (users[0].connector.testRoom == null) {
|
||||
await wait(100)
|
||||
}
|
||||
}
|
||||
if (random.int32(prng, 0, 100) <= 2) {
|
||||
// 2% chance to disconnect/reconnect a random user
|
||||
if (random.bool(prng)) {
|
||||
testConnector.disconnectRandom()
|
||||
} else {
|
||||
await user.reconnect()
|
||||
if (users[0].connector.testRoom == null) {
|
||||
await wait(100)
|
||||
}
|
||||
await new Promise(function (resolve) {
|
||||
user.connector.whenSynced(resolve)
|
||||
})
|
||||
testConnector.reconnectRandom()
|
||||
}
|
||||
} else if (chance.bool({likelihood: 5})) {
|
||||
// 20%*!prev chance to flush all & garbagecollect
|
||||
} else if (random.int32(prng, 0, 100) <= 1) {
|
||||
// 1% chance to flush all & garbagecollect
|
||||
// TODO: We do not gc all users as this does not work yet
|
||||
// await garbageCollectUsers(t, users)
|
||||
await flushAll(t, users)
|
||||
// await users[0].db.emptyGarbageCollector()
|
||||
await flushAll(t, users)
|
||||
} else if (chance.bool({likelihood: 10})) {
|
||||
// 20%*!prev chance to flush some operations
|
||||
await flushSome(t, users)
|
||||
testConnector.flushAllMessages()
|
||||
// await users[0].db.emptyGarbageCollector() // TODO: reintroduce GC tests!
|
||||
} else if (random.int32(prng, 0, 100) <= 50) {
|
||||
// 50% chance to flush a random message
|
||||
testConnector.flushRandomMessage()
|
||||
}
|
||||
let user = chance.pickone(users)
|
||||
var test = chance.pickone(mods)
|
||||
test(t, user, chance)
|
||||
let user = random.oneOf(prng, users)
|
||||
var test = random.oneOf(prng, mods)
|
||||
test(t, user, prng)
|
||||
}
|
||||
await compareUsers(t, users)
|
||||
return initInformation
|
||||
compareUsers(t, users)
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -1,162 +0,0 @@
|
||||
import { wait } from './helper.js'
|
||||
import { messageToString } from '../src/MessageHandler/messageToString.js'
|
||||
import AbstractConnector from '../src/Connector.js'
|
||||
|
||||
var rooms = {}
|
||||
|
||||
export class TestRoom {
|
||||
constructor (roomname) {
|
||||
this.room = roomname
|
||||
this.users = new Map()
|
||||
}
|
||||
join (connector) {
|
||||
const userID = connector.y.userID
|
||||
this.users.set(userID, connector)
|
||||
for (let [uid, user] of this.users) {
|
||||
if (uid !== userID && (user.role === 'master' || connector.role === 'master')) {
|
||||
// The order is important because there is no timeout in send/receiveMessage
|
||||
// (the user that receives a sync step must already now about the sender)
|
||||
if (user.role === 'master') {
|
||||
connector.userJoined(uid, user.role)
|
||||
user.userJoined(userID, connector.role)
|
||||
} else if (connector.role === 'master') {
|
||||
user.userJoined(userID, connector.role)
|
||||
connector.userJoined(uid, user.role)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
leave (connector) {
|
||||
this.users.delete(connector.y.userID)
|
||||
this.users.forEach(user => {
|
||||
user.userLeft(connector.y.userID)
|
||||
})
|
||||
}
|
||||
send (sender, receiver, m) {
|
||||
var user = this.users.get(receiver)
|
||||
if (user != null) {
|
||||
user.receiveMessage(sender, m)
|
||||
}
|
||||
}
|
||||
broadcast (sender, m) {
|
||||
this.users.forEach((user, receiver) => {
|
||||
this.send(sender, receiver, m)
|
||||
})
|
||||
}
|
||||
async flushAll (users) {
|
||||
let flushing = true
|
||||
let allUsers = Array.from(this.users.values())
|
||||
if (users == null) {
|
||||
users = allUsers.map(user => user.y)
|
||||
}
|
||||
while (flushing) {
|
||||
await wait(10)
|
||||
let res = await Promise.all(allUsers.map(user => user._flushAll(users)))
|
||||
flushing = res.some(status => status === 'flushing')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
function getTestRoom (roomname) {
|
||||
if (rooms[roomname] == null) {
|
||||
rooms[roomname] = new TestRoom(roomname)
|
||||
}
|
||||
return rooms[roomname]
|
||||
}
|
||||
|
||||
export default class TestConnector extends AbstractConnector {
|
||||
constructor (y, options) {
|
||||
if (options === undefined) {
|
||||
throw new Error('Options must not be undefined!')
|
||||
}
|
||||
if (options.room == null) {
|
||||
throw new Error('You must define a room name!')
|
||||
}
|
||||
options.forwardAppliedOperations = options.role === 'master'
|
||||
super(y, options)
|
||||
this.options = options
|
||||
this.room = options.room
|
||||
this.chance = options.chance
|
||||
this.testRoom = getTestRoom(this.room)
|
||||
this.testRoom.join(this)
|
||||
}
|
||||
disconnect () {
|
||||
this.testRoom.leave(this)
|
||||
return super.disconnect()
|
||||
}
|
||||
logBufferParsed () {
|
||||
console.log(' === Logging buffer of user ' + this.y.userID + ' === ')
|
||||
for (let [user, conn] of this.connections) {
|
||||
console.log(` ${user}:`)
|
||||
for (let i = 0; i < conn.buffer.length; i++) {
|
||||
console.log(messageToString(conn.buffer[i]))
|
||||
}
|
||||
}
|
||||
}
|
||||
reconnect () {
|
||||
this.testRoom.join(this)
|
||||
super.reconnect()
|
||||
return new Promise(resolve => {
|
||||
this.whenSynced(resolve)
|
||||
})
|
||||
}
|
||||
send (uid, message) {
|
||||
super.send(uid, message)
|
||||
this.testRoom.send(this.y.userID, uid, message)
|
||||
}
|
||||
broadcast (message) {
|
||||
super.broadcast(message)
|
||||
this.testRoom.broadcast(this.y.userID, message)
|
||||
}
|
||||
async whenSynced (f) {
|
||||
var synced = false
|
||||
var periodicFlushTillSync = () => {
|
||||
if (synced) {
|
||||
f()
|
||||
} else {
|
||||
this.testRoom.flushAll([this.y]).then(function () {
|
||||
setTimeout(periodicFlushTillSync, 10)
|
||||
})
|
||||
}
|
||||
}
|
||||
periodicFlushTillSync()
|
||||
return super.whenSynced(function () {
|
||||
synced = true
|
||||
})
|
||||
}
|
||||
receiveMessage (sender, m) {
|
||||
if (this.y.userID !== sender && this.connections.has(sender)) {
|
||||
var buffer = this.connections.get(sender).buffer
|
||||
if (buffer == null) {
|
||||
buffer = this.connections.get(sender).buffer = []
|
||||
}
|
||||
buffer.push(m)
|
||||
if (this.chance.bool({likelihood: 30})) {
|
||||
// flush 1/2 with 30% chance
|
||||
var flushLength = Math.round(buffer.length / 2)
|
||||
buffer.splice(0, flushLength).forEach(m => {
|
||||
super.receiveMessage(sender, m)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
async _flushAll (flushUsers) {
|
||||
if (flushUsers.some(u => u.connector.y.userID === this.y.userID)) {
|
||||
// this one needs to sync with every other user
|
||||
flushUsers = Array.from(this.connections.keys()).map(uid => this.testRoom.users.get(uid).y)
|
||||
}
|
||||
for (let i = 0; i < flushUsers.length; i++) {
|
||||
let userID = flushUsers[i].connector.y.userID
|
||||
if (userID !== this.y.userID && this.connections.has(userID)) {
|
||||
let buffer = this.connections.get(userID).buffer
|
||||
if (buffer != null) {
|
||||
var messages = buffer.splice(0)
|
||||
for (let j = 0; j < messages.length; j++) {
|
||||
super.receiveMessage(userID, messages[j])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return 'done'
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user