Object
This ruote storage can be used in two modes : client and server.
Beanstalk is the medium.
The storage is pointed at a beanstalk queue
engine = Ruote::Engine.new(
Ruote::Worker.new(
Ruote::Beanstalk::Storage.new('127.0.0.1:11300', opts)))
All the operations(put, get, get_many, …) of the storage are done by a server, connected to the same beanstalk queue.
The storage point to a beanstalk queue and receives orders from clients via the queue.
Ruote::Beanstalk::Storage.new(':11300', 'ruote_work', :fork => true)
Note the directory passed as a string. When in server mode, this storage uses an embedded Ruote::FsStorage for the actual storage.
The :fork => true lets the storage start and adjacent OS process containing the Beanstalk server. The storage takes care of stopping the beanstalk server when the Ruby process exits.
# File lib/ruote/beanstalk/storage.rb, line 73
73: def initialize(uri, directory=nil, options=nil)
74:
75: @uri, address, port = split_uri(uri)
76:
77: directory, @options = if directory.nil?
78: [ nil, {} ]
79: elsif directory.is_a?(Hash)
80: [ nil, directory ]
81: else
82: [ directory, options || {} ]
83: end
84:
85: @cloche = nil
86:
87: if directory
88: #
89: # run embedded Ruote::FsStorage
90:
91: require 'rufus/cloche'
92:
93: FileUtils.mkdir_p(directory)
94:
95: @cloche = Rufus::Cloche.new(
96: :dir => directory, :nolock => @options['cloche_nolock'])
97: end
98:
99: if fork_opts = @options[:fork]
100: #
101: # run beanstalk in a forked process
102:
103: fork_opts = fork_opts.is_a?(Hash) ? fork_opts : {}
104: fork_opts = { :address => address, :port => port }.merge(fork_opts)
105:
106: Ruote::Beanstalk.fork(fork_opts)
107:
108: sleep 0.1
109: end
110:
111: put_configuration
112:
113: serve if @cloche
114: end
Mainly used by ruote’s test/unit/ut_17_storage.rb
# File lib/ruote/beanstalk/storage.rb, line 212
212: def add_type(type)
213:
214: # nothing to do
215: end
# File lib/ruote/beanstalk/storage.rb, line 205
205: def close
206:
207: shutdown
208: end
# File lib/ruote/beanstalk/storage.rb, line 152
152: def delete(doc)
153:
154: return @cloche.delete(doc) if @cloche
155:
156: operate('delete', [ doc ])
157: end
# File lib/ruote/beanstalk/storage.rb, line 189
189: def dump(type)
190:
191: get_many(type)
192: end
# File lib/ruote/beanstalk/storage.rb, line 145
145: def get(type, key)
146:
147: return @cloche.get(type, key) if @cloche
148:
149: operate('get', [ type, key ])
150: end
# File lib/ruote/beanstalk/storage.rb, line 159
159: def get_many(type, key=nil, opts={})
160:
161: return operate('get_many', [ type, key, opts ]) unless @cloche
162:
163: if key
164: key = Array(key).collect { |k|
165: k[0..6] == '(?-mix:' ? Regexp.new(k[7..2]) : "!#{k}"
166: } if key
167: end
168: # assuming /!#{wfid}$/...
169:
170: @cloche.get_many(type, key, opts)
171: end
One catch : will return [] in case of [network] error
# File lib/ruote/beanstalk/storage.rb, line 118
118: def get_msgs
119:
120: super rescue []
121: end
# File lib/ruote/beanstalk/storage.rb, line 173
173: def ids(type)
174:
175: return @cloche.ids(type) if @cloche
176:
177: operate('ids', [ type ])
178: end
# File lib/ruote/beanstalk/storage.rb, line 180
180: def purge!
181:
182: if @cloche
183: FileUtils.rm_rf(@cloche.dir)
184: else
185: operate('purge!', [])
186: end
187: end
Nukes a db type and reputs it(losing all the documents that were in it).
# File lib/ruote/beanstalk/storage.rb, line 219
219: def purge_type!(type)
220:
221: if @cloche
222: @cloche.purge_type!(type)
223: else
224: operate('purge_type!', [ type ])
225: end
226: end
# File lib/ruote/beanstalk/storage.rb, line 130
130: def put(doc, opts={})
131:
132: doc.merge!('put_at' => Ruote.now_to_utc_s)
133:
134: return @cloche.put(doc, opts) if @cloche
135:
136: r = operate('put', [ doc ])
137:
138: return r unless r.nil?
139:
140: doc['_rev'] =(doc['_rev'] || 1) + 1 if opts[:update_rev]
141:
142: nil
143: end
# File lib/ruote/beanstalk/storage.rb, line 244
244: def connection
245:
246: c = Thread.current[CONN_KEY]
247:
248: #begin
249: # c.stats
250: # return c
251: #rescue Exception => e
252: # c = nil
253: #end if c
254: # keeping around the idea around
255:
256: return c if c
257:
258: c = ::Beanstalk::Connection.new(@uri, TUBE_NAME)
259: c.ignore('default')
260:
261: Thread.current[CONN_KEY] = c
262: end
# File lib/ruote/beanstalk/storage.rb, line 275
275: def operate(command, params)
276:
277: client_id = "BsStorage-#{Thread.current.object_id}-#{$$}"
278: timestamp = Time.now.to_f.to_s
279:
280: con = connection
281:
282: con.put(Rufus::Json.encode([ command, params, client_id, timestamp ]))
283:
284: con.watch(client_id)
285: con.ignore(TUBE_NAME)
286:
287: result = nil
288:
289: loop do
290:
291: job = nil
292: begin
293: job = con.reserve
294: rescue Exception => e
295: # probably our timeout
296: break
297: end
298: job.delete
299:
300: result, ts = Rufus::Json.decode(job.body)
301:
302: break if ts == timestamp # hopefully
303: end
304:
305: if result.is_a?(Array) && result.first == 'error'
306: raise ArgumentError.new(result.last) if result[1] == 'ArgumentError'
307: raise StorageError.new(result.last)
308: end
309:
310: result
311: end
Don’t put configuration if it’s already in
(avoid storages from trashing configuration...)
# File lib/ruote/beanstalk/storage.rb, line 268
268: def put_configuration
269:
270: return if get('configurations', 'engine')
271:
272: put({ '_id' => 'engine', 'type' => 'configurations' }.merge(@options))
273: end
# File lib/ruote/beanstalk/storage.rb, line 315
315: def serve
316:
317: con = connection
318:
319: loop do
320:
321: job = con.reserve
322: job.delete
323:
324: command, params, client_id, timestamp = Rufus::Json.decode(job.body)
325:
326: result = begin
327:
328: if COMMANDS.include?(command)
329: send(command, *params)
330: else
331: [ 'error', 'UnknownCommand', command ]
332: end
333:
334: rescue Exception => e
335: #p e
336: #e.backtrace.each { |l| puts l }
337: [ 'error', e.class.to_s, e.to_s ]
338: end
339:
340: con.use(client_id)
341: con.put(Rufus::Json.encode([ result, timestamp ]))
342: end
343: end
# File lib/ruote/beanstalk/storage.rb, line 233
233: def split_uri(uri)
234:
235: uri = ':' if uri == ''
236:
237: address, port = uri.split(':')
238: address = '127.0.0.1' if address.strip == ''
239: port = 11300 if port.strip == ''
240:
241: [ "#{address}:#{port}", address, port ]
242: end
Disabled; run with --debug to generate this.
Generated with the Darkfish Rdoc Generator 1.1.6.