Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
47bcec8bc7 | ||
|
|
81c8504462 | ||
|
|
c926ce09f5 |
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "yjs",
|
"name": "yjs",
|
||||||
"version": "11.2.2",
|
"version": "11.2.6",
|
||||||
"homepage": "y-js.org",
|
"homepage": "y-js.org",
|
||||||
"authors": [
|
"authors": [
|
||||||
"Kevin Jahns <kevin.jahns@rwth-aachen.de>"
|
"Kevin Jahns <kevin.jahns@rwth-aachen.de>"
|
||||||
|
|||||||
57
y.es6
57
y.es6
@@ -67,6 +67,16 @@ module.exports = function (Y/* :any */) {
|
|||||||
this.whenSyncedListeners = []
|
this.whenSyncedListeners = []
|
||||||
return this.y.db.stopGarbageCollector()
|
return this.y.db.stopGarbageCollector()
|
||||||
}
|
}
|
||||||
|
repair () {
|
||||||
|
console.info('Repairing the state of Yjs. This can happen if messages get lost, and Yjs detects that something is wrong. If this happens often, please report an issue here: https://github.com/y-js/yjs/issues')
|
||||||
|
for (var name in this.connections) {
|
||||||
|
this.connections[name].isSynced = false
|
||||||
|
}
|
||||||
|
this.isSynced = false
|
||||||
|
this.currentSyncTarget = null
|
||||||
|
this.broadcastedHB = false
|
||||||
|
this.findNextSyncTarget()
|
||||||
|
}
|
||||||
setUserId (userId) {
|
setUserId (userId) {
|
||||||
if (this.userId == null) {
|
if (this.userId == null) {
|
||||||
this.userId = userId
|
this.userId = userId
|
||||||
@@ -696,6 +706,41 @@ module.exports = function (Y /* :any */) {
|
|||||||
if (this.gcTimeout > 0) {
|
if (this.gcTimeout > 0) {
|
||||||
garbageCollect()
|
garbageCollect()
|
||||||
}
|
}
|
||||||
|
this.repairCheckInterval = !opts.repairCheckInterval ? 6000 : opts.repairCheckInterval
|
||||||
|
this.opsReceivedTimestamp = new Date()
|
||||||
|
this.startRepairCheck()
|
||||||
|
}
|
||||||
|
startRepairCheck () {
|
||||||
|
var os = this
|
||||||
|
if (this.repairCheckInterval > 0) {
|
||||||
|
this.repairCheckIntervalHandler = setInterval(function repairOnMissingOperations () {
|
||||||
|
/*
|
||||||
|
Case 1. No ops have been received in a while (new Date() - os.opsReceivedTimestamp > os.repairCheckInterval)
|
||||||
|
- 1.1 os.listenersById is empty. Then the state was correct the whole time. -> Nothing to do (nor to update)
|
||||||
|
- 1.2 os.listenersById is not empty.
|
||||||
|
* Then the state was incorrect for at least {os.repairCheckInterval} seconds.
|
||||||
|
* -> Remove everything in os.listenersById and sync again (connector.repair())
|
||||||
|
Case 2. An op has been received in the last {os.repairCheckInterval } seconds.
|
||||||
|
It is not yet necessary to check for faulty behavior. Everything can still resolve itself. Wait for more messages.
|
||||||
|
If nothing was received for a while and os.listenersById is still not emty, we are in case 1.2
|
||||||
|
-> Do nothing
|
||||||
|
|
||||||
|
Baseline here is: we really only have to catch case 1.2..
|
||||||
|
*/
|
||||||
|
if (
|
||||||
|
new Date() - os.opsReceivedTimestamp > os.repairCheckInterval &&
|
||||||
|
Object.keys(os.listenersById).length > 0 // os.listenersById is not empty
|
||||||
|
) {
|
||||||
|
// haven't received operations for over {os.repairCheckInterval} seconds, resend state vector
|
||||||
|
os.listenersById = {}
|
||||||
|
os.opsReceivedTimestamp = new Date() // update so you don't send repair several times in a row
|
||||||
|
os.y.connector.repair()
|
||||||
|
}
|
||||||
|
}, this.repairCheckInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
stopRepairCheck () {
|
||||||
|
clearInterval(this.repairCheckIntervalHandler)
|
||||||
}
|
}
|
||||||
queueGarbageCollector (id) {
|
queueGarbageCollector (id) {
|
||||||
if (this.y.isConnected()) {
|
if (this.y.isConnected()) {
|
||||||
@@ -791,6 +836,7 @@ module.exports = function (Y /* :any */) {
|
|||||||
* destroy () {
|
* destroy () {
|
||||||
clearInterval(this.gcInterval)
|
clearInterval(this.gcInterval)
|
||||||
this.gcInterval = null
|
this.gcInterval = null
|
||||||
|
this.stopRepairCheck()
|
||||||
for (var key in this.initializedTypes) {
|
for (var key in this.initializedTypes) {
|
||||||
var type = this.initializedTypes[key]
|
var type = this.initializedTypes[key]
|
||||||
if (type._destroy != null) {
|
if (type._destroy != null) {
|
||||||
@@ -830,12 +876,14 @@ module.exports = function (Y /* :any */) {
|
|||||||
/*
|
/*
|
||||||
Apply a list of operations.
|
Apply a list of operations.
|
||||||
|
|
||||||
|
* we save a timestamp, because we received new operations that could resolve ops in this.listenersById (see this.startRepairCheck)
|
||||||
* get a transaction
|
* get a transaction
|
||||||
* check whether all Struct.*.requiredOps are in the OS
|
* check whether all Struct.*.requiredOps are in the OS
|
||||||
* check if it is an expected op (otherwise wait for it)
|
* check if it is an expected op (otherwise wait for it)
|
||||||
* check if was deleted, apply a delete operation after op was applied
|
* check if was deleted, apply a delete operation after op was applied
|
||||||
*/
|
*/
|
||||||
apply (ops) {
|
apply (ops) {
|
||||||
|
this.opsReceivedTimestamp = new Date()
|
||||||
for (var i = 0; i < ops.length; i++) {
|
for (var i = 0; i < ops.length; i++) {
|
||||||
var o = ops[i]
|
var o = ops[i]
|
||||||
if (o.id == null || o.id[0] !== this.y.connector.userId) {
|
if (o.id == null || o.id[0] !== this.y.connector.userId) {
|
||||||
@@ -954,13 +1002,12 @@ module.exports = function (Y /* :any */) {
|
|||||||
var opid = op.id
|
var opid = op.id
|
||||||
var isGarbageCollected = yield* this.isGarbageCollected(opid)
|
var isGarbageCollected = yield* this.isGarbageCollected(opid)
|
||||||
if (!isGarbageCollected) {
|
if (!isGarbageCollected) {
|
||||||
|
// TODO: reduce number of get / put calls for op ..
|
||||||
yield* Y.Struct[op.struct].execute.call(this, op)
|
yield* Y.Struct[op.struct].execute.call(this, op)
|
||||||
yield* this.addOperation(op)
|
yield* this.addOperation(op)
|
||||||
yield* this.store.operationAdded(this, op)
|
yield* this.store.operationAdded(this, op)
|
||||||
if (!Y.utils.compareIds(opid, op.id)) {
|
// operationAdded can change op..
|
||||||
// operationAdded changed op
|
op = yield* this.getOperation(opid)
|
||||||
op = yield* this.getOperation(opid)
|
|
||||||
}
|
|
||||||
// if insertion, try to combine with left
|
// if insertion, try to combine with left
|
||||||
yield* this.tryCombineWithLeft(op)
|
yield* this.tryCombineWithLeft(op)
|
||||||
}
|
}
|
||||||
@@ -1024,6 +1071,7 @@ module.exports = function (Y /* :any */) {
|
|||||||
// Delete if DS says this is actually deleted
|
// Delete if DS says this is actually deleted
|
||||||
var len = op.content != null ? op.content.length : 1
|
var len = op.content != null ? op.content.length : 1
|
||||||
var startId = op.id // You must not use op.id in the following loop, because op will change when deleted
|
var startId = op.id // You must not use op.id in the following loop, because op will change when deleted
|
||||||
|
// TODO: !! console.log('TODO: change this before commiting')
|
||||||
for (let i = 0; i < len; i++) {
|
for (let i = 0; i < len; i++) {
|
||||||
var id = [startId[0], startId[1] + i]
|
var id = [startId[0], startId[1] + i]
|
||||||
var opIsDeleted = yield* transaction.isDeleted(id)
|
var opIsDeleted = yield* transaction.isDeleted(id)
|
||||||
@@ -2505,6 +2553,7 @@ module.exports = function (Y/* :any */) {
|
|||||||
if (firstMissing != null) {
|
if (firstMissing != null) {
|
||||||
// update startPos
|
// update startPos
|
||||||
startPos = firstMissing.id[1]
|
startPos = firstMissing.id[1]
|
||||||
|
startSS[user] = startPos
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
yield* this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) {
|
yield* this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) {
|
||||||
|
|||||||
Reference in New Issue
Block a user