merged changes on home pc. some improvements on rejoin&sync
This commit is contained in:
		
						commit
						2013266d56
					
				@ -22,6 +22,7 @@ class AbstractConnector {
 | 
			
		||||
    }
 | 
			
		||||
    this.role = opts.role
 | 
			
		||||
    this.connections = {}
 | 
			
		||||
    this.isSynced = false
 | 
			
		||||
    this.userEventListeners = []
 | 
			
		||||
    this.whenSyncedListeners = []
 | 
			
		||||
    this.currentSyncTarget = null
 | 
			
		||||
@ -97,7 +98,7 @@ class AbstractConnector {
 | 
			
		||||
   true otherwise
 | 
			
		||||
  */
 | 
			
		||||
  findNextSyncTarget () {
 | 
			
		||||
    if (this.currentSyncTarget != null) {
 | 
			
		||||
    if (this.currentSyncTarget != null || this.isSynced) {
 | 
			
		||||
      return // "The current sync has not finished!"
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@ -118,19 +119,18 @@ class AbstractConnector {
 | 
			
		||||
          deleteSet: yield* this.getDeleteSet()
 | 
			
		||||
        })
 | 
			
		||||
      })
 | 
			
		||||
    }
 | 
			
		||||
    // This user synced with at least one user, set the state to synced (TODO: does this suffice?)
 | 
			
		||||
    if (!this.isSynced) {
 | 
			
		||||
    } else {
 | 
			
		||||
      this.isSynced = true
 | 
			
		||||
      for (var f of this.whenSyncedListeners) {
 | 
			
		||||
        f()
 | 
			
		||||
      }
 | 
			
		||||
      this.whenSyncedListeners = []
 | 
			
		||||
      this.y.db.garbageCollectAfterSync()
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  send (uid, message) {
 | 
			
		||||
    if (this.debug) {
 | 
			
		||||
      console.log(`me -> ${uid}: ${message.type}`, m);// eslint-disable-line
 | 
			
		||||
      console.log(`send ${this.userId} -> ${uid}: ${message.type}`, m);// eslint-disable-line
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  /*
 | 
			
		||||
@ -141,36 +141,26 @@ class AbstractConnector {
 | 
			
		||||
      return
 | 
			
		||||
    }
 | 
			
		||||
    if (this.debug) {
 | 
			
		||||
      console.log(`${sender} -> me: ${m.type}`, m);// eslint-disable-line
 | 
			
		||||
      if (m.os != null && m.os.some(function(o){return o.deleted })){
 | 
			
		||||
        console.log("bullshit.. ")
 | 
			
		||||
        debugger
 | 
			
		||||
      }
 | 
			
		||||
      console.log(`receive ${sender} -> ${this.userId}: ${m.type}`, m);// eslint-disable-line
 | 
			
		||||
    }
 | 
			
		||||
    if (m.type === 'sync step 1') {
 | 
			
		||||
      // TODO: make transaction, stream the ops
 | 
			
		||||
      let conn = this
 | 
			
		||||
      this.y.db.requestTransaction(function *() {
 | 
			
		||||
        var currentStateSet = yield* this.getStateSet()
 | 
			
		||||
        var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
 | 
			
		||||
        for (var i in dels) {
 | 
			
		||||
          // TODO: no longer get delete ops (just get the ids..)!
 | 
			
		||||
          yield* Y.Struct.Delete.delete.call(this, dels[i].target)
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        var ops = yield* this.getOperations(m.stateSet)
 | 
			
		||||
        conn.send(sender, {
 | 
			
		||||
          type: 'sync step 2',
 | 
			
		||||
          os: ops,
 | 
			
		||||
          stateSet: yield* this.getStateSet(),
 | 
			
		||||
          deleteSet: yield* this.getDeleteSet() // TODO: consider that you have a ds from the other user..
 | 
			
		||||
          stateSet: currentStateSet,
 | 
			
		||||
          deleteSet: yield* this.getDeleteSet()
 | 
			
		||||
        })
 | 
			
		||||
        var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
 | 
			
		||||
        if (dels.length > 0) {
 | 
			
		||||
          for (var i in dels) {
 | 
			
		||||
            // TODO: no longer get delete ops (just get the ids..)!
 | 
			
		||||
            yield* Y.Struct.Delete.delete.call(this, dels[i].target)
 | 
			
		||||
          }
 | 
			
		||||
          /*/ broadcast missing dels from syncing client
 | 
			
		||||
          this.store.y.connector.broadcast({
 | 
			
		||||
            type: 'update',
 | 
			
		||||
            ops: dels
 | 
			
		||||
          })
 | 
			
		||||
          */
 | 
			
		||||
        }
 | 
			
		||||
        if (this.forwardToSyncingClients) {
 | 
			
		||||
          conn.syncingClients.push(sender)
 | 
			
		||||
          setTimeout(function () {
 | 
			
		||||
@ -192,11 +182,13 @@ class AbstractConnector {
 | 
			
		||||
      let conn = this
 | 
			
		||||
      var broadcastHB = !this.broadcastedHB
 | 
			
		||||
      this.broadcastedHB = true
 | 
			
		||||
      this.y.db.requestTransaction(function *() {
 | 
			
		||||
        var ops = yield* this.getOperations(m.stateSet)
 | 
			
		||||
      this.y.db.requestTransaction(function * () {
 | 
			
		||||
        var dels = yield* this.getOpsFromDeleteSet(m.deleteSet)
 | 
			
		||||
        for (var i in dels) {
 | 
			
		||||
          yield* Y.Struct.Delete.delete.call(this, dels[i].target)
 | 
			
		||||
        }
 | 
			
		||||
        var ops = yield* this.getOperations(m.stateSet)
 | 
			
		||||
        this.store.apply(m.os)
 | 
			
		||||
        this.store.apply(dels)
 | 
			
		||||
        if (ops.length > 0) {
 | 
			
		||||
          m = {
 | 
			
		||||
            type: 'update',
 | 
			
		||||
 | 
			
		||||
@ -71,8 +71,8 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj
 | 
			
		||||
    var f = getRandom(transactions)
 | 
			
		||||
    f(root)
 | 
			
		||||
  }
 | 
			
		||||
  function applyTransactions () {
 | 
			
		||||
    for (var i = 0; i < numberOfTransactions / 2 + 1; i++) {
 | 
			
		||||
  function applyTransactions (relAmount) {
 | 
			
		||||
    for (var i = 0; i < numberOfTransactions * relAmount + 1; i++) {
 | 
			
		||||
      var r = Math.random()
 | 
			
		||||
      if (r >= 0.9) {
 | 
			
		||||
        // 10% chance to flush
 | 
			
		||||
@ -83,16 +83,17 @@ g.applyRandomTransactions = async(function * applyRandomTransactions (users, obj
 | 
			
		||||
      wait()
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  applyTransactions()
 | 
			
		||||
  applyTransactions(0.5)
 | 
			
		||||
  yield users[0].connector.flushAll()
 | 
			
		||||
  yield g.garbageCollectAllUsers(users)
 | 
			
		||||
  yield wait()
 | 
			
		||||
  users[0].disconnect()
 | 
			
		||||
  yield wait()
 | 
			
		||||
  applyTransactions()
 | 
			
		||||
  applyTransactions(0.5)
 | 
			
		||||
  yield users[0].connector.flushAll()
 | 
			
		||||
  yield g.garbageCollectAllUsers(users)
 | 
			
		||||
  users[0].reconnect()
 | 
			
		||||
  yield wait(100)
 | 
			
		||||
  users[0].reconnect()
 | 
			
		||||
  yield wait()
 | 
			
		||||
  yield users[0].connector.flushAll()
 | 
			
		||||
})
 | 
			
		||||
 | 
			
		||||
@ -219,14 +220,25 @@ function async (makeGenerator) {
 | 
			
		||||
        return handle(generator.throw(err))
 | 
			
		||||
      })
 | 
			
		||||
    }
 | 
			
		||||
    // this may throw errors here, but its ok since this is used only for debugging
 | 
			
		||||
    return handle(generator.next())
 | 
			
		||||
    /* try {
 | 
			
		||||
    try {
 | 
			
		||||
      return handle(generator.next())
 | 
			
		||||
    } catch (ex) {
 | 
			
		||||
      generator.throw(ex) // TODO: check this out
 | 
			
		||||
      // return Promise.reject(ex)
 | 
			
		||||
    }*/
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
g.async = async
 | 
			
		||||
 | 
			
		||||
function logUsers (self) {
 | 
			
		||||
  if (self.constructor === Array) {
 | 
			
		||||
    self = {users: self}
 | 
			
		||||
  }
 | 
			
		||||
  console.log('User 1: ', self.users[0].connector.userId) // eslint-disable-line
 | 
			
		||||
  self.users[0].db.os.logTable() // eslint-disable-line
 | 
			
		||||
  console.log('User 2: ', self.users[1].connector.userId) // eslint-disable-line
 | 
			
		||||
  self.users[1].db.os.logTable() // eslint-disable-line
 | 
			
		||||
  console.log('User 3: ', self.users[2].connector.userId) // eslint-disable-line
 | 
			
		||||
  self.users[2].db.os.logTable() // eslint-disable-line
 | 
			
		||||
}
 | 
			
		||||
g.logUsers = logUsers
 | 
			
		||||
 | 
			
		||||
@ -148,7 +148,7 @@ class AbstractOperationStore {
 | 
			
		||||
    this.waitingOperations = new Y.utils.RBTree()
 | 
			
		||||
 | 
			
		||||
    this.gc1 = [] // first stage
 | 
			
		||||
    this.gc2 = [] // second stage -> after that, kill it
 | 
			
		||||
    this.gc2 = [] // second stage -> after that, remove the op
 | 
			
		||||
    this.gcTimeout = opts.gcTimeout || 5000
 | 
			
		||||
    var os = this
 | 
			
		||||
    function garbageCollect () {
 | 
			
		||||
@ -197,10 +197,46 @@ class AbstractOperationStore {
 | 
			
		||||
      garbageCollect()
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  addToGarbageCollector (op) {
 | 
			
		||||
    if (op.gc == null) {
 | 
			
		||||
  garbageCollectAfterSync () {
 | 
			
		||||
    var os = this.os
 | 
			
		||||
    var self = this
 | 
			
		||||
    os.iterate(null, null, function (op) {
 | 
			
		||||
      if (op.deleted && op.left != null && op.right != null) {
 | 
			
		||||
        var left = os.find(op.left)
 | 
			
		||||
        var right = os.find(op.right)
 | 
			
		||||
        self.addToGarbageCollector(op, left, right)
 | 
			
		||||
      }
 | 
			
		||||
    })
 | 
			
		||||
  }
 | 
			
		||||
  /*
 | 
			
		||||
    Try to add to GC.
 | 
			
		||||
 | 
			
		||||
    TODO: rename this function
 | 
			
		||||
 | 
			
		||||
    Only gc when
 | 
			
		||||
       * creator of op is online
 | 
			
		||||
       * left & right defined and both are from the same creator as op
 | 
			
		||||
 | 
			
		||||
    returns true iff op was added to GC
 | 
			
		||||
  */
 | 
			
		||||
  addToGarbageCollector (op, left, right) {
 | 
			
		||||
    if (
 | 
			
		||||
      op.gc == null &&
 | 
			
		||||
      op.deleted === true &&
 | 
			
		||||
      this.y.connector.isSynced &&
 | 
			
		||||
      // (this.y.connector.connections[op.id[0]] != null || op.id[0] === this.y.connector.userId) &&
 | 
			
		||||
      left != null &&
 | 
			
		||||
      right != null &&
 | 
			
		||||
      left.deleted &&
 | 
			
		||||
      right.deleted &&
 | 
			
		||||
      left.id[0] === op.id[0] &&
 | 
			
		||||
      right.id[0] === op.id[0]
 | 
			
		||||
    ) {
 | 
			
		||||
      op.gc = true
 | 
			
		||||
      this.gc1.push(op.id)
 | 
			
		||||
      return true
 | 
			
		||||
    } else {
 | 
			
		||||
      return false
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
  removeFromGarbageCollector (op) {
 | 
			
		||||
 | 
			
		||||
@ -175,7 +175,7 @@ Y.Memory = (function () {
 | 
			
		||||
      return stateVector
 | 
			
		||||
    }
 | 
			
		||||
    * getStateSet () {
 | 
			
		||||
      return this.ss
 | 
			
		||||
      return Y.utils.copyObject(this.ss)
 | 
			
		||||
    }
 | 
			
		||||
    * getOperations (startSS) {
 | 
			
		||||
      // TODO: use bounds here!
 | 
			
		||||
 | 
			
		||||
@ -46,34 +46,51 @@ var Struct = {
 | 
			
		||||
    */
 | 
			
		||||
    delete: function * (targetId) {
 | 
			
		||||
      var target = yield* this.getOperation(targetId)
 | 
			
		||||
      if (target != null && !target.deleted) {
 | 
			
		||||
        target.deleted = true
 | 
			
		||||
        if (target.left != null && (yield* this.getOperation(target.left)).deleted) {
 | 
			
		||||
          // left is defined & the left op is already deleted.
 | 
			
		||||
          // => Then this may get gc'd
 | 
			
		||||
          this.store.addToGarbageCollector(target)
 | 
			
		||||
        }
 | 
			
		||||
        if (target.right != null) {
 | 
			
		||||
          var right = yield* this.getOperation(target.right)
 | 
			
		||||
          if (right.deleted && right.gc == null) {
 | 
			
		||||
            this.store.addToGarbageCollector(right)
 | 
			
		||||
            yield* this.setOperation(right)
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
        yield* this.setOperation(target)
 | 
			
		||||
        var t = this.store.initializedTypes[JSON.stringify(target.parent)]
 | 
			
		||||
        if (t != null) {
 | 
			
		||||
          yield* t._changed(this, {
 | 
			
		||||
            struct: 'Delete',
 | 
			
		||||
            target: targetId
 | 
			
		||||
          })
 | 
			
		||||
 | 
			
		||||
      if (target == null || !target.deleted) {
 | 
			
		||||
        this.ds.delete(targetId)
 | 
			
		||||
        var state = yield* this.getState(targetId[0])
 | 
			
		||||
        if (state.clock === targetId[1]) {
 | 
			
		||||
          yield* this.checkDeleteStoreForState(state)
 | 
			
		||||
          yield* this.setState(state)
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
      this.ds.delete(targetId)
 | 
			
		||||
      var state = yield* this.getState(targetId[0])
 | 
			
		||||
      if (state.clock === targetId[1]) {
 | 
			
		||||
        yield* this.checkDeleteStoreForState(state)
 | 
			
		||||
        yield* this.setState(state)
 | 
			
		||||
 | 
			
		||||
      if (target != null && target.gc == null) {
 | 
			
		||||
        if (!target.deleted) {
 | 
			
		||||
          // set deleted & notify type
 | 
			
		||||
          target.deleted = true
 | 
			
		||||
          var type = this.store.initializedTypes[JSON.stringify(target.parent)]
 | 
			
		||||
          if (type != null) {
 | 
			
		||||
            yield* type._changed(this, {
 | 
			
		||||
              struct: 'Delete',
 | 
			
		||||
              target: targetId
 | 
			
		||||
            })
 | 
			
		||||
          }
 | 
			
		||||
        }
 | 
			
		||||
        var left = target.left != null ? yield* this.getOperation(target.left) : null
 | 
			
		||||
        var right = target.right != null ? yield* this.getOperation(target.right) : null
 | 
			
		||||
 | 
			
		||||
        this.store.addToGarbageCollector(target, left, right)
 | 
			
		||||
 | 
			
		||||
        // set here because it was deleted and/or gc'd
 | 
			
		||||
        yield* this.setOperation(target)
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
          left != null &&
 | 
			
		||||
          left.left != null &&
 | 
			
		||||
          this.store.addToGarbageCollector(left, yield* this.getOperation(left.left), target)
 | 
			
		||||
        ) {
 | 
			
		||||
          yield* this.setOperation(left)
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (
 | 
			
		||||
          right != null &&
 | 
			
		||||
          right.right != null &&
 | 
			
		||||
          this.store.addToGarbageCollector(right, target, yield* this.getOperation(right.right))
 | 
			
		||||
        ) {
 | 
			
		||||
          yield* this.setOperation(right)
 | 
			
		||||
        }
 | 
			
		||||
      }
 | 
			
		||||
    },
 | 
			
		||||
    execute: function * (op) {
 | 
			
		||||
@ -215,6 +232,12 @@ var Struct = {
 | 
			
		||||
        left = yield* this.getOperation(op.left)
 | 
			
		||||
        op.right = left.right
 | 
			
		||||
        left.right = op.id
 | 
			
		||||
 | 
			
		||||
        // if left exists, and it is supposed to be gc'd. Remove it from the gc
 | 
			
		||||
        if (left.gc != null) {
 | 
			
		||||
          this.store.removeFromGarbageCollector(left)
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        yield* this.setOperation(left)
 | 
			
		||||
      } else {
 | 
			
		||||
        op.right = op.parentSub ? parent.map[op.parentSub] || null : parent.start
 | 
			
		||||
@ -228,7 +251,6 @@ var Struct = {
 | 
			
		||||
        if (right.gc != null) {
 | 
			
		||||
          this.store.removeFromGarbageCollector(right)
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        yield* this.setOperation(right)
 | 
			
		||||
      }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@ -1,17 +1,17 @@
 | 
			
		||||
/* global createUsers, wait, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, garbageCollectAllUsers, describeManyTimes */
 | 
			
		||||
/* eslint-env browser,jasmine */
 | 
			
		||||
 | 
			
		||||
var numberOfYArrayTests = 100
 | 
			
		||||
var repeatArrayTests = 1
 | 
			
		||||
var numberOfYArrayTests = 50
 | 
			
		||||
var repeatArrayTests = 300
 | 
			
		||||
 | 
			
		||||
describe('Array Type', function () {
 | 
			
		||||
  var y1, y2, y3, yconfig1, yconfig2, yconfig3, flushAll
 | 
			
		||||
 | 
			
		||||
  beforeEach(async(function * (done) {
 | 
			
		||||
    yield createUsers(this, 2)
 | 
			
		||||
    yield createUsers(this, 3)
 | 
			
		||||
    y1 = (yconfig1 = this.users[0]).root
 | 
			
		||||
    y2 = (yconfig2 = this.users[1]).root
 | 
			
		||||
    // y3 = (yconfig3 = this.users[2]).root
 | 
			
		||||
    y3 = (yconfig3 = this.users[2]).root
 | 
			
		||||
    flushAll = this.users[0].connector.flushAll
 | 
			
		||||
    yield wait(10)
 | 
			
		||||
    done()
 | 
			
		||||
@ -244,7 +244,9 @@ describe('Array Type', function () {
 | 
			
		||||
      }
 | 
			
		||||
      yield applyRandomTransactions(this.users, this.arrays, randomArrayTransactions, numberOfYArrayTests)
 | 
			
		||||
      yield flushAll()
 | 
			
		||||
      yield garbageCollectAllUsers(this.users)
 | 
			
		||||
      yield compareArrayValues(this.arrays)
 | 
			
		||||
      yield compareAllUsers(this.users)
 | 
			
		||||
      done()
 | 
			
		||||
    }))
 | 
			
		||||
  })
 | 
			
		||||
 | 
			
		||||
@ -1,7 +1,7 @@
 | 
			
		||||
/* global createUsers, Y, compareAllUsers, getRandomNumber, applyRandomTransactions, async, describeManyTimes */
 | 
			
		||||
/* eslint-env browser,jasmine */
 | 
			
		||||
 | 
			
		||||
var numberOfYMapTests = 150
 | 
			
		||||
var numberOfYMapTests = 70
 | 
			
		||||
var repeatMapTeasts = 1
 | 
			
		||||
 | 
			
		||||
describe('Map Type', function () {
 | 
			
		||||
 | 
			
		||||
							
								
								
									
										3
									
								
								src/y.js
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								src/y.js
									
									
									
									
									
								
							@ -36,12 +36,15 @@ class YConfig {
 | 
			
		||||
    this.connector.disconnect()
 | 
			
		||||
  }
 | 
			
		||||
  reconnect () {
 | 
			
		||||
    this.connector.reconnect()
 | 
			
		||||
    /* TODO: maybe do this..
 | 
			
		||||
    Promise.all([
 | 
			
		||||
      this.db.garbageCollect(),
 | 
			
		||||
      this.db.garbageCollect()
 | 
			
		||||
    ]).then(() => {
 | 
			
		||||
      this.connector.reconnect()
 | 
			
		||||
    })
 | 
			
		||||
    */
 | 
			
		||||
  }
 | 
			
		||||
  destroy () {
 | 
			
		||||
    this.connector.disconnect()
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user