968 lines
34 KiB
JavaScript
968 lines
34 KiB
JavaScript
/* @flow */
|
|
'use strict'
|
|
|
|
/*
|
|
Partial definition of a transaction
|
|
|
|
A transaction provides all the the async functionality on a database.
|
|
|
|
By convention, a transaction has the following properties:
|
|
* ss for StateSet
|
|
* os for OperationStore
|
|
* ds for DeleteStore
|
|
|
|
A transaction must also define the following methods:
|
|
* checkDeleteStoreForState(state)
|
|
- When increasing the state of a user, an operation with an higher id
|
|
may already be garbage collected, and therefore it will never be received.
|
|
update the state to reflect this knowledge. This won't call a method to save the state!
|
|
* getDeleteSet(id)
|
|
- Get the delete set in a readable format:
|
|
{
|
|
"userX": [
|
|
[5,1], // starting from position 5, one operations is deleted
|
|
[9,4] // starting from position 9, four operations are deleted
|
|
],
|
|
"userY": ...
|
|
}
|
|
* getOpsFromDeleteSet(ds) -- TODO: just call this.deleteOperation(id) here
|
|
- get a set of deletions that need to be applied in order to get to
|
|
achieve the state of the supplied ds
|
|
* setOperation(op)
|
|
- write `op` to the database.
|
|
Note: this is allowed to return an in-memory object.
|
|
E.g. the Memory adapter returns the object that it has in-memory.
|
|
Changing values on this object will be stored directly in the database
|
|
without calling this function. Therefore,
|
|
setOperation may have no functionality in some adapters. This also has
|
|
implications on the way we use operations that were served from the database.
|
|
We try not to call copyObject, if not necessary.
|
|
* addOperation(op)
|
|
- add an operation to the database.
|
|
This may only be called once for every op.id
|
|
Must return a function that returns the next operation in the database (ordered by id)
|
|
* getOperation(id)
|
|
* removeOperation(id)
|
|
- remove an operation from the database. This is called when an operation
|
|
is garbage collected.
|
|
* setState(state)
|
|
- `state` is of the form
|
|
{
|
|
user: "1",
|
|
clock: 4
|
|
} <- meaning that we have four operations from user "1"
|
|
(with these id's respectively: 0, 1, 2, and 3)
|
|
* getState(user)
|
|
* getStateVector()
|
|
- Get the state of the OS in the form
|
|
[{
|
|
user: "userX",
|
|
clock: 11
|
|
},
|
|
..
|
|
]
|
|
* getStateSet()
|
|
- Get the state of the OS in the form
|
|
{
|
|
"userX": 11,
|
|
"userY": 22
|
|
}
|
|
* getOperations(startSS)
|
|
- Get the all the operations that are necessary in order to achive the
|
|
stateSet of this user, starting from a stateSet supplied by another user
|
|
* makeOperationReady(ss, op)
|
|
- this is called only by `getOperations(startSS)`. It makes an operation
|
|
applyable on a given SS.
|
|
*/
|
|
module.exports = function (Y/* :any */) {
|
|
class TransactionInterface {
|
|
/* ::
|
|
store: Y.AbstractDatabase;
|
|
ds: Store;
|
|
os: Store;
|
|
ss: Store;
|
|
*/
|
|
/*
|
|
Get a type based on the id of its model.
|
|
If it does not exist yes, create it.
|
|
TODO: delete type from store.initializedTypes[id] when corresponding id was deleted!
|
|
*/
|
|
* getType (id, args) {
|
|
var sid = JSON.stringify(id)
|
|
var t = this.store.initializedTypes[sid]
|
|
if (t == null) {
|
|
var op/* :MapStruct | ListStruct */ = yield* this.getOperation(id)
|
|
if (op != null) {
|
|
t = yield* Y[op.type].typeDefinition.initType.call(this, this.store, op, args)
|
|
this.store.initializedTypes[sid] = t
|
|
}
|
|
}
|
|
return t
|
|
}
|
|
* createType (typedefinition, id) {
|
|
var structname = typedefinition[0].struct
|
|
id = id || this.store.getNextOpId()
|
|
var op
|
|
if (id[0] === '_') {
|
|
op = yield* this.getOperation(id)
|
|
} else {
|
|
op = Y.Struct[structname].create(id)
|
|
op.type = typedefinition[0].name
|
|
}
|
|
if (typedefinition[0].appendAdditionalInfo != null) {
|
|
yield* typedefinition[0].appendAdditionalInfo.call(this, op, typedefinition[1])
|
|
}
|
|
if (op[0] === '_') {
|
|
yield* this.setOperation(op)
|
|
} else {
|
|
yield* this.applyCreatedOperations([op])
|
|
}
|
|
return yield* this.getType(id, typedefinition[1])
|
|
}
|
|
/* createType (typedefinition, id) {
|
|
var structname = typedefinition[0].struct
|
|
id = id || this.store.getNextOpId()
|
|
var op = Y.Struct[structname].create(id)
|
|
op.type = typedefinition[0].name
|
|
if (typedefinition[0].appendAdditionalInfo != null) {
|
|
yield* typedefinition[0].appendAdditionalInfo.call(this, op, typedefinition[1])
|
|
}
|
|
// yield* this.applyCreatedOperations([op])
|
|
yield* Y.Struct[op.struct].execute.call(this, op)
|
|
yield* this.addOperation(op)
|
|
yield* this.store.operationAdded(this, op)
|
|
return yield* this.getType(id, typedefinition[1])
|
|
}*/
|
|
/*
|
|
Apply operations that this user created (no remote ones!)
|
|
* does not check for Struct.*.requiredOps()
|
|
* also broadcasts it through the connector
|
|
*/
|
|
* applyCreatedOperations (ops) {
|
|
var send = []
|
|
for (var i = 0; i < ops.length; i++) {
|
|
var op = ops[i]
|
|
yield* this.store.tryExecute.call(this, op)
|
|
if (op.id == null || typeof op.id[1] !== 'string') {
|
|
send.push(Y.Struct[op.struct].encode(op))
|
|
}
|
|
}
|
|
if (!this.store.y.connector.isDisconnected() && send.length > 0) { // TODO: && !this.store.forwardAppliedOperations (but then i don't send delete ops)
|
|
// is connected, and this is not going to be send in addOperation
|
|
this.store.y.connector.broadcastOps(send)
|
|
}
|
|
}
|
|
|
|
* deleteList (start) {
|
|
while (start != null) {
|
|
start = yield* this.getOperation(start)
|
|
if (!start.gc) {
|
|
start.gc = true
|
|
start.deleted = true
|
|
yield* this.setOperation(start)
|
|
yield* this.markDeleted(start.id, 1)
|
|
if (start.opContent != null) {
|
|
yield* this.deleteOperation(start.opContent)
|
|
}
|
|
if (this.store.y.connector.isSynced){
|
|
this.store.gc1.push(start.id)
|
|
}
|
|
}
|
|
start = start.right
|
|
}
|
|
}
|
|
|
|
/*
|
|
Mark an operation as deleted, and add it to the GC, if possible.
|
|
*/
|
|
* deleteOperation (targetId, preventCallType) /* :Generator<any, any, any> */ {
|
|
var target = yield* this.getOperation(targetId)
|
|
var callType = false
|
|
|
|
if (target == null || !target.deleted) {
|
|
yield* this.markDeleted(targetId, 1)
|
|
}
|
|
|
|
if (target != null) {
|
|
if (!target.deleted) {
|
|
callType = true
|
|
// set deleted & notify type
|
|
target.deleted = true
|
|
/*
|
|
if (!preventCallType) {
|
|
var type = this.store.initializedTypes[JSON.stringify(target.parent)]
|
|
if (type != null) {
|
|
yield* type._changed(this, {
|
|
struct: 'Delete',
|
|
target: targetId
|
|
})
|
|
}
|
|
}
|
|
*/
|
|
// delete containing lists
|
|
if (target.start != null) {
|
|
// TODO: don't do it like this .. -.-
|
|
yield* this.deleteList(target.start)
|
|
// yield* this.deleteList(target.id) -- do not gc itself because this may still get referenced
|
|
}
|
|
if (target.map != null) {
|
|
for (var name in target.map) {
|
|
yield* this.deleteList(target.map[name])
|
|
}
|
|
// TODO: here to.. (see above)
|
|
// yield* this.deleteList(target.id) -- see above
|
|
}
|
|
if (target.opContent != null) {
|
|
yield* this.deleteOperation(target.opContent)
|
|
// target.opContent = null
|
|
}
|
|
if (target.requires != null) {
|
|
for (var i = 0; i < target.requires.length; i++) {
|
|
yield* this.deleteOperation(target.requires[i])
|
|
}
|
|
}
|
|
}
|
|
var left
|
|
if (target.left != null) {
|
|
left = yield* this.getOperation(target.left)
|
|
} else {
|
|
left = null
|
|
}
|
|
|
|
this.store.addToGarbageCollector(target, left)
|
|
|
|
// set here because it was deleted and/or gc'd
|
|
yield* this.setOperation(target)
|
|
|
|
/*
|
|
Check if it is possible to add right to the gc.
|
|
Because this delete can't be responsible for left being gc'd,
|
|
we don't have to add left to the gc..
|
|
*/
|
|
var right
|
|
if (target.right != null) {
|
|
right = yield* this.getOperation(target.right)
|
|
} else {
|
|
right = null
|
|
}
|
|
if (
|
|
right != null &&
|
|
this.store.addToGarbageCollector(right, target)
|
|
) {
|
|
yield* this.setOperation(right)
|
|
}
|
|
return callType
|
|
}
|
|
}
|
|
/*
|
|
Mark an operation as deleted&gc'd
|
|
*/
|
|
* markGarbageCollected (id, len) {
|
|
// this.mem.push(["gc", id]);
|
|
var n = yield* this.markDeleted(id, len)
|
|
if (n.id[1] < id[1] && !n.gc) {
|
|
// un-extend left
|
|
var newlen = n.len - (id[1] - n.id[1])
|
|
n.len -= newlen
|
|
yield* this.ds.put(n)
|
|
n = {id: id, len: newlen, gc: false}
|
|
yield* this.ds.put(n)
|
|
}
|
|
// get prev&next before adding a new operation
|
|
var prev = yield* this.ds.findPrev(id)
|
|
var next = yield* this.ds.findNext(id)
|
|
|
|
if (id[1] < n.id[1] + n.len - len && !n.gc) {
|
|
// un-extend right
|
|
yield* this.ds.put({id: [id[0], id[1] + 1], len: n.len - 1, gc: false})
|
|
n.len = 1
|
|
}
|
|
// set gc'd
|
|
n.gc = true
|
|
// can extend left?
|
|
if (
|
|
prev != null &&
|
|
prev.gc &&
|
|
Y.utils.compareIds([prev.id[0], prev.id[1] + prev.len], n.id)
|
|
) {
|
|
prev.len += n.len
|
|
yield* this.ds.delete(n.id)
|
|
n = prev
|
|
// ds.put n here?
|
|
}
|
|
// can extend right?
|
|
if (
|
|
next != null &&
|
|
next.gc &&
|
|
Y.utils.compareIds([n.id[0], n.id[1] + n.len], next.id)
|
|
) {
|
|
n.len += next.len
|
|
yield* this.ds.delete(next.id)
|
|
}
|
|
yield* this.ds.put(n)
|
|
}
|
|
/*
|
|
Mark an operation as deleted.
|
|
|
|
returns the delete node
|
|
*/
|
|
* markDeleted (id, length) {
|
|
if (length == null) {
|
|
length = 1
|
|
}
|
|
// this.mem.push(["del", id]);
|
|
var n = yield* this.ds.findWithUpperBound(id)
|
|
if (n != null && n.id[0] === id[0]) {
|
|
if (n.id[1] <= id[1] && id[1] <= n.id[1] + n.len) {
|
|
// id is in n's range
|
|
var diff = id[1] + length - (n.id[1] + n.len) // overlapping right
|
|
if (diff > 0) {
|
|
// id+length overlaps n
|
|
if (!n.gc) {
|
|
n.len += diff
|
|
} else {
|
|
diff = n.id[1] + n.len - id[1] // overlapping left (id till n.end)
|
|
if (diff < length) {
|
|
// a partial deletion
|
|
n = {id: [id[0], id[1] + diff], len: length - diff, gc: false}
|
|
yield* this.ds.put(n)
|
|
} else {
|
|
// already gc'd
|
|
throw new Error('Cannot happen! (it dit though.. :()')
|
|
// return n
|
|
}
|
|
}
|
|
} else {
|
|
// no overlapping, already deleted
|
|
return n
|
|
}
|
|
} else {
|
|
// cannot extend left (there is no left!)
|
|
n = {id: id, len: length, gc: false}
|
|
yield* this.ds.put(n) // TODO: you double-put !!
|
|
}
|
|
} else {
|
|
// cannot extend left
|
|
n = {id: id, len: length, gc: false}
|
|
yield* this.ds.put(n)
|
|
}
|
|
// can extend right?
|
|
var next = yield* this.ds.findNext(n.id)
|
|
if (
|
|
next != null &&
|
|
n.id[0] === next.id[0] &&
|
|
n.id[1] + n.len >= next.id[1]
|
|
) {
|
|
diff = n.id[1] + n.len - next.id[1] // from next.start to n.end
|
|
if (next.gc) {
|
|
if (diff >= 0) {
|
|
n.len -= diff
|
|
if (diff > next.len) {
|
|
// need to create another deletion after $next
|
|
// TODO: (may not be necessary, because this case shouldn't happen!)
|
|
// also this is supposed to return a deletion range. which one to choose? n or the new created deletion?
|
|
throw new Error('This case is not handled (on purpose!)')
|
|
}
|
|
} // else: everything is fine :)
|
|
} else {
|
|
if (diff >= 0) {
|
|
if (diff > next.len) {
|
|
// may be neccessary to extend next.next!
|
|
// TODO: (may not be necessary, because this case shouldn't happen!)
|
|
throw new Error('This case is not handled (on purpose!)')
|
|
}
|
|
n.len += next.len - diff
|
|
yield* this.ds.delete(next.id)
|
|
}
|
|
}
|
|
}
|
|
yield* this.ds.put(n)
|
|
return n
|
|
}
|
|
/*
|
|
Call this method when the client is connected&synced with the
|
|
other clients (e.g. master). This will query the database for
|
|
operations that can be gc'd and add them to the garbage collector.
|
|
*/
|
|
* garbageCollectAfterSync () {
|
|
yield* this.os.iterate(this, null, null, function * (op) {
|
|
if (op.gc) {
|
|
this.store.gc1.push(op.id)
|
|
} else {
|
|
if (op.parent != null) {
|
|
var parentDeleted = yield* this.isDeleted(op.parent)
|
|
if (parentDeleted) {
|
|
op.gc = true
|
|
if (!op.deleted) {
|
|
yield* this.markDeleted(op.id, 1)
|
|
op.deleted = true
|
|
if (op.opContent != null) {
|
|
yield* this.deleteOperation(op.opContent)
|
|
/*
|
|
var opContent = yield* this.getOperation(op.opContent)
|
|
opContent.gc = true
|
|
yield* this.setOperation(opContent)
|
|
this.store.gc1.push(opContent.id)
|
|
*/
|
|
}
|
|
if (op.requires != null) {
|
|
for (var i = 0; i < op.requires.length; i++) {
|
|
yield* this.deleteOperation(op.requires[i])
|
|
}
|
|
}
|
|
}
|
|
yield* this.setOperation(op)
|
|
this.store.gc1.push(op.id)
|
|
return
|
|
}
|
|
}
|
|
if (op.deleted && op.left != null) {
|
|
var left = yield* this.getOperation(op.left)
|
|
this.store.addToGarbageCollector(op, left)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
/*
|
|
Really remove an op and all its effects.
|
|
The complicated case here is the Insert operation:
|
|
* reset left
|
|
* reset right
|
|
* reset parent.start
|
|
* reset parent.end
|
|
* reset origins of all right ops
|
|
*/
|
|
* garbageCollectOperation (id) {
|
|
this.store.addToDebug('yield* this.garbageCollectOperation(', id, ')')
|
|
var o = yield* this.getOperation(id)
|
|
yield* this.markGarbageCollected(id, 1) // always mark gc'd
|
|
// if op exists, then clean that mess up..
|
|
if (o != null) {
|
|
/*
|
|
if (!o.deleted) {
|
|
yield* this.deleteOperation(id)
|
|
o = yield* this.getOperation(id)
|
|
}
|
|
*/
|
|
|
|
var deps = []
|
|
if (o.opContent != null) {
|
|
deps.push(o.opContent)
|
|
}
|
|
if (o.requires != null) {
|
|
deps = deps.concat(o.requires)
|
|
}
|
|
for (var i = 0; i < deps.length; i++) {
|
|
var dep = yield* this.getOperation(deps[i])
|
|
if (dep != null) {
|
|
if (!dep.deleted) {
|
|
yield* this.deleteOperation(dep.id)
|
|
dep = yield* this.getOperation(dep.id)
|
|
}
|
|
dep.gc = true
|
|
yield* this.setOperation(dep)
|
|
this.store.gc1.push(dep.id)
|
|
} else {
|
|
yield* this.markGarbageCollected(deps[i], 1)
|
|
yield* this.updateState(deps[i][0]) // TODO: unneccessary?
|
|
}
|
|
}
|
|
|
|
// remove gc'd op from the left op, if it exists
|
|
if (o.left != null) {
|
|
var left = yield* this.getOperation(o.left)
|
|
left.right = o.right
|
|
yield* this.setOperation(left)
|
|
}
|
|
// remove gc'd op from the right op, if it exists
|
|
// also reset origins of right ops
|
|
if (o.right != null) {
|
|
var right = yield* this.getOperation(o.right)
|
|
right.left = o.left
|
|
|
|
if (o.originOf != null && o.originOf.length > 0) {
|
|
// find new origin of right ops
|
|
// origin is the first left deleted operation
|
|
var neworigin = o.left
|
|
var neworigin_ = null
|
|
while (neworigin != null) {
|
|
neworigin_ = yield* this.getOperation(neworigin)
|
|
if (neworigin_.deleted) {
|
|
break
|
|
}
|
|
neworigin = neworigin_.left
|
|
}
|
|
|
|
// reset origin of all right ops (except first right - duh!),
|
|
|
|
/* ** The following code does not rely on the the originOf property **
|
|
I recently added originOf to all Insert Operations (see Struct.Insert.execute),
|
|
which saves which operations originate in a Insert operation.
|
|
Garbage collecting without originOf is more memory efficient, but is nearly impossible for large texts, or lists!
|
|
But I keep this code for now
|
|
```
|
|
// reset origin of right
|
|
right.origin = neworigin
|
|
// search until you find origin pointer to the left of o
|
|
if (right.right != null) {
|
|
var i = yield* this.getOperation(right.right)
|
|
var ids = [o.id, o.right]
|
|
while (ids.some(function (id) {
|
|
return Y.utils.compareIds(id, i.origin)
|
|
})) {
|
|
if (Y.utils.compareIds(i.origin, o.id)) {
|
|
// reset origin of i
|
|
i.origin = neworigin
|
|
yield* this.setOperation(i)
|
|
}
|
|
// get next i
|
|
if (i.right == null) {
|
|
break
|
|
} else {
|
|
ids.push(i.id)
|
|
i = yield* this.getOperation(i.right)
|
|
}
|
|
}
|
|
}
|
|
```
|
|
*/
|
|
// ** Now the new implementation starts **
|
|
// reset neworigin of all originOf[*]
|
|
for (var _i in o.originOf) {
|
|
var originsIn = yield* this.getOperation(o.originOf[_i])
|
|
if (originsIn != null) {
|
|
originsIn.origin = neworigin
|
|
yield* this.setOperation(originsIn)
|
|
}
|
|
}
|
|
if (neworigin != null) {
|
|
if (neworigin_.originOf == null) {
|
|
neworigin_.originOf = o.originOf
|
|
} else {
|
|
neworigin_.originOf = o.originOf.concat(neworigin_.originOf)
|
|
}
|
|
yield* this.setOperation(neworigin_)
|
|
}
|
|
// we don't need to set right here, because
|
|
// right should be in o.originOf => it is set it the previous for loop
|
|
} else {
|
|
// we didn't need to reset the origin of right
|
|
// so we have to set right here
|
|
yield* this.setOperation(right)
|
|
}
|
|
}
|
|
// o may originate in another operation.
|
|
// Since o is deleted, we have to reset o.origin's `originOf` property
|
|
if (o.origin != null) {
|
|
var origin = yield* this.getOperation(o.origin)
|
|
origin.originOf = origin.originOf.filter(function (_id) {
|
|
return !Y.utils.compareIds(id, _id)
|
|
})
|
|
yield* this.setOperation(origin)
|
|
}
|
|
var parent
|
|
if (o.parent != null){
|
|
parent = yield* this.getOperation(o.parent)
|
|
}
|
|
// remove gc'd op from parent, if it exists
|
|
if (parent != null) {
|
|
var setParent = false // whether to save parent to the os
|
|
if (o.parentSub != null) {
|
|
if (Y.utils.compareIds(parent.map[o.parentSub], o.id)) {
|
|
setParent = true
|
|
parent.map[o.parentSub] = o.right
|
|
}
|
|
} else {
|
|
if (Y.utils.compareIds(parent.start, o.id)) {
|
|
// gc'd op is the start
|
|
setParent = true
|
|
parent.start = o.right
|
|
}
|
|
if (Y.utils.compareIds(parent.end, o.id)) {
|
|
// gc'd op is the end
|
|
setParent = true
|
|
parent.end = o.left
|
|
}
|
|
}
|
|
if (setParent) {
|
|
yield* this.setOperation(parent)
|
|
}
|
|
}
|
|
// finally remove it from the os
|
|
yield* this.removeOperation(o.id)
|
|
}
|
|
}
|
|
* checkDeleteStoreForState (state) {
|
|
var n = yield* this.ds.findWithUpperBound([state.user, state.clock])
|
|
if (n != null && n.id[0] === state.user && n.gc) {
|
|
state.clock = Math.max(state.clock, n.id[1] + n.len)
|
|
}
|
|
}
|
|
* updateState (user) {
|
|
var state = yield* this.getState(user)
|
|
yield* this.checkDeleteStoreForState(state)
|
|
var o = yield* this.getOperation([user, state.clock])
|
|
while (o != null && o.id[1] === state.clock && user === o.id[0]) {
|
|
// either its a new operation (1. case), or it is an operation that was deleted, but is not yet in the OS
|
|
state.clock++
|
|
yield* this.checkDeleteStoreForState(state)
|
|
o = yield* this.os.findNext(o.id)
|
|
}
|
|
yield* this.setState(state)
|
|
}
|
|
/*
|
|
apply a delete set in order to get
|
|
the state of the supplied ds
|
|
*/
|
|
* applyDeleteSet (ds) {
|
|
var deletions = []
|
|
function createDeletions (user, start, len, gc) {
|
|
deletions.push([user, start, len, gc])
|
|
}
|
|
|
|
for (var user in ds) {
|
|
var dv = ds[user]
|
|
var pos = 0
|
|
var d = dv[pos]
|
|
yield* this.ds.iterate(this, [user, 0], [user, Number.MAX_VALUE], function * (n) {
|
|
// cases:
|
|
// 1. d deletes something to the right of n
|
|
// => go to next n (break)
|
|
// 2. d deletes something to the left of n
|
|
// => create deletions
|
|
// => reset d accordingly
|
|
// *)=> if d doesn't delete anything anymore, go to next d (continue)
|
|
// 3. not 2) and d deletes something that also n deletes
|
|
// => reset d so that it doesn't contain n's deletion
|
|
// *)=> if d does not delete anything anymore, go to next d (continue)
|
|
while (d != null) {
|
|
var diff = 0 // describe the diff of length in 1) and 2)
|
|
if (n.id[1] + n.len <= d[0]) {
|
|
// 1)
|
|
break
|
|
} else if (d[0] < n.id[1]) {
|
|
// 2)
|
|
// delete maximum the len of d
|
|
// else delete as much as possible
|
|
diff = Math.min(n.id[1] - d[0], d[1])
|
|
createDeletions(user, d[0], diff, d[2])
|
|
} else {
|
|
// 3)
|
|
diff = n.id[1] + n.len - d[0] // never null (see 1)
|
|
if (d[2] && !n.gc) {
|
|
// d marks as gc'd but n does not
|
|
// then delete either way
|
|
createDeletions(user, d[0], Math.min(diff, d[1]), d[2])
|
|
}
|
|
}
|
|
if (d[1] <= diff) {
|
|
// d doesn't delete anything anymore
|
|
d = dv[++pos]
|
|
} else {
|
|
d[0] = d[0] + diff // reset pos
|
|
d[1] = d[1] - diff // reset length
|
|
}
|
|
}
|
|
})
|
|
// for the rest.. just apply it
|
|
for (; pos < dv.length; pos++) {
|
|
d = dv[pos]
|
|
createDeletions(user, d[0], d[1], d[2])
|
|
}
|
|
}
|
|
for (var i = 0; i < deletions.length; i++) {
|
|
var del = deletions[i]
|
|
// always try to delete..
|
|
var state = yield* this.getState(del[0])
|
|
if (del[1] < state.clock) {
|
|
for (let c = del[1]; c < del[1] + del[2]; c++) {
|
|
var id = [del[0], c]
|
|
var addOperation = yield* this.deleteOperation(id)
|
|
if (addOperation) {
|
|
// TODO:.. really .. here? You could prevent calling all these functions in operationAdded
|
|
yield* this.store.operationAdded(this, {struct: 'Delete', target: id})
|
|
}
|
|
if (del[3]) {
|
|
// gc
|
|
yield* this.garbageCollectOperation(id)
|
|
}
|
|
}
|
|
} else {
|
|
if (del[3]) {
|
|
yield* this.markGarbageCollected([del[0], del[1]], del[2])
|
|
} else {
|
|
yield* this.markDeleted([del[0], del[1]], del[2])
|
|
}
|
|
}
|
|
if (del[3]) {
|
|
// check to increase the state of the respective user
|
|
if (state.clock >= del[1] && state.clock < del[1] + del[2]) {
|
|
state.clock = del[1] + del[2]
|
|
// also check if more expected operations were gc'd
|
|
yield* this.checkDeleteStoreForState(state) // TODO: unneccessary?
|
|
// then set the state
|
|
yield* this.setState(state)
|
|
}
|
|
}
|
|
if (this.store.forwardAppliedOperations) {
|
|
var ops = []
|
|
for (let c = del[1]; c < del[1] + del[2]; c++) {
|
|
ops.push({struct: 'Delete', target: [d[0], c]}) // TODO: implement Delete with deletion length!
|
|
}
|
|
this.store.y.connector.broadcastOps(ops)
|
|
}
|
|
}
|
|
}
|
|
* isGarbageCollected (id) {
|
|
var n = yield* this.ds.findWithUpperBound(id)
|
|
return n != null && n.id[0] === id[0] && id[1] < n.id[1] + n.len && n.gc
|
|
}
|
|
/*
|
|
A DeleteSet (ds) describes all the deleted ops in the OS
|
|
*/
|
|
* getDeleteSet () {
|
|
var ds = {}
|
|
yield* this.ds.iterate(this, null, null, function * (n) {
|
|
var user = n.id[0]
|
|
var counter = n.id[1]
|
|
var len = n.len
|
|
var gc = n.gc
|
|
var dv = ds[user]
|
|
if (dv === void 0) {
|
|
dv = []
|
|
ds[user] = dv
|
|
}
|
|
dv.push([counter, len, gc])
|
|
})
|
|
return ds
|
|
}
|
|
* isDeleted (id) {
|
|
var n = yield* this.ds.findWithUpperBound(id)
|
|
return n != null && n.id[0] === id[0] && id[1] < n.id[1] + n.len
|
|
}
|
|
* setOperation (op) {
|
|
yield* this.os.put(op)
|
|
return op
|
|
}
|
|
* addOperation (op) {
|
|
yield* this.os.put(op)
|
|
if (!this.store.y.connector.isDisconnected() && this.store.forwardAppliedOperations && typeof op.id[1] !== 'string') {
|
|
// is connected, and this is not going to be send in addOperation
|
|
this.store.y.connector.broadcastOps([op])
|
|
}
|
|
}
|
|
* getOperation (id/* :any */)/* :Transaction<any> */ {
|
|
var o = yield* this.os.find(id)
|
|
if (o != null || id[0] !== '_') {
|
|
return o
|
|
} else {
|
|
// generate this operation?
|
|
var comp = id[1].split('_')
|
|
if (comp.length > 1) {
|
|
var struct = comp[0]
|
|
var op = Y.Struct[struct].create(id)
|
|
op.type = comp[1]
|
|
yield* this.setOperation(op)
|
|
return op
|
|
} else {
|
|
// won't be called. but just in case..
|
|
console.error('Unexpected case. How can this happen?')
|
|
debugger // eslint-disable-line
|
|
return null
|
|
}
|
|
return null
|
|
}
|
|
}
|
|
* removeOperation (id) {
|
|
yield* this.os.delete(id)
|
|
}
|
|
* setState (state) {
|
|
var val = {
|
|
id: [state.user],
|
|
clock: state.clock
|
|
}
|
|
yield* this.ss.put(val)
|
|
}
|
|
* getState (user) {
|
|
var n = yield* this.ss.find([user])
|
|
var clock = n == null ? null : n.clock
|
|
if (clock == null) {
|
|
clock = 0
|
|
}
|
|
return {
|
|
user: user,
|
|
clock: clock
|
|
}
|
|
}
|
|
* getStateVector () {
|
|
var stateVector = []
|
|
yield* this.ss.iterate(this, null, null, function * (n) {
|
|
stateVector.push({
|
|
user: n.id[0],
|
|
clock: n.clock
|
|
})
|
|
})
|
|
return stateVector
|
|
}
|
|
* getStateSet () {
|
|
var ss = {}
|
|
yield* this.ss.iterate(this, null, null, function * (n) {
|
|
ss[n.id[0]] = n.clock
|
|
})
|
|
return ss
|
|
}
|
|
/*
|
|
Here, we make all missing operations executable for the receiving user.
|
|
|
|
Notes:
|
|
startSS: denotes to the SV that the remote user sent
|
|
currSS: denotes to the state vector that the user should have if he
|
|
applies all already sent operations (increases is each step)
|
|
|
|
We face several problems:
|
|
* Execute op as is won't work because ops depend on each other
|
|
-> find a way so that they do not anymore
|
|
* When changing left, must not go more to the left than the origin
|
|
* When changing right, you have to consider that other ops may have op
|
|
as their origin, this means that you must not set one of these ops
|
|
as the new right (interdependencies of ops)
|
|
* can't just go to the right until you find the first known operation,
|
|
With currSS
|
|
-> interdependency of ops is a problem
|
|
With startSS
|
|
-> leads to inconsistencies when two users join at the same time.
|
|
Then the position depends on the order of execution -> error!
|
|
|
|
Solution:
|
|
-> re-create originial situation
|
|
-> set op.left = op.origin (which never changes)
|
|
-> set op.right
|
|
to the first operation that is known (according to startSS)
|
|
or to the first operation that has an origin that is not to the
|
|
right of op.
|
|
-> Enforces unique execution order -> happy user
|
|
|
|
Improvements: TODO
|
|
* Could set left to origin, or the first known operation
|
|
(startSS or currSS.. ?)
|
|
-> Could be necessary when I turn GC again.
|
|
-> Is a bad(ish) idea because it requires more computation
|
|
|
|
What we do:
|
|
* Iterate over all missing operations.
|
|
* When there is an operation, where the right op is known, send this op all missing ops to the left to the user
|
|
* I explained above what we have to do with each operation. Here is how we do it efficiently:
|
|
1. Go to the left until you find either op.origin, or a known operation (let o denote current operation in the iteration)
|
|
2. Found a known operation -> set op.left = o, and send it to the user. stop
|
|
3. Found o = op.origin -> set op.left = op.origin, and send it to the user. start again from 1. (set op = o)
|
|
4. Found some o -> set o.right = op, o.left = o.origin, send it to the user, continue
|
|
*/
|
|
* getOperations (startSS) {
|
|
// TODO: use bounds here!
|
|
if (startSS == null) {
|
|
startSS = {}
|
|
}
|
|
var send = []
|
|
|
|
var endSV = yield* this.getStateVector()
|
|
for (var endState of endSV) {
|
|
var user = endState.user
|
|
if (user === '_') {
|
|
continue
|
|
}
|
|
var startPos = startSS[user] || 0
|
|
|
|
yield* this.os.iterate(this, [user, startPos], [user, Number.MAX_VALUE], function * (op) {
|
|
op = Y.Struct[op.struct].encode(op)
|
|
if (op.struct !== 'Insert') {
|
|
send.push(op)
|
|
} else if (op.right == null || op.right[1] < (startSS[op.right[0]] || 0)) {
|
|
// case 1. op.right is known
|
|
var o = op
|
|
// Remember: ?
|
|
// -> set op.right
|
|
// 1. to the first operation that is known (according to startSS)
|
|
// 2. or to the first operation that has an origin that is not to the
|
|
// right of op.
|
|
// For this we maintain a list of ops which origins are not found yet.
|
|
var missing_origins = [op]
|
|
var newright = op.right
|
|
while (true) {
|
|
if (o.left == null) {
|
|
op.left = null
|
|
send.push(op)
|
|
if (!Y.utils.compareIds(o.id, op.id)) {
|
|
o = Y.Struct[op.struct].encode(o)
|
|
o.right = missing_origins[missing_origins.length - 1].id
|
|
send.push(o)
|
|
}
|
|
break
|
|
}
|
|
o = yield* this.getOperation(o.left)
|
|
// we set another o, check if we can reduce $missing_origins
|
|
while (missing_origins.length > 0 && Y.utils.compareIds(missing_origins[missing_origins.length - 1].origin, o.id)) {
|
|
missing_origins.pop()
|
|
}
|
|
if (o.id[1] < (startSS[o.id[0]] || 0)) {
|
|
// case 2. o is known
|
|
op.left = o.id
|
|
send.push(op)
|
|
break
|
|
} else if (Y.utils.compareIds(o.id, op.origin)) {
|
|
// case 3. o is op.origin
|
|
op.left = op.origin
|
|
send.push(op)
|
|
op = Y.Struct[op.struct].encode(o)
|
|
op.right = newright
|
|
if (missing_origins.length > 0) {
|
|
console.log('This should not happen .. :( please report this')
|
|
}
|
|
missing_origins = [op]
|
|
} else {
|
|
// case 4. send o, continue to find op.origin
|
|
var s = Y.Struct[op.struct].encode(o)
|
|
s.right = missing_origins[missing_origins.length - 1].id
|
|
s.left = s.origin
|
|
send.push(s)
|
|
missing_origins.push(o)
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
return send.reverse()
|
|
}
|
|
/* this is what we used before.. use this as a reference..
|
|
* makeOperationReady (startSS, op) {
|
|
op = Y.Struct[op.struct].encode(op)
|
|
op = Y.utils.copyObject(op)
|
|
var o = op
|
|
var ids = [op.id]
|
|
// search for the new op.right
|
|
// it is either the first known op (according to startSS)
|
|
// or the o that has no origin to the right of op
|
|
// (this is why we use the ids array)
|
|
while (o.right != null) {
|
|
var right = yield* this.getOperation(o.right)
|
|
if (o.right[1] < (startSS[o.right[0]] || 0) || !ids.some(function (id) {
|
|
return Y.utils.compareIds(id, right.origin)
|
|
})) {
|
|
break
|
|
}
|
|
ids.push(o.right)
|
|
o = right
|
|
}
|
|
op.right = o.right
|
|
op.left = op.origin
|
|
return op
|
|
}
|
|
*/
|
|
* flush () {
|
|
yield* this.os.flush()
|
|
yield* this.ss.flush()
|
|
yield* this.ds.flush()
|
|
}
|
|
}
|
|
Y.Transaction = TransactionInterface
|
|
}
|