implementing participants

Participant receive workitems, do something about/to them. Then, if necessary, they reply to ruote with the [updated] workitem.

In its simplest form a participant implementation will take the form of a block participant.

            engine.register_participant :flush_garbage do |workitem|
              GarbageCan.find(workitem.fields['garbage_can_id']).flush
            end
            

In fact, this is equivalent to the following participant implementation (and registration) :

            class Flusher
              include Ruote::LocalParticipant
            
              def on_workitem
                GarbageCan.find(workitem.fields['garbage_can_id']).flush
                reply
              end
            
              def on_cancel
                # nothing to do
              end
            
              #def on_reply
              #end
            end
            
            engine.register_participant :flush_garbage, Flusher
            

Yes, you are right, the block participant’s block is reproduced in the on_workitem method of the participant implementation, but a reply is added.

#on_workitem isn’t the only method involved in a participant, there is also #on_cancel. Cancel is called when a process instance or a segment of it gets cancelled and the participant holds a workitem for the segment. The workitem has to be somehow cancelled / removed / rolled back.

Two optional methods also exist : #accept? and #on_reply. A participant with an accept? method is free to discard incoming workitems that it doesn’t like. A participant with an on_reply method is usually some kind of proxy for a real (remote) participant; the on_reply method, when present, is called when the answer (workitem) comes back from the real participant.

By default, when a process instance execution reaches a participant expression, it runs the #on_workitem method of the participant in a dedicated Ruby thread. if the #do_not_thread method returns false, no extra thread is created, the #on_workitem call is done immediately (ie, it blocks the worker).

Ruote::Participant (class)

Ruote 2.3.0 brings a new Ruote::Participant class which includes the Ruote::LocalParticipant module and brings an empty implementation of on_cancel. Thus the above participant can be written:

            class Flusher < Ruote::Participant
            
              def on_workitem
                GarbageCan.find(workitem.fields['garbage_can_id']).flush
                reply
              end
            end
            

Ruote::LocalParticipant (module)

Most of the time, you’ll want your participant to include the Ruote::LocalParticipant mixin.

It’s named ‘local’ because it has access to the ruote storage.


Methods to implement or override when implementing a participant and some helpers granted by Ruote::LocalParticipant.

methods

helpers


#on_workitem

When a participant is handed a workitem, this method is called with the workitem as the argument.

            # computing the total for a invoice being passed in the workitem.
            #
            class TotalParticipant
              include Ruote::LocalParticipant
            
              def on_workitem
                workitem.fields['total'] =
                  workitem.fields['items'].inject(0.0) { |t, item|
                    t + item['count'] * PricingService.lookup(item['id'])
                  }
                reply
              end
            end
            

Note the call to #reply when the work is done. Without this call (sooner or later), the process doesn’t resume after this participant.

#on_cancel

If you don’t immediately (and very quickly) reply to the engine in your #on_workitem method, you’d better have a #on_cancel implementation for your class.

            require 'fileutils'
            require 'yaml'
            
            # A plain "worklist" implementations, workitems are placed in files under
            # worklist/
            #
            # We assume the rest of the application reads those yaml files and
            # calls PlainWorklist.reply(workitem_h) when done with a workitem.
            #
            class PlainWorklist
              include Ruote::LocalParticipant
            
              # Called by ruote when there's work for our participant
              #
              def on_workitem
                h = workitem.to_h
                h['fei'] = workitem.fei.sid
                File.open("worklist/workitem_#{workitem.fei.sid}.yaml", wb) do |f|
                  f.puts(h.to_yaml)
                end
                # no reply_to_engine
              end
            
              # When ruote has to cancel a process instance with an active workitem
              # for this participant, this method is called.
              #
              def on_cancel
                puts "(cancelling, flavour is #{flavour}"
                FileUtils.rm("worklist/workitem_#{fei.sid}.yaml")
              end
            
              # A method called by external systems when they're done with a workitem
              # hash, they hand it back to the engine via this method.
              #
              def self.reply(workitem_h)
                FileUtils.rm("worklist/workitem_#{workitem_h['fei']}.yaml")
                receive(workitem)
              end
            end
            

Our on_workitem method doesn’t reply to the engine, at all. The reply occurs later, in the #reply class method. If a cancel order comes, the workitem stored on disk has to be removed.

When #on_cancel returns false, no reply will be emitted towards ruote. It can be useful in scenarii where the #on_workitem itself deals with cancel messages (and it has some way to know about them…).

cancel flavour

This parameter is usually set to ‘cancel’. When a process is killed instead of cancelled, the flavour is set to “kill”. Usually this parameter is simply ignored, but who knows, perhaps a different behaviour might be necessary in some context?

#accept?

Given a list of participant

            engine.register_participant 'al.+', FirstParticipantImplementation
            engine.register_participant 'al.+', SecondParticipantImplementation
            

