175 lines
5.1 KiB
JavaScript
175 lines
5.1 KiB
JavaScript
/* global getRandom, async */
|
|
'use strict'
|
|
|
|
module.exports = function (Y) {
|
|
/**
 * In-memory message hub shared by every connector created by this module.
 *
 * - `users` maps a user id to the connector registered under that id.
 * - `buffers` maps a receiver id to an object that maps a sender id to the
 *   queue (array) of pending entries for that receiver. Each entry is read as
 *   `[arg0, arg1]` and handed to the receiver's `receiveMessage(arg0, arg1)`.
 */
var globalRoom = {
  users: {},
  buffers: {},

  /**
   * Unregister a user and drop its inbound buffers.
   * Every connector that is registered at call time (including the one that
   * is leaving) gets `userLeft(user)` before the entries are deleted.
   *
   * @param {string} user id of the leaving user
   */
  removeUser: function (user) {
    for (const id in this.users) {
      this.users[id].userLeft(user)
    }
    delete this.users[user]
    delete this.buffers[user]
  },

  /**
   * Register a connector under `connector.userId`, give it an empty inbound
   * buffer, and introduce it to every other registered connector (both
   * directions, always with role 'master').
   *
   * @param {object} connector connector exposing `userId` and `userJoined`
   */
  addUser: function (connector) {
    const joiningId = connector.userId
    this.users[joiningId] = connector
    this.buffers[joiningId] = {}
    for (const otherId in this.users) {
      if (otherId === joiningId) {
        continue
      }
      const other = this.users[otherId]
      other.userJoined(joiningId, 'master')
      connector.userJoined(other.userId, 'master')
    }
  },

  /**
   * Wait until every registered user's db reports that its transactions are
   * finished.
   *
   * @returns {Promise} settles like `Promise.all` over each user's
   *   `y.db.whenTransactionsFinished()`
   */
  whenTransactionsFinished: function () {
    return new Promise((resolve, reject) => {
      // The connector first has to send the messages to the db.
      // Wait for the checkAuth function to resolve.
      // The test lib only has a simple checkAuth function: `() => Promise.resolve()`
      // Queueing a task on the event loop is enough to wait for that event.
      // TODO: this may be buggy in test applications (but it should not be in real-life apps)
      setTimeout(() => {
        const pending = []
        for (const name in this.users) {
          pending.push(this.users[name].y.db.whenTransactionsFinished())
        }
        Promise.all(pending).then(resolve, reject)
      }, 0)
    })
  },

  /**
   * Deliver a single buffered entry: pick (via `getRandom`) a receiver that
   * has at least one non-empty queue, then pick one of its senders, and hand
   * the oldest entry of that queue to the receiver.
   *
   * @returns {Promise|false} `false` when no receiver has anything queued;
   *   otherwise a promise that waits for the receiver's `receiveMessage` and
   *   then for its `y.db.whenTransactionsFinished()`. A rejection coming from
   *   `receiveMessage` itself is absorbed by a no-op handler.
   */
  flushOne: function flushOne () {
    const receivers = []
    for (const receiver in globalRoom.buffers) {
      const inbound = globalRoom.buffers[receiver]
      for (const from in inbound) {
        if (inbound[from].length > 0) {
          receivers.push(receiver)
          break
        }
      }
    }
    if (receivers.length === 0) {
      return false
    }

    const userId = getRandom(receivers)
    const queues = globalRoom.buffers[userId]
    const sender = getRandom(Object.keys(queues))
    const m = queues[sender].shift()
    if (queues[sender].length === 0) {
      // keep the invariant that only senders with pending entries are listed
      delete queues[sender]
    }
    const user = globalRoom.users[userId]
    return user.receiveMessage(m[0], m[1]).then(function () {
      return user.y.db.whenTransactionsFinished()
    }, function () {})
  },

  /**
   * Deliver buffered entries until nothing is left. Delivering may create new
   * entries, so after each drain the room waits for all transactions and then
   * looks again.
   *
   * @returns {Promise} resolves once a pass finds nothing to deliver; rejects
   *   if waiting for transactions fails or a delivery step throws (previously
   *   such failures left this promise pending forever)
   */
  flushAll: function () {
    return new Promise(function (resolve, reject) {
      function waitThenFlush () {
        // `.catch` also covers a synchronous throw inside nextFlush
        globalRoom.whenTransactionsFinished().then(nextFlush).catch(reject)
      }

      function nextFlush () {
        let delivery = globalRoom.flushOne()
        if (!delivery) {
          // flushOne has no side effects when it returns false, so a second
          // probe here could never find anything: we are done.
          resolve()
          return
        }
        // Start every delivery that is currently possible without waiting in
        // between; failures are forwarded instead of being left unhandled.
        while (delivery) {
          delivery.then(undefined, reject)
          delivery = globalRoom.flushOne()
        }
        waitThenFlush()
      }

      waitThenFlush()
    })
  }
}
|
|
// Publish the shared room on Y.utils (Test#reconnect reads it back from there).
Y.utils.globalRoom = globalRoom
|
|
|
|
// Source of user ids: each new Test connector takes the current value
// (converted to a string) as its id and increments the counter.
var userIdCounter = 0
|
|
|
|
/**
 * Connector for tests: instead of using a network, messages are queued in the
 * shared `globalRoom` buffers and delivered when the room (or this connector)
 * is flushed.
 */
class Test extends Y.AbstractConnector {
  /**
   * @param {object} y instance handed through to Y.AbstractConnector
   * @param {object} options connector options (required). `role` is forced to
   *   'master' and `forwardToSyncingClients` to false on the passed object.
   * @throws {Error} when `options` is undefined
   */
  constructor (y, options) {
    if (options === undefined) {
      throw new Error('Options must not be undefined!')
    }
    options.role = 'master'
    options.forwardToSyncingClients = false
    super(y, options)
    // The connector joins the room only after its user id has been set.
    this.setUserId((userIdCounter++) + '').then(() => {
      globalRoom.addUser(this)
    })
    this.globalRoom = globalRoom
    this.syncingClientDuration = 0
  }

  /**
   * Hand a message to the base connector. The message goes through a JSON
   * round trip first, so the base class receives a copy, not the caller's
   * object.
   *
   * @param {string} sender id of the sending user
   * @param {*} m JSON-serializable message
   * @returns whatever the base class `receiveMessage` returns
   */
  receiveMessage (sender, m) {
    return super.receiveMessage(sender, JSON.parse(JSON.stringify(m)))
  }

  /**
   * Queue a message for one user. Nothing happens if that user has no buffer
   * in the room (i.e. is not registered).
   *
   * @param {string} userId receiver id
   * @param {*} message JSON-serializable message
   */
  send (userId, message) {
    const inbound = globalRoom.buffers[userId]
    if (inbound == null) {
      return
    }
    if (inbound[this.userId] == null) {
      inbound[this.userId] = []
    }
    // queue a copy so later changes by the sender do not leak to the receiver
    inbound[this.userId].push(JSON.parse(JSON.stringify([this.userId, message])))
  }

  /**
   * Queue a message for every user that has a buffer in the room (this
   * includes the sender itself while it is registered).
   *
   * @param {*} message JSON-serializable message
   */
  broadcast (message) {
    for (const receiverId in globalRoom.buffers) {
      const inbound = globalRoom.buffers[receiverId]
      if (inbound[this.userId] == null) {
        inbound[this.userId] = []
      }
      inbound[this.userId].push(JSON.parse(JSON.stringify([this.userId, message])))
    }
  }

  /**
   * @returns {boolean} true when this connector is not registered in the room
   */
  isDisconnected () {
    return globalRoom.users[this.userId] == null
  }

  /**
   * Re-register in the room (only if currently disconnected), then flush the
   * whole room.
   *
   * @returns {Promise} the promise of `globalRoom.flushAll()`
   */
  reconnect () {
    if (this.isDisconnected()) {
      globalRoom.addUser(this)
      super.reconnect()
    }
    return Y.utils.globalRoom.flushAll()
  }

  /**
   * Leave the room (only if currently registered).
   *
   * @returns {Promise} the promise of this instance's
   *   `y.db.whenTransactionsFinished()`
   */
  disconnect () {
    if (!this.isDisconnected()) {
      globalRoom.removeUser(this.userId)
      super.disconnect()
    }
    return this.y.db.whenTransactionsFinished()
  }

  /**
   * Deliver everything queued for this connector, picking the next sender
   * with `getRandom` each time.
   *
   * @returns the result of wrapping the generator with the global `async`
   *   helper (defined outside this file)
   */
  flush () {
    var self = this
    return async(function * () {
      var buff = globalRoom.buffers[self.userId]
      while (Object.keys(buff).length > 0) {
        var sender = getRandom(Object.keys(buff))
        var m = buff[sender].shift()
        if (buff[sender].length === 0) {
          delete buff[sender]
        }
        // Use `self`: `this` inside this generator function is not the
        // connector, so `this.receiveMessage` would not reach it.
        yield self.receiveMessage(m[0], m[1])
      }
      // NOTE(review): relies on the connector exposing whenTransactionsFinished();
      // the other methods here use `y.db.whenTransactionsFinished()` - confirm.
      yield self.whenTransactionsFinished()
    })
  }
}
|
|
|
|
// Register the connector class on Y as Y.Test.
Y.Test = Test
|
|
}
|