major restructuring, specs probably all break

right now
This commit is contained in:
2011-01-28 17:09:30 +00:00
parent 8d5771a12a
commit 5496445d4d
16 changed files with 418 additions and 284 deletions

View File

@@ -4,15 +4,116 @@ require 'yaml'
require 'amqp/failover_client'
require 'amqp/failover/config'
require 'amqp/failover/disconnected'
require 'amqp/failover/configs'
require 'amqp/failover/logger'
require 'amqp/failover/logic'
require 'amqp/failover/logic/failed_config'
require 'amqp/failover/server_discovery'
require 'amqp/failover/version'
require 'amqp/failover/ext/amqp/client.rb'
module AMQP
module Failover
class Failover
end
end
attr_reader :latest_failed
attr_accessor :primary
attr_accessor :retry_timeout
attr_accessor :fallback
def initialize(confs = nil, opts = {})
@configs = Configs.new(confs)
@options = default_options.merge(opts)
end
# pluggable logger specifically for tracking failover and fallbacks
def self.logger
@logger ||= Logger.new
end
def default_options
{ :retry_timeout => 1,
:selection => :sequential, #TODO: Impliment next server selection algorithm
:fallback => false,
:fallback_interval => 10 }
end
def options
@options ||= {}
end
def fallback_interval
options[:fallback_interval] ||= default_options[:fallback_interval]
end
def primary
configs[:primary]
end
def refs
@refs ||= {}
end
def configs
@configs ||= Config.new
end
def configs=(confs = [])
@configs = nil
confs.each do |conf|
if conf.is_a?(Array)
add_config(conf[1], conf[0])
else
add_config(conf)
end
end
end
def add_config(conf = {}, ref = nil)
index = configs.index(conf)
configs << Config::Failed.new(conf) if index.nil?
refs[ref] = (index || configs.index(conf)) if !ref.nil?
end
def failover_from(conf = {}, time = nil)
failed_with(conf, nil, time)
next_config
end
alias :from :failover_from
def failed_with(conf = {}, ref = nil, time = nil)
time ||= Time.now
if index = configs.index(conf)
configs[index].last_fail = time
@latest_failed = configs[index]
else
configs << Config::Failed.new(conf, time)
@latest_failed = configs.last
end
refs[ref] = (index || configs.index(conf)) if !ref.nil?
end
def next_config(retry_timeout = nil, after = nil)
return nil if configs.size <= 1
retry_timeout ||= @options[:retry_timeout]
after ||= @latest_failed
index = configs.index(after)
available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1]
available.each do |conf|
return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now
end
return nil
end
def last_fail_of(match)
((match.is_a?(Hash) ? get_by_conf(match) : get_by_ref(match)) || Config::Failed.new).last_fail
end
def get_by_conf(conf = {})
configs[configs.index(conf)]
end
def get_by_ref(ref = nil)
configs[refs[ref]] if refs[ref]
end
end # Failover
end # AMQP

View File

@@ -1,92 +1,36 @@
# encoding: utf-8
module AMQP
module Failover
class Config
class Failover
class Config < ::Hash
attr_accessor :configs
attr_accessor :failover_config
attr_accessor :last_fail
def failover_config
@failover_config ||= { :retry_timeout => 30 }
def initialize(hash = {}, last_fail_date = nil)
self.replace(symbolize_keys(hash))
self.last_fail = last_fail_date if last_fail_date
end
def refs
@refs ||= {}
end
def configs
@configs ||= []
end
def primary
@primary ||= 0
end
def primary=(ref)
@primary = ref
end
def get_primary
get(primary) || default_config
end
def set_primary(conf = {})
set(conf, primary)
end
def get(ref = nil)
return configs[ref] if ref.is_a?(Fixnum)
configs[refs[ref]] if refs[ref]
end
def set(conf = {}, ref = nil)
conf = default_config.merge(conf)
configs << conf if (index = configs.index(conf)).nil?
if ref
refs[ref] = (index || configs.index(conf))
end
end
def find_next(conf = {})
current = configs.index(conf)
configs[(current+1 == configs.size) ? 0 : current+1] if current
end
def load_file(file, env = nil)
raise ArgumentError, "Can't find #{file}" unless File.exists?(file)
load(YAML.load_file(file)[env || "development"])
end
def load_yaml(data, env = nil)
load(YAML.load(data)[env || "development"])
end
def load(conf)
if conf.is_a?(::Array)
load_array(conf)
elsif conf.is_a?(::Hash)
load_hash(conf)
end
end
def load_array(confs = [])
@configs = nil
confs.each do |conf|
load_hash(conf)
end
end
def load_hash(conf = {})
conf = conf.inject({}) do |result, (key, value)|
def symbolize_keys(hash = {})
hash.inject({}) do |result, (key, value)|
result[key.is_a?(String) ? key.to_sym : key] = value
result
end
self.set(conf)
end
def default_config
AMQP.settings
# order by latest fail, potentially useful if random config selection is used
def <=>(other)
if self.respond_to?(:last_fail) && other.respond_to?(:last_fail)
if self.last_fail.nil? && other.last_fail.nil?
return 0
elsif self.last_fail.nil? && !other.last_fail.nil?
return 1
elsif !self.last_fail.nil? && other.last_fail.nil?
return -1
end
return other.last_fail <=> self.last_fail
end
return 0
end
end # Config

