fixed test connector buffer to really be parallel!

This commit is contained in:
Kevin Jahns 2016-04-11 16:20:27 +02:00
parent f3fadd3895
commit 20321c8a7d
3 changed files with 37 additions and 16 deletions

View File

@ -14,7 +14,7 @@ module.exports = function (Y) {
}, },
addUser: function (connector) { addUser: function (connector) {
this.users[connector.userId] = connector this.users[connector.userId] = connector
this.buffers[connector.userId] = [] this.buffers[connector.userId] = {}
for (var uname in this.users) { for (var uname in this.users) {
if (uname !== connector.userId) { if (uname !== connector.userId) {
var u = this.users[uname] var u = this.users[uname]
@ -32,14 +32,27 @@ module.exports = function (Y) {
}, },
flushOne: function flushOne () { flushOne: function flushOne () {
var bufs = [] var bufs = []
for (var i in globalRoom.buffers) { for (var receiver in globalRoom.buffers) {
if (globalRoom.buffers[i].length > 0) { let buff = globalRoom.buffers[receiver]
bufs.push(i) var push = false
for (let sender in buff) {
if (buff[sender].length > 0) {
push = true
break
}
}
if (push) {
bufs.push(receiver)
} }
} }
if (bufs.length > 0) { if (bufs.length > 0) {
var userId = getRandom(bufs) var userId = getRandom(bufs)
var m = globalRoom.buffers[userId].shift() let buff = globalRoom.buffers[userId]
let sender = getRandom(Object.keys(buff))
var m = buff[sender].shift()
if (buff[sender].length === 0) {
delete buff[sender]
}
var user = globalRoom.users[userId] var user = globalRoom.users[userId]
user.receiveMessage(m[0], m[1]) user.receiveMessage(m[0], m[1])
return user.y.db.whenTransactionsFinished() return user.y.db.whenTransactionsFinished()
@ -99,12 +112,19 @@ module.exports = function (Y) {
send (userId, message) { send (userId, message) {
var buffer = globalRoom.buffers[userId] var buffer = globalRoom.buffers[userId]
if (buffer != null) { if (buffer != null) {
buffer.push(JSON.parse(JSON.stringify([this.userId, message]))) if (buffer[this.userId] == null) {
buffer[this.userId] = []
}
buffer[this.userId].push(JSON.parse(JSON.stringify([this.userId, message])))
} }
} }
broadcast (message) { broadcast (message) {
for (var key in globalRoom.buffers) { for (var key in globalRoom.buffers) {
globalRoom.buffers[key].push(JSON.parse(JSON.stringify([this.userId, message]))) var buff = globalRoom.buffers[key]
if (buff[this.userId] == null) {
buff[this.userId] = []
}
buff[this.userId].push(JSON.parse(JSON.stringify([this.userId, message])))
} }
} }
isDisconnected () { isDisconnected () {
@ -127,8 +147,13 @@ module.exports = function (Y) {
flush () { flush () {
var self = this var self = this
return async(function * () { return async(function * () {
while (globalRoom.buffers[self.userId].length > 0) { var buff = globalRoom.buffers[self.userId]
var m = globalRoom.buffers[self.userId].shift() while (Object.keys(buff).length > 0) {
var sender = getRandom(Object.keys(buff))
var m = buff[sender].shift()
if (buff[sender].length === 0) {
delete buff[sender]
}
this.receiveMessage(m[0], m[1]) this.receiveMessage(m[0], m[1])
} }
yield self.whenTransactionsFinished() yield self.whenTransactionsFinished()

View File

@ -60,11 +60,7 @@ function getRandom (o) {
if (o instanceof Array) { if (o instanceof Array) {
return o[Math.floor(Math.random() * o.length)] return o[Math.floor(Math.random() * o.length)]
} else if (o.constructor === Object) { } else if (o.constructor === Object) {
var ks = [] return o[getRandom(Object.keys(o))]
for (var key in o) {
ks.push(key)
}
return o[getRandom(ks)]
} }
} }
g.getRandom = getRandom g.getRandom = getRandom
@ -79,7 +75,7 @@ g.getRandomNumber = getRandomNumber
function getRandomString () { function getRandomString () {
var tokens = 'abcdefäö' // ü\n\n\n\n\n\n\n' var tokens = 'abcdefäö' // ü\n\n\n\n\n\n\n'
return tokens[getRandomNumber(tokens.length - 1)] return tokens[getRandomNumber(tokens.length)]
} }
g.getRandomString = getRandomString g.getRandomString = getRandomString

View File

@ -409,7 +409,7 @@ module.exports = function (Y/* :any */) {
if (parentDeleted) { if (parentDeleted) {
op.gc = true op.gc = true
if (!op.deleted) { if (!op.deleted) {
yield* this.markDeleted(op.id, delLength) yield* this.markDeleted(op.id, opLength)
op.deleted = true op.deleted = true
if (op.opContent != null) { if (op.opContent != null) {
yield* this.deleteOperation(op.opContent) yield* this.deleteOperation(op.opContent)