Use integer as userId instead of String

This commit is contained in:
Kevin Jahns 2017-07-13 00:37:35 +02:00
parent cd3f4a72d6
commit 3c317828d1
14 changed files with 89 additions and 658 deletions

View File

@ -12,7 +12,12 @@
<input name="message" type="text" style="width:60%;">
<input type="submit" value="Send">
</form>
<script src="../bower_components/yjs/y.js"></script>
<script src="../../y.js"></script>
<script src="../../../y-array/y-array.js"></script>
<script src="../../../y-map/dist/y-map.js"></script>
<script src="../../../y-text/dist/y-text.js"></script>
<script src="../../../y-memory/y-memory.js"></script>
<script src="../../../y-websockets-client/dist/y-websockets-client.js"></script>
<script src="./index.js"></script>
</body>
</html>

View File

@ -16,8 +16,10 @@
<path d="M 325.63989,452.69678 C 317.92163,423.11035 287.36353,445.9126 268.64673,447.82227 C 250.27417,449.69727 239.77563,436.57373 239.77563,419.32568 C 239.77563,402.07812 254.02368,395.70361 265.27222,394.5791 C 276.52075,393.4541 282.14526,396.45361 297.89331,407.32715 C 313.64136,418.20117 326.3894,399.45313 327.8894,392.7041 C 329.3894,385.95508 320.76538,342.83545 320.01538,330.46191 C 319.75855,326.2207 320.56372,321.21484 321.58911,315.6377 C 315.92114,317.13525 309.58862,318.82715 302.39282,320.71338 C 263.77222,330.83691 232.27661,318.46338 221.40259,308.71484 C 210.52905,298.96582 229.27661,284.71777 234.90112,279.09326 C 240.52563,273.46924 243.90015,262.59521 239.77563,247.22217 C 235.65112,231.84912 201.90503,222.10059 179.03296,239.34814 C 156.16089,256.59619 180.90796,285.09277 188.40698,293.3418 C 195.90601,301.59033 186.15698,312.08936 170.03442,313.58887 C 160.60962,314.46582 119.15894,313.67676 86.340576,312.87012 C 85.573975,347.74561 84.581299,386.15088 83.794922,402.07812 C 82.295166,432.44922 109.29175,422.32568 115.66577,420.82568 C 122.04028,419.32568 126.16479,409.57715 143.03735,408.45215 C 185.9231,405.59326 186.09985,466.69629 144.16235,467.69482 C 128.41431,468.06982 113.79126,451.19678 108.16675,447.44727 C 102.54272,443.69775 87.919433,442.94775 83.794922,457.9458 C 82.01709,464.41113 78.118652,481.65137 78.098144,496.18994 C 78.071045,515.38037 82.295166,531.81201 82.295166,531.81201 C 82.295166,531.81201 105.54224,526.5625 149.41187,526.5625 C 193.28149,526.5625 199.65552,547.93506 194.78101,558.80859 C 189.90649,569.68213 181.28296,568.93213 179.40796,583.18066 C 172.7063,634.11133 253.34106,631.08203 249.14917,584.68018 C 247.96948,571.62354 237.16528,571.66699 232.27661,557.68359 C 222.17944,528.80273 244.64966,523.56299 257.39819,524.68799 C 263.59351,525.23437 290.95679,529.73389 320.75757,531.21582 C 321.22437,531.23877 321.69312,531.25928 322.16089,531.28125 C 326.09985,518.06592 329.95825,503.87646 330.1394,498.44092 C 330.51392,487.19238 330.1394,469.94434 325.63989,452.69678 z " style="fill:#d3ea9d;stroke:#000000" id="path2508"/>
</g>
</svg>
<script src="../bower_components/yjs/y.js"></script>
<script src="../bower_components/d3/d3.js"></script>
<script src="../../y.js"></script>
<script src="../../../y-map/dist/y-map.js"></script>
<script src="../../../y-memory/y-memory.js"></script>
<script src="../../../y-websockets-client/dist/y-websockets-client.js"></script> <script src="../bower_components/d3/d3.js"></script>
<script src="./index.js"></script>
</body>
</html>

View File

