Compare commits

..

5 Commits

Author SHA1 Message Date
Kevin Jahns
fbfd377622 v13.0.0-12 -- distribution files 2017-08-04 18:08:20 +02:00
Kevin Jahns
3ca260e0da 13.0.0-12 2017-08-04 18:07:44 +02:00
Kevin Jahns
edb5e4f719 send sync step 1 after sync step 2 is processed (for slaves) 2017-08-04 18:06:36 +02:00
Kevin Jahns
be3b8b65ce 13.0.0-11 2017-08-04 16:30:58 +02:00
Kevin Jahns
d093ef56c8 userJoined accepts auth parameter. Sync with all users at once, instead of one at a time 2017-08-04 16:27:07 +02:00
8 changed files with 105 additions and 125 deletions

View File

@@ -1,6 +1,6 @@
{ {
"name": "yjs", "name": "yjs",
"version": "13.0.0-10", "version": "13.0.0-12",
"description": "A framework for real-time p2p shared editing on any data", "description": "A framework for real-time p2p shared editing on any data",
"main": "./y.node.js", "main": "./y.node.js",
"browser": "./y.js", "browser": "./y.js",

View File

@@ -43,10 +43,12 @@ export default function extendConnector (Y/* :any */) {
this.setUserId(Y.utils.generateUserId()) this.setUserId(Y.utils.generateUserId())
} }
} }
reconnect () { reconnect () {
this.log('reconnecting..') this.log('reconnecting..')
return this.y.db.startGarbageCollector() return this.y.db.startGarbageCollector()
} }
disconnect () { disconnect () {
this.log('discronnecting..') this.log('discronnecting..')
this.connections = new Map() this.connections = new Map()
@@ -56,13 +58,16 @@ export default function extendConnector (Y/* :any */) {
this.y.db.stopGarbageCollector() this.y.db.stopGarbageCollector()
return this.y.db.whenTransactionsFinished() return this.y.db.whenTransactionsFinished()
} }
repair () { repair () {
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues') this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
this.connections.forEach(user => { user.isSynced = false })
this.isSynced = false this.isSynced = false
this.currentSyncTarget = null this.connections.forEach((user, userId) => {
this.findNextSyncTarget() user.isSynced = false
this._syncWithUser(userId)
})
} }
setUserId (userId) { setUserId (userId) {
if (this.userId == null) { if (this.userId == null) {
if (!Number.isInteger(userId)) { if (!Number.isInteger(userId)) {
@@ -77,20 +82,21 @@ export default function extendConnector (Y/* :any */) {
return null return null
} }
} }
onUserEvent (f) { onUserEvent (f) {
this.userEventListeners.push(f) this.userEventListeners.push(f)
} }
removeUserEventListener (f) { removeUserEventListener (f) {
this.userEventListeners = this.userEventListeners.filter(g => f !== g) this.userEventListeners = this.userEventListeners.filter(g => f !== g)
} }
userLeft (user) { userLeft (user) {
if (this.connections.has(user)) { if (this.connections.has(user)) {
this.log('%s: User left %s', this.userId, user) this.log('%s: User left %s', this.userId, user)
this.connections.delete(user) this.connections.delete(user)
if (user === this.currentSyncTarget) { // check if isSynced event can be sent now
this.currentSyncTarget = null this._setSyncedWith(null)
this.findNextSyncTarget()
}
for (var f of this.userEventListeners) { for (var f of this.userEventListeners) {
f({ f({
action: 'userLeft', action: 'userLeft',
@@ -99,7 +105,7 @@ export default function extendConnector (Y/* :any */) {
} }
} }
} }
userJoined (user, role) { userJoined (user, role, auth) {
if (role == null) { if (role == null) {
throw new Error('You must specify the role of the joined user!') throw new Error('You must specify the role of the joined user!')
} }
@@ -112,7 +118,7 @@ export default function extendConnector (Y/* :any */) {
isSynced: false, isSynced: false,
role: role, role: role,
processAfterAuth: [], processAfterAuth: [],
auth: null, auth: auth || null,
receivedSyncStep2: false receivedSyncStep2: false
}) })
let defer = {} let defer = {}
@@ -125,9 +131,7 @@ export default function extendConnector (Y/* :any */) {
role: role role: role
}) })
} }
if (this.currentSyncTarget == null) { this._syncWithUser(user)
this.findNextSyncTarget()
}
} }
// Execute a function _when_ we are connected. // Execute a function _when_ we are connected.
// If not connected, wait until connected // If not connected, wait until connected
@@ -138,27 +142,11 @@ export default function extendConnector (Y/* :any */) {
this.whenSyncedListeners.push(f) this.whenSyncedListeners.push(f)
} }
} }
findNextSyncTarget () { _syncWithUser (userid) {
if (this.currentSyncTarget != null || this.role === 'slave') { if (this.role === 'slave') {
return // "The current sync has not finished or this is controlled by a master!" return // "The current sync has not finished or this is controlled by a master!"
} }
sendSyncStep1(this, userid)
var syncUser = null
for (var [uid, user] of this.connections) {
if (!user.isSynced) {
syncUser = uid
break
}
}
var conn = this
if (syncUser != null) {
this.currentSyncTarget = syncUser
sendSyncStep1(this, syncUser)
} else {
if (!conn.isSynced) {
conn._fireIsSyncedListeners()
}
}
} }
_fireIsSyncedListeners () { _fireIsSyncedListeners () {
this.y.db.whenTransactionsFinished().then(() => { this.y.db.whenTransactionsFinished().then(() => {
@@ -271,6 +259,7 @@ export default function extendConnector (Y/* :any */) {
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]) senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender])
} }
} }
computeMessage (messageType, senderConn, decoder, encoder, sender) { computeMessage (messageType, senderConn, decoder, encoder, sender) {
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) { if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock) // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
@@ -284,19 +273,17 @@ export default function extendConnector (Y/* :any */) {
return Promise.reject(new Error('Unable to receive message')) return Promise.reject(new Error('Unable to receive message'))
} }
} }
_setSyncedWith (user) { _setSyncedWith (user) {
var conn = this.connections.get(user) if (user != null) {
if (conn != null) { this.connections.get(user).isSynced = true
conn.isSynced = true
} }
if (user === this.currentSyncTarget) { let conns = Array.from(this.connections.values())
this.currentSyncTarget = null if (conns.length > 0 && conns.every(u => u.isSynced)) {
this.findNextSyncTarget()
}
if (this.role === 'slave' && conn.role === 'master') {
this._fireIsSyncedListeners() this._fireIsSyncedListeners()
} }
} }
/* /*
Currently, the HB encodes operations as JSON. For the moment I want to keep it Currently, the HB encodes operations as JSON. For the moment I want to keep it
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want

View File

@@ -92,27 +92,30 @@ export function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sen
conn.y.destroy() conn.y.destroy()
} }
// send sync step 2 return conn.y.db.whenTransactionsFinished().then(() => {
conn.y.db.requestTransaction(function * () { // send sync step 2
encoder.writeVarString('sync step 2') conn.y.db.requestTransaction(function * () {
encoder.writeVarString(conn.authInfo || '') encoder.writeVarString('sync step 2')
encoder.writeVarString(conn.authInfo || '')
if (preferUntransformed) { if (preferUntransformed) {
encoder.writeUint8(1) encoder.writeUint8(1)
yield * this.writeOperationsUntransformed(encoder) yield * this.writeOperationsUntransformed(encoder)
} else { } else {
encoder.writeUint8(0) encoder.writeUint8(0)
yield * this.writeOperations(encoder, decoder) yield * this.writeOperations(encoder, decoder)
} }
yield * this.writeDeleteSet(encoder) yield * this.writeDeleteSet(encoder)
conn.send(senderConn.uid, encoder.createBuffer()) conn.send(senderConn.uid, encoder.createBuffer())
senderConn.receivedSyncStep2 = true senderConn.receivedSyncStep2 = true
})
return conn.y.db.whenTransactionsFinished().then(() => {
if (conn.role === 'slave') {
sendSyncStep1(conn, sender)
}
})
}) })
if (conn.role === 'slave') {
sendSyncStep1(conn, sender)
}
return conn.y.db.whenTransactionsFinished()
} }
export function logSS (decoder, strBuilder) { export function logSS (decoder, strBuilder) {

View File

@@ -141,7 +141,7 @@ export async function initArrays (t, opts) {
let connOpts let connOpts
if (i === 0) { if (i === 0) {
// Only one instance can gc! // Only one instance can gc!
dbOpts = Object.assign({ gc: true }, opts.db) dbOpts = Object.assign({ gc: false }, opts.db)
connOpts = Object.assign({ role: 'master' }, connector) connOpts = Object.assign({ role: 'master' }, connector)
} else { } else {
dbOpts = Object.assign({ gc: false }, opts.db) dbOpts = Object.assign({ gc: false }, opts.db)

10
y.js

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

108
y.node.js
View File

@@ -1,7 +1,7 @@
/** /**
* yjs - A framework for real-time p2p shared editing on any data * yjs - A framework for real-time p2p shared editing on any data
* @version v13.0.0-10 * @version v13.0.0-12
* @license MIT * @license MIT
*/ */
@@ -506,27 +506,30 @@ function computeMessageSyncStep1 (decoder, encoder, conn, senderConn, sender) {
conn.y.destroy(); conn.y.destroy();
} }
// send sync step 2 return conn.y.db.whenTransactionsFinished().then(() => {
conn.y.db.requestTransaction(function * () { // send sync step 2
encoder.writeVarString('sync step 2'); conn.y.db.requestTransaction(function * () {
encoder.writeVarString(conn.authInfo || ''); encoder.writeVarString('sync step 2');
encoder.writeVarString(conn.authInfo || '');
if (preferUntransformed) { if (preferUntransformed) {
encoder.writeUint8(1); encoder.writeUint8(1);
yield * this.writeOperationsUntransformed(encoder); yield * this.writeOperationsUntransformed(encoder);
} else { } else {
encoder.writeUint8(0); encoder.writeUint8(0);
yield * this.writeOperations(encoder, decoder); yield * this.writeOperations(encoder, decoder);
} }
yield * this.writeDeleteSet(encoder); yield * this.writeDeleteSet(encoder);
conn.send(senderConn.uid, encoder.createBuffer()); conn.send(senderConn.uid, encoder.createBuffer());
senderConn.receivedSyncStep2 = true; senderConn.receivedSyncStep2 = true;
}); });
if (conn.role === 'slave') { return conn.y.db.whenTransactionsFinished().then(() => {
sendSyncStep1(conn, sender); if (conn.role === 'slave') {
} sendSyncStep1(conn, sender);
return conn.y.db.whenTransactionsFinished() }
})
})
} }
function logSS (decoder, strBuilder) { function logSS (decoder, strBuilder) {
@@ -640,10 +643,12 @@ function extendConnector (Y/* :any */) {
this.setUserId(Y.utils.generateUserId()); this.setUserId(Y.utils.generateUserId());
} }
} }
reconnect () { reconnect () {
this.log('reconnecting..'); this.log('reconnecting..');
return this.y.db.startGarbageCollector() return this.y.db.startGarbageCollector()
} }
disconnect () { disconnect () {
this.log('discronnecting..'); this.log('discronnecting..');
this.connections = new Map(); this.connections = new Map();
@@ -653,13 +658,16 @@ function extendConnector (Y/* :any */) {
this.y.db.stopGarbageCollector(); this.y.db.stopGarbageCollector();
return this.y.db.whenTransactionsFinished() return this.y.db.whenTransactionsFinished()
} }
repair () { repair () {
this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues'); this.log('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues');
this.connections.forEach(user => { user.isSynced = false; });
this.isSynced = false; this.isSynced = false;
this.currentSyncTarget = null; this.connections.forEach((user, userId) => {
this.findNextSyncTarget(); user.isSynced = false;
this._syncWithUser(userId);
});
} }
setUserId (userId) { setUserId (userId) {
if (this.userId == null) { if (this.userId == null) {
if (!Number.isInteger(userId)) { if (!Number.isInteger(userId)) {
@@ -674,20 +682,21 @@ function extendConnector (Y/* :any */) {
return null return null
} }
} }
onUserEvent (f) { onUserEvent (f) {
this.userEventListeners.push(f); this.userEventListeners.push(f);
} }
removeUserEventListener (f) { removeUserEventListener (f) {
this.userEventListeners = this.userEventListeners.filter(g => f !== g); this.userEventListeners = this.userEventListeners.filter(g => f !== g);
} }
userLeft (user) { userLeft (user) {
if (this.connections.has(user)) { if (this.connections.has(user)) {
this.log('%s: User left %s', this.userId, user); this.log('%s: User left %s', this.userId, user);
this.connections.delete(user); this.connections.delete(user);
if (user === this.currentSyncTarget) { // check if isSynced event can be sent now
this.currentSyncTarget = null; this._setSyncedWith(null);
this.findNextSyncTarget();
}
for (var f of this.userEventListeners) { for (var f of this.userEventListeners) {
f({ f({
action: 'userLeft', action: 'userLeft',
@@ -696,7 +705,7 @@ function extendConnector (Y/* :any */) {
} }
} }
} }
userJoined (user, role) { userJoined (user, role, auth) {
if (role == null) { if (role == null) {
throw new Error('You must specify the role of the joined user!') throw new Error('You must specify the role of the joined user!')
} }
@@ -709,7 +718,7 @@ function extendConnector (Y/* :any */) {
isSynced: false, isSynced: false,
role: role, role: role,
processAfterAuth: [], processAfterAuth: [],
auth: null, auth: auth || null,
receivedSyncStep2: false receivedSyncStep2: false
}); });
let defer = {}; let defer = {};
@@ -722,9 +731,7 @@ function extendConnector (Y/* :any */) {
role: role role: role
}); });
} }
if (this.currentSyncTarget == null) { this._syncWithUser(user);
this.findNextSyncTarget();
}
} }
// Execute a function _when_ we are connected. // Execute a function _when_ we are connected.
// If not connected, wait until connected // If not connected, wait until connected
@@ -735,27 +742,11 @@ function extendConnector (Y/* :any */) {
this.whenSyncedListeners.push(f); this.whenSyncedListeners.push(f);
} }
} }
findNextSyncTarget () { _syncWithUser (userid) {
if (this.currentSyncTarget != null || this.role === 'slave') { if (this.role === 'slave') {
return // "The current sync has not finished or this is controlled by a master!" return // "The current sync has not finished or this is controlled by a master!"
} }
sendSyncStep1(this, userid);
var syncUser = null;
for (var [uid, user] of this.connections) {
if (!user.isSynced) {
syncUser = uid;
break
}
}
var conn = this;
if (syncUser != null) {
this.currentSyncTarget = syncUser;
sendSyncStep1(this, syncUser);
} else {
if (!conn.isSynced) {
conn._fireIsSyncedListeners();
}
}
} }
_fireIsSyncedListeners () { _fireIsSyncedListeners () {
this.y.db.whenTransactionsFinished().then(() => { this.y.db.whenTransactionsFinished().then(() => {
@@ -868,6 +859,7 @@ function extendConnector (Y/* :any */) {
senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]); senderConn.processAfterAuth.push([messageType, senderConn, decoder, encoder, sender]);
} }
} }
computeMessage (messageType, senderConn, decoder, encoder, sender) { computeMessage (messageType, senderConn, decoder, encoder, sender) {
if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) { if (messageType === 'sync step 1' && (senderConn.auth === 'write' || senderConn.auth === 'read')) {
// cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock) // cannot wait for sync step 1 to finish, because we may wait for sync step 2 in sync step 1 (->lock)
@@ -881,19 +873,17 @@ function extendConnector (Y/* :any */) {
return Promise.reject(new Error('Unable to receive message')) return Promise.reject(new Error('Unable to receive message'))
} }
} }
_setSyncedWith (user) { _setSyncedWith (user) {
var conn = this.connections.get(user); if (user != null) {
if (conn != null) { this.connections.get(user).isSynced = true;
conn.isSynced = true;
} }
if (user === this.currentSyncTarget) { let conns = Array.from(this.connections.values());
this.currentSyncTarget = null; if (conns.length > 0 && conns.every(u => u.isSynced)) {
this.findNextSyncTarget();
}
if (this.role === 'slave' && conn.role === 'master') {
this._fireIsSyncedListeners(); this._fireIsSyncedListeners();
} }
} }
/* /*
Currently, the HB encodes operations as JSON. For the moment I want to keep it Currently, the HB encodes operations as JSON. For the moment I want to keep it
that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want

File diff suppressed because one or more lines are too long