checking out new gc approach
This commit is contained in:
parent
5e4c56af29
commit
183f30878e
@ -22,6 +22,7 @@ class AbstractConnector {
|
|||||||
}
|
}
|
||||||
this.role = opts.role
|
this.role = opts.role
|
||||||
this.connections = {}
|
this.connections = {}
|
||||||
|
this.isSynced = false
|
||||||
this.userEventListeners = []
|
this.userEventListeners = []
|
||||||
this.whenSyncedListeners = []
|
this.whenSyncedListeners = []
|
||||||
this.currentSyncTarget = null
|
this.currentSyncTarget = null
|
||||||
@ -97,7 +98,7 @@ class AbstractConnector {
|
|||||||
true otherwise
|
true otherwise
|
||||||
*/
|
*/
|
||||||
findNextSyncTarget () {
|
findNextSyncTarget () {
|
||||||
if (this.currentSyncTarget != null) {
|
if (this.currentSyncTarget != null || this.isSynced) {
|
||||||
return // "The current sync has not finished!"
|
return // "The current sync has not finished!"
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -118,9 +119,7 @@ class AbstractConnector {
|
|||||||
deleteSet: yield* this.getDeleteSet()
|
deleteSet: yield* this.getDeleteSet()
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
} else {
|
||||||
// This user synced with at least one user, set the state to synced (TODO: does this suffice?)
|
|
||||||
if (!this.isSynced) {
|
|
||||||
this.isSynced = true
|
this.isSynced = true
|
||||||
for (var f of this.whenSyncedListeners) {
|
for (var f of this.whenSyncedListeners) {
|
||||||
f()
|
f()
|
||||||
@ -141,26 +140,31 @@ class AbstractConnector {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if (this.debug) {
|
if (this.debug) {
|
||||||
console.log(`${sender} -> me: ${m.type}`, m);// eslint-disable-line
|
console.log(`${sender} -> ${this.userId}: ${m.type}`, m);// eslint-disable-line
|
||||||
}
|
}
|
||||||
if (m.type === 'sync step 1') {
|
if (m.type === 'sync step 1') {
|
||||||
// TODO: make transaction, stream the ops
|
// TODO: make transaction, stream the ops
|
||||||
let conn = this
|
let conn = this
|
||||||
this.y.db.requestTransaction(function *() {
|
this.y.db.requestTransaction(function *() {
|
||||||
var ops = yield* this.getOperations(m.stateSet)
|
var currentStateSet = yield* this.getStateSet()
|
||||||
var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
|
var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
|
||||||
|
for (var d in dels) {
|
||||||
|
yield* Y.Struct.Delete.delete.call(this, dels[d].target)
|
||||||
|
}
|
||||||
if (dels.length > 0) {
|
if (dels.length > 0) {
|
||||||
this.store.apply(dels)
|
|
||||||
// broadcast missing dels from syncing client
|
// broadcast missing dels from syncing client
|
||||||
|
/* TODO: solve this better?
|
||||||
this.store.y.connector.broadcast({
|
this.store.y.connector.broadcast({
|
||||||
type: 'update',
|
type: 'update',
|
||||||
ops: dels
|
ops: dels
|
||||||
})
|
})
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
var ops = yield* this.getOperations(m.stateSet)
|
||||||
conn.send(sender, {
|
conn.send(sender, {
|
||||||
type: 'sync step 2',
|
type: 'sync step 2',
|
||||||
os: ops,
|
os: ops,
|
||||||
stateSet: yield* this.getStateSet(),
|
stateSet: currentStateSet,
|
||||||
deleteSet: yield* this.getDeleteSet()
|
deleteSet: yield* this.getDeleteSet()
|
||||||
})
|
})
|
||||||
if (this.forwardToSyncingClients) {
|
if (this.forwardToSyncingClients) {
|
||||||
@ -185,16 +189,18 @@ class AbstractConnector {
|
|||||||
var broadcastHB = !this.broadcastedHB
|
var broadcastHB = !this.broadcastedHB
|
||||||
this.broadcastedHB = true
|
this.broadcastedHB = true
|
||||||
this.y.db.requestTransaction(function *() {
|
this.y.db.requestTransaction(function *() {
|
||||||
var ops = yield* this.getOperations(m.stateSet)
|
|
||||||
var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
|
var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
|
||||||
this.store.apply(dels)
|
for (var d in dels) {
|
||||||
|
yield* Y.Struct.Delete.delete.call(this, dels[d].target)
|
||||||
|
}
|
||||||
|
var ops = yield* this.getOperations(m.stateSet)
|
||||||
this.store.apply(m.os)
|
this.store.apply(m.os)
|
||||||
if (ops.length > 0) {
|
if (ops.length > 0) {
|
||||||
m = {
|
m = {
|
||||||
type: 'update',
|
type: 'update',
|
||||||
ops: ops
|
ops: ops
|
||||||
}
|
}
|
||||||
if (!broadcastHB) {
|
if (!broadcastHB || true) { // TODO: no broadcast?
|
||||||
conn.send(sender, m)
|
conn.send(sender, m)
|
||||||
} else {
|
} else {
|
||||||
// broadcast only once!
|
// broadcast only once!
|
||||||
|
@ -71,8 +71,8 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj
|
|||||||
var f = getRandom(transactions)
|
var f = getRandom(transactions)
|
||||||
f(root)
|
f(root)
|
||||||
}
|
}
|
||||||
function applyTransactions () {
|
function applyTransactions (relAmount) {
|
||||||
for (var i = 0; i < numberOfTransactions / 2 + 1; i++) {
|
for (var i = 0; i < numberOfTransactions * relAmount + 1; i++) {
|
||||||
var r = Math.random()
|
var r = Math.random()
|
||||||
if (r >= 0.9) {
|
if (r >= 0.9) {
|
||||||
// 10% chance to flush
|
// 10% chance to flush
|
||||||
@ -83,16 +83,16 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj
|
|||||||
wait()
|
wait()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
applyTransactions()
|
applyTransactions(0.5)
|
||||||
applyTransactions()
|
yield users[0].connector.flushAll()
|
||||||
/* TODO: call applyTransactions here..
|
|
||||||
yield users[0].connector.flushAll()
|
yield users[0].connector.flushAll()
|
||||||
users[0].disconnect()
|
users[0].disconnect()
|
||||||
yield wait()
|
yield wait()
|
||||||
applyTransactions()
|
applyTransactions(0.5)
|
||||||
yield users[0].connector.flushAll()
|
yield users[0].connector.flushAll()
|
||||||
|
// TODO: gc here????
|
||||||
|
yield wait(100)
|
||||||
users[0].reconnect()
|
users[0].reconnect()
|
||||||
*/
|
|
||||||
yield wait()
|
yield wait()
|
||||||
yield users[0].connector.flushAll()
|
yield users[0].connector.flushAll()
|
||||||
})
|
})
|
||||||
@ -104,7 +104,7 @@ g.garbageCollectAllUsers = async(function * garbageCollectAllUsers (users) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
g.compareAllUsers = async(function * compareAllUsers (users) { //eslint-disable-line
|
g.compareAllUsers = async(function * compareAllUsers (users) {
|
||||||
var s1, s2 // state sets
|
var s1, s2 // state sets
|
||||||
var ds1, ds2 // delete sets
|
var ds1, ds2 // delete sets
|
||||||
var allDels1, allDels2 // all deletions
|
var allDels1, allDels2 // all deletions
|
||||||
@ -224,14 +224,12 @@ function async (makeGenerator) {
|
|||||||
return handle(generator.throw(err))
|
return handle(generator.throw(err))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
// this may throw errors here, but its ok since this is used only for debugging
|
try {
|
||||||
return handle(generator.next())
|
|
||||||
/* try {
|
|
||||||
return handle(generator.next())
|
return handle(generator.next())
|
||||||
} catch (ex) {
|
} catch (ex) {
|
||||||
generator.throw(ex) // TODO: check this out
|
generator.throw(ex) // TODO: check this out
|
||||||
// return Promise.reject(ex)
|
// return Promise.reject(ex)
|
||||||
}*/
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
g.async = async
|
g.async = async
|
||||||
|
@ -122,7 +122,7 @@ Y.AbstractTransaction = AbstractTransaction
|
|||||||
* destroy()
|
* destroy()
|
||||||
- destroy the database
|
- destroy the database
|
||||||
*/
|
*/
|
||||||
class AbstractOperationStore { // eslint-disable-line no-unused-vars
|
class AbstractOperationStore {
|
||||||
constructor (y, opts) {
|
constructor (y, opts) {
|
||||||
this.y = y
|
this.y = y
|
||||||
// E.g. this.listenersById[id] : Array<Listener>
|
// E.g. this.listenersById[id] : Array<Listener>
|
||||||
@ -148,7 +148,7 @@ class AbstractOperationStore { // eslint-disable-line no-unused-vars
|
|||||||
this.waitingOperations = new Y.utils.RBTree()
|
this.waitingOperations = new Y.utils.RBTree()
|
||||||
|
|
||||||
this.gc1 = [] // first stage
|
this.gc1 = [] // first stage
|
||||||
this.gc2 = [] // second stage -> after that, kill it
|
this.gc2 = [] // second stage -> after that, remove the op
|
||||||
this.gcTimeout = opts.gcTimeout || 5000
|
this.gcTimeout = opts.gcTimeout || 5000
|
||||||
var os = this
|
var os = this
|
||||||
function garbageCollect () {
|
function garbageCollect () {
|
||||||
@ -197,10 +197,35 @@ class AbstractOperationStore { // eslint-disable-line no-unused-vars
|
|||||||
garbageCollect()
|
garbageCollect()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
addToGarbageCollector (op) {
|
/*
|
||||||
if (op.gc == null) {
|
Try to add to GC.
|
||||||
|
|
||||||
|
TODO: rename this function
|
||||||
|
|
||||||
|
Only gc when
|
||||||
|
* creator of op is online
|
||||||
|
* left & right defined and both are from the same creator as op
|
||||||
|
|
||||||
|
returns true iff op was added to GC
|
||||||
|
*/
|
||||||
|
addToGarbageCollector (op, left, right) {
|
||||||
|
if (
|
||||||
|
op.gc == null &&
|
||||||
|
op.deleted === true &&
|
||||||
|
this.y.connector.isSynced &&
|
||||||
|
(this.y.connector.connections[op.id[0]] != null || op.id[0] === this.y.connector.userId) &&
|
||||||
|
left != null &&
|
||||||
|
right != null &&
|
||||||
|
left.deleted &&
|
||||||
|
right.deleted &&
|
||||||
|
left.id[0] === op.id[0] &&
|
||||||
|
right.id[0] === op.id[0]
|
||||||
|
) {
|
||||||
op.gc = true
|
op.gc = true
|
||||||
this.gc1.push(op.id)
|
this.gc1.push(op.id)
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
removeFromGarbageCollector (op) {
|
removeFromGarbageCollector (op) {
|
||||||
|
@ -175,7 +175,7 @@ Y.Memory = (function () {
|
|||||||
return stateVector
|
return stateVector
|
||||||
}
|
}
|
||||||
* getStateSet () {
|
* getStateSet () {
|
||||||
return this.ss
|
return Y.utils.copyObject(this.ss)
|
||||||
}
|
}
|
||||||
* getOperations (startSS) {
|
* getOperations (startSS) {
|
||||||
// TODO: use bounds here!
|
// TODO: use bounds here!
|
||||||
@ -237,7 +237,7 @@ Y.Memory = (function () {
|
|||||||
return op
|
return op
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
class OperationStore extends Y.AbstractOperationStore { // eslint-disable-line no-undef
|
class OperationStore extends Y.AbstractOperationStore {
|
||||||
constructor (y, opts) {
|
constructor (y, opts) {
|
||||||
super(y, opts)
|
super(y, opts)
|
||||||
this.os = new Y.utils.RBTree()
|
this.os = new Y.utils.RBTree()
|
||||||
|
@ -46,35 +46,52 @@ var Struct = {
|
|||||||
*/
|
*/
|
||||||
delete: function * (targetId) {
|
delete: function * (targetId) {
|
||||||
var target = yield* this.getOperation(targetId)
|
var target = yield* this.getOperation(targetId)
|
||||||
if (target != null && !target.deleted) {
|
|
||||||
target.deleted = true
|
if (target == null || !target.deleted) {
|
||||||
if (target.left != null && (yield* this.getOperation(target.left)).deleted) {
|
|
||||||
// left is defined & the left op is already deleted.
|
|
||||||
// => Then this may get gc'd
|
|
||||||
this.store.addToGarbageCollector(target)
|
|
||||||
}
|
|
||||||
if (target.right != null) {
|
|
||||||
var right = yield* this.getOperation(target.right)
|
|
||||||
if (right.deleted && right.gc == null) {
|
|
||||||
this.store.addToGarbageCollector(right)
|
|
||||||
yield* this.setOperation(right)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
yield* this.setOperation(target)
|
|
||||||
var t = this.store.initializedTypes[JSON.stringify(target.parent)]
|
|
||||||
if (t != null) {
|
|
||||||
yield* t._changed(this, {
|
|
||||||
struct: 'Delete',
|
|
||||||
target: targetId
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
this.ds.delete(targetId)
|
this.ds.delete(targetId)
|
||||||
var state = yield* this.getState(targetId[0])
|
var state = yield* this.getState(targetId[0])
|
||||||
if (state.clock === targetId[1]) {
|
if (state.clock === targetId[1]) {
|
||||||
yield* this.checkDeleteStoreForState(state)
|
yield* this.checkDeleteStoreForState(state)
|
||||||
yield* this.setState(state)
|
yield* this.setState(state)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (target != null && target.gc == null) {
|
||||||
|
if (!target.deleted) {
|
||||||
|
// set deleted & notify type
|
||||||
|
target.deleted = true
|
||||||
|
var type = this.store.initializedTypes[JSON.stringify(target.parent)]
|
||||||
|
if (type != null) {
|
||||||
|
yield* type._changed(this, {
|
||||||
|
struct: 'Delete',
|
||||||
|
target: targetId
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var left = target.left != null ? yield* this.getOperation(target.left) : null
|
||||||
|
var right = target.right != null ? yield* this.getOperation(target.right) : null
|
||||||
|
|
||||||
|
this.store.addToGarbageCollector(target, left, right)
|
||||||
|
|
||||||
|
// set here because it was deleted and/or gc'd
|
||||||
|
yield* this.setOperation(target)
|
||||||
|
|
||||||
|
if (
|
||||||
|
left != null &&
|
||||||
|
left.left != null &&
|
||||||
|
this.store.addToGarbageCollector(left, yield* this.getOperation(left.left), target)
|
||||||
|
) {
|
||||||
|
yield* this.setOperation(left)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
right != null &&
|
||||||
|
right.right != null &&
|
||||||
|
this.store.addToGarbageCollector(right, target, yield* this.getOperation(right.right))
|
||||||
|
) {
|
||||||
|
yield* this.setOperation(right)
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
execute: function * (op) {
|
execute: function * (op) {
|
||||||
yield* Struct.Delete.delete.call(this, op.target)
|
yield* Struct.Delete.delete.call(this, op.target)
|
||||||
@ -215,6 +232,12 @@ var Struct = {
|
|||||||
left = yield* this.getOperation(op.left)
|
left = yield* this.getOperation(op.left)
|
||||||
op.right = left.right
|
op.right = left.right
|
||||||
left.right = op.id
|
left.right = op.id
|
||||||
|
|
||||||
|
// if left exists, and it is supposed to be gc'd. Remove it from the gc
|
||||||
|
if (left.gc != null) {
|
||||||
|
this.store.removeFromGarbageCollector(left)
|
||||||
|
}
|
||||||
|
|
||||||
yield* this.setOperation(left)
|
yield* this.setOperation(left)
|
||||||
} else {
|
} else {
|
||||||
op.right = op.parentSub ? parent.map[op.parentSub] || null : parent.start
|
op.right = op.parentSub ? parent.map[op.parentSub] || null : parent.start
|
||||||
@ -228,7 +251,6 @@ var Struct = {
|
|||||||
if (right.gc != null) {
|
if (right.gc != null) {
|
||||||
this.store.removeFromGarbageCollector(right)
|
this.store.removeFromGarbageCollector(right)
|
||||||
}
|
}
|
||||||
|
|
||||||
yield* this.setOperation(right)
|
yield* this.setOperation(right)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,7 +2,7 @@
|
|||||||
/* eslint-env browser,jasmine */
|
/* eslint-env browser,jasmine */
|
||||||
|
|
||||||
var numberOfYArrayTests = 100
|
var numberOfYArrayTests = 100
|
||||||
var repeatArrayTests = 1
|
var repeatArrayTests = 3
|
||||||
|
|
||||||
describe('Array Type', function () {
|
describe('Array Type', function () {
|
||||||
var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll
|
var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll
|
||||||
@ -245,6 +245,8 @@ describe('Array Type', function () {
|
|||||||
yield applyRandomTransactions(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests)
|
yield applyRandomTransactions(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests)
|
||||||
yield flushAll()
|
yield flushAll()
|
||||||
yield compareArrayValues(this.arrays)
|
yield compareArrayValues(this.arrays)
|
||||||
|
// yield compareAllUsers(this.users)
|
||||||
|
|
||||||
done()
|
done()
|
||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
/* global createUsers, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, describeManyTimes */
|
/* global createUsers, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, describeManyTimes */
|
||||||
/* eslint-env browser,jasmine */
|
/* eslint-env browser,jasmine */
|
||||||
|
|
||||||
var numberOfYMapTests = 150
|
var numberOfYMapTests = 70
|
||||||
var repeatMapTeasts = 1
|
var repeatMapTeasts = 1
|
||||||
|
|
||||||
describe('Map Type', function () {
|
describe('Map Type', function () {
|
||||||
|
3
src/y.js
3
src/y.js
@ -36,12 +36,15 @@ class YConfig {
|
|||||||
this.connector.disconnect()
|
this.connector.disconnect()
|
||||||
}
|
}
|
||||||
reconnect () {
|
reconnect () {
|
||||||
|
this.connector.reconnect()
|
||||||
|
/* TODO: maybe do this..
|
||||||
Promise.all([
|
Promise.all([
|
||||||
this.db.garbageCollect(),
|
this.db.garbageCollect(),
|
||||||
this.db.garbageCollect()
|
this.db.garbageCollect()
|
||||||
]).then(() => {
|
]).then(() => {
|
||||||
this.connector.reconnect()
|
this.connector.reconnect()
|
||||||
})
|
})
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
destroy () {
|
destroy () {
|
||||||
this.connector.disconnect()
|
this.connector.disconnect()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user