Object
A CouchDB storage mechanism for ruote.
The storage merely ‘routes’ work to Ruote::Couch::Database instances, one per document ‘type’ (expressions, msgs, schedules, variables, …)
Hooks the storage to a CouchDB instance.
The main option is ‘couch_prefix’, which indicates the prefix to add to all the database names used by this storage. ‘prefix’ is accepted as well.
# File lib/ruote/couch/storage.rb, line 53
# Hooks the storage to a CouchDB instance.
#
# The arguments are handed as-is to Rufus::Jig::HttpCore.new; the host, port
# and options it exposes afterwards are the ones used by this storage.
#
# The main option is 'couch_prefix' ('prefix' is accepted as well). When it
# is a non-empty string, it is prepended (followed by an underscore) to the
# name of every database used by this storage.
#
# One database object is created per document type :
#
# * msgs, configurations, variables  => Database
# * errors, expressions, schedules   => WfidIndexedDatabase
# * workitems                        => WorkitemDatabase
#
def initialize(*args)

  hc = Rufus::Jig::HttpCore.new(*args)
    # leverage the argument parsing logic in there

  @host = hc.host
  @port = hc.port

  @options = hc.options

  @prefix = hc.options['couch_prefix'] || hc.options['prefix'] || ''
  @prefix = "#{@prefix}_" if @prefix.size > 0

  @dbs = {}

  %w[ msgs configurations variables ].each do |type|

    @dbs[type] = Database.new(
      @host, @port, type, "#{@prefix}ruote_#{type}", @options)
  end

  %w[ errors expressions schedules ].each do |type|

    @dbs[type] = WfidIndexedDatabase.new(
      @host, @port, type, "#{@prefix}ruote_#{type}", @options)
  end

  @dbs['workitems'] = WorkitemDatabase.new(
    @host, @port, 'workitems', "#{@prefix}ruote_workitems", @options)

  put_configuration

  # threads are started lazily by get_msgs / get_schedules

  @msgs_thread = nil
  @msgs_queue = ::Queue.new

  @schedules_thread = nil
  @schedules_queue = ::Queue.new
  @schedules = nil
end
Mainly used by ruote’s test/unit/ut_17_storage.rb
# File lib/ruote/couch/storage.rb, line 143
# Registers a plain Database for the given document type, named
# "#{prefix}ruote_#{type}", and returns it. Any database previously
# registered under that type is replaced.
#
# Note that, unlike in the constructor, no options are passed to the
# Database.
#
# Mainly used by ruote's test/unit/ut_17_storage.rb
#
def add_type(type)

  @dbs[type] = Database.new(@host, @port, type, "#{@prefix}ruote_#{type}")
end
# File lib/ruote/couch/storage.rb, line 169
# Queries workitems by field, delegating to the 'workitems' database.
#
# type:: must be 'workitems', anything else raises NotImplementedError
# field:: passed as-is to the workitems database
# value:: passed as-is to the workitems database (nil by default)
#
# Returns whatever the workitems database's by_field returns.
#
def by_field(type, field, value=nil)

  raise NotImplementedError unless type == 'workitems'

  @dbs['workitems'].by_field(field, value)
end
A provision made for workitems: allows querying them directly by participant name.
# File lib/ruote/couch/storage.rb, line 162
# A provision made for workitems : queries them directly by participant
# name, delegating to the 'workitems' database.
#
# type:: must be 'workitems', anything else raises NotImplementedError
# participant_name:: passed as-is to the workitems database
# opts:: passed as-is to the workitems database
#
# Returns whatever the workitems database's by_participant returns.
#
def by_participant(type, participant_name, opts)

  raise NotImplementedError unless type == 'workitems'

  @dbs['workitems'].by_participant(participant_name, opts)
end
# File lib/ruote/couch/storage.rb, line 103
# Deletes a document, delegating to the database registered for the
# document's 'type'.
#
# Raises an ArgumentError when there is no database for that type.
#
# Returns whatever the database's delete returns.
#
def delete(doc)

  db = @dbs[doc['type']]

  raise ArgumentError, "no database for type '#{doc['type']}'" unless db

  db.delete(doc)
end
# File lib/ruote/couch/storage.rb, line 128
# Returns the dump of the database registered for the given type
# (delegates to that database's dump).
#
# Raises an ArgumentError when there is no database for that type (same
# behaviour as #delete, instead of a NoMethodError on nil).
#
def dump(type)

  db = @dbs[type]

  raise ArgumentError, "no database for type '#{type}'" unless db

  db.dump
end
# File lib/ruote/couch/storage.rb, line 98
# Fetches the document with the given key from the database registered
# for the given type (delegates to that database's get).
#
# Raises an ArgumentError when there is no database for that type (same
# behaviour as #delete, instead of a NoMethodError on nil).
#
def get(type, key)

  db = @dbs[type]

  raise ArgumentError, "no database for type '#{type}'" unless db

  db.get(key)
end
# File lib/ruote/couch/storage.rb, line 112
# Fetches multiple documents from the database registered for the given
# type (delegates to that database's get_many, passing key and opts
# through untouched).
#
# Raises an ArgumentError when there is no database for that type (same
# behaviour as #delete, instead of a NoMethodError on nil).
#
def get_many(type, key=nil, opts={})

  db = @dbs[type]

  raise ArgumentError, "no database for type '#{type}'" unless db

  db.get_many(key, opts)
