Object
A Redis storage for ruote. The constructor accepts two arguments, the first one is a Redis instance
( see github.com/ezmobius/redis-rb ), the second one is the classic
ruote engine options( see
http://ruote.rubyforge.org/configuration.html#engine )
require 'redis' # gem install redis
require 'ruote' # gem install ruote
require 'ruote-redis' # gem install ruote-redis
engine = Ruote::Engine.new(
Ruote::Worker.new(
Ruote::Redis::RedisStorage.new(
::Redis.new(:db => 14, :thread_safe => true), {})))
== em-redis
Not tried, but I guess, that substituting an instance of em-redis for
the redis instance passed to the constructor might work.
http://github.com/madsimian/em-redis
If you try and it works, feedback is welcome
http://groups.google.com/group/openwferu-users
A Redis storage for ruote.
# File lib/ruote/redis/storage.rb, line 71
71: def initialize(redis, options={})
72:
73: @redis = redis
74: @options = options
75:
76: def @redis.keys_to_a(opt)
77: r = keys(opt)
78: r.is_a?(Array) ? r : r.split(' ')
79: end
80:
81: put_configuration
82: end
Mainly used by ruote’s test/unit/ut_17_storage.rb
# File lib/ruote/redis/storage.rb, line 268
268: def add_type(type)
269: end
# File lib/ruote/redis/storage.rb, line 261
261: def close
262:
263: @redis.quit
264: end
# File lib/ruote/redis/storage.rb, line 157
157: def delete(doc)
158:
159: rev = doc['_rev']
160:
161: raise ArgumentError.new("can't delete doc without _rev") unless rev
162:
163: key = key_for(doc)
164:
165: lock(key) do
166:
167: current_doc = do_get(key)
168:
169: if current_doc.nil?
170: #
171: # document is [already] gone, delete fails (return true)
172: #
173: true
174:
175: elsif current_doc['_rev'] != rev
176: #
177: # version in storage doesn't match version to delete
178: # (return version in storage)
179: #
180: current_doc
181:
182: else
183: #
184: # delete is successful (return nil)
185: #
186: @redis.del(key)
187:
188: nil
189: end
190: end
191: end
# File lib/ruote/redis/storage.rb, line 111
111: def delete_schedule(schedule_id)
112:
113: @redis.del(key_for('schedules', schedule_id))
114: end
Returns a String containing a representation of the current content of in this Redis storage.
# File lib/ruote/redis/storage.rb, line 256
256: def dump(type)
257:
258: @redis.keys_to_a("#{type}/*").sort.join("\n")
259: end
# File lib/ruote/redis/storage.rb, line 152
152: def get(type, key)
153:
154: do_get(key_for(type, key))
155: end
# File lib/ruote/redis/storage.rb, line 193
193: def get_many(type, key=nil, opts={})
194:
195: keys = key ? Array(key) : nil
196:
197: #ids = if type == 'msgs' || type == 'schedules'
198: # @redis.keys_to_a("#{type}/*")
199:
200: ids = if keys == nil
201:
202: @redis.keys_to_a("#{type}/*")
203:
204: elsif keys.first.is_a?(String)
205:
206: keys.collect { |k| @redis.keys_to_a("#{type}/*!#{k}") }.flatten
207:
208: else #if keys.first.is_a?(Regexp)
209:
210: @redis.keys_to_a("#{type}/*").select { |i|
211:
212: i = i[type.length + 1..1]
213: # removing "^type/"
214:
215: keys.find { |k| k.match(i) }
216: }
217: end
218:
219: ids = ids.reject { |i| i.match(LOCK_KEY) }
220: ids = ids.sort
221: ids = ids.reverse if opts[:descending]
222:
223: skip = opts[:skip] || 0
224: limit = opts[:limit] || ids.length
225: ids = ids[skip, limit]
226:
227: docs = ids.length > 0 ? @redis.mget(*ids) : []
228: docs = docs.inject({}) do |h, doc|
229: if doc
230: doc = Rufus::Json.decode(doc)
231: h[doc['_id']] = doc
232: end
233: h
234: end
235:
236: opts[:count] ? docs.size : docs.values
237: end
# File lib/ruote/redis/storage.rb, line 239
239: def ids(type)
240:
241: @redis.keys_to_a("#{type}/*").reject { |i|
242: i.match(LOCK_KEY)
243: }.collect { |i|
244: i.split('/').last
245: }.sort
246: end
# File lib/ruote/redis/storage.rb, line 248
248: def purge!
249:
250: @redis.keys_to_a('*').each { |k| @redis.del(k) }
251: end
Nukes a db type and reputs it(losing all the documents that were in it).
# File lib/ruote/redis/storage.rb, line 273
273: def purge_type!(type)
274:
275: @redis.keys_to_a("#{type}/*").each { |k| @redis.del(k) }
276: end
# File lib/ruote/redis/storage.rb, line 116
116: def put(doc, opts={})
117:
118: key = key_for(doc)
119: rev = doc['_rev']
120:
121: lock(key) do
122:
123: current_doc = do_get(key)
124: current_rev = current_doc ? current_doc['_rev'] : nil
125:
126: if current_rev && rev != current_rev
127: #
128: # version in storage is newer than version being put,
129: # (eturn version in storage)
130: #
131: current_doc
132:
133: elsif rev && current_rev.nil?
134: #
135: # document deleted, put fails (return true)
136: #
137: true
138:
139: else
140: #
141: # put is successful (return nil)
142: #
143: nrev = (rev.to_i + 1).to_s
144: @redis.set(key, to_json(doc.merge('_rev' => nrev)))
145: doc['_rev'] = nrev if opts[:update_rev]
146:
147: nil
148: end
149: end
150: end
# File lib/ruote/redis/storage.rb, line 89
89: def put_msg(action, options)
90:
91: doc = prepare_msg_doc(action, options)
92:
93: puts "XXX" if @redis.nil?
94:
95: @redis.set(key_for(doc), to_json(doc))
96:
97: nil
98: end
# File lib/ruote/redis/storage.rb, line 100
100: def put_schedule(flavour, owner_fei, s, msg)
101:
102: doc = prepare_schedule_doc(flavour, owner_fei, s, msg)
103:
104: return nil unless doc
105:
106: @redis.set(key_for(doc), to_json(doc))
107:
108: doc['_id']
109: end
# File lib/ruote/redis/storage.rb, line 328
328: def do_get(key)
329:
330: from_json(@redis.get(key))
331: end
# File lib/ruote/redis/storage.rb, line 333
333: def from_json(s)
334:
335: s ? Rufus::Json.decode(s) : nil
336: end
A locking mecha.
Mostly inspired from code.google.com/p/redis/wiki/SetnxCommand
# File lib/ruote/redis/storage.rb, line 286
286: def lock(key, &block)
287:
288: kl = "#{key}-lock"
289:
290: loop do
291:
292: r = @redis.setnx(kl, Time.now.to_f.to_s)
293:
294: if r == false
295:
296: t = @redis.get(kl)
297:
298: @redis.del(kl) if t && Time.now.to_f - t.to_f > 60.0
299: # after 1 minute, locks time out
300:
301: sleep 0.007 # let's try to lock again after a while
302: else
303:
304: break # lock acquired
305: end
306: end
307:
308: #@redis.expire(kl, 2)
309: # this doesn't work, it makes the next call to setnx succeed
310:
311: result = block.call
312:
313: @redis.del(kl)
314:
315: result
316: end
Don’t put configuration if it’s already in
(prevent storages from trashing configuration...)
# File lib/ruote/redis/storage.rb, line 348
348: def put_configuration
349:
350: return if get('configurations', 'engine')
351:
352: put({ '_id' => 'engine', 'type' => 'configurations' }.merge(@options))
353: end
Disabled; run with --debug to generate this.
Generated with the Darkfish Rdoc Generator 1.1.6.