mirror of
https://github.com/jimeh/bunnyrun.git
synced 2026-02-19 07:56:40 +00:00
137 lines
2.9 KiB
Ruby
137 lines
2.9 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require 'bunnyrun/message'
|
|
|
|
module BunnyRun
|
|
class Consumer
|
|
attr_reader :connection
|
|
attr_reader :publish_channel
|
|
attr_reader :default_prefetch
|
|
attr_reader :options
|
|
attr_reader :logger
|
|
|
|
class << self
|
|
def inherited(klass)
|
|
children << klass
|
|
end
|
|
|
|
def children
|
|
@children ||= []
|
|
end
|
|
|
|
def queue(name = nil, attrs = {})
|
|
return @queue if name.nil?
|
|
@queue = { name: name, attrs: attrs }
|
|
end
|
|
|
|
def exchange(name, attrs = {})
|
|
exchanges[name] = attrs
|
|
end
|
|
|
|
def bind(exchange_name, attrs = {})
|
|
bindings << [exchange_name, attrs]
|
|
end
|
|
|
|
def manual_ack(value = nil)
|
|
return @manual_ack || false if value.nil?
|
|
@manual_ack = value
|
|
end
|
|
|
|
def prefetch(count = nil)
|
|
return @prefetch if count.nil?
|
|
@prefetch = count
|
|
end
|
|
|
|
def exchanges
|
|
@exchanges ||= {}
|
|
end
|
|
|
|
def bindings
|
|
@bindings ||= []
|
|
end
|
|
end
|
|
|
|
def initialize(opts = {})
|
|
@connection = opts[:connection]
|
|
@publish_channel = opts[:publish_channel]
|
|
@default_prefetch = opts[:default_prefetch]
|
|
@options = opts[:options]
|
|
@logger = opts[:logger]
|
|
end
|
|
|
|
def channel
|
|
@channel ||= connection.create_channel
|
|
end
|
|
|
|
def start
|
|
perform_bindings
|
|
set_prefetch
|
|
subscribe
|
|
end
|
|
|
|
def subscribe
|
|
logger.info("#{self.class}: subscribing to queue \"#{queue.name}\"...")
|
|
|
|
opts = { manual_ack: self.class.manual_ack }
|
|
queue.subscribe(opts) do |delivery_info, properties, payload|
|
|
message = Message.new(delivery_info, properties, payload)
|
|
|
|
logger.debug("#{self.class}: received message #{message}")
|
|
perform(message)
|
|
end
|
|
end
|
|
|
|
def publish(exchange_name, payload, attrs = {})
|
|
exch = publish_exchange(exchange_name)
|
|
exch.publish(payload, attrs)
|
|
end
|
|
|
|
def queue
|
|
@queue ||= begin
|
|
opts = self.class.queue
|
|
channel.queue(opts[:name], opts[:attrs])
|
|
end
|
|
end
|
|
|
|
def exchange(name)
|
|
exchanges[name] ||= begin
|
|
return unless self.class.exchanges.key?(name)
|
|
|
|
attrs = self.class.exchanges[name]
|
|
channel.exchange(name, attrs)
|
|
end
|
|
end
|
|
|
|
def publish_exchange(name)
|
|
publish_exchanges[name] ||= begin
|
|
return unless self.class.exchanges.key?(name)
|
|
|
|
attrs = self.class.exchanges[name]
|
|
publish_channel.exchange(name, attrs)
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
def perform_bindings
|
|
self.class.bindings.each do |(exchange_name, attrs)|
|
|
exch = exchange(exchange_name)
|
|
queue.bind(exch, attrs)
|
|
end
|
|
end
|
|
|
|
def set_prefetch
|
|
count = self.class.prefetch || default_prefetch
|
|
channel.prefetch(count, true)
|
|
end
|
|
|
|
def exchanges
|
|
@exchanges ||= {}
|
|
end
|
|
|
|
def publish_exchanges
|
|
@publish_exchanges ||= {}
|
|
end
|
|
end
|
|
end
|