making sync more efficient. using new connector features. found lots of small bugs

This commit is contained in:
DadaMonad
2015-01-06 10:29:50 +00:00
parent d2fa906b50
commit d898136f64
28 changed files with 683 additions and 432 deletions

View File

@@ -11,23 +11,38 @@ adaptConnector = (connector, engine, HB, execution_listener)->
connector.broadcast o
execution_listener.push send_
# For the XMPPConnector: lets send it as an array
# therefore, we have to restructure it later
encode_state_vector = (v)->
for name,value of v
user: name
state: value
parse_state_vector = (v)->
state_vector = {}
for s in v
state_vector[s.user] = s.state
state_vector
sendStateVector = ()->
HB.getOperationCounter()
sendHb = (state_vector)->
json = HB._encode(state_vector)
if json.length > 0
json
else
null
applyHb = (hb)->
if hb?
engine.applyOpsCheckDouble hb
else
null
encode_state_vector HB.getOperationCounter()
sendHb = (v)->
state_vector = parse_state_vector v
json =
hb: HB._encode(state_vector)
state_vector: encode_state_vector HB.getOperationCounter()
json
applyHb = (res)->
HB.renewStateVector parse_state_vector res.state_vector
engine.applyOpsCheckDouble res.hb
connector.whenSyncing sendStateVector, sendHb, applyHb
connector.whenReceiving (sender, op)->
if op.uid.creator isnt HB.getUserId()
engine.applyOp op
HB.setInvokeSyncHandler connector.invokeSync
module.exports = adaptConnector

View File

@@ -30,7 +30,7 @@ class Engine
#
# Apply a set of operations. E.g. the operations you received from another users HB._encode().
# @note You must not use this method when you already have ops in your HB!
#
###
applyOpsBundle: (ops_json)->
ops = []
for o in ops_json
@@ -39,6 +39,7 @@ class Engine
if not o.execute()
@unprocessed_ops.push o
@tryUnprocessed()
###
#
# Same as applyOps but operations that are already in the HB are not applied.
@@ -53,22 +54,25 @@ class Engine
# Apply a set of operations. (Helper for using applyOp on Arrays)
# @see Engine.applyOp
applyOps: (ops_json)->
for o in ops_json
@applyOp o
@applyOp ops_json
#
# Apply an operation that you received from another peer.
#
applyOp: (op_json)->
# $parse_and_execute will return false if $o_json was parsed and executed, otherwise the parsed operadion
o = @parseOperation op_json
@HB.addToCounter o
# @HB.addOperation o
if @HB.getOperation(o)?
else if not o.execute()
@unprocessed_ops.push o
window?.unprocessed_counter++ # TODO: del this
window?.unprocessed_types.push o.type
# TODO: make this more efficient!!
# - operations may only executed in order by creator, order them in object of arrays (key by creator)
# - you can probably make something like dependencies (creator1 waits for creator2)
applyOp: (op_json_array)->
if op_json_array.constructor isnt Array
op_json_array = [op_json_array]
for op_json in op_json_array
# $parse_and_execute will return false if $o_json was parsed and executed, otherwise the parsed operadion
o = @parseOperation op_json
# @HB.addOperation o
if @HB.getOperation(o)?
# nop
else if (not @HB.isExpectedOperation(o)) or (not o.execute())
@unprocessed_ops.push o
window?.unprocessed_types.push o.type # TODO: delete this
@tryUnprocessed()
#
@@ -77,18 +81,18 @@ class Engine
#
tryUnprocessed: ()->
while true
window?.unprocessed_exec_counter++ # TODO: del this
old_length = @unprocessed_ops.length
unprocessed = []
for op in @unprocessed_ops
if @HB.getOperation(op)?
else if not op.execute()
# nop
else if (not @HB.isExpectedOperation(op)) or (not op.execute())
unprocessed.push op
@unprocessed_ops = unprocessed
if @unprocessed_ops.length is old_length
break
if @unprocessed_ops.length isnt 0
@HB.invokeSync()
module.exports = Engine

View File

@@ -18,7 +18,7 @@ class HistoryBuffer
@garbage = [] # Will be cleaned on next call of garbageCollector
@trash = [] # Is deleted. Wait until it is not used anymore.
@performGarbageCollection = true
@garbageCollectTimeout = 30000
@garbageCollectTimeout = 2000
@reserved_identifier_counter = 0
setTimeout @emptyGarbage, @garbageCollectTimeout
@@ -96,6 +96,10 @@ class HistoryBuffer
else
@operation_counter[user_id]
isExpectedOperation: (o)->
@operation_counter[o.uid.creator] ?= 0
o.uid.op_number <= @operation_counter[o.uid.creator]
#
# Encode this operation in such a way that it can be parsed by remote peers.
# TODO: Make this more efficient!
@@ -162,14 +166,31 @@ class HistoryBuffer
@buffer[o.uid.creator] = {}
if @buffer[o.uid.creator][o.uid.op_number]?
throw new Error "You must not overwrite operations!"
if (o.uid.op_number.constructor isnt String) and (not @isExpectedOperation(o)) # you already do this in the engine, so delete it here!
throw new Error "this operation was not expected!"
@addToCounter(o)
@buffer[o.uid.creator][o.uid.op_number] = o
@number_of_operations_added_to_HB ?= 0 # TODO: Debug, remove this
@number_of_operations_added_to_HB++
o
removeOperation: (o)->
delete @buffer[o.uid.creator]?[o.uid.op_number]
# When the HB determines inconsistencies, then the invokeSync
# handler wil be called, which should somehow invoke the sync with another collaborator.
# The parameter of the sync handler is the user_id with wich an inconsistency was determined
setInvokeSyncHandler: (f)->
@invokeSync = f
# empty per default # TODO: do i need this?
invokeSync: ()->
# after you received the HB of another user (in the sync process),
# you renew your own state_vector to the state_vector of the other user
renewStateVector: (state_vector)->
for user,state of state_vector
if (not @operation_counter[user]?) or (@operation_counter[user] < state_vector[user])
@operation_counter[user] = state_vector[user]
#
# Increment the operation_counter that defines the current state of the Engine.
#
@@ -177,13 +198,11 @@ class HistoryBuffer
if not @operation_counter[o.uid.creator]?
@operation_counter[o.uid.creator] = 0
if typeof o.uid.op_number is 'number' and o.uid.creator isnt @getUserId()
# TODO: fix this issue better.
# Operations should income in order
# Then you don't have to do this..
# TODO: check if operations are send in order
if o.uid.op_number is @operation_counter[o.uid.creator]
@operation_counter[o.uid.creator]++
while @getOperation({creator:o.uid.creator, op_number: @operation_counter[o.uid.creator]})?
@operation_counter[o.uid.creator]++
else
@invokeSync o.uid.creator
#if @operation_counter[o.uid.creator] isnt (o.uid.op_number + 1)
#console.log (@operation_counter[o.uid.creator] - (o.uid.op_number + 1))

View File

@@ -413,7 +413,7 @@ module.exports = (HB)->
{
'type': "Replaceable"
'content': @content?.getUid()
'ReplaceManager' : @parent.getUid()
'replace_manager' : @parent.getUid()
'prev': @prev_cl.getUid()
'next': @next_cl.getUid()
'uid' : @getUid()
@@ -425,7 +425,7 @@ module.exports = (HB)->
parser["Replaceable"] = (json)->
{
'content' : content
'ReplaceManager' : parent
'replace_manager' : parent
'uid' : uid
'prev': prev
'next': next