Compare commits
1 Commits
v13.0.0-12
...
v13.0.0-8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
306789ddc1 |
@@ -4,8 +4,8 @@
|
||||
<!-- quill does not include dist files! We are using the hosted version instead -->
|
||||
<!--link rel="stylesheet" href="../bower_components/quill/dist/quill.snow.css" /-->
|
||||
<link href="https://cdn.quilljs.com/1.0.4/quill.snow.css" rel="stylesheet">
|
||||
<link href="https://cdnjs.cloudflare.com/ajax/libs/KaTeX/0.5.1/katex.min.css" rel="stylesheet">
|
||||
<link href="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.2.0/styles/monokai-sublime.min.css" rel="stylesheet">
|
||||
<link href="//cdnjs.cloudflare.com/ajax/libs/KaTeX/0.5.1/katex.min.css" rel="stylesheet">
|
||||
<link href="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.2.0/styles/monokai-sublime.min.css" rel="stylesheet">
|
||||
<style>
|
||||
#quill-container {
|
||||
border: 1px solid gray;
|
||||
@@ -19,17 +19,13 @@
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<script src="https://cdnjs.cloudflare.com/ajax/libs/KaTeX/0.5.1/katex.min.js" type="text/javascript"></script>
|
||||
<script src="https://cdnjs.cloudflare.com/ajax/libs/highlight.js/9.2.0/highlight.min.js" type="text/javascript"></script>
|
||||
<script src="//cdnjs.cloudflare.com/ajax/libs/KaTeX/0.5.1/katex.min.js" type="text/javascript"></script>
|
||||
<script src="//cdnjs.cloudflare.com/ajax/libs/highlight.js/9.2.0/highlight.min.js" type="text/javascript"></script>
|
||||
<script src="https://cdn.quilljs.com/1.0.4/quill.js"></script>
|
||||
<!-- quill does not include dist files! We are using the hosted version instead (see above)
|
||||
<script src="../bower_components/quill/dist/quill.js"></script>
|
||||
-->
|
||||
<script src="../../y.js"></script>
|
||||
<script src="../../../y-array/y-array.js"></script>
|
||||
<script src="../../../y-richtext/dist/y-richtext.js"></script>
|
||||
<script src="../../../y-memory/y-memory.js"></script>
|
||||
<script src="../../../y-websockets-client/y-websockets-client.js"></script>
|
||||
<script src="../bower_components/yjs/y.js"></script>
|
||||
<script src="./index.js"></script>
|
||||
</body>
|
||||
</html>
|
||||
|
||||
@@ -8,8 +8,7 @@ Y({
|
||||
},
|
||||
connector: {
|
||||
name: 'websockets-client',
|
||||
room: 'richtext-example-quill-1.0-test',
|
||||
url: 'http://localhost:1234'
|
||||
room: 'richtext-example-quill-1.0-test'
|
||||
},
|
||||
sourceDir: '/bower_components',
|
||||
share: {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"name": "yjs",
|
||||
"version": "13.0.0-12",
|
||||
"version": "13.0.0-8",
|
||||
"description": "A framework for real-time p2p shared editing on any data",
|
||||
"main": "./y.node.js",
|
||||
"browser": "./y.js",
|
||||
|
||||
135
src/Connector.js
135
src/Connector.js
@@ -43,12 +43,10 @@ export default function extendConnector (Y/* :any */) {
|
||||
this.setUserId(Y.utils.generateUserId())
|
||||
}
|
||||
}
|
||||
|
||||
reconnect () {
|
||||
this.log('reconnecting..')
|
||||
return this.y.db.startGarbageCollector()
|
||||
}
|
||||
|
||||
disconnect () {
|
||||
this.log('discronnecting..')
|
||||
this.connections = new Map()
|
||||
@@ -58,16 +56,13 @@ export default function extendConnector (Y/* :any */) {
|
||||
this.y.db.stopGarbageCollector()
|
||||
return this.y.db.whenTransactionsFinished()
|
||||
}
|
||||
|
||||
repair () {
|
||||
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
|
||||
this.connections.forEach(user => { user.isSynced = false })
|
||||
this.isSynced = false
|
||||
this.connections.forEach((user, userId) => {
|
||||
user.isSynced = false
|
||||
this._syncWithUser(userId)
|
||||
})
|
||||
this.currentSyncTarget = null
|
||||
this.findNextSyncTarget()
|
||||
}
|
||||
|
||||
setUserId (userId) {
|
||||
if (this.userId == null) {
|
||||
if (!Number.isInteger(userId)) {
|
||||
@@ -82,21 +77,20 @@ export default function extendConnector (Y/* :any */) {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
onUserEvent (f) {
|
||||
this.userEventListeners.push(f)
|
||||
}
|
||||
|
||||
removeUserEventListener (f) {
|
||||
this.userEventListeners = this.userEventListeners.filter(g => f !== g)
|
||||
}
|
||||
|
||||
userLeft (user) {
|
||||
if (this.connections.has(user)) {
|
||||
this.log('%s: User left %s', this.userId, user)
|
||||
this.connections.delete(user)
|
||||
// check if isSynced event can be sent now
|
||||
this._setSyncedWith(null)
|
||||
if (user === this.currentSyncTarget) {
|
||||
this.currentSyncTarget = null
|
||||
this.findNextSyncTarget()
|
||||
}
|
||||
for (var f of this.userEventListeners) {
|
||||
f({
|
||||
action: 'userLeft',
|
||||
@@ -105,7 +99,7 @@ export default function extendConnector (Y/* :any */) {
|
||||
}
|
||||
}
|
||||
}
|
||||
userJoined (user, role, auth) {
|
||||
userJoined (user, role) {
|
||||
if (role == null) {
|
||||
throw new Error('You must specify the role of the joined user!')
|
||||
}
|
||||
@@ -118,7 +112,7 @@ export default function extendConnector (Y/* :any */) {
|
||||
isSynced: false,
|
||||
role: role,
|
||||
processAfterAuth: [],
|
||||
auth: auth || null,
|
||||
auth: null,
|
||||
receivedSyncStep2: false
|
||||
})
|
||||
let defer = {}
|
||||
@@ -131,7 +125,9 @@ export default function extendConnector (Y/* :any */) {
|
||||
role: role
|
||||
})
|
||||
}
|
||||
this._syncWithUser(user)
|
||||
if (this.currentSyncTarget == null) {
|
||||
this.findNextSyncTarget()
|
||||
}
|
||||
}
|
||||
// Execute a function _when_ we are connected.
|
||||
// If not connected, wait until connected
|
||||
@@ -142,25 +138,39 @@ export default function extendConnector (Y/* :any */) {
|
||||
this.whenSyncedListeners.push(f)
|
||||
}
|
||||
}
|
||||
_syncWithUser (userid) {
|
||||
if (this.role === 'slave') {
|
||||
findNextSyncTarget () {
|
||||
if (this.currentSyncTarget != null || this.role === 'slave') {
|
||||
return // "The current sync has not finished or this is controlled by a master!"
|
||||
}
|
||||
sendSyncStep1(this, userid)
|
||||
}
|
||||
_fireIsSyncedListeners () {
|
||||
this.y.db.whenTransactionsFinished().then(() => {
|
||||
if (!this.isSynced) {
|
||||
this.isSynced = true
|
||||
// It is safer to remove this!
|
||||
// TODO: remove: yield * this.garbageCollectAfterSync()
|
||||
// call whensynced listeners
|
||||
for (var f of this.whenSyncedListeners) {
|
||||
f()
|
||||
}
|
||||
this.whenSyncedListeners = []
|
||||
|
||||
var syncUser = null
|
||||
for (var [uid, user] of this.connections) {
|
||||
if (!user.isSynced) {
|
||||
syncUser = uid
|
||||
break
|
||||
}
|
||||
})
|
||||
}
|
||||
var conn = this
|
||||
if (syncUser != null) {
|
||||
this.currentSyncTarget = syncUser
|
||||
sendSyncStep1(this, syncUser)
|
||||
} else {
|
||||
if (!conn.isSynced) {
|
||||
this.y.db.requestTransaction(function * () {
|
||||
if (!conn.isSynced) {
|
||||
// it is crucial that isSynced is set at the time garbageCollectAfterSync is called
|
||||
conn.isSynced = true
|
||||
// It is safer to remove this!
|
||||
// TODO: remove: yield * this.garbageCollectAfterSync()
|
||||
// call whensynced listeners
|
||||
for (var f of conn.whenSyncedListeners) {
|
||||
f()
|
||||
}
|
||||
conn.whenSyncedListeners = []
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
send (uid, buffer) {
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
@@ -210,12 +220,12 @@ export default function extendConnector (Y/* :any */) {
|
||||
/*
|
||||
You received a raw message, and you know that it is intended for Yjs. Then call this function.
|
||||
*/
|
||||
receiveMessage (sender, buffer) {
|
||||
async receiveMessage (sender, buffer) {
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
|
||||
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array!')
|
||||
}
|
||||
if (sender === this.userId) {
|
||||
return Promise.resolve()
|
||||
return
|
||||
}
|
||||
let decoder = new BinaryDecoder(buffer)
|
||||
let encoder = new BinaryEncoder()
|
||||
@@ -234,33 +244,31 @@ export default function extendConnector (Y/* :any */) {
|
||||
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
||||
let auth = decoder.readVarUint()
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
|
||||
// check auth
|
||||
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.auth = authPermissions
|
||||
this.y.emit('userAuthenticated', {
|
||||
user: senderConn.uid,
|
||||
auth: authPermissions
|
||||
})
|
||||
let authPermissions = await this.checkAuth(auth, this.y, sender)
|
||||
senderConn.auth = authPermissions
|
||||
this.y.emit('userAuthenticated', {
|
||||
user: senderConn.uid,
|
||||
auth: authPermissions
|
||||
})
|
||||
senderConn.syncStep2.promise.then(() => {
|
||||
if (senderConn.processAfterAuth == null) {
|
||||
return
|
||||
}
|
||||
let messages = senderConn.processAfterAuth
|
||||
senderConn.processAfterAuth = []
|
||||
|
||||
return messages.reduce((p, m) =>
|
||||
p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4]))
|
||||
, Promise.resolve())
|
||||
for (let i = 0; i < senderConn.processAfterAuth.length; i++) {
|
||||
let m = senderConn.processAfterAuth[i]
|
||||
this.receiveMessage(m[0], m[1])
|
||||
}
|
||||
senderConn.processAfterAuth = null
|
||||
})
|
||||
}
|
||||
}
|
||||
if (senderConn.auth != null) {
|
||||
return this.computeMessage(messageType, senderConn, decoder, encoder, sender)
|
||||
} else {
|
||||
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
|
||||
}
|
||||
}
|
||||
|
||||
computeMessage (messageType, senderConn, decoder, encoder, sender) {
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.processAfterAuth.push([sender, buffer])
|
||||
return
|
||||
}
|
||||
|
||||
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
||||
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
||||
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender)
|
||||
@@ -270,20 +278,19 @@ export default function extendConnector (Y/* :any */) {
|
||||
} else if (messageType === 'update' && senderConn.auth === 'write') {
|
||||
return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
|
||||
} else {
|
||||
return Promise.reject(new Error('Unable to receive message'))
|
||||
console.error('Unable to receive message')
|
||||
}
|
||||
}
|
||||
|
||||
_setSyncedWith (user) {
|
||||
if (user != null) {
|
||||
this.connections.get(user).isSynced = true
|
||||
var conn = this.connections.get(user)
|
||||
if (conn != null) {
|
||||
conn.isSynced = true
|
||||
}
|
||||
let conns = Array.from(this.connections.values())
|
||||
if (conns.length > 0 && conns.every(u => u.isSynced)) {
|
||||
this._fireIsSyncedListeners()
|
||||
if (user === this.currentSyncTarget) {
|
||||
this.currentSyncTarget = null
|
||||
this.findNextSyncTarget()
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Currently, the HB encodes operations as JSON. For the moment I want to keep it
|
||||
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
|
||||
|
||||
@@ -26,14 +26,14 @@ export function formatYjsMessageType (buffer) {
|
||||
return decoder.readVarString()
|
||||
}
|
||||
|
||||
export function logMessageUpdate (decoder, strBuilder) {
|
||||
export async function logMessageUpdate (decoder, strBuilder) {
|
||||
let len = decoder.readUint32()
|
||||
for (let i = 0; i < len; i++) {
|
||||
strBuilder.push(JSON.stringify(Y.Struct.binaryDecodeOperation(decoder)) + '\n')
|
||||
}
|
||||
}
|
||||
|
||||
export function computeMessageUpdate (decoder, encoder, conn) {
|
||||
export async function computeMessageUpdate (decoder, encoder, conn) {
|
||||
if (conn.y.db.forwardAppliedOperations) {
|
||||
let messagePosition = decoder.pos
|
||||
let len = decoder.readUint32()
|
||||
@@ -78,7 +78,7 @@ export function logMessageSyncStep1 (decoder, strBuilder) {
|
||||
logSS(decoder, strBuilder)
|
||||
}
|
||||
|
||||
export function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
|
||||
export async function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
|
||||
let protocolVersion = decoder.readVarUint()
|
||||
let preferUntransformed = decoder.readUint8() === 1
|
||||
|
||||
@@ -92,30 +92,27 @@ export function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sen
|
||||
conn.y.destroy()
|
||||
}
|
||||
|
||||
return conn.y.db.whenTransactionsFinished().then(() => {
|
||||
// send sync step 2
|
||||
conn.y.db.requestTransaction(function * () {
|
||||
encoder.writeVarString('sync step 2')
|
||||
encoder.writeVarString(conn.authInfo || '')
|
||||
// send sync step 2
|
||||
conn.y.db.requestTransaction(function * () {
|
||||
encoder.writeVarString('sync step 2')
|
||||
encoder.writeVarString(conn.authInfo || '')
|
||||
|
||||
if (preferUntransformed) {
|
||||
encoder.writeUint8(1)
|
||||
yield * this.writeOperationsUntransformed(encoder)
|
||||
} else {
|
||||
encoder.writeUint8(0)
|
||||
yield * this.writeOperations(encoder, decoder)
|
||||
}
|
||||
if (preferUntransformed) {
|
||||
encoder.writeUint8(1)
|
||||
yield * this.writeOperationsUntransformed(encoder)
|
||||
} else {
|
||||
encoder.writeUint8(0)
|
||||
yield * this.writeOperations(encoder, decoder)
|
||||
}
|
||||
|
||||
yield * this.writeDeleteSet(encoder)
|
||||
conn.send(senderConn.uid, encoder.createBuffer())
|
||||
senderConn.receivedSyncStep2 = true
|
||||
})
|
||||
return conn.y.db.whenTransactionsFinished().then(() => {
|
||||
if (conn.role === 'slave') {
|
||||
sendSyncStep1(conn, sender)
|
||||
}
|
||||
})
|
||||
yield * this.writeDeleteSet(encoder)
|
||||
conn.send(senderConn.uid, encoder.createBuffer())
|
||||
senderConn.receivedSyncStep2 = true
|
||||
})
|
||||
if (conn.role === 'slave') {
|
||||
sendSyncStep1(conn, sender)
|
||||
}
|
||||
await conn.y.db.whenTransactionsFinished()
|
||||
}
|
||||
|
||||
export function logSS (decoder, strBuilder) {
|
||||
@@ -164,7 +161,7 @@ export function logMessageSyncStep2 (decoder, strBuilder) {
|
||||
logDS(decoder, strBuilder)
|
||||
}
|
||||
|
||||
export function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
|
||||
export async function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
|
||||
var db = conn.y.db
|
||||
let defer = senderConn.syncStep2
|
||||
|
||||
@@ -181,8 +178,7 @@ export function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sen
|
||||
db.requestTransaction(function * () {
|
||||
yield * this.applyDeleteSet(decoder)
|
||||
})
|
||||
return db.whenTransactionsFinished().then(() => {
|
||||
conn._setSyncedWith(sender)
|
||||
defer.resolve()
|
||||
})
|
||||
await db.whenTransactionsFinished()
|
||||
conn._setSyncedWith(sender)
|
||||
defer.resolve()
|
||||
}
|
||||
|
||||
@@ -141,7 +141,7 @@ export async function initArrays (t, opts) {
|
||||
let connOpts
|
||||
if (i === 0) {
|
||||
// Only one instance can gc!
|
||||
dbOpts = Object.assign({ gc: false }, opts.db)
|
||||
dbOpts = Object.assign({ gc: true }, opts.db)
|
||||
connOpts = Object.assign({ role: 'master' }, connector)
|
||||
} else {
|
||||
dbOpts = Object.assign({ gc: false }, opts.db)
|
||||
|
||||
195
y.node.js
195
y.node.js
@@ -1,7 +1,7 @@
|
||||
|
||||
/**
|
||||
* yjs - A framework for real-time p2p shared editing on any data
|
||||
* @version v13.0.0-12
|
||||
* @version v13.0.0-8
|
||||
* @license MIT
|
||||
*/
|
||||
|
||||
@@ -440,14 +440,14 @@ function formatYjsMessageType (buffer) {
|
||||
return decoder.readVarString()
|
||||
}
|
||||
|
||||
function logMessageUpdate (decoder, strBuilder) {
|
||||
async function logMessageUpdate (decoder, strBuilder) {
|
||||
let len = decoder.readUint32();
|
||||
for (let i = 0; i < len; i++) {
|
||||
strBuilder.push(JSON.stringify(Y.Struct.binaryDecodeOperation(decoder)) + '\n');
|
||||
}
|
||||
}
|
||||
|
||||
function computeMessageUpdate (decoder, encoder, conn) {
|
||||
async function computeMessageUpdate (decoder, encoder, conn) {
|
||||
if (conn.y.db.forwardAppliedOperations) {
|
||||
let messagePosition = decoder.pos;
|
||||
let len = decoder.readUint32();
|
||||
@@ -492,7 +492,7 @@ function logMessageSyncStep1 (decoder, strBuilder) {
|
||||
logSS(decoder, strBuilder);
|
||||
}
|
||||
|
||||
function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
|
||||
async function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
|
||||
let protocolVersion = decoder.readVarUint();
|
||||
let preferUntransformed = decoder.readUint8() === 1;
|
||||
|
||||
@@ -506,30 +506,27 @@ function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
|
||||
conn.y.destroy();
|
||||
}
|
||||
|
||||
return conn.y.db.whenTransactionsFinished().then(() => {
|
||||
// send sync step 2
|
||||
conn.y.db.requestTransaction(function * () {
|
||||
encoder.writeVarString('sync step 2');
|
||||
encoder.writeVarString(conn.authInfo || '');
|
||||
// send sync step 2
|
||||
conn.y.db.requestTransaction(function * () {
|
||||
encoder.writeVarString('sync step 2');
|
||||
encoder.writeVarString(conn.authInfo || '');
|
||||
|
||||
if (preferUntransformed) {
|
||||
encoder.writeUint8(1);
|
||||
yield * this.writeOperationsUntransformed(encoder);
|
||||
} else {
|
||||
encoder.writeUint8(0);
|
||||
yield * this.writeOperations(encoder, decoder);
|
||||
}
|
||||
if (preferUntransformed) {
|
||||
encoder.writeUint8(1);
|
||||
yield * this.writeOperationsUntransformed(encoder);
|
||||
} else {
|
||||
encoder.writeUint8(0);
|
||||
yield * this.writeOperations(encoder, decoder);
|
||||
}
|
||||
|
||||
yield * this.writeDeleteSet(encoder);
|
||||
conn.send(senderConn.uid, encoder.createBuffer());
|
||||
senderConn.receivedSyncStep2 = true;
|
||||
});
|
||||
return conn.y.db.whenTransactionsFinished().then(() => {
|
||||
if (conn.role === 'slave') {
|
||||
sendSyncStep1(conn, sender);
|
||||
}
|
||||
})
|
||||
})
|
||||
yield * this.writeDeleteSet(encoder);
|
||||
conn.send(senderConn.uid, encoder.createBuffer());
|
||||
senderConn.receivedSyncStep2 = true;
|
||||
});
|
||||
if (conn.role === 'slave') {
|
||||
sendSyncStep1(conn, sender);
|
||||
}
|
||||
await conn.y.db.whenTransactionsFinished();
|
||||
}
|
||||
|
||||
function logSS (decoder, strBuilder) {
|
||||
@@ -578,7 +575,7 @@ function logMessageSyncStep2 (decoder, strBuilder) {
|
||||
logDS(decoder, strBuilder);
|
||||
}
|
||||
|
||||
function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
|
||||
async function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
|
||||
var db = conn.y.db;
|
||||
let defer = senderConn.syncStep2;
|
||||
|
||||
@@ -595,10 +592,9 @@ function computeMessageSyncStep2 (decoder, encoder, conn, senderConn, sender) {
|
||||
db.requestTransaction(function * () {
|
||||
yield * this.applyDeleteSet(decoder);
|
||||
});
|
||||
return db.whenTransactionsFinished().then(() => {
|
||||
conn._setSyncedWith(sender);
|
||||
defer.resolve();
|
||||
})
|
||||
await db.whenTransactionsFinished();
|
||||
conn._setSyncedWith(sender);
|
||||
defer.resolve();
|
||||
}
|
||||
|
||||
function extendConnector (Y/* :any */) {
|
||||
@@ -643,12 +639,10 @@ function extendConnector (Y/* :any */) {
|
||||
this.setUserId(Y.utils.generateUserId());
|
||||
}
|
||||
}
|
||||
|
||||
reconnect () {
|
||||
this.log('reconnecting..');
|
||||
return this.y.db.startGarbageCollector()
|
||||
}
|
||||
|
||||
disconnect () {
|
||||
this.log('discronnecting..');
|
||||
this.connections = new Map();
|
||||
@@ -658,16 +652,13 @@ function extendConnector (Y/* :any */) {
|
||||
this.y.db.stopGarbageCollector();
|
||||
return this.y.db.whenTransactionsFinished()
|
||||
}
|
||||
|
||||
repair () {
|
||||
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues');
|
||||
this.connections.forEach(user => { user.isSynced = false; });
|
||||
this.isSynced = false;
|
||||
this.connections.forEach((user, userId) => {
|
||||
user.isSynced = false;
|
||||
this._syncWithUser(userId);
|
||||
});
|
||||
this.currentSyncTarget = null;
|
||||
this.findNextSyncTarget();
|
||||
}
|
||||
|
||||
setUserId (userId) {
|
||||
if (this.userId == null) {
|
||||
if (!Number.isInteger(userId)) {
|
||||
@@ -682,21 +673,20 @@ function extendConnector (Y/* :any */) {
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
onUserEvent (f) {
|
||||
this.userEventListeners.push(f);
|
||||
}
|
||||
|
||||
removeUserEventListener (f) {
|
||||
this.userEventListeners = this.userEventListeners.filter(g => f !== g);
|
||||
}
|
||||
|
||||
userLeft (user) {
|
||||
if (this.connections.has(user)) {
|
||||
this.log('%s: User left %s', this.userId, user);
|
||||
this.connections.delete(user);
|
||||
// check if isSynced event can be sent now
|
||||
this._setSyncedWith(null);
|
||||
if (user === this.currentSyncTarget) {
|
||||
this.currentSyncTarget = null;
|
||||
this.findNextSyncTarget();
|
||||
}
|
||||
for (var f of this.userEventListeners) {
|
||||
f({
|
||||
action: 'userLeft',
|
||||
@@ -705,7 +695,7 @@ function extendConnector (Y/* :any */) {
|
||||
}
|
||||
}
|
||||
}
|
||||
userJoined (user, role, auth) {
|
||||
userJoined (user, role) {
|
||||
if (role == null) {
|
||||
throw new Error('You must specify the role of the joined user!')
|
||||
}
|
||||
@@ -718,7 +708,7 @@ function extendConnector (Y/* :any */) {
|
||||
isSynced: false,
|
||||
role: role,
|
||||
processAfterAuth: [],
|
||||
auth: auth || null,
|
||||
auth: null,
|
||||
receivedSyncStep2: false
|
||||
});
|
||||
let defer = {};
|
||||
@@ -731,7 +721,9 @@ function extendConnector (Y/* :any */) {
|
||||
role: role
|
||||
});
|
||||
}
|
||||
this._syncWithUser(user);
|
||||
if (this.currentSyncTarget == null) {
|
||||
this.findNextSyncTarget();
|
||||
}
|
||||
}
|
||||
// Execute a function _when_ we are connected.
|
||||
// If not connected, wait until connected
|
||||
@@ -742,25 +734,39 @@ function extendConnector (Y/* :any */) {
|
||||
this.whenSyncedListeners.push(f);
|
||||
}
|
||||
}
|
||||
_syncWithUser (userid) {
|
||||
if (this.role === 'slave') {
|
||||
findNextSyncTarget () {
|
||||
if (this.currentSyncTarget != null || this.role === 'slave') {
|
||||
return // "The current sync has not finished or this is controlled by a master!"
|
||||
}
|
||||
sendSyncStep1(this, userid);
|
||||
}
|
||||
_fireIsSyncedListeners () {
|
||||
this.y.db.whenTransactionsFinished().then(() => {
|
||||
if (!this.isSynced) {
|
||||
this.isSynced = true;
|
||||
// It is safer to remove this!
|
||||
// TODO: remove: yield * this.garbageCollectAfterSync()
|
||||
// call whensynced listeners
|
||||
for (var f of this.whenSyncedListeners) {
|
||||
f();
|
||||
}
|
||||
this.whenSyncedListeners = [];
|
||||
|
||||
var syncUser = null;
|
||||
for (var [uid, user] of this.connections) {
|
||||
if (!user.isSynced) {
|
||||
syncUser = uid;
|
||||
break
|
||||
}
|
||||
});
|
||||
}
|
||||
var conn = this;
|
||||
if (syncUser != null) {
|
||||
this.currentSyncTarget = syncUser;
|
||||
sendSyncStep1(this, syncUser);
|
||||
} else {
|
||||
if (!conn.isSynced) {
|
||||
this.y.db.requestTransaction(function * () {
|
||||
if (!conn.isSynced) {
|
||||
// it is crucial that isSynced is set at the time garbageCollectAfterSync is called
|
||||
conn.isSynced = true;
|
||||
// It is safer to remove this!
|
||||
// TODO: remove: yield * this.garbageCollectAfterSync()
|
||||
// call whensynced listeners
|
||||
for (var f of conn.whenSyncedListeners) {
|
||||
f();
|
||||
}
|
||||
conn.whenSyncedListeners = [];
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
send (uid, buffer) {
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
@@ -810,12 +816,12 @@ function extendConnector (Y/* :any */) {
|
||||
/*
|
||||
You received a raw message, and you know that it is intended for Yjs. Then call this function.
|
||||
*/
|
||||
receiveMessage (sender, buffer) {
|
||||
async receiveMessage (sender, buffer) {
|
||||
if (!(buffer instanceof ArrayBuffer || buffer instanceof Uint8Array)) {
|
||||
return Promise.reject(new Error('Expected Message to be an ArrayBuffer or Uint8Array!'))
|
||||
throw new Error('Expected Message to be an ArrayBuffer or Uint8Array!')
|
||||
}
|
||||
if (sender === this.userId) {
|
||||
return Promise.resolve()
|
||||
return
|
||||
}
|
||||
let decoder = new BinaryDecoder(buffer);
|
||||
let encoder = new BinaryEncoder();
|
||||
@@ -834,33 +840,31 @@ function extendConnector (Y/* :any */) {
|
||||
if (messageType === 'sync step 1' || messageType === 'sync step 2') {
|
||||
let auth = decoder.readVarUint();
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]);
|
||||
// check auth
|
||||
return this.checkAuth(auth, this.y, sender).then(authPermissions => {
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.auth = authPermissions;
|
||||
this.y.emit('userAuthenticated', {
|
||||
user: senderConn.uid,
|
||||
auth: authPermissions
|
||||
});
|
||||
let authPermissions = await this.checkAuth(auth, this.y, sender);
|
||||
senderConn.auth = authPermissions;
|
||||
this.y.emit('userAuthenticated', {
|
||||
user: senderConn.uid,
|
||||
auth: authPermissions
|
||||
});
|
||||
senderConn.syncStep2.promise.then(() => {
|
||||
if (senderConn.processAfterAuth == null) {
|
||||
return
|
||||
}
|
||||
let messages = senderConn.processAfterAuth;
|
||||
senderConn.processAfterAuth = [];
|
||||
|
||||
return messages.reduce((p, m) =>
|
||||
p.then(() => this.computeMessage(m[0], m[1], m[2], m[3], m[4]))
|
||||
, Promise.resolve())
|
||||
})
|
||||
for (let i = 0; i < senderConn.processAfterAuth.length; i++) {
|
||||
let m = senderConn.processAfterAuth[i];
|
||||
this.receiveMessage(m[0], m[1]);
|
||||
}
|
||||
senderConn.processAfterAuth = null;
|
||||
});
|
||||
}
|
||||
}
|
||||
if (senderConn.auth != null) {
|
||||
return this.computeMessage(messageType, senderConn, decoder, encoder, sender)
|
||||
} else {
|
||||
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]);
|
||||
}
|
||||
}
|
||||
|
||||
computeMessage (messageType, senderConn, decoder, encoder, sender) {
|
||||
if (senderConn.auth == null) {
|
||||
senderConn.processAfterAuth.push([sender, buffer]);
|
||||
return
|
||||
}
|
||||
|
||||
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
|
||||
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
|
||||
computeMessageSyncStep1(decoder, encoder, this, senderConn, sender);
|
||||
@@ -870,20 +874,19 @@ function extendConnector (Y/* :any */) {
|
||||
} else if (messageType === 'update' && senderConn.auth === 'write') {
|
||||
return computeMessageUpdate(decoder, encoder, this, senderConn, sender)
|
||||
} else {
|
||||
return Promise.reject(new Error('Unable to receive message'))
|
||||
console.error('Unable to receive message');
|
||||
}
|
||||
}
|
||||
|
||||
_setSyncedWith (user) {
|
||||
if (user != null) {
|
||||
this.connections.get(user).isSynced = true;
|
||||
var conn = this.connections.get(user);
|
||||
if (conn != null) {
|
||||
conn.isSynced = true;
|
||||
}
|
||||
let conns = Array.from(this.connections.values());
|
||||
if (conns.length > 0 && conns.every(u => u.isSynced)) {
|
||||
this._fireIsSyncedListeners();
|
||||
if (user === this.currentSyncTarget) {
|
||||
this.currentSyncTarget = null;
|
||||
this.findNextSyncTarget();
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
Currently, the HB encodes operations as JSON. For the moment I want to keep it
|
||||
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
|
||||
|
||||
File diff suppressed because one or more lines are too long
Reference in New Issue
Block a user