/* @flow */
'use strict'

/*
  Installs the AbstractDatabase class on the given Y object
  (as Y.AbstractDatabase).

  * Y - the library namespace. This file reads Y.Struct, Y.utils and
        Y[typeName].typeDefinition from it.
*/
export default function extendDatabase (Y /* :any */) {
  /*
    Partial definition of an OperationStore.
    TODO: name it Database, operation store only holds operations.

    A database definition must also define the following methods:
    * logTable() (optional)
      - show relevant information in a table
    * requestTransaction(makeGen)
      - request a transaction
    * destroy()
      - destroy the database

    Note: some generator methods of this class (addToGarbageCollector,
    tryExecute, initType) are written to run with a transaction as `this`
    (they access the database via `this.store`).
  */
  class AbstractDatabase {
    /* ::
    y: YConfig;
    forwardAppliedOperations: boolean;
    listenersById: Object;
    listenersByIdExecuteNow: Array;
    listenersByIdRequestPending: boolean;
    initializedTypes: Object;
    whenUserIdSetListener: ?Function;
    waitingTransactions: Array;
    transactionInProgress: boolean;
    executeOrder: Array;
    gc1: Array;
    gc2: Array;
    gcTimeout: number;
    gcInterval: any;
    garbageCollect: Function;
    executeOrder: Array; // for debugging only
    userId: UserId;
    opClock: number;
    transactionsFinished: ?{promise: Promise, resolve: any};
    transact: (x: ?Generator) => any;
    */

    /*
      * y    - the Y instance; this class reads y.connector from it
      * opts - database options. Read here: gc, gcTimeout, repairCheckInterval.
               NOTE: opts is mutated (opts.gc is normalized to a boolean) and
               kept as this.dbOpts.

      Starts the garbage collector (if opts.gc === true) and the repair check
      (unless repairCheckInterval is <= 0).
    */
    constructor (y, opts) {
      this.y = y
      opts.gc = opts.gc === true
      this.dbOpts = opts
      var os = this
      this.userId = null
      // userIdPromise is resolved from the outside (see setUserId), so we
      // expose its resolve function on the promise object itself
      var resolve_
      this.userIdPromise = new Promise(function (resolve) {
        resolve_ = resolve
      })
      this.userIdPromise.resolve = resolve_
      // whether to broadcast all applied operations (insert & delete hook)
      this.forwardAppliedOperations = false
      // E.g. this.listenersById[id] : Array
      this.listenersById = {}
      // Execute the next time a transaction is requested
      this.listenersByIdExecuteNow = []
      // A transaction is requested
      this.listenersByIdRequestPending = false
      /* To make things more clear, the following naming conventions:
         * ls : we put this.listenersById on ls
         * l : Array
         * id : Id (can't use as property name)
         * sid : String (converted from id via JSON.stringify
                         so we can use it as a property name)

        Always remember to first overwrite
        a property before you iterate over it!
      */
      // TODO: Use ES7 Weak Maps. This way types that are no longer used,
      // won't be kept in memory.
      this.initializedTypes = {}
      this.waitingTransactions = []
      this.transactionInProgress = false
      this.transactionIsFlushed = false
      if (typeof YConcurrencyTestingMode !== 'undefined') {
        this.executeOrder = []
      }
      this.gc1 = [] // first stage
      this.gc2 = [] // second stage -> after that, remove the op

      // (Re-)arm the gc timer. A pending timer is cancelled first, so that
      // calling garbageCollect several times (e.g. via emptyGarbageCollector)
      // never leaves more than one timer running. Otherwise destroy() could
      // only clear the most recent one.
      function scheduleGarbageCollect () {
        if (os.gcTimeout > 0) {
          clearTimeout(os.gcInterval)
          os.gcInterval = setTimeout(garbageCollect, os.gcTimeout)
        }
      }

      /*
        Runs one gc round after all running transactions have finished:
        * every id in gc2 is passed to transaction.garbageCollectOperation
        * gc1 becomes the new gc2, gc1 is emptied
        (both only if the connector is synced)
        Afterwards the next round is scheduled (if gc is still enabled).
        Returns a promise that resolves when the round is done.
      */
      function garbageCollect () {
        return os.whenTransactionsFinished().then(function () {
          if (os.gcTimeout > 0 && (os.gc1.length > 0 || os.gc2.length > 0)) {
            if (!os.y.connector.isSynced) {
              console.warn('gc should be empty when not synced!')
            }
            return new Promise((resolve) => {
              os.requestTransaction(function * () {
                if (os.y.connector != null && os.y.connector.isSynced) {
                  for (var i = 0; i < os.gc2.length; i++) {
                    var oid = os.gc2[i]
                    yield * this.garbageCollectOperation(oid)
                  }
                  os.gc2 = os.gc1
                  os.gc1 = []
                }
                scheduleGarbageCollect()
                resolve()
              })
            })
          } else {
            // nothing to collect (or gc disabled) - just keep the timer alive
            scheduleGarbageCollect()
            return Promise.resolve()
          }
        })
      }
      this.garbageCollect = garbageCollect
      this.startGarbageCollector()

      // falsy values (undefined, 0, ..) fall back to 6000 ms;
      // a negative value disables the repair check (see startRepairCheck)
      this.repairCheckInterval = !opts.repairCheckInterval ? 6000 : opts.repairCheckInterval
      this.opsReceivedTimestamp = new Date()
      this.startRepairCheck()
    }

    /*
      (Re-)reads the gc settings from this.dbOpts and starts the first gc round.
      gcTimeout defaults to 30000 ms when gc is enabled, and is -1 otherwise.
    */
    startGarbageCollector () {
      this.gc = this.dbOpts.gc
      if (this.gc) {
        this.gcTimeout = !this.dbOpts.gcTimeout ? 30000 : this.dbOpts.gcTimeout
      } else {
        this.gcTimeout = -1
      }
      if (this.gcTimeout > 0) {
        this.garbageCollect()
      }
    }

    /*
      Starts an interval (every this.repairCheckInterval ms) that calls
      connector.repair() when operations are still waiting for missing
      dependencies and nothing was received for a while.
      Does nothing if this.repairCheckInterval <= 0.
    */
    startRepairCheck () {
      var os = this
      if (this.repairCheckInterval > 0) {
        this.repairCheckIntervalHandler = setInterval(function repairOnMissingOperations () {
          /*
            Case 1. No ops have been received in a while (new Date() - os.opsReceivedTimestamp > os.repairCheckInterval)
              - 1.1 os.listenersById is empty. Then the state was correct the whole time. -> Nothing to do (nor to update)
              - 1.2 os.listenersById is not empty.
                      * Then the state was incorrect for at least {os.repairCheckInterval} milliseconds.
                      * -> Remove everything in os.listenersById and sync again (connector.repair())
            Case 2. An op has been received in the last {os.repairCheckInterval} milliseconds.
                    It is not yet necessary to check for faulty behavior. Everything can still resolve itself. Wait for more messages.
                    If nothing was received for a while and os.listenersById is still not empty, we are in case 1.2
                    -> Do nothing

            Baseline here is: we really only have to catch case 1.2..
          */
          if (
            new Date() - os.opsReceivedTimestamp > os.repairCheckInterval &&
            Object.keys(os.listenersById).length > 0 // os.listenersById is not empty
          ) {
            // haven't received operations for over {os.repairCheckInterval} milliseconds, resend state vector
            os.listenersById = {}
            os.opsReceivedTimestamp = new Date() // update so you don't send repair several times in a row
            os.y.connector.repair()
          }
        }, this.repairCheckInterval)
      }
    }

    /* Stops the interval started by startRepairCheck. */
    stopRepairCheck () {
      clearInterval(this.repairCheckIntervalHandler)
    }

    /*
      Adds id to the first gc stage (gc1).
      Ignored unless the connector is synced and gc is enabled.
    */
    queueGarbageCollector (id) {
      if (this.y.connector.isSynced && this.gc) {
        this.gc1.push(id)
      }
    }

    /*
      Repeatedly calls garbageCollect until both gc stages are empty.
      Returns a promise that resolves once gc1 and gc2 are empty.
    */
    emptyGarbageCollector () {
      return new Promise(resolve => {
        var check = () => {
          if (this.gc1.length > 0 || this.gc2.length > 0) {
            this.garbageCollect().then(check)
          } else {
            resolve()
          }
        }
        setTimeout(check, 0)
      })
    }

    /*
      Debugging only (active iff the global YConcurrencyTestingMode is defined).
      Concatenates all arguments (non-strings are JSON.stringify'd), makes the
      result a bit more readable, and appends it to this.executeOrder.
    */
    addToDebug (...parts) {
      if (typeof YConcurrencyTestingMode !== 'undefined') {
        var command /* :string */ = parts.map(function (s) {
          if (typeof s === 'string') {
            return s
          } else {
            return JSON.stringify(s)
          }
        }).join('').replace(/"/g, "'").replace(/,/g, ', ').replace(/:/g, ': ')
        this.executeOrder.push(command)
      }
    }

    /*
      Prints everything recorded by addToDebug (one command per line).
      this.executeOrder only exists in YConcurrencyTestingMode - print
      nothing (an empty string) instead of throwing when it is missing.
    */
    getDebugData () {
      console.log((this.executeOrder || []).join('\n'))
    }

    /*
      Disables gc and, in a transaction, removes the gc flag from all
      operations that are currently queued in gc1 / gc2.
      Returns a promise that resolves when that transaction body is done.
    */
    stopGarbageCollector () {
      var self = this
      this.gc = false
      this.gcTimeout = -1
      return new Promise(function (resolve) {
        self.requestTransaction(function * () {
          var ungc /* :Array */ = self.gc1.concat(self.gc2)
          self.gc1 = []
          self.gc2 = []
          for (var i = 0; i < ungc.length; i++) {
            var op = yield * this.getOperation(ungc[i])
            if (op != null) {
              delete op.gc
              yield * this.setOperation(op)
            }
          }
          resolve()
        })
      })
    }

    /*
      Try to add to GC.

      TODO: rename this function

      Rulez:
      * Only gc if this user is online & gc turned on
      * The most left element in a list must not be gc'd.
        => There is at least one element in the list

      Runs with a transaction as `this` (uses this.store, this.setOperation, ..).

      returns true iff op was added to GC
    */
    * addToGarbageCollector (op, left) {
      if (
        op.gc == null &&
        op.deleted === true &&
        this.store.gc &&
        this.store.y.connector.isSynced
      ) {
        var gc = false
        if (left != null && left.deleted === true) {
          gc = true
        } else if (op.content != null && op.content.length > 1) {
          // continue with the insertion that starts at the second content
          // element, so the leftmost element itself is not gc'd
          op = yield * this.getInsertionCleanStart([op.id[0], op.id[1] + 1])
          gc = true
        }
        if (gc) {
          op.gc = true
          yield * this.setOperation(op)
          this.store.queueGarbageCollector(op.id)
          return true
        }
      }
      return false
    }

    /*
      Removes op.id from both gc stages and deletes the gc flag from op.
    */
    removeFromGarbageCollector (op) {
      function filter (o) {
        return !Y.utils.compareIds(o, op.id)
      }
      this.gc1 = this.gc1.filter(filter)
      this.gc2 = this.gc2.filter(filter)
      delete op.gc
    }

    /*
      Calls _destroy() on every initialized type.
      Types without a _destroy method are reported via console.error.
    */
    destroyTypes () {
      for (var key in this.initializedTypes) {
        var type = this.initializedTypes[key]
        if (type._destroy != null) {
          type._destroy()
        } else {
          console.error('The type you included does not provide destroy functionality, it will remain in memory (updating your packages will help).')
        }
      }
    }

    /*
      Stops the pending gc timer and the repair check.
      (This is a generator, it has to be consumed - e.g. via yield *)
    */
    * destroy () {
      clearTimeout(this.gcInterval)
      this.gcInterval = null
      this.stopRepairCheck()
    }

    /*
      Sets the user id (only the first call has an effect). In a transaction,
      the op clock is initialized from the stored state of that user and
      this.userIdPromise is resolved.
      Returns this.userIdPromise.
    */
    setUserId (userId) {
      if (!this.userIdPromise.inProgress) {
        this.userIdPromise.inProgress = true
        var self = this
        self.requestTransaction(function * () {
          self.userId = userId
          var state = yield * this.getState(userId)
          self.opClock = state.clock
          self.userIdPromise.resolve(userId)
        })
      }
      return this.userIdPromise
    }

    /* Calls f(userId) as soon as the user id is set (see setUserId). */
    whenUserIdSet (f) {
      this.userIdPromise.then(f)
    }

    /*
      Reserves numberOfIds consecutive op ids for the local user.
      Returns the first reserved id ([userId, clock]).
      Throws if numberOfIds is missing, or if the user id is not yet set.
    */
    getNextOpId (numberOfIds) {
      if (numberOfIds == null) {
        throw new Error('getNextOpId expects the number of created ids to create!')
      } else if (this.userId == null) {
        throw new Error('OperationStore not yet initialized!')
      } else {
        var id = [this.userId, this.opClock]
        this.opClock += numberOfIds
        return id
      }
    }

    /*
      Apply a list of operations.

      * we save a timestamp, because we received new operations that could resolve ops in this.listenersById (see this.startRepairCheck)
      * get a transaction
      * check whether all Struct.*.requiredOps are in the OS
      * check if it is an expected op (otherwise wait for it)
      * check if was deleted, apply a delete operation after op was applied

      decoder: first a uint32 (number of operations) is read from it, then
      each operation is decoded with Y.Struct.binaryDecodeOperation.
      Operations created by the local user (connector.userId) are skipped.
    */
    applyOperations (decoder) {
      this.opsReceivedTimestamp = new Date()
      let length = decoder.readUint32()

      for (var i = 0; i < length; i++) {
        let o = Y.Struct.binaryDecodeOperation(decoder)
        if (o.id == null || o.id[0] !== this.y.connector.userId) {
          var required = Y.Struct[o.struct].requiredOps(o)
          if (o.requires != null) {
            required = required.concat(o.requires)
          }
          this.whenOperationsExist(required, o)
        }
      }
    }

    /*
      op is executed as soon as every operation requested is available.
      Note that Transaction can (and should) buffer requests.

      * ids - ids of the operations that op depends on
      * op  - the operation to execute

      Only one transaction is requested at a time; it processes everything
      that was registered until it runs.
    */
    whenOperationsExist (ids, op) {
      if (ids.length > 0) {
        // a single listener object is shared by all ids it is waiting for
        let listener = {
          op: op,
          missing: ids.length
        }

        for (let i = 0; i < ids.length; i++) {
          let id = ids[i]
          let sid = JSON.stringify(id)
          let l = this.listenersById[sid]
          if (l == null) {
            l = []
            this.listenersById[sid] = l
          }
          l.push(listener)
        }
      } else {
        this.listenersByIdExecuteNow.push({
          op: op
        })
      }

      if (this.listenersByIdRequestPending) {
        return
      }

      this.listenersByIdRequestPending = true
      var store = this

      this.requestTransaction(function * () {
        // first overwrite the properties, then iterate over the old values
        var exeNow = store.listenersByIdExecuteNow
        store.listenersByIdExecuteNow = []

        var ls = store.listenersById
        store.listenersById = {}

        store.listenersByIdRequestPending = false

        for (let key = 0; key < exeNow.length; key++) {
          let o = exeNow[key].op
          yield * store.tryExecute.call(this, o)
        }

        for (var sid in ls) {
          var l = ls[sid]
          var id = JSON.parse(sid)
          var op
          if (typeof id[1] === 'string') {
            op = yield * this.getOperation(id)
          } else {
            op = yield * this.getInsertion(id)
          }
          if (op == null) {
            // still missing - keep waiting
            store.listenersById[sid] = l
          } else {
            for (let i = 0; i < l.length; i++) {
              let listener = l[i]
              let o = listener.op
              if (--listener.missing === 0) {
                yield * store.tryExecute.call(this, o)
              }
            }
          }
        }
      })
    }

    /*
      Actually execute an operation, when all expected operations are available.

      Runs with a transaction as `this`.
    */
    /* :: // TODO: this belongs somehow to transaction
    store: Object;
    getOperation: any;
    isGarbageCollected: any;
    addOperation: any;
    whenOperationsExist: any;
    */
    * tryExecute (op) {
      this.store.addToDebug('yield * this.store.tryExecute.call(this, ', JSON.stringify(op), ')')
      if (op.struct === 'Delete') {
        yield * Y.Struct.Delete.execute.call(this, op)
        // operationAdded is now called in Transaction.deleteOperation!
      } else {
        // check if this op was defined
        var defined = yield * this.getInsertion(op.id)
        while (defined != null && defined.content != null) {
          // check if this op has a longer content in the case it is defined
          if (defined.id[1] + defined.content.length < op.id[1] + op.content.length) {
            // cut off the part of op that is already defined
            var overlapSize = defined.content.length - (op.id[1] - defined.id[1])
            op.content.splice(0, overlapSize)
            op.id = [op.id[0], op.id[1] + overlapSize]
            op.left = Y.utils.getLastId(defined)
            op.origin = op.left
            defined = yield * this.getOperation(op.id) // getOperation suffices here
          } else {
            break
          }
        }
        if (defined == null) {
          var opid = op.id
          var isGarbageCollected = yield * this.isGarbageCollected(opid)
          if (!isGarbageCollected) {
            // TODO: reduce number of get / put calls for op ..
            yield * Y.Struct[op.struct].execute.call(this, op)
            yield * this.addOperation(op)
            yield * this.store.operationAdded(this, op)
            // operationAdded can change op..
            op = yield * this.getOperation(opid)
            // if insertion, try to combine with left
            yield * this.tryCombineWithLeft(op)
          }
        }
      }
    }

    /*
     * Called by a transaction when an operation is added.
     * This function is especially important for y-indexeddb, where several instances may share a single database.
     * Every time an operation is created by one instance, it is sent to all other instances and operationAdded is called
     *
     * If it's not a Delete operation:
     *   * Checks if another operation is executable (listenersById)
     *   * Update state, if possible
     *
     * Always:
     *   * Call type
     */
    * operationAdded (transaction, op) {
      if (op.struct === 'Delete') {
        var type = this.initializedTypes[JSON.stringify(op.targetParent)]
        if (type != null) {
          yield * type._changed(transaction, op)
        }
      } else {
        // increase SS
        yield * transaction.updateState(op.id[0])
        var opLen = op.content != null ? op.content.length : 1
        for (let i = 0; i < opLen; i++) {
          // notify whenOperation listeners (by id)
          var sid = JSON.stringify([op.id[0], op.id[1] + i])
          var l = this.listenersById[sid]
          delete this.listenersById[sid]
          if (l != null) {
            // l is an array - iterate by index (for..in would also visit
            // inherited enumerable properties)
            for (let k = 0; k < l.length; k++) {
              var listener = l[k]
              if (--listener.missing === 0) {
                this.whenOperationsExist([], listener.op)
              }
            }
          }
        }
        var t = this.initializedTypes[JSON.stringify(op.parent)]

        // if parent is deleted, mark as gc'd and return
        if (op.parent != null) {
          var parentIsDeleted = yield * transaction.isDeleted(op.parent)
          if (parentIsDeleted) {
            yield * transaction.deleteList(op.id)
            return
          }
        }

        // notify parent, if it was instantiated as a custom type
        if (t != null) {
          let o = Y.utils.copyOperation(op)
          yield * t._changed(transaction, o)
        }
        if (!op.deleted) {
          // Delete if DS says this is actually deleted
          var len = op.content != null ? op.content.length : 1
          var startId = op.id // You must not use op.id in the following loop, because op will change when deleted
          // TODO: !! console.log('TODO: change this before committing')
          for (let i = 0; i < len; i++) {
            var id = [startId[0], startId[1] + i]
            var opIsDeleted = yield * transaction.isDeleted(id)
            if (opIsDeleted) {
              var delop = {
                struct: 'Delete',
                target: id
              }
              yield * this.tryExecute.call(transaction, delop)
            }
          }
        }
      }
    }

    /*
      Returns a promise that resolves when no transaction is in progress
      anymore (immediately, if none is in progress right now).
    */
    whenTransactionsFinished () {
      if (this.transactionInProgress) {
        if (this.transactionsFinished == null) {
          var resolve_
          var promise = new Promise(function (resolve) {
            resolve_ = resolve
          })
          this.transactionsFinished = {
            resolve: resolve_,
            promise: promise
          }
        }
        return this.transactionsFinished.promise
      } else {
        return Promise.resolve()
      }
    }

    /*
      Check if there is another transaction request.
      * the last transaction is always a flush :)

      Returns the next waiting request. When none is waiting, it returns a
      flush request once; the call after that returns null, marks the
      transaction as finished and resolves whenTransactionsFinished.
    */
    getNextRequest () {
      if (this.waitingTransactions.length === 0) {
        if (this.transactionIsFlushed) {
          this.transactionInProgress = false
          this.transactionIsFlushed = false
          if (this.transactionsFinished != null) {
            this.transactionsFinished.resolve()
            this.transactionsFinished = null
          }
          return null
        } else {
          this.transactionIsFlushed = true
          return function * () {
            yield * this.flush()
          }
        }
      } else {
        this.transactionIsFlushed = false
        return this.waitingTransactions.shift()
      }
    }

    /*
      Queues makeGen (a generator function). If no transaction is in
      progress, this.transact is started asynchronously (setTimeout 0)
      with the next request.
      callImmediately is not used by this implementation.
    */
    requestTransaction (makeGen/* :any */, callImmediately) {
      this.waitingTransactions.push(makeGen)
      if (!this.transactionInProgress) {
        this.transactionInProgress = true
        setTimeout(() => {
          this.transact(this.getNextRequest())
        }, 0)
      }
    }

    /*
      Get a created/initialized type.
    */
    getType (id) {
      return this.initializedTypes[JSON.stringify(id)]
    }

    /*
      Init type. This is called when a remote operation is retrieved, and transformed to a type
      TODO: delete type from store.initializedTypes[id] when corresponding id was deleted!

      Runs with a transaction as `this`.
      Returns the (possibly already initialized) type, or undefined if the
      operation with the given id does not exist.
    */
    * initType (id, args) {
      var sid = JSON.stringify(id)
      var t = this.store.initializedTypes[sid]
      if (t == null) {
        var op/* :MapStruct | ListStruct */ = yield * this.getOperation(id)
        if (op != null) {
          t = yield * Y[op.type].typeDefinition.initType.call(this, this.store, op, args)
          this.store.initializedTypes[sid] = t
        }
      }
      return t
    }

    /*
     Create type. This is called when the local user creates a type (which is a synchronous action)

     * typedefinition - [definition, args]; definition.struct and
                        definition.name are read here
     * id             - optional; a new op id is reserved if omitted
    */
    createType (typedefinition, id) {
      var structname = typedefinition[0].struct
      id = id || this.getNextOpId(1)
      var op = Y.Struct[structname].create(id, typedefinition[1])
      op.type = typedefinition[0].name

      this.requestTransaction(function * () {
        // ops whose user id is 0xFFFFFF are only stored (setOperation),
        // all others go through applyCreatedOperations
        if (op.id[0] === 0xFFFFFF) {
          yield * this.setOperation(op)
        } else {
          yield * this.applyCreatedOperations([op])
        }
      })
      var t = Y[op.type].typeDefinition.createType(this, op, typedefinition[1])
      this.initializedTypes[JSON.stringify(op.id)] = t
      return t
    }
  }
  Y.AbstractDatabase = AbstractDatabase
}