Parent

Included Modules

Ruote::Couch::CouchStorage

A CouchDB storage mechanism for ruote.

The storage merely ‘routes’ work to Ruote::Couch::Database instances, one per document ‘type’ (expressions, msgs, schedules, variables, …)

Attributes

couch[R]

Public Class Methods

new(*args) click to toggle source

Hooks the storage to a CouchDB instance.

The main option is ‘couch_prefix’, which indicate which prefix should be added to all the database names used by this storage. ‘prefix’ is accepted as well.

    # File lib/ruote/couch/storage.rb, line 53
53:     def initialize (*args)
54: 
55:       hc = Rufus::Jig::HttpCore.new(*args)
56:         # leverage the argument parsing logic in there
57: 
58:       @host = hc.host
59:       @port = hc.port
60: 
61:       @options = hc.options
62: 
63:       @prefix = hc.options['couch_prefix'] || hc.options['prefix'] || ''
64:       @prefix = "#{@prefix}_" if @prefix.size > 0
65: 
66:       @dbs = {}
67: 
68:       ] msgs configurations variables ].each do |type|
69: 
70:         @dbs[type] = Database.new(
71:           @host, @port, type, "#{@prefix}ruote_#{type}", @options)
72:       end
73: 
74:       ] errors expressions schedules ].each do |type|
75: 
76:         @dbs[type] = WfidIndexedDatabase.new(
77:           @host, @port, type, "#{@prefix}ruote_#{type}", @options)
78:       end
79: 
80:       @dbs['workitems'] = WorkitemDatabase.new(
81:         @host, @port, 'workitems', "#{@prefix}ruote_workitems", @options)
82: 
83:       put_configuration
84: 
85:       @msgs_thread = nil
86:       @msgs_queue = ::Queue.new
87: 
88:       @schedules_thread = nil
89:       @schedules_queue = ::Queue.new
90:       @schedules = nil
91:     end

Public Instance Methods

add_type(type) click to toggle source

Mainly used by ruote’s test/unit/ut_17_storage.rb

     # File lib/ruote/couch/storage.rb, line 143
143:     def add_type (type)
144: 
145:       @dbs[type] = Database.new(
146:         #@host, @port, type, "#{@prefix}ruote_#{type}", false)
147:         @host, @port, type, "#{@prefix}ruote_#{type}")
148:     end
by_field(type, field, value=nil) click to toggle source
     # File lib/ruote/couch/storage.rb, line 169
169:     def by_field (type, field, value=nil)
170: 
171:       raise NotImplementedError if type != 'workitems'
172: 
173:       @dbs['workitems'].by_field(field, value)
174:     end
by_participant(type, participant_name, opts) click to toggle source

A provision made for workitems, allow to query them directly by participant name.

     # File lib/ruote/couch/storage.rb, line 162
162:     def by_participant (type, participant_name, opts)
163: 
164:       raise NotImplementedError if type != 'workitems'
165: 
166:       @dbs['workitems'].by_participant(participant_name, opts)
167:     end
delete(doc) click to toggle source
     # File lib/ruote/couch/storage.rb, line 103
103:     def delete (doc)
104: 
105:       db = @dbs[doc['type']]
106: 
107:       raise ArgumentError.new("no database for type '#{doc['type']}'") unless db
108: 
109:       db.delete(doc)
110:     end
dump(type) click to toggle source
     # File lib/ruote/couch/storage.rb, line 128
128:     def dump (type)
129: 
130:       @dbs[type].dump
131:     end
get(type, key) click to toggle source
     # File lib/ruote/couch/storage.rb, line 98
 98:     def get (type, key)
 99: 
100:       @dbs[type].get(key)
101:     end
get_many(type, key=nil, opts={}) click to toggle source
     # File lib/ruote/couch/storage.rb, line 112
112:     def get_many (type, key=nil, opts={})
113: 
114:       @dbs[type].get_many(key, opts)
115:     end
get_msgs() click to toggle source

Overwriting Ruote::StorageBase.get_msgs

Taking care of using long-polling (wiki.apache.org/couchdb/HTTP_database_API) when possible

     # File lib/ruote/couch/storage.rb, line 190
