Parent

Included Modules

Class Index [+]

Quicksearch

Ruote::Redis::RedisStorage

 A Redis storage for ruote.

 The constructor accepts two arguments: the first one is a Redis instance
 (see github.com/ezmobius/redis-rb), the second one is the classic
 ruote engine options hash (see
 http://ruote.rubyforge.org/configuration.html#engine).

   require 'redis' # gem install redis
   require 'ruote' # gem install ruote
   require 'ruote-redis' # gem install ruote-redis

   engine = Ruote::Engine.new(
     Ruote::Worker.new(
       Ruote::Redis::RedisStorage.new(
         ::Redis.new(:db => 14, :thread_safe => true), {})))

 == em-redis

 Not tried, but I guess that substituting an instance of em-redis for
 the redis instance passed to the constructor might work.
 http://github.com/madsimian/em-redis

 If you try and it works, feedback is welcome
 http://groups.google.com/group/openwferu-users

Constants

LOCK_KEY

Attributes

redis[R]

Public Class Methods

new(redis, options={}) click to toggle source

A Redis storage for ruote.

    # File lib/ruote/redis/storage.rb, line 71
# Builds the storage around an existing redis client.
#
# redis   - the client object; kept in @redis.
# options - a Hash kept in @options (merged into the configuration document
#           by put_configuration).
def initialize(redis, options={})

  @options = options
  @redis = redis

  # Singleton helper added to this one client instance: whatever #keys
  # answers is normalised to an Array (an Array is passed through, anything
  # else is split on single spaces).
  def @redis.keys_to_a(opt)
    found = keys(opt)
    return found if found.is_a?(Array)
    found.split(' ')
  end

  put_configuration
end

Public Instance Methods

add_type(type) click to toggle source

Mainly used by ruote’s test/unit/ut_17_storage.rb

     # File lib/ruote/redis/storage.rb, line 268
# Deliberate no-op: the type argument is accepted and ignored, and nil is
# returned.
def add_type(type)
  nil
end
close() click to toggle source
     # File lib/ruote/redis/storage.rb, line 261
# Sends #quit to the wrapped redis client and returns whatever the client
# answers.
def close
  @redis.quit
end
delete(doc) click to toggle source
     # File lib/ruote/redis/storage.rb, line 157
# Deletes a document, provided the revision held by the caller is still the
# one in storage. The check and the removal run inside lock(key).
#
# doc - a Hash; 'type' and '_id' locate the document (via key_for) and
#       '_rev' is the revision the caller believes is current.
#
# Returns
#   nil          when the document was removed,
#   true         when no document is stored under that key,
#   the stored   document when its '_rev' differs from doc['_rev'].
#
# Raises ArgumentError when doc carries no '_rev'.
def delete(doc)

  expected_rev = doc['_rev']
  raise ArgumentError.new("can't delete doc without _rev") unless expected_rev

  storage_key = key_for(doc)

  lock(storage_key) do

    stored = do_get(storage_key)

    # nothing stored (anymore): report it with true
    next true if stored.nil?

    # somebody else changed the document: hand back what is in storage
    next stored if stored['_rev'] != expected_rev

    # revisions agree: remove the document, nil signals success
    @redis.del(storage_key)
    nil
  end
end
delete_schedule(schedule_id) click to toggle source
     # File lib/ruote/redis/storage.rb, line 111
# Removes the entry stored under the 'schedules' type for the given id.
# Returns whatever the redis client's #del returns.
def delete_schedule(schedule_id)
  schedule_key = key_for('schedules', schedule_id)
  @redis.del(schedule_key)
end
dump(type) click to toggle source

Returns a String containing a representation of the current content of this Redis storage.

     # File lib/ruote/redis/storage.rb, line 256
# Lists the keys matching "<type>/*", sorted, one per line (joined with
# "\n") in a single String. Only the keys are listed, not the documents.
def dump(type)
  matching = @redis.keys_to_a("#{type}/*")
  matching.sort.join("\n")
end
get(type, key) click to toggle source
     # File lib/ruote/redis/storage.rb, line 152
# Fetches one document: builds the storage key from type and key (key_for)
# and returns what do_get yields for it.
def get(type, key)
  storage_key = key_for(type, key)
  do_get(storage_key)
end
get_many(type, key=nil, opts={}) click to toggle source
     # File lib/ruote/redis/storage.rb, line 193
# Fetches several documents of one type.
#
# type - the document type; keys are looked up as "<type>/...".
# key  - nil (all documents of the type), a String or Array of Strings
#        (each used as a key suffix: "<type>/*!<key>"), or anything else,
#        which is treated as one or more patterns responding to #match and
#        applied to the key with its "<type>/" prefix removed.
# opts - :descending => true   reverse the (sorted) id order
#        :skip => n            drop the first n ids (default 0)
#        :limit => n           keep at most n ids (default: all)
#        :count => true        return the number of documents instead
#
# Keys matching LOCK_KEY are never returned.
#
# Returns an Array of decoded documents, or an Integer when opts[:count].
def get_many(type, key=nil, opts={})

  keys = key ? Array(key) : nil

  ids = if keys.nil?

    @redis.keys_to_a("#{type}/*")

  elsif keys.first.is_a?(String)

    keys.collect { |k| @redis.keys_to_a("#{type}/*!#{k}") }.flatten

  else # presumably Regexp instances

    @redis.keys_to_a("#{type}/*").select { |i|

      # strip the leading "type/"; the range has to run to the end of the
      # String (-1), a "..1" end would always produce an empty String
      id = i[type.length + 1..-1]

      keys.find { |k| k.match(id) }
    }
  end

  ids = ids.reject { |i| i.match(LOCK_KEY) }
  ids = ids.sort
  ids = ids.reverse if opts[:descending]

  skip = opts[:skip] || 0
  limit = opts[:limit] || ids.length

  # Array#[start, length] answers nil when start is past the end
  ids = ids[skip, limit] || []

  docs = ids.length > 0 ? @redis.mget(*ids) : []
  docs = docs.inject({}) do |h, doc|
    if doc
      # a key may have vanished between the key listing and mget
      doc = Rufus::Json.decode(doc)
      h[doc['_id']] = doc
    end
    h
  end

  opts[:count] ? docs.size : docs.values
end
ids(type) click to toggle source
     # File lib/ruote/redis/storage.rb, line 239
# Returns the sorted ids of the given type: every key matching "<type>/*"
# except those matching LOCK_KEY, reduced to what follows the last '/'.
def ids(type)
  doc_keys = @redis.keys_to_a("#{type}/*").reject { |k| k.match(LOCK_KEY) }
  doc_keys.collect { |k| k.split('/').last }.sort
end
purge!() click to toggle source
     # File lib/ruote/redis/storage.rb, line 248
# Deletes every key the client lists for the pattern '*', one #del call per
# key. Returns the Array of keys that were iterated.
def purge!
  all_keys = @redis.keys_to_a('*')
  all_keys.each { |k| @redis.del(k) }
end
purge_type!(type) click to toggle source

Nukes a db type and re-puts it (losing all the documents that were in it).

     # File lib/ruote/redis/storage.rb, line 273
# Deletes every key matching "<type>/*", one #del call per key. Returns the
# Array of keys that were iterated.
def purge_type!(type)
  typed_keys = @redis.keys_to_a("#{type}/*")
  typed_keys.each { |k| @redis.del(k) }
end
put(doc, opts={}) click to toggle source
     # File lib/ruote/redis/storage.rb, line 116
# Stores a document, using its '_rev' for optimistic concurrency control.
# The comparison and the write run inside lock(key).
#
# doc  - a Hash; 'type' and '_id' locate it (via key_for), '_rev' is the
#        revision the caller last saw (absent for a new document).
# opts - :update_rev => true writes the new revision back into doc.
#
# Returns
#   nil          when the document was written (its '_rev' is the given
#                revision, as an integer, plus one, stored as a String),
#   the stored   document when storage holds a revision different from
#                doc['_rev'],
#   true         when doc carries a '_rev' but nothing is stored (the
#                document has been deleted).
def put(doc, opts={})

  storage_key = key_for(doc)
  given_rev = doc['_rev']

  lock(storage_key) do

    stored = do_get(storage_key)
    stored_rev = stored ? stored['_rev'] : nil

    # revisions disagree: return the version in storage
    next stored if stored_rev && given_rev != stored_rev

    # caller holds a revision of a document that is gone
    next true if given_rev && stored_rev.nil?

    # all clear: write the document under its next revision
    next_rev = (given_rev.to_i + 1).to_s
    @redis.set(storage_key, to_json(doc.merge('_rev' => next_rev)))
    doc['_rev'] = next_rev if opts[:update_rev]

    nil
  end
end
put_msg(action, options) click to toggle source
    # File lib/ruote/redis/storage.rb, line 89
# Stores a message document built by prepare_msg_doc(action, options) under
# its key (key_for) as JSON (to_json). No lock is taken and no revision
# check is made.
#
# Always returns nil.
def put_msg(action, options)

  doc = prepare_msg_doc(action, options)

  @redis.set(key_for(doc), to_json(doc))

  nil
end
put_schedule(flavour, owner_fei, s, msg) click to toggle source
     # File lib/ruote/redis/storage.rb, line 100
# Stores a schedule document built by
# prepare_schedule_doc(flavour, owner_fei, s, msg).
#
# Returns the '_id' of the stored document, or nil (storing nothing) when
# prepare_schedule_doc yields no document.
def put_schedule(flavour, owner_fei, s, msg)

  schedule = prepare_schedule_doc(flavour, owner_fei, s, msg)

  if schedule
    @redis.set(key_for(schedule), to_json(schedule))
    schedule['_id']
  end
end
reserve(doc) click to toggle source
    # File lib/ruote/redis/storage.rb, line 84
# Deletes the key of the given document and returns the client's #del
# result.
# NOTE(review): presumably callers use that result to tell whether they were
# the one to remove (and so "reserve") the document - confirm with callers.
def reserve(doc)
  doc_key = key_for(doc)
  @redis.del(doc_key)
end

Protected Instance Methods

do_get(key) click to toggle source
     # File lib/ruote/redis/storage.rb, line 328
# Reads the raw value stored under key and hands it to from_json; returns
# from_json's result.
def do_get(key)
  raw = @redis.get(key)
  from_json(raw)
end
from_json(s) click to toggle source
     # File lib/ruote/redis/storage.rb, line 333
# Decodes s with Rufus::Json.decode. A nil or false s is not decoded; nil is
# returned instead.
def from_json(s)
  return nil unless s
  Rufus::Json.decode(s)
end
key_for(*args) click to toggle source

key_for(doc) key_for(type, key)

     # File lib/ruote/redis/storage.rb, line 321
# Builds a storage key of the form "type/id".
#
#   key_for(doc)        # doc is a Hash: uses doc['type'] and doc['_id']
#   key_for(type, key)  # uses the first two arguments as they are
def key_for(*args)

  head = args.first

  parts =
    if head.is_a?(Hash)
      [ head['type'], head['_id'] ]
    else
      args[0, 2]
    end

  parts.join('/')
end
lock(key, &block) click to toggle source

A locking mechanism.

Mostly inspired from code.google.com/p/redis/wiki/SetnxCommand

     # File lib/ruote/redis/storage.rb, line 286
# Runs the block while holding a lock on key.
#
# The lock is the redis key "<key>-lock", created with setnx and holding
# the time (Time.now.to_f, as a String) at which it was taken. While setnx
# answers false the method polls every 0.007s; a lock whose timestamp is
# more than 60 seconds old is deleted so that it can be taken over.
#
# The lock key is deleted once the block is done, also when the block
# raises (otherwise the key would stay locked until the 60s timeout).
#
# Returns the value of the block.
def lock(key, &block)

  kl = "#{key}-lock"

  loop do

    acquired = @redis.setnx(kl, Time.now.to_f.to_s)

    # only an explicit false means "somebody else holds the lock"
    break unless acquired == false

    t = @redis.get(kl)

    @redis.del(kl) if t && Time.now.to_f - t.to_f > 60.0
      # after 1 minute, locks time out

    sleep 0.007 # let's try to lock again after a while
  end

  #@redis.expire(kl, 2)
    # this doesn't work, it makes the next call to setnx succeed

  begin
    block.call
  ensure
    @redis.del(kl)
  end
end
put_configuration() click to toggle source

Don’t put configuration if it’s already in

(prevent storages from trashing configuration...)

     # File lib/ruote/redis/storage.rb, line 348
# Stores the engine configuration document ('_id' => 'engine',
# 'type' => 'configurations', merged with @options) unless one is already
# stored, so that an existing configuration is not overwritten.
#
# Returns nil when a configuration was found, else the result of put.
def put_configuration

  unless get('configurations', 'engine')
    put({ '_id' => 'engine', 'type' => 'configurations' }.merge(@options))
  end
end
to_json(doc, opts={}) click to toggle source
     # File lib/ruote/redis/storage.rb, line 338
# Encodes a document with Rufus::Json.encode, adding a 'put_at' entry set to
# Ruote.now_to_utc_s (on a merged copy, doc itself is left untouched).
#
# With opts[:delete] set, nil is encoded instead of the document.
def to_json(doc, opts={})

  payload =
    if opts[:delete]
      nil
    else
      doc.merge('put_at' => Ruote.now_to_utc_s)
    end

  Rufus::Json.encode(payload)
end

Disabled; run with --debug to generate this.

[Validate]

Generated with the Darkfish Rdoc Generator 1.1.6.