View File

@@ -0,0 +1,96 @@
# encoding: utf-8
module AMQP
class Failover
class Configs < Array
def initialize(confs = nil)
load(confs)
end
def [](*args)
return super(*args) if args[0].is_a?(Fixnum)
return get_primary if args[0] == :primary
get(args[0])
end
def []=(*args)
return super(*args) if args[0].is_a?(Fixnum)
return set_primary(args.last, args[0]) if args[0] == :primary
set(args.last, args[0])
end
def refs
@refs ||= {}
end
def primary
@primary ||= 0
end
def primary=(ref)
@primary = ref
end
def get_primary
get(primary) || default_config
end
def set_primary(conf = {})
set(conf, primary)
end
def get(ref = nil)
return self[ref] if ref.is_a?(Fixnum)
self[refs[ref]] if refs[ref]
end
def set(conf = {}, ref = nil)
conf = Config.new(default_config.merge(conf))
self << conf if (index = self.index(conf)).nil?
if ref
refs[ref] = (index || self.index(conf))
end
end
def find_next(conf = {})
current = self.index(conf)
self[(current+1 == self.size) ? 0 : current+1] if current
end
def load_file(file, env = nil)
raise ArgumentError, "Can't find #{file}" unless File.exists?(file)
load(YAML.load_file(file)[env || "development"])
end
def load_yaml(data, env = nil)
load(YAML.load(data)[env || "development"])
end
def load(conf)
if conf.is_a?(::Array)
load_array(conf)
elsif conf.is_a?(::Hash)
load_hash(conf)
end
end
def self.load_array(confs = [])
self.clear
confs.each do |conf|
conf = AMQP::Client.parse_amqp_url(conf) if conf.is_a?(::String)
load_hash(conf)
end
end
def load_hash(conf = {})
set(Config.new(conf))
end
def default_config
AMQP.settings
end
end # Config
end # Failover
end # AMQP

View File

@@ -1,43 +0,0 @@
# encoding: utf-8
module AMQP
module Failover
class Disconnected
attr_accessor :base
attr_accessor :failover
attr_accessor :configs
attr_accessor :logger
def initialize(base)
@base = base
@configs = @base.configs
@failover_conf = @base.failover_conf
@logger = @base.logger
end
def call
@logic ||= Logic.new(@configs.configs, @failover_conf.get_primary, @failover_conf.failover_config)
if (new_settings = @logic.failover_from(@base.settings))
log_message = "Could not connect to or lost connection to server #{@base.settings[:host]}:#{@base.settings[:port]}. " +
"Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}"
@logger.error(log_message)
@logger.info(log_message)
if @failover_conf.get_primary == @base.settings
ServerDiscovery.monitor(@failover_conf.get_primary) do
@base.clean_exit("Primary server (#{@failover_conf.get_primary[:host]}:#{@failover_conf.get_primary[:port]}) is back. " +
"Performing clean exit to be relaunched with primary config.")
end
end
@base.settings = new_settings
@base.reconnect
else
raise Error, "Could not connect to server #{@base.settings[:host]}:#{@base.settings[:port]}"
end
end
end # Disconnected
end # Failover
end # AMQP