@ -10,7 +10,6 @@ Y({
name: 'websockets-client',
room: 'Puzzle-example'
},
sourceDir: '/bower_components',
share: {
piece1: 'Map',
piece2: 'Map',

View File

@ -6,7 +6,7 @@
<script src="../../../y-array/y-array.js"></script>
<script src="../../../y-text/dist/y-text.js"></script>
<script src="../../../y-memory/y-memory.js"></script>
<script src="../../../y-websockets-client/dist/y-websockets-client.js"></script>
<script src="../../../y-websockets-client/y-websockets-client.js"></script>
<script src="./index.js"></script>
</body>
</html>

View File

@ -7,12 +7,13 @@ Y({
},
connector: {
name: 'websockets-client',
room: 'Textarea-example'
// url: '127.0.0.1:1234'
room: 'Textarea-example',
url: 'http://127.0.0.1:1234'
},
sourceDir: '/bower_components',
share: {
textarea: 'Text' // y.share.textarea is of type Y.Text
textarea: 'Text', // y.share.textarea is of type Y.Text
test: 'Array'
}
}).then(function (y) {
window.yTextarea = y

View File

@ -51,7 +51,7 @@ export default function extendConnector (Y/* :any */) {
this.logMessage = Y.debug('y:connector-message')
this.y.db.forwardAppliedOperations = opts.forwardAppliedOperations || false
this.role = opts.role
this.connections = {}
this.connections = new Map()
this.isSynced = false
this.userEventListeners = []
this.whenSyncedListeners = []
@ -64,7 +64,7 @@ export default function extendConnector (Y/* :any */) {
this.authInfo = opts.auth || null
this.checkAuth = opts.checkAuth || function () { return Promise.resolve('write') } // default is everyone has write access
if (opts.generateUserId !== false) {
this.setUserId(Y.utils.generateGuid())
this.setUserId(Y.utils.generateUserId())
}
}
resetAuth (auth) {
@ -82,7 +82,7 @@ export default function extendConnector (Y/* :any */) {
}
disconnect () {
this.log('discronnecting..')
this.connections = {}
this.connections = new Map()
this.isSynced = false
this.currentSyncTarget = null
this.syncingClients = []
@ -92,15 +92,18 @@ export default function extendConnector (Y/* :any */) {
}
repair () {
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
for (var name in this.connections) {
this.connections[name].isSynced = false
}
this.connections.forEach(user => { user.isSynced = false })
this.isSynced = false
this.currentSyncTarget = null
this.findNextSyncTarget()
}
setUserId (userId) {
if (this.userId == null) {
if (!Number.isInteger(userId)) {
let err = new Error('UserId must be an integer!')
this.y.emit('error', err)
throw err
}
this.log('Set userId to "%s"', userId)
this.userId = userId
return this.y.db.setUserId(userId)
@ -115,9 +118,9 @@ export default function extendConnector (Y/* :any */) {
this.userEventListeners = this.userEventListeners.filter(g => f !== g)
}
userLeft (user) {
if (this.connections[user] != null) {
this.log('User left: %s', user)
delete this.connections[user]
if (this.connections.has(user)) {
this.log('%s: User left %s', this.userId, user)
this.connections.delete(user)
if (user === this.currentSyncTarget) {
this.currentSyncTarget = null
this.findNextSyncTarget()
@ -137,17 +140,17 @@ export default function extendConnector (Y/* :any */) {
if (role == null) {
throw new Error('You must specify the role of the joined user!')
}
if (this.connections[user] != null) {
if (this.connections.has(user)) {
throw new Error('This user already joined!')
}
this.log('User joined: %s', user)
this.connections[user] = {
this.log('%s: User joined %s', this.userId, user)
this.connections.set(user, {
isSynced: false,
role: role
}
})
let defer = {}
defer.promise = new Promise(function (resolve) { defer.resolve = resolve })
this.connections[user].syncStep2 = defer
this.connections.get(user).syncStep2 = defer
for (var f of this.userEventListeners) {
f({
action: 'userJoined',
@ -174,8 +177,8 @@ export default function extendConnector (Y/* :any */) {
}
var syncUser = null
for (var uid in this.connections) {
if (!this.connections[uid].isSynced) {
for (var [uid, user] of this.connections) {
if (!user.isSynced) {
syncUser = uid
break
}
@ -217,11 +220,11 @@ export default function extendConnector (Y/* :any */) {
}
}
send (uid, message) {
this.log('Send \'%s\' to %s', message.type, uid)
this.log('%s: Send \'%s\' to %s', this.userId, message.type, uid)
this.logMessage('Message: %j', message)
}
broadcast (message) {
this.log('Broadcast \'%s\'', message.type)
this.log('%s: Broadcast \'%s\'', this.userId, message.type)
this.logMessage('Message: %j', message)
}
/*
@ -255,10 +258,10 @@ export default function extendConnector (Y/* :any */) {
if (sender === this.userId) {
return Promise.resolve()
}
this.log('Receive \'%s\' from %s', message.type, sender)
this.log('%s: Receive \'%s\' from %s', this.userId, message.type, sender)
this.logMessage('Message: %j', message)
if (message.protocolVersion != null && message.protocolVersion !== this.protocolVersion) {
this.log(
console.warn(
`You tried to sync with a yjs instance that has a different protocol version
(You: ${this.protocolVersion}, Client: ${message.protocolVersion}).
The sync was stopped. You need to upgrade your dependencies (especially Yjs & the Connector)!
@ -269,10 +272,10 @@ export default function extendConnector (Y/* :any */) {
})
return Promise.reject(new Error('Incompatible protocol version'))
}
if (message.auth != null && this.connections[sender] != null) {
if (message.auth != null && this.connections.has(sender)) {
// authenticate using auth in message
var auth = this.checkAuth(message.auth, this.y)
this.connections[sender].auth = auth
this.connections.get(sender).auth = auth
auth.then(auth => {
for (var f of this.userEventListeners) {
f({
@ -282,19 +285,18 @@ export default function extendConnector (Y/* :any */) {
})
}
})
} else if (this.connections[sender] != null && this.connections[sender].auth == null) {
} else if (this.connections.has(sender) && this.connections.get(sender).auth == null) {
// authenticate without otherwise
this.connections[sender].auth = this.checkAuth(null, this.y)
this.connections.get(sender).auth = this.checkAuth(null, this.y)
}
if (this.connections[sender] != null && this.connections[sender].auth != null) {
return this.connections[sender].auth.then((auth) => {
if (this.connections.has(sender) && this.connections.get(sender).auth != null) {
return this.connections.get(sender).auth.then(auth => {
if (message.type === 'sync step 1' && canRead(auth)) {
let conn = this
let m = message
let wait // wait for sync step 2 to complete
if (this.role === 'slave') {
wait = Promise.all(Object.keys(this.connections)
.map(uid => this.connections[uid])
wait = Promise.all(Array.from(this.connections.values())
.filter(conn => conn.role === 'master')
.map(conn => conn.syncStep2.promise)
)
@ -314,8 +316,8 @@ export default function extendConnector (Y/* :any */) {
type: 'sync step 2',
stateSet: currentStateSet,
deleteSet: ds,
protocolVersion: this.protocolVersion,
auth: this.authInfo
protocolVersion: conn.protocolVersion,
auth: conn.authInfo
}
if (message.preferUntransformed === true && Object.keys(m.stateSet).length === 0) {
answer.osUntransformed = yield * this.getOperationsUntransformed()
@ -323,7 +325,7 @@ export default function extendConnector (Y/* :any */) {
answer.os = yield * this.getOperations(m.stateSet)
}
conn.send(sender, answer)
if (this.forwardToSyncingClients) {
if (false && conn.forwardToSyncingClients) { // still need this? was previously disabled. TODO: remove forward syncingClients
conn.syncingClients.push(sender)
setTimeout(function () {
conn.syncingClients = conn.syncingClients.filter(function (cli) {
@ -342,7 +344,7 @@ export default function extendConnector (Y/* :any */) {
})
} else if (message.type === 'sync step 2' && canWrite(auth)) {
var db = this.y.db
let defer = this.connections[sender].syncStep2
let defer = this.connections.get(sender).syncStep2
let m = message
// apply operations first
db.requestTransaction(function * () {
@ -364,7 +366,7 @@ export default function extendConnector (Y/* :any */) {
return defer.promise
} else if (message.type === 'sync done') {
var self = this
this.connections[sender].syncStep2.promise.then(function () {
this.connections.get(sender).syncStep2.promise.then(function () {
self._setSyncedWith(sender)
})
} else if (message.type === 'update' && canWrite(auth)) {
@ -389,7 +391,7 @@ export default function extendConnector (Y/* :any */) {
}
}
_setSyncedWith (user) {
var conn = this.connections[user]
var conn = this.connections.get(user)
if (conn != null) {
conn.isSynced = true
}

View File

@ -1,178 +0,0 @@
/* global getRandom, async */
'use strict'
module.exports = function (Y) {
var globalRoom = {
users: {},
buffers: {},
removeUser: function (user) {
for (var i in this.users) {
this.users[i].userLeft(user)
}
delete this.users[user]
delete this.buffers[user]
},
addUser: function (connector) {
this.users[connector.userId] = connector
this.buffers[connector.userId] = {}
for (var uname in this.users) {
if (uname !== connector.userId) {
var u = this.users[uname]
u.userJoined(connector.userId, 'master')
connector.userJoined(u.userId, 'master')
}
}
},
whenTransactionsFinished: function () {
var self = this
return new Promise(function (resolve, reject) {
// The connector first has to send the messages to the db.
// Wait for the checkAuth-function to resolve
// The test lib only has a simple checkAuth function: `() => Promise.resolve()`
// Just add a function to the event-queue, in order to wait for the event.
// TODO: this may be buggy in test applications (but it isn't be for real-life apps)
setTimeout(function () {
var ps = []
for (var name in self.users) {
ps.push(self.users[name].y.db.whenTransactionsFinished())
}
Promise.all(ps).then(resolve, reject)
}, 10)
})
},
flushOne: function flushOne () {
var bufs = []
for (var receiver in globalRoom.buffers) {
let buff = globalRoom.buffers[receiver]
var push = false
for (let sender in buff) {
if (buff[sender].length > 0) {
push = true
break
}
}
if (push) {
bufs.push(receiver)
}
}
if (bufs.length > 0) {
var userId = getRandom(bufs)
let buff = globalRoom.buffers[userId]
let sender = getRandom(Object.keys(buff))
var m = buff[sender].shift()
if (buff[sender].length === 0) {
delete buff[sender]
}
var user = globalRoom.users[userId]
return user.receiveMessage(m[0], m[1]).then(function () {
return user.y.db.whenTransactionsFinished()
}, function () {})
} else {
return false
}
},
flushAll: function () {
return new Promise(function (resolve) {
// flushes may result in more created operations,
// flush until there is nothing more to flush
function nextFlush () {
var c = globalRoom.flushOne()
if (c) {
while (c) {
c = globalRoom.flushOne()
}
globalRoom.whenTransactionsFinished().then(nextFlush)
} else {
c = globalRoom.flushOne()
if (c) {
c.then(function () {
globalRoom.whenTransactionsFinished().then(nextFlush)
})
} else {
resolve()
}
}
}
globalRoom.whenTransactionsFinished().then(nextFlush)
})
}
}
Y.utils.globalRoom = globalRoom
var userIdCounter = 0
class Test extends Y.AbstractConnector {
constructor (y, options) {
if (options === undefined) {
throw new Error('Options must not be undefined!')
}
options.role = 'master'
options.forwardToSyncingClients = false
super(y, options)
this.setUserId((userIdCounter++) + '').then(() => {
globalRoom.addUser(this)
})
this.globalRoom = globalRoom
this.syncingClientDuration = 0
}
receiveMessage (sender, m) {
return super.receiveMessage(sender, JSON.parse(JSON.stringify(m)))
}
send (userId, message) {
var buffer = globalRoom.buffers[userId]
if (buffer != null) {
if (buffer[this.userId] == null) {
buffer[this.userId] = []
}
buffer[this.userId].push(JSON.parse(JSON.stringify([this.userId, message])))
}
}
broadcast (message) {
for (var key in globalRoom.buffers) {
var buff = globalRoom.buffers[key]
if (buff[this.userId] == null) {
buff[this.userId] = []
}
buff[this.userId].push(JSON.parse(JSON.stringify([this.userId, message])))
}
}
isDisconnected () {
return globalRoom.users[this.userId] == null
}
reconnect () {
if (this.isDisconnected()) {
globalRoom.addUser(this)
super.reconnect()
}
return Y.utils.globalRoom.flushAll()
}
disconnect () {
var waitForMe = Promise.resolve()
if (!this.isDisconnected()) {
globalRoom.removeUser(this.userId)
waitForMe = super.disconnect()
}
var self = this
return waitForMe.then(function () {
return self.y.db.whenTransactionsFinished()
})
}
flush () {
var self = this
return async(function * () {
var buff = globalRoom.buffers[self.userId]
while (Object.keys(buff).length > 0) {
var sender = getRandom(Object.keys(buff))
var m = buff[sender].shift()
if (buff[sender].length === 0) {
delete buff[sender]
}
yield this.receiveMessage(m[0], m[1])
}
yield self.whenTransactionsFinished()
})
}
}
Y.Test = Test
}

View File

@ -590,7 +590,7 @@ export default function extendDatabase (Y /* :any */) {
op.type = typedefinition[0].name
this.requestTransaction(function * () {
if (op.id[0] === '_') {
if (op.id[0] === -1) {
yield * this.setOperation(op)
} else {
yield * this.applyCreatedOperations([op])

View File

@ -1,404 +0,0 @@
/* eslint-env browser, jasmine */
/*
This is just a compilation of functions that help to test this library!
*/
// When testing, you store everything on the global object. We call it g
var Y = require('./y.js')
require('../../y-memory/src/Memory.js')(Y)
require('../../y-array/src/Array.js')(Y)
require('../../y-map/src/Map.js')(Y)
require('../../y-indexeddb/src/IndexedDB.js')(Y)
module.exports = Y
var g
if (typeof global !== 'undefined') {
g = global
} else if (typeof window !== 'undefined') {
g = window
} else {
throw new Error('No global object?')
}
g.g = g
// Helper methods for the random number generator
Math.seedrandom = require('seedrandom')
g.generateRandomSeed = function generateRandomSeed () {
var seed
if (typeof window !== 'undefined' && window.location.hash.length > 1) {
seed = window.location.hash.slice(1) // first character is the hash!
console.warn('Using random seed that was specified in the url!')
} else {
seed = JSON.stringify(Math.random())
}
console.info('Using random seed: ' + seed)
g.setRandomSeed(seed)
}
g.setRandomSeed = function setRandomSeed (seed) {
Math.seedrandom.currentSeed = seed
Math.seedrandom(Math.seedrandom.currentSeed, { global: true })
}
g.generateRandomSeed()
g.YConcurrencyTestingMode = true
jasmine.DEFAULT_TIMEOUT_INTERVAL = 200000
g.describeManyTimes = function describeManyTimes (times, name, f) {
for (var i = 0; i < times; i++) {
describe(name, f)
}
}
/*
Wait for a specified amount of time (in ms). defaults to 5ms
*/
function wait (t) {
if (t == null) {
t = 0
}
return new Promise(function (resolve) {
setTimeout(function () {
resolve()
}, t)
})
}
g.wait = wait
g.databases = ['memory']
if (typeof window !== 'undefined') {
g.databases.push('indexeddb')
} else {
g.databases.push('leveldb')
}
/*
returns a random element of o.
works on Object, and Array
*/
function getRandom (o) {
if (o instanceof Array) {
return o[Math.floor(Math.random() * o.length)]
} else if (o.constructor === Object) {
return o[getRandom(Object.keys(o))]
}
}
g.getRandom = getRandom
function getRandomNumber (n) {
if (n == null) {
n = 9999
}
return Math.floor(Math.random() * n)
}
g.getRandomNumber = getRandomNumber
function getRandomString () {
var chars = 'abcdefghijklmnopqrstuvwxyzäüöABCDEFGHIJKLMNOPQRSTUVWXYZÄÜÖ'
var char = chars[getRandomNumber(chars.length)] // ü\n\n\n\n\n\n\n'
var length = getRandomNumber(7)
var string = ''
for (var i = 0; i < length; i++) {
string += char
}
return string
}
g.getRandomString = getRandomString
function * applyTransactions (relAmount, numberOfTransactions, objects, users, transactions, noReconnect) {
g.generateRandomSeed() // create a new seed, so we can re-create the behavior
for (var i = 0; i < numberOfTransactions * relAmount + 1; i++) {
var r = Math.random()
if (r > 0.95) {
// 10% chance of toggling concurrent user interactions.
// There will be an artificial delay until ops can be executed by the type,
// therefore, operations of the database will be (pre)transformed until user operations arrive
yield (function simulateConcurrentUserInteractions (type) {
if (!(type instanceof Y.utils.CustomType) && type.y instanceof Y.utils.CustomType) {
// usually we expect type to be a custom type. But in YXml we share an object {y: YXml, dom: Dom} instead
type = type.y
}
if (type.eventHandler.awaiting === 0 && type.eventHandler._debuggingAwaiting !== true) {
type.eventHandler.awaiting = 1
type.eventHandler._debuggingAwaiting = true
} else {
// fixAwaitingInType will handle _debuggingAwaiting
return fixAwaitingInType(type)
}
})(getRandom(objects))
} else if (r >= 0.5) {
// 40% chance to flush
yield Y.utils.globalRoom.flushOne() // flushes for some user.. (not necessarily 0)
} else if (noReconnect || r >= 0.05) {
// 45% chance to create operation
var done = getRandom(transactions)(getRandom(objects))
if (done != null) {
yield done
} else {
yield wait()
}
yield Y.utils.globalRoom.whenTransactionsFinished()
} else {
// 5% chance to disconnect/reconnect
var u = getRandom(users)
yield Promise.all(objects.map(fixAwaitingInType))
if (u.connector.isDisconnected()) {
yield u.reconnect()
} else {
yield u.disconnect()
}
yield Promise.all(objects.map(fixAwaitingInType))
}
}
}
function fixAwaitingInType (type) {
if (!(type instanceof Y.utils.CustomType) && type.y instanceof Y.utils.CustomType) {
// usually we expect type to be a custom type. But in YXml we share an object {y: YXml, dom: Dom} instead
type = type.y
}
return new Promise(function (resolve) {
type.os.whenTransactionsFinished().then(function () {
// _debuggingAwaiting artificially increases the awaiting property. We need to make sure that we only do that once / reverse the effect once
type.os.requestTransaction(function * () {
if (type.eventHandler.awaiting > 0 && type.eventHandler._debuggingAwaiting === true) {
type.eventHandler._debuggingAwaiting = false
yield * type.eventHandler.awaitOps(this, function * () { /* mock function */ })
}
wait(50).then(type.os.whenTransactionsFinished()).then(wait(50)).then(resolve)
})
})
})
}
g.fixAwaitingInType = fixAwaitingInType
g.applyRandomTransactionsNoGCNoDisconnect = async(function * applyRandomTransactions (users, objects, transactions, numberOfTransactions) {
yield * applyTransactions(1, numberOfTransactions, objects, users, transactions, true)
yield Y.utils.globalRoom.flushAll()
yield Promise.all(objects.map(fixAwaitingInType))
})
g.applyRandomTransactionsAllRejoinNoGC = async(function * applyRandomTransactions (users, objects, transactions, numberOfTransactions) {
yield * applyTransactions(1, numberOfTransactions, objects, users, transactions)
yield Promise.all(objects.map(fixAwaitingInType))
yield Y.utils.globalRoom.flushAll()
yield Promise.all(objects.map(fixAwaitingInType))
for (var u in users) {
yield Promise.all(objects.map(fixAwaitingInType))
yield users[u].reconnect()
yield Promise.all(objects.map(fixAwaitingInType))
}
yield Promise.all(objects.map(fixAwaitingInType))
yield Y.utils.globalRoom.flushAll()
yield Promise.all(objects.map(fixAwaitingInType))
yield g.garbageCollectAllUsers(users)
})
g.applyRandomTransactionsWithGC = async(function * applyRandomTransactions (users, objects, transactions, numberOfTransactions) {
yield * applyTransactions(1, numberOfTransactions, objects, users.slice(1), transactions)
yield Y.utils.globalRoom.flushAll()
yield Promise.all(objects.map(fixAwaitingInType))
for (var u in users) {
// TODO: here, we enforce that two users never sync at the same time with u[0]
// enforce that in the connector itself!
yield users[u].reconnect()
}
yield Y.utils.globalRoom.flushAll()
yield Promise.all(objects.map(fixAwaitingInType))
yield g.garbageCollectAllUsers(users)
})
g.garbageCollectAllUsers = async(function * garbageCollectAllUsers (users) {
yield Y.utils.globalRoom.flushAll()
for (var i in users) {
yield users[i].db.emptyGarbageCollector()
}
})
g.compareAllUsers = async(function * compareAllUsers (users) {
var s1, s2 // state sets
var ds1, ds2 // delete sets
var allDels1, allDels2 // all deletions
var db1 = [] // operation store of user1
yield Y.utils.globalRoom.flushAll()
yield g.garbageCollectAllUsers(users)
yield Y.utils.globalRoom.flushAll()
// disconnect, then reconnect all users
// We do this to make sure that the gc is updated by everyone
for (var i = 0; i < users.length; i++) {
yield users[i].disconnect()
yield wait()
yield users[i].reconnect()
}
yield wait()
yield Y.utils.globalRoom.flushAll()
// t1 and t2 basically do the same. They define t[1,2], ds[1,2], and allDels[1,2]
function * t1 () {
s1 = yield * this.getStateSet()
ds1 = yield * this.getDeleteSet()
allDels1 = []
yield * this.ds.iterate(this, null, null, function * (d) {
allDels1.push(d)
})
}
function * t2 () {
s2 = yield * this.getStateSet()
ds2 = yield * this.getDeleteSet()
allDels2 = []
yield * this.ds.iterate(this, null, null, function * (d) {
allDels2.push(d)
})
}
var buffer = Y.utils.globalRoom.buffers
for (var name in buffer) {
if (buffer[name].length > 0) {
// not all ops were transmitted..
debugger // eslint-disable-line
}
}
for (var uid = 0; uid < users.length; uid++) {
var u = users[uid]
u.db.requestTransaction(function * () {
var sv = yield * this.getStateVector()
for (var s of sv) {
yield * this.updateState(s.user)
}
// compare deleted ops against deleteStore
yield * this.os.iterate(this, null, null, function * (o) {
if (o.deleted === true) {
expect(yield * this.isDeleted(o.id)).toBeTruthy()
}
})
// compare deleteStore against deleted ops
var ds = []
yield * this.ds.iterate(this, null, null, function * (d) {
ds.push(d)
})
for (var j in ds) {
var d = ds[j]
for (var i = 0; i < d.len; i++) {
var o = yield * this.getInsertion([d.id[0], d.id[1] + i])
// gc'd or deleted
if (d.gc) {
expect(o).toBeFalsy()
} else {
expect(o.deleted).toBeTruthy()
}
}
}
})
// compare allDels tree
if (s1 == null) {
u.db.requestTransaction(function * () {
yield * t1.call(this)
yield * this.os.iterate(this, null, null, function * (o) {
o = Y.utils.copyObject(o)
delete o.origin
delete o.originOf
db1.push(o)
})
})
} else {
u.db.requestTransaction(function * () {
yield * t2.call(this)
var db2 = []
yield * this.os.iterate(this, null, null, function * (o) {
o = Y.utils.copyObject(o)
delete o.origin
delete o.originOf
db2.push(o)
})
expect(s1).toEqual(s2)
expect(allDels1).toEqual(allDels2) // inner structure
expect(ds1).toEqual(ds2) // exported structure
db2.forEach((o, i) => {
expect(db1[i]).toEqual(o)
})
})
}
yield u.db.whenTransactionsFinished()
}
})
g.createUsers = async(function * createUsers (self, numberOfUsers, database, initType) {
if (Y.utils.globalRoom.users[0] != null) {
yield Y.utils.globalRoom.flushAll()
}
// destroy old users
for (var u in Y.utils.globalRoom.users) {
Y.utils.globalRoom.users[u].y.destroy()
}
self.users = null
var promises = []
for (var i = 0; i < numberOfUsers; i++) {
promises.push(Y({
db: {
name: database,
namespace: 'User ' + i,
cleanStart: true,
gcTimeout: -1,
gc: true,
repairCheckInterval: -1
},
connector: {
name: 'Test',
debug: false
},
share: {
root: initType || 'Map'
}
}))
}
self.users = yield Promise.all(promises)
self.types = self.users.map(function (u) { return u.share.root })
return self.users
})
/*
Until async/await arrives in js, we use this function to wait for promises
by yielding them.
*/
function async (makeGenerator) {
return function (arg) {
var generator = makeGenerator.apply(this, arguments)
function handle (result) {
if (result.done) return Promise.resolve(result.value)
return Promise.resolve(result.value).then(function (res) {
return handle(generator.next(res))
}, function (err) {
return handle(generator.throw(err))
})
}
try {
return handle(generator.next())
} catch (ex) {
generator.throw(ex)
// return Promise.reject(ex)
}
}
}
g.async = async
function logUsers (self) {
if (self.constructor === Array) {
self = {users: self}
}
self.users[0].db.logTable()
self.users[1].db.logTable()
self.users[2].db.logTable()
}
g.logUsers = logUsers

View File

@ -593,6 +593,7 @@ export default function extendTransaction (Y) {
for (var user in ds) {
var dv = ds[user]
user = Number.parseInt(user, 10)
var pos = 0
var d = dv[pos]
yield * this.ds.iterate(this, [user, 0], [user, Number.MAX_VALUE], function * (n) {
@ -821,11 +822,11 @@ export default function extendTransaction (Y) {
}
* getOperation (id/* :any */)/* :Transaction<any> */ {
var o = yield * this.os.find(id)
if (id[0] !== '_' || o != null) {
if (id[0] !== -1 || o != null) {
return o
} else { // type is string
// generate this operation?
var comp = id[1].split('_')
var comp = id[1].split(-1)
if (comp.length > 1) {
var struct = comp[0]
var op = Y.Struct[struct].create(id)
@ -934,7 +935,7 @@ export default function extendTransaction (Y) {
var endSV = yield * this.getStateVector()
for (let endState of endSV) {
let user = endState.user
if (user === '_') {
if (user === -1) {
continue
}
let startPos = startSS[user] || 0
@ -952,7 +953,7 @@ export default function extendTransaction (Y) {
for (let endState of endSV) {
let user = endState.user
let startPos = startSS[user]
if (user === '_') {
if (user === -1) {
continue
}
yield * this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) {
@ -1031,7 +1032,7 @@ export default function extendTransaction (Y) {
* getOperationsUntransformed () {
var ops = []
yield * this.os.iterate(this, null, null, function * (op) {
if (op.id[0] !== '_') {
if (op.id[0] !== -1) {
ops.push(op)
}
})
@ -1044,7 +1045,7 @@ export default function extendTransaction (Y) {
for (var i = 0; i < ops.length; i++) {
var op = ops[i]
// create, and modify parent, if it is created implicitly
if (op.parent != null && op.parent[0] === '_') {
if (op.parent != null && op.parent[0] === -1) {
if (op.struct === 'Insert') {
// update parents .map/start/end properties
if (op.parentSub != null && op.left == null) {

View File

@ -1,3 +1,5 @@
/* global crypto */
/*
EventHandler is an helper class for constructing custom types.
@ -818,8 +820,10 @@ export default function Utils (Y) {
}
Y.utils.createSmallLookupBuffer = createSmallLookupBuffer
// Generates a unique id, for use as a user id.
// Thx to @jed for this script https://gist.github.com/jed/982883
function generateGuid(a){return a?(a^Math.random()*16>>a/4).toString(16):([1e7]+-1e3+-4e3+-8e3+-1e11).replace(/[018]/g,generateGuid)} // eslint-disable-line
Y.utils.generateGuid = generateGuid
function generateUserId () {
let arr = new Uint32Array(1)
crypto.getRandomValues(arr)
return arr[0]
}
Y.utils.generateUserId = generateUserId
}

View File

@ -169,7 +169,7 @@ class YConfig extends Y.utils.NamedEventHandler {
var typeName = typeConstructor.splice(0, 1)
var type = Y[typeName]
var typedef = type.typeDefinition
var id = ['_', typedef.struct + '_' + typeName + '_' + propertyname + '_' + typeConstructor]
var id = [-1, typedef.struct + -1 + typeName + -1 + propertyname + -1 + typeConstructor]
var args = []
if (typeConstructor.length === 1) {
try {

View File

@ -104,7 +104,7 @@ export async function initArrays (t, opts) {
}
var share = Object.assign({ flushHelper: 'Map', array: 'Array' }, opts.share)
var chance = opts.chance || new Chance(t.getSeed() * 1000000000)
var connector = Object.assign({ room: 'debugging_' + t.name, testContext: t, chance }, opts.connector)
var connector = Object.assign({ room: 'debugging_' + t.name, generateUserId: false, testContext: t, chance }, opts.connector)
for (let i = 0; i < opts.users; i++) {
let dbOpts
let connOpts

View File

@ -6,49 +6,48 @@ var rooms = {}
export class TestRoom {
constructor (roomname) {
this.room = roomname
this.users = {}
this.users = new Map()
this.nextUserId = 0
}
join (connector) {
if (connector.userId == null) {
connector.setUserId('' + (this.nextUserId++))
connector.setUserId(this.nextUserId++)
}
Object.keys(this.users).forEach(uid => {
let user = this.users[uid]
this.users.forEach((user, uid) => {
if (user.role === 'master' || connector.role === 'master') {
this.users[uid].userJoined(connector.userId, connector.role)
connector.userJoined(uid, this.users[uid].role)
this.users.get(uid).userJoined(connector.userId, connector.role)
connector.userJoined(uid, this.users.get(uid).role)
}
})
this.users[connector.userId] = connector
this.users.set(connector.userId, connector)
}
leave (connector) {
delete this.users[connector.userId]
Object.keys(this.users).forEach(uid => {
this.users[uid].userLeft(connector.userId)
this.users.delete(connector.userId)
this.users.forEach(user => {
user.userLeft(connector.userId)
})
}
send (sender, receiver, m) {
m = JSON.parse(JSON.stringify(m))
var user = this.users[receiver]
var user = this.users.get(receiver)
if (user != null) {
user.receiveMessage(sender, m)
}
}
broadcast (sender, m) {
Object.keys(this.users).forEach(receiver => {
this.users.forEach((user, receiver) => {
this.send(sender, receiver, m)
})
}
async flushAll (users) {
let flushing = true
let allUserIds = Object.keys(this.users)
let allUserIds = Array.from(this.users.keys())
if (users == null) {
users = allUserIds.map(id => this.users[id].y)
users = allUserIds.map(id => this.users.get(id).y)
}
while (flushing) {
await wait(10)
let res = await Promise.all(allUserIds.map(id => this.users[id]._flushAll(users)))
let res = await Promise.all(allUserIds.map(id => this.users.get(id)._flushAll(users)))
flushing = res.some(status => status === 'flushing')
}
}
@ -109,10 +108,10 @@ export default function extendTestConnector (Y) {
})
}
receiveMessage (sender, m) {
if (this.userId !== sender && this.connections[sender] != null) {
var buffer = this.connections[sender].buffer
if (this.userId !== sender && this.connections.has(sender)) {
var buffer = this.connections.get(sender).buffer
if (buffer == null) {
buffer = this.connections[sender].buffer = []
buffer = this.connections.get(sender).buffer = []
}
buffer.push(m)
if (this.chance.bool({likelihood: 30})) {
@ -127,13 +126,13 @@ export default function extendTestConnector (Y) {
async _flushAll (flushUsers) {
if (flushUsers.some(u => u.connector.userId === this.userId)) {
// this one needs to sync with every other user
flushUsers = Object.keys(this.connections).map(id => this.testRoom.users[id].y)
flushUsers = Array.from(this.connections.keys()).map(uid => this.testRoom.users.get(uid).y)
}
var finished = []
for (let i = 0; i < flushUsers.length; i++) {
let userId = flushUsers[i].connector.userId
if (userId !== this.userId && this.connections[userId] != null) {
let buffer = this.connections[userId].buffer
if (userId !== this.userId && this.connections.has(userId)) {
let buffer = this.connections.get(userId).buffer
if (buffer != null) {
var messages = buffer.splice(0)
for (let j = 0; j < messages.length; j++) {