diff --git a/amqp-failover.gemspec b/amqp-failover.gemspec index 8814860..fbaea7d 100644 --- a/amqp-failover.gemspec +++ b/amqp-failover.gemspec @@ -25,5 +25,6 @@ Gem::Specification.new do |s| s.add_development_dependency 'rack-test', '>= 0.5.6' s.add_development_dependency 'rspec', '>= 2.1.0' s.add_development_dependency 'yard', '>= 0.6.3' + s.add_development_dependency 'json', '>= 1.5.0' s.add_development_dependency 'ruby-debug' end diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb index 9bd0838..1a7b53d 100644 --- a/lib/amqp/failover.rb +++ b/lib/amqp/failover.rb @@ -32,7 +32,7 @@ module AMQP def default_options { :retry_timeout => 1, :selection => :sequential, #TODO: Impliment next server selection algorithm - :fallback => false, #TODO: Enable by default once a sane solution is found + :fallback => false, #TODO: Enable by default once a sane implimentation is figured out :fallback_interval => 10 } end @@ -58,7 +58,7 @@ module AMQP def add_config(conf = {}, ref = nil) index = configs.index(conf) - configs << Config::Failed.new(conf) if index.nil? + configs.set(conf) if index.nil? refs[ref] = (index || configs.index(conf)) if !ref.nil? end @@ -70,12 +70,12 @@ module AMQP def failed_with(conf = {}, ref = nil, time = nil) time ||= Time.now - if index = configs.index(conf) + if !(index = configs.index(conf)).nil? configs[index].last_fail = time @latest_failed = configs[index] else - configs << Config::Failed.new(conf, time) - @latest_failed = configs.last + @latest_failed = configs.set(conf) + configs.last.last_fail = time end refs[ref] = (index || configs.index(conf)) if !ref.nil? end @@ -87,7 +87,7 @@ module AMQP index = configs.index(after) available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1] available.each do |conf| - return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now + return conf if conf.last_fail.nil? || (conf.last_fail.to_i + retry_timeout) < Time.now.to_i end return nil end diff --git a/lib/amqp/failover/config.rb b/lib/amqp/failover/config.rb index dbaef58..c8a5fb3 100644 --- a/lib/amqp/failover/config.rb +++ b/lib/amqp/failover/config.rb @@ -34,7 +34,7 @@ module AMQP end return other.last_fail <=> self.last_fail end - super(other) + return 0 end end # Config diff --git a/lib/amqp/failover/ext/amqp/client.rb b/lib/amqp/failover/ext/amqp/client.rb index 4e2b4f2..e3a0092 100644 --- a/lib/amqp/failover/ext/amqp/client.rb +++ b/lib/amqp/failover/ext/amqp/client.rb @@ -1,9 +1,11 @@ # encoding: utf-8 +AMQP.client = AMQP::FailoverClient + module AMQP module Client + class << self - alias :connect_without_failover :connect # Connect with Failover supports specifying multiple AMQP servers and configurations. # @@ -20,13 +22,15 @@ module AMQP # # Available failover options are: # - :retry_timeout, time to wait before retrying a specific AMQP config after failure. - # - :fallback, monitor for original server's return and fallback to it if so. + # - :fallback, check for the return of the primary server, and fallback to it if and when it returns. # - :fallback_interval, seconds between each check for original server if :fallback is true. + # - :selection, not yet implimented. # def connect_with_failover(opts = nil) opts = parse_amqp_url_or_opts(opts) connect_without_failover(opts) end + alias :connect_without_failover :connect alias :connect :connect_with_failover def parse_amqp_url_or_opts(opts = nil) @@ -35,7 +39,7 @@ module AMQP elsif opts.is_a?(Array) opts = init_failover(opts) elsif opts.is_a?(Hash) && opts[:hosts].is_a?(Array) - confs = opts.delete[:hosts] + confs = opts.delete(:hosts) opts = init_failover(confs, opts) end opts @@ -43,10 +47,19 @@ module AMQP def init_failover(confs = nil, opts = {}) if !confs.nil? && confs.size > 0 - failover.primary.merge({ :failover => Failover.new(confs, opts) }) + failover = Failover.new(confs, opts) + failover.primary.merge({ :failover => failover }) end end - + end # << self + + def disconnected_with_failover + return failover_switch if @failover + disconnected_without_failover + end + alias :disconnected_without_failover :disconnected + alias :disconnected :disconnected_with_failover + end # Client end # AMQP \ No newline at end of file diff --git a/lib/amqp/failover_client.rb b/lib/amqp/failover_client.rb index fd6c048..c08b371 100644 --- a/lib/amqp/failover_client.rb +++ b/lib/amqp/failover_client.rb @@ -12,7 +12,24 @@ module AMQP def self.extended(base) if (base.failover = base.settings.delete(:failover)) - base.on_disconnect = base.method(:failover_leap) + base.on_disconnect = base.method(:disconnected) + end + end + + def failover_switch + if (new_settings = @failover.from(@settings)) + log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " + + "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" + logger.error(log_message) + logger.info(log_message) + + if @failover.options[:fallback] && @failover.primary == @settings + fallback(@failover.primary, @failover.fallback_interval) + end + @settings = new_settings + reconnect + else + raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}" end end @@ -31,23 +48,6 @@ module AMQP Process.exit end - def failover_leap - if (new_settings = @failover.from(@settings)) - log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " + - "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" - logger.error(log_message) - logger.info(log_message) - - if @failover.options[:fallback] && @failover.primary == @settings - fallback(@failover.primary, @failover.fallback_interval) - end - @settings = new_settings - reconnect - else - raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}" - end - end - def fallback(conf = {}, retry_interval = nil) @fallback_monitor = Failover::ServerDiscovery.monitor(conf, retry_interval) do fallback_callback.call(conf, retry_interval) diff --git a/spec/integration/a_simple_spec.rb b/spec/integration/a_simple_spec.rb new file mode 100644 index 0000000..541d14e --- /dev/null +++ b/spec/integration/a_simple_spec.rb @@ -0,0 +1,59 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' +require 'mq' +require 'amqp' +require 'amqp/server' +require 'server_helper' + +describe "A simple AMQP connection with FailoverClient loaded" do + + after(:all) do + ServerHelper.clear_logs + end + + it "should be using FailoverClient" do + AMQP.client.should == AMQP::FailoverClient + end + + it "should be able to connect" do + EM.run { + port = 15672 + timeout = 2 + serv = start_server(port) + EM.add_timer(1.5) { + conn = AMQP.connect(:host => 'localhost', :port => 15672) + EM.add_timer(0.1) { + conn.should be_connected + serv.stop + log = serv.log + log.size.should == 3 + (0..2).each { |i| log[i]['method'].should == "send" } + log[0]['class'].should == 'AMQP::Protocol::Connection::Start' + log[1]['class'].should == 'AMQP::Protocol::Connection::Tune' + log[2]['class'].should == 'AMQP::Protocol::Connection::OpenOk' + EM.stop + } + } + } + end + + it "should be able to connect and get disconnected" do + EM.run { + serv = start_server(25672) + EM.add_timer(0.1) { + conn = AMQP.connect(:host => 'localhost', :port => 25672) + EM.add_timer(0.1) { + conn.should be_connected + serv.stop + EM.add_timer(0.1) { + conn.should_not be_connected + EM.stop + } + } + } + } + end + +end diff --git a/spec/integration/full_failover_spec.rb b/spec/integration/full_failover_spec.rb new file mode 100644 index 0000000..2b96936 --- /dev/null +++ b/spec/integration/full_failover_spec.rb @@ -0,0 +1,57 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' +require 'amqp/server' +require 'server_helper' + +describe "Full Failover support of AMQP gem" do + + after(:all) do + ServerHelper.clear_logs + end + + it "should be able to connect" do + EM.run { + serv = start_server(15672) + EM.add_timer(0.1) { + conn = AMQP.connect(:host => 'localhost', :port => 15672) + conn.failover.should be_nil + EM.add_timer(0.1) { + conn.should be_connected + EM.stop + } + } + } + end + + it "should be able to connect and failover" do + EM.run { + serv1 = start_server(25672) + serv2 = start_server(35672) + EM.add_timer(0.1) { + conn = AMQP.connect({:hosts => [{:port => 25672}, {:port => 35672}]}) + conn.failover.primary[:port].should == 25672 + conn.settings[:port].should == 25672 + conn.settings.should == conn.failover.primary + EM.add_timer(0.1) { + conn.should be_connected + serv1.log.should have(3).items + serv2.log.should have(0).items + serv1.stop + EM.add_timer(0.1) { + conn.should be_connected + conn.settings[:port].should == 35672 + serv1.log.should have(3).items + serv2.log.should have(3).items + EM.add_timer(0.1) { + serv2.stop + EM.stop + } + } + } + } + } + end + +end diff --git a/spec/integration/simple_spec.rb b/spec/integration/simple_spec.rb deleted file mode 100644 index d87d425..0000000 --- a/spec/integration/simple_spec.rb +++ /dev/null @@ -1,48 +0,0 @@ -# encoding: utf-8 -$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) - -require 'spec_helper' -require 'amqp/server' -require 'server_helper' - -describe "Simple AMQP connection with FailoverClient loaded" do - - before(:all) do - @log = ServerHelper.log - AMQP.client = AMQP::FailoverClient - end - - it "should be connected" do - EM.run { - sig = EM.start_server('localhost', 15672, ServerHelper) - conn = AMQP.connect(:host => 'localhost', :port => 15672) - EM.add_timer(0.1) { - conn.should be_connected - @log.size.should == 3 - (0..2).each { |i| @log[i][0].should == "send" } - @log[0][1].payload.should be_a(AMQP::Protocol::Connection::Start) - @log[1][1].payload.should be_a(AMQP::Protocol::Connection::Tune) - @log[2][1].payload.should be_a(AMQP::Protocol::Connection::OpenOk) - EM.stop - } - } - end - - it "should connect and get disconnected" do - lambda { - EM.run { - spid = start_server - conn = AMQP.connect(:host => 'localhost', :port => 15672) - EM.add_timer(0.1) { - conn.should be_connected - stop_server(spid) - EM.add_timer(0.1) { - conn.should_not be_connected - EM.stop - } - } - } - }.should raise_error(AMQP::Error, "Could not connect to server localhost:15672") - end - -end diff --git a/spec/server_helper.rb b/spec/server_helper.rb index ee08359..babb28d 100644 --- a/spec/server_helper.rb +++ b/spec/server_helper.rb @@ -1,36 +1,78 @@ # encoding: utf-8 -module ServerHelper - include AMQP::Server +require 'rubygems' +require 'mq' +require 'amqp' +require 'amqp/server' +require 'json' + +class ServerHelper - class << self - def log - @@log ||= [] - end - attr_writer :log + attr_accessor :stdin + attr_accessor :stdout + attr_accessor :stderr + attr_accessor :pid + + + def initialize(port = nil, timeout = nil) + @port = port + @timout = timeout + File.open(log_file, 'w') {} + @pid = start(port, timeout) end - # log & silence STDOUT output - def log(*args) - @@log << args + def self.clear_logs + Dir.glob(File.expand_path('server_helper*.log', File.dirname(__FILE__))).each do |file| + File.delete(file) + end + end + + def start(port = nil, timeout = nil) + port ||= 15672 + timeout ||= 2 + EM.fork_reactor { + $PORT = port + EM.start_server('localhost', port, AmqpServer) + EM.add_timer(timeout) { EM.stop } + } + end + + def stop + Process.kill('TERM', @pid) + end + + def kill + Process.kill('KILL', @pid) + end + + def log + File.open(log_file).to_a.map{ |l| JSON.parse(l) } + end + + def log_file + File.expand_path("server_helper-port#{@port}.log", File.dirname(__FILE__)) end end +module AmqpServer + include AMQP::Server + + # customize log output + def log(*args) + # puts "\n>>>>>> Process.pid / $PORT: " + Process.pid.inspect + " / #{$PORT}\n" + args = {:method => args[0], :class => args[1].payload.class, :pid => Process.pid} + filename = File.expand_path("server_helper-port#{$PORT}.log", File.dirname(__FILE__)) + File.open(filename, 'a') do |f| + f.write("#{args.to_json}\n") + end + end +end # # Helper methods # -def start_server(port = 15762, timeout = 2) - bef_fork = EM.forks.clone - EM.fork { - EM.start_server('localhost', port, ServerHelper) - EM.add_timer(timeout) { EM.stop } - } - (EM.forks - bef_fork).first +def start_server(port = nil, timeout = nil) + ServerHelper.new(port, timeout) end - -def stop_server(pid) - Process.kill('TERM', pid) -end \ No newline at end of file diff --git a/spec/unit/amqp/failover/config_spec.rb b/spec/unit/amqp/failover/config_spec.rb new file mode 100644 index 0000000..179a1e9 --- /dev/null +++ b/spec/unit/amqp/failover/config_spec.rb @@ -0,0 +1,67 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' + +describe 'AMQP::Failover::Config' do + + before(:each) do + configs = [ + {:host => 'rabbit0.local'}, + {:host => 'rabbit1.local'}, + {:host => 'rabbit2.local', :port => 5673} + ] + @configs = configs.map { |conf| AMQP.settings.merge(conf) } + @fail = AMQP::Failover.new(@configs) + end + + it "should initialize" do + fail = AMQP::Failover::Config.new(@configs[0]) + fail.should == @configs[0] + fail.last_fail.should be_nil + + now = Time.now + fail = AMQP::Failover::Config.new(@configs[1], now) + fail.should == @configs[1] + fail.last_fail.should == now + end + + it "should order properly with #<=>" do + one_hour_ago = (Time.now - 3600) + two_hours_ago = (Time.now - 7200) + + fail = [ AMQP::Failover::Config.new(@configs[0]), + AMQP::Failover::Config.new(@configs[1], one_hour_ago), + AMQP::Failover::Config.new(@configs[2], two_hours_ago) ] + + (fail[1] <=> fail[0]).should == -1 + (fail[0] <=> fail[0]).should == 0 + (fail[0] <=> fail[1]).should == 1 + + (fail[1] <=> fail[2]).should == -1 + (fail[1] <=> fail[1]).should == 0 + (fail[2] <=> fail[1]).should == 1 + + fail.sort[0].last_fail.should == one_hour_ago + fail.sort[1].last_fail.should == two_hours_ago + fail.sort[2].last_fail.should == nil + end + + it "should be ordered by last_fail" do + result = [ AMQP::Failover::Config.new(@configs[1], (Time.now - 60)), + AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))), + AMQP::Failover::Config.new(@configs[0], (Time.now - 3600)) ] + + origin = [ AMQP::Failover::Config.new(@configs[0], (Time.now - 3600)), + AMQP::Failover::Config.new(@configs[1], (Time.now - 60)), + AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))) ] + origin.sort.should == result + + origin = [ AMQP::Failover::Config.new(@configs[0]), + AMQP::Failover::Config.new(@configs[1], (Time.now - 60)), + AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))) ] + origin.sort.should == result + end + +end + diff --git a/spec/unit/amqp/failover/configs_spec.rb b/spec/unit/amqp/failover/configurations_spec.rb similarity index 54% rename from spec/unit/amqp/failover/configs_spec.rb rename to spec/unit/amqp/failover/configurations_spec.rb index f9a79f6..2808815 100644 --- a/spec/unit/amqp/failover/configs_spec.rb +++ b/spec/unit/amqp/failover/configurations_spec.rb @@ -3,20 +3,28 @@ $LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) require 'spec_helper' -describe 'AMQP::Failover::Configs' do +describe 'AMQP::Failover::Configurations' do before(:each) do - @conf = AMQP::Failover::Configs.new + @conf = AMQP::Failover::Configurations.new @raw_configs = [ - {:host => 'rabbit3.local'}, - {:host => 'rabbit2.local'}, + {:host => 'rabbit0.local'}, + {:host => 'rabbit1.local'}, {:host => 'rabbit2.local', :port => 5673} ] @configs = @raw_configs.map { |conf| AMQP.settings.merge(conf) } end + it "should initialize" do + confs = AMQP::Failover::Configurations.new(@raw_configs) + confs.each_with_index do |conf, i| + conf.should be_a(AMQP::Failover::Config) + conf.should == @configs[i] + end + end + it "should set and get configs" do - @conf.primary.should == 0 + @conf.primary_ref.should == 0 @conf.should have(0).items @conf.set(@raw_configs[0]) @@ -29,6 +37,7 @@ describe 'AMQP::Failover::Configs' do @conf.get(1).should == @configs[1] @conf[1].should == @configs[1] + # should just create a ref, as config exists @conf.set(@raw_configs[1], :the_one) @conf.should have(2).items @conf.get(1).should == @configs[1] @@ -36,9 +45,9 @@ describe 'AMQP::Failover::Configs' do @conf.load_array(@raw_configs) @conf.should have(3).items - @conf.get_primary.should == @configs[0] - @conf.primary = 1 - @conf.get_primary.should == @configs[1] + @conf.primary.should == @configs[0] + @conf.primary_ref = 1 + @conf.primary.should == @configs[1] @conf[:primary].should == @configs[1] end @@ -50,5 +59,21 @@ describe 'AMQP::Failover::Configs' do @conf.find_next(@configs[2]).should == @configs[0] end + it "should #load_hash" do + @conf.should have(0).items + @conf.load_hash(@raw_configs[0]) + @conf.should have(1).items + @conf.primary.should == @configs[0] + end + + it "should #load_array" do + @conf.load_hash(:host => 'rabbid-rabbit') + @conf.should have(1).items + @conf.load_array(@raw_configs) + @conf.should have(3).items + @conf.should == @configs + @conf.primary.should == @configs[0] + end + end diff --git a/spec/unit/amqp/failover/server_discovery_spec.rb b/spec/unit/amqp/failover/server_discovery_spec.rb index b829163..68afdd8 100644 --- a/spec/unit/amqp/failover/server_discovery_spec.rb +++ b/spec/unit/amqp/failover/server_discovery_spec.rb @@ -13,6 +13,11 @@ describe 'AMQP::Failover::ServerDiscovery' do @retry_interval = 0.01 end + after(:all) do + $called = nil + $start_count = nil + end + it "should initialize" do EM.run { EM.start_server('127.0.0.1', 9999) diff --git a/spec/unit/amqp/failover_spec.rb b/spec/unit/amqp/failover_spec.rb new file mode 100644 index 0000000..b088b72 --- /dev/null +++ b/spec/unit/amqp/failover_spec.rb @@ -0,0 +1,69 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' + +describe 'AMQP::Failover' do + + before(:each) do + configs = [ + {:host => 'rabbit0.local'}, + {:host => 'rabbit1.local'}, + {:host => 'rabbit2.local', :port => 5673} + ] + @configs = configs.map { |conf| AMQP.settings.merge(conf) } + @fail = AMQP::Failover.new(@configs) + end + + it "should initialize" do + @fail.configs.should == @configs + end + + it "should #add_config" do + @fail.instance_variable_set("@configs", nil) + @fail.configs.should == [] + @fail.add_config(@configs[0]) + @fail.configs.should have(1).item + @fail.configs.should == [@configs[0]] + @fail.refs.should == {} + @fail.add_config(@configs[1], :hello) + @fail.configs.should have(2).items + @fail.configs.should include(@configs[1]) + @fail.get_by_ref(:hello).should == @configs[1] + end + + it "should #get_by_conf" do + fetched = @fail.get_by_conf(@configs[1]) + fetched.should == @configs[1] + fetched.class.should == AMQP::Failover::Config + fetched.last_fail.should be_nil + end + + it "should #fail_with" do + fail = AMQP::Failover.new + now = Time.now + fail.failed_with(@configs[0], 0, now) + fail.latest_failed.should == @configs[0] + fail.last_fail_of(@configs[0]).should == now + fail.last_fail_of(0).should == now + end + + it "should find #next_config" do + @fail.failed_with(@configs[1]) + @fail.next_config.should == @configs[2] + @fail.next_config.should == @configs[2] + @fail.failed_with(@configs[2]) + @fail.next_config.should == @configs[0] + @fail.failed_with(@configs[0]) + @fail.next_config.should be_nil + end + + it "should #failover_from" do + now = Time.now + @fail.failover_from(@configs[0], now).should == @configs[1] + @fail.latest_failed.should == @configs[0] + @fail.latest_failed.last_fail.should == now + end + +end +