diff --git a/.document b/.document new file mode 100644 index 0000000..412e5f1 --- /dev/null +++ b/.document @@ -0,0 +1,6 @@ +README.md +lib/**/*.rb +bin/* +features/**/*.feature +LICENSE + diff --git a/.gitignore b/.gitignore index 4040c6c..1724f70 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,28 @@ +## MAC OS +.DS_Store + +## TEXTMATE +*.tmproj +tmtags + +## EMACS +*~ +\#* +.\#* + +## VIM +*.swp + +## PROJECT::GENERAL +coverage +rdoc *.gem .bundle Gemfile.lock pkg/* + +## PROJECT::SPECIFIC +.yardoc/* +spec/db/* +doc/* + diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..8c18f1a --- /dev/null +++ b/.rspec @@ -0,0 +1,2 @@ +--format documentation +--color diff --git a/.rvmrc b/.rvmrc new file mode 100644 index 0000000..f1c0b0c --- /dev/null +++ b/.rvmrc @@ -0,0 +1 @@ +rvm gemset use amqp-failover diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..3fdbbd8 --- /dev/null +++ b/LICENSE @@ -0,0 +1,20 @@ +Copyright (c) 2011 Jim Myhrberg & Global Personals, Ltd. + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/README.md b/README.md new file mode 100644 index 0000000..8bd38f4 --- /dev/null +++ b/README.md @@ -0,0 +1,104 @@ +# amqp-failover # + +Add multi-server support with failover and fallback to the [amqp](https://github.com/ruby-amqp/amqp) gem. Failover is configured by providing multiple servers/configurations to `AMQP.start` or `AMQP.connect`. Both methods will still accept the same options input as they always have, they simply now support additional forms of options which when used, enables the failover features. + + +## Basic Usage ## + + require 'mq' + require 'amqp/failover' + opts = [{:port => 5672}, {:port => 5673}] + AMQP.start(opts) do + # code... + end + +By default the client will connect to `localhost:5672`, but if for any reason it can't connect, or looses connection to that server, it'll attempt to connect to `localhost:5673` instead. + + +## Options Formats ## + +### Standard Non-Failover ### + +Hash: + + opts = {:host => "hostname", :port => 5673} + +URL: + + opts = "amqp://user:pass@hostname:5673/" + +### With Failover ### + +URLs + + opts = "amqp://localhost:5672/,amqp://localhost:5673/" + +Array of Hashes: + + opts = [{:port => 5672}, {:port => 5673}] + +Array of URLs: + + opts = ["amqp://localhost:5672/", "amqp://localhost:5673/"] + +Specify AMQP servers and Failover options by passing a Hash containing a `:hosts` key with a value of either of the above three examples: + + opts = {:hosts => "amqp://localhost:5672/,amqp://localhost:5673/", :fallback => true} + opts = {:hosts => [{:port => 5672}, {:port => 5673}], :fallback => true} + opts = {:hosts => ["amqp://localhost:5672/", "amqp://localhost:5673/"], :fallback => true} + +## Failover Options ## + +* `:retry_timeout`, time to wait before retrying a specific AMQP config after failure. +* `:primary_config`, specify which of the supplied configurations is it the primary one. The default value is 0, the first item in the config array. Use 1 for the second and so on. +* `:fallback`, check for the return of the primary server, and fallback to it if and when it returns. WARNING: This currently calls `Process.exit` cause I haven't figured out a way to artificially kill the EM connection without the AMQP channels also being closed, which causes nothing to work even after EM connects to the primary server. It works for me cause dead workers are automatically relaunched with their default config. +* `:fallback_interval`, seconds between each check for original server if :fallback is true. +* `:selection`, not yet implemented. + + +## Notes ## + +I would recommend you test the failover functionality in your own infrastructure before deploy to production, as this gem is still very much alpha/beta quality, and it does do a little bit of monkey patching to the amqp gem. That said, it there's a number of specs which should ensure things work as advertised, and nothing breaks. We are currently using it at Global Personals without any problems. + + +## Todo ## + +* Figure out a sane way to fallback without having to kill the Ruby process. +* Better Readme/Documentation. +* Add option for next server selection on failover to be selected by random rather than next on the list. +* Convince get failover functionality merged in, or otherwise rewritten/added to the official AMQP gem. + + +## Note on Patches/Pull Requests ## + +* Fork the project. +* Make your feature addition or bug fix. +* Add tests for it. This is important so I don't break it in a + future version unintentionally. +* Commit, do not mess with rakefile, version, or history. + (if you want to have your own version, that is fine but bump version in a commit by itself I can ignore when I pull) +* Send me a pull request. Bonus points for topic branches. + + +## Liccense and Copyright ## + +Copyright (c) 2011 Jim Myhrberg & Global Personals, Ltd. + +Permission is hereby granted, free of charge, to any person obtaining +a copy of this software and associated documentation files (the +"Software"), to deal in the Software without restriction, including +without limitation the rights to use, copy, modify, merge, publish, +distribute, sublicense, and/or sell copies of the Software, and to +permit persons to whom the Software is furnished to do so, subject to +the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE +LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION +WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/Rakefile b/Rakefile index 14cfe0b..df548be 100644 --- a/Rakefile +++ b/Rakefile @@ -1,2 +1,56 @@ +$LOAD_PATH.unshift File.expand_path("lib", File.dirname(__FILE__)) + require 'bundler' Bundler::GemHelper.install_tasks + + +# +# Rspec +# + +require 'rspec/core/rake_task' + +RSpec::Core::RakeTask.new('spec:all') do |spec| + spec.pattern = [ 'spec/unit/**/*_spec.rb', + 'spec/integration/**/*_spec.rb' ] +end + +desc "Run unit specs" +task :spec => ["spec:unit"] +RSpec::Core::RakeTask.new('spec:unit') do |spec| + spec.pattern = 'spec/unit/**/*_spec.rb' +end + +RSpec::Core::RakeTask.new('spec:integration') do |spec| + spec.pattern = 'spec/integration/**/*_spec.rb' +end + +RSpec::Core::RakeTask.new(:rcov) do |spec| + spec.pattern = 'spec/**/*_spec.rb' + spec.rcov = true +end + + +# +# Yard +# + +begin + require 'yard' + YARD::Rake::YardocTask.new +rescue LoadError + task :yardoc do + abort "YARD is not available. In order to run yardoc, you must: sudo gem install yard" + end +end + + +# +# Misc. +# + +desc "Start irb with amqp-failover pre-loaded" +task :console do + exec "irb -r spec/spec_helper" +end +task :c => :console diff --git a/amqp-failover.gemspec b/amqp-failover.gemspec index 5709d93..fbaea7d 100644 --- a/amqp-failover.gemspec +++ b/amqp-failover.gemspec @@ -1,16 +1,16 @@ # -*- encoding: utf-8 -*- $:.push File.expand_path("../lib", __FILE__) -require "amqp-failover/version" +require "amqp/failover/version" Gem::Specification.new do |s| s.name = "amqp-failover" - s.version = Amqp::Failover::VERSION + s.version = AMQP::Failover::VERSION s.platform = Gem::Platform::RUBY - s.authors = ["TODO: Write your name"] - s.email = ["TODO: Write your email address"] - s.homepage = "" - s.summary = %q{TODO: Write a gem summary} - s.description = %q{TODO: Write a gem description} + s.authors = ["Jim Myhrberg"] + s.email = ["contact@jimeh.me"] + s.homepage = 'http://github.com/jimeh/amqp-failover' + s.summary = 'Add multi-server failover and fallback to amqp gem.' + s.description = 'Add multi-server failover and fallback to amqp gem.' s.rubyforge_project = "amqp-failover" @@ -18,4 +18,13 @@ Gem::Specification.new do |s| s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } s.require_paths = ["lib"] + + s.add_runtime_dependency 'amqp', '>= 0.7.0' + + s.add_development_dependency 'rake', '>= 0.8.7' + s.add_development_dependency 'rack-test', '>= 0.5.6' + s.add_development_dependency 'rspec', '>= 2.1.0' + s.add_development_dependency 'yard', '>= 0.6.3' + s.add_development_dependency 'json', '>= 1.5.0' + s.add_development_dependency 'ruby-debug' end diff --git a/lib/amqp-failover.rb b/lib/amqp-failover.rb deleted file mode 100644 index 9f348e0..0000000 --- a/lib/amqp-failover.rb +++ /dev/null @@ -1,5 +0,0 @@ -module Amqp - module Failover - # Your code goes here... - end -end diff --git a/lib/amqp-failover/version.rb b/lib/amqp-failover/version.rb deleted file mode 100644 index 23ec700..0000000 --- a/lib/amqp-failover/version.rb +++ /dev/null @@ -1,5 +0,0 @@ -module Amqp - module Failover - VERSION = "0.0.1" - end -end diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb new file mode 100644 index 0000000..4c68b01 --- /dev/null +++ b/lib/amqp/failover.rb @@ -0,0 +1,111 @@ +# encoding: utf-8 + +require 'amqp/failover_client' +require 'amqp/failover/config' +require 'amqp/failover/configurations' +require 'amqp/failover/logger' +require 'amqp/failover/server_discovery' +require 'amqp/failover/version' +require 'amqp/failover/ext/amqp/client.rb' + + +module AMQP + class Failover + + attr_reader :latest_failed + attr_accessor :primary + attr_accessor :retry_timeout + attr_accessor :fallback + + def initialize(confs = nil, opts = {}) + @configs = Failover::Configurations.new(confs) + @options = default_options.merge(opts) + @configs.primary_ref = @options[:primary_config] + end + + class << self + # pluggable logger specifically for tracking failover and fallbacks + def logger + @logger ||= Logger.new + end + attr_writer :logger + end + + def default_options + { :primary_config => 0, + :retry_timeout => 1, + :selection => :sequential, #TODO: Implement next server selection algorithm + :fallback => false, #TODO: Enable by default once a sane implementation is figured out + :fallback_interval => 10 } + end + + def options + @options ||= {} + end + + def fallback_interval + options[:fallback_interval] ||= default_options[:fallback_interval] + end + + def primary + configs[:primary] + end + + def refs + @refs ||= {} + end + + def configs + @configs ||= Configurations.new + end + + def add_config(conf = {}, ref = nil) + index = configs.index(conf) + configs.set(conf) if index.nil? + refs[ref] = (index || configs.index(conf)) if !ref.nil? + end + + def failover_from(conf = {}, time = nil) + failed_with(conf, nil, time) + next_config + end + alias :from :failover_from + + def failed_with(conf = {}, ref = nil, time = nil) + time ||= Time.now + if !(index = configs.index(conf)).nil? + configs[index].last_fail = time + @latest_failed = configs[index] + else + @latest_failed = configs.set(conf) + configs.last.last_fail = time + end + refs[ref] = (index || configs.index(conf)) if !ref.nil? + end + + def next_config(retry_timeout = nil, after = nil) + return nil if configs.size <= 1 + retry_timeout ||= @options[:retry_timeout] + after ||= @latest_failed + index = configs.index(after) + available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1] + available.each do |conf| + return conf if conf.last_fail.nil? || (conf.last_fail.to_i + retry_timeout) < Time.now.to_i + end + return nil + end + + def last_fail_of(match) + ((match.is_a?(Hash) ? get_by_conf(match) : get_by_ref(match)) || Config::Failed.new).last_fail + end + + def get_by_conf(conf = {}) + configs[configs.index(conf)] + end + + def get_by_ref(ref = nil) + configs[refs[ref]] if refs[ref] + end + + end # Failover +end # AMQP diff --git a/lib/amqp/failover/config.rb b/lib/amqp/failover/config.rb new file mode 100644 index 0000000..c8a5fb3 --- /dev/null +++ b/lib/amqp/failover/config.rb @@ -0,0 +1,42 @@ +# encoding: utf-8 + +module AMQP + class Failover + class Config < ::Hash + + attr_accessor :last_fail + + def initialize(hash = {}, last_fail_date = nil) + self.replace(symbolize_keys(defaults.merge(hash))) + self.last_fail = last_fail_date if last_fail_date + end + + def defaults + AMQP.settings + end + + def symbolize_keys(hash = {}) + hash.inject({}) do |result, (key, value)| + result[key.is_a?(String) ? key.to_sym : key] = value + result + end + end + + # order by latest fail, potentially useful if random config selection is used + def <=>(other) + if self.respond_to?(:last_fail) && other.respond_to?(:last_fail) + if self.last_fail.nil? && other.last_fail.nil? + return 0 + elsif self.last_fail.nil? && !other.last_fail.nil? + return 1 + elsif !self.last_fail.nil? && other.last_fail.nil? + return -1 + end + return other.last_fail <=> self.last_fail + end + return 0 + end + + end # Config + end # Failover +end # AMQP diff --git a/lib/amqp/failover/configurations.rb b/lib/amqp/failover/configurations.rb new file mode 100644 index 0000000..24b24bc --- /dev/null +++ b/lib/amqp/failover/configurations.rb @@ -0,0 +1,92 @@ +# encoding: utf-8 + +module AMQP + class Failover + class Configurations < Array + + def initialize(confs = nil) + load(confs) + end + + def [](*args) + if args[0].is_a?(Symbol) + return primary if args[0] == :primary + get(args[0]) + else + super(*args) + end + end + + def []=(*args) + if args[0].is_a?(Symbol) + return primary = args.last if args[0] == :primary + set(args.last, args[0]) + end + super(*args) + end + + def refs + @refs ||= {} + end + + def primary_ref + @primary_ref ||= 0 + end + + def primary_ref=(ref) + @primary_ref = ref + end + + def primary + get(primary_ref) || AMQP.settings + end + + def primary=(conf = {}) + set(conf, primary_ref) + end + + def get(ref = nil) + return self[ref] if ref.is_a?(Fixnum) + self[refs[ref]] if refs[ref] + end + + def set(conf = {}, ref = nil) + conf = Failover::Config.new(conf) if !conf.is_a?(Failover::Config) + if (index = self.index(conf)).nil? + self << conf + else + conf = self[index] + end + refs[ref] = (index || self.index(conf)) if ref + conf + end + + def find_next(conf = {}) + current = self.index(conf) + self[(current+1 == self.size) ? 0 : current+1] if current + end + + def load(conf) + if conf.is_a?(Array) + load_array(conf) + elsif conf.is_a?(Hash) + load_hash(conf) + end + end + + def load_array(confs = []) + self.clear + refs = {} + confs.each do |conf| + conf = AMQP::Client.parse_amqp_url(conf) if conf.is_a?(String) + load_hash(conf) + end + end + + def load_hash(conf = {}) + set(conf) + end + + end # Config + end # Failover +end # AMQP diff --git a/lib/amqp/failover/ext/amqp/client.rb b/lib/amqp/failover/ext/amqp/client.rb new file mode 100644 index 0000000..804fb99 --- /dev/null +++ b/lib/amqp/failover/ext/amqp/client.rb @@ -0,0 +1,67 @@ +# encoding: utf-8 + +AMQP.client = AMQP::FailoverClient + +module AMQP + module Client + + class << self + + # Connect with Failover supports specifying multiple AMQP servers and configurations. + # + # Argument Examples: + # - "amqp://guest:guest@host:5672,amqp://guest:guest@host:5673" + # - ["amqp://guest:guest@host:5672", "amqp://guest:guest@host:5673"] + # - [{:host => "host", :port => 5672}, {:host => "host", :port => 5673}] + # - {:hosts => ["amqp://user:pass@host:5672", "amqp://user:pass@host:5673"]} + # - {:hosts => [{:host => "host", :port => 5672}, {:host => "host", :port => 5673}]} + # + # The last two examples are by far the most flexible, cause they also let you specify + # failover and fallback specific options. Like so: + # - {:hosts => ["amqp://localhost:5672"], :fallback => false} + # + # Available failover options are: + # - :retry_timeout, time to wait before retrying a specific AMQP config after failure. + # - :primary_config, specify which of the supplied configurations is it the primary one. The default + # value is 0, the first item in the config array. Use 1 for the second and so on. + # - :fallback, check for the return of the primary server, and fallback to it if and when it returns. + # - :fallback_interval, seconds between each check for original server if :fallback is true. + # - :selection, not yet implimented. + # + def connect_with_failover(opts = nil) + opts = parse_amqp_url_or_opts(opts) + connect_without_failover(opts) + end + alias :connect_without_failover :connect + alias :connect :connect_with_failover + + def parse_amqp_url_or_opts(opts = nil) + if opts.is_a?(String) && opts.index(',').nil? + opts = init_failover(opts.split(',')) + elsif opts.is_a?(Array) + opts = init_failover(opts) + elsif opts.is_a?(Hash) && opts[:hosts].is_a?(Array) + confs = opts.delete(:hosts) + opts = init_failover(confs, opts) + end + opts + end + + def init_failover(confs = nil, opts = {}) + if !confs.nil? && confs.size > 0 + failover = Failover.new(confs, opts) + failover.primary.merge({ :failover => failover }) + end + end + + end # << self + + def disconnected_with_failover + return failover_switch if @failover + disconnected_without_failover + end + alias :disconnected_without_failover :disconnected + alias :disconnected :disconnected_with_failover + + end # Client +end # AMQP \ No newline at end of file diff --git a/lib/amqp/failover/logger.rb b/lib/amqp/failover/logger.rb new file mode 100644 index 0000000..dc4efe7 --- /dev/null +++ b/lib/amqp/failover/logger.rb @@ -0,0 +1,31 @@ +# encoding: utf-8 + +module AMQP + class Failover + class Logger + + attr_accessor :enabled + + def initialize(enabled = nil) + @enabled = enabled || true + end + + def error(*msg) + msg[0] = "[ERROR]: " + msg[0] if msg[0].is_a?(String) + write(*msg) + end + + def info(*msg) + write(*msg) + end + + private + + def write(*msg) + return if !@enabled + puts *msg + end + + end # Logger + end # Failover +end # AMQP diff --git a/lib/amqp/failover/server_discovery.rb b/lib/amqp/failover/server_discovery.rb new file mode 100644 index 0000000..dc46bfb --- /dev/null +++ b/lib/amqp/failover/server_discovery.rb @@ -0,0 +1,45 @@ +# encoding: utf-8 + +module AMQP + class Failover + class ServerDiscovery < EM::Connection + + class << self + attr_accessor :connection + end + + def self.monitor(conf = {}, retry_interval = nil, &block) + if EM.reactor_running? + start_monitoring(conf, retry_interval, &block) + else + EM.run { start_monitoring(conf, retry_interval, &block) } + end + end + + def initialize(args) + @done = args[:done] + @timer = args[:timer] + end + + def connection_completed + @done.call + @timer.cancel + close_connection + end + + def self.start_monitoring(conf = {}, retry_interval = nil, &block) + conf = conf.clone + retry_interval ||= 5 + conf[:done] = block + conf[:timer] = EM::PeriodicTimer.new(retry_interval) do + @connection = connect(conf) + end + end + + def self.connect(conf) + EM.connect(conf[:host], conf[:port], self, conf) + end + + end # ServerDiscovery + end # Failover +end # AMQP diff --git a/lib/amqp/failover/version.rb b/lib/amqp/failover/version.rb new file mode 100644 index 0000000..32e75c8 --- /dev/null +++ b/lib/amqp/failover/version.rb @@ -0,0 +1,7 @@ +# encoding: utf-8 + +module AMQP + class Failover + VERSION = "0.0.1" + end +end diff --git a/lib/amqp/failover_client.rb b/lib/amqp/failover_client.rb new file mode 100644 index 0000000..c08b371 --- /dev/null +++ b/lib/amqp/failover_client.rb @@ -0,0 +1,83 @@ +# encoding: utf-8 + +module AMQP + module FailoverClient + include AMQP::BasicClient + + attr_accessor :failover + attr_reader :fallback_monitor + + attr_accessor :settings + attr_accessor :on_disconnect + + def self.extended(base) + if (base.failover = base.settings.delete(:failover)) + base.on_disconnect = base.method(:disconnected) + end + end + + def failover_switch + if (new_settings = @failover.from(@settings)) + log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " + + "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" + logger.error(log_message) + logger.info(log_message) + + if @failover.options[:fallback] && @failover.primary == @settings + fallback(@failover.primary, @failover.fallback_interval) + end + @settings = new_settings + reconnect + else + raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}" + end + end + + def logger + Failover.logger + end + + def configs + @failover.configs if @failover + end + + def clean_exit(msg = nil) + msg ||= "clean exit" + logger.info(msg) + logger.error(msg) + Process.exit + end + + def fallback(conf = {}, retry_interval = nil) + @fallback_monitor = Failover::ServerDiscovery.monitor(conf, retry_interval) do + fallback_callback.call(conf, retry_interval) + end + end + + def fallback_callback + #TODO: Figure out a way to artificially trigger EM to disconnect on fallback without channels being closed. + @fallback_callback ||= proc { |conf, retry_interval| + clean_exit("Primary server (#{conf[:host]}:#{conf[:port]}) is back. " + + "Performing clean exit to be relaunched with primary config.") + } + end + attr_writer :fallback_callback + + #TODO: Figure out why I originally needed this + # def process_frame(frame) + # if mq = channels[frame.channel] + # mq.process_frame(frame) + # return + # end + # + # if frame.is_a?(AMQP::Frame::Method) && (method = frame.payload).is_a?(AMQP::Protocol::Connection::Close) + # if method.reply_text =~ /^NOT_ALLOWED/ + # raise AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}" + # end + # end + # super(frame) + # end + + end # FailoverClient +end # AMQP + diff --git a/spec/integration/basic_spec.rb b/spec/integration/basic_spec.rb new file mode 100644 index 0000000..1a79c43 --- /dev/null +++ b/spec/integration/basic_spec.rb @@ -0,0 +1,59 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' +require 'mq' +require 'amqp' +require 'amqp/server' +require 'server_helper' + +describe "Basic AMQP connection with FailoverClient loaded" do + + after(:each) do + ServerHelper.clear_logs + end + + it "should be using FailoverClient" do + AMQP.client.should == AMQP::FailoverClient + end + + it "should be able to connect" do + EM.run { + port = 15672 + timeout = 2 + serv = start_server(port) + EM.add_timer(1.5) { + conn = AMQP.connect(:host => 'localhost', :port => 15672) + EM.add_timer(0.1) { + conn.should be_connected + serv.stop + log = serv.log + log.size.should == 3 + (0..2).each { |i| log[i]['method'].should == "send" } + log[0]['class'].should == 'AMQP::Protocol::Connection::Start' + log[1]['class'].should == 'AMQP::Protocol::Connection::Tune' + log[2]['class'].should == 'AMQP::Protocol::Connection::OpenOk' + EM.stop + } + } + } + end + + it "should be able to connect and get disconnected" do + EM.run { + serv = start_server(25672) + EM.add_timer(0.1) { + conn = AMQP.connect(:host => 'localhost', :port => 25672) + EM.add_timer(0.1) { + conn.should be_connected + serv.stop + EM.add_timer(0.1) { + conn.should_not be_connected + EM.stop + } + } + } + } + end + +end diff --git a/spec/integration/failover_spec.rb b/spec/integration/failover_spec.rb new file mode 100644 index 0000000..4ccb8dd --- /dev/null +++ b/spec/integration/failover_spec.rb @@ -0,0 +1,141 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' +require 'amqp/server' +require 'server_helper' +require 'logger_helper' + +describe "Failover support loaded into AMQP gem" do + + before(:each) do + @flog = LoggerHelper.new + AMQP::Failover.logger = @flog + end + + after(:each) do + ServerHelper.clear_logs + AMQP::Failover.logger = nil + end + + it "should be able to connect" do + port1 = 15672 + EM.run { + serv = start_server(port1) + EM.add_timer(0.1) { + conn = AMQP.connect(:host => 'localhost', :port => port1) + conn.failover.should be_nil + EM.add_timer(0.1) { + conn.should be_connected + EM.stop + } + } + } + end + + it "should be able to connect and failover" do + port1 = 25672 + port2 = 35672 + EM.run { + # start mock amqp servers + serv1 = start_server(port1) + serv2 = start_server(port2) + EM.add_timer(0.1) { + # start amqp client connection and make sure it's picked the right config + conn = AMQP.connect({:hosts => [{:port => port1}, {:port => port2}]}) + conn.failover.primary[:port].should == port1 + conn.settings[:port].should == port1 + conn.settings.should == conn.failover.primary + EM.add_timer(0.1) { + # make sure client connected to the correct server, then kill server + conn.should be_connected + serv1.log.should have(3).items + serv2.log.should have(0).items + serv1.stop + EM.add_timer(0.1) { + # make sure client performed a failover when primary server died + conn.should be_connected + [:error, :info].each do |i| + @flog.send("#{i}_log").should have(1).item + @flog.send("#{i}_log")[0][0].should match(/connect to or lost connection.+#{port1}.+attempting connection.+#{port2}/i) + end + conn.settings[:port].should == port2 + serv1.log.should have(3).items + serv2.log.should have(3).items + conn.close + EM.add_timer(0.1) { + serv2.stop + EM.stop + } + } + } + } + } + end + + it "should be able to fallback when primary server returns" do + port1 = 45672 + port2 = 55672 + lambda { + EM.run { + # start mock amqp servers + serv1 = start_server(port1) + serv2 = start_server(port2) + EM.add_timer(0.1) { + # start amqp client connection and make sure it's picked the right config + conn = AMQP.connect({:hosts => [{:port => port1}, {:port => port2}], :fallback => true, :fallback_interval => 0.1}) + conn.failover.primary[:port].should == port1 + conn.settings[:port].should == port1 + conn.settings.should == conn.failover.primary + EM.add_timer(0.1) { + # make sure client connected to the correct server, then kill server + conn.should be_connected + serv1.log.should have(3).items + serv2.log.should have(0).items + serv1.stop + EM.add_timer(0.1) { + # make sure client performed a failover when primary server died + conn.should be_connected + [:error, :info].each do |i| + @flog.send("#{i}_log").should have(1).item + @flog.send("#{i}_log")[0][0].should match(/connect to or lost connection.+#{port1}.+attempting connection.+#{port2}/i) + end + conn.settings[:port].should == port2 + serv1.log.should have(3).items + serv2.log.should have(3).items + serv3 = start_server(port1) + EM.add_timer(0.2) { + # by this point client should have raised a SystemExit exception + serv2.stop + EM.stop + } + } + } + } + } + }.should raise_error(SystemExit, "exit") + [:error, :info].each do |i| + @flog.send("#{i}_log").should have(2).item + @flog.send("#{i}_log")[1][0].should match(/primary server.+45672.+performing clean exit/i) + end + end + + it "should abide to :primary_config option" do + port1 = 75672 + port2 = 65672 + EM.run { + serv = start_server(port1) + EM.add_timer(0.1) { + conn = AMQP.connect({:hosts => [{:port => port1}, {:port => port2}], :primary_config => 1}) + conn.failover.primary[:port].should == port2 + conn.settings[:port].should == port2 + conn.settings.should == conn.failover.primary + EM.add_timer(0.1) { + conn.should be_connected + EM.stop + } + } + } + end + +end diff --git a/spec/logger_helper.rb b/spec/logger_helper.rb new file mode 100644 index 0000000..48022b9 --- /dev/null +++ b/spec/logger_helper.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 + +class LoggerHelper + + attr_accessor :error_log + attr_accessor :info_log + + def info(*args) + @info_log ||= [] + @info_log << args + end + + def error(*args) + @error_log ||= [] + @error_log << args + end + +end diff --git a/spec/server_helper.rb b/spec/server_helper.rb new file mode 100644 index 0000000..153f059 --- /dev/null +++ b/spec/server_helper.rb @@ -0,0 +1,77 @@ +# encoding: utf-8 + +require 'rubygems' +require 'mq' +require 'amqp' +require 'amqp/server' +require 'json' + +class ServerHelper + + attr_accessor :stdin + attr_accessor :stdout + attr_accessor :stderr + attr_accessor :pid + + + def initialize(port = nil, timeout = nil) + @port = port + @timout = timeout + File.open(log_file, 'w') {} + @pid = start(port, timeout) + end + + def self.clear_logs + Dir.glob(File.expand_path('server_helper*.log', File.dirname(__FILE__))).each do |file| + File.delete(file) + end + end + + def start(port = nil, timeout = nil) + port ||= 15672 + timeout ||= 2 + EM.fork_reactor { + $PORT = port + EM.start_server('localhost', port, AmqpServer) + EM.add_timer(timeout) { EM.stop } + } + end + + def stop + Process.kill('TERM', @pid) + end + + def kill + Process.kill('KILL', @pid) + end + + def log + File.open(log_file).to_a.map{ |l| JSON.parse(l) } + end + + def log_file + File.expand_path("server_helper-port#{@port}.log", File.dirname(__FILE__)) + end + +end + +module AmqpServer + include AMQP::Server + + # customize log output + def log(*args) + args = {:method => args[0], :class => args[1].payload.class, :pid => Process.pid} + filename = File.expand_path("server_helper-port#{$PORT}.log", File.dirname(__FILE__)) + File.open(filename, 'a') do |f| + f.write("#{args.to_json}\n") + end + end +end + +# +# Helper methods +# + +def start_server(port = nil, timeout = nil) + ServerHelper.new(port, timeout) +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..e1cc05d --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,49 @@ +# encoding: utf-8 + +# add project-relative load paths +$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib')) +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +# require stuff +require 'rubygems' + +begin + require 'mq' +rescue LoadError => e + require 'amqp' +end +require 'amqp/failover' + +require 'rspec' +require 'rspec/autorun' + + +# +# Helper methods +# + +def wait_while(timeout = 10, retry_interval = 0.1, &block) + start = Time.now + while block.call + break if (Time.now - start).to_i >= timeout + sleep(retry_interval) + end +end + +# stolen from Pid::running? from daemons gem +def pid_running?(pid) + return false unless pid + + # Check if process is in existence + # The simplest way to do this is to send signal '0' + # (which is a single system call) that doesn't actually + # send a signal + begin + Process.kill(0, pid) + return true + rescue Errno::ESRCH + return false + rescue ::Exception # for example on EPERM (process exists but does not belong to us) + return true + end +end diff --git a/spec/unit/amqp/failover/config_spec.rb b/spec/unit/amqp/failover/config_spec.rb new file mode 100644 index 0000000..179a1e9 --- /dev/null +++ b/spec/unit/amqp/failover/config_spec.rb @@ -0,0 +1,67 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' + +describe 'AMQP::Failover::Config' do + + before(:each) do + configs = [ + {:host => 'rabbit0.local'}, + {:host => 'rabbit1.local'}, + {:host => 'rabbit2.local', :port => 5673} + ] + @configs = configs.map { |conf| AMQP.settings.merge(conf) } + @fail = AMQP::Failover.new(@configs) + end + + it "should initialize" do + fail = AMQP::Failover::Config.new(@configs[0]) + fail.should == @configs[0] + fail.last_fail.should be_nil + + now = Time.now + fail = AMQP::Failover::Config.new(@configs[1], now) + fail.should == @configs[1] + fail.last_fail.should == now + end + + it "should order properly with #<=>" do + one_hour_ago = (Time.now - 3600) + two_hours_ago = (Time.now - 7200) + + fail = [ AMQP::Failover::Config.new(@configs[0]), + AMQP::Failover::Config.new(@configs[1], one_hour_ago), + AMQP::Failover::Config.new(@configs[2], two_hours_ago) ] + + (fail[1] <=> fail[0]).should == -1 + (fail[0] <=> fail[0]).should == 0 + (fail[0] <=> fail[1]).should == 1 + + (fail[1] <=> fail[2]).should == -1 + (fail[1] <=> fail[1]).should == 0 + (fail[2] <=> fail[1]).should == 1 + + fail.sort[0].last_fail.should == one_hour_ago + fail.sort[1].last_fail.should == two_hours_ago + fail.sort[2].last_fail.should == nil + end + + it "should be ordered by last_fail" do + result = [ AMQP::Failover::Config.new(@configs[1], (Time.now - 60)), + AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))), + AMQP::Failover::Config.new(@configs[0], (Time.now - 3600)) ] + + origin = [ AMQP::Failover::Config.new(@configs[0], (Time.now - 3600)), + AMQP::Failover::Config.new(@configs[1], (Time.now - 60)), + AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))) ] + origin.sort.should == result + + origin = [ AMQP::Failover::Config.new(@configs[0]), + AMQP::Failover::Config.new(@configs[1], (Time.now - 60)), + AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))) ] + origin.sort.should == result + end + +end + diff --git a/spec/unit/amqp/failover/configurations_spec.rb b/spec/unit/amqp/failover/configurations_spec.rb new file mode 100644 index 0000000..2808815 --- /dev/null +++ b/spec/unit/amqp/failover/configurations_spec.rb @@ -0,0 +1,79 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' + +describe 'AMQP::Failover::Configurations' do + + before(:each) do + @conf = AMQP::Failover::Configurations.new + @raw_configs = [ + {:host => 'rabbit0.local'}, + {:host => 'rabbit1.local'}, + {:host => 'rabbit2.local', :port => 5673} + ] + @configs = @raw_configs.map { |conf| AMQP.settings.merge(conf) } + end + + it "should initialize" do + confs = AMQP::Failover::Configurations.new(@raw_configs) + confs.each_with_index do |conf, i| + conf.should be_a(AMQP::Failover::Config) + conf.should == @configs[i] + end + end + + it "should set and get configs" do + @conf.primary_ref.should == 0 + @conf.should have(0).items + + @conf.set(@raw_configs[0]) + @conf.should have(1).items + @conf.get(0).should == @configs[0] + @conf[0].should == @configs[0] + + @conf.set(@raw_configs[1]) + @conf.should have(2).items + @conf.get(1).should == @configs[1] + @conf[1].should == @configs[1] + + # should just create a ref, as config exists + @conf.set(@raw_configs[1], :the_one) + @conf.should have(2).items + @conf.get(1).should == @configs[1] + @conf[:the_one].should == @configs[1] + + @conf.load_array(@raw_configs) + @conf.should have(3).items + @conf.primary.should == @configs[0] + @conf.primary_ref = 1 + @conf.primary.should == @configs[1] + @conf[:primary].should == @configs[1] + end + + it "should #find_next" do + @conf.load(@raw_configs) + @conf.should have(3).items + @conf.find_next(@configs[0]).should == @configs[1] + @conf.find_next(@configs[1]).should == @configs[2] + @conf.find_next(@configs[2]).should == @configs[0] + end + + it "should #load_hash" do + @conf.should have(0).items + @conf.load_hash(@raw_configs[0]) + @conf.should have(1).items + @conf.primary.should == @configs[0] + end + + it "should #load_array" do + @conf.load_hash(:host => 'rabbid-rabbit') + @conf.should have(1).items + @conf.load_array(@raw_configs) + @conf.should have(3).items + @conf.should == @configs + @conf.primary.should == @configs[0] + end + +end + diff --git a/spec/unit/amqp/failover/server_discovery_helper.rb b/spec/unit/amqp/failover/server_discovery_helper.rb new file mode 100644 index 0000000..05735c3 --- /dev/null +++ b/spec/unit/amqp/failover/server_discovery_helper.rb @@ -0,0 +1,31 @@ +class ServerDiscoveryHelper < AMQP::Failover::ServerDiscovery + + class << self + alias :real_start_monitoring :start_monitoring + def start_monitoring(*args, &block) + $called << :start_monitoring + real_start_monitoring(*args, &block) + end + end + + alias :real_initialize :initialize + def initialize(*args) + $called << :initialize + EM.start_server('127.0.0.1', 9999) if $start_count == 2 + $start_count += 1 + real_initialize(*args) + end + + alias :real_connection_completed :connection_completed + def connection_completed + $called << :connection_completed + real_connection_completed + end + + alias :real_close_connection :close_connection + def close_connection + $called << :close_connection + real_close_connection + end + +end \ No newline at end of file diff --git a/spec/unit/amqp/failover/server_discovery_spec.rb b/spec/unit/amqp/failover/server_discovery_spec.rb new file mode 100644 index 0000000..68afdd8 --- /dev/null +++ b/spec/unit/amqp/failover/server_discovery_spec.rb @@ -0,0 +1,56 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' +require 'server_discovery_helper' + +describe 'AMQP::Failover::ServerDiscovery' do + + before(:each) do + $called = [] + $start_count = 0 + @args = { :host => 'localhost', :port => 9999 } + @retry_interval = 0.01 + end + + after(:all) do + $called = nil + $start_count = nil + end + + it "should initialize" do + EM.run { + EM.start_server('127.0.0.1', 9999) + @mon = ServerDiscoveryHelper.monitor(@args, @retry_interval) do + $called << :done_block + EM.stop_event_loop + end + } + $start_count.should == 1 + $called.should have(5).items + $called.uniq.should have(5).items + $called.should include(:start_monitoring) + $called.should include(:initialize) + $called.should include(:connection_completed) + $called.should include(:close_connection) + $called.should include(:done_block) + end + + it "should retry on error" do + EM.run { + @mon = ServerDiscoveryHelper.monitor(@args, @retry_interval) do + $called << :done_block + EM.stop_event_loop + end + } + $start_count.should >= 3 + $called.should have($start_count + 4).items + $called.uniq.should have(5).items + $called.should include(:start_monitoring) + $called.should include(:initialize) + $called.should include(:connection_completed) + $called.should include(:close_connection) + $called.should include(:done_block) + end + +end diff --git a/spec/unit/amqp/failover_spec.rb b/spec/unit/amqp/failover_spec.rb new file mode 100644 index 0000000..b088b72 --- /dev/null +++ b/spec/unit/amqp/failover_spec.rb @@ -0,0 +1,69 @@ +# encoding: utf-8 +$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__)) + +require 'spec_helper' + +describe 'AMQP::Failover' do + + before(:each) do + configs = [ + {:host => 'rabbit0.local'}, + {:host => 'rabbit1.local'}, + {:host => 'rabbit2.local', :port => 5673} + ] + @configs = configs.map { |conf| AMQP.settings.merge(conf) } + @fail = AMQP::Failover.new(@configs) + end + + it "should initialize" do + @fail.configs.should == @configs + end + + it "should #add_config" do + @fail.instance_variable_set("@configs", nil) + @fail.configs.should == [] + @fail.add_config(@configs[0]) + @fail.configs.should have(1).item + @fail.configs.should == [@configs[0]] + @fail.refs.should == {} + @fail.add_config(@configs[1], :hello) + @fail.configs.should have(2).items + @fail.configs.should include(@configs[1]) + @fail.get_by_ref(:hello).should == @configs[1] + end + + it "should #get_by_conf" do + fetched = @fail.get_by_conf(@configs[1]) + fetched.should == @configs[1] + fetched.class.should == AMQP::Failover::Config + fetched.last_fail.should be_nil + end + + it "should #fail_with" do + fail = AMQP::Failover.new + now = Time.now + fail.failed_with(@configs[0], 0, now) + fail.latest_failed.should == @configs[0] + fail.last_fail_of(@configs[0]).should == now + fail.last_fail_of(0).should == now + end + + it "should find #next_config" do + @fail.failed_with(@configs[1]) + @fail.next_config.should == @configs[2] + @fail.next_config.should == @configs[2] + @fail.failed_with(@configs[2]) + @fail.next_config.should == @configs[0] + @fail.failed_with(@configs[0]) + @fail.next_config.should be_nil + end + + it "should #failover_from" do + now = Time.now + @fail.failover_from(@configs[0], now).should == @configs[1] + @fail.latest_failed.should == @configs[0] + @fail.latest_failed.last_fail.should == now + end + +end +