mirror of
https://github.com/jimeh/bunnyrun.git
synced 2026-02-19 07:56:40 +00:00
Initial commit
This commit is contained in:
1
lib/bunny_run.rb
Normal file
1
lib/bunny_run.rb
Normal file
@@ -0,0 +1 @@
|
||||
require 'bunnyrun'
|
||||
21
lib/bunnyrun.rb
Normal file
21
lib/bunnyrun.rb
Normal file
@@ -0,0 +1,21 @@
|
||||
require 'bunnyrun/consumer'
|
||||
require 'bunnyrun/cli'
|
||||
require 'bunnyrun/runner'
|
||||
require 'bunnyrun/version'
|
||||
|
||||
module BunnyRun
|
||||
class << self
|
||||
def publish(exchange_name, payload, attrs = {}); end
|
||||
|
||||
def after_start(&block)
|
||||
callbacks[:after_start] ||= []
|
||||
callbacks[:after_start] << block
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def callbacks
|
||||
@callbacks ||= {}
|
||||
end
|
||||
end
|
||||
end
|
||||
30
lib/bunnyrun/cli.rb
Normal file
30
lib/bunnyrun/cli.rb
Normal file
@@ -0,0 +1,30 @@
|
||||
require 'bunnyrun/consumer'
|
||||
require 'bunnyrun/options'
|
||||
require 'bunnyrun/runner'
|
||||
|
||||
module BunnyRun
|
||||
class CLI
|
||||
attr_reader :options
|
||||
|
||||
def self.run(argv = [])
|
||||
new.run(argv)
|
||||
end
|
||||
|
||||
def run(argv = [])
|
||||
options = Options.parse(argv)
|
||||
require_files(options.paths)
|
||||
consumers = Consumer.children
|
||||
|
||||
runner = Runner.new(options, consumers)
|
||||
runner.run
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def require_files(paths)
|
||||
paths.each do |path|
|
||||
require File.join(Dir.pwd, path)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
128
lib/bunnyrun/consumer.rb
Normal file
128
lib/bunnyrun/consumer.rb
Normal file
@@ -0,0 +1,128 @@
|
||||
require 'bunnyrun/message'
|
||||
|
||||
module BunnyRun
|
||||
class Consumer
|
||||
attr_reader :connection
|
||||
attr_reader :publish_channel
|
||||
attr_reader :default_prefetch
|
||||
attr_reader :logger
|
||||
|
||||
class << self
|
||||
def inherited(klass)
|
||||
children << klass
|
||||
end
|
||||
|
||||
def children
|
||||
@children ||= []
|
||||
end
|
||||
|
||||
def queue(name = nil, attrs = {})
|
||||
return @queue if name.nil?
|
||||
@queue = { name: name, attrs: attrs }
|
||||
end
|
||||
|
||||
def exchange(name, attrs = {})
|
||||
exchanges[name] = attrs
|
||||
end
|
||||
|
||||
def bind(exchange_name, attrs = {})
|
||||
bindings << [exchange_name, attrs]
|
||||
end
|
||||
|
||||
def manual_ack(value = nil)
|
||||
return @manual_ack || false if value.nil?
|
||||
@manual_ack = value
|
||||
end
|
||||
|
||||
def prefetch(count = nil)
|
||||
return @prefetch if count.nil?
|
||||
@prefetch = count
|
||||
end
|
||||
|
||||
def exchanges
|
||||
@exchanges ||= {}
|
||||
end
|
||||
|
||||
def bindings
|
||||
@bindings ||= []
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(opts = {})
|
||||
@connection = opts[:connection]
|
||||
@publish_channel = opts[:publish_channel]
|
||||
@default_prefetch = opts[:default_prefetch]
|
||||
@logger = opts[:logger]
|
||||
end
|
||||
|
||||
def channel
|
||||
@channel ||= connection.create_channel
|
||||
end
|
||||
|
||||
def start
|
||||
perform_bindings
|
||||
set_prefetch
|
||||
subscribe
|
||||
end
|
||||
|
||||
def subscribe
|
||||
opts = { manual_ack: self.class.manual_ack }
|
||||
queue.subscribe(opts) do |delivery_info, properties, payload|
|
||||
message = Message.new(delivery_info, properties, payload)
|
||||
perform(message)
|
||||
end
|
||||
end
|
||||
|
||||
def publish(exchange_name, payload, attrs = {})
|
||||
exch = publish_exchange(exchange_name)
|
||||
exch.publish(payload, attrs)
|
||||
end
|
||||
|
||||
def queue
|
||||
@queue ||= begin
|
||||
opts = self.class.queue
|
||||
channel.queue(opts[:name], opts[:attrs])
|
||||
end
|
||||
end
|
||||
|
||||
def exchange(name)
|
||||
exchanges[name] ||= begin
|
||||
return unless self.class.exchanges.key?(name)
|
||||
|
||||
attrs = self.class.exchanges[name]
|
||||
channel.exchange(name, attrs)
|
||||
end
|
||||
end
|
||||
|
||||
def publish_exchange(name)
|
||||
publish_exchanges[name] ||= begin
|
||||
return unless self.class.exchanges.key?(name)
|
||||
|
||||
attrs = self.class.exchanges[name]
|
||||
publish_channel.exchange(name, attrs)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def perform_bindings
|
||||
self.class.bindings.each do |(exchange_name, attrs)|
|
||||
exch = exchange(exchange_name)
|
||||
queue.bind(exch, attrs)
|
||||
end
|
||||
end
|
||||
|
||||
def set_prefetch
|
||||
count = self.class.prefetch || default_prefetch
|
||||
channel.prefetch(count, true)
|
||||
end
|
||||
|
||||
def exchanges
|
||||
@exchanges ||= {}
|
||||
end
|
||||
|
||||
def publish_exchanges
|
||||
@publish_exchanges ||= {}
|
||||
end
|
||||
end
|
||||
end
|
||||
5
lib/bunnyrun/core_ext/string.rb
Normal file
5
lib/bunnyrun/core_ext/string.rb
Normal file
@@ -0,0 +1,5 @@
|
||||
class String
|
||||
def undent
|
||||
gsub(/^.{#{slice(/^ +/).length}}/, '')
|
||||
end
|
||||
end
|
||||
49
lib/bunnyrun/message.rb
Normal file
49
lib/bunnyrun/message.rb
Normal file
@@ -0,0 +1,49 @@
|
||||
module BunnyRun
|
||||
class Message
|
||||
attr_reader :delivery_info
|
||||
attr_reader :properties
|
||||
attr_reader :payload
|
||||
|
||||
def initialize(delivery_info, properties, payload)
|
||||
@delivery_info = delivery_info
|
||||
@properties = properties
|
||||
@payload = payload
|
||||
@acked = false
|
||||
end
|
||||
|
||||
def channel
|
||||
delivery_info.channel
|
||||
end
|
||||
|
||||
def ack
|
||||
channel.ack(delivery_tag)
|
||||
@acked = true
|
||||
end
|
||||
|
||||
def reject
|
||||
channel.reject(delivery_tag, false)
|
||||
@acked = true
|
||||
end
|
||||
|
||||
def requeue
|
||||
channel.reject(delivery_tag, true)
|
||||
@acked = true
|
||||
end
|
||||
|
||||
def manual_ack?
|
||||
!delivery_info.consumer.no_ack
|
||||
end
|
||||
|
||||
def routing_key
|
||||
delivery_info.routing_key
|
||||
end
|
||||
|
||||
def delivery_mode
|
||||
properties.delivery_mode
|
||||
end
|
||||
|
||||
def delivery_tag
|
||||
delivery_info.delivery_tag
|
||||
end
|
||||
end
|
||||
end
|
||||
152
lib/bunnyrun/options.rb
Normal file
152
lib/bunnyrun/options.rb
Normal file
@@ -0,0 +1,152 @@
|
||||
require 'trollop'
|
||||
|
||||
require 'bunnyrun/core_ext/string'
|
||||
|
||||
module BunnyRun
|
||||
class Options
|
||||
class << self
|
||||
def parse(argv = [])
|
||||
args = argv.clone
|
||||
|
||||
opts = parse_args(args)
|
||||
opts[:paths] = args
|
||||
|
||||
validate_paths(opts)
|
||||
|
||||
opts.each_with_object(new) do |(key, value), memo|
|
||||
memo.send("#{key}=", value) if memo.respond_to?("#{key}=")
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def parse_args(args)
|
||||
Trollop.with_standard_exception_handling(parser) do
|
||||
parser.parse(args)
|
||||
end
|
||||
end
|
||||
|
||||
def validate_paths(opts)
|
||||
parser.die('One or more paths to consumers required', nil) \
|
||||
if opts[:paths].empty?
|
||||
end
|
||||
|
||||
def parser
|
||||
return @parser if @parser
|
||||
|
||||
defaults = new
|
||||
@parser = Trollop::Parser.new do
|
||||
banner <<-EOF.undent
|
||||
Usage: bunnyrun [options] [path ...]
|
||||
|
||||
Options:
|
||||
EOF
|
||||
|
||||
version "bunnyrun #{BunnyRun::VERSION}"
|
||||
opt :url, 'Connection string ' \
|
||||
'(example: "amqp://guest:guest@127.0.0.1:5672/vhost")',
|
||||
short: 'U', type: :string, default: defaults.url
|
||||
opt :host, 'Host',
|
||||
short: 'H', type: :string, default: defaults.host
|
||||
opt :port, 'Port',
|
||||
short: 'P', type: :int, default: defaults.port
|
||||
opt :ssl, 'Connect using SSL',
|
||||
short: 's', type: :bool, default: defaults.ssl
|
||||
opt :vhost, 'Virtual host',
|
||||
short: 'V', type: :string, default: defaults.vhost
|
||||
opt :user, 'Username',
|
||||
short: 'u', type: :string, default: defaults.user
|
||||
opt :pass, 'Password',
|
||||
short: 'p', type: :string, default: defaults.pass
|
||||
opt :prefetch, 'Default prefetch count',
|
||||
short: :none, type: :int, default: defaults.prefetch
|
||||
|
||||
banner ''
|
||||
|
||||
opt :log_target, 'Log target, file path or STDOUT',
|
||||
short: 't', type: :string, default: defaults.log_target
|
||||
opt :log_level, 'Log level (debug, info, warn, error, fatal)',
|
||||
short: 'l', type: :string, default: defaults.log_level
|
||||
opt :bunny_log_target, 'Log target used by Bunny',
|
||||
short: :none, type: :string, default: defaults.bunny_log_target
|
||||
opt :bunny_log_level, 'Log level used by Bunny',
|
||||
short: :none, type: :string, default: defaults.bunny_log_level
|
||||
|
||||
conflicts :url, :host
|
||||
conflicts :url, :port
|
||||
conflicts :url, :ssl
|
||||
conflicts :url, :vhost
|
||||
conflicts :url, :user
|
||||
conflicts :url, :pass
|
||||
|
||||
banner ''
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def url
|
||||
@url ||= nil
|
||||
end
|
||||
attr_writer :url
|
||||
|
||||
def host
|
||||
@host ||= '127.0.0.1'
|
||||
end
|
||||
attr_writer :host
|
||||
|
||||
def port
|
||||
@port ||= 5672
|
||||
end
|
||||
attr_writer :port
|
||||
|
||||
def ssl
|
||||
@ssl ||= false
|
||||
end
|
||||
attr_writer :ssl
|
||||
|
||||
def vhost
|
||||
@vhost ||= '/'
|
||||
end
|
||||
attr_writer :vhost
|
||||
|
||||
def user
|
||||
@user ||= 'guest'
|
||||
end
|
||||
attr_writer :user
|
||||
|
||||
def pass
|
||||
@pass ||= 'guest'
|
||||
end
|
||||
attr_writer :pass
|
||||
|
||||
def prefetch
|
||||
@prefetch ||= 1
|
||||
end
|
||||
attr_writer :prefetch
|
||||
|
||||
def log_target
|
||||
@log_target ||= 'STDOUT'
|
||||
end
|
||||
attr_writer :log_target
|
||||
|
||||
def log_level
|
||||
@log_level ||= 'info'
|
||||
end
|
||||
attr_writer :log_level
|
||||
|
||||
def bunny_log_target
|
||||
@bunny_log_target ||= 'STDOUT'
|
||||
end
|
||||
attr_writer :bunny_log_target
|
||||
|
||||
def bunny_log_level
|
||||
@bunny_log_level ||= 'warn'
|
||||
end
|
||||
attr_writer :bunny_log_level
|
||||
|
||||
def paths
|
||||
@paths ||= []
|
||||
end
|
||||
attr_writer :paths
|
||||
end
|
||||
end
|
||||
85
lib/bunnyrun/runner.rb
Normal file
85
lib/bunnyrun/runner.rb
Normal file
@@ -0,0 +1,85 @@
|
||||
require 'bunny'
|
||||
require 'logger'
|
||||
|
||||
require 'bunnyrun/consumer'
|
||||
|
||||
module BunnyRun
|
||||
class Runner
|
||||
attr_reader :options
|
||||
attr_reader :consumer_classes
|
||||
|
||||
def initialize(options = {}, consumer_classes)
|
||||
@options = options
|
||||
@consumer_classes = consumer_classes
|
||||
end
|
||||
|
||||
def run
|
||||
consumer_classes.each do |consumer_class|
|
||||
launch_consumer(consumer_class)
|
||||
end
|
||||
|
||||
block
|
||||
end
|
||||
|
||||
def connection
|
||||
@connection ||= begin
|
||||
conn = Bunny.new(connection_opts)
|
||||
conn.start
|
||||
conn
|
||||
end
|
||||
end
|
||||
|
||||
def publish_channel
|
||||
@publish_channel ||= connection.create_channel
|
||||
end
|
||||
|
||||
def logger
|
||||
@logger ||= begin
|
||||
logger = Logger.new(log_target)
|
||||
logger.level = log_level
|
||||
logger
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def block
|
||||
loop { sleep 1 }
|
||||
end
|
||||
|
||||
def launch_consumer(consumer_class)
|
||||
consumer = consumer_class.new(
|
||||
connection: connection,
|
||||
publish_channel: publish_channel,
|
||||
default_prefetch: options.prefetch,
|
||||
logger: logger
|
||||
)
|
||||
consumer.start
|
||||
end
|
||||
|
||||
def connection_opts
|
||||
return options.url if options.url
|
||||
|
||||
{
|
||||
host: options.host,
|
||||
port: options.port,
|
||||
ssl: options.ssl,
|
||||
vhost: options.vhost,
|
||||
user: options.user,
|
||||
pass: options.pass
|
||||
}
|
||||
end
|
||||
|
||||
def log_target
|
||||
if options.log_target.casecmp('stdout').zero?
|
||||
STDOUT
|
||||
else
|
||||
options.log_target
|
||||
end
|
||||
end
|
||||
|
||||
def log_level
|
||||
Kernel.const_get("::Logger::#{options.log_level.upcase}")
|
||||
end
|
||||
end
|
||||
end
|
||||
3
lib/bunnyrun/version.rb
Normal file
3
lib/bunnyrun/version.rb
Normal file
@@ -0,0 +1,3 @@
|
||||
module BunnyRun
|
||||
VERSION = '0.1.0'.freeze
|
||||
end
|
||||
Reference in New Issue
Block a user