View File

@@ -0,0 +1,52 @@
# encoding: utf-8
module AMQP
module Client
class << self
alias :connect_without_failover :connect
# Connect with Failover supports specifying multiple AMQP servers and configurations.
#
# Argument Examples:
# - "amqp://guest:guest@host:5672,amqp://guest:guest@host:5673"
# - ["amqp://guest:guest@host:5672", "amqp://guest:guest@host:5673"]
# - [{:host => "host", :port => 5672}, {:host => "host", :port => 5673}]
# - {:hosts => ["amqp://user:pass@host:5672", "amqp://user:pass@host:5673"]}
# - {:hosts => [{:host => "host", :port => 5672}, {:host => "host", :port => 5673}]}
#
# The last two examples are by far the most flexible, cause they also let you specify
# failover and fallback specific options. Like so:
# - {:hosts => ["amqp://localhost:5672"], :fallback => false}
#
# Available failover options are:
# - :retry_timeout, time to wait before retrying a specific AMQP config after failure.
# - :fallback, monitor for original server's return and fallback to it if so.
# - :fallback_interval, seconds between each check for original server if :fallback is true.
#
def connect_with_failover(opts = nil)
opts = parse_amqp_url_or_opts(opts)
connect_without_failover(opts)
end
alias :connect :connect_with_failover
def parse_amqp_url_or_opts(opts = nil)
if opts.is_a?(String) && opts.index(',').nil?
opts = init_failover(opts.split(','))
elsif opts.is_a?(Array)
opts = init_failover(opts)
elsif opts.is_a?(Hash) && opts[:hosts].is_a?(Array)
confs = opts.delete[:hosts]
opts = init_failover(confs, opts)
end
opts
end
def init_failover(confs = nil, opts = {})
if !confs.nil? && confs.size > 0
failover.primary.merge({ :failover => Failover.new(confs, opts) })
end
end
end # << self
end # Client
end # AMQP

View File

@@ -1,7 +1,7 @@
# encoding: utf-8
module AMQP
module Failover
class Failover
class Logger
attr_accessor :enabled

View File

@@ -1,86 +0,0 @@
# encoding: utf-8
module AMQP
module Failover
class Logic
attr_reader :latest_failed
attr_accessor :primary
attr_accessor :retry_timeout
attr_accessor :fallback
def initialize(confs = nil, primary = nil, options = {})
@primary = primary
@retry_timeout = (options.delete(:retry_timeout) || 30)
self.configs = confs if !confs.nil?
end
def refs
@refs ||= {}
end
def configs
@configs ||= []
end
def configs=(confs = [])
@configs = []
confs.each do |conf|
if conf.is_a?(Array)
add_config(conf[1], conf[0])
else
add_config(conf)
end
end
end
def add_config(conf = {}, ref = nil)
index = configs.index(conf)
configs << FailedConfig.new(conf) if index.nil?
refs[ref] = (index || configs.index(conf)) if !ref.nil?
end
def failover_from(conf = {}, time = nil)
failed_with(conf, nil, time)
next_config
end
def failed_with(conf = {}, ref = nil, time = nil)
time ||= Time.now
if index = configs.index(conf)
configs[index].last_fail = time
@latest_failed = configs[index]
else
configs << FailedConfig.new(conf, time)
@latest_failed = configs.last
end
refs[ref] = (index || configs.index(conf)) if !ref.nil?
end
def next_config(retry_timeout = nil, after = nil)
return nil if configs.size <= 1
retry_timeout ||= @retry_timeout
after ||= @latest_failed
index = configs.index(after)
available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1]
available.each do |conf|
return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now
end
return nil
end
def last_fail_of(match)
((match.is_a?(Hash) ? get_by_conf(match) : get_by_ref(match)) || FailedConfig.new).last_fail
end
def get_by_conf(conf = {})
configs[configs.index(conf)]
end
def get_by_ref(ref = nil)
configs[refs[ref]] if refs[ref]
end
end # Logic
end # Failover
end # AMQP

View File

