Parent

Class/Module Index [+]

Quicksearch

Ruote::StorageParticipant

A participant that stores the workitem in the same storage used by the engine and the worker(s).

part = engine.register_participant 'alfred', Ruote::StorageParticipant

# ... a bit later

puts "workitems still open : "
part.each do |workitem|
  puts "#{workitem.fei.wfid} - #{workitem.fields['params']['task']}"
end

# ... when done with a workitem

part.reply(workitem)
  # this will remove the workitem from the storage and hand it back
  # to the engine

Does not thread by default (the engine will not spawn a dedicated thread to handle the delivery to this participant, the workitem will get stored via the main engine thread and basta).

Public Class Methods

matches?(hwi, pname, criteria) click to toggle source

Used by query when filtering workitems.

# File lib/ruote/part/storage_participant.rb, line 331
def self.matches?(hwi, pname, criteria)

  return false if pname && hwi['participant_name'] != pname

  fields = hwi['fields']

  criteria.each do |fname, fvalue|
    return false if fields[fname] != fvalue
  end

  true
end
new(engine_or_options={}, options=nil) click to toggle source
# File lib/ruote/part/storage_participant.rb, line 58
def initialize(engine_or_options={}, options=nil)

  if engine_or_options.respond_to?(:context)
    @context = engine_or_options.context
  elsif engine_or_options.is_a?(Ruote::Context)
    @context = engine_or_options
  else
    @options = engine_or_options
  end

  @options ||= {}

  @store_name = @options['store_name']
end

Public Instance Methods

[](fei) click to toggle source

Given a fei (or its string version, a sid), returns the corresponding workitem (or nil).

# File lib/ruote/part/storage_participant.rb, line 144
def [](fei)

  doc = fetch(fei)

  doc ? Ruote::Workitem.new(doc) : nil
end
Also aliased as: by_fei
all(opts={}) click to toggle source

Returns all the workitems stored in here.

# File lib/ruote/part/storage_participant.rb, line 209
def all(opts={})

  res = fetch_all(opts)

  res.is_a?(Array) ? res.map { |hwi| Ruote::Workitem.new(hwi) } : res
end
by_fei(fei) click to toggle source
Alias for: []
by_field(field, value=nil, opts={}) click to toggle source

field : returns all the workitems with the given field name present.

field and value : returns all the workitems with the given field name and the given value for that field.

Warning : only some storages are optimized for such queries (like CouchStorage), the others will load all the workitems and then filter them.

# File lib/ruote/part/storage_participant.rb, line 257
def by_field(field, value=nil, opts={})

  (value, opts = nil, value) if value.is_a?(Hash)

  if @context.storage.respond_to?(:by_field)
    return @context.storage.by_field('workitems', field, value, opts)
  end

  do_select(opts) do |hwi|
    hwi['fields'].keys.include?(field) &&
    (value.nil? || hwi['fields'][field] == value)
  end
end
by_participant(participant_name, opts={}) click to toggle source

Returns all workitems for the specified participant name

# File lib/ruote/part/storage_participant.rb, line 237
def by_participant(participant_name, opts={})

  return @context.storage.by_participant(
    'workitems', participant_name, opts
  ) if @context.storage.respond_to?(:by_participant)

  do_select(opts) do |hwi|
    hwi['participant_name'] == participant_name
  end
end
by_wfid(wfid, opts={}) click to toggle source

Return all workitems for the specified wfid

# File lib/ruote/part/storage_participant.rb, line 226
def by_wfid(wfid, opts={})

  if @context.storage.respond_to?(:by_wfid)
    return @context.storage.by_wfid('workitems', wfid, opts)
  end

  wis(@context.storage.get_many('workitems', wfid, opts))
end
delegate(workitem, new_owner) click to toggle source

Delegates a currently owned workitem to a new owner.

Fails if the workitem can’t be found, belongs to noone, or if the workitem passed as argument is out of date (got modified in the mean time).

It’s OK to delegate to nil, thus freeing the workitem.

See reserve for an an explanation of the reserve/delegate/proceed flow.

# File lib/ruote/part/storage_participant.rb, line 405
def delegate(workitem, new_owner)

  hwi = fetch(workitem)

  fail ArgumentError.new(
    "workitem not found"
  ) if hwi == nil

  fail ArgumentError.new(
    "cannot delegate, workitem doesn't belong to anyone"
  ) if hwi['owner'] == nil

  fail ArgumentError.new(
    "cannot delegate, " +
    "workitem owned by '#{hwi['owner']}', not '#{workitem.owner}'"
  ) if hwi['owner'] != workitem.owner

  hwi['owner'] = new_owner

  r = @context.storage.put(hwi, :update_rev => true)

  fail ArgumentError.new("workitem is gone") if r == true
  fail ArgumentError.new("workitem got modified meanwhile") if r != nil

  Workitem.new(hwi)
end
do_not_thread() click to toggle source

No need for a separate thread when delivering to this participant.

