From b3fc9826b0e9d15f7eab6a80fb16c5619547597e Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Wed, 26 Jan 2011 15:29:46 +0000 Subject: [PATCH 01/25] initial import of failover code --- .gitignore | 24 +++++ .rspec | 2 + .rvmrc | 1 + Rakefile | 49 ++++++++++ amqp-failover.gemspec | 22 +++-- lib/amqp-failover.rb | 5 - lib/amqp/failover.rb | 18 ++++ lib/amqp/failover/basic_client.rb | 55 +++++++++++ .../failover/basic_client/on_disconnect.rb | 46 +++++++++ lib/amqp/failover/config.rb | 94 +++++++++++++++++++ lib/amqp/failover/fallback.rb | 44 +++++++++ lib/amqp/failover/logger.rb | 31 ++++++ lib/amqp/failover/logic.rb | 86 +++++++++++++++++ lib/amqp/failover/logic/failed_config.rb | 33 +++++++ .../failover}/version.rb | 4 +- spec/spec_helper.rb | 9 ++ spec/unit/amqp/failover/config_spec.rb | 59 ++++++++++++ 17 files changed, 569 insertions(+), 13 deletions(-) create mode 100644 .rspec create mode 100644 .rvmrc delete mode 100644 lib/amqp-failover.rb create mode 100644 lib/amqp/failover.rb create mode 100644 lib/amqp/failover/basic_client.rb create mode 100644 lib/amqp/failover/basic_client/on_disconnect.rb create mode 100644 lib/amqp/failover/config.rb create mode 100644 lib/amqp/failover/fallback.rb create mode 100644 lib/amqp/failover/logger.rb create mode 100644 lib/amqp/failover/logic.rb create mode 100644 lib/amqp/failover/logic/failed_config.rb rename lib/{amqp-failover => amqp/failover}/version.rb (61%) create mode 100644 spec/spec_helper.rb create mode 100644 spec/unit/amqp/failover/config_spec.rb diff --git a/.gitignore b/.gitignore index 4040c6c..1724f70 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,28 @@ +## MAC OS +.DS_Store + +## TEXTMATE +*.tmproj +tmtags + +## EMACS +*~ +\#* +.\#* + +## VIM +*.swp + +## PROJECT::GENERAL +coverage +rdoc *.gem .bundle Gemfile.lock pkg/* + +## PROJECT::SPECIFIC +.yardoc/* +spec/db/* +doc/* + diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..8c18f1a --- /dev/null +++ b/.rspec @@ -0,0 +1,2 @@ +--format documentation +--color diff --git a/.rvmrc b/.rvmrc new file mode 100644 index 0000000..f1c0b0c --- /dev/null +++ b/.rvmrc @@ -0,0 +1 @@ +rvm gemset use amqp-failover diff --git a/Rakefile b/Rakefile index 14cfe0b..9eaaf97 100644 --- a/Rakefile +++ b/Rakefile @@ -1,2 +1,51 @@ +$LOAD_PATH.unshift File.expand_path("lib", File.dirname(__FILE__)) + require 'bundler' Bundler::GemHelper.install_tasks + + +# +# Rspec +# + +require 'rspec/core/rake_task' +desc "Run all specs" +task :spec => ["spec:unit", "spec:integration"] + +RSpec::Core::RakeTask.new('spec:unit') do |spec| + spec.pattern = 'spec/unit/**/*_spec.rb' +end + +RSpec::Core::RakeTask.new('spec:integration') do |spec| + spec.pattern = 'spec/integration/**/*_spec.rb' +end + +RSpec::Core::RakeTask.new(:rcov) do |spec| + spec.pattern = 'spec/**/*_spec.rb' + spec.rcov = true +end + + +# +# Yard +# + +begin + require 'yard' + YARD::Rake::YardocTask.new +rescue LoadError + task :yardoc do + abort "YARD is not available. In order to run yardoc, you must: sudo gem install yard" + end +end + + +# +# Misc. +# + +desc "Start irb with amqp-failover pre-loaded" +task :console do + exec "irb -r spec/spec_helper" +end +task :c => :console diff --git a/amqp-failover.gemspec b/amqp-failover.gemspec index 5709d93..8814860 100644 --- a/amqp-failover.gemspec +++ b/amqp-failover.gemspec @@ -1,16 +1,16 @@ # -*- encoding: utf-8 -*- $:.push File.expand_path("../lib", __FILE__) -require "amqp-failover/version" +require "amqp/failover/version" Gem::Specification.new do |s| s.name = "amqp-failover" - s.version = Amqp::Failover::VERSION + s.version = AMQP::Failover::VERSION s.platform = Gem::Platform::RUBY - s.authors = ["TODO: Write your name"] - s.email = ["TODO: Write your email address"] - s.homepage = "" - s.summary = %q{TODO: Write a gem summary} - s.description = %q{TODO: Write a gem description} + s.authors = ["Jim Myhrberg"] + s.email = ["contact@jimeh.me"] + s.homepage = 'http://github.com/jimeh/amqp-failover' + s.summary = 'Add multi-server failover and fallback to amqp gem.' + s.description = 'Add multi-server failover and fallback to amqp gem.' s.rubyforge_project = "amqp-failover" @@ -18,4 +18,12 @@ Gem::Specification.new do |s| s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } s.require_paths = ["lib"] + + s.add_runtime_dependency 'amqp', '>= 0.7.0' + + s.add_development_dependency 'rake', '>= 0.8.7' + s.add_development_dependency 'rack-test', '>= 0.5.6' + s.add_development_dependency 'rspec', '>= 2.1.0' + s.add_development_dependency 'yard', '>= 0.6.3' + s.add_development_dependency 'ruby-debug' end diff --git a/lib/amqp-failover.rb b/lib/amqp-failover.rb deleted file mode 100644 index 9f348e0..0000000 --- a/lib/amqp-failover.rb +++ /dev/null @@ -1,5 +0,0 @@ -module Amqp - module Failover - # Your code goes here... - end -end diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb new file mode 100644 index 0000000..3457ad4 --- /dev/null +++ b/lib/amqp/failover.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 + +require 'amqp' +require 'yaml' + +require 'amqp/failover/basic_client' +require 'amqp/failover/config' +require 'amqp/failover/fallback' +require 'amqp/failover/logger' +require 'amqp/failover/logic' +require 'amqp/failover/logic/failed_config' +require 'amqp/failover/version' + +module AMQP + module Failover + + end +end diff --git a/lib/amqp/failover/basic_client.rb b/lib/amqp/failover/basic_client.rb new file mode 100644 index 0000000..c558829 --- /dev/null +++ b/lib/amqp/failover/basic_client.rb @@ -0,0 +1,55 @@ +# encoding: utf-8 + +module AMQP + module Failover + module BasicClient + include AMQP::BasicClient + + class Error < Exception; end + + attr_accessor :on_disconnect + attr_accessor :settings + + def self.extended(base) + base.on_disconnect = proc { + OnDisconnect.new(base).call + } + end + + def logger + @logger ||= Logger.new + end + + def failover_conf + @failover_conf ||= Config.new + end + + def configs + failover_conf.configs + end + + def clean_exit(msg = nil) + msg ||= "clean exit" + logger.info(msg) + logger.error(msg) + Process.exit + end + + def process_frame(frame) + if mq = channels[frame.channel] + mq.process_frame(frame) + return + end + + if frame.is_a?(::AMQP::Frame::Method) && (method = frame.payload).is_a?(::AMQP::Protocol::Connection::Close) + if method.reply_text =~ /^NOT_ALLOWED/ + raise ::AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}" + end + end + super(frame) + end + + end # BasicClient + end # Failover +end # AMQP + diff --git a/lib/amqp/failover/basic_client/on_disconnect.rb b/lib/amqp/failover/basic_client/on_disconnect.rb new file mode 100644 index 0000000..ebf3927 --- /dev/null +++ b/lib/amqp/failover/basic_client/on_disconnect.rb @@ -0,0 +1,46 @@ +# encoding: utf-8 + +module AMQP + module Failover + module BasicClient + class OnDisconnect + + attr_accessor :base + attr_accessor :failover + attr_accessor :configs + attr_accessor :logger + + def initialize(base) + @base = base + @configs = @base.configs + @failover_conf = @base.failover_conf + @logger = @base.logger + end + + def call + @logic ||= Logic.new(@configs.configs, @failover_conf.get_primary, @failover_conf.failover_config) + if (new_settings = @logic.failover_from(@base.settings)) + log_message = "Could not connect to or lost connection to server #{@base.settings[:host]}:#{@base.settings[:port]}. " + + "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" + @logger.error(log_message) + @logger.info(log_message) + + if @failover_conf.get_primary == @base.settings + FallbackMonitor.monitor(@failover_conf.get_primary) do + @base.clean_exit("Primary server (#{@failover_conf.get_primary[:host]}:#{@failover_conf.get_primary[:port]}) is back. " + + "Performing clean exit to be relaunched with primary config.") + end + end + + @base.settings = new_settings + @base.reconnect + else + raise Error, "Could not connect to server #{@base.settings[:host]}:#{@base.settings[:port]}" + end + end + + end # OnDisconnect + end # BasicClient + end # Failover +end # AMQP + diff --git a/lib/amqp/failover/config.rb b/lib/amqp/failover/config.rb new file mode 100644 index 0000000..6e218d6 --- /dev/null +++ b/lib/amqp/failover/config.rb @@ -0,0 +1,94 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Config + + attr_accessor :configs + attr_accessor :failover_config + + def failover_config + @failover_config ||= { :retry_timeout => 30 } + end + + def refs + @refs ||= {} + end + + def configs + @configs ||= [] + end + + def primary + @primary ||= 0 + end + + def primary=(ref) + @primary = ref + end + + def get_primary + get(primary) || default_config + end + + def set_primary(conf = {}) + set(conf, primary) + end + + def get(ref = nil) + return configs[ref] if ref.is_a?(Fixnum) + configs[refs[ref]] if refs[ref] + end + + def set(conf = {}, ref = nil) + conf = default_config.merge(conf) + configs << conf if (index = configs.index(conf)).nil? + if ref + refs[ref] = (index || configs.index(conf)) + end + end + + def find_next(conf = {}) + current = configs.index(conf) + configs[(current+1 == configs.size) ? 0 : current+1] if current + end + + def load_file(file, env = nil) + raise ArgumentError, "Can't find #{file}" unless File.exists?(file) + load(YAML.load_file(file)[env || "development"]) + end + + def load_yaml(data, env = nil) + load(YAML.load(data)[env || "development"]) + end + + def load(conf) + if conf.is_a?(::Array) + load_array(conf) + elsif conf.is_a?(::Hash) + load_hash(conf) + end + end + + def load_array(confs = []) + @configs = nil + confs.each do |conf| + load_hash(conf) + end + end + + def load_hash(conf = {}) + conf = conf.inject({}) do |result, (key, value)| + result[key.is_a?(String) ? key.to_sym : key] = value + result + end + self.set(conf) + end + + def default_config + AMQP.settings + end + + end # Config + end # Failover +end # AMQP diff --git a/lib/amqp/failover/fallback.rb b/lib/amqp/failover/fallback.rb new file mode 100644 index 0000000..4bf58a9 --- /dev/null +++ b/lib/amqp/failover/fallback.rb @@ -0,0 +1,44 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Fallback < EM::Connection + + class << self + attr_accessor :connection + end + + def initialize(args) + @done = args[:done] + @timer = args[:timer] + end + + def connection_completed + @done.call + @timer.cancel + close_connection + end + + def self.monitor(conf = {}, &block) + if EM.reactor_running? + start_monitoring(conf, &block) + else + EM.run { start_monitoring(conf, &block) } + end + end + + def self.start_monitoring(conf = {}, &block) + conf = conf.clone + conf[:done] = block + conf[:timer] = EM::PeriodicTimer.new(conf[:retry_interval] || 5) do + @connection = connect(conf) + end + end + + def self.connect(conf) + EM.connect(conf[:host], conf[:port], self, conf) + end + + end # Fallback + end # Failover +end # AMQP diff --git a/lib/amqp/failover/logger.rb b/lib/amqp/failover/logger.rb new file mode 100644 index 0000000..4f8d4b0 --- /dev/null +++ b/lib/amqp/failover/logger.rb @@ -0,0 +1,31 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Logger + + attr_accessor :enabled + + def initialize(enabled = nil) + @enabled = enabled || true + end + + def error(*msg) + msg[0] = "[ERROR]: " + msg[0] if msg[0].is_a?(String) + write(*msg) + end + + def info(*msg) + write(*msg) + end + + private + + def write(*msg) + return if !@enabled + puts *msg + end + + end # Logger + end # Failover +end # AMQP diff --git a/lib/amqp/failover/logic.rb b/lib/amqp/failover/logic.rb new file mode 100644 index 0000000..c2092ab --- /dev/null +++ b/lib/amqp/failover/logic.rb @@ -0,0 +1,86 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Logic + + attr_reader :latest_failed + attr_accessor :primary + attr_accessor :retry_timeout + attr_accessor :fallback + + def initialize(confs = nil, primary = nil, options = {}) + @primary = primary + @retry_timeout = (options.delete(:retry_timeout) || 30) + self.configs = confs if !confs.nil? + end + + def refs + @refs ||= {} + end + + def configs + @configs ||= [] + end + + def configs=(confs = []) + @configs = [] + confs.each do |conf| + if conf.is_a?(Array) + add_config(conf[1], conf[0]) + else + add_config(conf) + end + end + end + + def add_config(conf = {}, ref = nil) + index = configs.index(conf) + configs << FailedConfig.new(conf) if index.nil? + refs[ref] = (index || configs.index(conf)) if !ref.nil? + end + + def failover_from(conf = {}, time = nil) + failed_with(conf, nil, time) + next_config + end + + def failed_with(conf = {}, ref = nil, time = nil) + time ||= Time.now + if index = configs.index(conf) + configs[index].last_fail = time + @latest_failed = configs[index] + else + configs << FailedConfig.new(conf, time) + @latest_failed = configs.last + end + refs[ref] = (index || configs.index(conf)) if !ref.nil? + end + + def next_config(retry_timeout = nil, after = nil) + return nil if configs.size <= 1 + retry_timeout ||= @retry_timeout + after ||= @latest_failed + index = configs.index(after) + available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1] + available.each do |conf| + return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now + end + return nil + end + + def last_fail_of(match) + ((match.is_a?(Hash) ? get_by_conf(match) : get_by_ref(match)) || FailedConfig.new).last_fail + end + + def get_by_conf(conf = {}) + configs[configs.index(conf)] + end + + def get_by_ref(ref = nil) + configs[refs[ref]] if refs[ref] + end + + end # Logic + end # Failover +end # AMQP diff --git a/lib/amqp/failover/logic/failed_config.rb b/lib/amqp/failover/logic/failed_config.rb new file mode 100644 index 0000000..7c488ea --- /dev/null +++ b/lib/amqp/failover/logic/failed_config.rb @@ -0,0 +1,33 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Logic + class FailedConfig < ::Hash + + attr_accessor :last_fail + + def initialize(hash = {}, last_fail_date = nil) + self.replace(hash) + self.last_fail = last_fail_date if last_fail_date + end + + # order by latest fail, potentially useful if random config selection is used + def <=>(other) + if self.respond_to?(:last_fail) && other.respond_to?(:last_fail) + if self.last_fail.nil? && other.last_fail.nil? + return 0 + elsif self.last_fail.nil? && !other.last_fail.nil? + return 1 + elsif !self.last_fail.nil? && other.last_fail.nil? + return -1 + end + return other.last_fail <=> self.last_fail + end + return 0 + end + + end # FailedConfig + end # Logic + end # Failover +end # AMQP diff --git a/lib/amqp-failover/version.rb b/lib/amqp/failover/version.rb similarity index 61% rename from lib/amqp-failover/version.rb rename to lib/amqp/failover/version.rb index 23ec700..7843f36 100644 --- a/lib/amqp-failover/version.rb +++ b/lib/amqp/failover/version.rb @@ -1,4 +1,6 @@ -module Amqp +# encoding: utf-8 + +module AMQP module Failover VERSION = "0.0.1" end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..8d3dae3 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,9 @@ +# add project-relative load paths +$LOAD_PATH.unshift File.dirname(__FILE__) +$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..', 'lib') + +# require stuff +require 'rubygems' +require 'amqp/failover' +require 'rspec' +require 'rspec/autorun' \ No newline at end of file diff --git a/spec/unit/amqp/failover/config_spec.rb b/spec/unit/amqp/failover/config_spec.rb new file mode 100644 index 0000000..0372306 --- /dev/null +++ b/spec/unit/amqp/failover/config_spec.rb @@ -0,0 +1,59 @@ +# encoding: utf-8 + +require 'spec_helper' + +describe AMQP::Failover::Config do + + before(:each) do + @conf = AMQP::Failover::Config.new + [:primary, :configs, :refs].each do |var| + @conf.instance_variable_set("@#{var}", nil) + end + @raw_configs = [ + {:host => 'rabbit3.local'}, + {:host => 'rabbit2.local'}, + {:host => 'rabbit2.local', :port => 5673} + ] + @configs = @raw_configs.map { |conf| @conf.default_config.merge(conf) } + end + + after(:each) do + [:primary, :configs, :refs].each do |var| + @conf.instance_variable_set("@#{var}", nil) + end + end + + it "should set and get configs" do + @conf.primary.should == 0 + @conf.configs.should have(0).items + + @conf.set(@raw_configs[0]) + @conf.configs.should have(1).items + @conf.get(0).should == @configs[0] + + @conf.set(@raw_configs[1]) + @conf.configs.should have(2).items + @conf.get(1).should == @configs[1] + + @conf.set(@raw_configs[1], :the_one) + @conf.configs.should have(2).items + @conf.get(1).should == @configs[1] + @conf.get(:the_one).should == @configs[1] + + @conf.load_array(@raw_configs) + @conf.configs.should have(3).items + @conf.get_primary.should == @configs[0] + @conf.primary = 1 + @conf.get_primary.should == @configs[1] + end + + it "should #find_next" do + @conf.load(@raw_configs) + @conf.configs.should have(3).items + @conf.find_next(@configs[0]).should == @configs[1] + @conf.find_next(@configs[1]).should == @configs[2] + @conf.find_next(@configs[2]).should == @configs[0] + end + +end + From 350c7ffdba45a13a3d5ff98185f63d18c7d0f038 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Wed, 26 Jan 2011 17:36:09 +0000 Subject: [PATCH 02/25] first steps of integration tests are in place --- lib/amqp/failover.rb | 1 - spec/integration/simple_spec.rb | 29 +++++++++++++++++++++++++++++ spec/spec_helper.rb | 5 +++++ spec/spec_server.rb | 18 ++++++++++++++++++ 4 files changed, 52 insertions(+), 1 deletion(-) create mode 100644 spec/integration/simple_spec.rb create mode 100644 spec/spec_server.rb diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index 3457ad4..4cf5bf6 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -1,6 +1,5 @@ # encoding: utf-8 -require 'amqp' require 'yaml' require 'amqp/failover/basic_client' diff --git a/spec/integration/simple_spec.rb b/spec/integration/simple_spec.rb new file mode 100644 index 0000000..8ba6021 --- /dev/null +++ b/spec/integration/simple_spec.rb @@ -0,0 +1,29 @@ +# encoding: utf-8 + +require 'spec_helper' +require 'amqp/server' +require 'spec_server' + +describe "Simple AMQP connection with Failover feature loaded" do + + before(:all) do + @log = SpecServer.log + end + + it "should be connected" do + EM.run { + @sig = EM.start_server('localhost', 15672, SpecServer) + conn = AMQP.connect(:host => 'localhost', :port => 15672) + EM.add_timer(0.1) { + conn.should be_connected + @log.size.should == 3 + (0..2).each { |i| @log[i][0].should == "send" } + @log[0][1].payload.should be_a(AMQP::Protocol::Connection::Start) + @log[1][1].payload.should be_a(AMQP::Protocol::Connection::Tune) + @log[2][1].payload.should be_a(AMQP::Protocol::Connection::OpenOk) + EM.stop + } + } + end + +end \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 8d3dae3..ebd9559 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,6 +4,11 @@ $LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..', 'lib') # require stuff require 'rubygems' +begin + require 'mq' +rescue Object => e + require 'amqp' +end require 'amqp/failover' require 'rspec' require 'rspec/autorun' \ No newline at end of file diff --git a/spec/spec_server.rb b/spec/spec_server.rb new file mode 100644 index 0000000..516e6bd --- /dev/null +++ b/spec/spec_server.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 + +module SpecServer + include AMQP::Server + + class << self + def log + @log ||= [] + end + attr_writer :log + end + + def log(*args) + SpecServer.log << args + # silence Output + end + +end \ No newline at end of file From 0aab60ba09a020451248d7a17d147b26695994bf Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 27 Jan 2011 10:19:24 +0000 Subject: [PATCH 03/25] renamed AMQP::Failover::BasicClient to AMQP::FailoverClient --- lib/amqp/failover.rb | 2 +- lib/amqp/failover/basic_client.rb | 55 ------------------------------- lib/amqp/failover_client.rb | 51 ++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 56 deletions(-) delete mode 100644 lib/amqp/failover/basic_client.rb create mode 100644 lib/amqp/failover_client.rb diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index 4cf5bf6..9bbac6c 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -2,7 +2,7 @@ require 'yaml' -require 'amqp/failover/basic_client' +require 'amqp/failover_client' require 'amqp/failover/config' require 'amqp/failover/fallback' require 'amqp/failover/logger' diff --git a/lib/amqp/failover/basic_client.rb b/lib/amqp/failover/basic_client.rb deleted file mode 100644 index c558829..0000000 --- a/lib/amqp/failover/basic_client.rb +++ /dev/null @@ -1,55 +0,0 @@ -# encoding: utf-8 - -module AMQP - module Failover - module BasicClient - include AMQP::BasicClient - - class Error < Exception; end - - attr_accessor :on_disconnect - attr_accessor :settings - - def self.extended(base) - base.on_disconnect = proc { - OnDisconnect.new(base).call - } - end - - def logger - @logger ||= Logger.new - end - - def failover_conf - @failover_conf ||= Config.new - end - - def configs - failover_conf.configs - end - - def clean_exit(msg = nil) - msg ||= "clean exit" - logger.info(msg) - logger.error(msg) - Process.exit - end - - def process_frame(frame) - if mq = channels[frame.channel] - mq.process_frame(frame) - return - end - - if frame.is_a?(::AMQP::Frame::Method) && (method = frame.payload).is_a?(::AMQP::Protocol::Connection::Close) - if method.reply_text =~ /^NOT_ALLOWED/ - raise ::AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}" - end - end - super(frame) - end - - end # BasicClient - end # Failover -end # AMQP - diff --git a/lib/amqp/failover_client.rb b/lib/amqp/failover_client.rb new file mode 100644 index 0000000..f69a6a3 --- /dev/null +++ b/lib/amqp/failover_client.rb @@ -0,0 +1,51 @@ +# encoding: utf-8 + +module AMQP + module FailoverClient + include AMQP::BasicClient + + attr_accessor :on_disconnect + attr_accessor :settings + + def self.extended(base) + base.on_disconnect = proc { + Failover::OnDisconnect.new(base).call + } + end + + def logger + @logger ||= Failover::Logger.new + end + + def failover_conf + @failover_conf ||= Failover::Config.new + end + + def configs + failover_conf.configs + end + + def clean_exit(msg = nil) + msg ||= "clean exit" + logger.info(msg) + logger.error(msg) + Process.exit + end + + def process_frame(frame) + if mq = channels[frame.channel] + mq.process_frame(frame) + return + end + + if frame.is_a?(AMQP::Frame::Method) && (method = frame.payload).is_a?(AMQP::Protocol::Connection::Close) + if method.reply_text =~ /^NOT_ALLOWED/ + raise AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}" + end + end + super(frame) + end + + end # FailoverClient +end # AMQP + From f35303b8b2a0ca2c28e594146149f4cc3a6ad8e4 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 27 Jan 2011 10:19:54 +0000 Subject: [PATCH 04/25] use FailoverClient in simple integration spec --- spec/integration/simple_spec.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spec/integration/simple_spec.rb b/spec/integration/simple_spec.rb index 8ba6021..11e120c 100644 --- a/spec/integration/simple_spec.rb +++ b/spec/integration/simple_spec.rb @@ -4,13 +4,14 @@ require 'spec_helper' require 'amqp/server' require 'spec_server' -describe "Simple AMQP connection with Failover feature loaded" do +describe "Simple AMQP connection with FailoverClient" do before(:all) do @log = SpecServer.log end it "should be connected" do + AMQP.client = AMQP::FailoverClient EM.run { @sig = EM.start_server('localhost', 15672, SpecServer) conn = AMQP.connect(:host => 'localhost', :port => 15672) From 079bb5388c2f868e02701e6ed480729e0d26ae73 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 27 Jan 2011 10:20:13 +0000 Subject: [PATCH 05/25] fix whitespaces, cause really, why not? --- lib/amqp/failover/basic_client/on_disconnect.rb | 1 - lib/amqp/failover/logger.rb | 14 +++++++------- spec/integration/simple_spec.rb | 4 ++-- spec/spec_helper.rb | 2 ++ spec/spec_server.rb | 2 +- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/lib/amqp/failover/basic_client/on_disconnect.rb b/lib/amqp/failover/basic_client/on_disconnect.rb index ebf3927..4df3d8e 100644 --- a/lib/amqp/failover/basic_client/on_disconnect.rb +++ b/lib/amqp/failover/basic_client/on_disconnect.rb @@ -43,4 +43,3 @@ module AMQP end # BasicClient end # Failover end # AMQP - diff --git a/lib/amqp/failover/logger.rb b/lib/amqp/failover/logger.rb index 4f8d4b0..b1e1706 100644 --- a/lib/amqp/failover/logger.rb +++ b/lib/amqp/failover/logger.rb @@ -3,29 +3,29 @@ module AMQP module Failover class Logger - + attr_accessor :enabled - + def initialize(enabled = nil) @enabled = enabled || true end - + def error(*msg) msg[0] = "[ERROR]: " + msg[0] if msg[0].is_a?(String) write(*msg) end - + def info(*msg) write(*msg) end - + private - + def write(*msg) return if !@enabled puts *msg end - + end # Logger end # Failover end # AMQP diff --git a/spec/integration/simple_spec.rb b/spec/integration/simple_spec.rb index 11e120c..509fbf9 100644 --- a/spec/integration/simple_spec.rb +++ b/spec/integration/simple_spec.rb @@ -9,7 +9,7 @@ describe "Simple AMQP connection with FailoverClient" do before(:all) do @log = SpecServer.log end - + it "should be connected" do AMQP.client = AMQP::FailoverClient EM.run { @@ -27,4 +27,4 @@ describe "Simple AMQP connection with FailoverClient" do } end -end \ No newline at end of file +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index ebd9559..74cfe0f 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,3 +1,5 @@ +# encoding: utf-8 + # add project-relative load paths $LOAD_PATH.unshift File.dirname(__FILE__) $LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..', 'lib') diff --git a/spec/spec_server.rb b/spec/spec_server.rb index 516e6bd..a83dc2f 100644 --- a/spec/spec_server.rb +++ b/spec/spec_server.rb @@ -15,4 +15,4 @@ module SpecServer # silence Output end -end \ No newline at end of file +end From 01241a168965637716f94d06c7502a3800f8162f Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 27 Jan 2011 10:20:29 +0000 Subject: [PATCH 06/25] minor changes to spec rake tasks --- Rakefile | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/Rakefile b/Rakefile index 9eaaf97..df548be 100644 --- a/Rakefile +++ b/Rakefile @@ -9,9 +9,14 @@ Bundler::GemHelper.install_tasks # require 'rspec/core/rake_task' -desc "Run all specs" -task :spec => ["spec:unit", "spec:integration"] +RSpec::Core::RakeTask.new('spec:all') do |spec| + spec.pattern = [ 'spec/unit/**/*_spec.rb', + 'spec/integration/**/*_spec.rb' ] +end + +desc "Run unit specs" +task :spec => ["spec:unit"] RSpec::Core::RakeTask.new('spec:unit') do |spec| spec.pattern = 'spec/unit/**/*_spec.rb' end From 425caf9fb52016aff982dc684fc337b2aea1fd78 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 27 Jan 2011 11:10:37 +0000 Subject: [PATCH 07/25] updated load paths and structure of specs a bit --- spec/integration/simple_spec.rb | 9 +++++---- spec/{spec_server.rb => server_helper.rb} | 2 +- spec/spec_helper.rb | 10 ++++++---- spec/unit/amqp/failover/config_spec.rb | 1 + 4 files changed, 13 insertions(+), 9 deletions(-) rename spec/{spec_server.rb => server_helper.rb} (91%) diff --git a/spec/integration/simple_spec.rb b/spec/integration/simple_spec.rb index 509fbf9..cfe5968 100644 --- a/spec/integration/simple_spec.rb +++ b/spec/integration/simple_spec.rb @@ -1,19 +1,20 @@ # encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) require 'spec_helper' require 'amqp/server' -require 'spec_server' +require 'server_helper' describe "Simple AMQP connection with FailoverClient" do before(:all) do - @log = SpecServer.log + @log = ServerHelper.log + AMQP.client = AMQP::FailoverClient end it "should be connected" do - AMQP.client = AMQP::FailoverClient EM.run { - @sig = EM.start_server('localhost', 15672, SpecServer) + sig = EM.start_server('localhost', 15672, ServerHelper) conn = AMQP.connect(:host => 'localhost', :port => 15672) EM.add_timer(0.1) { conn.should be_connected diff --git a/spec/spec_server.rb b/spec/server_helper.rb similarity index 91% rename from spec/spec_server.rb rename to spec/server_helper.rb index a83dc2f..80c8ee4 100644 --- a/spec/spec_server.rb +++ b/spec/server_helper.rb @@ -1,6 +1,6 @@ # encoding: utf-8 -module SpecServer +module ServerHelper include AMQP::Server class << self diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 74cfe0f..0301a91 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,16 +1,18 @@ # encoding: utf-8 # add project-relative load paths -$LOAD_PATH.unshift File.dirname(__FILE__) -$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..', 'lib') +$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')) +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) # require stuff require 'rubygems' + begin require 'mq' -rescue Object => e +rescue LoadError => e require 'amqp' end require 'amqp/failover' + require 'rspec' -require 'rspec/autorun' \ No newline at end of file +require 'rspec/autorun' diff --git a/spec/unit/amqp/failover/config_spec.rb b/spec/unit/amqp/failover/config_spec.rb index 0372306..f335e5e 100644 --- a/spec/unit/amqp/failover/config_spec.rb +++ b/spec/unit/amqp/failover/config_spec.rb @@ -1,4 +1,5 @@ # encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) require 'spec_helper' From f1f89749a0f678045171bbaacb1231009d6d0d39 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 27 Jan 2011 11:18:28 +0000 Subject: [PATCH 08/25] added specs for AMQP::Failover::Fallback --- spec/unit/amqp/failover/fallback_helper.rb | 31 +++++++++++++ spec/unit/amqp/failover/fallback_spec.rb | 51 ++++++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 spec/unit/amqp/failover/fallback_helper.rb create mode 100644 spec/unit/amqp/failover/fallback_spec.rb diff --git a/spec/unit/amqp/failover/fallback_helper.rb b/spec/unit/amqp/failover/fallback_helper.rb new file mode 100644 index 0000000..a7ab4da --- /dev/null +++ b/spec/unit/amqp/failover/fallback_helper.rb @@ -0,0 +1,31 @@ +class FallbackHelper < AMQP::Failover::Fallback + + class << self + alias :real_start_monitoring :start_monitoring + def start_monitoring(*args, &block) + $called << :start_monitoring + real_start_monitoring(*args, &block) + end + end + + alias :real_initialize :initialize + def initialize(*args) + $called << :initialize + EM.start_server('127.0.0.1', 9999) if $start_count == 2 + $start_count += 1 + real_initialize(*args) + end + + alias :real_connection_completed :connection_completed + def connection_completed + $called << :connection_completed + real_connection_completed + end + + alias :real_close_connection :close_connection + def close_connection + $called << :close_connection + real_close_connection + end + +end \ No newline at end of file diff --git a/spec/unit/amqp/failover/fallback_spec.rb b/spec/unit/amqp/failover/fallback_spec.rb new file mode 100644 index 0000000..7a9ea1f --- /dev/null +++ b/spec/unit/amqp/failover/fallback_spec.rb @@ -0,0 +1,51 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' +require 'fallback_helper' + +describe AMQP::Failover::Fallback do + + before(:each) do + $called = [] + $start_count = 0 + @args = { :host => 'localhost', :port => 9999, :retry_interval => 0.01 } + end + + it "should initialize" do + EM.run { + EM.start_server('127.0.0.1', 9999) + @mon = FallbackHelper.monitor(@args) do + $called << :done_block + EM.stop_event_loop + end + } + $start_count.should == 1 + $called.should have(5).items + $called.uniq.should have(5).items + $called.should include(:start_monitoring) + $called.should include(:initialize) + $called.should include(:connection_completed) + $called.should include(:close_connection) + $called.should include(:done_block) + end + + it "should retry on error" do + EM.run { + @mon = FallbackHelper.monitor(@args) do + $called << :done_block + EM.stop_event_loop + end + } + $start_count.should >= 3 + $called.should have($start_count + 4).items + $called.uniq.should have(5).items + $called.should include(:start_monitoring) + $called.should include(:initialize) + $called.should include(:connection_completed) + $called.should include(:close_connection) + $called.should include(:done_block) + end + +end + From 412eafacd2d1d4b5afa1d58ba5e841a8ee8e31a5 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 27 Jan 2011 11:53:25 +0000 Subject: [PATCH 09/25] restructured a few things again --- lib/amqp/failover.rb | 3 +- .../failover/basic_client/on_disconnect.rb | 45 ------------------- lib/amqp/failover/disconnected.rb | 43 ++++++++++++++++++ .../{fallback.rb => server_discovery.rb} | 22 ++++----- ...k_helper.rb => server_discovery_helper.rb} | 2 +- ...lback_spec.rb => server_discovery_spec.rb} | 9 ++-- 6 files changed, 61 insertions(+), 63 deletions(-) delete mode 100644 lib/amqp/failover/basic_client/on_disconnect.rb create mode 100644 lib/amqp/failover/disconnected.rb rename lib/amqp/failover/{fallback.rb => server_discovery.rb} (91%) rename spec/unit/amqp/failover/{fallback_helper.rb => server_discovery_helper.rb} (92%) rename spec/unit/amqp/failover/{fallback_spec.rb => server_discovery_spec.rb} (86%) diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index 9bbac6c..c0dd01a 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -4,10 +4,11 @@ require 'yaml' require 'amqp/failover_client' require 'amqp/failover/config' -require 'amqp/failover/fallback' +require 'amqp/failover/disconnected' require 'amqp/failover/logger' require 'amqp/failover/logic' require 'amqp/failover/logic/failed_config' +require 'amqp/failover/server_discovery' require 'amqp/failover/version' module AMQP diff --git a/lib/amqp/failover/basic_client/on_disconnect.rb b/lib/amqp/failover/basic_client/on_disconnect.rb deleted file mode 100644 index 4df3d8e..0000000 --- a/lib/amqp/failover/basic_client/on_disconnect.rb +++ /dev/null @@ -1,45 +0,0 @@ -# encoding: utf-8 - -module AMQP - module Failover - module BasicClient - class OnDisconnect - - attr_accessor :base - attr_accessor :failover - attr_accessor :configs - attr_accessor :logger - - def initialize(base) - @base = base - @configs = @base.configs - @failover_conf = @base.failover_conf - @logger = @base.logger - end - - def call - @logic ||= Logic.new(@configs.configs, @failover_conf.get_primary, @failover_conf.failover_config) - if (new_settings = @logic.failover_from(@base.settings)) - log_message = "Could not connect to or lost connection to server #{@base.settings[:host]}:#{@base.settings[:port]}. " + - "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" - @logger.error(log_message) - @logger.info(log_message) - - if @failover_conf.get_primary == @base.settings - FallbackMonitor.monitor(@failover_conf.get_primary) do - @base.clean_exit("Primary server (#{@failover_conf.get_primary[:host]}:#{@failover_conf.get_primary[:port]}) is back. " + - "Performing clean exit to be relaunched with primary config.") - end - end - - @base.settings = new_settings - @base.reconnect - else - raise Error, "Could not connect to server #{@base.settings[:host]}:#{@base.settings[:port]}" - end - end - - end # OnDisconnect - end # BasicClient - end # Failover -end # AMQP diff --git a/lib/amqp/failover/disconnected.rb b/lib/amqp/failover/disconnected.rb new file mode 100644 index 0000000..03f7181 --- /dev/null +++ b/lib/amqp/failover/disconnected.rb @@ -0,0 +1,43 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Disconnected + + attr_accessor :base + attr_accessor :failover + attr_accessor :configs + attr_accessor :logger + + def initialize(base) + @base = base + @configs = @base.configs + @failover_conf = @base.failover_conf + @logger = @base.logger + end + + def call + @logic ||= Logic.new(@configs.configs, @failover_conf.get_primary, @failover_conf.failover_config) + if (new_settings = @logic.failover_from(@base.settings)) + log_message = "Could not connect to or lost connection to server #{@base.settings[:host]}:#{@base.settings[:port]}. " + + "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" + @logger.error(log_message) + @logger.info(log_message) + + if @failover_conf.get_primary == @base.settings + ServerDiscovery.monitor(@failover_conf.get_primary) do + @base.clean_exit("Primary server (#{@failover_conf.get_primary[:host]}:#{@failover_conf.get_primary[:port]}) is back. " + + "Performing clean exit to be relaunched with primary config.") + end + end + + @base.settings = new_settings + @base.reconnect + else + raise Error, "Could not connect to server #{@base.settings[:host]}:#{@base.settings[:port]}" + end + end + + end # Disconnected + end # Failover +end # AMQP diff --git a/lib/amqp/failover/fallback.rb b/lib/amqp/failover/server_discovery.rb similarity index 91% rename from lib/amqp/failover/fallback.rb rename to lib/amqp/failover/server_discovery.rb index 4bf58a9..9ec62db 100644 --- a/lib/amqp/failover/fallback.rb +++ b/lib/amqp/failover/server_discovery.rb @@ -2,12 +2,20 @@ module AMQP module Failover - class Fallback < EM::Connection + class ServerDiscovery < EM::Connection class << self attr_accessor :connection end + def self.monitor(conf = {}, &block) + if EM.reactor_running? + start_monitoring(conf, &block) + else + EM.run { start_monitoring(conf, &block) } + end + end + def initialize(args) @done = args[:done] @timer = args[:timer] @@ -18,15 +26,7 @@ module AMQP @timer.cancel close_connection end - - def self.monitor(conf = {}, &block) - if EM.reactor_running? - start_monitoring(conf, &block) - else - EM.run { start_monitoring(conf, &block) } - end - end - + def self.start_monitoring(conf = {}, &block) conf = conf.clone conf[:done] = block @@ -39,6 +39,6 @@ module AMQP EM.connect(conf[:host], conf[:port], self, conf) end - end # Fallback + end # ServerDiscovery end # Failover end # AMQP diff --git a/spec/unit/amqp/failover/fallback_helper.rb b/spec/unit/amqp/failover/server_discovery_helper.rb similarity index 92% rename from spec/unit/amqp/failover/fallback_helper.rb rename to spec/unit/amqp/failover/server_discovery_helper.rb index a7ab4da..05735c3 100644 --- a/spec/unit/amqp/failover/fallback_helper.rb +++ b/spec/unit/amqp/failover/server_discovery_helper.rb @@ -1,4 +1,4 @@ -class FallbackHelper < AMQP::Failover::Fallback +class ServerDiscoveryHelper < AMQP::Failover::ServerDiscovery class << self alias :real_start_monitoring :start_monitoring diff --git a/spec/unit/amqp/failover/fallback_spec.rb b/spec/unit/amqp/failover/server_discovery_spec.rb similarity index 86% rename from spec/unit/amqp/failover/fallback_spec.rb rename to spec/unit/amqp/failover/server_discovery_spec.rb index 7a9ea1f..4fb5498 100644 --- a/spec/unit/amqp/failover/fallback_spec.rb +++ b/spec/unit/amqp/failover/server_discovery_spec.rb @@ -2,9 +2,9 @@ $LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) require 'spec_helper' -require 'fallback_helper' +require 'server_discovery_helper' -describe AMQP::Failover::Fallback do +describe AMQP::Failover::ServerDiscovery do before(:each) do $called = [] @@ -15,7 +15,7 @@ describe AMQP::Failover::Fallback do it "should initialize" do EM.run { EM.start_server('127.0.0.1', 9999) - @mon = FallbackHelper.monitor(@args) do + @mon = ServerDiscoveryHelper.monitor(@args) do $called << :done_block EM.stop_event_loop end @@ -32,7 +32,7 @@ describe AMQP::Failover::Fallback do it "should retry on error" do EM.run { - @mon = FallbackHelper.monitor(@args) do + @mon = ServerDiscoveryHelper.monitor(@args) do $called << :done_block EM.stop_event_loop end @@ -48,4 +48,3 @@ describe AMQP::Failover::Fallback do end end - From d5b081bc629c678c96ec12277c998bcca341bac5 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 27 Jan 2011 11:53:45 +0000 Subject: [PATCH 10/25] fixed a bug in spec/server_helper --- spec/server_helper.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/server_helper.rb b/spec/server_helper.rb index 80c8ee4..fbd635f 100644 --- a/spec/server_helper.rb +++ b/spec/server_helper.rb @@ -5,13 +5,13 @@ module ServerHelper class << self def log - @log ||= [] + @@log ||= [] end attr_writer :log end def log(*args) - SpecServer.log << args + @@log << args # silence Output end From 8d5771a12ad8277bceb83862bc9fa3fab75ae568 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 27 Jan 2011 11:59:54 +0000 Subject: [PATCH 11/25] code comments that make (more) sense ftw! --- spec/server_helper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/server_helper.rb b/spec/server_helper.rb index fbd635f..4ad6ed1 100644 --- a/spec/server_helper.rb +++ b/spec/server_helper.rb @@ -10,9 +10,9 @@ module ServerHelper attr_writer :log end + # log & silence STDOUT output def log(*args) @@log << args - # silence Output end end From 5496445d4da52fda236d01412fc5b00ee40e3fec Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 28 Jan 2011 17:09:30 +0000 Subject: [PATCH 12/25] major restructuring, specs probably all break right now --- lib/amqp/failover.rb | 113 +++++++++++++++++- lib/amqp/failover/config.rb | 98 ++++----------- lib/amqp/failover/configs.rb | 96 +++++++++++++++ lib/amqp/failover/disconnected.rb | 43 ------- lib/amqp/failover/ext/amqp/client.rb | 52 ++++++++ lib/amqp/failover/logger.rb | 2 +- lib/amqp/failover/logic.rb | 86 ------------- lib/amqp/failover/logic/failed_config.rb | 33 ----- lib/amqp/failover/server_discovery.rb | 13 +- lib/amqp/failover/version.rb | 2 +- lib/amqp/failover_client.rb | 71 +++++++---- spec/integration/simple_spec.rb | 19 ++- spec/server_helper.rb | 18 +++ spec/spec_helper.rb | 31 +++++ spec/unit/amqp/failover/config_spec.rb | 23 ++-- .../amqp/failover/server_discovery_spec.rb | 2 +- 16 files changed, 418 insertions(+), 284 deletions(-) create mode 100644 lib/amqp/failover/configs.rb delete mode 100644 lib/amqp/failover/disconnected.rb create mode 100644 lib/amqp/failover/ext/amqp/client.rb delete mode 100644 lib/amqp/failover/logic.rb delete mode 100644 lib/amqp/failover/logic/failed_config.rb diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index c0dd01a..df6f4b6 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -4,15 +4,116 @@ require 'yaml' require 'amqp/failover_client' require 'amqp/failover/config' -require 'amqp/failover/disconnected' +require 'amqp/failover/configs' require 'amqp/failover/logger' -require 'amqp/failover/logic' -require 'amqp/failover/logic/failed_config' require 'amqp/failover/server_discovery' require 'amqp/failover/version' +require 'amqp/failover/ext/amqp/client.rb' + module AMQP - module Failover + class Failover - end -end + attr_reader :latest_failed + attr_accessor :primary + attr_accessor :retry_timeout + attr_accessor :fallback + + def initialize(confs = nil, opts = {}) + @configs = Configs.new(confs) + @options = default_options.merge(opts) + end + + # pluggable logger specifically for tracking failover and fallbacks + def self.logger + @logger ||= Logger.new + end + + def default_options + { :retry_timeout => 1, + :selection => :sequential, #TODO: Impliment next server selection algorithm + :fallback => false, + :fallback_interval => 10 } + end + + def options + @options ||= {} + end + + def fallback_interval + options[:fallback_interval] ||= default_options[:fallback_interval] + end + + def primary + configs[:primary] + end + + def refs + @refs ||= {} + end + + def configs + @configs ||= Config.new + end + + def configs=(confs = []) + @configs = nil + confs.each do |conf| + if conf.is_a?(Array) + add_config(conf[1], conf[0]) + else + add_config(conf) + end + end + end + + def add_config(conf = {}, ref = nil) + index = configs.index(conf) + configs << Config::Failed.new(conf) if index.nil? + refs[ref] = (index || configs.index(conf)) if !ref.nil? + end + + def failover_from(conf = {}, time = nil) + failed_with(conf, nil, time) + next_config + end + alias :from :failover_from + + def failed_with(conf = {}, ref = nil, time = nil) + time ||= Time.now + if index = configs.index(conf) + configs[index].last_fail = time + @latest_failed = configs[index] + else + configs << Config::Failed.new(conf, time) + @latest_failed = configs.last + end + refs[ref] = (index || configs.index(conf)) if !ref.nil? + end + + def next_config(retry_timeout = nil, after = nil) + return nil if configs.size <= 1 + retry_timeout ||= @options[:retry_timeout] + after ||= @latest_failed + index = configs.index(after) + available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1] + available.each do |conf| + return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now + end + return nil + end + + def last_fail_of(match) + ((match.is_a?(Hash) ? get_by_conf(match) : get_by_ref(match)) || Config::Failed.new).last_fail + end + + def get_by_conf(conf = {}) + configs[configs.index(conf)] + end + + def get_by_ref(ref = nil) + configs[refs[ref]] if refs[ref] + end + + end # Failover +end # AMQP diff --git a/lib/amqp/failover/config.rb b/lib/amqp/failover/config.rb index 6e218d6..784492c 100644 --- a/lib/amqp/failover/config.rb +++ b/lib/amqp/failover/config.rb @@ -1,92 +1,36 @@ # encoding: utf-8 module AMQP - module Failover - class Config + class Failover + class Config < ::Hash - attr_accessor :configs - attr_accessor :failover_config + attr_accessor :last_fail - def failover_config - @failover_config ||= { :retry_timeout => 30 } + def initialize(hash = {}, last_fail_date = nil) + self.replace(symbolize_keys(hash)) + self.last_fail = last_fail_date if last_fail_date end - def refs - @refs ||= {} - end - - def configs - @configs ||= [] - end - - def primary - @primary ||= 0 - end - - def primary=(ref) - @primary = ref - end - - def get_primary - get(primary) || default_config - end - - def set_primary(conf = {}) - set(conf, primary) - end - - def get(ref = nil) - return configs[ref] if ref.is_a?(Fixnum) - configs[refs[ref]] if refs[ref] - end - - def set(conf = {}, ref = nil) - conf = default_config.merge(conf) - configs << conf if (index = configs.index(conf)).nil? - if ref - refs[ref] = (index || configs.index(conf)) - end - end - - def find_next(conf = {}) - current = configs.index(conf) - configs[(current+1 == configs.size) ? 0 : current+1] if current - end - - def load_file(file, env = nil) - raise ArgumentError, "Can't find #{file}" unless File.exists?(file) - load(YAML.load_file(file)[env || "development"]) - end - - def load_yaml(data, env = nil) - load(YAML.load(data)[env || "development"]) - end - - def load(conf) - if conf.is_a?(::Array) - load_array(conf) - elsif conf.is_a?(::Hash) - load_hash(conf) - end - end - - def load_array(confs = []) - @configs = nil - confs.each do |conf| - load_hash(conf) - end - end - - def load_hash(conf = {}) - conf = conf.inject({}) do |result, (key, value)| + def symbolize_keys(hash = {}) + hash.inject({}) do |result, (key, value)| result[key.is_a?(String) ? key.to_sym : key] = value result end - self.set(conf) end - def default_config - AMQP.settings + # order by latest fail, potentially useful if random config selection is used + def <=>(other) + if self.respond_to?(:last_fail) && other.respond_to?(:last_fail) + if self.last_fail.nil? && other.last_fail.nil? + return 0 + elsif self.last_fail.nil? && !other.last_fail.nil? + return 1 + elsif !self.last_fail.nil? && other.last_fail.nil? + return -1 + end + return other.last_fail <=> self.last_fail + end + return 0 end end # Config diff --git a/lib/amqp/failover/configs.rb b/lib/amqp/failover/configs.rb new file mode 100644 index 0000000..251b458 --- /dev/null +++ b/lib/amqp/failover/configs.rb @@ -0,0 +1,96 @@ +# encoding: utf-8 + +module AMQP + class Failover + class Configs < Array + + def initialize(confs = nil) + load(confs) + end + + def [](*args) + return super(*args) if args[0].is_a?(Fixnum) + return get_primary if args[0] == :primary + get(args[0]) + end + + def []=(*args) + return super(*args) if args[0].is_a?(Fixnum) + return set_primary(args.last, args[0]) if args[0] == :primary + set(args.last, args[0]) + end + + def refs + @refs ||= {} + end + + def primary + @primary ||= 0 + end + + def primary=(ref) + @primary = ref + end + + def get_primary + get(primary) || default_config + end + + def set_primary(conf = {}) + set(conf, primary) + end + + def get(ref = nil) + return self[ref] if ref.is_a?(Fixnum) + self[refs[ref]] if refs[ref] + end + + def set(conf = {}, ref = nil) + conf = Config.new(default_config.merge(conf)) + self << conf if (index = self.index(conf)).nil? + if ref + refs[ref] = (index || self.index(conf)) + end + end + + def find_next(conf = {}) + current = self.index(conf) + self[(current+1 == self.size) ? 0 : current+1] if current + end + + def load_file(file, env = nil) + raise ArgumentError, "Can't find #{file}" unless File.exists?(file) + load(YAML.load_file(file)[env || "development"]) + end + + def load_yaml(data, env = nil) + load(YAML.load(data)[env || "development"]) + end + + def load(conf) + if conf.is_a?(::Array) + load_array(conf) + elsif conf.is_a?(::Hash) + load_hash(conf) + end + end + + def self.load_array(confs = []) + self.clear + confs.each do |conf| + conf = AMQP::Client.parse_amqp_url(conf) if conf.is_a?(::String) + load_hash(conf) + end + end + + def load_hash(conf = {}) + set(Config.new(conf)) + end + + def default_config + AMQP.settings + end + + end # Config + end # Failover +end # AMQP diff --git a/lib/amqp/failover/disconnected.rb b/lib/amqp/failover/disconnected.rb deleted file mode 100644 index 03f7181..0000000 --- a/lib/amqp/failover/disconnected.rb +++ /dev/null @@ -1,43 +0,0 @@ -# encoding: utf-8 - -module AMQP - module Failover - class Disconnected - - attr_accessor :base - attr_accessor :failover - attr_accessor :configs - attr_accessor :logger - - def initialize(base) - @base = base - @configs = @base.configs - @failover_conf = @base.failover_conf - @logger = @base.logger - end - - def call - @logic ||= Logic.new(@configs.configs, @failover_conf.get_primary, @failover_conf.failover_config) - if (new_settings = @logic.failover_from(@base.settings)) - log_message = "Could not connect to or lost connection to server #{@base.settings[:host]}:#{@base.settings[:port]}. " + - "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" - @logger.error(log_message) - @logger.info(log_message) - - if @failover_conf.get_primary == @base.settings - ServerDiscovery.monitor(@failover_conf.get_primary) do - @base.clean_exit("Primary server (#{@failover_conf.get_primary[:host]}:#{@failover_conf.get_primary[:port]}) is back. " + - "Performing clean exit to be relaunched with primary config.") - end - end - - @base.settings = new_settings - @base.reconnect - else - raise Error, "Could not connect to server #{@base.settings[:host]}:#{@base.settings[:port]}" - end - end - - end # Disconnected - end # Failover -end # AMQP diff --git a/lib/amqp/failover/ext/amqp/client.rb b/lib/amqp/failover/ext/amqp/client.rb new file mode 100644 index 0000000..4e2b4f2 --- /dev/null +++ b/lib/amqp/failover/ext/amqp/client.rb @@ -0,0 +1,52 @@ +# encoding: utf-8 + +module AMQP + module Client + class << self + alias :connect_without_failover :connect + + # Connect with Failover supports specifying multiple AMQP servers and configurations. + # + # Argument Examples: + # - "amqp://guest:guest@host:5672,amqp://guest:guest@host:5673" + # - ["amqp://guest:guest@host:5672", "amqp://guest:guest@host:5673"] + # - [{:host => "host", :port => 5672}, {:host => "host", :port => 5673}] + # - {:hosts => ["amqp://user:pass@host:5672", "amqp://user:pass@host:5673"]} + # - {:hosts => [{:host => "host", :port => 5672}, {:host => "host", :port => 5673}]} + # + # The last two examples are by far the most flexible, cause they also let you specify + # failover and fallback specific options. Like so: + # - {:hosts => ["amqp://localhost:5672"], :fallback => false} + # + # Available failover options are: + # - :retry_timeout, time to wait before retrying a specific AMQP config after failure. + # - :fallback, monitor for original server's return and fallback to it if so. + # - :fallback_interval, seconds between each check for original server if :fallback is true. + # + def connect_with_failover(opts = nil) + opts = parse_amqp_url_or_opts(opts) + connect_without_failover(opts) + end + alias :connect :connect_with_failover + + def parse_amqp_url_or_opts(opts = nil) + if opts.is_a?(String) && opts.index(',').nil? + opts = init_failover(opts.split(',')) + elsif opts.is_a?(Array) + opts = init_failover(opts) + elsif opts.is_a?(Hash) && opts[:hosts].is_a?(Array) + confs = opts.delete[:hosts] + opts = init_failover(confs, opts) + end + opts + end + + def init_failover(confs = nil, opts = {}) + if !confs.nil? && confs.size > 0 + failover.primary.merge({ :failover => Failover.new(confs, opts) }) + end + end + + end # << self + end # Client +end # AMQP \ No newline at end of file diff --git a/lib/amqp/failover/logger.rb b/lib/amqp/failover/logger.rb index b1e1706..dc4efe7 100644 --- a/lib/amqp/failover/logger.rb +++ b/lib/amqp/failover/logger.rb @@ -1,7 +1,7 @@ # encoding: utf-8 module AMQP - module Failover + class Failover class Logger attr_accessor :enabled diff --git a/lib/amqp/failover/logic.rb b/lib/amqp/failover/logic.rb deleted file mode 100644 index c2092ab..0000000 --- a/lib/amqp/failover/logic.rb +++ /dev/null @@ -1,86 +0,0 @@ -# encoding: utf-8 - -module AMQP - module Failover - class Logic - - attr_reader :latest_failed - attr_accessor :primary - attr_accessor :retry_timeout - attr_accessor :fallback - - def initialize(confs = nil, primary = nil, options = {}) - @primary = primary - @retry_timeout = (options.delete(:retry_timeout) || 30) - self.configs = confs if !confs.nil? - end - - def refs - @refs ||= {} - end - - def configs - @configs ||= [] - end - - def configs=(confs = []) - @configs = [] - confs.each do |conf| - if conf.is_a?(Array) - add_config(conf[1], conf[0]) - else - add_config(conf) - end - end - end - - def add_config(conf = {}, ref = nil) - index = configs.index(conf) - configs << FailedConfig.new(conf) if index.nil? - refs[ref] = (index || configs.index(conf)) if !ref.nil? - end - - def failover_from(conf = {}, time = nil) - failed_with(conf, nil, time) - next_config - end - - def failed_with(conf = {}, ref = nil, time = nil) - time ||= Time.now - if index = configs.index(conf) - configs[index].last_fail = time - @latest_failed = configs[index] - else - configs << FailedConfig.new(conf, time) - @latest_failed = configs.last - end - refs[ref] = (index || configs.index(conf)) if !ref.nil? - end - - def next_config(retry_timeout = nil, after = nil) - return nil if configs.size <= 1 - retry_timeout ||= @retry_timeout - after ||= @latest_failed - index = configs.index(after) - available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1] - available.each do |conf| - return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now - end - return nil - end - - def last_fail_of(match) - ((match.is_a?(Hash) ? get_by_conf(match) : get_by_ref(match)) || FailedConfig.new).last_fail - end - - def get_by_conf(conf = {}) - configs[configs.index(conf)] - end - - def get_by_ref(ref = nil) - configs[refs[ref]] if refs[ref] - end - - end # Logic - end # Failover -end # AMQP diff --git a/lib/amqp/failover/logic/failed_config.rb b/lib/amqp/failover/logic/failed_config.rb deleted file mode 100644 index 7c488ea..0000000 --- a/lib/amqp/failover/logic/failed_config.rb +++ /dev/null @@ -1,33 +0,0 @@ -# encoding: utf-8 - -module AMQP - module Failover - class Logic - class FailedConfig < ::Hash - - attr_accessor :last_fail - - def initialize(hash = {}, last_fail_date = nil) - self.replace(hash) - self.last_fail = last_fail_date if last_fail_date - end - - # order by latest fail, potentially useful if random config selection is used - def <=>(other) - if self.respond_to?(:last_fail) && other.respond_to?(:last_fail) - if self.last_fail.nil? && other.last_fail.nil? - return 0 - elsif self.last_fail.nil? && !other.last_fail.nil? - return 1 - elsif !self.last_fail.nil? && other.last_fail.nil? - return -1 - end - return other.last_fail <=> self.last_fail - end - return 0 - end - - end # FailedConfig - end # Logic - end # Failover -end # AMQP diff --git a/lib/amqp/failover/server_discovery.rb b/lib/amqp/failover/server_discovery.rb index 9ec62db..dc46bfb 100644 --- a/lib/amqp/failover/server_discovery.rb +++ b/lib/amqp/failover/server_discovery.rb @@ -1,18 +1,18 @@ # encoding: utf-8 module AMQP - module Failover + class Failover class ServerDiscovery < EM::Connection class << self attr_accessor :connection end - def self.monitor(conf = {}, &block) + def self.monitor(conf = {}, retry_interval = nil, &block) if EM.reactor_running? - start_monitoring(conf, &block) + start_monitoring(conf, retry_interval, &block) else - EM.run { start_monitoring(conf, &block) } + EM.run { start_monitoring(conf, retry_interval, &block) } end end @@ -27,10 +27,11 @@ module AMQP close_connection end - def self.start_monitoring(conf = {}, &block) + def self.start_monitoring(conf = {}, retry_interval = nil, &block) conf = conf.clone + retry_interval ||= 5 conf[:done] = block - conf[:timer] = EM::PeriodicTimer.new(conf[:retry_interval] || 5) do + conf[:timer] = EM::PeriodicTimer.new(retry_interval) do @connection = connect(conf) end end diff --git a/lib/amqp/failover/version.rb b/lib/amqp/failover/version.rb index 7843f36..32e75c8 100644 --- a/lib/amqp/failover/version.rb +++ b/lib/amqp/failover/version.rb @@ -1,7 +1,7 @@ # encoding: utf-8 module AMQP - module Failover + class Failover VERSION = "0.0.1" end end diff --git a/lib/amqp/failover_client.rb b/lib/amqp/failover_client.rb index f69a6a3..0f06f9a 100644 --- a/lib/amqp/failover_client.rb +++ b/lib/amqp/failover_client.rb @@ -4,25 +4,24 @@ module AMQP module FailoverClient include AMQP::BasicClient - attr_accessor :on_disconnect + attr_accessor :failover + attr_reader :fallback_monitor + attr_accessor :settings + attr_accessor :on_disconnect def self.extended(base) - base.on_disconnect = proc { - Failover::OnDisconnect.new(base).call - } + if (base.failover = base.settings.delete(:failover)) + base.on_disconnect = base.method(:failover_leap) + end end def logger - @logger ||= Failover::Logger.new - end - - def failover_conf - @failover_conf ||= Failover::Config.new + Failover.logger end def configs - failover_conf.configs + @failover.configs if @failover end def clean_exit(msg = nil) @@ -32,20 +31,50 @@ module AMQP Process.exit end - def process_frame(frame) - if mq = channels[frame.channel] - mq.process_frame(frame) - return + def failover_leap + if (new_settings = @failover.from(@settings)) + log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " + + "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" + logger.error(log_message) + logger.info(log_message) + + fallback(@failover.primary, @failover.fallback_interval) if @failover.primary == @settings + @settings = new_settings + reconnect + else + raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}" end - - if frame.is_a?(AMQP::Frame::Method) && (method = frame.payload).is_a?(AMQP::Protocol::Connection::Close) - if method.reply_text =~ /^NOT_ALLOWED/ - raise AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}" - end - end - super(frame) end + def fallback(conf = {}, retry_interval = nil) + @fallback_monitor = Failover::ServerDiscovery.monitor(conf, retry_interval) do + fallback_callback.call(conf, retry_interval) + end + end + + def fallback_callback + @fallback_callback ||= proc { |conf, retry_interval| + clean_exit("Primary server (#{conf[:host]}:#{conf[:port]}) is back. " + + "Performing clean exit to be relaunched with primary config.") + } + end + attr_writer :fallback_callback + + #TODO: Figure out why I originally needed this + # def process_frame(frame) + # if mq = channels[frame.channel] + # mq.process_frame(frame) + # return + # end + # + # if frame.is_a?(AMQP::Frame::Method) && (method = frame.payload).is_a?(AMQP::Protocol::Connection::Close) + # if method.reply_text =~ /^NOT_ALLOWED/ + # raise AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}" + # end + # end + # super(frame) + # end + end # FailoverClient end # AMQP diff --git a/spec/integration/simple_spec.rb b/spec/integration/simple_spec.rb index cfe5968..d87d425 100644 --- a/spec/integration/simple_spec.rb +++ b/spec/integration/simple_spec.rb @@ -5,7 +5,7 @@ require 'spec_helper' require 'amqp/server' require 'server_helper' -describe "Simple AMQP connection with FailoverClient" do +describe "Simple AMQP connection with FailoverClient loaded" do before(:all) do @log = ServerHelper.log @@ -28,4 +28,21 @@ describe "Simple AMQP connection with FailoverClient" do } end + it "should connect and get disconnected" do + lambda { + EM.run { + spid = start_server + conn = AMQP.connect(:host => 'localhost', :port => 15672) + EM.add_timer(0.1) { + conn.should be_connected + stop_server(spid) + EM.add_timer(0.1) { + conn.should_not be_connected + EM.stop + } + } + } + }.should raise_error(AMQP::Error, "Could not connect to server localhost:15672") + end + end diff --git a/spec/server_helper.rb b/spec/server_helper.rb index 4ad6ed1..ee08359 100644 --- a/spec/server_helper.rb +++ b/spec/server_helper.rb @@ -16,3 +16,21 @@ module ServerHelper end end + + +# +# Helper methods +# + +def start_server(port = 15762, timeout = 2) + bef_fork = EM.forks.clone + EM.fork { + EM.start_server('localhost', port, ServerHelper) + EM.add_timer(timeout) { EM.stop } + } + (EM.forks - bef_fork).first +end + +def stop_server(pid) + Process.kill('TERM', pid) +end \ No newline at end of file diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 0301a91..e1cc05d 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -16,3 +16,34 @@ require 'amqp/failover' require 'rspec' require 'rspec/autorun' + + +# +# Helper methods +# + +def wait_while(timeout = 10, retry_interval = 0.1, &block) + start = Time.now + while block.call + break if (Time.now - start).to_i >= timeout + sleep(retry_interval) + end +end + +# stolen from Pid::running? from daemons gem +def pid_running?(pid) + return false unless pid + + # Check if process is in existence + # The simplest way to do this is to send signal '0' + # (which is a single system call) that doesn't actually + # send a signal + begin + Process.kill(0, pid) + return true + rescue Errno::ESRCH + return false + rescue ::Exception # for example on EPERM (process exists but does not belong to us) + return true + end +end diff --git a/spec/unit/amqp/failover/config_spec.rb b/spec/unit/amqp/failover/config_spec.rb index f335e5e..2a1ae41 100644 --- a/spec/unit/amqp/failover/config_spec.rb +++ b/spec/unit/amqp/failover/config_spec.rb @@ -3,13 +3,17 @@ $LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) require 'spec_helper' -describe AMQP::Failover::Config do +describe 'AMQP::Failover::Config' do + + before(:all) do + # @conf = AMQP::Failover::Config.new + end before(:each) do @conf = AMQP::Failover::Config.new - [:primary, :configs, :refs].each do |var| - @conf.instance_variable_set("@#{var}", nil) - end + # [:primary, :configs, :refs].each do |var| + # @conf.instance_variable_set("@#{var}", nil) + # end @raw_configs = [ {:host => 'rabbit3.local'}, {:host => 'rabbit2.local'}, @@ -19,9 +23,9 @@ describe AMQP::Failover::Config do end after(:each) do - [:primary, :configs, :refs].each do |var| - @conf.instance_variable_set("@#{var}", nil) - end + # [:primary, :configs, :refs].each do |var| + # @conf.instance_variable_set("@#{var}", nil) + # end end it "should set and get configs" do @@ -31,21 +35,24 @@ describe AMQP::Failover::Config do @conf.set(@raw_configs[0]) @conf.configs.should have(1).items @conf.get(0).should == @configs[0] + @conf[0].should == @configs[0] @conf.set(@raw_configs[1]) @conf.configs.should have(2).items @conf.get(1).should == @configs[1] + @conf[1].should == @configs[1] @conf.set(@raw_configs[1], :the_one) @conf.configs.should have(2).items @conf.get(1).should == @configs[1] - @conf.get(:the_one).should == @configs[1] + @conf[:the_one].should == @configs[1] @conf.load_array(@raw_configs) @conf.configs.should have(3).items @conf.get_primary.should == @configs[0] @conf.primary = 1 @conf.get_primary.should == @configs[1] + @conf[:primary].should == @configs[1] end it "should #find_next" do diff --git a/spec/unit/amqp/failover/server_discovery_spec.rb b/spec/unit/amqp/failover/server_discovery_spec.rb index 4fb5498..0b61ea7 100644 --- a/spec/unit/amqp/failover/server_discovery_spec.rb +++ b/spec/unit/amqp/failover/server_discovery_spec.rb @@ -4,7 +4,7 @@ $LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) require 'spec_helper' require 'server_discovery_helper' -describe AMQP::Failover::ServerDiscovery do +describe 'AMQP::Failover::ServerDiscovery' do before(:each) do $called = [] From 5d4cf18d2db3f3d4093be94fe33bba6e484c3ff7 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 31 Jan 2011 09:31:46 +0000 Subject: [PATCH 13/25] Failover::Configs specs pass --- lib/amqp/failover.rb | 15 ++------- lib/amqp/failover/config.rb | 8 +++-- lib/amqp/failover/configs.rb | 10 ++---- lib/amqp/failover_client.rb | 5 ++- .../{config_spec.rb => configs_spec.rb} | 31 ++++++------------- 5 files changed, 24 insertions(+), 45 deletions(-) rename spec/unit/amqp/failover/{config_spec.rb => configs_spec.rb} (61%) diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index df6f4b6..4de6b3d 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -20,7 +20,7 @@ module AMQP attr_accessor :fallback def initialize(confs = nil, opts = {}) - @configs = Configs.new(confs) + @configs = Failover::Configs.new(confs) @options = default_options.merge(opts) end @@ -32,7 +32,7 @@ module AMQP def default_options { :retry_timeout => 1, :selection => :sequential, #TODO: Impliment next server selection algorithm - :fallback => false, + :fallback => false, #TODO: Enable by default once a sane solution is found :fallback_interval => 10 } end @@ -56,17 +56,6 @@ module AMQP @configs ||= Config.new end - def configs=(confs = []) - @configs = nil - confs.each do |conf| - if conf.is_a?(Array) - add_config(conf[1], conf[0]) - else - add_config(conf) - end - end - end - def add_config(conf = {}, ref = nil) index = configs.index(conf) configs << Config::Failed.new(conf) if index.nil? diff --git a/lib/amqp/failover/config.rb b/lib/amqp/failover/config.rb index 784492c..dbaef58 100644 --- a/lib/amqp/failover/config.rb +++ b/lib/amqp/failover/config.rb @@ -7,10 +7,14 @@ module AMQP attr_accessor :last_fail def initialize(hash = {}, last_fail_date = nil) - self.replace(symbolize_keys(hash)) + self.replace(symbolize_keys(defaults.merge(hash))) self.last_fail = last_fail_date if last_fail_date end + def defaults + AMQP.settings + end + def symbolize_keys(hash = {}) hash.inject({}) do |result, (key, value)| result[key.is_a?(String) ? key.to_sym : key] = value @@ -30,7 +34,7 @@ module AMQP end return other.last_fail <=> self.last_fail end - return 0 + super(other) end end # Config diff --git a/lib/amqp/failover/configs.rb b/lib/amqp/failover/configs.rb index 251b458..4858b55 100644 --- a/lib/amqp/failover/configs.rb +++ b/lib/amqp/failover/configs.rb @@ -46,7 +46,7 @@ module AMQP end def set(conf = {}, ref = nil) - conf = Config.new(default_config.merge(conf)) + conf = Failover::Config.new(conf) if !conf.is_a?(Failover::Config) self << conf if (index = self.index(conf)).nil? if ref refs[ref] = (index || self.index(conf)) @@ -75,7 +75,7 @@ module AMQP end end - def self.load_array(confs = []) + def load_array(confs = []) self.clear confs.each do |conf| conf = AMQP::Client.parse_amqp_url(conf) if conf.is_a?(::String) @@ -84,11 +84,7 @@ module AMQP end def load_hash(conf = {}) - set(Config.new(conf)) - end - - def default_config - AMQP.settings + set(conf) end end # Config diff --git a/lib/amqp/failover_client.rb b/lib/amqp/failover_client.rb index 0f06f9a..fd6c048 100644 --- a/lib/amqp/failover_client.rb +++ b/lib/amqp/failover_client.rb @@ -38,7 +38,9 @@ module AMQP logger.error(log_message) logger.info(log_message) - fallback(@failover.primary, @failover.fallback_interval) if @failover.primary == @settings + if @failover.options[:fallback] && @failover.primary == @settings + fallback(@failover.primary, @failover.fallback_interval) + end @settings = new_settings reconnect else @@ -53,6 +55,7 @@ module AMQP end def fallback_callback + #TODO: Figure out a way to artificially trigger EM to disconnect on fallback without channels being closed. @fallback_callback ||= proc { |conf, retry_interval| clean_exit("Primary server (#{conf[:host]}:#{conf[:port]}) is back. " + "Performing clean exit to be relaunched with primary config.") diff --git a/spec/unit/amqp/failover/config_spec.rb b/spec/unit/amqp/failover/configs_spec.rb similarity index 61% rename from spec/unit/amqp/failover/config_spec.rb rename to spec/unit/amqp/failover/configs_spec.rb index 2a1ae41..f9a79f6 100644 --- a/spec/unit/amqp/failover/config_spec.rb +++ b/spec/unit/amqp/failover/configs_spec.rb @@ -3,52 +3,39 @@ $LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) require 'spec_helper' -describe 'AMQP::Failover::Config' do - - before(:all) do - # @conf = AMQP::Failover::Config.new - end +describe 'AMQP::Failover::Configs' do before(:each) do - @conf = AMQP::Failover::Config.new - # [:primary, :configs, :refs].each do |var| - # @conf.instance_variable_set("@#{var}", nil) - # end + @conf = AMQP::Failover::Configs.new @raw_configs = [ {:host => 'rabbit3.local'}, {:host => 'rabbit2.local'}, {:host => 'rabbit2.local', :port => 5673} ] - @configs = @raw_configs.map { |conf| @conf.default_config.merge(conf) } - end - - after(:each) do - # [:primary, :configs, :refs].each do |var| - # @conf.instance_variable_set("@#{var}", nil) - # end + @configs = @raw_configs.map { |conf| AMQP.settings.merge(conf) } end it "should set and get configs" do @conf.primary.should == 0 - @conf.configs.should have(0).items + @conf.should have(0).items @conf.set(@raw_configs[0]) - @conf.configs.should have(1).items + @conf.should have(1).items @conf.get(0).should == @configs[0] @conf[0].should == @configs[0] @conf.set(@raw_configs[1]) - @conf.configs.should have(2).items + @conf.should have(2).items @conf.get(1).should == @configs[1] @conf[1].should == @configs[1] @conf.set(@raw_configs[1], :the_one) - @conf.configs.should have(2).items + @conf.should have(2).items @conf.get(1).should == @configs[1] @conf[:the_one].should == @configs[1] @conf.load_array(@raw_configs) - @conf.configs.should have(3).items + @conf.should have(3).items @conf.get_primary.should == @configs[0] @conf.primary = 1 @conf.get_primary.should == @configs[1] @@ -57,7 +44,7 @@ describe 'AMQP::Failover::Config' do it "should #find_next" do @conf.load(@raw_configs) - @conf.configs.should have(3).items + @conf.should have(3).items @conf.find_next(@configs[0]).should == @configs[1] @conf.find_next(@configs[1]).should == @configs[2] @conf.find_next(@configs[2]).should == @configs[0] From 3918b5bba6070f08d409959460f63c62e23306e7 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 31 Jan 2011 09:37:42 +0000 Subject: [PATCH 14/25] Failover::ServerDiscovery spec passes again --- spec/unit/amqp/failover/server_discovery_spec.rb | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spec/unit/amqp/failover/server_discovery_spec.rb b/spec/unit/amqp/failover/server_discovery_spec.rb index 0b61ea7..b829163 100644 --- a/spec/unit/amqp/failover/server_discovery_spec.rb +++ b/spec/unit/amqp/failover/server_discovery_spec.rb @@ -9,13 +9,14 @@ describe 'AMQP::Failover::ServerDiscovery' do before(:each) do $called = [] $start_count = 0 - @args = { :host => 'localhost', :port => 9999, :retry_interval => 0.01 } + @args = { :host => 'localhost', :port => 9999 } + @retry_interval = 0.01 end it "should initialize" do EM.run { EM.start_server('127.0.0.1', 9999) - @mon = ServerDiscoveryHelper.monitor(@args) do + @mon = ServerDiscoveryHelper.monitor(@args, @retry_interval) do $called << :done_block EM.stop_event_loop end @@ -32,7 +33,7 @@ describe 'AMQP::Failover::ServerDiscovery' do it "should retry on error" do EM.run { - @mon = ServerDiscoveryHelper.monitor(@args) do + @mon = ServerDiscoveryHelper.monitor(@args, @retry_interval) do $called << :done_block EM.stop_event_loop end From c6100fee331aaaeda2aae3e94ef3cb7873dcf408 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 31 Jan 2011 18:34:23 +0000 Subject: [PATCH 15/25] renamed Failover::Configs to Failover::Configurations --- lib/amqp/failover.rb | 6 +- .../{configs.rb => configurations.rb} | 57 +++++++++++-------- 2 files changed, 36 insertions(+), 27 deletions(-) rename lib/amqp/failover/{configs.rb => configurations.rb} (64%) diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index 4de6b3d..9bd0838 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -4,7 +4,7 @@ require 'yaml' require 'amqp/failover_client' require 'amqp/failover/config' -require 'amqp/failover/configs' +require 'amqp/failover/configurations' require 'amqp/failover/logger' require 'amqp/failover/server_discovery' require 'amqp/failover/version' @@ -20,7 +20,7 @@ module AMQP attr_accessor :fallback def initialize(confs = nil, opts = {}) - @configs = Failover::Configs.new(confs) + @configs = Failover::Configurations.new(confs) @options = default_options.merge(opts) end @@ -53,7 +53,7 @@ module AMQP end def configs - @configs ||= Config.new + @configs ||= Configurations.new end def add_config(conf = {}, ref = nil) diff --git a/lib/amqp/failover/configs.rb b/lib/amqp/failover/configurations.rb similarity index 64% rename from lib/amqp/failover/configs.rb rename to lib/amqp/failover/configurations.rb index 4858b55..8c87b1f 100644 --- a/lib/amqp/failover/configs.rb +++ b/lib/amqp/failover/configurations.rb @@ -2,42 +2,47 @@ module AMQP class Failover - class Configs < Array + class Configurations < Array def initialize(confs = nil) load(confs) end def [](*args) - return super(*args) if args[0].is_a?(Fixnum) - return get_primary if args[0] == :primary - get(args[0]) + if args[0].is_a?(Symbol) + return primary if args[0] == :primary + get(args[0]) + else + super(*args) + end end def []=(*args) - return super(*args) if args[0].is_a?(Fixnum) - return set_primary(args.last, args[0]) if args[0] == :primary - set(args.last, args[0]) + if args[0].is_a?(Symbol) + return primary = args.last if args[0] == :primary + set(args.last, args[0]) + end + super(*args) end def refs @refs ||= {} end + def primary_ref + @primary_ref ||= 0 + end + + def primary_ref=(ref) + @primary_ref = ref + end + def primary - @primary ||= 0 + get(primary_ref) || AMQP.settings end - def primary=(ref) - @primary = ref - end - - def get_primary - get(primary) || default_config - end - - def set_primary(conf = {}) - set(conf, primary) + def primary=(conf = {}) + set(conf, primary_ref) end def get(ref = nil) @@ -47,10 +52,13 @@ module AMQP def set(conf = {}, ref = nil) conf = Failover::Config.new(conf) if !conf.is_a?(Failover::Config) - self << conf if (index = self.index(conf)).nil? - if ref - refs[ref] = (index || self.index(conf)) + if (index = self.index(conf)).nil? + self << conf + else + conf = self[index] end + refs[ref] = (index || self.index(conf)) if ref + conf end def find_next(conf = {}) @@ -68,17 +76,18 @@ module AMQP end def load(conf) - if conf.is_a?(::Array) + if conf.is_a?(Array) load_array(conf) - elsif conf.is_a?(::Hash) + elsif conf.is_a?(Hash) load_hash(conf) end end def load_array(confs = []) self.clear + refs = {} confs.each do |conf| - conf = AMQP::Client.parse_amqp_url(conf) if conf.is_a?(::String) + conf = AMQP::Client.parse_amqp_url(conf) if conf.is_a?(String) load_hash(conf) end end From 5d3aa69fa0e896776d2df4686e3bafc806817495 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 09:44:27 +0000 Subject: [PATCH 16/25] lots of updates, and all specs PASS!! ^_^ --- amqp-failover.gemspec | 1 + lib/amqp/failover.rb | 12 +-- lib/amqp/failover/config.rb | 2 +- lib/amqp/failover/ext/amqp/client.rb | 23 +++-- lib/amqp/failover_client.rb | 36 ++++---- spec/integration/a_simple_spec.rb | 59 +++++++++++++ spec/integration/full_failover_spec.rb | 57 +++++++++++++ spec/integration/simple_spec.rb | 48 ----------- spec/server_helper.rb | 84 ++++++++++++++----- spec/unit/amqp/failover/config_spec.rb | 67 +++++++++++++++ ...configs_spec.rb => configurations_spec.rb} | 41 +++++++-- .../amqp/failover/server_discovery_spec.rb | 5 ++ spec/unit/amqp/failover_spec.rb | 69 +++++++++++++++ 13 files changed, 397 insertions(+), 107 deletions(-) create mode 100644 spec/integration/a_simple_spec.rb create mode 100644 spec/integration/full_failover_spec.rb delete mode 100644 spec/integration/simple_spec.rb create mode 100644 spec/unit/amqp/failover/config_spec.rb rename spec/unit/amqp/failover/{configs_spec.rb => configurations_spec.rb} (54%) create mode 100644 spec/unit/amqp/failover_spec.rb diff --git a/amqp-failover.gemspec b/amqp-failover.gemspec index 8814860..fbaea7d 100644 --- a/amqp-failover.gemspec +++ b/amqp-failover.gemspec @@ -25,5 +25,6 @@ Gem::Specification.new do |s| s.add_development_dependency 'rack-test', '>= 0.5.6' s.add_development_dependency 'rspec', '>= 2.1.0' s.add_development_dependency 'yard', '>= 0.6.3' + s.add_development_dependency 'json', '>= 1.5.0' s.add_development_dependency 'ruby-debug' end diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index 9bd0838..1a7b53d 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -32,7 +32,7 @@ module AMQP def default_options { :retry_timeout => 1, :selection => :sequential, #TODO: Impliment next server selection algorithm - :fallback => false, #TODO: Enable by default once a sane solution is found + :fallback => false, #TODO: Enable by default once a sane implimentation is figured out :fallback_interval => 10 } end @@ -58,7 +58,7 @@ module AMQP def add_config(conf = {}, ref = nil) index = configs.index(conf) - configs << Config::Failed.new(conf) if index.nil? + configs.set(conf) if index.nil? refs[ref] = (index || configs.index(conf)) if !ref.nil? end @@ -70,12 +70,12 @@ module AMQP def failed_with(conf = {}, ref = nil, time = nil) time ||= Time.now - if index = configs.index(conf) + if !(index = configs.index(conf)).nil? configs[index].last_fail = time @latest_failed = configs[index] else - configs << Config::Failed.new(conf, time) - @latest_failed = configs.last + @latest_failed = configs.set(conf) + configs.last.last_fail = time end refs[ref] = (index || configs.index(conf)) if !ref.nil? end @@ -87,7 +87,7 @@ module AMQP index = configs.index(after) available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1] available.each do |conf| - return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now + return conf if conf.last_fail.nil? || (conf.last_fail.to_i + retry_timeout) < Time.now.to_i end return nil end diff --git a/lib/amqp/failover/config.rb b/lib/amqp/failover/config.rb index dbaef58..c8a5fb3 100644 --- a/lib/amqp/failover/config.rb +++ b/lib/amqp/failover/config.rb @@ -34,7 +34,7 @@ module AMQP end return other.last_fail <=> self.last_fail end - super(other) + return 0 end end # Config diff --git a/lib/amqp/failover/ext/amqp/client.rb b/lib/amqp/failover/ext/amqp/client.rb index 4e2b4f2..e3a0092 100644 --- a/lib/amqp/failover/ext/amqp/client.rb +++ b/lib/amqp/failover/ext/amqp/client.rb @@ -1,9 +1,11 @@ # encoding: utf-8 +AMQP.client = AMQP::FailoverClient + module AMQP module Client + class << self - alias :connect_without_failover :connect # Connect with Failover supports specifying multiple AMQP servers and configurations. # @@ -20,13 +22,15 @@ module AMQP # # Available failover options are: # - :retry_timeout, time to wait before retrying a specific AMQP config after failure. - # - :fallback, monitor for original server's return and fallback to it if so. + # - :fallback, check for the return of the primary server, and fallback to it if and when it returns. # - :fallback_interval, seconds between each check for original server if :fallback is true. + # - :selection, not yet implimented. # def connect_with_failover(opts = nil) opts = parse_amqp_url_or_opts(opts) connect_without_failover(opts) end + alias :connect_without_failover :connect alias :connect :connect_with_failover def parse_amqp_url_or_opts(opts = nil) @@ -35,7 +39,7 @@ module AMQP elsif opts.is_a?(Array) opts = init_failover(opts) elsif opts.is_a?(Hash) && opts[:hosts].is_a?(Array) - confs = opts.delete[:hosts] + confs = opts.delete(:hosts) opts = init_failover(confs, opts) end opts @@ -43,10 +47,19 @@ module AMQP def init_failover(confs = nil, opts = {}) if !confs.nil? && confs.size > 0 - failover.primary.merge({ :failover => Failover.new(confs, opts) }) + failover = Failover.new(confs, opts) + failover.primary.merge({ :failover => failover }) end end - + end # << self + + def disconnected_with_failover + return failover_switch if @failover + disconnected_without_failover + end + alias :disconnected_without_failover :disconnected + alias :disconnected :disconnected_with_failover + end # Client end # AMQP \ No newline at end of file diff --git a/lib/amqp/failover_client.rb b/lib/amqp/failover_client.rb index fd6c048..c08b371 100644 --- a/lib/amqp/failover_client.rb +++ b/lib/amqp/failover_client.rb @@ -12,7 +12,24 @@ module AMQP def self.extended(base) if (base.failover = base.settings.delete(:failover)) - base.on_disconnect = base.method(:failover_leap) + base.on_disconnect = base.method(:disconnected) + end + end + + def failover_switch + if (new_settings = @failover.from(@settings)) + log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " + + "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" + logger.error(log_message) + logger.info(log_message) + + if @failover.options[:fallback] && @failover.primary == @settings + fallback(@failover.primary, @failover.fallback_interval) + end + @settings = new_settings + reconnect + else + raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}" end end @@ -31,23 +48,6 @@ module AMQP Process.exit end - def failover_leap - if (new_settings = @failover.from(@settings)) - log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " + - "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" - logger.error(log_message) - logger.info(log_message) - - if @failover.options[:fallback] && @failover.primary == @settings - fallback(@failover.primary, @failover.fallback_interval) - end - @settings = new_settings - reconnect - else - raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}" - end - end - def fallback(conf = {}, retry_interval = nil) @fallback_monitor = Failover::ServerDiscovery.monitor(conf, retry_interval) do fallback_callback.call(conf, retry_interval) diff --git a/spec/integration/a_simple_spec.rb b/spec/integration/a_simple_spec.rb new file mode 100644 index 0000000..541d14e --- /dev/null +++ b/spec/integration/a_simple_spec.rb @@ -0,0 +1,59 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' +require 'mq' +require 'amqp' +require 'amqp/server' +require 'server_helper' + +describe "A simple AMQP connection with FailoverClient loaded" do + + after(:all) do + ServerHelper.clear_logs + end + + it "should be using FailoverClient" do + AMQP.client.should == AMQP::FailoverClient + end + + it "should be able to connect" do + EM.run { + port = 15672 + timeout = 2 + serv = start_server(port) + EM.add_timer(1.5) { + conn = AMQP.connect(:host => 'localhost', :port => 15672) + EM.add_timer(0.1) { + conn.should be_connected + serv.stop + log = serv.log + log.size.should == 3 + (0..2).each { |i| log[i]['method'].should == "send" } + log[0]['class'].should == 'AMQP::Protocol::Connection::Start' + log[1]['class'].should == 'AMQP::Protocol::Connection::Tune' + log[2]['class'].should == 'AMQP::Protocol::Connection::OpenOk' + EM.stop + } + } + } + end + + it "should be able to connect and get disconnected" do + EM.run { + serv = start_server(25672) + EM.add_timer(0.1) { + conn = AMQP.connect(:host => 'localhost', :port => 25672) + EM.add_timer(0.1) { + conn.should be_connected + serv.stop + EM.add_timer(0.1) { + conn.should_not be_connected + EM.stop + } + } + } + } + end + +end diff --git a/spec/integration/full_failover_spec.rb b/spec/integration/full_failover_spec.rb new file mode 100644 index 0000000..2b96936 --- /dev/null +++ b/spec/integration/full_failover_spec.rb @@ -0,0 +1,57 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' +require 'amqp/server' +require 'server_helper' + +describe "Full Failover support of AMQP gem" do + + after(:all) do + ServerHelper.clear_logs + end + + it "should be able to connect" do + EM.run { + serv = start_server(15672) + EM.add_timer(0.1) { + conn = AMQP.connect(:host => 'localhost', :port => 15672) + conn.failover.should be_nil + EM.add_timer(0.1) { + conn.should be_connected + EM.stop + } + } + } + end + + it "should be able to connect and failover" do + EM.run { + serv1 = start_server(25672) + serv2 = start_server(35672) + EM.add_timer(0.1) { + conn = AMQP.connect({:hosts => [{:port => 25672}, {:port => 35672}]}) + conn.failover.primary[:port].should == 25672 + conn.settings[:port].should == 25672 + conn.settings.should == conn.failover.primary + EM.add_timer(0.1) { + conn.should be_connected + serv1.log.should have(3).items + serv2.log.should have(0).items + serv1.stop + EM.add_timer(0.1) { + conn.should be_connected + conn.settings[:port].should == 35672 + serv1.log.should have(3).items + serv2.log.should have(3).items + EM.add_timer(0.1) { + serv2.stop + EM.stop + } + } + } + } + } + end + +end diff --git a/spec/integration/simple_spec.rb b/spec/integration/simple_spec.rb deleted file mode 100644 index d87d425..0000000 --- a/spec/integration/simple_spec.rb +++ /dev/null @@ -1,48 +0,0 @@ -# encoding: utf-8 -$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) - -require 'spec_helper' -require 'amqp/server' -require 'server_helper' - -describe "Simple AMQP connection with FailoverClient loaded" do - - before(:all) do - @log = ServerHelper.log - AMQP.client = AMQP::FailoverClient - end - - it "should be connected" do - EM.run { - sig = EM.start_server('localhost', 15672, ServerHelper) - conn = AMQP.connect(:host => 'localhost', :port => 15672) - EM.add_timer(0.1) { - conn.should be_connected - @log.size.should == 3 - (0..2).each { |i| @log[i][0].should == "send" } - @log[0][1].payload.should be_a(AMQP::Protocol::Connection::Start) - @log[1][1].payload.should be_a(AMQP::Protocol::Connection::Tune) - @log[2][1].payload.should be_a(AMQP::Protocol::Connection::OpenOk) - EM.stop - } - } - end - - it "should connect and get disconnected" do - lambda { - EM.run { - spid = start_server - conn = AMQP.connect(:host => 'localhost', :port => 15672) - EM.add_timer(0.1) { - conn.should be_connected - stop_server(spid) - EM.add_timer(0.1) { - conn.should_not be_connected - EM.stop - } - } - } - }.should raise_error(AMQP::Error, "Could not connect to server localhost:15672") - end - -end diff --git a/spec/server_helper.rb b/spec/server_helper.rb index ee08359..babb28d 100644 --- a/spec/server_helper.rb +++ b/spec/server_helper.rb @@ -1,36 +1,78 @@ # encoding: utf-8 -module ServerHelper - include AMQP::Server +require 'rubygems' +require 'mq' +require 'amqp' +require 'amqp/server' +require 'json' + +class ServerHelper - class << self - def log - @@log ||= [] - end - attr_writer :log + attr_accessor :stdin + attr_accessor :stdout + attr_accessor :stderr + attr_accessor :pid + + + def initialize(port = nil, timeout = nil) + @port = port + @timout = timeout + File.open(log_file, 'w') {} + @pid = start(port, timeout) end - # log & silence STDOUT output - def log(*args) - @@log << args + def self.clear_logs + Dir.glob(File.expand_path('server_helper*.log', File.dirname(__FILE__))).each do |file| + File.delete(file) + end + end + + def start(port = nil, timeout = nil) + port ||= 15672 + timeout ||= 2 + EM.fork_reactor { + $PORT = port + EM.start_server('localhost', port, AmqpServer) + EM.add_timer(timeout) { EM.stop } + } + end + + def stop + Process.kill('TERM', @pid) + end + + def kill + Process.kill('KILL', @pid) + end + + def log + File.open(log_file).to_a.map{ |l| JSON.parse(l) } + end + + def log_file + File.expand_path("server_helper-port#{@port}.log", File.dirname(__FILE__)) end end +module AmqpServer + include AMQP::Server + + # customize log output + def log(*args) + # puts "\n>>>>>> Process.pid / $PORT: " + Process.pid.inspect + " / #{$PORT}\n" + args = {:method => args[0], :class => args[1].payload.class, :pid => Process.pid} + filename = File.expand_path("server_helper-port#{$PORT}.log", File.dirname(__FILE__)) + File.open(filename, 'a') do |f| + f.write("#{args.to_json}\n") + end + end +end # # Helper methods # -def start_server(port = 15762, timeout = 2) - bef_fork = EM.forks.clone - EM.fork { - EM.start_server('localhost', port, ServerHelper) - EM.add_timer(timeout) { EM.stop } - } - (EM.forks - bef_fork).first +def start_server(port = nil, timeout = nil) + ServerHelper.new(port, timeout) end - -def stop_server(pid) - Process.kill('TERM', pid) -end \ No newline at end of file diff --git a/spec/unit/amqp/failover/config_spec.rb b/spec/unit/amqp/failover/config_spec.rb new file mode 100644 index 0000000..179a1e9 --- /dev/null +++ b/spec/unit/amqp/failover/config_spec.rb @@ -0,0 +1,67 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' + +describe 'AMQP::Failover::Config' do + + before(:each) do + configs = [ + {:host => 'rabbit0.local'}, + {:host => 'rabbit1.local'}, + {:host => 'rabbit2.local', :port => 5673} + ] + @configs = configs.map { |conf| AMQP.settings.merge(conf) } + @fail = AMQP::Failover.new(@configs) + end + + it "should initialize" do + fail = AMQP::Failover::Config.new(@configs[0]) + fail.should == @configs[0] + fail.last_fail.should be_nil + + now = Time.now + fail = AMQP::Failover::Config.new(@configs[1], now) + fail.should == @configs[1] + fail.last_fail.should == now + end + + it "should order properly with #<=>" do + one_hour_ago = (Time.now - 3600) + two_hours_ago = (Time.now - 7200) + + fail = [ AMQP::Failover::Config.new(@configs[0]), + AMQP::Failover::Config.new(@configs[1], one_hour_ago), + AMQP::Failover::Config.new(@configs[2], two_hours_ago) ] + + (fail[1] <=> fail[0]).should == -1 + (fail[0] <=> fail[0]).should == 0 + (fail[0] <=> fail[1]).should == 1 + + (fail[1] <=> fail[2]).should == -1 + (fail[1] <=> fail[1]).should == 0 + (fail[2] <=> fail[1]).should == 1 + + fail.sort[0].last_fail.should == one_hour_ago + fail.sort[1].last_fail.should == two_hours_ago + fail.sort[2].last_fail.should == nil + end + + it "should be ordered by last_fail" do + result = [ AMQP::Failover::Config.new(@configs[1], (Time.now - 60)), + AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))), + AMQP::Failover::Config.new(@configs[0], (Time.now - 3600)) ] + + origin = [ AMQP::Failover::Config.new(@configs[0], (Time.now - 3600)), + AMQP::Failover::Config.new(@configs[1], (Time.now - 60)), + AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))) ] + origin.sort.should == result + + origin = [ AMQP::Failover::Config.new(@configs[0]), + AMQP::Failover::Config.new(@configs[1], (Time.now - 60)), + AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))) ] + origin.sort.should == result + end + +end + diff --git a/spec/unit/amqp/failover/configs_spec.rb b/spec/unit/amqp/failover/configurations_spec.rb similarity index 54% rename from spec/unit/amqp/failover/configs_spec.rb rename to spec/unit/amqp/failover/configurations_spec.rb index f9a79f6..2808815 100644 --- a/spec/unit/amqp/failover/configs_spec.rb +++ b/spec/unit/amqp/failover/configurations_spec.rb @@ -3,20 +3,28 @@ $LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) require 'spec_helper' -describe 'AMQP::Failover::Configs' do +describe 'AMQP::Failover::Configurations' do before(:each) do - @conf = AMQP::Failover::Configs.new + @conf = AMQP::Failover::Configurations.new @raw_configs = [ - {:host => 'rabbit3.local'}, - {:host => 'rabbit2.local'}, + {:host => 'rabbit0.local'}, + {:host => 'rabbit1.local'}, {:host => 'rabbit2.local', :port => 5673} ] @configs = @raw_configs.map { |conf| AMQP.settings.merge(conf) } end + it "should initialize" do + confs = AMQP::Failover::Configurations.new(@raw_configs) + confs.each_with_index do |conf, i| + conf.should be_a(AMQP::Failover::Config) + conf.should == @configs[i] + end + end + it "should set and get configs" do - @conf.primary.should == 0 + @conf.primary_ref.should == 0 @conf.should have(0).items @conf.set(@raw_configs[0]) @@ -29,6 +37,7 @@ describe 'AMQP::Failover::Configs' do @conf.get(1).should == @configs[1] @conf[1].should == @configs[1] + # should just create a ref, as config exists @conf.set(@raw_configs[1], :the_one) @conf.should have(2).items @conf.get(1).should == @configs[1] @@ -36,9 +45,9 @@ describe 'AMQP::Failover::Configs' do @conf.load_array(@raw_configs) @conf.should have(3).items - @conf.get_primary.should == @configs[0] - @conf.primary = 1 - @conf.get_primary.should == @configs[1] + @conf.primary.should == @configs[0] + @conf.primary_ref = 1 + @conf.primary.should == @configs[1] @conf[:primary].should == @configs[1] end @@ -50,5 +59,21 @@ describe 'AMQP::Failover::Configs' do @conf.find_next(@configs[2]).should == @configs[0] end + it "should #load_hash" do + @conf.should have(0).items + @conf.load_hash(@raw_configs[0]) + @conf.should have(1).items + @conf.primary.should == @configs[0] + end + + it "should #load_array" do + @conf.load_hash(:host => 'rabbid-rabbit') + @conf.should have(1).items + @conf.load_array(@raw_configs) + @conf.should have(3).items + @conf.should == @configs + @conf.primary.should == @configs[0] + end + end diff --git a/spec/unit/amqp/failover/server_discovery_spec.rb b/spec/unit/amqp/failover/server_discovery_spec.rb index b829163..68afdd8 100644 --- a/spec/unit/amqp/failover/server_discovery_spec.rb +++ b/spec/unit/amqp/failover/server_discovery_spec.rb @@ -13,6 +13,11 @@ describe 'AMQP::Failover::ServerDiscovery' do @retry_interval = 0.01 end + after(:all) do + $called = nil + $start_count = nil + end + it "should initialize" do EM.run { EM.start_server('127.0.0.1', 9999) diff --git a/spec/unit/amqp/failover_spec.rb b/spec/unit/amqp/failover_spec.rb new file mode 100644 index 0000000..b088b72 --- /dev/null +++ b/spec/unit/amqp/failover_spec.rb @@ -0,0 +1,69 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' + +describe 'AMQP::Failover' do + + before(:each) do + configs = [ + {:host => 'rabbit0.local'}, + {:host => 'rabbit1.local'}, + {:host => 'rabbit2.local', :port => 5673} + ] + @configs = configs.map { |conf| AMQP.settings.merge(conf) } + @fail = AMQP::Failover.new(@configs) + end + + it "should initialize" do + @fail.configs.should == @configs + end + + it "should #add_config" do + @fail.instance_variable_set("@configs", nil) + @fail.configs.should == [] + @fail.add_config(@configs[0]) + @fail.configs.should have(1).item + @fail.configs.should == [@configs[0]] + @fail.refs.should == {} + @fail.add_config(@configs[1], :hello) + @fail.configs.should have(2).items + @fail.configs.should include(@configs[1]) + @fail.get_by_ref(:hello).should == @configs[1] + end + + it "should #get_by_conf" do + fetched = @fail.get_by_conf(@configs[1]) + fetched.should == @configs[1] + fetched.class.should == AMQP::Failover::Config + fetched.last_fail.should be_nil + end + + it "should #fail_with" do + fail = AMQP::Failover.new + now = Time.now + fail.failed_with(@configs[0], 0, now) + fail.latest_failed.should == @configs[0] + fail.last_fail_of(@configs[0]).should == now + fail.last_fail_of(0).should == now + end + + it "should find #next_config" do + @fail.failed_with(@configs[1]) + @fail.next_config.should == @configs[2] + @fail.next_config.should == @configs[2] + @fail.failed_with(@configs[2]) + @fail.next_config.should == @configs[0] + @fail.failed_with(@configs[0]) + @fail.next_config.should be_nil + end + + it "should #failover_from" do + now = Time.now + @fail.failover_from(@configs[0], now).should == @configs[1] + @fail.latest_failed.should == @configs[0] + @fail.latest_failed.last_fail.should == now + end + +end + From cdb16348652181953af18d20654fbd4a43b7ac74 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 09:45:59 +0000 Subject: [PATCH 17/25] added .document config, even though the one comment with usable info renders incorrectly with yard, for now at least :P --- .document | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .document diff --git a/.document b/.document new file mode 100644 index 0000000..412e5f1 --- /dev/null +++ b/.document @@ -0,0 +1,6 @@ +README.md +lib/**/*.rb +bin/* +features/**/*.feature +LICENSE + From ec9810ea622626831bd9e8d2104c89c719421d76 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 10:04:51 +0000 Subject: [PATCH 18/25] renamed integration specs, and updated failover and specs a bit --- lib/amqp/failover.rb | 9 ++++++--- .../{a_simple_spec.rb => basic_spec.rb} | 2 +- ...{full_failover_spec.rb => failover_spec.rb} | 12 ++++++++++++ spec/logger_helper.rb | 18 ++++++++++++++++++ 4 files changed, 37 insertions(+), 4 deletions(-) rename spec/integration/{a_simple_spec.rb => basic_spec.rb} (95%) rename spec/integration/{full_failover_spec.rb => failover_spec.rb} (78%) create mode 100644 spec/logger_helper.rb diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index 1a7b53d..8719a74 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -24,9 +24,12 @@ module AMQP @options = default_options.merge(opts) end - # pluggable logger specifically for tracking failover and fallbacks - def self.logger - @logger ||= Logger.new + class << self + # pluggable logger specifically for tracking failover and fallbacks + def logger + @logger ||= Logger.new + end + attr_writer :logger end def default_options diff --git a/spec/integration/a_simple_spec.rb b/spec/integration/basic_spec.rb similarity index 95% rename from spec/integration/a_simple_spec.rb rename to spec/integration/basic_spec.rb index 541d14e..ea0fbff 100644 --- a/spec/integration/a_simple_spec.rb +++ b/spec/integration/basic_spec.rb @@ -7,7 +7,7 @@ require 'amqp' require 'amqp/server' require 'server_helper' -describe "A simple AMQP connection with FailoverClient loaded" do +describe "Basic AMQP connection with FailoverClient loaded" do after(:all) do ServerHelper.clear_logs diff --git a/spec/integration/full_failover_spec.rb b/spec/integration/failover_spec.rb similarity index 78% rename from spec/integration/full_failover_spec.rb rename to spec/integration/failover_spec.rb index 2b96936..a0eaddc 100644 --- a/spec/integration/full_failover_spec.rb +++ b/spec/integration/failover_spec.rb @@ -4,11 +4,18 @@ $LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) require 'spec_helper' require 'amqp/server' require 'server_helper' +require 'logger_helper' describe "Full Failover support of AMQP gem" do + before(:all) do + @flog = LoggerHelper.new + AMQP::Failover.logger = @flog + end + after(:all) do ServerHelper.clear_logs + AMQP::Failover.logger = nil end it "should be able to connect" do @@ -41,9 +48,14 @@ describe "Full Failover support of AMQP gem" do serv1.stop EM.add_timer(0.1) { conn.should be_connected + [:error, :info].each do |i| + @flog.send("#{i}_log").should have(1).item + @flog.send("#{i}_log")[0][0].should match(/connect to or lost connection.+25672.+attempting connection.+35672/i) + end conn.settings[:port].should == 35672 serv1.log.should have(3).items serv2.log.should have(3).items + conn.close EM.add_timer(0.1) { serv2.stop EM.stop diff --git a/spec/logger_helper.rb b/spec/logger_helper.rb new file mode 100644 index 0000000..48022b9 --- /dev/null +++ b/spec/logger_helper.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 + +class LoggerHelper + + attr_accessor :error_log + attr_accessor :info_log + + def info(*args) + @info_log ||= [] + @info_log << args + end + + def error(*args) + @error_log ||= [] + @error_log << args + end + +end From a88d3b32842773325b068c2b3bb2fd368b636340 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 10:41:05 +0000 Subject: [PATCH 19/25] failover integration spec is "complete", for now --- spec/integration/failover_spec.rb | 74 ++++++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 10 deletions(-) diff --git a/spec/integration/failover_spec.rb b/spec/integration/failover_spec.rb index a0eaddc..a2e11e4 100644 --- a/spec/integration/failover_spec.rb +++ b/spec/integration/failover_spec.rb @@ -8,7 +8,7 @@ require 'logger_helper' describe "Full Failover support of AMQP gem" do - before(:all) do + before(:each) do @flog = LoggerHelper.new AMQP::Failover.logger = @flog end @@ -19,10 +19,11 @@ describe "Full Failover support of AMQP gem" do end it "should be able to connect" do + port1 = 15672 EM.run { - serv = start_server(15672) + serv = start_server(port1) EM.add_timer(0.1) { - conn = AMQP.connect(:host => 'localhost', :port => 15672) + conn = AMQP.connect(:host => 'localhost', :port => port1) conn.failover.should be_nil EM.add_timer(0.1) { conn.should be_connected @@ -33,26 +34,32 @@ describe "Full Failover support of AMQP gem" do end it "should be able to connect and failover" do + port1 = 25672 + port2 = 35672 EM.run { - serv1 = start_server(25672) - serv2 = start_server(35672) + # start mock amqp servers + serv1 = start_server(port1) + serv2 = start_server(port2) EM.add_timer(0.1) { - conn = AMQP.connect({:hosts => [{:port => 25672}, {:port => 35672}]}) - conn.failover.primary[:port].should == 25672 - conn.settings[:port].should == 25672 + # start amqp client connection and make sure it's picked the right config + conn = AMQP.connect({:hosts => [{:port => port1}, {:port => port2}]}) + conn.failover.primary[:port].should == port1 + conn.settings[:port].should == port1 conn.settings.should == conn.failover.primary EM.add_timer(0.1) { + # make sure client connected to the correct server, then kill server conn.should be_connected serv1.log.should have(3).items serv2.log.should have(0).items serv1.stop EM.add_timer(0.1) { + # make sure client performed a failover when primary server died conn.should be_connected [:error, :info].each do |i| @flog.send("#{i}_log").should have(1).item - @flog.send("#{i}_log")[0][0].should match(/connect to or lost connection.+25672.+attempting connection.+35672/i) + @flog.send("#{i}_log")[0][0].should match(/connect to or lost connection.+#{port1}.+attempting connection.+#{port2}/i) end - conn.settings[:port].should == 35672 + conn.settings[:port].should == port2 serv1.log.should have(3).items serv2.log.should have(3).items conn.close @@ -66,4 +73,51 @@ describe "Full Failover support of AMQP gem" do } end + it "should be able to fallback when primary server returns" do + port1 = 45672 + port2 = 55672 + lambda { + EM.run { + # start mock amqp servers + serv1 = start_server(port1) + serv2 = start_server(port2) + EM.add_timer(0.1) { + # start amqp client connection and make sure it's picked the right config + conn = AMQP.connect({:hosts => [{:port => port1}, {:port => port2}], :fallback => true, :fallback_interval => 0.1}) + conn.failover.primary[:port].should == port1 + conn.settings[:port].should == port1 + conn.settings.should == conn.failover.primary + EM.add_timer(0.1) { + # make sure client connected to the correct server, then kill server + conn.should be_connected + serv1.log.should have(3).items + serv2.log.should have(0).items + serv1.stop + EM.add_timer(0.1) { + # make sure client performed a failover when primary server died + conn.should be_connected + [:error, :info].each do |i| + @flog.send("#{i}_log").should have(1).item + @flog.send("#{i}_log")[0][0].should match(/connect to or lost connection.+#{port1}.+attempting connection.+#{port2}/i) + end + conn.settings[:port].should == port2 + serv1.log.should have(3).items + serv2.log.should have(3).items + serv3 = start_server(port1) + EM.add_timer(0.2) { + # by this point client should have raised a SystemExit exception + serv2.stop + EM.stop + } + } + } + } + } + }.should raise_error(SystemExit, "exit") + [:error, :info].each do |i| + @flog.send("#{i}_log").should have(2).item + @flog.send("#{i}_log")[1][0].should match(/primary server.+45672.+performing clean exit/i) + end + end + end From 0dda4399cbc7be60ce72b9f045801affb74ad847 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 10:42:41 +0000 Subject: [PATCH 20/25] minor tweak to integrations specs --- spec/integration/basic_spec.rb | 2 +- spec/integration/failover_spec.rb | 2 +- spec/server_helper.rb | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/spec/integration/basic_spec.rb b/spec/integration/basic_spec.rb index ea0fbff..1a79c43 100644 --- a/spec/integration/basic_spec.rb +++ b/spec/integration/basic_spec.rb @@ -9,7 +9,7 @@ require 'server_helper' describe "Basic AMQP connection with FailoverClient loaded" do - after(:all) do + after(:each) do ServerHelper.clear_logs end diff --git a/spec/integration/failover_spec.rb b/spec/integration/failover_spec.rb index a2e11e4..595eeec 100644 --- a/spec/integration/failover_spec.rb +++ b/spec/integration/failover_spec.rb @@ -13,7 +13,7 @@ describe "Full Failover support of AMQP gem" do AMQP::Failover.logger = @flog end - after(:all) do + after(:each) do ServerHelper.clear_logs AMQP::Failover.logger = nil end diff --git a/spec/server_helper.rb b/spec/server_helper.rb index babb28d..153f059 100644 --- a/spec/server_helper.rb +++ b/spec/server_helper.rb @@ -60,7 +60,6 @@ module AmqpServer # customize log output def log(*args) - # puts "\n>>>>>> Process.pid / $PORT: " + Process.pid.inspect + " / #{$PORT}\n" args = {:method => args[0], :class => args[1].payload.class, :pid => Process.pid} filename = File.expand_path("server_helper-port#{$PORT}.log", File.dirname(__FILE__)) File.open(filename, 'a') do |f| From 80369c94187397c9837a0920ef80b6124ca047dc Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 11:09:39 +0000 Subject: [PATCH 21/25] added readme and license files --- LICENSE | 20 ++++++++++++ README.md | 97 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 117 insertions(+) create mode 100644 LICENSE create mode 100644 README.md diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3fdbbd8 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2011 Jim Myhrberg & Global Personals, Ltd. + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..87b271e --- /dev/null +++ b/README.md @@ -0,0 +1,97 @@ +# amqp-failover # + +Add multi-server support with failover and fallback to the [amqp](https://github.com/ruby-amqp/amqp) gem. + + +## Basic Usage ## + + require 'mq' + require 'amqp/failover' + opts = [{:port => 5672}, {:port => 5673}] + AMQP.start(opts) do + # code... + end + +By default the client will connect to `localhost:5672`, but if for any reason it can't connect, or looses connection to that server, it'll attempt to connect to `localhost:5673` instead. + + +## Options Formats ## + +### Standard Non-Failover ### + +Hash: + + opts = {:host => "hostname", :port => 5673} + +URL: + + opts = "amqp://user:pass@hostname:5673/" + +### With Failover ### + +URLs + + opts = "amqp://localhost:5672/,amqp://localhost:5673/" + +Array of Hashes: + + opts = [{:port => 5672}, {:port => 5673}] + +Array of URLs: + + opts = ["amqp://localhost:5672/", "amqp://localhost:5673/"] + +Specify AMQP servers and Failover options by passing a Hash containing a `:hosts` key with a value of either of the above three examples: + + opts = {:hosts => "amqp://localhost:5672/,amqp://localhost:5673/", :fallback => true} + opts = {:hosts => [{:port => 5672}, {:port => 5673}], :fallback => true} + opts = {:hosts => ["amqp://localhost:5672/", "amqp://localhost:5673/"], :fallback => true} + +## Failover Options ## + +* `:retry_timeout`, time to wait before retrying a specific AMQP config after failure. +* `:fallback`, check for the return of the primary server, and fallback to it if and when it returns. WARNING: This currently calls `Process.exit` cause I haven't figured out a way to artificially kill the EM connection without the AMQP channels also being closed, which causes nothing to work even after EM connects to the primary server. It works for me cause dead workers are automatically relaunched with their default config. +* `:fallback_interval`, seconds between each check for original server if :fallback is true. +* `:selection`, not yet implemented. + + +## Todo ## + +* Figure out a sane way to fallback without having to kill the Ruby process. +* Better Readme/Documentation. +* Convince get failover functionality merged in, or otherwise rewritten/added to the official AMQP gem. + + +## Note on Patches/Pull Requests ## + +* Fork the project. +* Make your feature addition or bug fix. +* Add tests for it. This is important so I don't break it in a + future version unintentionally. +* Commit, do not mess with rakefile, version, or history. + (if you want to have your own version, that is fine but bump version in a commit by itself I can ignore when I pull) +* Send me a pull request. Bonus points for topic branches. + + +## Liccense and Copyright ## + +Copyright (c) 2011 Jim Myhrberg & Global Personals, Ltd. + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. From 575df3690e32b2817e52490e649ceaee063d5448 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 14:06:52 +0000 Subject: [PATCH 22/25] added :primary_config failover option --- lib/amqp/failover.rb | 8 +++++--- lib/amqp/failover/ext/amqp/client.rb | 2 ++ spec/integration/failover_spec.rb | 20 +++++++++++++++++++- 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index 8719a74..1c1130d 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -22,6 +22,7 @@ module AMQP def initialize(confs = nil, opts = {}) @configs = Failover::Configurations.new(confs) @options = default_options.merge(opts) + @configs.primary_ref = @options[:primary_config] end class << self @@ -33,9 +34,10 @@ module AMQP end def default_options - { :retry_timeout => 1, - :selection => :sequential, #TODO: Impliment next server selection algorithm - :fallback => false, #TODO: Enable by default once a sane implimentation is figured out + { :primary_config => 0, + :retry_timeout => 1, + :selection => :sequential, #TODO: Implement next server selection algorithm + :fallback => false, #TODO: Enable by default once a sane implementation is figured out :fallback_interval => 10 } end diff --git a/lib/amqp/failover/ext/amqp/client.rb b/lib/amqp/failover/ext/amqp/client.rb index e3a0092..804fb99 100644 --- a/lib/amqp/failover/ext/amqp/client.rb +++ b/lib/amqp/failover/ext/amqp/client.rb @@ -22,6 +22,8 @@ module AMQP # # Available failover options are: # - :retry_timeout, time to wait before retrying a specific AMQP config after failure. + # - :primary_config, specify which of the supplied configurations is it the primary one. The default + # value is 0, the first item in the config array. Use 1 for the second and so on. # - :fallback, check for the return of the primary server, and fallback to it if and when it returns. # - :fallback_interval, seconds between each check for original server if :fallback is true. # - :selection, not yet implimented. diff --git a/spec/integration/failover_spec.rb b/spec/integration/failover_spec.rb index 595eeec..4ccb8dd 100644 --- a/spec/integration/failover_spec.rb +++ b/spec/integration/failover_spec.rb @@ -6,7 +6,7 @@ require 'amqp/server' require 'server_helper' require 'logger_helper' -describe "Full Failover support of AMQP gem" do +describe "Failover support loaded into AMQP gem" do before(:each) do @flog = LoggerHelper.new @@ -120,4 +120,22 @@ describe "Full Failover support of AMQP gem" do end end + it "should abide to :primary_config option" do + port1 = 75672 + port2 = 65672 + EM.run { + serv = start_server(port1) + EM.add_timer(0.1) { + conn = AMQP.connect({:hosts => [{:port => port1}, {:port => port2}], :primary_config => 1}) + conn.failover.primary[:port].should == port2 + conn.settings[:port].should == port2 + conn.settings.should == conn.failover.primary + EM.add_timer(0.1) { + conn.should be_connected + EM.stop + } + } + } + end + end From b7d7024c824765f5644d7872fc581a77966ea383 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 14:07:00 +0000 Subject: [PATCH 23/25] updated readme --- README.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 87b271e..0158245 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # amqp-failover # -Add multi-server support with failover and fallback to the [amqp](https://github.com/ruby-amqp/amqp) gem. +Add multi-server support with failover and fallback to the [amqp](https://github.com/ruby-amqp/amqp) gem. Failover is configured by providing multiple servers/configurations to `AMQP.start` or `AMQP.connect`. Both methods will still accept the same options input as they always have, they simply now support additional forms of options which when used, enables the failover features. ## Basic Usage ## @@ -50,11 +50,17 @@ Specify AMQP servers and Failover options by passing a Hash containing a `:hosts ## Failover Options ## * `:retry_timeout`, time to wait before retrying a specific AMQP config after failure. +* `:primary_config`, specify which of the supplied configurations is it the primary one. The default value is 0, the first item in the config array. Use 1 for the second and so on. * `:fallback`, check for the return of the primary server, and fallback to it if and when it returns. WARNING: This currently calls `Process.exit` cause I haven't figured out a way to artificially kill the EM connection without the AMQP channels also being closed, which causes nothing to work even after EM connects to the primary server. It works for me cause dead workers are automatically relaunched with their default config. * `:fallback_interval`, seconds between each check for original server if :fallback is true. * `:selection`, not yet implemented. +## Notes ## + +I would recommend you test the failover functionality in your own infrastructure before deploy to production, as this gem is still very much alpha/beta quality, and it does do a little bit of monkey patching to the amqp gem. That said, it there's a number of specs which should ensure things work as advertised, and nothing breaks. We are currently using it at Global Personals without any problems. + + ## Todo ## * Figure out a sane way to fallback without having to kill the Ruby process. From b3f8aac82ff9ff0d621bbad30f40261a46d7a752 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 14:18:33 +0000 Subject: [PATCH 24/25] removed yaml dependency by getting rid of now useless #load_yaml and #load_file methods from Failover::Configurations --- lib/amqp/failover.rb | 2 -- lib/amqp/failover/configurations.rb | 9 --------- 2 files changed, 11 deletions(-) diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index 1c1130d..4c68b01 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -1,7 +1,5 @@ # encoding: utf-8 -require 'yaml' - require 'amqp/failover_client' require 'amqp/failover/config' require 'amqp/failover/configurations' diff --git a/lib/amqp/failover/configurations.rb b/lib/amqp/failover/configurations.rb index 8c87b1f..24b24bc 100644 --- a/lib/amqp/failover/configurations.rb +++ b/lib/amqp/failover/configurations.rb @@ -66,15 +66,6 @@ module AMQP self[(current+1 == self.size) ? 0 : current+1] if current end - def load_file(file, env = nil) - raise ArgumentError, "Can't find #{file}" unless File.exists?(file) - load(YAML.load_file(file)[env || "development"]) - end - - def load_yaml(data, env = nil) - load(YAML.load(data)[env || "development"]) - end - def load(conf) if conf.is_a?(Array) load_array(conf) From 6b5fb8aa1ffee838f572dfcb04ed9d46390273ff Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Tue, 1 Feb 2011 14:18:46 +0000 Subject: [PATCH 25/25] added todo item in readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 0158245..8bd38f4 100644 --- a/README.md +++ b/README.md @@ -65,6 +65,7 @@ I would recommend you test the failover functionality in your own infrastructure * Figure out a sane way to fallback without having to kill the Ruby process. * Better Readme/Documentation. +* Add option for next server selection on failover to be selected by random rather than next on the list. * Convince get failover functionality merged in, or otherwise rewritten/added to the official AMQP gem.