Files
bunnyrun/lib/bunnyrun/consumer.rb

137 lines
2.9 KiB
Ruby

# frozen_string_literal: true
require 'bunnyrun/message'
module BunnyRun
class Consumer
attr_reader :connection
attr_reader :publish_channel
attr_reader :default_prefetch
attr_reader :options
attr_reader :logger
class << self
def inherited(klass)
children << klass
end
def children
@children ||= []
end
def queue(name = nil, attrs = {})
return @queue if name.nil?
@queue = { name: name, attrs: attrs }
end
def exchange(name, attrs = {})
exchanges[name] = attrs
end
def bind(exchange_name, attrs = {})
bindings << [exchange_name, attrs]
end
def manual_ack(value = nil)
return @manual_ack || false if value.nil?
@manual_ack = value
end
def prefetch(count = nil)
return @prefetch if count.nil?
@prefetch = count
end
def exchanges
@exchanges ||= {}
end
def bindings
@bindings ||= []
end
end
def initialize(opts = {})
@connection = opts[:connection]
@publish_channel = opts[:publish_channel]
@default_prefetch = opts[:default_prefetch]
@options = opts[:options]
@logger = opts[:logger]
end
def channel
@channel ||= connection.create_channel
end
def start
perform_bindings
set_prefetch
subscribe
end
def subscribe
logger.info("#{self.class}: subscribing to queue \"#{queue.name}\"...")
opts = { manual_ack: self.class.manual_ack }
queue.subscribe(opts) do |delivery_info, properties, payload|
message = Message.new(delivery_info, properties, payload)
logger.debug("#{self.class}: received message #{message}")
perform(message)
end
end
def publish(exchange_name, payload, attrs = {})
exch = publish_exchange(exchange_name)
exch.publish(payload, attrs)
end
def queue
@queue ||= begin
opts = self.class.queue
channel.queue(opts[:name], opts[:attrs])
end
end
def exchange(name)
exchanges[name] ||= begin
return unless self.class.exchanges.key?(name)
attrs = self.class.exchanges[name]
channel.exchange(name, attrs)
end
end
def publish_exchange(name)
publish_exchanges[name] ||= begin
return unless self.class.exchanges.key?(name)
attrs = self.class.exchanges[name]
publish_channel.exchange(name, attrs)
end
end
private
def perform_bindings
self.class.bindings.each do |(exchange_name, attrs)|
exch = exchange(exchange_name)
queue.bind(exch, attrs)
end
end
def set_prefetch
count = self.class.prefetch || default_prefetch
channel.prefetch(count, true)
end
def exchanges
@exchanges ||= {}
end
def publish_exchanges
@publish_exchanges ||= {}
end
end
end