# File lib/ruote/part/storage_participant.rb, line 75
def do_not_thread; true; end
do_update(workitem=@workitem) click to toggle source

Added for groups.google.com/forum/?fromgroups#!topic/openwferu-users/5bpV2yfKwM0

Makes sure the workitem get saved to the storage. Fails if the workitem is already gone. Returns nil in case of success.

# File lib/ruote/part/storage_participant.rb, line 115
def do_update(workitem=@workitem)

  r = update(workitem)

  fail ArgumentError.new("workitem is gone") if r == true
  return nil if r.nil?

  r.h['fields'] = workitem.fields
  do_update(r)
end
each(&block) click to toggle source

Iterates over the workitems stored in here.

# File lib/ruote/part/storage_participant.rb, line 202
def each(&block)

  all.each { |wi| block.call(wi) }
end
first() click to toggle source

A convenience method (especially when testing), returns the first (only ?) workitem in the participant.

# File lib/ruote/part/storage_participant.rb, line 219
def first

  wi(fetch_all({}).first)
end
flunk(workitem, err_class_or_instance, *err_arguments) click to toggle source

Removes the workitem and hands it back to the flow with an error to raise for the participant expression that emitted the workitem.

# File lib/ruote/part/storage_participant.rb, line 169
def flunk(workitem, err_class_or_instance, *err_arguments)

  r = remove_workitem('reject', workitem)

  return flunk(workitem) if r != nil

  workitem.h.delete('_rev')

  super(workitem, err_class_or_instance, *err_arguments)
end
on_cancel() click to toggle source

Removes the document/workitem from the storage.

Warning: this method is called by the engine (worker), i.e. not by you.

# File lib/ruote/part/storage_participant.rb, line 130
def on_cancel

  doc = fetch(fei)

  return unless doc

  r = @context.storage.delete(doc)

  on_cancel(fei, flavour) if r != nil
end
on_workitem() click to toggle source

This is the method called by ruote when passing a workitem to this participant.

# File lib/ruote/part/storage_participant.rb, line 80
def on_workitem

  doc = workitem.to_h

  doc.merge!(
    'type' => 'workitems',
    '_id' => to_id(doc['fei']),
    'participant_name' => doc['participant_name'],
    'wfid' => doc['fei']['wfid'])

  doc['store_name'] = @store_name if @store_name

  @context.storage.put(doc)
end
per_participant() click to toggle source

Mostly a test method. Returns a Hash were keys are participant names and values are lists of workitems.

# File lib/ruote/part/storage_participant.rb, line 347
def per_participant

  each_with_object({}) { |wi, h| (h[wi.participant_name] ||= []) << wi }
end
per_participant_count() click to toggle source

Mostly a test method. Returns a Hash were keys are participant names and values are integers, the count of workitems for a given participant name.

# File lib/ruote/part/storage_participant.rb, line 356
def per_participant_count

  per_participant.remap { |(k, v), h| h[k] = v.size }
end
proceed(workitem) click to toggle source

Removes the workitem from the storage and replies to the engine.

# File lib/ruote/part/storage_participant.rb, line 155
def proceed(workitem)

  r = remove_workitem('proceed', workitem)

  return proceed(workitem) if r != nil

  workitem.h.delete('_rev')

  reply_to_engine(workitem)
end
purge!() click to toggle source

Cleans this participant out completely

# File lib/ruote/part/storage_participant.rb, line 324
def purge!

  fetch_all({}).each { |hwi| @context.storage.delete(hwi) }
end
query(criteria) click to toggle source

Queries the store participant for workitems.

Some examples :

part.query(:wfid => @wfid).size
part.query('place' => 'nara').size
part.query('place' => 'heiankyou').size
part.query(:wfid => @wfid, :place => 'heiankyou').size

There are two ‘reserved’ criterion : ‘wfid’ and ‘participant’ (‘participant_name’ as well). The rest of the criteria are considered constraints for fields.

‘offset’ and ‘limit’ are reserved as well. They should prove useful for pagination. ‘skip’ can be used instead of ‘offset’.

Note : the criteria is AND only, you’ll have to do ORs (aggregation) by yourself.

# File lib/ruote/part/storage_participant.rb, line 290
def query(criteria)

  cr = Ruote.keys_to_s(criteria)

  if @context.storage.respond_to?(:query_workitems)
    return @context.storage.query_workitems(cr)
  end

  opts = {}
  opts[:skip] = cr.delete('offset') || cr.delete('skip')
  opts[:limit] = cr.delete('limit')
  opts[:count] = cr.delete('count')

  wfid = cr.delete('wfid')

  count = opts[:count]

  pname = cr.delete('participant_name') || cr.delete('participant')
  opts.delete(:count) if pname

  hwis = wfid ?
    @context.storage.get_many('workitems', wfid, opts) : fetch_all(opts)

  return hwis unless hwis.is_a?(Array)

  hwis = hwis.select { |hwi|
    Ruote::StorageParticipant.matches?(hwi, pname, cr)
  }

  count ? hwis.size : wis(hwis)