@@ -1,33 +0,0 @@
# encoding: utf-8
module AMQP
module Failover
class Logic
class FailedConfig < ::Hash
attr_accessor :last_fail
def initialize(hash = {}, last_fail_date = nil)
self.replace(hash)
self.last_fail = last_fail_date if last_fail_date
end
# order by latest fail, potentially useful if random config selection is used
def <=>(other)
if self.respond_to?(:last_fail) && other.respond_to?(:last_fail)
if self.last_fail.nil? && other.last_fail.nil?
return 0
elsif self.last_fail.nil? && !other.last_fail.nil?
return 1
elsif !self.last_fail.nil? && other.last_fail.nil?
return -1
end
return other.last_fail <=> self.last_fail
end
return 0
end
end # FailedConfig
end # Logic
end # Failover
end # AMQP

View File

@@ -1,18 +1,18 @@
# encoding: utf-8
module AMQP
module Failover
class Failover
class ServerDiscovery < EM::Connection
class << self
attr_accessor :connection
end
def self.monitor(conf = {}, &block)
def self.monitor(conf = {}, retry_interval = nil, &block)
if EM.reactor_running?
start_monitoring(conf, &block)
start_monitoring(conf, retry_interval, &block)
else
EM.run { start_monitoring(conf, &block) }
EM.run { start_monitoring(conf, retry_interval, &block) }
end
end
@@ -27,10 +27,11 @@ module AMQP
close_connection
end
def self.start_monitoring(conf = {}, &block)
def self.start_monitoring(conf = {}, retry_interval = nil, &block)
conf = conf.clone
retry_interval ||= 5
conf[:done] = block
conf[:timer] = EM::PeriodicTimer.new(conf[:retry_interval] || 5) do
conf[:timer] = EM::PeriodicTimer.new(retry_interval) do
@connection = connect(conf)
end
end

View File

@@ -1,7 +1,7 @@
# encoding: utf-8
module AMQP
module Failover
class Failover
VERSION = "0.0.1"
end
end

View File

@@ -4,25 +4,24 @@ module AMQP
module FailoverClient
include AMQP::BasicClient
attr_accessor :on_disconnect
attr_accessor :failover
attr_reader :fallback_monitor
attr_accessor :settings
attr_accessor :on_disconnect
def self.extended(base)
base.on_disconnect = proc {
Failover::OnDisconnect.new(base).call
}
if (base.failover = base.settings.delete(:failover))
base.on_disconnect = base.method(:failover_leap)
end
end
def logger
@logger ||= Failover::Logger.new
end
def failover_conf
@failover_conf ||= Failover::Config.new
Failover.logger
end
def configs
failover_conf.configs
@failover.configs if @failover
end
def clean_exit(msg = nil)
@@ -32,20 +31,50 @@ module AMQP
Process.exit
end
def process_frame(frame)
if mq = channels[frame.channel]
mq.process_frame(frame)
return
def failover_leap
if (new_settings = @failover.from(@settings))
log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " +
"Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}"
logger.error(log_message)
logger.info(log_message)
fallback(@failover.primary, @failover.fallback_interval) if @failover.primary == @settings
@settings = new_settings
reconnect
else
raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}"
end
if frame.is_a?(AMQP::Frame::Method) && (method = frame.payload).is_a?(AMQP::Protocol::Connection::Close)
if method.reply_text =~ /^NOT_ALLOWED/
raise AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}"
end
end
super(frame)
end
def fallback(conf = {}, retry_interval = nil)
@fallback_monitor = Failover::ServerDiscovery.monitor(conf, retry_interval) do
fallback_callback.call(conf, retry_interval)
end
end
def fallback_callback
@fallback_callback ||= proc { |conf, retry_interval|
clean_exit("Primary server (#{conf[:host]}:#{conf[:port]}) is back. " +
"Performing clean exit to be relaunched with primary config.")
}
end
attr_writer :fallback_callback
#TODO: Figure out why I originally needed this
# def process_frame(frame)
# if mq = channels[frame.channel]
# mq.process_frame(frame)
# return
# end
#
# if frame.is_a?(AMQP::Frame::Method) && (method = frame.payload).is_a?(AMQP::Protocol::Connection::Close)
# if method.reply_text =~ /^NOT_ALLOWED/
# raise AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}"
# end
# end
# super(frame)
# end
end # FailoverClient
end # AMQP