Provides methods for ‘local’ participants.
Assumes the class that includes this module has a context method that points to the worker or engine ruote context.
It’s “local” because it has access to the ruote storage.
Test shortcut, alleviates the need to set the workitem before calling accept?
# File lib/ruote/part/local_participant.rb, line 274
def _accept?(wi)
  Ruote.participant_send(self, :accept?, 'workitem' => wi)
end
Test shortcut, alleviates the need to set the workitem before calling dont_thread?, do_not_thread? or do_not_thread.
# File lib/ruote/part/local_participant.rb, line 281
def _dont_thread?(wi)
  Ruote.participant_send(
    self,
    [ :dont_thread?, :do_not_thread?, :do_not_thread ],
    'workitem' => wi)
end
Test shortcut, alleviates the need to set fei and flavour before calling on_cancel / cancel.
# File lib/ruote/part/local_participant.rb, line 258
def _on_cancel(fei, flavour)
  Ruote.participant_send(
    self, [ :on_cancel, :cancel ], 'fei' => fei, 'flavour' => flavour)
end
Test shortcut, alleviates the need to set the workitem before calling on_reply.
# File lib/ruote/part/local_participant.rb, line 267
def _on_reply(wi)
  Ruote.participant_send(self, :on_reply, 'workitem' => wi)
end
Test shortcut, alleviates the need to set the workitem before calling consume / on_workitem.
# File lib/ruote/part/local_participant.rb, line 249
def _on_workitem(wi)
  Ruote.participant_send(
    self, [ :on_workitem, :consume ], 'workitem' => wi)
end
Test shortcut, alleviates the need to set the workitem before calling rtimeout.
# File lib/ruote/part/local_participant.rb, line 293
def _rtimeout(wi)
  Ruote.participant_send(self, :rtimeout, 'workitem' => wi)
end
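The test shortcuts above all delegate to Ruote.participant_send, passing either a single method name or a list of candidate names plus a hash of values. The following is a minimal sketch of that dispatch idea, not ruote's actual implementation: sketch_participant_send and LegacyParticipant are hypothetical names, and the real method may differ in details such as argument handling.

```ruby
# Hypothetical stand-in for Ruote.participant_send (an assumption, not
# ruote's real code): set each argument as an instance variable, then call
# the first method in the list that the participant responds to.
def sketch_participant_send(participant, methods, arguments)
  arguments.each do |name, value|
    participant.instance_variable_set("@#{name}", value)
  end

  method_name = Array(methods).find { |m| participant.respond_to?(m) }
  unless method_name
    raise NoMethodError, "none of #{Array(methods).inspect} is implemented"
  end

  participant.send(method_name)
end

# Hypothetical participant that only implements the older name, consume.
class LegacyParticipant
  def consume
    "consumed #{@workitem['id']}"
  end
end

result = sketch_participant_send(
  LegacyParticipant.new,
  [ :on_workitem, :consume ],
  'workitem' => { 'id' => 'wi-1' })
```

In this sketch the participant never has to set @workitem itself, which is the convenience the shortcuts provide in tests.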
Returns the workitem as was applied when the Ruote::ParticipantExpression was reached.
If the _fei arg is specified, it will return the corresponding applied workitem. This arg is mostly here for backward compatibility.
# File lib/ruote/part/local_participant.rb, line 105
def applied_workitem(_fei=nil)
  fetch_workitem(_fei || fei)
end
Returns the current fei (Ruote::FlowExpressionId).
# File lib/ruote/part/local_participant.rb, line 83
def fei
  @fei ? @fei : @workitem.fei
end
Returns the Ruote::ParticipantExpression that corresponds with this participant.
If a wi_or_fei arg is given, will return the corresponding flow expression. This arg is mostly here for backward compatibility.
# File lib/ruote/part/local_participant.rb, line 94
def fexp(wi_or_fei=nil)
  flow_expression(wi_or_fei || fei)
end
Returns true if the underlying participant expression is gone or cancelling.
# File lib/ruote/part/local_participant.rb, line 308
def is_cancelled?
  if fe = fexp
    return fe.h.state == 'cancelling'
  else
    true
  end
end
Returns true if the underlying participant expression is ‘gone’ (probably cancelled somehow).
# File lib/ruote/part/local_participant.rb, line 300
def is_gone?
  fexp.nil?
end
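To make the difference between is_cancelled? and is_gone? concrete, here is a small standalone sketch of the same logic. FakeExpression and the two sketch_ functions are hypothetical stand-ins; the real methods read the state from the flow expression returned by fexp.

```ruby
# Hypothetical stand-in for a flow expression; only the state is modelled.
FakeExpression = Struct.new(:state)

# Mirrors is_cancelled?: a missing expression counts as cancelled.
def sketch_is_cancelled?(expression)
  expression ? expression.state == 'cancelling' : true
end

# Mirrors is_gone?: true only when there is no expression at all.
def sketch_is_gone?(expression)
  expression.nil?
end

active     = FakeExpression.new(nil)
cancelling = FakeExpression.new('cancelling')

states = [ active, cancelling, nil ].map do |expression|
  [ sketch_is_cancelled?(expression), sketch_is_gone?(expression) ]
end
```

An expression that is still present but in the 'cancelling' state is cancelled without being gone; a missing expression is both.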
A shortcut for
fexp.lookup_variable(key)
# File lib/ruote/part/local_participant.rb, line 128
def lookup_variable(key)
  fexp.lookup_variable(key)
end
Up until ruote 2.3.0, the participant name had to be fetched from the workitem. This is a shortcut; it lets you write participant code that looks like
def on_workitem
  (workitem.fields['supervisors'] || []) << participant_name
  reply
end
# File lib/ruote/part/local_participant.rb, line 119
def participant_name
  workitem.participant_name
end
Use this method to re_dispatch the workitem.
It takes two options :in and :at for “later re_dispatch”.
Look at the unschedule_re_dispatch method for an example of participant implementation that uses re_dispatch.
Without one of those options, the method is a “reject”.
# File lib/ruote/part/local_participant.rb, line 156
def re_dispatch(wi=nil, opts=nil)
  wi, opts = [ nil, wi ] if wi.is_a?(Hash) && opts.nil?
  wi ||= workitem()
  opts ||= {}
  wi.h['re_dispatch_count'] = wi.h['re_dispatch_count'].to_s.to_i + 1
  msg = {
    'action' => 'dispatch',
    'fei' => wi.h.fei,
    'workitem' => wi.h,
    'participant_name' => wi.participant_name
  }
  if t = opts[:in] || opts[:at]
    sched_id = @context.storage.put_schedule('at', wi.h.fei, t, msg)
    exp = fexp(wi)
    exp.h['re_dispatch_sched_id'] = sched_id
    exp.try_persist
  else
    @context.storage.put_msg('dispatch', msg)
  end
end
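The opening lines of re_dispatch let callers pass the options hash in the first position and keep a running re_dispatch_count. The sketch below isolates just that part so it can be run on its own. FakeWorkitem and prepare_re_dispatch are hypothetical names, and the storage calls are replaced by a symbol that says which branch would be taken.

```ruby
# Hypothetical stand-in for Ruote::Workitem; only the h hash is modelled.
FakeWorkitem = Struct.new(:h)

# Mirrors the argument handling and counter update at the top of
# re_dispatch. Returns the workitem, the options, and the branch that the
# real method would take (:scheduled or :immediate).
def prepare_re_dispatch(current, wi = nil, opts = nil)
  # re_dispatch(:in => '1s') arrives with the options in the wi position
  wi, opts = [ nil, wi ] if wi.is_a?(Hash) && opts.nil?
  wi ||= current
  opts ||= {}

  # nil.to_s.to_i is 0, so the first re_dispatch sets the counter to 1
  wi.h['re_dispatch_count'] = wi.h['re_dispatch_count'].to_s.to_i + 1

  branch = (opts[:in] || opts[:at]) ? :scheduled : :immediate
  [ wi, opts, branch ]
end

current = FakeWorkitem.new({})

_, later_opts, later_branch = prepare_re_dispatch(current, :in => '1s')
_, _, reject_branch = prepare_re_dispatch(current)
```

With :in or :at the real method stores a schedule; without them it puts the dispatch message straight back into the queue, which is the "reject" behaviour.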
This reject method replaces the workitem in the [internal] message queue of the ruote engine (since it’s a local participant, it has access to the storage and it’s thus easy). The idea is that another worker will pick up the workitem and do the participant dispatching.
This is an advanced technique. It was requested by people who want to have multiple workers and have only certain workers/participants do the handling. Using reject is not the best method; it’s probably better to implement this by re-opening the Ruote::Worker class and changing the cannot_handle(msg) method.
reject could be useful anyway. Not sure now, but one could imagine scenarios where some participants reject workitems temporarily (while the same participant on another worker would accept them).
Well, here it is, use with care.
Participant implementations call this method when their on_workitem (consume) methods are done and they want to hand back the workitem to the engine so that the flow can resume.
The (wi=workitem) argument is mostly for backward compatibility (or for passing a totally different workitem to the engine).
# File lib/ruote/part/local_participant.rb, line 140
def reply_to_engine(wi=workitem)
  receive(wi)
end
Cancels the scheduled re_dispatch, if any.
An example of ‘retrying participant’ :
class RetryParticipant
  include Ruote::LocalParticipant
  def initialize(opts)
    @opts = opts
  end
  def on_workitem
    begin
      do_the_job
      reply
    rescue
      re_dispatch(:in => @opts['delay'] || '1s')
    end
  end
  def cancel
    unschedule_re_dispatch
  end
end
Note how unschedule_re_dispatch is used in the cancel method. Warning, this example could loop forever…
# File lib/ruote/part/local_participant.rb, line 213
def unschedule_re_dispatch(fei=nil)
  if s = fexp.h['re_dispatch_sched_id']
    @context.storage.delete_schedule(s)
  end
end
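The retrying participant above is flagged as one that could loop forever. One possible way to bound it is to compare the re_dispatch_count that re_dispatch maintains against a limit. The sketch below is standalone and only illustrates the idea: BoundedRetryParticipant, the 'max_retries' option, and the 'error' field are hypothetical, and reply and re_dispatch are stubbed to record calls instead of talking to ruote.

```ruby
# Standalone sketch: no ruote required. The workitem is a plain Hash
# standing in for workitem.h, and engine calls are recorded in a log.
class BoundedRetryParticipant
  attr_reader :log, :workitem

  def initialize(opts, workitem)
    @opts = opts
    @workitem = workitem
    @log = []
  end

  def on_workitem
    do_the_job
    reply
  rescue StandardError => e
    # 'max_retries' is a hypothetical option, not part of ruote
    if @workitem['re_dispatch_count'].to_s.to_i < (@opts['max_retries'] || 3)
      re_dispatch(:in => @opts['delay'] || '1s')
    else
      # give up: note the failure and hand the workitem back
      @workitem['error'] = e.message
      reply
    end
  end

  private

  # Always fails, to exercise the retry path.
  def do_the_job
    raise 'job failed'
  end

  # Stub for reply: records the call.
  def reply
    @log << :reply
  end

  # Stub for re_dispatch: bumps the counter like the real method does.
  def re_dispatch(opts)
    @workitem['re_dispatch_count'] = @workitem['re_dispatch_count'].to_s.to_i + 1
    @log << [ :re_dispatch, opts ]
  end
end

participant = BoundedRetryParticipant.new({ 'max_retries' => 2 }, {})
3.times { participant.on_workitem }
```

Replying with an error field is only one choice; a real participant might prefer to let the exception propagate once the limit is reached.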
Returns the current workitem if no fei is given. If a fei is given, it will return the applied workitem for that fei (if any).
The optional fei is mostly here for backward compatibility (with 2.2.0)
# File lib/ruote/part/local_participant.rb, line 74
def workitem(fei=nil)
  return fetch_workitem(fei) if fei
  @workitem ? @workitem : applied_workitem
end
See put
# File lib/ruote/part/local_participant.rb, line 340
def get(key=nil)
  stash_get(fei, key)
end
Receivers and local participants share the stash_put and stash_get methods. The local participant also has put and get, which don’t need an initial fei: put and get deal with the participant expression directly, whereas stash_put and stash_get can point at any expression.
‘put’ can be called as
put('secret' => 'message', 'to' => 'embassy')
# or
put('secret', 'message')
# File lib/ruote/part/local_participant.rb, line 333
def put(key, value=nil)
  stash_put(fei, key, value)
end
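The two call forms of put, together with get, can be illustrated with an in-memory stand-in. This is a sketch under assumptions: StashSketch is a hypothetical class, the real stash_put and stash_get work against the flow expression in storage, and the behaviour of get without a key (returning the whole stash) is assumed from its key=nil signature.

```ruby
# In-memory stand-in for the stash of a participant expression.
class StashSketch
  def initialize
    @stash = {}
  end

  # Accepts either a Hash of entries or a single key/value pair.
  def put(key, value = nil)
    entries = key.is_a?(Hash) ? key : { key => value }
    @stash.merge!(entries)
  end

  # Returns one value, or the whole stash when no key is given (assumed).
  def get(key = nil)
    key ? @stash[key] : @stash
  end
end

stash = StashSketch.new
stash.put('secret' => 'message', 'to' => 'embassy')
stash.put('code', 'blue')

secret = stash.get('secret')
everything = stash.get
```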