Compare commits

...

4 Commits

Author SHA1 Message Date
Kevin Jahns
a267affeda v13.0.0-10 -- distribution files 2017-08-03 00:25:40 +02:00
Kevin Jahns
90b2a895b8 13.0.0-10 2017-08-03 00:25:13 +02:00
Kevin Jahns
4f57c91b82 fix syncing protocol - compute messages after auth 2017-08-03 00:24:01 +02:00
Kevin Jahns
3e1d89253f fix unhandled message bug in connector 2017-08-01 17:49:37 +02:00
8 changed files with 22985 additions and 33 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "yjs",
"version": "13.0.0-9",
"version": "13.0.0-10",
"description": "A framework for real-time p2p shared editing on any data",
"main": "./y.node.js",
"browser": "./y.js",

View File

@@ -156,22 +156,24 @@ export default function extendConnector (Y/* :any */) {
sendSyncStep1(this, syncUser)
} else {
if (!conn.isSynced) {
this.y.db.requestTransaction(function * () {
if (!conn.isSynced) {
// it is crucial that isSynced is set at the time garbageCollectAfterSync is called
conn.isSynced = true
// It is safer to remove this!
// TODO: remove: yield * this.garbageCollectAfterSync()
// call whensynced listeners
for (var f of conn.whenSyncedListeners) {
f()
}
conn.whenSyncedListeners = []
}
})
conn._fireIsSyncedListeners()
}
}
}
_fireIsSyncedListeners () {
this.y.db.whenTransactionsFinished().then(() => {
if (!this.isSynced) {
this.isSynced = true
// It is safer to remove this!
// TODO: remove: yield * this.garbageCollectAfterSync()
// call whensynced listeners
for (var f of this.whenSyncedListeners) {
f()
}
this.whenSyncedListeners = []
}
})
}
send (uid, buffer) {
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array - please don\'t use this method to send custom messages')
@@ -222,10 +224,10 @@ export default function extendConnector (Y/* :any */) {
*/
receiveMessage (sender, buffer) {
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array!')
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
}
if (sender === this.userId) {
return
return Promise.resolve()
}
let decoder = new BinaryDecoder(buffer)
let encoder = new BinaryEncoder()
@@ -244,29 +246,32 @@ export default function extendConnector (Y/* :any */) {
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
let auth = decoder.readVarUint()
if (senderConn.auth == null) {
senderConn.processAfterAuth.push([sender, buffer])
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
// check auth
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
senderConn.auth = authPermissions
this.y.emit('userAuthenticated', {
user: senderConn.uid,
auth: authPermissions
})
return senderConn.syncStep2.promise
}).then(() => {
if (senderConn.processAfterAuth == null) {
return Promise.resolve()
if (senderConn.auth == null) {
senderConn.auth = authPermissions
this.y.emit('userAuthenticated', {
user: senderConn.uid,
auth: authPermissions
})
}
let messages = senderConn.processAfterAuth
senderConn.processAfterAuth = null
return Promise.all(messages.map(m =>
this.receiveMessage(m[0], m[1])
))
senderConn.processAfterAuth = []
return messages.reduce((p, m) =>
p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4]))
, Promise.resolve())
})
}
}
if (senderConn.auth != null) {
return this.computeMessage(messageType, senderConn, decoder, encoder, sender)
} else {
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
}
}
computeMessage (messageType, senderConn, decoder, encoder, sender) {
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
@@ -276,7 +281,7 @@ export default function extendConnector (Y/* :any */) {
} else if (messageType === 'update' && senderConn.auth === 'write') {
return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
} else {
Promise.reject(new Error('Unable to receive message'))
return Promise.reject(new Error('Unable to receive message'))
}
}
_setSyncedWith (user) {
@@ -288,6 +293,9 @@ export default function extendConnector (Y/* :any */) {
this.currentSyncTarget = null
this.findNextSyncTarget()
}
if (this.role === 'slave' && conn.role === 'master') {
this._fireIsSyncedListeners()
}
}
/*
Currently, the HB encodes operations as JSON. For the moment I want to keep it

10
y.js Normal file

File diff suppressed because one or more lines are too long

1
y.js.map Normal file

File diff suppressed because one or more lines are too long

4956
y.node.js Normal file

File diff suppressed because it is too large Load Diff

1
y.node.js.map Normal file

File diff suppressed because one or more lines are too long

17975
y.test.js Normal file

File diff suppressed because one or more lines are too long

1
y.test.js.map Normal file

File diff suppressed because one or more lines are too long