Compare commits
6 Commits
v13.0.0-9
...
v13.0.0-11
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
88c71b9c1e | ||
|
|
be3b8b65ce | ||
|
|
d093ef56c8 | ||
|
|
90b2a895b8 | ||
|
|
4f57c91b82 | ||
|
|
3e1d89253f |
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "yjs",
|
"name": "yjs",
|
||||||
"version": "13.0.0-9",
|
"version": "13.0.0-11",
|
||||||
"description": "A framework for real-time p2p shared editing on any data",
|
"description": "A framework for real-time p2p shared editing on any data",
|
||||||
"main": "./y.node.js",
|
"main": "./y.node.js",
|
||||||
"browser": "./y.js",
|
"browser": "./y.js",
|
||||||
|
|||||||
127
src/Connector.js
127
src/Connector.js
@@ -43,10 +43,12 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
this.setUserId(Y.utils.generateUserId())
|
this.setUserId(Y.utils.generateUserId())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
reconnect () {
|
reconnect () {
|
||||||
this.log('reconnecting..')
|
this.log('reconnecting..')
|
||||||
return this.y.db.startGarbageCollector()
|
return this.y.db.startGarbageCollector()
|
||||||
}
|
}
|
||||||
|
|
||||||
disconnect () {
|
disconnect () {
|
||||||
this.log('discronnecting..')
|
this.log('discronnecting..')
|
||||||
this.connections = new Map()
|
this.connections = new Map()
|
||||||
@@ -56,13 +58,16 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
this.y.db.stopGarbageCollector()
|
this.y.db.stopGarbageCollector()
|
||||||
return this.y.db.whenTransactionsFinished()
|
return this.y.db.whenTransactionsFinished()
|
||||||
}
|
}
|
||||||
|
|
||||||
repair () {
|
repair () {
|
||||||
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
|
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
|
||||||
this.connections.forEach(user => { user.isSynced = false })
|
|
||||||
this.isSynced = false
|
this.isSynced = false
|
||||||
this.currentSyncTarget = null
|
this.connections.forEach((user, userId) => {
|
||||||
this.findNextSyncTarget()
|
user.isSynced = false
|
||||||
|
this._syncWithUser(userId)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
setUserId (userId) {
|
setUserId (userId) {
|
||||||
if (this.userId == null) {
|
if (this.userId == null) {
|
||||||
if (!Number.isInteger(userId)) {
|
if (!Number.isInteger(userId)) {
|
||||||
@@ -77,20 +82,21 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
onUserEvent (f) {
|
onUserEvent (f) {
|
||||||
this.userEventListeners.push(f)
|
this.userEventListeners.push(f)
|
||||||
}
|
}
|
||||||
|
|
||||||
removeUserEventListener (f) {
|
removeUserEventListener (f) {
|
||||||
this.userEventListeners = this.userEventListeners.filter(g => f !== g)
|
this.userEventListeners = this.userEventListeners.filter(g => f !== g)
|
||||||
}
|
}
|
||||||
|
|
||||||
userLeft (user) {
|
userLeft (user) {
|
||||||
if (this.connections.has(user)) {
|
if (this.connections.has(user)) {
|
||||||
this.log('%s: User left %s', this.userId, user)
|
this.log('%s: User left %s', this.userId, user)
|
||||||
this.connections.delete(user)
|
this.connections.delete(user)
|
||||||
if (user === this.currentSyncTarget) {
|
// check if isSynced event can be sent now
|
||||||
this.currentSyncTarget = null
|
this._setSyncedWith(null)
|
||||||
this.findNextSyncTarget()
|
|
||||||
}
|
|
||||||
for (var f of this.userEventListeners) {
|
for (var f of this.userEventListeners) {
|
||||||
f({
|
f({
|
||||||
action: 'userLeft',
|
action: 'userLeft',
|
||||||
@@ -99,7 +105,7 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
userJoined (user, role) {
|
userJoined (user, role, auth) {
|
||||||
if (role == null) {
|
if (role == null) {
|
||||||
throw new Error('You must specify the role of the joined user!')
|
throw new Error('You must specify the role of the joined user!')
|
||||||
}
|
}
|
||||||
@@ -112,7 +118,7 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
isSynced: false,
|
isSynced: false,
|
||||||
role: role,
|
role: role,
|
||||||
processAfterAuth: [],
|
processAfterAuth: [],
|
||||||
auth: null,
|
auth: auth || null,
|
||||||
receivedSyncStep2: false
|
receivedSyncStep2: false
|
||||||
})
|
})
|
||||||
let defer = {}
|
let defer = {}
|
||||||
@@ -125,9 +131,7 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
role: role
|
role: role
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
if (this.currentSyncTarget == null) {
|
this._syncWithUser(user)
|
||||||
this.findNextSyncTarget()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Execute a function _when_ we are connected.
|
// Execute a function _when_ we are connected.
|
||||||
// If not connected, wait until connected
|
// If not connected, wait until connected
|
||||||
@@ -138,39 +142,25 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
this.whenSyncedListeners.push(f)
|
this.whenSyncedListeners.push(f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
findNextSyncTarget () {
|
_syncWithUser (userid) {
|
||||||
if (this.currentSyncTarget != null || this.role === 'slave') {
|
if (this.role === 'slave') {
|
||||||
return // "The current sync has not finished or this is controlled by a master!"
|
return // "The current sync has not finished or this is controlled by a master!"
|
||||||
}
|
}
|
||||||
|
sendSyncStep1(this, userid)
|
||||||
var syncUser = null
|
}
|
||||||
for (var [uid, user] of this.connections) {
|
_fireIsSyncedListeners () {
|
||||||
if (!user.isSynced) {
|
this.y.db.whenTransactionsFinished().then(() => {
|
||||||
syncUser = uid
|
if (!this.isSynced) {
|
||||||
break
|
this.isSynced = true
|
||||||
|
// It is safer to remove this!
|
||||||
|
// TODO: remove: yield * this.garbageCollectAfterSync()
|
||||||
|
// call whensynced listeners
|
||||||
|
for (var f of this.whenSyncedListeners) {
|
||||||
|
f()
|
||||||
|
}
|
||||||
|
this.whenSyncedListeners = []
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
var conn = this
|
|
||||||
if (syncUser != null) {
|
|
||||||
this.currentSyncTarget = syncUser
|
|
||||||
sendSyncStep1(this, syncUser)
|
|
||||||
} else {
|
|
||||||
if (!conn.isSynced) {
|
|
||||||
this.y.db.requestTransaction(function * () {
|
|
||||||
if (!conn.isSynced) {
|
|
||||||
// it is crucial that isSynced is set at the time garbageCollectAfterSync is called
|
|
||||||
conn.isSynced = true
|
|
||||||
// It is safer to remove this!
|
|
||||||
// TODO: remove: yield * this.garbageCollectAfterSync()
|
|
||||||
// call whensynced listeners
|
|
||||||
for (var f of conn.whenSyncedListeners) {
|
|
||||||
f()
|
|
||||||
}
|
|
||||||
conn.whenSyncedListeners = []
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
send (uid, buffer) {
|
send (uid, buffer) {
|
||||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||||
@@ -222,10 +212,10 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
*/
|
*/
|
||||||
receiveMessage (sender, buffer) {
|
receiveMessage (sender, buffer) {
|
||||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||||
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array!')
|
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
|
||||||
}
|
}
|
||||||
if (sender === this.userId) {
|
if (sender === this.userId) {
|
||||||
return
|
return Promise.resolve()
|
||||||
}
|
}
|
||||||
let decoder = new BinaryDecoder(buffer)
|
let decoder = new BinaryDecoder(buffer)
|
||||||
let encoder = new BinaryEncoder()
|
let encoder = new BinaryEncoder()
|
||||||
@@ -244,29 +234,33 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
||||||
let auth = decoder.readVarUint()
|
let auth = decoder.readVarUint()
|
||||||
if (senderConn.auth == null) {
|
if (senderConn.auth == null) {
|
||||||
senderConn.processAfterAuth.push([sender, buffer])
|
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
|
||||||
|
|
||||||
// check auth
|
// check auth
|
||||||
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
|
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
|
||||||
senderConn.auth = authPermissions
|
if (senderConn.auth == null) {
|
||||||
this.y.emit('userAuthenticated', {
|
senderConn.auth = authPermissions
|
||||||
user: senderConn.uid,
|
this.y.emit('userAuthenticated', {
|
||||||
auth: authPermissions
|
user: senderConn.uid,
|
||||||
})
|
auth: authPermissions
|
||||||
return senderConn.syncStep2.promise
|
})
|
||||||
}).then(() => {
|
|
||||||
if (senderConn.processAfterAuth == null) {
|
|
||||||
return Promise.resolve()
|
|
||||||
}
|
}
|
||||||
let messages = senderConn.processAfterAuth
|
let messages = senderConn.processAfterAuth
|
||||||
senderConn.processAfterAuth = null
|
senderConn.processAfterAuth = []
|
||||||
return Promise.all(messages.map(m =>
|
|
||||||
this.receiveMessage(m[0], m[1])
|
return messages.reduce((p, m) =>
|
||||||
))
|
p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4]))
|
||||||
|
, Promise.resolve())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (senderConn.auth != null) {
|
||||||
|
return this.computeMessage(messageType, senderConn, decoder, encoder, sender)
|
||||||
|
} else {
|
||||||
|
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
computeMessage (messageType, senderConn, decoder, encoder, sender) {
|
||||||
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
||||||
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
||||||
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
|
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
|
||||||
@@ -276,19 +270,20 @@ export default function extendConnector (Y/* :any */) {
|
|||||||
} else if (messageType === 'update' && senderConn.auth === 'write') {
|
} else if (messageType === 'update' && senderConn.auth === 'write') {
|
||||||
return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
|
return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
|
||||||
} else {
|
} else {
|
||||||
Promise.reject(new Error('Unable to receive message'))
|
return Promise.reject(new Error('Unable to receive message'))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_setSyncedWith (user) {
|
_setSyncedWith (user) {
|
||||||
var conn = this.connections.get(user)
|
if (user != null) {
|
||||||
if (conn != null) {
|
this.connections.get(user).isSynced = true
|
||||||
conn.isSynced = true
|
|
||||||
}
|
}
|
||||||
if (user === this.currentSyncTarget) {
|
let conns = Array.from(this.connections.values())
|
||||||
this.currentSyncTarget = null
|
if (conns.length > 0 && conns.every(u => u.isSynced)) {
|
||||||
this.findNextSyncTarget()
|
this._fireIsSyncedListeners()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Currently, the HB encodes operations as JSON. For the moment I want to keep it
|
Currently, the HB encodes operations as JSON. For the moment I want to keep it
|
||||||
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
|
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
|
||||||
|
|||||||
@@ -141,7 +141,7 @@ export async function initArrays (t, opts) {
|
|||||||
let connOpts
|
let connOpts
|
||||||
if (i === 0) {
|
if (i === 0) {
|
||||||
// Only one instance can gc!
|
// Only one instance can gc!
|
||||||
dbOpts = Object.assign({ gc: true }, opts.db)
|
dbOpts = Object.assign({ gc: false }, opts.db)
|
||||||
connOpts = Object.assign({ role: 'master' }, connector)
|
connOpts = Object.assign({ role: 'master' }, connector)
|
||||||
} else {
|
} else {
|
||||||
dbOpts = Object.assign({ gc: false }, opts.db)
|
dbOpts = Object.assign({ gc: false }, opts.db)
|
||||||
|
|||||||
129
y.node.js
129
y.node.js
@@ -1,7 +1,7 @@
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* yjs - A framework for real-time p2p shared editing on any data
|
* yjs - A framework for real-time p2p shared editing on any data
|
||||||
* @version v13.0.0-9
|
* @version v13.0.0-11
|
||||||
* @license MIT
|
* @license MIT
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@@ -640,10 +640,12 @@ function extendConnector (Y/* :any */) {
|
|||||||
this.setUserId(Y.utils.generateUserId());
|
this.setUserId(Y.utils.generateUserId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
reconnect () {
|
reconnect () {
|
||||||
this.log('reconnecting..');
|
this.log('reconnecting..');
|
||||||
return this.y.db.startGarbageCollector()
|
return this.y.db.startGarbageCollector()
|
||||||
}
|
}
|
||||||
|
|
||||||
disconnect () {
|
disconnect () {
|
||||||
this.log('discronnecting..');
|
this.log('discronnecting..');
|
||||||
this.connections = new Map();
|
this.connections = new Map();
|
||||||
@@ -653,13 +655,16 @@ function extendConnector (Y/* :any */) {
|
|||||||
this.y.db.stopGarbageCollector();
|
this.y.db.stopGarbageCollector();
|
||||||
return this.y.db.whenTransactionsFinished()
|
return this.y.db.whenTransactionsFinished()
|
||||||
}
|
}
|
||||||
|
|
||||||
repair () {
|
repair () {
|
||||||
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues');
|
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues');
|
||||||
this.connections.forEach(user => { user.isSynced = false; });
|
|
||||||
this.isSynced = false;
|
this.isSynced = false;
|
||||||
this.currentSyncTarget = null;
|
this.connections.forEach((user, userId) => {
|
||||||
this.findNextSyncTarget();
|
user.isSynced = false;
|
||||||
|
this._syncWithUser(userId);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
setUserId (userId) {
|
setUserId (userId) {
|
||||||
if (this.userId == null) {
|
if (this.userId == null) {
|
||||||
if (!Number.isInteger(userId)) {
|
if (!Number.isInteger(userId)) {
|
||||||
@@ -674,20 +679,21 @@ function extendConnector (Y/* :any */) {
|
|||||||
return null
|
return null
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
onUserEvent (f) {
|
onUserEvent (f) {
|
||||||
this.userEventListeners.push(f);
|
this.userEventListeners.push(f);
|
||||||
}
|
}
|
||||||
|
|
||||||
removeUserEventListener (f) {
|
removeUserEventListener (f) {
|
||||||
this.userEventListeners = this.userEventListeners.filter(g => f !== g);
|
this.userEventListeners = this.userEventListeners.filter(g => f !== g);
|
||||||
}
|
}
|
||||||
|
|
||||||
userLeft (user) {
|
userLeft (user) {
|
||||||
if (this.connections.has(user)) {
|
if (this.connections.has(user)) {
|
||||||
this.log('%s: User left %s', this.userId, user);
|
this.log('%s: User left %s', this.userId, user);
|
||||||
this.connections.delete(user);
|
this.connections.delete(user);
|
||||||
if (user === this.currentSyncTarget) {
|
// check if isSynced event can be sent now
|
||||||
this.currentSyncTarget = null;
|
this._setSyncedWith(null);
|
||||||
this.findNextSyncTarget();
|
|
||||||
}
|
|
||||||
for (var f of this.userEventListeners) {
|
for (var f of this.userEventListeners) {
|
||||||
f({
|
f({
|
||||||
action: 'userLeft',
|
action: 'userLeft',
|
||||||
@@ -696,7 +702,7 @@ function extendConnector (Y/* :any */) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
userJoined (user, role) {
|
userJoined (user, role, auth) {
|
||||||
if (role == null) {
|
if (role == null) {
|
||||||
throw new Error('You must specify the role of the joined user!')
|
throw new Error('You must specify the role of the joined user!')
|
||||||
}
|
}
|
||||||
@@ -709,7 +715,7 @@ function extendConnector (Y/* :any */) {
|
|||||||
isSynced: false,
|
isSynced: false,
|
||||||
role: role,
|
role: role,
|
||||||
processAfterAuth: [],
|
processAfterAuth: [],
|
||||||
auth: null,
|
auth: auth || null,
|
||||||
receivedSyncStep2: false
|
receivedSyncStep2: false
|
||||||
});
|
});
|
||||||
let defer = {};
|
let defer = {};
|
||||||
@@ -722,9 +728,7 @@ function extendConnector (Y/* :any */) {
|
|||||||
role: role
|
role: role
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
if (this.currentSyncTarget == null) {
|
this._syncWithUser(user);
|
||||||
this.findNextSyncTarget();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Execute a function _when_ we are connected.
|
// Execute a function _when_ we are connected.
|
||||||
// If not connected, wait until connected
|
// If not connected, wait until connected
|
||||||
@@ -735,39 +739,25 @@ function extendConnector (Y/* :any */) {
|
|||||||
this.whenSyncedListeners.push(f);
|
this.whenSyncedListeners.push(f);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
findNextSyncTarget () {
|
_syncWithUser (userid) {
|
||||||
if (this.currentSyncTarget != null || this.role === 'slave') {
|
if (this.role === 'slave') {
|
||||||
return // "The current sync has not finished or this is controlled by a master!"
|
return // "The current sync has not finished or this is controlled by a master!"
|
||||||
}
|
}
|
||||||
|
sendSyncStep1(this, userid);
|
||||||
var syncUser = null;
|
}
|
||||||
for (var [uid, user] of this.connections) {
|
_fireIsSyncedListeners () {
|
||||||
if (!user.isSynced) {
|
this.y.db.whenTransactionsFinished().then(() => {
|
||||||
syncUser = uid;
|
if (!this.isSynced) {
|
||||||
break
|
this.isSynced = true;
|
||||||
|
// It is safer to remove this!
|
||||||
|
// TODO: remove: yield * this.garbageCollectAfterSync()
|
||||||
|
// call whensynced listeners
|
||||||
|
for (var f of this.whenSyncedListeners) {
|
||||||
|
f();
|
||||||
|
}
|
||||||
|
this.whenSyncedListeners = [];
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
var conn = this;
|
|
||||||
if (syncUser != null) {
|
|
||||||
this.currentSyncTarget = syncUser;
|
|
||||||
sendSyncStep1(this, syncUser);
|
|
||||||
} else {
|
|
||||||
if (!conn.isSynced) {
|
|
||||||
this.y.db.requestTransaction(function * () {
|
|
||||||
if (!conn.isSynced) {
|
|
||||||
// it is crucial that isSynced is set at the time garbageCollectAfterSync is called
|
|
||||||
conn.isSynced = true;
|
|
||||||
// It is safer to remove this!
|
|
||||||
// TODO: remove: yield * this.garbageCollectAfterSync()
|
|
||||||
// call whensynced listeners
|
|
||||||
for (var f of conn.whenSyncedListeners) {
|
|
||||||
f();
|
|
||||||
}
|
|
||||||
conn.whenSyncedListeners = [];
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
send (uid, buffer) {
|
send (uid, buffer) {
|
||||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||||
@@ -819,10 +809,10 @@ function extendConnector (Y/* :any */) {
|
|||||||
*/
|
*/
|
||||||
receiveMessage (sender, buffer) {
|
receiveMessage (sender, buffer) {
|
||||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||||
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array!')
|
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
|
||||||
}
|
}
|
||||||
if (sender === this.userId) {
|
if (sender === this.userId) {
|
||||||
return
|
return Promise.resolve()
|
||||||
}
|
}
|
||||||
let decoder = new BinaryDecoder(buffer);
|
let decoder = new BinaryDecoder(buffer);
|
||||||
let encoder = new BinaryEncoder();
|
let encoder = new BinaryEncoder();
|
||||||
@@ -841,29 +831,33 @@ function extendConnector (Y/* :any */) {
|
|||||||
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
||||||
let auth = decoder.readVarUint();
|
let auth = decoder.readVarUint();
|
||||||
if (senderConn.auth == null) {
|
if (senderConn.auth == null) {
|
||||||
senderConn.processAfterAuth.push([sender, buffer]);
|
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]);
|
||||||
|
|
||||||
// check auth
|
// check auth
|
||||||
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
|
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
|
||||||
senderConn.auth = authPermissions;
|
if (senderConn.auth == null) {
|
||||||
this.y.emit('userAuthenticated', {
|
senderConn.auth = authPermissions;
|
||||||
user: senderConn.uid,
|
this.y.emit('userAuthenticated', {
|
||||||
auth: authPermissions
|
user: senderConn.uid,
|
||||||
});
|
auth: authPermissions
|
||||||
return senderConn.syncStep2.promise
|
});
|
||||||
}).then(() => {
|
|
||||||
if (senderConn.processAfterAuth == null) {
|
|
||||||
return Promise.resolve()
|
|
||||||
}
|
}
|
||||||
let messages = senderConn.processAfterAuth;
|
let messages = senderConn.processAfterAuth;
|
||||||
senderConn.processAfterAuth = null;
|
senderConn.processAfterAuth = [];
|
||||||
return Promise.all(messages.map(m =>
|
|
||||||
this.receiveMessage(m[0], m[1])
|
return messages.reduce((p, m) =>
|
||||||
))
|
p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4]))
|
||||||
|
, Promise.resolve())
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (senderConn.auth != null) {
|
||||||
|
return this.computeMessage(messageType, senderConn, decoder, encoder, sender)
|
||||||
|
} else {
|
||||||
|
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
computeMessage (messageType, senderConn, decoder, encoder, sender) {
|
||||||
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
||||||
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
||||||
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender);
|
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender);
|
||||||
@@ -873,19 +867,20 @@ function extendConnector (Y/* :any */) {
|
|||||||
} else if (messageType === 'update' && senderConn.auth === 'write') {
|
} else if (messageType === 'update' && senderConn.auth === 'write') {
|
||||||
return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
|
return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
|
||||||
} else {
|
} else {
|
||||||
Promise.reject(new Error('Unable to receive message'));
|
return Promise.reject(new Error('Unable to receive message'))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_setSyncedWith (user) {
|
_setSyncedWith (user) {
|
||||||
var conn = this.connections.get(user);
|
if (user != null) {
|
||||||
if (conn != null) {
|
this.connections.get(user).isSynced = true;
|
||||||
conn.isSynced = true;
|
|
||||||
}
|
}
|
||||||
if (user === this.currentSyncTarget) {
|
let conns = Array.from(this.connections.values());
|
||||||
this.currentSyncTarget = null;
|
if (conns.length > 0 && conns.every(u => u.isSynced)) {
|
||||||
this.findNextSyncTarget();
|
this._fireIsSyncedListeners();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
Currently, the HB encodes operations as JSON. For the moment I want to keep it
|
Currently, the HB encodes operations as JSON. For the moment I want to keep it
|
||||||
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
|
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user