Parent

Included Modules

Ruote::Beanstalk::ParticipantProxy

This participant emits workitems towards a beanstalk queue.

  engine.register_participant(
    :heavy_labour,
    :reply_by_default => true, :beanstalk => '127.0.0.1:11300')

workitem format

Workitems are encoded in the format

  [ 'workitem', workitem.to_h ]

and then serialized as JSON strings.

cancel items

Like workitems, but the format is

  [ 'cancelitem', fei.to_h, flavour.to_s ]

where fei is the FlowExpressionId of the expression getting cancelled (and whose workitems are to be retired) and flavour is either ‘cancel’ or ‘kill’.

extending this participant

Extend and overwrite encode_workitem and encode_cancelitem or simply re-open the class and change those methods.

:beanstalk

Indicates which beanstalk to talk to

  engine.register_participant(
    'alice'
    Ruote::Beanstalk::ParticipantProxy,
    'beanstalk' => '127.0.0.1:11300')

:tube

Most of the time, you want the workitems (or the cancelitems) to be emitted over/in a specific tube

  engine.register_participant(
    'alice'
    Ruote::Beanstalk::ParticipantProxy,
    'beanstalk' => '127.0.0.1:11300',
    'tube' => 'ruote-workitems')

:reply_by_default

If the participant is configured with ‘reply_by_default’ => true, the participant will dispatch the workitem over to Beanstalk and then immediately reply to its ruote engine (letting the flow resume).

  engine.register_participant(
    'alice'
    Ruote::Beanstalk::ParticipantProxy,
    'beanstalk' => '127.0.0.1:11300',
    'reply_by_default' => true)

Public Class Methods

new(opts) click to toggle source
     # File lib/ruote/beanstalk/participant_proxy.rb, line 105
105:     def initialize(opts)
106: 
107:       @opts = opts
108:     end

Public Instance Methods

cancel(fei, flavour) click to toggle source
     # File lib/ruote/beanstalk/participant_proxy.rb, line 122
122:     def cancel(fei, flavour)
123: 
124:       con = new_connection
125: 
126:       con.put(encode_cancelitem(fei, flavour))
127: 
128:     ensure
129:       con.close rescue nil
130:     end
consume(workitem) click to toggle source
     # File lib/ruote/beanstalk/participant_proxy.rb, line 110
110:     def consume(workitem)
111: 
112:       con = new_connection
113: 
114:       con.put(encode_workitem(workitem))
115: 
116:       reply(workitem) if @opts['reply_by_default']
117: 
118:     ensure
119:       con.close rescue nil
120:     end
encode_cancelitem(fei, flavour) click to toggle source
     # File lib/ruote/beanstalk/participant_proxy.rb, line 137
137:     def encode_cancelitem(fei, flavour)
138: 
139:       Rufus::Json.encode([ 'cancelitem', fei.to_h, flavour.to_s ])
140:     end
encode_workitem(workitem) click to toggle source
     # File lib/ruote/beanstalk/participant_proxy.rb, line 132
132:     def encode_workitem(workitem)
133: 
134:       Rufus::Json.encode([ 'workitem', workitem.to_h ])
135:     end

Protected Instance Methods

new_connection() click to toggle source
     # File lib/ruote/beanstalk/participant_proxy.rb, line 144
144:     def new_connection
145: 
146:       con = ::Beanstalk::Connection.new(@opts['beanstalk'])
147: 
148:       if tube = @opts['tube']
149:         con.use(tube)
150:       end
151: 
152:       con
153:     end

Disabled; run with --debug to generate this.

[Validate]

Generated with the Darkfish Rdoc Generator 1.1.6.