yjs/src/Database.js
2015-10-18 03:07:34 +02:00

342 lines
9.6 KiB
JavaScript

/* global Y */
'use strict'
/*
Partial definition of an OperationStore.
TODO: name it Database, operation store only holds operations.
A database definition must alse define the following methods:
* logTable() (optional)
- show relevant information information in a table
* requestTransaction(makeGen)
- request a transaction
* destroy()
- destroy the database
*/
class AbstractDatabase {
constructor (y, opts) {
this.y = y
// E.g. this.listenersById[id] : Array<Listener>
this.listenersById = {}
// Execute the next time a transaction is requested
this.listenersByIdExecuteNow = []
// A transaction is requested
this.listenersByIdRequestPending = false
/* To make things more clear, the following naming conventions:
* ls : we put this.listenersById on ls
* l : Array<Listener>
* id : Id (can't use as property name)
* sid : String (converted from id via JSON.stringify
so we can use it as a property name)
Always remember to first overwrite
a property before you iterate over it!
*/
// TODO: Use ES7 Weak Maps. This way types that are no longer user,
// wont be kept in memory.
this.initializedTypes = {}
this.whenUserIdSetListener = null
this.waitingTransactions = []
this.transactionInProgress = false
if (typeof YConcurrency_TestingMode !== 'undefined') {
this.executeOrder = []
}
this.gc1 = [] // first stage
this.gc2 = [] // second stage -> after that, remove the op
this.gcTimeout = opts.gcTimeout || 5000
var os = this
function garbageCollect () {
return new Promise((resolve) => {
os.requestTransaction(function * () {
if (os.y.connector != null && os.y.connector.isSynced) {
for (var i in os.gc2) {
var oid = os.gc2[i]
yield* this.garbageCollectOperation(oid)
}
os.gc2 = os.gc1
os.gc1 = []
}
if (os.gcTimeout > 0) {
os.gcInterval = setTimeout(garbageCollect, os.gcTimeout)
}
resolve()
})
})
}
this.garbageCollect = garbageCollect
if (this.gcTimeout > 0) {
garbageCollect()
}
}
addToDebug () {
if (typeof YConcurrency_TestingMode !== 'undefined') {
var command = Array.prototype.map.call(arguments, function (s) {
if (typeof s === 'string') {
return s
} else {
return JSON.stringify(s)
}
}).join('').replace(/"/g, "'").replace(/,/g, ', ').replace(/:/g, ': ')
this.executeOrder.push(command)
}
}
getDebugData () {
console.log(this.executeOrder.join('\n'))
}
stopGarbageCollector () {
var self = this
return new Promise(function (resolve) {
self.requestTransaction(function * () {
var ungc = self.gc1.concat(self.gc2)
self.gc1 = []
self.gc2 = []
for (var i in ungc) {
var op = yield* this.getOperation(ungc[i])
delete op.gc
yield* this.setOperation(op)
}
resolve()
})
})
}
/*
Try to add to GC.
TODO: rename this function
Rulez:
* Only gc if this user is online
* The most left element in a list must not be gc'd.
=> There is at least one element in the list
returns true iff op was added to GC
*/
addToGarbageCollector (op, left) {
if (
op.gc == null &&
op.deleted === true &&
this.y.connector.isSynced &&
left != null &&
left.deleted === true
) {
op.gc = true
this.gc1.push(op.id)
return true
} else {
return false
}
}
removeFromGarbageCollector (op) {
function filter (o) {
return !Y.utils.compareIds(o, op.id)
}
this.gc1 = this.gc1.filter(filter)
this.gc2 = this.gc2.filter(filter)
delete op.gc
}
destroy () {
clearInterval(this.gcInterval)
this.gcInterval = null
}
setUserId (userId) {
var self = this
return new Promise(function (resolve) {
self.requestTransaction(function * () {
self.userId = userId
self.opClock = (yield* this.getState(userId)).clock
if (self.whenUserIdSetListener != null) {
self.whenUserIdSetListener()
self.whenUserIdSetListener = null
}
resolve()
})
})
}
whenUserIdSet (f) {
if (this.userId != null) {
f()
} else {
this.whenUserIdSetListener = f
}
}
getNextOpId () {
if (this.userId == null) {
throw new Error('OperationStore not yet initialized!')
}
return [this.userId, this.opClock++]
}
/*
Apply a list of operations.
* get a transaction
* check whether all Struct.*.requiredOps are in the OS
* check if it is an expected op (otherwise wait for it)
* check if was deleted, apply a delete operation after op was applied
*/
apply (ops) {
for (var key in ops) {
var o = ops[key]
var required = Y.Struct[o.struct].requiredOps(o)
this.whenOperationsExist(required, o)
}
}
/*
op is executed as soon as every operation requested is available.
Note that Transaction can (and should) buffer requests.
*/
whenOperationsExist (ids, op) {
if (ids.length > 0) {
let listener = {
op: op,
missing: ids.length
}
for (let key in ids) {
let id = ids[key]
let sid = JSON.stringify(id)
let l = this.listenersById[sid]
if (l == null) {
l = []
this.listenersById[sid] = l
}
l.push(listener)
}
} else {
this.listenersByIdExecuteNow.push({
op: op
})
}
if (this.listenersByIdRequestPending) {
return
}
this.listenersByIdRequestPending = true
var store = this
this.requestTransaction(function * () {
var exeNow = store.listenersByIdExecuteNow
store.listenersByIdExecuteNow = []
var ls = store.listenersById
store.listenersById = {}
store.listenersByIdRequestPending = false
for (let key in exeNow) {
let o = exeNow[key].op
yield* store.tryExecute.call(this, o)
}
for (var sid in ls) {
var l = ls[sid]
var id = JSON.parse(sid)
if ((yield* this.getOperation(id)) == null) {
store.listenersById[sid] = l
} else {
for (let key in l) {
let listener = l[key]
let o = listener.op
if (--listener.missing === 0) {
yield* store.tryExecute.call(this, o)
}
}
}
}
})
}
/*
Actually execute an operation, when all expected operations are available.
*/
* tryExecute (op) {
this.store.addToDebug('yield* this.store.tryExecute.call(this, ', JSON.stringify(op), ')')
if (op.struct === 'Delete') {
yield* Y.Struct.Delete.execute.call(this, op)
yield* this.store.operationAdded(this, op)
} else if ((yield* this.getOperation(op.id)) == null && !(yield* this.isGarbageCollected(op.id))) {
yield* Y.Struct[op.struct].execute.call(this, op)
yield* this.addOperation(op)
yield* this.store.operationAdded(this, op)
}
}
// called by a transaction when an operation is added
* operationAdded (transaction, op) {
if (op.struct === 'Delete') {
var target = yield* transaction.getOperation(op.target)
if (target != null) {
var type = transaction.store.initializedTypes[JSON.stringify(target.parent)]
if (type != null) {
yield* type._changed(transaction, {
struct: 'Delete',
target: op.target
})
}
}
} else {
// increase SS
var o = op
var state = yield* transaction.getState(op.id[0])
while (o != null && o.id[1] === state.clock && op.id[0] === o.id[0]) {
// either its a new operation (1. case), or it is an operation that was deleted, but is not yet in the OS
state.clock++
yield* transaction.checkDeleteStoreForState(state)
o = yield* transaction.os.findNext(o.id)
}
yield* transaction.setState(state)
// notify whenOperation listeners (by id)
var sid = JSON.stringify(op.id)
var l = this.listenersById[sid]
delete this.listenersById[sid]
if (l != null) {
for (var key in l) {
var listener = l[key]
if (--listener.missing === 0) {
this.whenOperationsExist([], listener.op)
}
}
}
var t = this.initializedTypes[JSON.stringify(op.parent)]
// notify parent, if it has been initialized as a custom type
if (t != null) {
yield* t._changed(transaction, Y.utils.copyObject(op))
}
// Delete if DS says this is actually deleted
if (!op.deleted && (yield* transaction.isDeleted(op.id))) {
var delop = {
struct: 'Delete',
target: op.id
}
yield* Y.Struct['Delete'].execute.call(transaction, delop)
if (t != null) {
yield* t._changed(transaction, delop)
}
}
}
}
getNextRequest () {
if (this.waitingTransactions.length === 0) {
this.transactionInProgress = false
return null
} else {
return this.waitingTransactions.shift()
}
}
requestTransaction (makeGen, callImmediately) {
if (callImmediately) {
this.transact(makeGen)
} else if (!this.transactionInProgress) {
this.transactionInProgress = true
var self = this
setTimeout(function () {
self.transact(makeGen)
}, 0)
} else {
this.waitingTransactions.push(makeGen)
}
}
}
Y.AbstractDatabase = AbstractDatabase