605 lines
20 KiB
JavaScript
605 lines
20 KiB
JavaScript
/* @flow */
|
|
'use strict'
|
|
|
|
export default function extendDatabase (Y /* :any */) {
|
|
/*
|
|
Partial definition of an OperationStore.
|
|
TODO: name it Database, operation store only holds operations.
|
|
|
|
A database definition must alse define the following methods:
|
|
* logTable() (optional)
|
|
- show relevant information information in a table
|
|
* requestTransaction(makeGen)
|
|
- request a transaction
|
|
* destroy()
|
|
- destroy the database
|
|
*/
|
|
class AbstractDatabase {
|
|
/* ::
|
|
y: YConfig;
|
|
forwardAppliedOperations: boolean;
|
|
listenersById: Object;
|
|
listenersByIdExecuteNow: Array<Object>;
|
|
listenersByIdRequestPending: boolean;
|
|
initializedTypes: Object;
|
|
whenUserIdSetListener: ?Function;
|
|
waitingTransactions: Array<Transaction>;
|
|
transactionInProgress: boolean;
|
|
executeOrder: Array<Object>;
|
|
gc1: Array<Struct>;
|
|
gc2: Array<Struct>;
|
|
gcTimeout: number;
|
|
gcInterval: any;
|
|
garbageCollect: Function;
|
|
executeOrder: Array<any>; // for debugging only
|
|
userId: UserId;
|
|
opClock: number;
|
|
transactionsFinished: ?{promise: Promise, resolve: any};
|
|
transact: (x: ?Generator) => any;
|
|
*/
|
|
constructor (y, opts) {
|
|
this.y = y
|
|
this.dbOpts = opts
|
|
var os = this
|
|
this.userId = null
|
|
var resolve_
|
|
this.userIdPromise = new Promise(function (resolve) {
|
|
resolve_ = resolve
|
|
})
|
|
this.userIdPromise.resolve = resolve_
|
|
// whether to broadcast all applied operations (insert & delete hook)
|
|
this.forwardAppliedOperations = false
|
|
// E.g. this.listenersById[id] : Array<Listener>
|
|
this.listenersById = {}
|
|
// Execute the next time a transaction is requested
|
|
this.listenersByIdExecuteNow = []
|
|
// A transaction is requested
|
|
this.listenersByIdRequestPending = false
|
|
/* To make things more clear, the following naming conventions:
|
|
* ls : we put this.listenersById on ls
|
|
* l : Array<Listener>
|
|
* id : Id (can't use as property name)
|
|
* sid : String (converted from id via JSON.stringify
|
|
so we can use it as a property name)
|
|
|
|
Always remember to first overwrite
|
|
a property before you iterate over it!
|
|
*/
|
|
// TODO: Use ES7 Weak Maps. This way types that are no longer user,
|
|
// wont be kept in memory.
|
|
this.initializedTypes = {}
|
|
this.waitingTransactions = []
|
|
this.transactionInProgress = false
|
|
this.transactionIsFlushed = false
|
|
if (typeof YConcurrencyTestingMode !== 'undefined') {
|
|
this.executeOrder = []
|
|
}
|
|
this.gc1 = [] // first stage
|
|
this.gc2 = [] // second stage -> after that, remove the op
|
|
|
|
function garbageCollect () {
|
|
return os.whenTransactionsFinished().then(function () {
|
|
if (os.gc1.length > 0 || os.gc2.length > 0) {
|
|
if (!os.y.connector.isSynced) {
|
|
console.warn('gc should be empty when not synced!')
|
|
}
|
|
return new Promise((resolve) => {
|
|
os.requestTransaction(function * () {
|
|
if (os.y.connector != null && os.y.connector.isSynced) {
|
|
for (var i = 0; i < os.gc2.length; i++) {
|
|
var oid = os.gc2[i]
|
|
yield * this.garbageCollectOperation(oid)
|
|
}
|
|
os.gc2 = os.gc1
|
|
os.gc1 = []
|
|
}
|
|
// TODO: Use setInterval here instead (when garbageCollect is called several times there will be several timeouts..)
|
|
if (os.gcTimeout > 0) {
|
|
os.gcInterval = setTimeout(garbageCollect, os.gcTimeout)
|
|
}
|
|
resolve()
|
|
})
|
|
})
|
|
} else {
|
|
// TODO: see above
|
|
if (os.gcTimeout > 0) {
|
|
os.gcInterval = setTimeout(garbageCollect, os.gcTimeout)
|
|
}
|
|
return Promise.resolve()
|
|
}
|
|
})
|
|
}
|
|
this.garbageCollect = garbageCollect
|
|
this.startGarbageCollector()
|
|
|
|
this.repairCheckInterval = !opts.repairCheckInterval ? 6000 : opts.repairCheckInterval
|
|
this.opsReceivedTimestamp = new Date()
|
|
this.startRepairCheck()
|
|
}
|
|
startGarbageCollector () {
|
|
this.gc = this.dbOpts.gc == null || this.dbOpts.gc
|
|
if (this.gc) {
|
|
this.gcTimeout = !this.dbOpts.gcTimeout ? 50000 : this.dbOpts.gcTimeout
|
|
} else {
|
|
this.gcTimeout = -1
|
|
}
|
|
if (this.gcTimeout > 0) {
|
|
this.garbageCollect()
|
|
}
|
|
}
|
|
startRepairCheck () {
|
|
var os = this
|
|
if (this.repairCheckInterval > 0) {
|
|
this.repairCheckIntervalHandler = setInterval(function repairOnMissingOperations () {
|
|
/*
|
|
Case 1. No ops have been received in a while (new Date() - os.opsReceivedTimestamp > os.repairCheckInterval)
|
|
- 1.1 os.listenersById is empty. Then the state was correct the whole time. -> Nothing to do (nor to update)
|
|
- 1.2 os.listenersById is not empty.
|
|
* Then the state was incorrect for at least {os.repairCheckInterval} seconds.
|
|
* -> Remove everything in os.listenersById and sync again (connector.repair())
|
|
Case 2. An op has been received in the last {os.repairCheckInterval } seconds.
|
|
It is not yet necessary to check for faulty behavior. Everything can still resolve itself. Wait for more messages.
|
|
If nothing was received for a while and os.listenersById is still not emty, we are in case 1.2
|
|
-> Do nothing
|
|
|
|
Baseline here is: we really only have to catch case 1.2..
|
|
*/
|
|
if (
|
|
new Date() - os.opsReceivedTimestamp > os.repairCheckInterval &&
|
|
Object.keys(os.listenersById).length > 0 // os.listenersById is not empty
|
|
) {
|
|
// haven't received operations for over {os.repairCheckInterval} seconds, resend state vector
|
|
os.listenersById = {}
|
|
os.opsReceivedTimestamp = new Date() // update so you don't send repair several times in a row
|
|
os.y.connector.repair()
|
|
}
|
|
}, this.repairCheckInterval)
|
|
}
|
|
}
|
|
stopRepairCheck () {
|
|
clearInterval(this.repairCheckIntervalHandler)
|
|
}
|
|
queueGarbageCollector (id) {
|
|
if (this.y.connector.isSynced && this.gc) {
|
|
this.gc1.push(id)
|
|
}
|
|
}
|
|
emptyGarbageCollector () {
|
|
return new Promise(resolve => {
|
|
var check = () => {
|
|
if (this.gc1.length > 0 || this.gc2.length > 0) {
|
|
this.garbageCollect().then(check)
|
|
} else {
|
|
resolve()
|
|
}
|
|
}
|
|
setTimeout(check, 0)
|
|
})
|
|
}
|
|
addToDebug () {
|
|
if (typeof YConcurrencyTestingMode !== 'undefined') {
|
|
var command /* :string */ = Array.prototype.map.call(arguments, function (s) {
|
|
if (typeof s === 'string') {
|
|
return s
|
|
} else {
|
|
return JSON.stringify(s)
|
|
}
|
|
}).join('').replace(/"/g, "'").replace(/,/g, ', ').replace(/:/g, ': ')
|
|
this.executeOrder.push(command)
|
|
}
|
|
}
|
|
getDebugData () {
|
|
console.log(this.executeOrder.join('\n'))
|
|
}
|
|
stopGarbageCollector () {
|
|
var self = this
|
|
this.gc = false
|
|
this.gcTimeout = -1
|
|
return new Promise(function (resolve) {
|
|
self.requestTransaction(function * () {
|
|
var ungc /* :Array<Struct> */ = self.gc1.concat(self.gc2)
|
|
self.gc1 = []
|
|
self.gc2 = []
|
|
for (var i = 0; i < ungc.length; i++) {
|
|
var op = yield * this.getOperation(ungc[i])
|
|
if (op != null) {
|
|
delete op.gc
|
|
yield * this.setOperation(op)
|
|
}
|
|
}
|
|
resolve()
|
|
})
|
|
})
|
|
}
|
|
/*
|
|
Try to add to GC.
|
|
|
|
TODO: rename this function
|
|
|
|
Rulez:
|
|
* Only gc if this user is online & gc turned on
|
|
* The most left element in a list must not be gc'd.
|
|
=> There is at least one element in the list
|
|
|
|
returns true iff op was added to GC
|
|
*/
|
|
* addToGarbageCollector (op, left) {
|
|
if (
|
|
op.gc == null &&
|
|
op.deleted === true &&
|
|
this.store.gc &&
|
|
this.store.y.connector.isSynced
|
|
) {
|
|
var gc = false
|
|
if (left != null && left.deleted === true) {
|
|
gc = true
|
|
} else if (op.content != null && op.content.length > 1) {
|
|
op = yield * this.getInsertionCleanStart([op.id[0], op.id[1] + 1])
|
|
gc = true
|
|
}
|
|
if (gc) {
|
|
op.gc = true
|
|
yield * this.setOperation(op)
|
|
this.store.queueGarbageCollector(op.id)
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
removeFromGarbageCollector (op) {
|
|
function filter (o) {
|
|
return !Y.utils.compareIds(o, op.id)
|
|
}
|
|
this.gc1 = this.gc1.filter(filter)
|
|
this.gc2 = this.gc2.filter(filter)
|
|
delete op.gc
|
|
}
|
|
destroyTypes () {
|
|
for (var key in this.initializedTypes) {
|
|
var type = this.initializedTypes[key]
|
|
if (type._destroy != null) {
|
|
type._destroy()
|
|
} else {
|
|
console.error('The type you included does not provide destroy functionality, it will remain in memory (updating your packages will help).')
|
|
}
|
|
}
|
|
}
|
|
* destroy () {
|
|
clearInterval(this.gcInterval)
|
|
this.gcInterval = null
|
|
this.stopRepairCheck()
|
|
}
|
|
setUserId (userId) {
|
|
if (!this.userIdPromise.inProgress) {
|
|
this.userIdPromise.inProgress = true
|
|
var self = this
|
|
self.requestTransaction(function * () {
|
|
self.userId = userId
|
|
var state = yield * this.getState(userId)
|
|
self.opClock = state.clock
|
|
self.userIdPromise.resolve(userId)
|
|
})
|
|
}
|
|
return this.userIdPromise
|
|
}
|
|
whenUserIdSet (f) {
|
|
this.userIdPromise.then(f)
|
|
}
|
|
getNextOpId (numberOfIds) {
|
|
if (numberOfIds == null) {
|
|
throw new Error('getNextOpId expects the number of created ids to create!')
|
|
} else if (this.userId == null) {
|
|
throw new Error('OperationStore not yet initialized!')
|
|
} else {
|
|
var id = [this.userId, this.opClock]
|
|
this.opClock += numberOfIds
|
|
return id
|
|
}
|
|
}
|
|
/*
|
|
Apply a list of operations.
|
|
|
|
* we save a timestamp, because we received new operations that could resolve ops in this.listenersById (see this.startRepairCheck)
|
|
* get a transaction
|
|
* check whether all Struct.*.requiredOps are in the OS
|
|
* check if it is an expected op (otherwise wait for it)
|
|
* check if was deleted, apply a delete operation after op was applied
|
|
*/
|
|
apply (ops) {
|
|
this.opsReceivedTimestamp = new Date()
|
|
for (var i = 0; i < ops.length; i++) {
|
|
var o = ops[i]
|
|
if (o.id == null || o.id[0] !== this.y.connector.userId) {
|
|
var required = Y.Struct[o.struct].requiredOps(o)
|
|
if (o.requires != null) {
|
|
required = required.concat(o.requires)
|
|
}
|
|
this.whenOperationsExist(required, o)
|
|
}
|
|
}
|
|
}
|
|
/*
|
|
op is executed as soon as every operation requested is available.
|
|
Note that Transaction can (and should) buffer requests.
|
|
*/
|
|
whenOperationsExist (ids, op) {
|
|
if (ids.length > 0) {
|
|
let listener = {
|
|
op: op,
|
|
missing: ids.length
|
|
}
|
|
|
|
for (let i = 0; i < ids.length; i++) {
|
|
let id = ids[i]
|
|
let sid = JSON.stringify(id)
|
|
let l = this.listenersById[sid]
|
|
if (l == null) {
|
|
l = []
|
|
this.listenersById[sid] = l
|
|
}
|
|
l.push(listener)
|
|
}
|
|
} else {
|
|
this.listenersByIdExecuteNow.push({
|
|
op: op
|
|
})
|
|
}
|
|
|
|
if (this.listenersByIdRequestPending) {
|
|
return
|
|
}
|
|
|
|
this.listenersByIdRequestPending = true
|
|
var store = this
|
|
|
|
this.requestTransaction(function * () {
|
|
var exeNow = store.listenersByIdExecuteNow
|
|
store.listenersByIdExecuteNow = []
|
|
|
|
var ls = store.listenersById
|
|
store.listenersById = {}
|
|
|
|
store.listenersByIdRequestPending = false
|
|
|
|
for (let key = 0; key < exeNow.length; key++) {
|
|
let o = exeNow[key].op
|
|
yield * store.tryExecute.call(this, o)
|
|
}
|
|
|
|
for (var sid in ls) {
|
|
var l = ls[sid]
|
|
var id = JSON.parse(sid)
|
|
var op
|
|
if (typeof id[1] === 'string') {
|
|
op = yield * this.getOperation(id)
|
|
} else {
|
|
op = yield * this.getInsertion(id)
|
|
}
|
|
if (op == null) {
|
|
store.listenersById[sid] = l
|
|
} else {
|
|
for (let i = 0; i < l.length; i++) {
|
|
let listener = l[i]
|
|
let o = listener.op
|
|
if (--listener.missing === 0) {
|
|
yield * store.tryExecute.call(this, o)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
/*
|
|
Actually execute an operation, when all expected operations are available.
|
|
*/
|
|
/* :: // TODO: this belongs somehow to transaction
|
|
store: Object;
|
|
getOperation: any;
|
|
isGarbageCollected: any;
|
|
addOperation: any;
|
|
whenOperationsExist: any;
|
|
*/
|
|
* tryExecute (op) {
|
|
this.store.addToDebug('yield* this.store.tryExecute.call(this, ', JSON.stringify(op), ')')
|
|
if (op.struct === 'Delete') {
|
|
yield * Y.Struct.Delete.execute.call(this, op)
|
|
// this is now called in Transaction.deleteOperation!
|
|
// yield* this.store.operationAdded(this, op)
|
|
} else {
|
|
// check if this op was defined
|
|
var defined = yield * this.getInsertion(op.id)
|
|
while (defined != null && defined.content != null) {
|
|
// check if this op has a longer content in the case it is defined
|
|
if (defined.id[1] + defined.content.length < op.id[1] + op.content.length) {
|
|
var overlapSize = defined.content.length - (op.id[1] - defined.id[1])
|
|
op.content.splice(0, overlapSize)
|
|
op.id = [op.id[0], op.id[1] + overlapSize]
|
|
op.left = Y.utils.getLastId(defined)
|
|
op.origin = op.left
|
|
defined = yield * this.getOperation(op.id) // getOperation suffices here
|
|
} else {
|
|
break
|
|
}
|
|
}
|
|
if (defined == null) {
|
|
var opid = op.id
|
|
var isGarbageCollected = yield * this.isGarbageCollected(opid)
|
|
if (!isGarbageCollected) {
|
|
// TODO: reduce number of get / put calls for op ..
|
|
yield * Y.Struct[op.struct].execute.call(this, op)
|
|
yield * this.addOperation(op)
|
|
yield * this.store.operationAdded(this, op)
|
|
// operationAdded can change op..
|
|
op = yield * this.getOperation(opid)
|
|
// if insertion, try to combine with left
|
|
yield * this.tryCombineWithLeft(op)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
/*
|
|
* Called by a transaction when an operation is added.
|
|
* This function is especially important for y-indexeddb, where several instances may share a single database.
|
|
* Every time an operation is created by one instance, it is send to all other instances and operationAdded is called
|
|
*
|
|
* If it's not a Delete operation:
|
|
* * Checks if another operation is executable (listenersById)
|
|
* * Update state, if possible
|
|
*
|
|
* Always:
|
|
* * Call type
|
|
*/
|
|
* operationAdded (transaction, op) {
|
|
if (op.struct === 'Delete') {
|
|
var type = this.initializedTypes[JSON.stringify(op.targetParent)]
|
|
if (type != null) {
|
|
yield * type._changed(transaction, op)
|
|
}
|
|
} else {
|
|
// increase SS
|
|
yield * transaction.updateState(op.id[0])
|
|
var opLen = op.content != null ? op.content.length : 1
|
|
for (let i = 0; i < opLen; i++) {
|
|
// notify whenOperation listeners (by id)
|
|
var sid = JSON.stringify([op.id[0], op.id[1] + i])
|
|
var l = this.listenersById[sid]
|
|
delete this.listenersById[sid]
|
|
if (l != null) {
|
|
for (var key in l) {
|
|
var listener = l[key]
|
|
if (--listener.missing === 0) {
|
|
this.whenOperationsExist([], listener.op)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
var t = this.initializedTypes[JSON.stringify(op.parent)]
|
|
|
|
// if parent is deleted, mark as gc'd and return
|
|
if (op.parent != null) {
|
|
var parentIsDeleted = yield * transaction.isDeleted(op.parent)
|
|
if (parentIsDeleted) {
|
|
yield * transaction.deleteList(op.id)
|
|
return
|
|
}
|
|
}
|
|
|
|
// notify parent, if it was instanciated as a custom type
|
|
if (t != null) {
|
|
let o = Y.utils.copyOperation(op)
|
|
yield * t._changed(transaction, o)
|
|
}
|
|
if (!op.deleted) {
|
|
// Delete if DS says this is actually deleted
|
|
var len = op.content != null ? op.content.length : 1
|
|
var startId = op.id // You must not use op.id in the following loop, because op will change when deleted
|
|
// TODO: !! console.log('TODO: change this before commiting')
|
|
for (let i = 0; i < len; i++) {
|
|
var id = [startId[0], startId[1] + i]
|
|
var opIsDeleted = yield * transaction.isDeleted(id)
|
|
if (opIsDeleted) {
|
|
var delop = {
|
|
struct: 'Delete',
|
|
target: id
|
|
}
|
|
yield * this.tryExecute.call(transaction, delop)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
whenTransactionsFinished () {
|
|
if (this.transactionInProgress) {
|
|
if (this.transactionsFinished == null) {
|
|
var resolve_
|
|
var promise = new Promise(function (resolve) {
|
|
resolve_ = resolve
|
|
})
|
|
this.transactionsFinished = {
|
|
resolve: resolve_,
|
|
promise: promise
|
|
}
|
|
}
|
|
return this.transactionsFinished.promise
|
|
} else {
|
|
return Promise.resolve()
|
|
}
|
|
}
|
|
// Check if there is another transaction request.
|
|
// * the last transaction is always a flush :)
|
|
getNextRequest () {
|
|
if (this.waitingTransactions.length === 0) {
|
|
if (this.transactionIsFlushed) {
|
|
this.transactionInProgress = false
|
|
this.transactionIsFlushed = false
|
|
if (this.transactionsFinished != null) {
|
|
this.transactionsFinished.resolve()
|
|
this.transactionsFinished = null
|
|
}
|
|
return null
|
|
} else {
|
|
this.transactionIsFlushed = true
|
|
return function * () {
|
|
yield * this.flush()
|
|
}
|
|
}
|
|
} else {
|
|
this.transactionIsFlushed = false
|
|
return this.waitingTransactions.shift()
|
|
}
|
|
}
|
|
requestTransaction (makeGen/* :any */, callImmediately) {
|
|
this.waitingTransactions.push(makeGen)
|
|
if (!this.transactionInProgress) {
|
|
this.transactionInProgress = true
|
|
setTimeout(() => {
|
|
this.transact(this.getNextRequest())
|
|
}, 0)
|
|
}
|
|
}
|
|
/*
|
|
Get a created/initialized type.
|
|
*/
|
|
getType (id) {
|
|
return this.initializedTypes[JSON.stringify(id)]
|
|
}
|
|
/*
|
|
Init type. This is called when a remote operation is retrieved, and transformed to a type
|
|
TODO: delete type from store.initializedTypes[id] when corresponding id was deleted!
|
|
*/
|
|
* initType (id, args) {
|
|
var sid = JSON.stringify(id)
|
|
var t = this.store.initializedTypes[sid]
|
|
if (t == null) {
|
|
var op/* :MapStruct | ListStruct */ = yield * this.getOperation(id)
|
|
if (op != null) {
|
|
t = yield * Y[op.type].typeDefinition.initType.call(this, this.store, op, args)
|
|
this.store.initializedTypes[sid] = t
|
|
}
|
|
}
|
|
return t
|
|
}
|
|
/*
|
|
Create type. This is called when the local user creates a type (which is a synchronous action)
|
|
*/
|
|
createType (typedefinition, id) {
|
|
var structname = typedefinition[0].struct
|
|
id = id || this.getNextOpId(1)
|
|
var op = Y.Struct[structname].create(id)
|
|
op.type = typedefinition[0].name
|
|
|
|
this.requestTransaction(function * () {
|
|
if (op.id[0] === '_') {
|
|
yield * this.setOperation(op)
|
|
} else {
|
|
yield * this.applyCreatedOperations([op])
|
|
}
|
|
})
|
|
var t = Y[op.type].typeDefinition.createType(this, op, typedefinition[1])
|
|
this.initializedTypes[JSON.stringify(op.id)] = t
|
|
return t
|
|
}
|
|
}
|
|
Y.AbstractDatabase = AbstractDatabase
|
|
}
|