Parent

Included Modules

Ruote::Beanstalk::Storage

This ruote storage can be used in two modes : client and server.

Beanstalk is the medium.

client

The storage is pointed at a beanstalk queue

  engine = Ruote::Engine.new(
    Ruote::Worker.new(
      Ruote::Beanstalk::Storage.new('127.0.0.1:11300', opts)))

All the operations(put, get, get_many, …) of the storage are done by a server, connected to the same beanstalk queue.

server

The storage point to a beanstalk queue and receives orders from clients via the queue.

  Ruote::Beanstalk::Storage.new(':11300', 'ruote_work', :fork => true)

Note the directory passed as a string. When in server mode, this storage uses an embedded Ruote::FsStorage for the actual storage.

The :fork => true lets the storage start and adjacent OS process containing the Beanstalk server. The storage takes care of stopping the beanstalk server when the Ruby process exits.

Constants

CONN_KEY
TUBE_NAME
COMMANDS

Public Class Methods

new(uri, directory=nil, options=nil) click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 73
 73:     def initialize(uri, directory=nil, options=nil)
 74: 
 75:       @uri, address, port = split_uri(uri)
 76: 
 77:       directory, @options = if directory.nil?
 78:         [ nil, {} ]
 79:       elsif directory.is_a?(Hash)
 80:         [ nil, directory ]
 81:       else
 82:         [ directory, options || {} ]
 83:       end
 84: 
 85:       @cloche = nil
 86: 
 87:       if directory
 88:         #
 89:         # run embedded Ruote::FsStorage
 90: 
 91:         require 'rufus/cloche'
 92: 
 93:         FileUtils.mkdir_p(directory)
 94: 
 95:         @cloche = Rufus::Cloche.new(
 96:           :dir => directory, :nolock => @options['cloche_nolock'])
 97:       end
 98: 
 99:       if fork_opts = @options[:fork]
100:         #
101:         # run beanstalk in a forked process
102: 
103:         fork_opts = fork_opts.is_a?(Hash) ? fork_opts : {}
104:         fork_opts = { :address => address, :port => port }.merge(fork_opts)
105: 
106:         Ruote::Beanstalk.fork(fork_opts)
107: 
108:         sleep 0.1
109:       end
110: 
111:       put_configuration
112: 
113:       serve if @cloche
114:     end

Public Instance Methods

add_type(type) click to toggle source

Mainly used by ruote’s test/unit/ut_17_storage.rb

     # File lib/ruote/beanstalk/storage.rb, line 212
212:     def add_type(type)
213: 
214:       # nothing to do
215:     end
close() click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 205
205:     def close
206: 
207:       shutdown
208:     end
delete(doc) click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 152
152:     def delete(doc)
153: 
154:       return @cloche.delete(doc) if @cloche
155: 
156:       operate('delete', [ doc ])
157:     end
dump(type) click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 189
189:     def dump(type)
190: 
191:       get_many(type)
192:     end
get(type, key) click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 145
145:     def get(type, key)
146: 
147:       return @cloche.get(type, key) if @cloche
148: 
149:       operate('get', [ type, key ])
150:     end
get_many(type, key=nil, opts={}) click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 159
159:     def get_many(type, key=nil, opts={})
160: 
161:       return operate('get_many', [ type, key, opts ]) unless @cloche
162: 
163:       if key
164:         key = Array(key).collect { |k|
165:           k[0..6] == '(?-mix:' ? Regexp.new(k[7..2]) : "!#{k}"
166:         } if key
167:       end
168:         # assuming /!#{wfid}$/...
169: 
170:       @cloche.get_many(type, key, opts)
171:     end
get_msgs() click to toggle source

One catch : will return [] in case of [network] error

     # File lib/ruote/beanstalk/storage.rb, line 118
118:     def get_msgs
119: 
120:       super rescue []
121:     end
ids(type) click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 173
173:     def ids(type)
174: 
175:       return @cloche.ids(type) if @cloche
176: 
177:       operate('ids', [ type ])
178:     end
purge!() click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 180
180:     def purge!
181: 
182:       if @cloche
183:         FileUtils.rm_rf(@cloche.dir)
184:       else
185:         operate('purge!', [])
186:       end
187:     end
purge_type!(type) click to toggle source

Nukes a db type and reputs it(losing all the documents that were in it).

     # File lib/ruote/beanstalk/storage.rb, line 219
