Compare commits

..

1 Commits

Author SHA1 Message Date
Kevin Jahns
306789ddc1 v13.0.0-8 -- distribution files 2017-07-31 16:07:35 +02:00
10 changed files with 175 additions and 168 deletions

View File

@@ -4,8 +4,8 @@
<!-- quill does not include dist files! We are using the hosted version instead --> <!-- quill does not include dist files! We are using the hosted version instead -->
<!--link rel="stylesheet" href="../bower_components/quill/dist/quill.snow.css" /--> <!--link rel="stylesheet" href="../bower_components/quill/dist/quill.snow.css" /-->
<link href="https://cdn.quilljs.com/1.0.4/quill.snow.css" rel="stylesheet"> <link href="https://cdn.quilljs.com/1.0.4/quill.snow.css" rel="stylesheet">
<link href="https://cdnjs.cloudflare.com/ajax/libs/KaTeX/0.5.1/katex.min.css" rel="stylesheet"> <link href="//cdnjs.cloudflare.com/ajax/libs/KaTeX/0.5.1/katex.min.css" rel="stylesheet">
<link href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.2.0/styles/monokai-sublime.min.css" rel="stylesheet"> <link href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.2.0/styles/monokai-sublime.min.css" rel="stylesheet">
<style> <style>
#quill-container { #quill-container {
border: 1px solid gray; border: 1px solid gray;
@@ -19,17 +19,13 @@
</div> </div>
</div> </div>
<script src="https://cdnjs.cloudflare.com/ajax/libs/KaTeX/0.5.1/katex.min.js" type="text/javascript"></script> <script src="//cdnjs.cloudflare.com/ajax/libs/KaTeX/0.5.1/katex.min.js" type="text/javascript"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.2.0/highlight.min.js" type="text/javascript"></script> <script src="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.2.0/highlight.min.js" type="text/javascript"></script>
<script src="https://cdn.quilljs.com/1.0.4/quill.js"></script> <script src="https://cdn.quilljs.com/1.0.4/quill.js"></script>
<!-- quill does not include dist files! We are using the hosted version instead (see above) <!-- quill does not include dist files! We are using the hosted version instead (see above)
<script src="../bower_components/quill/dist/quill.js"></script> <script src="../bower_components/quill/dist/quill.js"></script>
--> -->
<script src="../../y.js"></script> <script src="../bower_components/yjs/y.js"></script>
<script src="../../../y-array/y-array.js"></script>
<script src="../../../y-richtext/dist/y-richtext.js"></script>
<script src="../../../y-memory/y-memory.js"></script>
<script src="../../../y-websockets-client/y-websockets-client.js"></script>
<script src="./index.js"></script> <script src="./index.js"></script>
</body> </body>
</html> </html>

View File