the first participant whose regex matches the participant name is used. If that first participant has a method #accept?(workitem), it’s called and if it replies with false, the next participant in the participant list is tried.

            class FirstParticipantImplementation
              include Ruote::LocalParticipant
            
              # Do something...
              #
              def on_workitem
                workitem.fields['message'] = 'kilroy was here'
                reply
              end
            
              # This participant will not accept workitems for non military contexts.
              # The next matching participant in the participant list will be consulted.
              #
              def accept?
                workitem.fields['context'] == 'military'
              end
            end
            

#on_apply

Equivalent to on_reply, but triggers before the workitem is dispatched to the participant. It gives participant implementers an opportunity to perform some operations before the dispatch. The typical example is adding some information to the workitem before it’s passed on.

Note, that like for ‘on_reply’:#on_reply, on_apply is executed in the worker thread (whereas dispatch is performed, by default, in a new thread).

#on_reply

Roughly, there are two categories of participants, those who receive a workitem and reply immediately and those who take some time before replying.

In this second category, one can find participants that emit workitem to a remote participant. The workitem comes back via a listener usually.

Participants that provide a #on_reply(workitem) implementation are given a opportunity to tweak the workitem once it comes back.

Here is an example where an hypothetical WorkQueue is placed between the ruote system and the remote participant bound under the name “remote”. The receiver pops messages and feeds the back to the engine. When workitems come back, the worker will hand it to the on_reply method of the participant.

            class MyRemoteParticipant
              #include Ruote::LocalParticipant
            
              def on_workitem
                WorkQueue.push(workitem_to_msg(workitem))
              end
            
              def on_reply
                workitem.fields['msg'] = "back at #{Time.now.to_s}"
              end
            
              protected
            
              def workitem_to_msg(workitem)
                # ...
              end
            end
            
            class Receiver < Ruote::Receiver
            
              def initialize(engine)
                super
                Thread.new { listen }
              end
            
              protected
            
              def listen
                loop { reply_to_engine(workitem_from_msg(WorkQueue.pop)) }
              end
            
              def workitem_from_msg(msg)
                # ...
              end
            end
            
            engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new))
            
            engine.register_participant :remote, MyRemoteParticipant
            
            receiver = Receiver.new(engine)
            
            # ...
            

#do_not_thread #dont_thread?

By default, when dispatching a workitem to a participant, a ruote worker will call the participant’s #on_workitem method in a new thread. The idea is to move the [potentially] bulk work of dispatching outside of the worker loop very quickly.

This is not always appropriate. The participant can tell the worker not to spawn a new thread by implementing a do_not_thread method and letting it reply with true when it’s called.

            class MyParticipant
              include Ruote::LocalParticipant
            
              # Do something...
              #
              def on_workitem
                workitem.fields['message'] = 'kilroy was here'
                reply
              end
            
              # The #on_workitem method does such a trivial thing that it's not worth spawning
              # a thread.
              #
              def do_not_thread
                true
              end
            end
            

It’s OK to have a do_not_thread method that accepts the workitem as argument :

              def do_not_thread
                workitem.fields['colour'] == 'blue'
              end
            

If you want to influence more radically threading while dispatching, overriding the dispatch_pool service is probably the best solution.

            #
            # This implementation never dispatches in a new thread...
            #
            class MyDispatchPool < Ruote::DispatchPool
            
              def dispatch(msg)
            
                participant = @context.plist.lookup(
                  msg['participant'] || msg['participant_name'], msg['workitem'])
            
                do_dispatch(participant, msg)
              end
            end
            
            # ...
            
            opts = {
              's_dispatch_pool' => [ 'path/to/my_dispatch_pool', 'MyDispatchPool' ]
            }
            
            engine = Ruote::Engine.new(Ruote::Worker.new(Ruote::HashStorage.new(opts)))
            

or something like that…

