fixed doSync bug, fixed connection problems, improved p2p sync method - still

there are some cases that may lead to inconsistencies. Currently, only the master-slave method is a reliable sync method
This commit is contained in:
DadaMonad 2015-02-05 10:46:40 +00:00
parent 58a479be9b
commit 3eb933400a
21 changed files with 631 additions and 330 deletions

View File

@ -0,0 +1,7 @@
<polymer-element name="y-object" hidden attributes="val connector y">
</polymer-element>
<polymer-element name="y-property" hidden attributes="val name y">
</polymer-element>
<script src="./y-object.js"></script>

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -8,8 +8,9 @@ adaptConnector = function(connector, engine, HB, execution_listener) {
f = ConnectorClass[name]; f = ConnectorClass[name];
connector[name] = f; connector[name] = f;
} }
connector.setIsBoundToY();
send_ = function(o) { send_ = function(o) {
if (o.uid.creator === HB.getUserId() && (typeof o.uid.op_number !== "string")) { if ((o.uid.creator === HB.getUserId()) && (typeof o.uid.op_number !== "string") && (o.uid.doSync === "true" || o.uid.doSync === true) && (HB.getUserId() !== "_temp")) {
return connector.broadcast(o); return connector.broadcast(o);
} }
}; };
@ -42,32 +43,27 @@ adaptConnector = function(connector, engine, HB, execution_listener) {
return encode_state_vector(HB.getOperationCounter()); return encode_state_vector(HB.getOperationCounter());
}; };
getHB = function(v) { getHB = function(v) {
var hb, json, o, state_vector, _i, _len; var hb, json, state_vector;
state_vector = parse_state_vector(v); state_vector = parse_state_vector(v);
hb = HB._encode(state_vector); hb = HB._encode(state_vector);
for (_i = 0, _len = hb.length; _i < _len; _i++) {
o = hb[_i];
o.fromHB = "true";
}
json = { json = {
hb: hb, hb: hb,
state_vector: encode_state_vector(HB.getOperationCounter()) state_vector: encode_state_vector(HB.getOperationCounter())
}; };
return json; return json;
}; };
applyHB = function(hb) { applyHB = function(hb, fromHB) {
return engine.applyOp(hb); return engine.applyOp(hb, fromHB);
}; };
connector.getStateVector = getStateVector; connector.getStateVector = getStateVector;
connector.getHB = getHB; connector.getHB = getHB;
connector.applyHB = applyHB; connector.applyHB = applyHB;
connector.receive_handlers = []; connector.receive_handlers = [];
connector.receive_handlers.push(function(sender, op) { return connector.receive_handlers.push(function(sender, op) {
if (op.uid.creator !== HB.getUserId()) { if (op.uid.creator !== HB.getUserId()) {
return engine.applyOp(op); return engine.applyOp(op);
} }
}); });
return connector.setIsBoundToY();
}; };
module.exports = adaptConnector; module.exports = adaptConnector;

View File