@@ -8,8 +8,7 @@ Y({
}, },
connector: { connector: {
name: 'websockets-client', name: 'websockets-client',
room: 'richtext-example-quill-1.0-test', room: 'richtext-example-quill-1.0-test'
url: 'http://localhost:1234'
}, },
sourceDir: '/bower_components', sourceDir: '/bower_components',
share: { share: {

View File

@@ -1,6 +1,6 @@
{ {
"name": "yjs", "name": "yjs",
"version": "13.0.0-11", "version": "13.0.0-8",
"description": "A framework for real-time p2p shared editing on any data", "description": "A framework for real-time p2p shared editing on any data",
"main": "./y.node.js", "main": "./y.node.js",
"browser": "./y.js", "browser": "./y.js",

View File

@@ -43,12 +43,10 @@ export default function extendConnector (Y/* :any */) {
this.setUserId(Y.utils.generateUserId()) this.setUserId(Y.utils.generateUserId())
} }
} }
reconnect () { reconnect () {
this.log('reconnecting..') this.log('reconnecting..')
return this.y.db.startGarbageCollector() return this.y.db.startGarbageCollector()
} }
disconnect () { disconnect () {
this.log('discronnecting..') this.log('discronnecting..')
this.connections = new Map() this.connections = new Map()
@@ -58,16 +56,13 @@ export default function extendConnector (Y/* :any */) {
this.y.db.stopGarbageCollector() this.y.db.stopGarbageCollector()
return this.y.db.whenTransactionsFinished() return this.y.db.whenTransactionsFinished()
} }
repair () { repair () {
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues') this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
this.connections.forEach(user => { user.isSynced = false })
this.isSynced = false this.isSynced = false
this.connections.forEach((user, userId) => { this.currentSyncTarget = null
user.isSynced = false this.findNextSyncTarget()
this._syncWithUser(userId)
})
} }
setUserId (userId) { setUserId (userId) {
if (this.userId == null) { if (this.userId == null) {
if (!Number.isInteger(userId)) { if (!Number.isInteger(userId)) {
@@ -82,21 +77,20 @@ export default function extendConnector (Y/* :any */) {
return null return null
} }
} }
onUserEvent (f) { onUserEvent (f) {
this.userEventListeners.push(f) this.userEventListeners.push(f)
} }
removeUserEventListener (f) { removeUserEventListener (f) {
this.userEventListeners = this.userEventListeners.filter(g => f !== g) this.userEventListeners = this.userEventListeners.filter(g => f !== g)
} }
userLeft (user) { userLeft (user) {
if (this.connections.has(user)) { if (this.connections.has(user)) {
this.log('%s: User left %s', this.userId, user) this.log('%s: User left %s', this.userId, user)
this.connections.delete(user) this.connections.delete(user)
// check if isSynced event can be sent now if (user === this.currentSyncTarget) {
this._setSyncedWith(null) this.currentSyncTarget = null
this.findNextSyncTarget()
}
for (var f of this.userEventListeners) { for (var f of this.userEventListeners) {
f({ f({
action: 'userLeft', action: 'userLeft',
@@ -105,7 +99,7 @@ export default function extendConnector (Y/* :any */) {
} }
} }
} }
userJoined (user, role, auth) { userJoined (user, role) {
if (role == null) { if (role == null) {
throw new Error('You must specify the role of the joined user!') throw new Error('You must specify the role of the joined user!')
} }
@@ -118,7 +112,7 @@ export default function extendConnector (Y/* :any */) {
isSynced: false, isSynced: false,
role: role, role: role,
processAfterAuth: [], processAfterAuth: [],
auth: auth || null, auth: null,
receivedSyncStep2: false receivedSyncStep2: false
}) })
let defer = {} let defer = {}
@@ -131,7 +125,9 @@ export default function extendConnector (Y/* :any */) {
role: role role: role
}) })
} }
this._syncWithUser(user) if (this.currentSyncTarget == null) {
this.findNextSyncTarget()
}
} }
// Execute a function _when_ we are connected. // Execute a function _when_ we are connected.
// If not connected, wait until connected // If not connected, wait until connected
@@ -142,25 +138,39 @@ export default function extendConnector (Y/* :any */) {
this.whenSyncedListeners.push(f) this.whenSyncedListeners.push(f)
} }
} }
_syncWithUser (userid) { findNextSyncTarget () {
if (this.role === 'slave') { if (this.currentSyncTarget != null || this.role === 'slave') {
return // "The current sync has not finished or this is controlled by a master!" return // "The current sync has not finished or this is controlled by a master!"
} }
sendSyncStep1(this, userid)
} var syncUser = null
_fireIsSyncedListeners () { for (var [uid, user] of this.connections) {
this.y.db.whenTransactionsFinished().then(() => { if (!user.isSynced) {
if (!this.isSynced) { syncUser = uid
this.isSynced = true break
// It is safer to remove this!
// TODO: remove: yield * this.garbageCollectAfterSync()
// call whensynced listeners
for (var f of this.whenSyncedListeners) {
f()
}
this.whenSyncedListeners = []
} }
}) }
var conn = this
if (syncUser != null) {
this.currentSyncTarget = syncUser
sendSyncStep1(this, syncUser)
} else {
if (!conn.isSynced) {
this.y.db.requestTransaction(function * () {
if (!conn.isSynced) {
// it is crucial that isSynced is set at the time garbageCollectAfterSync is called
conn.isSynced = true
// It is safer to remove this!
// TODO: remove: yield * this.garbageCollectAfterSync()
// call whensynced listeners
for (var f of conn.whenSyncedListeners) {
f()
}
conn.whenSyncedListeners = []
}
})
}
}
} }
send (uid, buffer) { send (uid, buffer) {
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) { if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
@@ -210,12 +220,12 @@ export default function extendConnector (Y/* :any */) {
/* /*
You received a raw message, and you know that it is intended for Yjs. Then call this function. You received a raw message, and you know that it is intended for Yjs. Then call this function.
*/ */
receiveMessage (sender, buffer) { async receiveMessage (sender, buffer) {
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) { if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!')) throw new Error('Expected Message to be an ArrayBuffer or Uint8Array!')
} }
if (sender === this.userId) { if (sender === this.userId) {
return Promise.resolve() return
} }
let decoder = new BinaryDecoder(buffer) let decoder = new BinaryDecoder(buffer)
let encoder = new BinaryEncoder() let encoder = new BinaryEncoder()
@@ -234,33 +244,31 @@ export default function extendConnector (Y/* :any */) {
if (messageType === 'sync step 1' || messageType === 'sync step 2') { if (messageType === 'sync step 1' || messageType === 'sync step 2') {
let auth = decoder.readVarUint() let auth = decoder.readVarUint()
if (senderConn.auth == null) { if (senderConn.auth == null) {
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
// check auth // check auth
return this.checkAuth(auth, this.y, sender).then(authPermissions => { let authPermissions = await this.checkAuth(auth, this.y, sender)
if (senderConn.auth == null) { senderConn.auth = authPermissions
senderConn.auth = authPermissions this.y.emit('userAuthenticated', {
this.y.emit('userAuthenticated', { user: senderConn.uid,
user: senderConn.uid, auth: authPermissions
auth: authPermissions })
}) senderConn.syncStep2.promise.then(() => {
if (senderConn.processAfterAuth == null) {
return
} }
let messages = senderConn.processAfterAuth for (let i = 0; i < senderConn.processAfterAuth.length; i++) {
senderConn.processAfterAuth = [] let m = senderConn.processAfterAuth[i]
this.receiveMessage(m[0], m[1])
return messages.reduce((p, m) => }
p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4])) senderConn.processAfterAuth = null
, Promise.resolve())
}) })
} }
} }
if (senderConn.auth != null) {
return this.computeMessage(messageType, senderConn, decoder, encoder, sender)
} else {
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
}
}
computeMessage (messageType, senderConn, decoder, encoder, sender) { if (senderConn.auth == null) {
senderConn.processAfterAuth.push([sender, buffer])
return
}
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) { if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock) // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender) computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
@@ -270,20 +278,19 @@ export default function extendConnector (Y/* :any */) {
} else if (messageType === 'update' && senderConn.auth === 'write') { } else if (messageType === 'update' && senderConn.auth === 'write') {
return computeMessageUpdate(decoder, encoder, this, senderConn, sender) return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
} else { } else {
return Promise.reject(new Error('Unable to receive message')) console.error('Unable to receive message')
} }
} }
_setSyncedWith (user) { _setSyncedWith (user) {
if (user != null) { var conn = this.connections.get(user)
this.connections.get(user).isSynced = true if (conn != null) {
conn.isSynced = true
} }
let conns = Array.from(this.connections.values()) if (user === this.currentSyncTarget) {
if (conns.length > 0 && conns.every(u => u.isSynced)) { this.currentSyncTarget = null
this._fireIsSyncedListeners() this.findNextSyncTarget()
} }
} }
/* /*
Currently, the HB encodes operations as JSON. For the moment I want to keep it Currently, the HB encodes operations as JSON. For the moment I want to keep it
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want

View File

@@ -26,14 +26,14 @@ export function formatYjsMessageType (buffer) {
return decoder.readVarString() return decoder.readVarString()
} }
export function logMessageUpdate (decoder, strBuilder) { export async function logMessageUpdate (decoder, strBuilder) {
let len = decoder.readUint32() let len = decoder.readUint32()
for (let i = 0; i < len; i++) { for (let i = 0; i < len; i++) {
strBuilder.push(JSON.stringify(Y.Struct.binaryDecodeOperation(decoder)) + '\n') strBuilder.push(JSON.stringify(Y.Struct.binaryDecodeOperation(decoder)) + '\n')
} }
} }
export function computeMessageUpdate (decoder, encoder, conn) { export async function computeMessageUpdate (decoder, encoder, conn) {
if (conn.y.db.forwardAppliedOperations) { if (conn.y.db.forwardAppliedOperations) {
let messagePosition = decoder.pos let messagePosition = decoder.pos
let len = decoder.readUint32() let len = decoder.readUint32()
@@ -78,7 +78,7 @@ export function logMessageSyncStep1 (decoder, strBuilder) {
logSS(decoder, strBuilder) logSS(decoder, strBuilder)
} }
export function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) { export async function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
let protocolVersion = decoder.readVarUint() let protocolVersion = decoder.readVarUint()
let preferUntransformed = decoder.readUint8() === 1 let preferUntransformed = decoder.readUint8() === 1
@@ -112,7 +112,7 @@ export function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sen
if (conn.role === 'slave') { if (conn.role === 'slave') {
sendSyncStep1(conn, sender) sendSyncStep1(conn, sender)
} }
return conn.y.db.whenTransactionsFinished() await conn.y.db.whenTransactionsFinished()
} }
export function logSS (decoder, strBuilder) { export function logSS (decoder, strBuilder) {
@@ -161,7 +161,7 @@ export function logMessageSyncStep2 (decoder, strBuilder) {
logDS(decoder, strBuilder) logDS(decoder, strBuilder)
} }
export function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) { export async function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
var db = conn.y.db var db = conn.y.db
let defer = senderConn.syncStep2 let defer = senderConn.syncStep2
@@ -178,8 +178,7 @@ export function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sen
db.requestTransaction(function * () { db.requestTransaction(function * () {
yield * this.applyDeleteSet(decoder) yield * this.applyDeleteSet(decoder)
}) })
return db.whenTransactionsFinished().then(() => { await db.whenTransactionsFinished()
conn._setSyncedWith(sender) conn._setSyncedWith(sender)
defer.resolve() defer.resolve()
})
} }

View File

@@ -141,7 +141,7 @@ export async function initArrays (t, opts) {
let connOpts let connOpts
if (i === 0) { if (i === 0) {
// Only one instance can gc! // Only one instance can gc!
dbOpts = Object.assign({ gc: false }, opts.db) dbOpts = Object.assign({ gc: true }, opts.db)
connOpts = Object.assign({ role: 'master' }, connector) connOpts = Object.assign({ role: 'master' }, connector)
} else { } else {
dbOpts = Object.assign({ gc: false }, opts.db) dbOpts = Object.assign({ gc: false }, opts.db)

10
y.js

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

156
y.node.js
View File

@@ -1,7 +1,7 @@
/** /**
* yjs - A framework for real-time p2p shared editing on any data * yjs - A framework for real-time p2p shared editing on any data
* @version v13.0.0-11 * @version v13.0.0-8
* @license MIT * @license MIT
*/ */
@@ -440,14 +440,14 @@ function formatYjsMessageType (buffer) {
return decoder.readVarString() return decoder.readVarString()
} }
function logMessageUpdate (decoder, strBuilder) { async function logMessageUpdate (decoder, strBuilder) {
let len = decoder.readUint32(); let len = decoder.readUint32();
for (let i = 0; i < len; i++) { for (let i = 0; i < len; i++) {
strBuilder.push(JSON.stringify(Y.Struct.binaryDecodeOperation(decoder)) + '\n'); strBuilder.push(JSON.stringify(Y.Struct.binaryDecodeOperation(decoder)) + '\n');
} }
} }
function computeMessageUpdate (decoder, encoder, conn) { async function computeMessageUpdate (decoder, encoder, conn) {
if (conn.y.db.forwardAppliedOperations) { if (conn.y.db.forwardAppliedOperations) {
let messagePosition = decoder.pos; let messagePosition = decoder.pos;
let len = decoder.readUint32(); let len = decoder.readUint32();
@@ -492,7 +492,7 @@ function logMessageSyncStep1 (decoder, strBuilder) {
logSS(decoder, strBuilder); logSS(decoder, strBuilder);
} }
function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) { async function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
let protocolVersion = decoder.readVarUint(); let protocolVersion = decoder.readVarUint();
let preferUntransformed = decoder.readUint8() === 1; let preferUntransformed = decoder.readUint8() === 1;
@@ -526,7 +526,7 @@ function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
if (conn.role === 'slave') { if (conn.role === 'slave') {
sendSyncStep1(conn, sender); sendSyncStep1(conn, sender);
} }
return conn.y.db.whenTransactionsFinished() await conn.y.db.whenTransactionsFinished();
} }
function logSS (decoder, strBuilder) { function logSS (decoder, strBuilder) {
@@ -575,7 +575,7 @@ function logMessageSyncStep2 (decoder, strBuilder) {
logDS(decoder, strBuilder); logDS(decoder, strBuilder);
} }
function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) { async function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
var db = conn.y.db; var db = conn.y.db;
let defer = senderConn.syncStep2; let defer = senderConn.syncStep2;
@@ -592,10 +592,9 @@ function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
db.requestTransaction(function * () { db.requestTransaction(function * () {
yield * this.applyDeleteSet(decoder); yield * this.applyDeleteSet(decoder);
}); });
return db.whenTransactionsFinished().then(() => { await db.whenTransactionsFinished();
conn._setSyncedWith(sender); conn._setSyncedWith(sender);
defer.resolve(); defer.resolve();
})
} }
function extendConnector (Y/* :any */) { function extendConnector (Y/* :any */) {
@@ -640,12 +639,10 @@ function extendConnector (Y/* :any */) {
this.setUserId(Y.utils.generateUserId()); this.setUserId(Y.utils.generateUserId());
} }
} }
reconnect () { reconnect () {
this.log('reconnecting..'); this.log('reconnecting..');
return this.y.db.startGarbageCollector() return this.y.db.startGarbageCollector()
} }
disconnect () { disconnect () {
this.log('discronnecting..'); this.log('discronnecting..');
this.connections = new Map(); this.connections = new Map();
@@ -655,16 +652,13 @@ function extendConnector (Y/* :any */) {
this.y.db.stopGarbageCollector(); this.y.db.stopGarbageCollector();
return this.y.db.whenTransactionsFinished() return this.y.db.whenTransactionsFinished()
} }
repair () { repair () {
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues'); this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues');
this.connections.forEach(user => { user.isSynced = false; });
this.isSynced = false; this.isSynced = false;
this.connections.forEach((user, userId) => { this.currentSyncTarget = null;
user.isSynced = false; this.findNextSyncTarget();
this._syncWithUser(userId);
});
} }
setUserId (userId) { setUserId (userId) {
if (this.userId == null) { if (this.userId == null) {
if (!Number.isInteger(userId)) { if (!Number.isInteger(userId)) {
@@ -679,21 +673,20 @@ function extendConnector (Y/* :any */) {
return null return null
} }
} }
onUserEvent (f) { onUserEvent (f) {
this.userEventListeners.push(f); this.userEventListeners.push(f);
} }
removeUserEventListener (f) { removeUserEventListener (f) {
this.userEventListeners = this.userEventListeners.filter(g => f !== g); this.userEventListeners = this.userEventListeners.filter(g => f !== g);
} }
userLeft (user) { userLeft (user) {
if (this.connections.has(user)) { if (this.connections.has(user)) {
this.log('%s: User left %s', this.userId, user); this.log('%s: User left %s', this.userId, user);
this.connections.delete(user); this.connections.delete(user);
// check if isSynced event can be sent now if (user === this.currentSyncTarget) {
this._setSyncedWith(null); this.currentSyncTarget = null;
this.findNextSyncTarget();
}
for (var f of this.userEventListeners) { for (var f of this.userEventListeners) {
f({ f({
action: 'userLeft', action: 'userLeft',
@@ -702,7 +695,7 @@ function extendConnector (Y/* :any */) {
} }
} }
} }
userJoined (user, role, auth) { userJoined (user, role) {
if (role == null) { if (role == null) {
throw new Error('You must specify the role of the joined user!') throw new Error('You must specify the role of the joined user!')
} }
@@ -715,7 +708,7 @@ function extendConnector (Y/* :any */) {
isSynced: false, isSynced: false,
role: role, role: role,
processAfterAuth: [], processAfterAuth: [],
auth: auth || null, auth: null,
receivedSyncStep2: false receivedSyncStep2: false
}); });
let defer = {}; let defer = {};
@@ -728,7 +721,9 @@ function extendConnector (Y/* :any */) {
role: role role: role
}); });
} }
this._syncWithUser(user); if (this.currentSyncTarget == null) {
this.findNextSyncTarget();
}
} }
// Execute a function _when_ we are connected. // Execute a function _when_ we are connected.
// If not connected, wait until connected // If not connected, wait until connected
@@ -739,25 +734,39 @@ function extendConnector (Y/* :any */) {
this.whenSyncedListeners.push(f); this.whenSyncedListeners.push(f);
} }
} }
_syncWithUser (userid) { findNextSyncTarget () {
if (this.role === 'slave') { if (this.currentSyncTarget != null || this.role === 'slave') {
return // "The current sync has not finished or this is controlled by a master!" return // "The current sync has not finished or this is controlled by a master!"
} }
sendSyncStep1(this, userid);
} var syncUser = null;
_fireIsSyncedListeners () { for (var [uid, user] of this.connections) {
this.y.db.whenTransactionsFinished().then(() => { if (!user.isSynced) {
if (!this.isSynced) { syncUser = uid;
this.isSynced = true; break
// It is safer to remove this!
// TODO: remove: yield * this.garbageCollectAfterSync()
// call whensynced listeners
for (var f of this.whenSyncedListeners) {
f();
}
this.whenSyncedListeners = [];
} }
}); }
var conn = this;
if (syncUser != null) {
this.currentSyncTarget = syncUser;
sendSyncStep1(this, syncUser);
} else {
if (!conn.isSynced) {
this.y.db.requestTransaction(function * () {
if (!conn.isSynced) {
// it is crucial that isSynced is set at the time garbageCollectAfterSync is called
conn.isSynced = true;
// It is safer to remove this!
// TODO: remove: yield * this.garbageCollectAfterSync()
// call whensynced listeners
for (var f of conn.whenSyncedListeners) {
f();
}
conn.whenSyncedListeners = [];
}
});
}
}
} }
send (uid, buffer) { send (uid, buffer) {
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) { if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
@@ -807,12 +816,12 @@ function extendConnector (Y/* :any */) {
/* /*
You received a raw message, and you know that it is intended for Yjs. Then call this function. You received a raw message, and you know that it is intended for Yjs. Then call this function.
*/ */
receiveMessage (sender, buffer) { async receiveMessage (sender, buffer) {
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) { if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!')) throw new Error('Expected Message to be an ArrayBuffer or Uint8Array!')
} }
if (sender === this.userId) { if (sender === this.userId) {
return Promise.resolve() return
} }
let decoder = new BinaryDecoder(buffer); let decoder = new BinaryDecoder(buffer);
let encoder = new BinaryEncoder(); let encoder = new BinaryEncoder();
@@ -831,33 +840,31 @@ function extendConnector (Y/* :any */) {
if (messageType === 'sync step 1' || messageType === 'sync step 2') { if (messageType === 'sync step 1' || messageType === 'sync step 2') {
let auth = decoder.readVarUint(); let auth = decoder.readVarUint();
if (senderConn.auth == null) { if (senderConn.auth == null) {
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]);
// check auth // check auth
return this.checkAuth(auth, this.y, sender).then(authPermissions => { let authPermissions = await this.checkAuth(auth, this.y, sender);
if (senderConn.auth == null) { senderConn.auth = authPermissions;
senderConn.auth = authPermissions; this.y.emit('userAuthenticated', {
this.y.emit('userAuthenticated', { user: senderConn.uid,
user: senderConn.uid, auth: authPermissions
auth: authPermissions });
}); senderConn.syncStep2.promise.then(() => {
if (senderConn.processAfterAuth == null) {
return
} }
let messages = senderConn.processAfterAuth; for (let i = 0; i < senderConn.processAfterAuth.length; i++) {
senderConn.processAfterAuth = []; let m = senderConn.processAfterAuth[i];
this.receiveMessage(m[0], m[1]);
return messages.reduce((p, m) => }
p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4])) senderConn.processAfterAuth = null;
, Promise.resolve()) });
})
} }
} }
if (senderConn.auth != null) {
return this.computeMessage(messageType, senderConn, decoder, encoder, sender)
} else {
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]);
}
}
computeMessage (messageType, senderConn, decoder, encoder, sender) { if (senderConn.auth == null) {
senderConn.processAfterAuth.push([sender, buffer]);
return
}
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) { if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock) // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender); computeMessageSyncStep1(decoder, encoder, this, senderConn, sender);
@@ -867,20 +874,19 @@ function extendConnector (Y/* :any */) {
} else if (messageType === 'update' && senderConn.auth === 'write') { } else if (messageType === 'update' && senderConn.auth === 'write') {
return computeMessageUpdate(decoder, encoder, this, senderConn, sender) return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
} else { } else {
return Promise.reject(new Error('Unable to receive message')) console.error('Unable to receive message');
} }
} }
_setSyncedWith (user) { _setSyncedWith (user) {
if (user != null) { var conn = this.connections.get(user);
this.connections.get(user).isSynced = true; if (conn != null) {
conn.isSynced = true;
} }
let conns = Array.from(this.connections.values()); if (user === this.currentSyncTarget) {
if (conns.length > 0 && conns.every(u => u.isSynced)) { this.currentSyncTarget = null;
this._fireIsSyncedListeners(); this.findNextSyncTarget();
} }
} }
/* /*
Currently, the HB encodes operations as JSON. For the moment I want to keep it Currently, the HB encodes operations as JSON. For the moment I want to keep it
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want

File diff suppressed because one or more lines are too long