219:     def purge_type!(type)
220: 
221:       if @cloche
222:         @cloche.purge_type!(type)
223:       else
224:         operate('purge_type!', [ type ])
225:       end
226:     end
put(doc, opts={}) click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 130
130:     def put(doc, opts={})
131: 
132:       doc.merge!('put_at' => Ruote.now_to_utc_s)
133: 
134:       return @cloche.put(doc, opts) if @cloche
135: 
136:       r = operate('put', [ doc ])
137: 
138:       return r unless r.nil?
139: 
140:       doc['_rev'] =(doc['_rev'] || 1) + 1 if opts[:update_rev]
141: 
142:       nil
143:     end
reserve(doc) click to toggle source

One catch : will return true (failure) in case of [network] error

     # File lib/ruote/beanstalk/storage.rb, line 125
125:     def reserve(doc)
126: 
127:       super(doc) rescue true
128:     end
shutdown() click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 194
194:     def shutdown
195: 
196:       Thread.list.each do |t|
197:         t.keys.each do |k|
198:           next unless k.to_s.match(CONN_KEY)
199:           t[k].close
200:           t[k] = nil
201:         end
202:       end
203:     end

Protected Instance Methods

connection() click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 244
244:     def connection
245: 
246:       c = Thread.current[CONN_KEY]
247: 
248:       #begin
249:       #  c.stats
250:       #  return c
251:       #rescue Exception => e
252:       #  c = nil
253:       #end if c
254:         # keeping around the idea around
255: 
256:       return c if c
257: 
258:       c = ::Beanstalk::Connection.new(@uri, TUBE_NAME)
259:       c.ignore('default')
260: 
261:       Thread.current[CONN_KEY] = c
262:     end
operate(command, params) click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 275
275:     def operate(command, params)
276: 
277:       client_id = "BsStorage-#{Thread.current.object_id}-#{$$}"
278:       timestamp = Time.now.to_f.to_s
279: 
280:       con = connection
281: 
282:       con.put(Rufus::Json.encode([ command, params, client_id, timestamp ]))
283: 
284:       con.watch(client_id)
285:       con.ignore(TUBE_NAME)
286: 
287:       result = nil
288: 
289:       loop do
290: 
291:         job = nil
292:         begin
293:           job = con.reserve
294:         rescue Exception => e
295:           # probably our timeout
296:           break
297:         end
298:         job.delete
299: 
300:         result, ts = Rufus::Json.decode(job.body)
301: 
302:         break if ts == timestamp # hopefully
303:       end
304: 
305:       if result.is_a?(Array) && result.first == 'error'
306:         raise ArgumentError.new(result.last) if result[1] == 'ArgumentError'
307:         raise StorageError.new(result.last)
308:       end
309: 
310:       result
311:     end
put_configuration() click to toggle source

Don’t put configuration if it’s already in

(avoid storages from trashing configuration...)

     # File lib/ruote/beanstalk/storage.rb, line 268
268:     def put_configuration
269: 
270:       return if get('configurations', 'engine')
271: 
272:       put({ '_id' => 'engine', 'type' => 'configurations' }.merge(@options))
273:     end
serve() click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 315
315:     def serve
316: 
317:       con = connection
318: 
319:       loop do
320: 
321:         job = con.reserve
322:         job.delete
323: 
324:         command, params, client_id, timestamp = Rufus::Json.decode(job.body)
325: 
326:         result = begin
327: 
328:           if COMMANDS.include?(command)
329:             send(command, *params)
330:           else
331:             [ 'error', 'UnknownCommand', command ]
332:           end
333: 
334:         rescue Exception => e
335:           #p e
336:           #e.backtrace.each { |l| puts l }
337:           [ 'error', e.class.to_s, e.to_s ]
338:         end
339: 
340:         con.use(client_id)
341:         con.put(Rufus::Json.encode([ result, timestamp ]))
342:       end
343:     end
split_uri(uri) click to toggle source
     # File lib/ruote/beanstalk/storage.rb, line 233
233:     def split_uri(uri)
234: 
235:       uri = ':' if uri == ''
236: 
237:       address, port = uri.split(':')
238:       address = '127.0.0.1' if address.strip == ''
239:       port = 11300 if port.strip == ''
240: 
241:       [ "#{address}:#{port}", address, port ]
242:     end

Disabled; run with --debug to generate this.

[Validate]

Generated with the Darkfish Rdoc Generator 1.1.6.