end
reply(workitem) click to toggle source

(soon to be removed)

# File lib/ruote/part/storage_participant.rb, line 182
def reply(workitem)

  puts '-' * 80
  puts '*** WARNING : please use the Ruote::StorageParticipant#proceed(wi)'
  puts '              instead of #reply(wi) which is deprecated'
  #caller.each { |l| puts l }
  puts '-' * 80

  proceed(workitem)
end
reserve(workitem_or_fei, owner) click to toggle source

Claims a workitem. Returns the [updated] workitem if successful.

Returns nil if the workitem is already reserved.

Fails if the workitem can’t be found, is gone, or got modified elsewhere.

Here is a mini-diagram explaining the reserve/delegate/proceed flow:

 in    delegate(nil)    delegate(other)
 |    +---------------+ +------+
 v    v               | |      v
+-------+  reserve   +----------+  proceed
| ready | ---------> | reserved | ---------> out
+-------+            +----------+
# File lib/ruote/part/storage_participant.rb, line 377
def reserve(workitem_or_fei, owner)

  hwi = fetch(workitem_or_fei)

  fail ArgumentError.new("workitem not found") if hwi.nil?

  return nil if hwi['owner'] && hwi['owner'] != owner

  hwi['owner'] = owner

  r = @context.storage.put(hwi, :update_rev => true)

  fail ArgumentError.new("workitem is gone") if r == true
  fail ArgumentError.new("workitem got modified meanwhile") if r != nil

  Workitem.new(hwi)
end
size() click to toggle source

Returns the count of workitems stored in this participant.

# File lib/ruote/part/storage_participant.rb, line 195
def size

  fetch_all(:count => true)
end
update(workitem) click to toggle source

Used by client code when “saving” a workitem (fields may have changed). Calling update won’t proceed the workitem.

Returns nil in case of success, true if the workitem is already gone and the newer version of the workitem if the workitem changed in the mean time.

# File lib/ruote/part/storage_participant.rb, line 102
def update(workitem)

  r = @context.storage.put(workitem.h)

  r.is_a?(Hash) ? Ruote::Workitem.new(r) : r
end

Protected Instance Methods

do_select(opts, &block) click to toggle source

Given a few options and a block, returns all the workitems that match the block

# File lib/ruote/part/storage_participant.rb, line 457
def do_select(opts, &block)

  skip = opts[:offset] || opts[:skip]
  limit = opts[:limit]
  count = opts[:count]

  hwis = fetch_all({})
  hwis = hwis.select(&block)

  hwis = hwis[skip..-1] if skip
  hwis = hwis[0, limit] if limit

  return hwis.size if count

  hwis.collect { |hwi| Ruote::Workitem.new(hwi) }
end
fetch(workitem_or_fei) click to toggle source

Fetches a workitem in its raw form (Hash).

# File lib/ruote/part/storage_participant.rb, line 436
def fetch(workitem_or_fei)

  hfei = Ruote::FlowExpressionId.extract_h(workitem_or_fei)

  @context.storage.get('workitems', to_id(hfei))
end
fetch_all(opts) click to toggle source

Fetches all the workitems. If there is a @store_name, will only fetch the workitems in that store.

# File lib/ruote/part/storage_participant.rb, line 446
def fetch_all(opts)

  @context.storage.get_many(
    'workitems',
    @store_name ? /^wi!#{@store_name}::/ : nil,
    opts)
end
remove_workitem(action, workitem) click to toggle source
# File lib/ruote/part/storage_participant.rb, line 499
def remove_workitem(action, workitem)

  hwi = fetch(workitem)

  fail ArgumentError.new(
    "cannot #{action}, workitem not found"
  ) if hwi == nil

  fail ArgumentError.new(
    "cannot #{action}, " +
    "workitem is owned by '#{hwi['owner']}', not '#{workitem.owner}'"
  ) if hwi['owner'] && hwi['owner'] != workitem.owner

  r = @context.storage.delete(hwi)

  fail ArgumentError.new(
    "cannot #{action}, workitem is gone"
  ) if r == true

  r
end
to_id(fei) click to toggle source

Computes the id for the document representing the document in the storage.

# File lib/ruote/part/storage_participant.rb, line 476
def to_id(fei)

  a = [ Ruote.to_storage_id(fei) ]

  a.unshift(@store_name) if @store_name

  a.unshift('wi')

  a.join('!')
end
wi(hwi) click to toggle source
# File lib/ruote/part/storage_participant.rb, line 487
def wi(hwi)

  hwi ? Ruote::Workitem.new(hwi) : nil
end
wis(workitems_or_count) click to toggle source
# File lib/ruote/part/storage_participant.rb, line 492
def wis(workitems_or_count)

  workitems_or_count.is_a?(Array) ?
    workitems_or_count.collect { |wi| Ruote::Workitem.new(wi) } :
    workitems_or_count
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.