mirror of
https://github.com/jimeh/amqp-failover.git
synced 2026-02-19 02:46:43 +00:00
initial import of failover code
This commit is contained in:
24
.gitignore
vendored
24
.gitignore
vendored
@@ -1,4 +1,28 @@
|
||||
## MAC OS
|
||||
.DS_Store
|
||||
|
||||
## TEXTMATE
|
||||
*.tmproj
|
||||
tmtags
|
||||
|
||||
## EMACS
|
||||
*~
|
||||
\#*
|
||||
.\#*
|
||||
|
||||
## VIM
|
||||
*.swp
|
||||
|
||||
## PROJECT::GENERAL
|
||||
coverage
|
||||
rdoc
|
||||
*.gem
|
||||
.bundle
|
||||
Gemfile.lock
|
||||
pkg/*
|
||||
|
||||
## PROJECT::SPECIFIC
|
||||
.yardoc/*
|
||||
spec/db/*
|
||||
doc/*
|
||||
|
||||
|
||||
49
Rakefile
49
Rakefile
@@ -1,2 +1,51 @@
|
||||
$LOAD_PATH.unshift File.expand_path("lib", File.dirname(__FILE__))
|
||||
|
||||
require 'bundler'
|
||||
Bundler::GemHelper.install_tasks
|
||||
|
||||
|
||||
#
|
||||
# Rspec
|
||||
#
|
||||
|
||||
require 'rspec/core/rake_task'
|
||||
desc "Run all specs"
|
||||
task :spec => ["spec:unit", "spec:integration"]
|
||||
|
||||
RSpec::Core::RakeTask.new('spec:unit') do |spec|
|
||||
spec.pattern = 'spec/unit/**/*_spec.rb'
|
||||
end
|
||||
|
||||
RSpec::Core::RakeTask.new('spec:integration') do |spec|
|
||||
spec.pattern = 'spec/integration/**/*_spec.rb'
|
||||
end
|
||||
|
||||
RSpec::Core::RakeTask.new(:rcov) do |spec|
|
||||
spec.pattern = 'spec/**/*_spec.rb'
|
||||
spec.rcov = true
|
||||
end
|
||||
|
||||
|
||||
#
|
||||
# Yard
|
||||
#
|
||||
|
||||
begin
|
||||
require 'yard'
|
||||
YARD::Rake::YardocTask.new
|
||||
rescue LoadError
|
||||
task :yardoc do
|
||||
abort "YARD is not available. In order to run yardoc, you must: sudo gem install yard"
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
#
|
||||
# Misc.
|
||||
#
|
||||
|
||||
desc "Start irb with amqp-failover pre-loaded"
|
||||
task :console do
|
||||
exec "irb -r spec/spec_helper"
|
||||
end
|
||||
task :c => :console
|
||||
|
||||
@@ -1,16 +1,16 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
$:.push File.expand_path("../lib", __FILE__)
|
||||
require "amqp-failover/version"
|
||||
require "amqp/failover/version"
|
||||
|
||||
Gem::Specification.new do |s|
|
||||
s.name = "amqp-failover"
|
||||
s.version = Amqp::Failover::VERSION
|
||||
s.version = AMQP::Failover::VERSION
|
||||
s.platform = Gem::Platform::RUBY
|
||||
s.authors = ["TODO: Write your name"]
|
||||
s.email = ["TODO: Write your email address"]
|
||||
s.homepage = ""
|
||||
s.summary = %q{TODO: Write a gem summary}
|
||||
s.description = %q{TODO: Write a gem description}
|
||||
s.authors = ["Jim Myhrberg"]
|
||||
s.email = ["contact@jimeh.me"]
|
||||
s.homepage = 'http://github.com/jimeh/amqp-failover'
|
||||
s.summary = 'Add multi-server failover and fallback to amqp gem.'
|
||||
s.description = 'Add multi-server failover and fallback to amqp gem.'
|
||||
|
||||
s.rubyforge_project = "amqp-failover"
|
||||
|
||||
@@ -18,4 +18,12 @@ Gem::Specification.new do |s|
|
||||
s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
|
||||
s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
|
||||
s.require_paths = ["lib"]
|
||||
|
||||
s.add_runtime_dependency 'amqp', '>= 0.7.0'
|
||||
|
||||
s.add_development_dependency 'rake', '>= 0.8.7'
|
||||
s.add_development_dependency 'rack-test', '>= 0.5.6'
|
||||
s.add_development_dependency 'rspec', '>= 2.1.0'
|
||||
s.add_development_dependency 'yard', '>= 0.6.3'
|
||||
s.add_development_dependency 'ruby-debug'
|
||||
end
|
||||
|
||||
@@ -1,5 +0,0 @@
|
||||
module Amqp
|
||||
module Failover
|
||||
# Your code goes here...
|
||||
end
|
||||
end
|
||||
18
lib/amqp/failover.rb
Normal file
18
lib/amqp/failover.rb
Normal file
@@ -0,0 +1,18 @@
|
||||
# encoding: utf-8
|
||||
|
||||
require 'amqp'
|
||||
require 'yaml'
|
||||
|
||||
require 'amqp/failover/basic_client'
|
||||
require 'amqp/failover/config'
|
||||
require 'amqp/failover/fallback'
|
||||
require 'amqp/failover/logger'
|
||||
require 'amqp/failover/logic'
|
||||
require 'amqp/failover/logic/failed_config'
|
||||
require 'amqp/failover/version'
|
||||
|
||||
module AMQP
|
||||
module Failover
|
||||
|
||||
end
|
||||
end
|
||||
55
lib/amqp/failover/basic_client.rb
Normal file
55
lib/amqp/failover/basic_client.rb
Normal file
@@ -0,0 +1,55 @@
|
||||
# encoding: utf-8
|
||||
|
||||
module AMQP
|
||||
module Failover
|
||||
module BasicClient
|
||||
include AMQP::BasicClient
|
||||
|
||||
class Error < Exception; end
|
||||
|
||||
attr_accessor :on_disconnect
|
||||
attr_accessor :settings
|
||||
|
||||
def self.extended(base)
|
||||
base.on_disconnect = proc {
|
||||
OnDisconnect.new(base).call
|
||||
}
|
||||
end
|
||||
|
||||
def logger
|
||||
@logger ||= Logger.new
|
||||
end
|
||||
|
||||
def failover_conf
|
||||
@failover_conf ||= Config.new
|
||||
end
|
||||
|
||||
def configs
|
||||
failover_conf.configs
|
||||
end
|
||||
|
||||
def clean_exit(msg = nil)
|
||||
msg ||= "clean exit"
|
||||
logger.info(msg)
|
||||
logger.error(msg)
|
||||
Process.exit
|
||||
end
|
||||
|
||||
def process_frame(frame)
|
||||
if mq = channels[frame.channel]
|
||||
mq.process_frame(frame)
|
||||
return
|
||||
end
|
||||
|
||||
if frame.is_a?(::AMQP::Frame::Method) && (method = frame.payload).is_a?(::AMQP::Protocol::Connection::Close)
|
||||
if method.reply_text =~ /^NOT_ALLOWED/
|
||||
raise ::AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}"
|
||||
end
|
||||
end
|
||||
super(frame)
|
||||
end
|
||||
|
||||
end # BasicClient
|
||||
end # Failover
|
||||
end # AMQP
|
||||
|
||||
46
lib/amqp/failover/basic_client/on_disconnect.rb
Normal file
46
lib/amqp/failover/basic_client/on_disconnect.rb
Normal file
@@ -0,0 +1,46 @@
|
||||
# encoding: utf-8
|
||||
|
||||
module AMQP
|
||||
module Failover
|
||||
module BasicClient
|
||||
class OnDisconnect
|
||||
|
||||
attr_accessor :base
|
||||
attr_accessor :failover
|
||||
attr_accessor :configs
|
||||
attr_accessor :logger
|
||||
|
||||
def initialize(base)
|
||||
@base = base
|
||||
@configs = @base.configs
|
||||
@failover_conf = @base.failover_conf
|
||||
@logger = @base.logger
|
||||
end
|
||||
|
||||
def call
|
||||
@logic ||= Logic.new(@configs.configs, @failover_conf.get_primary, @failover_conf.failover_config)
|
||||
if (new_settings = @logic.failover_from(@base.settings))
|
||||
log_message = "Could not connect to or lost connection to server #{@base.settings[:host]}:#{@base.settings[:port]}. " +
|
||||
"Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}"
|
||||
@logger.error(log_message)
|
||||
@logger.info(log_message)
|
||||
|
||||
if @failover_conf.get_primary == @base.settings
|
||||
FallbackMonitor.monitor(@failover_conf.get_primary) do
|
||||
@base.clean_exit("Primary server (#{@failover_conf.get_primary[:host]}:#{@failover_conf.get_primary[:port]}) is back. " +
|
||||
"Performing clean exit to be relaunched with primary config.")
|
||||
end
|
||||
end
|
||||
|
||||
@base.settings = new_settings
|
||||
@base.reconnect
|
||||
else
|
||||
raise Error, "Could not connect to server #{@base.settings[:host]}:#{@base.settings[:port]}"
|
||||
end
|
||||
end
|
||||
|
||||
end # OnDisconnect
|
||||
end # BasicClient
|
||||
end # Failover
|
||||
end # AMQP
|
||||
|
||||
94
lib/amqp/failover/config.rb
Normal file
94
lib/amqp/failover/config.rb
Normal file
@@ -0,0 +1,94 @@
|
||||
# encoding: utf-8
|
||||
|
||||
module AMQP
|
||||
module Failover
|
||||
class Config
|
||||
|
||||
attr_accessor :configs
|
||||
attr_accessor :failover_config
|
||||
|
||||
def failover_config
|
||||
@failover_config ||= { :retry_timeout => 30 }
|
||||
end
|
||||
|
||||
def refs
|
||||
@refs ||= {}
|
||||
end
|
||||
|
||||
def configs
|
||||
@configs ||= []
|
||||
end
|
||||
|
||||
def primary
|
||||
@primary ||= 0
|
||||
end
|
||||
|
||||
def primary=(ref)
|
||||
@primary = ref
|
||||
end
|
||||
|
||||
def get_primary
|
||||
get(primary) || default_config
|
||||
end
|
||||
|
||||
def set_primary(conf = {})
|
||||
set(conf, primary)
|
||||
end
|
||||
|
||||
def get(ref = nil)
|
||||
return configs[ref] if ref.is_a?(Fixnum)
|
||||
configs[refs[ref]] if refs[ref]
|
||||
end
|
||||
|
||||
def set(conf = {}, ref = nil)
|
||||
conf = default_config.merge(conf)
|
||||
configs << conf if (index = configs.index(conf)).nil?
|
||||
if ref
|
||||
refs[ref] = (index || configs.index(conf))
|
||||
end
|
||||
end
|
||||
|
||||
def find_next(conf = {})
|
||||
current = configs.index(conf)
|
||||
configs[(current+1 == configs.size) ? 0 : current+1] if current
|
||||
end
|
||||
|
||||
def load_file(file, env = nil)
|
||||
raise ArgumentError, "Can't find #{file}" unless File.exists?(file)
|
||||
load(YAML.load_file(file)[env || "development"])
|
||||
end
|
||||
|
||||
def load_yaml(data, env = nil)
|
||||
load(YAML.load(data)[env || "development"])
|
||||
end
|
||||
|
||||
def load(conf)
|
||||
if conf.is_a?(::Array)
|
||||
load_array(conf)
|
||||
elsif conf.is_a?(::Hash)
|
||||
load_hash(conf)
|
||||
end
|
||||
end
|
||||
|
||||
def load_array(confs = [])
|
||||
@configs = nil
|
||||
confs.each do |conf|
|
||||
load_hash(conf)
|
||||
end
|
||||
end
|
||||
|
||||
def load_hash(conf = {})
|
||||
conf = conf.inject({}) do |result, (key, value)|
|
||||
result[key.is_a?(String) ? key.to_sym : key] = value
|
||||
result
|
||||
end
|
||||
self.set(conf)
|
||||
end
|
||||
|
||||
def default_config
|
||||
AMQP.settings
|
||||
end
|
||||
|
||||
end # Config
|
||||
end # Failover
|
||||
end # AMQP
|
||||
44
lib/amqp/failover/fallback.rb
Normal file
44
lib/amqp/failover/fallback.rb
Normal file
@@ -0,0 +1,44 @@
|
||||
# encoding: utf-8
|
||||
|
||||
module AMQP
|
||||
module Failover
|
||||
class Fallback < EM::Connection
|
||||
|
||||
class << self
|
||||
attr_accessor :connection
|
||||
end
|
||||
|
||||
def initialize(args)
|
||||
@done = args[:done]
|
||||
@timer = args[:timer]
|
||||
end
|
||||
|
||||
def connection_completed
|
||||
@done.call
|
||||
@timer.cancel
|
||||
close_connection
|
||||
end
|
||||
|
||||
def self.monitor(conf = {}, &block)
|
||||
if EM.reactor_running?
|
||||
start_monitoring(conf, &block)
|
||||
else
|
||||
EM.run { start_monitoring(conf, &block) }
|
||||
end
|
||||
end
|
||||
|
||||
def self.start_monitoring(conf = {}, &block)
|
||||
conf = conf.clone
|
||||
conf[:done] = block
|
||||
conf[:timer] = EM::PeriodicTimer.new(conf[:retry_interval] || 5) do
|
||||
@connection = connect(conf)
|
||||
end
|
||||
end
|
||||
|
||||
def self.connect(conf)
|
||||
EM.connect(conf[:host], conf[:port], self, conf)
|
||||
end
|
||||
|
||||
end # Fallback
|
||||
end # Failover
|
||||
end # AMQP
|
||||
31
lib/amqp/failover/logger.rb
Normal file
31
lib/amqp/failover/logger.rb
Normal file
@@ -0,0 +1,31 @@
|
||||
# encoding: utf-8
|
||||
|
||||
module AMQP
|
||||
module Failover
|
||||
class Logger
|
||||
|
||||
attr_accessor :enabled
|
||||
|
||||
def initialize(enabled = nil)
|
||||
@enabled = enabled || true
|
||||
end
|
||||
|
||||
def error(*msg)
|
||||
msg[0] = "[ERROR]: " + msg[0] if msg[0].is_a?(String)
|
||||
write(*msg)
|
||||
end
|
||||
|
||||
def info(*msg)
|
||||
write(*msg)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def write(*msg)
|
||||
return if !@enabled
|
||||
puts *msg
|
||||
end
|
||||
|
||||
end # Logger
|
||||
end # Failover
|
||||
end # AMQP
|
||||
86
lib/amqp/failover/logic.rb
Normal file
86
lib/amqp/failover/logic.rb
Normal file
@@ -0,0 +1,86 @@
|
||||
# encoding: utf-8
|
||||
|
||||
module AMQP
|
||||
module Failover
|
||||
class Logic
|
||||
|
||||
attr_reader :latest_failed
|
||||
attr_accessor :primary
|
||||
attr_accessor :retry_timeout
|
||||
attr_accessor :fallback
|
||||
|
||||
def initialize(confs = nil, primary = nil, options = {})
|
||||
@primary = primary
|
||||
@retry_timeout = (options.delete(:retry_timeout) || 30)
|
||||
self.configs = confs if !confs.nil?
|
||||
end
|
||||
|
||||
def refs
|
||||
@refs ||= {}
|
||||
end
|
||||
|
||||
def configs
|
||||
@configs ||= []
|
||||
end
|
||||
|
||||
def configs=(confs = [])
|
||||
@configs = []
|
||||
confs.each do |conf|
|
||||
if conf.is_a?(Array)
|
||||
add_config(conf[1], conf[0])
|
||||
else
|
||||
add_config(conf)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def add_config(conf = {}, ref = nil)
|
||||
index = configs.index(conf)
|
||||
configs << FailedConfig.new(conf) if index.nil?
|
||||
refs[ref] = (index || configs.index(conf)) if !ref.nil?
|
||||
end
|
||||
|
||||
def failover_from(conf = {}, time = nil)
|
||||
failed_with(conf, nil, time)
|
||||
next_config
|
||||
end
|
||||
|
||||
def failed_with(conf = {}, ref = nil, time = nil)
|
||||
time ||= Time.now
|
||||
if index = configs.index(conf)
|
||||
configs[index].last_fail = time
|
||||
@latest_failed = configs[index]
|
||||
else
|
||||
configs << FailedConfig.new(conf, time)
|
||||
@latest_failed = configs.last
|
||||
end
|
||||
refs[ref] = (index || configs.index(conf)) if !ref.nil?
|
||||
end
|
||||
|
||||
def next_config(retry_timeout = nil, after = nil)
|
||||
return nil if configs.size <= 1
|
||||
retry_timeout ||= @retry_timeout
|
||||
after ||= @latest_failed
|
||||
index = configs.index(after)
|
||||
available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1]
|
||||
available.each do |conf|
|
||||
return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now
|
||||
end
|
||||
return nil
|
||||
end
|
||||
|
||||
def last_fail_of(match)
|
||||
((match.is_a?(Hash) ? get_by_conf(match) : get_by_ref(match)) || FailedConfig.new).last_fail
|
||||
end
|
||||
|
||||
def get_by_conf(conf = {})
|
||||
configs[configs.index(conf)]
|
||||
end
|
||||
|
||||
def get_by_ref(ref = nil)
|
||||
configs[refs[ref]] if refs[ref]
|
||||
end
|
||||
|
||||
end # Logic
|
||||
end # Failover
|
||||
end # AMQP
|
||||
33
lib/amqp/failover/logic/failed_config.rb
Normal file
33
lib/amqp/failover/logic/failed_config.rb
Normal file
@@ -0,0 +1,33 @@
|
||||
# encoding: utf-8
|
||||
|
||||
module AMQP
|
||||
module Failover
|
||||
class Logic
|
||||
class FailedConfig < ::Hash
|
||||
|
||||
attr_accessor :last_fail
|
||||
|
||||
def initialize(hash = {}, last_fail_date = nil)
|
||||
self.replace(hash)
|
||||
self.last_fail = last_fail_date if last_fail_date
|
||||
end
|
||||
|
||||
# order by latest fail, potentially useful if random config selection is used
|
||||
def <=>(other)
|
||||
if self.respond_to?(:last_fail) && other.respond_to?(:last_fail)
|
||||
if self.last_fail.nil? && other.last_fail.nil?
|
||||
return 0
|
||||
elsif self.last_fail.nil? && !other.last_fail.nil?
|
||||
return 1
|
||||
elsif !self.last_fail.nil? && other.last_fail.nil?
|
||||
return -1
|
||||
end
|
||||
return other.last_fail <=> self.last_fail
|
||||
end
|
||||
return 0
|
||||
end
|
||||
|
||||
end # FailedConfig
|
||||
end # Logic
|
||||
end # Failover
|
||||
end # AMQP
|
||||
@@ -1,4 +1,6 @@
|
||||
module Amqp
|
||||
# encoding: utf-8
|
||||
|
||||
module AMQP
|
||||
module Failover
|
||||
VERSION = "0.0.1"
|
||||
end
|
||||
9
spec/spec_helper.rb
Normal file
9
spec/spec_helper.rb
Normal file
@@ -0,0 +1,9 @@
|
||||
# add project-relative load paths
|
||||
$LOAD_PATH.unshift File.dirname(__FILE__)
|
||||
$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..', 'lib')
|
||||
|
||||
# require stuff
|
||||
require 'rubygems'
|
||||
require 'amqp/failover'
|
||||
require 'rspec'
|
||||
require 'rspec/autorun'
|
||||
59
spec/unit/amqp/failover/config_spec.rb
Normal file
59
spec/unit/amqp/failover/config_spec.rb
Normal file
@@ -0,0 +1,59 @@
|
||||
# encoding: utf-8
|
||||
|
||||
require 'spec_helper'
|
||||
|
||||
describe AMQP::Failover::Config do
|
||||
|
||||
before(:each) do
|
||||
@conf = AMQP::Failover::Config.new
|
||||
[:primary, :configs, :refs].each do |var|
|
||||
@conf.instance_variable_set("@#{var}", nil)
|
||||
end
|
||||
@raw_configs = [
|
||||
{:host => 'rabbit3.local'},
|
||||
{:host => 'rabbit2.local'},
|
||||
{:host => 'rabbit2.local', :port => 5673}
|
||||
]
|
||||
@configs = @raw_configs.map { |conf| @conf.default_config.merge(conf) }
|
||||
end
|
||||
|
||||
after(:each) do
|
||||
[:primary, :configs, :refs].each do |var|
|
||||
@conf.instance_variable_set("@#{var}", nil)
|
||||
end
|
||||
end
|
||||
|
||||
it "should set and get configs" do
|
||||
@conf.primary.should == 0
|
||||
@conf.configs.should have(0).items
|
||||
|
||||
@conf.set(@raw_configs[0])
|
||||
@conf.configs.should have(1).items
|
||||
@conf.get(0).should == @configs[0]
|
||||
|
||||
@conf.set(@raw_configs[1])
|
||||
@conf.configs.should have(2).items
|
||||
@conf.get(1).should == @configs[1]
|
||||
|
||||
@conf.set(@raw_configs[1], :the_one)
|
||||
@conf.configs.should have(2).items
|
||||
@conf.get(1).should == @configs[1]
|
||||
@conf.get(:the_one).should == @configs[1]
|
||||
|
||||
@conf.load_array(@raw_configs)
|
||||
@conf.configs.should have(3).items
|
||||
@conf.get_primary.should == @configs[0]
|
||||
@conf.primary = 1
|
||||
@conf.get_primary.should == @configs[1]
|
||||
end
|
||||
|
||||
it "should #find_next" do
|
||||
@conf.load(@raw_configs)
|
||||
@conf.configs.should have(3).items
|
||||
@conf.find_next(@configs[0]).should == @configs[1]
|
||||
@conf.find_next(@configs[1]).should == @configs[2]
|
||||
@conf.find_next(@configs[2]).should == @configs[0]
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user