end
Overwriting Ruote::StorageBase.get_msgs
Taking care of using long-polling (wiki.apache.org/couchdb/HTTP_database_API) when possible
# File lib/ruote/couch/storage.rb, line 190
# Overwriting Ruote::StorageBase.get_msgs
#
# Makes sure the msgs thread is running, then returns an array made of
# every msg currently waiting in @msgs_queue.
#
# When the msgs thread has just been (re)started by this call, the msgs
# database is read directly (twice, only the second result is kept) and
# the queued msgs are appended to that result.
#
def get_msgs

  previous_thread = @msgs_thread

  ensure_msgs_thread_is_running

  msgs = []
  2.times { msgs = get_many('msgs') } if previous_thread != @msgs_thread
    #
    # seems necessary to avoid any msgs leak :-(

  # drain whatever the msgs thread has queued so far
  #
  msgs << @msgs_queue.pop until @msgs_queue.empty?

  msgs
end
# File lib/ruote/couch/storage.rb, line 208
# Makes sure the schedules thread is running, brings the in-memory
# schedule cache (@schedules, a hash keyed by '_id') up to date and returns
# the result of filter_schedules applied to the cached schedules that have
# a non-nil 'at'.
#
# delta:: not used by this implementation
# now:: passed as-is to filter_schedules
#
def get_schedules(delta, now)

  ensure_schedules_thread_is_running

  if @schedules.nil?

    # NOTE : the problem with this approach is that ALL the schedules
    #        are stored in memory. Most of the time it's not a problem, but
    #        for people with lots of schedules...

    @schedules = get_many('schedules').each_with_object({}) { |s, h|
      h[s['_id']] = s
    }
  end

  # apply the changes queued by the schedules thread

  until @schedules_queue.empty?

    deleted, schedule = @schedules_queue.pop

    next unless schedule

    if deleted
      @schedules.delete(schedule['_id'])
    else
      @schedules[schedule['_id']] = schedule
    end
  end

  filter_schedules(@schedules.values.reject { |sch| sch['at'].nil? }, now)
end
# File lib/ruote/couch/storage.rb, line 117
# Returns the ids held by the database registered for the given type
# (delegates to that database's ids).
#
# Raises an ArgumentError when there is no database for that type (same
# behaviour as #delete, instead of a NoMethodError on nil).
#
def ids(type)

  db = @dbs[type]

  raise ArgumentError, "no database for type '#{type}'" unless db

  db.ids
end
# File lib/ruote/couch/storage.rb, line 122
# Calls purge! on each of the databases registered with this storage.
#
def purge!

  @dbs.values.each(&:purge!)
end
Nukes a db type and reputs it (losing all the documents that were in it).
# File lib/ruote/couch/storage.rb, line 152
# Calls purge! on the database registered for the given type and returns
# its result.
#
# Does nothing (and returns nil) when no database is registered for that
# type.
#
def purge_type!(type)

  db = @dbs[type]

  db.purge! if db
end
# File lib/ruote/couch/storage.rb, line 93
# Stores a document, delegating to the database registered for the
# document's 'type' (opts are passed through untouched).
#
# Raises an ArgumentError when there is no database for that type (same
# behaviour as #delete, instead of a NoMethodError on nil).
#
# Returns whatever the database's put returns.
#
def put(doc, opts={})

  db = @dbs[doc['type']]

  raise ArgumentError, "no database for type '#{doc['type']}'" unless db

  db.put(doc, opts)
end
# File lib/ruote/couch/storage.rb, line 176
# Queries the 'workitems' database with the given criteria.
#
# When the criteria hold a truthy 'count' entry, the number of matches is
# returned, else an array of Ruote::Workitem instances (one per match) is
# returned.
#
# The 'count' entry is removed before the criteria are handed to the
# database. The removal is done on a copy, the hash passed by the caller is
# left untouched.
#
def query_workitems(criteria)

  criteria = criteria.dup
  count = criteria.delete('count')

  result = @dbs['workitems'].query_workitems(criteria)

  count ? result.size : result.map { |h| Ruote::Workitem.new(h) }
end
# File lib/ruote/couch/storage.rb, line 249
# Starts the msgs thread, unless there already is one whose status is
# 'run' or 'sleep'.
#
# The thread registers an on_change block with the couch of the 'msgs'
# database; each changed doc that is not flagged as deleted is pushed to
# @msgs_queue.
#
def ensure_msgs_thread_is_running

  # no thread yet, or a thread in any other state => (re)start
  #
  return if @msgs_thread && %w[ run sleep ].include?(@msgs_thread.status)

  @msgs_thread = Thread.new do
    @dbs['msgs'].couch.on_change do |_, deleted, doc|
      @msgs_queue << doc unless deleted
    end
  end
end
# File lib/ruote/couch/storage.rb, line 261
# Starts the schedules thread, unless there already is one whose status is
# 'run' or 'sleep'.
#
# The thread registers an on_change block with the couch of the 'schedules'
# database; each change is pushed to @schedules_queue as a
# [ deleted, doc ] pair.
#
def ensure_schedules_thread_is_running

  # no thread yet, or a thread in any other state => (re)start
  #
  return if @schedules_thread &&
    %w[ run sleep ].include?(@schedules_thread.status)

  @schedules_thread = Thread.new do
    @dbs['schedules'].couch.on_change do |_, deleted, doc|
      @schedules_queue << [ deleted, doc ]
    end
  end
end
Disabled; run with --debug to generate this.
Generated with the Darkfish Rdoc Generator 1.1.6.