From ruote 2.3.0 on, you can use the dont_thread? method instead, and the workitem is implicit (like from #on_workitem and #on_reply). So this is OK :

              def dont_thread?
                workitem.fields['colour'] == 'blue'
              end
            

#re_dispatch

Use this method to re_dispatch the workitem.

It takes two options :in and :at for “later re_dispatch”. Without one of those options, the method is a “reject”.

Here is an example where re_dispatch is used to implement a retry tactic.

            class RetryParticipant
              include Ruote::LocalParticipant
            
              def initialize(opts)
                @opts = opts
              end
            
              def on_workitem
                begin
                  do_the_job
                  reply
                rescue
                  re_dispatch(workitem, :in => @opts['delay'] || '1s')
                end
              end
            
              def on_cancel
                # fei is set for the participant and contains the "FlowExpressionId" of
                # the expression getting cancelled
                unschedule_re_dispatch(fei)
              end
            end
            

re_dispatch/reject makes sense too in a multi worker context where a participant on a given host wants to reject a task and let another worker (on another host) do the work. Maybe the accept? method is better for those cases, though.

#unschedule_re_dispatch

Look at the example in the previous section to see unschedule_re_dispatch in action.

#rtimeout

Usually, timeouts are given for an expression in the process definition.

            participant 'alice', :timeout => '2d'
            

where alice as two days to complete her task (send back the workitem).

But it’s OK for participant classes registered in the engine to provide
their own timeout value. The participant instance simply has to reply to
the #rtimeout method and provide a meaningful timeout value (like a
number of seconds, or a string like “2d” or “1M2w”. For example :

            class MyParticipant
              # ...
              def rtimeout(workitem)
                workitem.fields['category'] == 'shoes' ? '2w' : '1w'
              end
              # ...
            end
            

From ruote 2.3.0 on, the workitem is implicit :

            class MyParticipant
              # ...
              def rtimeout
                workitem.fields['category'] == 'shoes' ? '2w' : '1w'
              end
              # ...
            end
            

Note however, that the process definition timeout (if any) will take
precedence over the participant specified one.

#rtimers

We saw above that #rtimeout can provide a default value for a participant
“timeout” attribute. Likewise, #rtimers provides a default value for the
“timers” attribute.

            class MyParticipant
              # ...
              def rtimeout
                '1h: reminder, 12h: error'
              end
              # ...
            end
            

For more info about the timers common attribute.


reply

Often used at the end of a on_workitem implementation.

            class Total
              include Ruote::LocalParticipant
            
              def on_workitem
                workitem.fields['total'] =
                  workitem.fields['items'].inject(0) { |t, (i, c)|
                    item = Item.find(i)
                    t = t + c * item.price
                  }
                reply
              end
            end
            
            engine.register 'total', Total
            

If you scroll a bit, you’ll find an example where this method is called outside of the #on_workitem method.

fexp

Via this method, the participant implementation has access to its [participant] expression.

            class PickCustomer
              include Ruote::LocalParticipant
            
              def on_workitem
                customers = fexp.lookup_variable('customers')
                workitem.fields['customer'] = customers[rand(customers.length)]
                reply
              end
            end
            
            engine.register 'pick_customer', PickCustomer
            

workitem

Whereas applied_workitem returns the workitem as was applied (to the participant expression), workitem returns the workitem as was dispatched to this participant [implementation].

The main difference is that a workitem as dispatched contains the ‘params’ field corresponding to the attributes specified in the process definition.

applied_workitem

Especially when a participant is meant as a task list, workitems may get modified and not immediately replied (proceeded) to the engine.

It could be necessary to look at a copy of the workitem as it was when it reached the participant [expression].

            class TaskList
              include Ruote::LocalParticipant
            
              def initialize(opts)
                @tasks = connect_to_some_db(opts)
              end
            
              def on_workitem
                @tasks[workitem.fei] = workitem
              end
            
              def on_cancel
                @tasks.delete(fei)
              end
            
              def by_user(username)
                @tasks.by(:user => username)
              end
            
              def update(workitem)
                @tasks[workitem.fei] = workitem
              end
            
              def original(workitem)
                applied_workitem(workitem)
              end
            
              def self.reply(workitem)
                if @tasks.delete[workitem.fei]
                  reply_to_engine(workitem)
                end
              end
            end
            
            # ...
            
            engine.register 'user_.+', TaskList
            
            # ...
            
            tasklist = engine.participant('user_x')
              # returns an instance of our participant, 'user_x' will do since
              # it's registered for any participant starting with 'user_'
            
            workitem = tasklist.by_user('user_fred').first
            
            if workitem.fields != tasklist.original(workitem).fields
              puts "fred's workitem got modified"
            end
            
            tasklist.reply(workitem)
            

Note the use of Engine#participant(name) to fetch an instance of a participant via the engine.

participant_name

Since ruote 2.3.0, it’s ok to shorten

            def consume(workitem)
              (workitem.fields['supervisors'] ||= []) << workitem.participant_name
              reply_to_engine(workitem)
            end
            

to

            def on_workitem
              (workitem.fields['supervisors'] ||= []) << participant_name
              reply
            end
            

(the simplification to notice here is the one from “workitem.participant_name” to “participant_name”, well it’s not the only simplification…)

lookup_variable

Quickly do a variable lookup in the participant expression.

            def on_workitem
              workitem.fields['my_var'] = lookup_variable('my_var')
              workitem.fields['my_other_var'] = fexp.lookup_variable('my_other_var')
              reply
            end
            

put and get

Sometimes participant implementations need to stash data somewhere, but they can’t do it in the workitem itself (visibility issues). There are the put and get methods available.

You’ll find an example at the end of this discussion thread.