190:     def get_msgs
191: 
192:       mt = @msgs_thread
193: 
194:       ensure_msgs_thread_is_running
195: 
196:       msgs = []
197:       2.times { msgs = get_many('msgs') } if mt != @msgs_thread
198:         #
199:         # seems necessary to avoid any msgs leak :-(
200: 
201:       while @msgs_queue.size > 0
202:         msgs << @msgs_queue.pop
203:       end
204: 
205:       msgs
206:     end
get_schedules(delta, now) click to toggle source
     # File lib/ruote/couch/storage.rb, line 208
208:     def get_schedules (delta, now)
209: 
210:       ensure_schedules_thread_is_running
211: 
212:       if @schedules.nil?
213: 
214:         # NOTE : the problem with this approach is that ALL the schedules
215:         # are stored in memory. Most of the time it's not a problem, but
216:         # for people will lots of schedules...
217: 
218:         @schedules = get_many('schedules')
219:         @schedules = @schedules.inject({}) { |h, s| h[s['_id']] = s; h }
220:       end
221: 
222:       while @schedules_queue.size > 0
223: 
224:         deleted, s = @schedules_queue.pop
225: 
226:         next unless s
227: 
228:         if deleted
229:           @schedules.delete(s['_id'])
230:         else
231:           @schedules[s['_id']] = s
232:         end
233:       end
234: 
235:       filter_schedules(@schedules.values.reject { |sch| sch['at'].nil? }, now)
236:     end
ids(type) click to toggle source
     # File lib/ruote/couch/storage.rb, line 117
117:     def ids (type)
118: 
119:       @dbs[type].ids
120:     end
purge!() click to toggle source
     # File lib/ruote/couch/storage.rb, line 122
122:     def purge!
123: 
124:       @dbs.values.each { |db| db.purge! }
125:       #@dbs.values.each { |db| db.shutdown }
126:     end
purge_type!(type) click to toggle source

Nukes a db type and reputs it (losing all the documents that were in it).

     # File lib/ruote/couch/storage.rb, line 152
152:     def purge_type! (type)
153: 
154:       if db = @dbs[type]
155:         db.purge!
156:       end
157:     end
put(doc, opts={}) click to toggle source
    # File lib/ruote/couch/storage.rb, line 93
93:     def put (doc, opts={})
94: 
95:       @dbs[doc['type']].put(doc, opts)
96:     end
query_workitems(criteria) click to toggle source
     # File lib/ruote/couch/storage.rb, line 176
176:     def query_workitems (criteria)
177: 
178:       count = criteria.delete('count')
179: 
180:       result = @dbs['workitems'].query_workitems(criteria)
181: 
182:       count ? result.size : result.collect { |h| Ruote::Workitem.new(h) }
183:     end
shutdown() click to toggle source
     # File lib/ruote/couch/storage.rb, line 133
133:     def shutdown
134: 
135:       @dbs.values.each { |db| db.shutdown }
136: 
137:       @msgs_thread.kill rescue nil
138:       @schedules_thread.kill rescue nil
139:     end

Protected Instance Methods

ensure_msgs_thread_is_running() click to toggle source
     # File lib/ruote/couch/storage.rb, line 249
249:     def ensure_msgs_thread_is_running
250: 
251:       status = @msgs_thread ? @msgs_thread.status : 1
252:       return if status == 'run' || status == 'sleep'
253: 
254:       @msgs_thread = Thread.new do
255:         @dbs['msgs'].couch.on_change do |_, deleted, doc|
256:           @msgs_queue << doc unless deleted
257:         end
258:       end
259:     end
ensure_schedules_thread_is_running() click to toggle source
     # File lib/ruote/couch/storage.rb, line 261
261:     def ensure_schedules_thread_is_running
262: 
263:       status = @schedules_thread ? @schedules_thread.status : 1
264:       return if status == 'run' || status == 'sleep'
265: 
266:       @schedules_thread = Thread.new do
267:         @dbs['schedules'].couch.on_change do |_, deleted, doc|
268:           @schedules_queue << [ deleted, doc ]
269:         end
270:       end
271:     end
put_configuration() click to toggle source
     # File lib/ruote/couch/storage.rb, line 240
240:     def put_configuration
241: 
242:       return if get('configurations', 'engine')
243: 
244:       conf = { '_id' => 'engine', 'type' => 'configurations' }.merge!(@options)
245: 
246:       put(conf)
247:     end

Disabled; run with --debug to generate this.

[Validate]

Generated with the Darkfish Rdoc Generator 1.1.6.