# Copyright (c) 2007 Alexander MacCaw # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, # distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so, subject to # the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. # Requirements: # SQS account with Amazon # Daemonize (http://grub.ath.cx/daemonize/) # SQS gem # # # Usage: # ====== # # class AnotherQueue < Mq # def on_message # logger.info('Log from Another Queue') # logger.info(args) # end # # def retry_attempts # 1 # end # # def delay # nil # end # end # # Mq.connect({:access_key_id => 'foo', :secret_access_key => 'foo'}) # in environment.rb # Mq.run_all # in script/console. Call with a number to add more pollers per queue # AnotherQueue.publish('a message') # somewhere in your app... # # Rails Info: # ============= # # You'll need to require all the derivative queues. # Either put them in lib, they'll be required automatically, or add # a custom load path to environment.rb; like so: # config.load_paths += %W( #{RAILS_ROOT}/app/queues ) # # You'll need to call MQ.connect in environment.rb, or in an initializer, # with your AWS credentials # # That's it, very simple! # Alex # Email: info@eribium.org # Blog: http://www.eribium.org require 'sqs' require 'daemonize' require 'logger' class Mq include Daemonize class << self def publish(msg) queue.send_message(msg) end # Call this in environment.rb, or in an initializer, with your AWS credentials def connect(options) SQS.access_key_id = options[:access_key_id] SQS.secret_access_key = options[:secret_access_key] end def run_all(pollers_per_queue = 1) 1.upto(pollers_per_queue) do |n| SQS.each_queue do |q| if qu_child = get_child(q.name.camelize) and qu_child.respond_to?(:is_mq) qu_child.new.run(true) end end end end def run(use_demonize = false) child.run(use_demonize) end def count queue(true).approximate_number_of_messages end def total_count tc = 0 SQS.each_queue do |q| tc += q.approximate_number_of_messages end tc end # So we can be sure a class inherits from Mq when were're finding them def is_mq true end def create queue(true) end def empty? queue(true).empty? end def destroy queue.delete rescue SQS::AWSSimpleQueueServiceNonEmptyQueue false end def destroy! queue.delete! end def queue_name self.name.underscore end def queue(refresh = false) child.queue(refresh) end def destroy_all! SQS.each_queue do |q| q.delete! end end private def child @@child ||= {} @@child[queue_name] ||= self.new end def get_child(child_name) child_name.constantize rescue NameError nil end end def receive(msg) 1.upto(retry_attempts + 1) do |n| begin self.args = msg.body on_message break rescue => e if n == retry_attempts + 1 logger.fatal "Permanently failed: " + e else logger.error "Retry number #{n}: " + e end end end msg.delete end def run(use_demonize = false) daemonize() if use_demonize logger.info 'Receiving messages...' loop do msg = queue.receive_message if msg logger.info "Processing message id: #{msg.id}" receive(msg) sleep(delay) if delay end end end def retry_attempts 0 end # Delay between querying SQS for messages, # not a delay between messages def delay nil end def queue(refresh = false) if refresh return @queue = create_or_find_queue end @queue ||= create_or_find_queue end def logger @logger ||= Logger.new("#{RAILS_ROOT}/log/#{log_name}.log") rescue @logger = Logger.new(STDOUT) end attr_accessor :args def on_message(*args) raise 'You must implement on_message.' end def queue_name self.class.queue_name end def log_name queue_name.tableize.singularize end protected def create_or_find_queue begin SQS.get_queue(queue_name) rescue SQS::UnavailableQueue SQS.create_queue(queue_name) end end end