included connector type
This commit is contained in:
@@ -1,11 +1,15 @@
|
||||
|
||||
|
||||
ConnectorClass = require "./ConnectorClass"
|
||||
#
|
||||
# @param {Engine} engine The transformation engine
|
||||
# @param {HistoryBuffer} HB
|
||||
# @param {Array<Function>} execution_listener You must ensure that whenever an operation is executed, every function in this Array is called.
|
||||
#
|
||||
adaptConnector = (connector, engine, HB, execution_listener)->
|
||||
|
||||
for name, f of ConnectorClass
|
||||
connector[name] = f
|
||||
|
||||
send_ = (o)->
|
||||
if o.uid.creator is HB.getUserId() and (typeof o.uid.op_number isnt "string")
|
||||
connector.broadcast o
|
||||
@@ -46,6 +50,7 @@ adaptConnector = (connector, engine, HB, execution_listener)->
|
||||
connector.getHB = getHB
|
||||
connector.applyHB = applyHB
|
||||
|
||||
connector.receive_handlers = []
|
||||
connector.receive_handlers.push (sender, op)->
|
||||
if op.uid.creator isnt HB.getUserId()
|
||||
engine.applyOp op
|
||||
|
||||
279
lib/ConnectorClass.coffee
Normal file
279
lib/ConnectorClass.coffee
Normal file
@@ -0,0 +1,279 @@
|
||||
|
||||
module.exports =
|
||||
#
|
||||
# @params new Connector(syncMode, role)
|
||||
# @param syncMode {String} is either "syncAll" or "master-slave".
|
||||
# @param role {String} The role of this client
|
||||
# (slave or master (only used when syncMode is master-slave))
|
||||
#
|
||||
init: (options)->
|
||||
req = (name, choices)=>
|
||||
if options[name]?
|
||||
if (not choices?) or choices.some((c)->c is options[name])
|
||||
@[name] = options[name]
|
||||
else
|
||||
throw new Error "You can set the '"+name+"' option to one of the following choices: "+JSON.encode(choices)
|
||||
else
|
||||
throw new Error "You must specify "+name+", when initializing the Connector!"
|
||||
|
||||
req "syncMode", ["syncAll", "master-slave"]
|
||||
req "role", ["master", "slave"]
|
||||
req "user_id"
|
||||
@on_user_id_set(@user_id)
|
||||
|
||||
# A Master should sync with everyone! TODO: really? - for now its safer this way!
|
||||
if @role is "master"
|
||||
@syncMode = "syncAll"
|
||||
|
||||
# is set to true when this is synced with all other connections
|
||||
@is_synced = false
|
||||
# true, iff the client is currently syncing
|
||||
@is_syncing = false
|
||||
# Peerjs Connections: key: conn-id, value: object
|
||||
@connections = {}
|
||||
# List of functions that shall process incoming data
|
||||
# @receive_handlers = [] # this is already set in the ConnectorAdapter!
|
||||
|
||||
# whether this instance is bound to any y instance
|
||||
@is_bound_to_y = false
|
||||
@connections = {}
|
||||
@current_sync_target = null
|
||||
|
||||
isRoleMaster: ->
|
||||
@role is "master"
|
||||
|
||||
isRoleSlave: ->
|
||||
@role is "slave"
|
||||
|
||||
findNewSyncTarget: ()->
|
||||
@current_sync_target = null
|
||||
if @syncMode is "syncAll"
|
||||
for user, c of @connections
|
||||
if not c.is_synced
|
||||
@performSync user
|
||||
break
|
||||
null
|
||||
|
||||
userLeft: (user)->
|
||||
delete @connections[user]
|
||||
@findNewSyncTarget()
|
||||
|
||||
userJoined: (user, role)->
|
||||
if not role?
|
||||
throw new Error "Internal: You must specify the role of the joined user!"
|
||||
# a user joined the room
|
||||
@connections[user] =
|
||||
is_synced : false
|
||||
|
||||
if (not @is_synced) or @syncMode is "syncAll"
|
||||
if @syncMode is "syncAll"
|
||||
@performSync user
|
||||
else if role is "master"
|
||||
# TODO: What if there are two masters? Prevent sending everything two times!
|
||||
@performSyncWithMaster user
|
||||
|
||||
|
||||
#
|
||||
# Execute a function _when_ we are connected. If not connected, wait until connected.
|
||||
# @param f {Function} Will be executed on the PeerJs-Connector context.
|
||||
#
|
||||
whenSynced: (args)->
|
||||
if args.constructore is Function
|
||||
args = [args]
|
||||
if @is_synced
|
||||
args[0].apply this, args[1..]
|
||||
else
|
||||
@compute_when_synced ?= []
|
||||
@compute_when_synced.push args
|
||||
|
||||
#
|
||||
# Execute an function when a message is received.
|
||||
# @param f {Function} Will be executed on the PeerJs-Connector context. f will be called with (sender_id, broadcast {true|false}, message).
|
||||
#
|
||||
onReceive: (f)->
|
||||
@receive_handlers.push f
|
||||
|
||||
###
|
||||
# Broadcast a message to all connected peers.
|
||||
# @param message {Object} The message to broadcast.
|
||||
#
|
||||
broadcast: (message)->
|
||||
throw new Error "You must implement broadcast!"
|
||||
|
||||
#
|
||||
# Send a message to a peer, or set of peers
|
||||
#
|
||||
send: (peer_s, message)->
|
||||
throw new Error "You must implement send!"
|
||||
###
|
||||
|
||||
#
|
||||
# perform a sync with a specific user.
|
||||
#
|
||||
performSync: (user)->
|
||||
if not @current_sync_target?
|
||||
@current_sync_target = user
|
||||
@send user,
|
||||
sync_step: "getHB"
|
||||
data: @getStateVector()
|
||||
|
||||
#
|
||||
# When a master node joined the room, perform this sync with him. It will ask the master for the HB,
|
||||
# and will broadcast his own HB
|
||||
#
|
||||
performSyncWithMaster: (user)->
|
||||
if not @is_syncing
|
||||
@current_sync_target = user
|
||||
@is_syncing = true
|
||||
@send user,
|
||||
sync_step: "getHB"
|
||||
send_again: "true"
|
||||
data: []
|
||||
hb = @getHB([]).hb
|
||||
_hb = []
|
||||
for o in hb
|
||||
_hb.push o
|
||||
if _hb.length > 30
|
||||
@broadcast
|
||||
sync_step: "applyHB_"
|
||||
data: _hb
|
||||
_hb = []
|
||||
@broadcast
|
||||
sync_step: "applyHB"
|
||||
data: _hb
|
||||
#
|
||||
# You are sure that all clients are synced, call this function.
|
||||
#
|
||||
setStateSynced: ()->
|
||||
if not @is_synced
|
||||
@is_synced = true
|
||||
for f in @compute_when_synced
|
||||
f()
|
||||
delete @compute_when_synced
|
||||
null
|
||||
|
||||
#
|
||||
# You received a raw message, and you know that it is intended for to Yjs. Then call this function.
|
||||
#
|
||||
receiveMessage: (sender, res)->
|
||||
if not res.sync_step?
|
||||
for f in @receive_handlers
|
||||
f sender, res
|
||||
else
|
||||
if res.sync_step is "getHB"
|
||||
data = @getHB(res.data)
|
||||
hb = data.hb
|
||||
_hb = []
|
||||
if @is_synced
|
||||
sendApplyHB = ()->
|
||||
|
||||
for o in hb
|
||||
_hb.push o
|
||||
if _hb.length > 30
|
||||
@send sender,
|
||||
sync_step: "applyHB_"
|
||||
data: _hb
|
||||
_hb = []
|
||||
if @is_synced
|
||||
@send sender,
|
||||
sync_s tep: "applyHB"
|
||||
data: _hb
|
||||
|
||||
if res.send_again?
|
||||
send_again = do (sv = data.state_vector)=>
|
||||
()=>
|
||||
hb = @getHB(sv).hb
|
||||
@send sender,
|
||||
sync_step: "applyHB",
|
||||
data: hb
|
||||
sent_again: "true"
|
||||
setTimeout send_again, 3000
|
||||
else if res.sync_step is "applyHB"
|
||||
@applyHB(res.data)
|
||||
|
||||
if (@syncMode is "syncAll" or res.sent_again?) and not @is_synced
|
||||
@setStateSynced()
|
||||
@connections[sender].is_synced = true
|
||||
@findNewSyncTarget()
|
||||
|
||||
else if res.sync_step is "applyHB_"
|
||||
@applyHB(res.data)
|
||||
|
||||
|
||||
# Currently, the HB encodes operations as JSON. For the moment I want to keep it
|
||||
# that way. Maybe we support encoding in the HB as XML in the future, but for now I don't want
|
||||
# too much overhead. Y is very likely to get changed a lot in the future
|
||||
#
|
||||
# Because we don't want to encode JSON as string (with character escaping, wich makes it pretty much unreadable)
|
||||
# we encode the JSON as XML.
|
||||
#
|
||||
# When the HB support encoding as XML, the format should look pretty much like this.
|
||||
|
||||
# does not support primitive values as array elements
|
||||
# expects an ltx (less than xml) object
|
||||
parseMessageFromXml: (m)->
|
||||
parse_array = (node)->
|
||||
for n in node.children
|
||||
if n.getAttribute("isArray") is "true"
|
||||
parse_array n
|
||||
else
|
||||
parse_object n
|
||||
|
||||
parse_object = (node)->
|
||||
json = {}
|
||||
for name, value of node.attrs
|
||||
int = parseInt(value)
|
||||
if isNaN(int) or (""+int) isnt value
|
||||
json[name] = value
|
||||
else
|
||||
json[name] = int
|
||||
for n in node.children
|
||||
name = n.name
|
||||
if n.getAttribute("isArray") is "true"
|
||||
json[name] = parse_array n
|
||||
else
|
||||
json[name] = parse_object n
|
||||
json
|
||||
parse_object m
|
||||
|
||||
# encode message in xml
|
||||
# we use string because Strophe only accepts an "xml-string"..
|
||||
# So {a:4,b:{c:5}} will look like
|
||||
# <y a="4">
|
||||
# <b c="5"></b>
|
||||
# </y>
|
||||
# m - ltx element
|
||||
# json - guess it ;)
|
||||
#
|
||||
encodeMessageToXml: (m, json)->
|
||||
# attributes is optional
|
||||
encode_object = (m, json)->
|
||||
for name,value of json
|
||||
if not value?
|
||||
# nop
|
||||
else if value.constructor is Object
|
||||
encode_object m.c(name), value
|
||||
else if value.constructor is Array
|
||||
encode_array m.c(name), value
|
||||
else
|
||||
m.setAttribute(name,value)
|
||||
m
|
||||
encode_array = (m, array)->
|
||||
m.setAttribute("isArray","true")
|
||||
for e in array
|
||||
if e.constructor is Object
|
||||
encode_object m.c("array-element"), e
|
||||
else
|
||||
encode_array m.c("array-element"), e
|
||||
m
|
||||
if json.constructor is Object
|
||||
encode_object m.c("y",{xmlns:"http://y.ninja/connector-stanza"}), json
|
||||
else if json.constructor is Array
|
||||
encode_array m.c("y",{xmlns:"http://y.ninja/connector-stanza"}), json
|
||||
else
|
||||
throw new Error "I can't encode this json!"
|
||||
|
||||
setIsBoundToY: ()->
|
||||
@on_bound_to_y?()
|
||||
delete @when_bound_to_y
|
||||
@is_bound_to_y = true
|
||||
@@ -31,9 +31,9 @@ class HistoryBuffer
|
||||
throw new Error "You are re-assigning an old user id - this is not (yet) possible!"
|
||||
@buffer[id] = own
|
||||
delete @buffer[@user_id]
|
||||
|
||||
@operation_counter[id] = @operation_counter[@user_id]
|
||||
delete @operation_counter[@user_id]
|
||||
if @operation_counter[@user_id]?
|
||||
@operation_counter[id] = @operation_counter[@user_id]
|
||||
delete @operation_counter[@user_id]
|
||||
@user_id = id
|
||||
|
||||
emptyGarbage: ()=>
|
||||
@@ -196,7 +196,7 @@ class HistoryBuffer
|
||||
# you renew your own state_vector to the state_vector of the other user
|
||||
renewStateVector: (state_vector)->
|
||||
for user,state of state_vector
|
||||
if (not @operation_counter[user]?) or (@operation_counter[user] < state_vector[user])
|
||||
if ((not @operation_counter[user]?) or (@operation_counter[user] < state_vector[user])) and state_vector[user]?
|
||||
@operation_counter[user] = state_vector[user]
|
||||
|
||||
#
|
||||
|
||||
@@ -184,7 +184,7 @@ module.exports = (HB)->
|
||||
#
|
||||
#
|
||||
callEventDecorator: (events)->
|
||||
if not @isDeleted()
|
||||
if not (@isDeleted() or @getLastOperation().isDeleted())
|
||||
for event in events
|
||||
for name,prop of @event_properties
|
||||
event[name] = prop
|
||||
|
||||
@@ -6,11 +6,11 @@ adaptConnector = require "./ConnectorAdapter"
|
||||
|
||||
createY = (connector)->
|
||||
user_id = null
|
||||
if connector.id?
|
||||
user_id = connector.id # TODO: change to getUniqueId()
|
||||
if connector.user_id?
|
||||
user_id = connector.user_id # TODO: change to getUniqueId()
|
||||
else
|
||||
user_id = "_temp"
|
||||
connector.onUserIdSet (id)->
|
||||
connector.on_user_id_set = (id)->
|
||||
user_id = id
|
||||
HB.resetUserId id
|
||||
HB = new HistoryBuffer user_id
|
||||
|
||||
Reference in New Issue
Block a user