implementing participants
Participant receive workitems, do something about/to them. Then, if necessary, they reply to ruote with the [updated] workitem.
In its simplest form a participant implementation will take the form of a block participant.
engine.register_participant :flush_garbage do |workitem|
GarbageCan.find(workitem.fields['garbage_can_id']).flush
end
In fact, this is equivalent to the following participant implementation (and registration) :
class Flusher
include Ruote::LocalParticipant
def on_workitem
GarbageCan.find(workitem.fields['garbage_can_id']).flush
reply
end
def on_cancel
# nothing to do
end
#def on_reply
#end
end
engine.register_participant :flush_garbage, Flusher
Yes, you are right, the block participant’s block is reproduced in the on_workitem method of the participant implementation, but a reply is added.
#on_workitem isn’t the only method involved in a participant, there is also #on_cancel. Cancel is called when a process instance or a segment of it gets cancelled and the participant holds a workitem for the segment. The workitem has to be somehow cancelled / removed / rolled back.
Two optional methods also exist : #accept? and #on_reply. A participant with an accept? method is free to discard incoming workitems that it doesn’t like. A participant with an on_reply method is usually some kind of proxy for a real (remote) participant; the on_reply method, when present, is called when the answer (workitem) comes back from the real participant.
By default, when a process instance execution reaches a participant expression, it runs the #on_workitem method of the participant in a dedicated Ruby thread. if the #do_not_thread method returns false, no extra thread is created, the #on_workitem call is done immediately (ie, it blocks the worker).
Ruote::Participant (class)
Ruote 2.3.0 brings a new Ruote::Participant class which includes the Ruote::LocalParticipant module and brings an empty implementation of on_cancel. Thus the above participant can be written:
class Flusher < Ruote::Participant
def on_workitem
GarbageCan.find(workitem.fields['garbage_can_id']).flush
reply
end
end
Ruote::LocalParticipant (module)
Most of the time, you’ll want your participant to include the Ruote::LocalParticipant mixin.
It’s named ‘local’ because it has access to the ruote storage.
Methods to implement or override when implementing a participant and some helpers granted by Ruote::LocalParticipant.
methods
- #on_workitem (was “consume” before 2.3.0)
- #on_cancel (was “cancel” before 2.3.0)
- #accept?
- #on_apply
- #on_reply
- #do_not_thread (#dont_thread?)
helpers
#on_workitem
When a participant is handed a workitem, this method is called with the workitem as the argument.
# computing the total for a invoice being passed in the workitem.
#
class TotalParticipant
include Ruote::LocalParticipant
def on_workitem
workitem.fields['total'] =
workitem.fields['items'].inject(0.0) { |t, item|
t + item['count'] * PricingService.lookup(item['id'])
}
reply
end
end
Note the call to #reply when the work is done. Without this call (sooner or later), the process doesn’t resume after this participant.
#on_cancel
If you don’t immediately (and very quickly) reply to the engine in your #on_workitem method, you’d better have a #on_cancel implementation for your class.
require 'fileutils'
require 'yaml'
# A plain "worklist" implementations, workitems are placed in files under
# worklist/
#
# We assume the rest of the application reads those yaml files and
# calls PlainWorklist.reply(workitem_h) when done with a workitem.
#
class PlainWorklist
include Ruote::LocalParticipant
# Called by ruote when there's work for our participant
#
def on_workitem
h = workitem.to_h
h['fei'] = workitem.fei.sid
File.open("worklist/workitem_#{workitem.fei.sid}.yaml", wb) do |f|
f.puts(h.to_yaml)
end
# no reply_to_engine
end
# When ruote has to cancel a process instance with an active workitem
# for this participant, this method is called.
#
def on_cancel
puts "(cancelling, flavour is #{flavour}"
FileUtils.rm("worklist/workitem_#{fei.sid}.yaml")
end
# A method called by external systems when they're done with a workitem
# hash, they hand it back to the engine via this method.
#
def self.reply(workitem_h)
FileUtils.rm("worklist/workitem_#{workitem_h['fei']}.yaml")
receive(workitem)
end
end
Our on_workitem method doesn’t reply to the engine, at all. The reply occurs later, in the #reply class method. If a cancel order comes, the workitem stored on disk has to be removed.
When #on_cancel returns false, no reply will be emitted towards ruote. It can be useful in scenarii where the #on_workitem itself deals with cancel messages (and it has some way to know about them…).
cancel flavour
This parameter is usually set to ‘cancel’. When a process is killed instead of cancelled, the flavour is set to “kill”. Usually this parameter is simply ignored, but who knows, perhaps a different behaviour might be necessary in some context?
#accept?
Given a list of participant
engine.register_participant 'al.+', FirstParticipantImplementation
engine.register_participant 'al.+', SecondParticipantImplementation
the first participant whose regex matches the participant name is used. If that first participant has a method #accept?(workitem), it’s called and if it replies with false, the next participant in the participant list is tried.
class FirstParticipantImplementation
include Ruote::LocalParticipant
# Do something...
#
def on_workitem
workitem.fields['message'] = 'kilroy was here'
reply
end
# This participant will not accept workitems for non military contexts.
# The next matching participant in the participant list will be consulted.
#
def accept?
workitem.fields['context'] == 'military'
end
end
#on_apply
Equivalent to on_reply, but triggers before the workitem is dispatched to the participant. It gives participant implementers an opportunity to perform some operations before the dispatch. The typical example is adding some information to the workitem before it’s passed on.
Note, that like for ‘on_reply’:#on_reply, on_apply is executed in the worker thread (whereas dispatch is performed, by default, in a new thread).
#on_reply
Roughly, there are two categories of participants, those who receive a workitem and reply immediately and those who take some time before replying.
In this second category, one can find participants that emit workitem to a remote participant. The workitem comes back via a listener usually.
Participants that provide a #on_reply(workitem) implementation are given a opportunity to tweak the workitem once it comes back.
Here is an example where an hypothetical WorkQueue is placed between the ruote system and the remote participant bound under the name “remote”. The receiver pops messages and feeds the back to the engine. When workitems come back, the worker will hand it to the on_reply method of the participant.
class MyRemoteParticipant
#include Ruote::LocalParticipant
def on_workitem
WorkQueue.push(workitem_to_msg(workitem))
end
def on_reply
workitem.fields['msg'] = "back at #{Time.now.to_s}"
end
protected
def workitem_to_msg(workitem)
# ...
end
end
class Receiver < Ruote::Receiver
def initialize(engine)
super
Thread.new { listen }
end
protected
def listen
loop { reply_to_engine(workitem_from_msg(WorkQueue.pop)) }
end
def workitem_from_msg(msg)
# ...
end
end
engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
engine.register_participant :remote, MyRemoteParticipant
receiver = Receiver.new(engine)
# ...
#do_not_thread #dont_thread?
By default, when dispatching a workitem to a participant, a ruote worker will call the participant’s #on_workitem method in a new thread. The idea is to move the [potentially] bulk work of dispatching outside of the worker loop very quickly.
This is not always appropriate. The participant can tell the worker not to spawn a new thread by implementing a do_not_thread method and letting it reply with true when it’s called.
class MyParticipant
include Ruote::LocalParticipant
# Do something...
#
def on_workitem
workitem.fields['message'] = 'kilroy was here'
reply
end
# The #on_workitem method does such a trivial thing that it's not worth spawning
# a thread.
#
def do_not_thread
true
end
end
It’s OK to have a do_not_thread method that accepts the workitem as argument :
def do_not_thread
workitem.fields['colour'] == 'blue'
end
If you want to influence more radically threading while dispatching, overriding the dispatch_pool service is probably the best solution.
#
# This implementation never dispatches in a new thread...
#
class MyDispatchPool < Ruote::DispatchPool
def dispatch(msg)
participant = @context.plist.lookup(
msg['participant'] || msg['participant_name'], msg['workitem'])
do_dispatch(participant, msg)
end
end
# ...
opts = {
's_dispatch_pool' => [ 'path/to/my_dispatch_pool', 'MyDispatchPool' ]
}
engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new(opts)))
or something like that…
From ruote 2.3.0 on, you can use the dont_thread? method instead, and the workitem is implicit (like from #on_workitem and #on_reply). So this is OK :
def dont_thread?
workitem.fields['colour'] == 'blue'
end
#re_dispatch
Use this method to re_dispatch the workitem.
It takes two options :in and :at for “later re_dispatch”. Without one of those options, the method is a “reject”.
Here is an example where re_dispatch is used to implement a retry tactic.
class RetryParticipant
include Ruote::LocalParticipant
def initialize(opts)
@opts = opts
end
def on_workitem
begin
do_the_job
reply
rescue
re_dispatch(workitem, :in => @opts['delay'] || '1s')
end
end
def on_cancel
# fei is set for the participant and contains the "FlowExpressionId" of
# the expression getting cancelled
unschedule_re_dispatch(fei)
end
end
re_dispatch/reject makes sense too in a multi worker context where a participant on a given host wants to reject a task and let another worker (on another host) do the work. Maybe the accept? method is better for those cases, though.
#unschedule_re_dispatch
Look at the example in the previous section to see unschedule_re_dispatch in action.
#rtimeout
Usually, timeouts are given for an expression in the process definition.
participant 'alice', :timeout => '2d'
where alice as two days to complete her task (send back the workitem).
But it’s OK for participant classes registered in the engine to provide
their own timeout value. The participant instance simply has to reply to
the #rtimeout method and provide a meaningful timeout value (like a
number of seconds, or a string like “2d” or “1M2w”. For example :
class MyParticipant
# ...
def rtimeout(workitem)
workitem.fields['category'] == 'shoes' ? '2w' : '1w'
end
# ...
end
From ruote 2.3.0 on, the workitem is implicit :
class MyParticipant
# ...
def rtimeout
workitem.fields['category'] == 'shoes' ? '2w' : '1w'
end
# ...
end
Note however, that the process definition timeout (if any) will take
precedence over the participant specified one.
#rtimers
We saw above that #rtimeout can provide a default value for a participant
“timeout” attribute. Likewise, #rtimers provides a default value for the
“timers” attribute.
class MyParticipant
# ...
def rtimeout
'1h: reminder, 12h: error'
end
# ...
end
For more info about the timers common attribute.
reply
Often used at the end of a on_workitem implementation.
class Total
include Ruote::LocalParticipant
def on_workitem
workitem.fields['total'] =
workitem.fields['items'].inject(0) { |t, (i, c)|
item = Item.find(i)
t = t + c * item.price
}
reply
end
end
engine.register 'total', Total
If you scroll a bit, you’ll find an example where this method is called outside of the #on_workitem method.
fexp
Via this method, the participant implementation has access to its [participant] expression.
class PickCustomer
include Ruote::LocalParticipant
def on_workitem
customers = fexp.lookup_variable('customers')
workitem.fields['customer'] = customers[rand(customers.length)]
reply
end
end
engine.register 'pick_customer', PickCustomer
workitem
Whereas applied_workitem returns the workitem as was applied (to the participant expression), workitem returns the workitem as was dispatched to this participant [implementation].
The main difference is that a workitem as dispatched contains the ‘params’ field corresponding to the attributes specified in the process definition.
applied_workitem
Especially when a participant is meant as a task list, workitems may get modified and not immediately replied (proceeded) to the engine.
It could be necessary to look at a copy of the workitem as it was when it reached the participant [expression].
class TaskList
include Ruote::LocalParticipant
def initialize(opts)
@tasks = connect_to_some_db(opts)
end
def on_workitem
@tasks[workitem.fei] = workitem
end
def on_cancel
@tasks.delete(fei)
end
def by_user(username)
@tasks.by(:user => username)
end
def update(workitem)
@tasks[workitem.fei] = workitem
end
def original(workitem)
applied_workitem(workitem)
end
def self.reply(workitem)
if @tasks.delete[workitem.fei]
reply_to_engine(workitem)
end
end
end
# ...
engine.register 'user_.+', TaskList
# ...
tasklist = engine.participant('user_x')
# returns an instance of our participant, 'user_x' will do since
# it's registered for any participant starting with 'user_'
workitem = tasklist.by_user('user_fred').first
if workitem.fields != tasklist.original(workitem).fields
puts "fred's workitem got modified"
end
tasklist.reply(workitem)
Note the use of Engine#participant(name) to fetch an instance of a participant via the engine.
participant_name
Since ruote 2.3.0, it’s ok to shorten
def consume(workitem)
(workitem.fields['supervisors'] ||= []) << workitem.participant_name
reply_to_engine(workitem)
end
to
def on_workitem
(workitem.fields['supervisors'] ||= []) << participant_name
reply
end
(the simplification to notice here is the one from “workitem.participant_name” to “participant_name”, well it’s not the only simplification…)
lookup_variable
Quickly do a variable lookup in the participant expression.
def on_workitem
workitem.fields['my_var'] = lookup_variable('my_var')
workitem.fields['my_other_var'] = fexp.lookup_variable('my_other_var')
reply
end
put and get
Sometimes participant implementations need to stash data somewhere, but they can’t do it in the workitem itself (visibility issues). There are the put and get methods available.
You’ll find an example at the end of this discussion thread.