@ -24,11 +24,11 @@ module.exports = {
this.syncMode = "syncAll"; this.syncMode = "syncAll";
} }
this.is_synced = false; this.is_synced = false;
this.is_syncing = false;
this.connections = {}; this.connections = {};
this.is_bound_to_y = false; this.is_bound_to_y = false;
this.connections = {}; this.connections = {};
return this.current_sync_target = null; this.current_sync_target = null;
return this.sent_hb_to_all_users = false;
}, },
isRoleMaster: function() { isRoleMaster: function() {
return this.role === "master"; return this.role === "master";
@ -49,6 +49,9 @@ module.exports = {
} }
} }
} }
if (this.current_sync_target == null) {
this.setStateSynced();
}
return null; return null;
}, },
userLeft: function(user) { userLeft: function(user) {
@ -57,7 +60,7 @@ module.exports = {
}, },
userJoined: function(user, role) { userJoined: function(user, role) {
if (role == null) { if (role == null) {
throw new Error("Internal: You must specify the role of the joined user!"); throw new Error("Internal: You must specify the role of the joined user! E.g. userJoined('uid:3939','slave')");
} }
this.connections[user] = { this.connections[user] = {
is_synced: false is_synced: false
@ -101,43 +104,62 @@ module.exports = {
throw new Error "You must implement send!" throw new Error "You must implement send!"
*/ */
performSync: function(user) { performSync: function(user) {
var hb, o, _hb, _i, _len;
if (this.current_sync_target == null) { if (this.current_sync_target == null) {
this.current_sync_target = user; this.current_sync_target = user;
return this.send(user, {
sync_step: "getHB",
data: this.getStateVector()
});
}
},
performSyncWithMaster: function(user) {
var hb, o, _hb, _i, _len;
if (!this.is_syncing) {
this.current_sync_target = user;
this.is_syncing = true;
this.send(user, { this.send(user, {
sync_step: "getHB", sync_step: "getHB",
send_again: "true", send_again: "true",
data: [] data: []
}); });
hb = this.getHB([]).hb; if (!this.sent_hb_to_all_users) {
_hb = []; this.sent_hb_to_all_users = true;
for (_i = 0, _len = hb.length; _i < _len; _i++) { hb = this.getHB([]).hb;
o = hb[_i]; _hb = [];
_hb.push(o); for (_i = 0, _len = hb.length; _i < _len; _i++) {
if (_hb.length > 30) { o = hb[_i];
this.broadcast({ _hb.push(o);
sync_step: "applyHB_", if (_hb.length > 30) {
data: _hb this.broadcast({
}); sync_step: "applyHB_",
_hb = []; data: _hb
});
_hb = [];
}
} }
return this.broadcast({
sync_step: "applyHB",
data: _hb
});
} }
return this.broadcast({
sync_step: "applyHB",
data: _hb
});
} }
}, },
performSyncWithMaster: function(user) {
var hb, o, _hb, _i, _len;
this.current_sync_target = user;
this.send(user, {
sync_step: "getHB",
send_again: "true",
data: []
});
hb = this.getHB([]).hb;
_hb = [];
for (_i = 0, _len = hb.length; _i < _len; _i++) {
o = hb[_i];
_hb.push(o);
if (_hb.length > 30) {
this.broadcast({
sync_step: "applyHB_",
data: _hb
});
_hb = [];
}
}
return this.broadcast({
sync_step: "applyHB",
data: _hb
});
},
setStateSynced: function() { setStateSynced: function() {
var f, _i, _len, _ref; var f, _i, _len, _ref;
if (!this.is_synced) { if (!this.is_synced) {
@ -152,7 +174,7 @@ module.exports = {
} }
}, },
receiveMessage: function(sender, res) { receiveMessage: function(sender, res) {
var data, f, hb, o, send_again, _hb, _i, _j, _len, _len1, _ref, _results; var data, f, hb, o, sendApplyHB, send_again, _hb, _i, _j, _len, _len1, _ref, _results;
if (res.sync_step == null) { if (res.sync_step == null) {
_ref = this.receive_handlers; _ref = this.receive_handlers;
_results = []; _results = [];
@ -162,22 +184,38 @@ module.exports = {
} }
return _results; return _results;
} else { } else {
if (sender === this.user_id) {
return;
}
if (res.sync_step === "getHB") { if (res.sync_step === "getHB") {
data = this.getHB(res.data); data = this.getHB(res.data);
hb = data.hb; hb = data.hb;
_hb = []; _hb = [];
if (this.is_synced) {
sendApplyHB = (function(_this) {
return function(m) {
return _this.send(sender, m);
};
})(this);
} else {
sendApplyHB = (function(_this) {
return function(m) {
return _this.broadcast(m);
};
})(this);
}
for (_j = 0, _len1 = hb.length; _j < _len1; _j++) { for (_j = 0, _len1 = hb.length; _j < _len1; _j++) {
o = hb[_j]; o = hb[_j];
_hb.push(o); _hb.push(o);
if (_hb.length > 30) { if (_hb.length > 30) {
this.send(sender, { sendApplyHB({
sync_step: "applyHB_", sync_step: "applyHB_",
data: _hb data: _hb
}); });
_hb = []; _hb = [];
} }
} }
this.send(sender, { sendApplyHB({
sync_step: "applyHB", sync_step: "applyHB",
data: _hb data: _hb
}); });
@ -197,14 +235,13 @@ module.exports = {
return setTimeout(send_again, 3000); return setTimeout(send_again, 3000);
} }
} else if (res.sync_step === "applyHB") { } else if (res.sync_step === "applyHB") {
this.applyHB(res.data); this.applyHB(res.data, sender === this.current_sync_target);
if ((this.syncMode === "syncAll" || (res.sent_again != null)) && !this.is_synced) { if ((this.syncMode === "syncAll" || (res.sent_again != null)) && (!this.is_synced) && (this.current_sync_target === sender)) {
this.setStateSynced();
this.connections[sender].is_synced = true; this.connections[sender].is_synced = true;
return this.findNewSyncTarget(); return this.findNewSyncTarget();
} }
} else if (res.sync_step === "applyHB_") { } else if (res.sync_step === "applyHB_") {
return this.applyHB(res.data); return this.applyHB(res.data, sender === this.current_sync_target);
} }
} }
}, },

View File

@ -59,13 +59,19 @@ Engine = (function() {
return this.applyOp(ops_json); return this.applyOp(ops_json);
}; };
Engine.prototype.applyOp = function(op_json_array) { Engine.prototype.applyOp = function(op_json_array, fromHB) {
var o, op_json, _i, _len; var o, op_json, _i, _len;
if (fromHB == null) {
fromHB = false;
}
if (op_json_array.constructor !== Array) { if (op_json_array.constructor !== Array) {
op_json_array = [op_json_array]; op_json_array = [op_json_array];
} }
for (_i = 0, _len = op_json_array.length; _i < _len; _i++) { for (_i = 0, _len = op_json_array.length; _i < _len; _i++) {
op_json = op_json_array[_i]; op_json = op_json_array[_i];
if (fromHB) {
op_json.fromHB = "true";
}
o = this.parseOperation(op_json); o = this.parseOperation(op_json);
if (op_json.fromHB != null) { if (op_json.fromHB != null) {
o.fromHB = op_json.fromHB; o.fromHB = op_json.fromHB;

View File

@ -22,7 +22,12 @@ HistoryBuffer = (function() {
if (own != null) { if (own != null) {
for (o_name in own) { for (o_name in own) {
o = own[o_name]; o = own[o_name];
o.uid.creator = id; if (o.uid.creator != null) {
o.uid.creator = id;
}
if (o.uid.alt != null) {
o.uid.alt.creator = id;
}
} }
if (this.buffer[id] != null) { if (this.buffer[id] != null) {
throw new Error("You are re-assigning an old user id - this is not (yet) possible!"); throw new Error("You are re-assigning an old user id - this is not (yet) possible!");
@ -140,7 +145,7 @@ HistoryBuffer = (function() {
user = _ref[u_name]; user = _ref[u_name];
for (o_number in user) { for (o_number in user) {
o = user[o_number]; o = user[o_number];
if (o.uid.doSync && unknown(u_name, o_number)) { if ((o.uid.noOperation == null) && o.uid.doSync && unknown(u_name, o_number)) {
o_json = o._encode(); o_json = o._encode();
if (o.next_cl != null) { if (o.next_cl != null) {
o_next = o.next_cl; o_next = o.next_cl;

View File

@ -88,10 +88,18 @@ module.exports = function(HB) {
}; };
Operation.prototype.getUid = function() { Operation.prototype.getUid = function() {
var map_uid;
if (this.uid.noOperation == null) { if (this.uid.noOperation == null) {
return this.uid; return this.uid;
} else { } else {
return this.uid.alt; if (this.uid.alt != null) {
map_uid = this.uid.alt.cloneUid();
map_uid.sub = this.uid.sub;
map_uid.doSync = false;
return map_uid;
} else {
return void 0;
}
} }
}; };

View File

@ -66,17 +66,16 @@ module.exports = function(HB) {
}; };
MapManager.prototype.retrieveSub = function(property_name) { MapManager.prototype.retrieveSub = function(property_name) {
var event_properties, event_this, map_uid, rm, rm_uid; var event_properties, event_this, rm, rm_uid;
if (this.map[property_name] == null) { if (this.map[property_name] == null) {
event_properties = { event_properties = {
name: property_name name: property_name
}; };
event_this = this; event_this = this;
map_uid = this.cloneUid();
map_uid.sub = property_name;
rm_uid = { rm_uid = {
noOperation: true, noOperation: true,
alt: map_uid sub: property_name,
alt: this
}; };
rm = new types.ReplaceManager(event_properties, event_this, rm_uid); rm = new types.ReplaceManager(event_properties, event_this, rm_uid);
this.map[property_name] = rm; this.map[property_name] = rm;

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@ -1,11 +1,11 @@
<link rel="import" href="../../y-object.html"> <link rel="import" href="../../build/browser/y-object.html">
<link rel="import" href="../../../y-xmpp/build/browser/y-xmpp.html"> <link rel="import" href="../../../y-xmpp/build/browser/y-xmpp.html">
<link rel="import" href="../../../paper-slider/paper-slider.html"> <link rel="import" href="../../../paper-slider/paper-slider.html">
<polymer-element name="y-test" attributes="y connector stuff"> <polymer-element name="y-test" attributes="y connector stuff">
<template> <template>
<h1 id="text" contentEditable> Check this out !</h1> <h1 id="text" contentEditable> Check this out !</h1>
<y-xmpp id="connector" connector={{connector}} room="testy-xmpp-polymer" syncMode="syncAll"></y-xmpp> <y-xmpp id="connector" connector={{connector}} room="testy-xmpp-polymer" syncMode="syncAll" debug="true"></y-xmpp>
<y-object connector={{connector}} val={{y}}> <y-object connector={{connector}} val={{y}}>
<y-property name="slider" val={{slider}}> <y-property name="slider" val={{slider}}>
</y-property> </y-property>

View File

@ -10,8 +10,13 @@ adaptConnector = (connector, engine, HB, execution_listener)->
for name, f of ConnectorClass for name, f of ConnectorClass
connector[name] = f connector[name] = f
connector.setIsBoundToY()
send_ = (o)-> send_ = (o)->
if o.uid.creator is HB.getUserId() and (typeof o.uid.op_number isnt "string") if (o.uid.creator is HB.getUserId()) and
(typeof o.uid.op_number isnt "string") and # TODO: i don't think that we need this anymore..
(o.uid.doSync is "true" or o.uid.doSync is true) and # TODO: ensure, that only true is valid
(HB.getUserId() isnt "_temp")
connector.broadcast o connector.broadcast o
if connector.invokeSync? if connector.invokeSync?
@ -36,15 +41,13 @@ adaptConnector = (connector, engine, HB, execution_listener)->
getHB = (v)-> getHB = (v)->
state_vector = parse_state_vector v state_vector = parse_state_vector v
hb = HB._encode state_vector hb = HB._encode state_vector
for o in hb
o.fromHB = "true" # execute immediately
json = json =
hb: hb hb: hb
state_vector: encode_state_vector HB.getOperationCounter() state_vector: encode_state_vector HB.getOperationCounter()
json json
applyHB = (hb)-> applyHB = (hb, fromHB)->
engine.applyOp hb engine.applyOp hb, fromHB
connector.getStateVector = getStateVector connector.getStateVector = getStateVector
connector.getHB = getHB connector.getHB = getHB
@ -55,6 +58,5 @@ adaptConnector = (connector, engine, HB, execution_listener)->
if op.uid.creator isnt HB.getUserId() if op.uid.creator isnt HB.getUserId()
engine.applyOp op engine.applyOp op
connector.setIsBoundToY()
module.exports = adaptConnector module.exports = adaptConnector

View File

@ -27,8 +27,6 @@ module.exports =
# is set to true when this is synced with all other connections # is set to true when this is synced with all other connections
@is_synced = false @is_synced = false
# true, iff the client is currently syncing
@is_syncing = false
# Peerjs Connections: key: conn-id, value: object # Peerjs Connections: key: conn-id, value: object
@connections = {} @connections = {}
# List of functions that shall process incoming data # List of functions that shall process incoming data
@ -38,6 +36,7 @@ module.exports =
@is_bound_to_y = false @is_bound_to_y = false
@connections = {} @connections = {}
@current_sync_target = null @current_sync_target = null
@sent_hb_to_all_users = false
isRoleMaster: -> isRoleMaster: ->
@role is "master" @role is "master"
@ -52,6 +51,8 @@ module.exports =
if not c.is_synced if not c.is_synced
@performSync user @performSync user
break break
if not @current_sync_target?
@setStateSynced()
null null
userLeft: (user)-> userLeft: (user)->
@ -60,7 +61,7 @@ module.exports =
userJoined: (user, role)-> userJoined: (user, role)->
if not role? if not role?
throw new Error "Internal: You must specify the role of the joined user!" throw new Error "Internal: You must specify the role of the joined user! E.g. userJoined('uid:3939','slave')"
# a user joined the room # a user joined the room
@connections[user] = @connections[user] =
is_synced : false is_synced : false
@ -115,32 +116,49 @@ module.exports =
@current_sync_target = user @current_sync_target = user
@send user, @send user,
sync_step: "getHB" sync_step: "getHB"
data: @getStateVector() send_again: "true"
data: [] # @getStateVector()
if not @sent_hb_to_all_users
@sent_hb_to_all_users = true
hb = @getHB([]).hb
_hb = []
for o in hb
_hb.push o
if _hb.length > 30
@broadcast
sync_step: "applyHB_"
data: _hb
_hb = []
@broadcast
sync_step: "applyHB"
data: _hb
# #
# When a master node joined the room, perform this sync with him. It will ask the master for the HB, # When a master node joined the room, perform this sync with him. It will ask the master for the HB,
# and will broadcast his own HB # and will broadcast his own HB
# #
performSyncWithMaster: (user)-> performSyncWithMaster: (user)->
if not @is_syncing @current_sync_target = user
@current_sync_target = user @send user,
@is_syncing = true sync_step: "getHB"
@send user, send_again: "true"
sync_step: "getHB" data: []
send_again: "true" hb = @getHB([]).hb
data: [] _hb = []
hb = @getHB([]).hb for o in hb
_hb = [] _hb.push o
for o in hb if _hb.length > 30
_hb.push o @broadcast
if _hb.length > 30 sync_step: "applyHB_"
@broadcast data: _hb
sync_step: "applyHB_" _hb = []
data: _hb @broadcast
_hb = [] sync_step: "applyHB"
@broadcast data: _hb
sync_step: "applyHB"
data: _hb
# #
# You are sure that all clients are synced, call this function. # You are sure that all clients are synced, call this function.
# #
@ -160,24 +178,35 @@ module.exports =
for f in @receive_handlers for f in @receive_handlers
f sender, res f sender, res
else else
if sender is @user_id
return
if res.sync_step is "getHB" if res.sync_step is "getHB"
data = @getHB(res.data) data = @getHB(res.data)
hb = data.hb hb = data.hb
_hb = [] _hb = []
# always broadcast, when not synced.
# This reduces errors, when the clients goes offline prematurely.
# When this client only syncs to one other clients, but looses connectors,
# before syncing to the other clients, the online clients have different states.
# Since we do not want to perform regular syncs, this is a good alternative
if @is_synced if @is_synced
sendApplyHB = ()-> sendApplyHB = (m)=>
@send sender, m
else
sendApplyHB = (m)=>
@broadcast m
for o in hb for o in hb
_hb.push o _hb.push o
if _hb.length > 30 if _hb.length > 30
@send sender, sendApplyHB
sync_step: "applyHB_" sync_step: "applyHB_"
data: _hb data: _hb
_hb = [] _hb = []
if @is_synced
@send sender, sendApplyHB
sync_s tep: "applyHB" sync_step : "applyHB"
data: _hb data: _hb
if res.send_again? if res.send_again?
send_again = do (sv = data.state_vector)=> send_again = do (sv = data.state_vector)=>
@ -189,15 +218,14 @@ module.exports =
sent_again: "true" sent_again: "true"
setTimeout send_again, 3000 setTimeout send_again, 3000
else if res.sync_step is "applyHB" else if res.sync_step is "applyHB"
@applyHB(res.data) @applyHB(res.data, sender is @current_sync_target)
if (@syncMode is "syncAll" or res.sent_again?) and not @is_synced if (@syncMode is "syncAll" or res.sent_again?) and (not @is_synced) and (@current_sync_target is sender)
@setStateSynced()
@connections[sender].is_synced = true @connections[sender].is_synced = true
@findNewSyncTarget() @findNewSyncTarget()
else if res.sync_step is "applyHB_" else if res.sync_step is "applyHB_"
@applyHB(res.data) @applyHB(res.data, sender is @current_sync_target)
# Currently, the HB encodes operations as JSON. For the moment I want to keep it # Currently, the HB encodes operations as JSON. For the moment I want to keep it

View File

@ -61,10 +61,12 @@ class Engine
# TODO: make this more efficient!! # TODO: make this more efficient!!
# - operations may only executed in order by creator, order them in object of arrays (key by creator) # - operations may only executed in order by creator, order them in object of arrays (key by creator)
# - you can probably make something like dependencies (creator1 waits for creator2) # - you can probably make something like dependencies (creator1 waits for creator2)
applyOp: (op_json_array)-> applyOp: (op_json_array, fromHB = false)->
if op_json_array.constructor isnt Array if op_json_array.constructor isnt Array
op_json_array = [op_json_array] op_json_array = [op_json_array]
for op_json in op_json_array for op_json in op_json_array
if fromHB
op_json.fromHB = "true" # execute immediately, if
# $parse_and_execute will return false if $o_json was parsed and executed, otherwise the parsed operadion # $parse_and_execute will return false if $o_json was parsed and executed, otherwise the parsed operadion
o = @parseOperation op_json o = @parseOperation op_json
if op_json.fromHB? if op_json.fromHB?

View File

@ -26,7 +26,10 @@ class HistoryBuffer
own = @buffer[@user_id] own = @buffer[@user_id]
if own? if own?
for o_name,o of own for o_name,o of own
o.uid.creator = id if o.uid.creator?
o.uid.creator = id
if o.uid.alt?
o.uid.alt.creator = id
if @buffer[id]? if @buffer[id]?
throw new Error "You are re-assigning an old user id - this is not (yet) possible!" throw new Error "You are re-assigning an old user id - this is not (yet) possible!"
@buffer[id] = own @buffer[id] = own
@ -114,7 +117,7 @@ class HistoryBuffer
for u_name,user of @buffer for u_name,user of @buffer
# TODO next, if @state_vector[user] <= state_vector[user] # TODO next, if @state_vector[user] <= state_vector[user]
for o_number,o of user for o_number,o of user
if o.uid.doSync and unknown(u_name, o_number) if (not o.uid.noOperation?) and o.uid.doSync and unknown(u_name, o_number)
# its necessary to send it, and not known in state_vector # its necessary to send it, and not known in state_vector
o_json = o._encode() o_json = o._encode()
if o.next_cl? # applies for all ops but the most right delimiter! if o.next_cl? # applies for all ops but the most right delimiter!

View File

@ -111,7 +111,13 @@ module.exports = (HB)->
if not @uid.noOperation? if not @uid.noOperation?
@uid @uid
else else
@uid.alt # could be (safely) undefined if @uid.alt? # could be (safely) undefined
map_uid = @uid.alt.cloneUid()
map_uid.sub = @uid.sub
map_uid.doSync = false
map_uid
else
undefined
cloneUid: ()-> cloneUid: ()->
uid = {} uid = {}

View File

@ -56,11 +56,10 @@ module.exports = (HB)->
event_properties = event_properties =
name: property_name name: property_name
event_this = @ event_this = @
map_uid = @cloneUid()
map_uid.sub = property_name
rm_uid = rm_uid =
noOperation: true noOperation: true
alt: map_uid sub: property_name
alt: @
rm = new types.ReplaceManager event_properties, event_this, rm_uid # this operation shall not be saved in the HB rm = new types.ReplaceManager event_properties, event_this, rm_uid # this operation shall not be saved in the HB
@map[property_name] = rm @map[property_name] = rm
rm.setParent @, property_name rm.setParent @, property_name

View File

@ -1,4 +1,3 @@
<link rel="import" href="../polymer/polymer.html">
<polymer-element name="y-object" hidden attributes="val connector y"> <polymer-element name="y-object" hidden attributes="val connector y">
</polymer-element> </polymer-element>

File diff suppressed because one or more lines are too long

4
y.js

File diff suppressed because one or more lines are too long