From b3fc9826b0e9d15f7eab6a80fb16c5619547597e Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Wed, 26 Jan 2011 15:29:46 +0000 Subject: [PATCH] initial import of failover code --- .gitignore | 24 +++++ .rspec | 2 + .rvmrc | 1 + Rakefile | 49 ++++++++++ amqp-failover.gemspec | 22 +++-- lib/amqp-failover.rb | 5 - lib/amqp/failover.rb | 18 ++++ lib/amqp/failover/basic_client.rb | 55 +++++++++++ .../failover/basic_client/on_disconnect.rb | 46 +++++++++ lib/amqp/failover/config.rb | 94 +++++++++++++++++++ lib/amqp/failover/fallback.rb | 44 +++++++++ lib/amqp/failover/logger.rb | 31 ++++++ lib/amqp/failover/logic.rb | 86 +++++++++++++++++ lib/amqp/failover/logic/failed_config.rb | 33 +++++++ .../failover}/version.rb | 4 +- spec/spec_helper.rb | 9 ++ spec/unit/amqp/failover/config_spec.rb | 59 ++++++++++++ 17 files changed, 569 insertions(+), 13 deletions(-) create mode 100644 .rspec create mode 100644 .rvmrc delete mode 100644 lib/amqp-failover.rb create mode 100644 lib/amqp/failover.rb create mode 100644 lib/amqp/failover/basic_client.rb create mode 100644 lib/amqp/failover/basic_client/on_disconnect.rb create mode 100644 lib/amqp/failover/config.rb create mode 100644 lib/amqp/failover/fallback.rb create mode 100644 lib/amqp/failover/logger.rb create mode 100644 lib/amqp/failover/logic.rb create mode 100644 lib/amqp/failover/logic/failed_config.rb rename lib/{amqp-failover => amqp/failover}/version.rb (61%) create mode 100644 spec/spec_helper.rb create mode 100644 spec/unit/amqp/failover/config_spec.rb diff --git a/.gitignore b/.gitignore index 4040c6c..1724f70 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,28 @@ +## MAC OS +.DS_Store + +## TEXTMATE +*.tmproj +tmtags + +## EMACS +*~ +\#* +.\#* + +## VIM +*.swp + +## PROJECT::GENERAL +coverage +rdoc *.gem .bundle Gemfile.lock pkg/* + +## PROJECT::SPECIFIC +.yardoc/* +spec/db/* +doc/* + diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..8c18f1a --- /dev/null +++ b/.rspec @@ -0,0 +1,2 @@ +--format documentation +--color diff --git a/.rvmrc b/.rvmrc new file mode 100644 index 0000000..f1c0b0c --- /dev/null +++ b/.rvmrc @@ -0,0 +1 @@ +rvm gemset use amqp-failover diff --git a/Rakefile b/Rakefile index 14cfe0b..9eaaf97 100644 --- a/Rakefile +++ b/Rakefile @@ -1,2 +1,51 @@ +$LOAD_PATH.unshift File.expand_path("lib", File.dirname(__FILE__)) + require 'bundler' Bundler::GemHelper.install_tasks + + +# +# Rspec +# + +require 'rspec/core/rake_task' +desc "Run all specs" +task :spec => ["spec:unit", "spec:integration"] + +RSpec::Core::RakeTask.new('spec:unit') do |spec| + spec.pattern = 'spec/unit/**/*_spec.rb' +end + +RSpec::Core::RakeTask.new('spec:integration') do |spec| + spec.pattern = 'spec/integration/**/*_spec.rb' +end + +RSpec::Core::RakeTask.new(:rcov) do |spec| + spec.pattern = 'spec/**/*_spec.rb' + spec.rcov = true +end + + +# +# Yard +# + +begin + require 'yard' + YARD::Rake::YardocTask.new +rescue LoadError + task :yardoc do + abort "YARD is not available. In order to run yardoc, you must: sudo gem install yard" + end +end + + +# +# Misc. +# + +desc "Start irb with amqp-failover pre-loaded" +task :console do + exec "irb -r spec/spec_helper" +end +task :c => :console diff --git a/amqp-failover.gemspec b/amqp-failover.gemspec index 5709d93..8814860 100644 --- a/amqp-failover.gemspec +++ b/amqp-failover.gemspec @@ -1,16 +1,16 @@ # -*- encoding: utf-8 -*- $:.push File.expand_path("../lib", __FILE__) -require "amqp-failover/version" +require "amqp/failover/version" Gem::Specification.new do |s| s.name = "amqp-failover" - s.version = Amqp::Failover::VERSION + s.version = AMQP::Failover::VERSION s.platform = Gem::Platform::RUBY - s.authors = ["TODO: Write your name"] - s.email = ["TODO: Write your email address"] - s.homepage = "" - s.summary = %q{TODO: Write a gem summary} - s.description = %q{TODO: Write a gem description} + s.authors = ["Jim Myhrberg"] + s.email = ["contact@jimeh.me"] + s.homepage = 'http://github.com/jimeh/amqp-failover' + s.summary = 'Add multi-server failover and fallback to amqp gem.' + s.description = 'Add multi-server failover and fallback to amqp gem.' s.rubyforge_project = "amqp-failover" @@ -18,4 +18,12 @@ Gem::Specification.new do |s| s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n") s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) } s.require_paths = ["lib"] + + s.add_runtime_dependency 'amqp', '>= 0.7.0' + + s.add_development_dependency 'rake', '>= 0.8.7' + s.add_development_dependency 'rack-test', '>= 0.5.6' + s.add_development_dependency 'rspec', '>= 2.1.0' + s.add_development_dependency 'yard', '>= 0.6.3' + s.add_development_dependency 'ruby-debug' end diff --git a/lib/amqp-failover.rb b/lib/amqp-failover.rb deleted file mode 100644 index 9f348e0..0000000 --- a/lib/amqp-failover.rb +++ /dev/null @@ -1,5 +0,0 @@ -module Amqp - module Failover - # Your code goes here... - end -end diff --git a/lib/amqp/failover.rb b/lib/amqp/failover.rb new file mode 100644 index 0000000..3457ad4 --- /dev/null +++ b/lib/amqp/failover.rb @@ -0,0 +1,18 @@ +# encoding: utf-8 + +require 'amqp' +require 'yaml' + +require 'amqp/failover/basic_client' +require 'amqp/failover/config' +require 'amqp/failover/fallback' +require 'amqp/failover/logger' +require 'amqp/failover/logic' +require 'amqp/failover/logic/failed_config' +require 'amqp/failover/version' + +module AMQP + module Failover + + end +end diff --git a/lib/amqp/failover/basic_client.rb b/lib/amqp/failover/basic_client.rb new file mode 100644 index 0000000..c558829 --- /dev/null +++ b/lib/amqp/failover/basic_client.rb @@ -0,0 +1,55 @@ +# encoding: utf-8 + +module AMQP + module Failover + module BasicClient + include AMQP::BasicClient + + class Error < Exception; end + + attr_accessor :on_disconnect + attr_accessor :settings + + def self.extended(base) + base.on_disconnect = proc { + OnDisconnect.new(base).call + } + end + + def logger + @logger ||= Logger.new + end + + def failover_conf + @failover_conf ||= Config.new + end + + def configs + failover_conf.configs + end + + def clean_exit(msg = nil) + msg ||= "clean exit" + logger.info(msg) + logger.error(msg) + Process.exit + end + + def process_frame(frame) + if mq = channels[frame.channel] + mq.process_frame(frame) + return + end + + if frame.is_a?(::AMQP::Frame::Method) && (method = frame.payload).is_a?(::AMQP::Protocol::Connection::Close) + if method.reply_text =~ /^NOT_ALLOWED/ + raise ::AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}" + end + end + super(frame) + end + + end # BasicClient + end # Failover +end # AMQP + diff --git a/lib/amqp/failover/basic_client/on_disconnect.rb b/lib/amqp/failover/basic_client/on_disconnect.rb new file mode 100644 index 0000000..ebf3927 --- /dev/null +++ b/lib/amqp/failover/basic_client/on_disconnect.rb @@ -0,0 +1,46 @@ +# encoding: utf-8 + +module AMQP + module Failover + module BasicClient + class OnDisconnect + + attr_accessor :base + attr_accessor :failover + attr_accessor :configs + attr_accessor :logger + + def initialize(base) + @base = base + @configs = @base.configs + @failover_conf = @base.failover_conf + @logger = @base.logger + end + + def call + @logic ||= Logic.new(@configs.configs, @failover_conf.get_primary, @failover_conf.failover_config) + if (new_settings = @logic.failover_from(@base.settings)) + log_message = "Could not connect to or lost connection to server #{@base.settings[:host]}:#{@base.settings[:port]}. " + + "Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}" + @logger.error(log_message) + @logger.info(log_message) + + if @failover_conf.get_primary == @base.settings + FallbackMonitor.monitor(@failover_conf.get_primary) do + @base.clean_exit("Primary server (#{@failover_conf.get_primary[:host]}:#{@failover_conf.get_primary[:port]}) is back. " + + "Performing clean exit to be relaunched with primary config.") + end + end + + @base.settings = new_settings + @base.reconnect + else + raise Error, "Could not connect to server #{@base.settings[:host]}:#{@base.settings[:port]}" + end + end + + end # OnDisconnect + end # BasicClient + end # Failover +end # AMQP + diff --git a/lib/amqp/failover/config.rb b/lib/amqp/failover/config.rb new file mode 100644 index 0000000..6e218d6 --- /dev/null +++ b/lib/amqp/failover/config.rb @@ -0,0 +1,94 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Config + + attr_accessor :configs + attr_accessor :failover_config + + def failover_config + @failover_config ||= { :retry_timeout => 30 } + end + + def refs + @refs ||= {} + end + + def configs + @configs ||= [] + end + + def primary + @primary ||= 0 + end + + def primary=(ref) + @primary = ref + end + + def get_primary + get(primary) || default_config + end + + def set_primary(conf = {}) + set(conf, primary) + end + + def get(ref = nil) + return configs[ref] if ref.is_a?(Fixnum) + configs[refs[ref]] if refs[ref] + end + + def set(conf = {}, ref = nil) + conf = default_config.merge(conf) + configs << conf if (index = configs.index(conf)).nil? + if ref + refs[ref] = (index || configs.index(conf)) + end + end + + def find_next(conf = {}) + current = configs.index(conf) + configs[(current+1 == configs.size) ? 0 : current+1] if current + end + + def load_file(file, env = nil) + raise ArgumentError, "Can't find #{file}" unless File.exists?(file) + load(YAML.load_file(file)[env || "development"]) + end + + def load_yaml(data, env = nil) + load(YAML.load(data)[env || "development"]) + end + + def load(conf) + if conf.is_a?(::Array) + load_array(conf) + elsif conf.is_a?(::Hash) + load_hash(conf) + end + end + + def load_array(confs = []) + @configs = nil + confs.each do |conf| + load_hash(conf) + end + end + + def load_hash(conf = {}) + conf = conf.inject({}) do |result, (key, value)| + result[key.is_a?(String) ? key.to_sym : key] = value + result + end + self.set(conf) + end + + def default_config + AMQP.settings + end + + end # Config + end # Failover +end # AMQP diff --git a/lib/amqp/failover/fallback.rb b/lib/amqp/failover/fallback.rb new file mode 100644 index 0000000..4bf58a9 --- /dev/null +++ b/lib/amqp/failover/fallback.rb @@ -0,0 +1,44 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Fallback < EM::Connection + + class << self + attr_accessor :connection + end + + def initialize(args) + @done = args[:done] + @timer = args[:timer] + end + + def connection_completed + @done.call + @timer.cancel + close_connection + end + + def self.monitor(conf = {}, &block) + if EM.reactor_running? + start_monitoring(conf, &block) + else + EM.run { start_monitoring(conf, &block) } + end + end + + def self.start_monitoring(conf = {}, &block) + conf = conf.clone + conf[:done] = block + conf[:timer] = EM::PeriodicTimer.new(conf[:retry_interval] || 5) do + @connection = connect(conf) + end + end + + def self.connect(conf) + EM.connect(conf[:host], conf[:port], self, conf) + end + + end # Fallback + end # Failover +end # AMQP diff --git a/lib/amqp/failover/logger.rb b/lib/amqp/failover/logger.rb new file mode 100644 index 0000000..4f8d4b0 --- /dev/null +++ b/lib/amqp/failover/logger.rb @@ -0,0 +1,31 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Logger + + attr_accessor :enabled + + def initialize(enabled = nil) + @enabled = enabled || true + end + + def error(*msg) + msg[0] = "[ERROR]: " + msg[0] if msg[0].is_a?(String) + write(*msg) + end + + def info(*msg) + write(*msg) + end + + private + + def write(*msg) + return if !@enabled + puts *msg + end + + end # Logger + end # Failover +end # AMQP diff --git a/lib/amqp/failover/logic.rb b/lib/amqp/failover/logic.rb new file mode 100644 index 0000000..c2092ab --- /dev/null +++ b/lib/amqp/failover/logic.rb @@ -0,0 +1,86 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Logic + + attr_reader :latest_failed + attr_accessor :primary + attr_accessor :retry_timeout + attr_accessor :fallback + + def initialize(confs = nil, primary = nil, options = {}) + @primary = primary + @retry_timeout = (options.delete(:retry_timeout) || 30) + self.configs = confs if !confs.nil? + end + + def refs + @refs ||= {} + end + + def configs + @configs ||= [] + end + + def configs=(confs = []) + @configs = [] + confs.each do |conf| + if conf.is_a?(Array) + add_config(conf[1], conf[0]) + else + add_config(conf) + end + end + end + + def add_config(conf = {}, ref = nil) + index = configs.index(conf) + configs << FailedConfig.new(conf) if index.nil? + refs[ref] = (index || configs.index(conf)) if !ref.nil? + end + + def failover_from(conf = {}, time = nil) + failed_with(conf, nil, time) + next_config + end + + def failed_with(conf = {}, ref = nil, time = nil) + time ||= Time.now + if index = configs.index(conf) + configs[index].last_fail = time + @latest_failed = configs[index] + else + configs << FailedConfig.new(conf, time) + @latest_failed = configs.last + end + refs[ref] = (index || configs.index(conf)) if !ref.nil? + end + + def next_config(retry_timeout = nil, after = nil) + return nil if configs.size <= 1 + retry_timeout ||= @retry_timeout + after ||= @latest_failed + index = configs.index(after) + available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1] + available.each do |conf| + return conf if conf.last_fail.nil? || (conf.last_fail + retry_timeout.seconds) < Time.now + end + return nil + end + + def last_fail_of(match) + ((match.is_a?(Hash) ? get_by_conf(match) : get_by_ref(match)) || FailedConfig.new).last_fail + end + + def get_by_conf(conf = {}) + configs[configs.index(conf)] + end + + def get_by_ref(ref = nil) + configs[refs[ref]] if refs[ref] + end + + end # Logic + end # Failover +end # AMQP diff --git a/lib/amqp/failover/logic/failed_config.rb b/lib/amqp/failover/logic/failed_config.rb new file mode 100644 index 0000000..7c488ea --- /dev/null +++ b/lib/amqp/failover/logic/failed_config.rb @@ -0,0 +1,33 @@ +# encoding: utf-8 + +module AMQP + module Failover + class Logic + class FailedConfig < ::Hash + + attr_accessor :last_fail + + def initialize(hash = {}, last_fail_date = nil) + self.replace(hash) + self.last_fail = last_fail_date if last_fail_date + end + + # order by latest fail, potentially useful if random config selection is used + def <=>(other) + if self.respond_to?(:last_fail) && other.respond_to?(:last_fail) + if self.last_fail.nil? && other.last_fail.nil? + return 0 + elsif self.last_fail.nil? && !other.last_fail.nil? + return 1 + elsif !self.last_fail.nil? && other.last_fail.nil? + return -1 + end + return other.last_fail <=> self.last_fail + end + return 0 + end + + end # FailedConfig + end # Logic + end # Failover +end # AMQP diff --git a/lib/amqp-failover/version.rb b/lib/amqp/failover/version.rb similarity index 61% rename from lib/amqp-failover/version.rb rename to lib/amqp/failover/version.rb index 23ec700..7843f36 100644 --- a/lib/amqp-failover/version.rb +++ b/lib/amqp/failover/version.rb @@ -1,4 +1,6 @@ -module Amqp +# encoding: utf-8 + +module AMQP module Failover VERSION = "0.0.1" end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb new file mode 100644 index 0000000..8d3dae3 --- /dev/null +++ b/spec/spec_helper.rb @@ -0,0 +1,9 @@ +# add project-relative load paths +$LOAD_PATH.unshift File.dirname(__FILE__) +$LOAD_PATH.unshift File.join(File.dirname(__FILE__), '..', 'lib') + +# require stuff +require 'rubygems' +require 'amqp/failover' +require 'rspec' +require 'rspec/autorun' \ No newline at end of file diff --git a/spec/unit/amqp/failover/config_spec.rb b/spec/unit/amqp/failover/config_spec.rb new file mode 100644 index 0000000..0372306 --- /dev/null +++ b/spec/unit/amqp/failover/config_spec.rb @@ -0,0 +1,59 @@ +# encoding: utf-8 + +require 'spec_helper' + +describe AMQP::Failover::Config do + + before(:each) do + @conf = AMQP::Failover::Config.new + [:primary, :configs, :refs].each do |var| + @conf.instance_variable_set("@#{var}", nil) + end + @raw_configs = [ + {:host => 'rabbit3.local'}, + {:host => 'rabbit2.local'}, + {:host => 'rabbit2.local', :port => 5673} + ] + @configs = @raw_configs.map { |conf| @conf.default_config.merge(conf) } + end + + after(:each) do + [:primary, :configs, :refs].each do |var| + @conf.instance_variable_set("@#{var}", nil) + end + end + + it "should set and get configs" do + @conf.primary.should == 0 + @conf.configs.should have(0).items + + @conf.set(@raw_configs[0]) + @conf.configs.should have(1).items + @conf.get(0).should == @configs[0] + + @conf.set(@raw_configs[1]) + @conf.configs.should have(2).items + @conf.get(1).should == @configs[1] + + @conf.set(@raw_configs[1], :the_one) + @conf.configs.should have(2).items + @conf.get(1).should == @configs[1] + @conf.get(:the_one).should == @configs[1] + + @conf.load_array(@raw_configs) + @conf.configs.should have(3).items + @conf.get_primary.should == @configs[0] + @conf.primary = 1 + @conf.get_primary.should == @configs[1] + end + + it "should #find_next" do + @conf.load(@raw_configs) + @conf.configs.should have(3).items + @conf.find_next(@configs[0]).should == @configs[1] + @conf.find_next(@configs[1]).should == @configs[2] + @conf.find_next(@configs[2]).should == @configs[0] + end + +end +