mirror of
https://github.com/jimeh/amqp-failover.git
synced 2026-02-19 10:56:44 +00:00
lots of updates, and all specs PASS!! ^_^
This commit is contained in:
@@ -32,7 +32,7 @@ module AMQP
|
||||
def default_options
|
||||
{ :retry_timeout => 1,
|
||||
:selection => :sequential, #TODO: Impliment next server selection algorithm
|
||||
:fallback => false, #TODO: Enable by default once a sane solution is found
|
||||
:fallback => false, #TODO: Enable by default once a sane implimentation is figured out
|
||||
:fallback_interval => 10 }
|
||||
end
|
||||
|
||||
@@ -58,7 +58,7 @@ module AMQP
|
||||
|
||||
def add_config(conf = {}, ref = nil)
|
||||
index = configs.index(conf)
|
||||
configs << Config::Failed.new(conf) if index.nil?
|
||||
configs.set(conf) if index.nil?
|
||||
refs[ref] = (index || configs.index(conf)) if !ref.nil?
|
||||
end
|
||||
|
||||
@@ -70,12 +70,12 @@ module AMQP
|
||||
|
||||
def failed_with(conf = {}, ref = nil, time = nil)
|
||||
time ||= Time.now
|
||||
if index = configs.index(conf)
|
||||
if !(index = configs.index(conf)).nil?
|
||||
configs[index].last_fail = time
|
||||
@latest_failed = configs[index]
|
||||
else
|
||||
configs << Config::Failed.new(conf, time)
|
||||
@latest_failed = configs.last
|
||||
@latest_failed = configs.set(conf)
|
||||
configs.last.last_fail = time
|
||||
end
|
||||
refs[ref] = (index || configs.index(conf)) if !ref.nil?
|
||||
end
|
||||
@@ -87,7 +87,7 @@ module AMQP
|
||||
index = configs.index(after)
|
||||
available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1]
|
||||
available.each do |conf|
|
||||
return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now
|
||||
return conf if conf.last_fail.nil? || (conf.last_fail.to_i + retry_timeout) < Time.now.to_i
|
||||
end
|
||||
return nil
|
||||
end
|
||||
|
||||
@@ -34,7 +34,7 @@ module AMQP
|
||||
end
|
||||
return other.last_fail <=> self.last_fail
|
||||
end
|
||||
super(other)
|
||||
return 0
|
||||
end
|
||||
|
||||
end # Config
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
# encoding: utf-8
|
||||
|
||||
AMQP.client = AMQP::FailoverClient
|
||||
|
||||
module AMQP
|
||||
module Client
|
||||
|
||||
class << self
|
||||
alias :connect_without_failover :connect
|
||||
|
||||
# Connect with Failover supports specifying multiple AMQP servers and configurations.
|
||||
#
|
||||
@@ -20,13 +22,15 @@ module AMQP
|
||||
#
|
||||
# Available failover options are:
|
||||
# - :retry_timeout, time to wait before retrying a specific AMQP config after failure.
|
||||
# - :fallback, monitor for original server's return and fallback to it if so.
|
||||
# - :fallback, check for the return of the primary server, and fallback to it if and when it returns.
|
||||
# - :fallback_interval, seconds between each check for original server if :fallback is true.
|
||||
# - :selection, not yet implimented.
|
||||
#
|
||||
def connect_with_failover(opts = nil)
|
||||
opts = parse_amqp_url_or_opts(opts)
|
||||
connect_without_failover(opts)
|
||||
end
|
||||
alias :connect_without_failover :connect
|
||||
alias :connect :connect_with_failover
|
||||
|
||||
def parse_amqp_url_or_opts(opts = nil)
|
||||
@@ -35,7 +39,7 @@ module AMQP
|
||||
elsif opts.is_a?(Array)
|
||||
opts = init_failover(opts)
|
||||
elsif opts.is_a?(Hash) && opts[:hosts].is_a?(Array)
|
||||
confs = opts.delete[:hosts]
|
||||
confs = opts.delete(:hosts)
|
||||
opts = init_failover(confs, opts)
|
||||
end
|
||||
opts
|
||||
@@ -43,10 +47,19 @@ module AMQP
|
||||
|
||||
def init_failover(confs = nil, opts = {})
|
||||
if !confs.nil? && confs.size > 0
|
||||
failover.primary.merge({ :failover => Failover.new(confs, opts) })
|
||||
failover = Failover.new(confs, opts)
|
||||
failover.primary.merge({ :failover => failover })
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
end # << self
|
||||
|
||||
def disconnected_with_failover
|
||||
return failover_switch if @failover
|
||||
disconnected_without_failover
|
||||
end
|
||||
alias :disconnected_without_failover :disconnected
|
||||
alias :disconnected :disconnected_with_failover
|
||||
|
||||
end # Client
|
||||
end # AMQP
|
||||
@@ -12,7 +12,24 @@ module AMQP
|
||||
|
||||
def self.extended(base)
|
||||
if (base.failover = base.settings.delete(:failover))
|
||||
base.on_disconnect = base.method(:failover_leap)
|
||||
base.on_disconnect = base.method(:disconnected)
|
||||
end
|
||||
end
|
||||
|
||||
def failover_switch
|
||||
if (new_settings = @failover.from(@settings))
|
||||
log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " +
|
||||
"Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}"
|
||||
logger.error(log_message)
|
||||
logger.info(log_message)
|
||||
|
||||
if @failover.options[:fallback] && @failover.primary == @settings
|
||||
fallback(@failover.primary, @failover.fallback_interval)
|
||||
end
|
||||
@settings = new_settings
|
||||
reconnect
|
||||
else
|
||||
raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}"
|
||||
end
|
||||
end
|
||||
|
||||
@@ -31,23 +48,6 @@ module AMQP
|
||||
Process.exit
|
||||
end
|
||||
|
||||
def failover_leap
|
||||
if (new_settings = @failover.from(@settings))
|
||||
log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " +
|
||||
"Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}"
|
||||
logger.error(log_message)
|
||||
logger.info(log_message)
|
||||
|
||||
if @failover.options[:fallback] && @failover.primary == @settings
|
||||
fallback(@failover.primary, @failover.fallback_interval)
|
||||
end
|
||||
@settings = new_settings
|
||||
reconnect
|
||||
else
|
||||
raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}"
|
||||
end
|
||||
end
|
||||
|
||||
def fallback(conf = {}, retry_interval = nil)
|
||||
@fallback_monitor = Failover::ServerDiscovery.monitor(conf, retry_interval) do
|
||||
fallback_callback.call(conf, retry_interval)
|
||||
|
||||
Reference in New Issue
Block a user