Merge branch 'release/v0.0.1'

This commit is contained in:
2011-02-01 14:20:12 +00:00
28 changed files with 1351 additions and 17 deletions

6
.document Normal file
View File

@@ -0,0 +1,6 @@
README.md
lib/**/*.rb
bin/*
features/**/*.feature
LICENSE

24
.gitignore vendored
View File

@@ -1,4 +1,28 @@
## MAC OS
.DS_Store
## TEXTMATE
*.tmproj
tmtags
## EMACS
*~
\#*
.\#*
## VIM
*.swp
## PROJECT::GENERAL
coverage
rdoc
*.gem
.bundle
Gemfile.lock
pkg/*
## PROJECT::SPECIFIC
.yardoc/*
spec/db/*
doc/*

2
.rspec Normal file
View File

@@ -0,0 +1,2 @@
--format documentation
--color

1
.rvmrc Normal file
View File

@@ -0,0 +1 @@
rvm gemset use amqp-failover

20
LICENSE Normal file
View File

@@ -0,0 +1,20 @@
Copyright (c) 2011 Jim Myhrberg & Global Personals, Ltd.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

104
README.md Normal file
View File

@@ -0,0 +1,104 @@
# amqp-failover #
Add multi-server support with failover and fallback to the [amqp](https://github.com/ruby-amqp/amqp) gem. Failover is configured by providing multiple servers/configurations to `AMQP.start` or `AMQP.connect`. Both methods will still accept the same options input as they always have, they simply now support additional forms of options which when used, enables the failover features.
## Basic Usage ##
require 'mq'
require 'amqp/failover'
opts = [{:port => 5672}, {:port => 5673}]
AMQP.start(opts) do
# code...
end
By default the client will connect to `localhost:5672`, but if for any reason it can't connect, or looses connection to that server, it'll attempt to connect to `localhost:5673` instead.
## Options Formats ##
### Standard Non-Failover ###
Hash:
opts = {:host => "hostname", :port => 5673}
URL:
opts = "amqp://user:pass@hostname:5673/"
### With Failover ###
URLs
opts = "amqp://localhost:5672/,amqp://localhost:5673/"
Array of Hashes:
opts = [{:port => 5672}, {:port => 5673}]
Array of URLs:
opts = ["amqp://localhost:5672/", "amqp://localhost:5673/"]
Specify AMQP servers and Failover options by passing a Hash containing a `:hosts` key with a value of either of the above three examples:
opts = {:hosts => "amqp://localhost:5672/,amqp://localhost:5673/", :fallback => true}
opts = {:hosts => [{:port => 5672}, {:port => 5673}], :fallback => true}
opts = {:hosts => ["amqp://localhost:5672/", "amqp://localhost:5673/"], :fallback => true}
## Failover Options ##
* `:retry_timeout`, time to wait before retrying a specific AMQP config after failure.
* `:primary_config`, specify which of the supplied configurations is it the primary one. The default value is 0, the first item in the config array. Use 1 for the second and so on.
* `:fallback`, check for the return of the primary server, and fallback to it if and when it returns. WARNING: This currently calls `Process.exit` cause I haven't figured out a way to artificially kill the EM connection without the AMQP channels also being closed, which causes nothing to work even after EM connects to the primary server. It works for me cause dead workers are automatically relaunched with their default config.
* `:fallback_interval`, seconds between each check for original server if :fallback is true.
* `:selection`, not yet implemented.
## Notes ##
I would recommend you test the failover functionality in your own infrastructure before deploy to production, as this gem is still very much alpha/beta quality, and it does do a little bit of monkey patching to the amqp gem. That said, it there's a number of specs which should ensure things work as advertised, and nothing breaks. We are currently using it at Global Personals without any problems.
## Todo ##
* Figure out a sane way to fallback without having to kill the Ruby process.
* Better Readme/Documentation.
* Add option for next server selection on failover to be selected by random rather than next on the list.
* Convince get failover functionality merged in, or otherwise rewritten/added to the official AMQP gem.
## Note on Patches/Pull Requests ##
* Fork the project.
* Make your feature addition or bug fix.
* Add tests for it. This is important so I don't break it in a
future version unintentionally.
* Commit, do not mess with rakefile, version, or history.
(if you want to have your own version, that is fine but bump version in a commit by itself I can ignore when I pull)
* Send me a pull request. Bonus points for topic branches.
## Liccense and Copyright ##
Copyright (c) 2011 Jim Myhrberg & Global Personals, Ltd.
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@@ -1,2 +1,56 @@
$LOAD_PATH.unshift File.expand_path("lib", File.dirname(__FILE__))
require 'bundler'
Bundler::GemHelper.install_tasks
#
# Rspec
#
require 'rspec/core/rake_task'
RSpec::Core::RakeTask.new('spec:all') do |spec|
spec.pattern = [ 'spec/unit/**/*_spec.rb',
'spec/integration/**/*_spec.rb' ]
end
desc "Run unit specs"
task :spec => ["spec:unit"]
RSpec::Core::RakeTask.new('spec:unit') do |spec|
spec.pattern = 'spec/unit/**/*_spec.rb'
end
RSpec::Core::RakeTask.new('spec:integration') do |spec|
spec.pattern = 'spec/integration/**/*_spec.rb'
end
RSpec::Core::RakeTask.new(:rcov) do |spec|
spec.pattern = 'spec/**/*_spec.rb'
spec.rcov = true
end
#
# Yard
#
begin
require 'yard'
YARD::Rake::YardocTask.new
rescue LoadError
task :yardoc do
abort "YARD is not available. In order to run yardoc, you must: sudo gem install yard"
end
end
#
# Misc.
#
desc "Start irb with amqp-failover pre-loaded"
task :console do
exec "irb -r spec/spec_helper"
end
task :c => :console

View File

@@ -1,16 +1,16 @@
# -*- encoding: utf-8 -*-
$:.push File.expand_path("../lib", __FILE__)
require "amqp-failover/version"
require "amqp/failover/version"
Gem::Specification.new do |s|
s.name = "amqp-failover"
s.version = Amqp::Failover::VERSION
s.version = AMQP::Failover::VERSION
s.platform = Gem::Platform::RUBY
s.authors = ["TODO: Write your name"]
s.email = ["TODO: Write your email address"]
s.homepage = ""
s.summary = %q{TODO: Write a gem summary}
s.description = %q{TODO: Write a gem description}
s.authors = ["Jim Myhrberg"]
s.email = ["contact@jimeh.me"]
s.homepage = 'http://github.com/jimeh/amqp-failover'
s.summary = 'Add multi-server failover and fallback to amqp gem.'
s.description = 'Add multi-server failover and fallback to amqp gem.'
s.rubyforge_project = "amqp-failover"
@@ -18,4 +18,13 @@ Gem::Specification.new do |s|
s.test_files = `git ls-files -- {test,spec,features}/*`.split("\n")
s.executables = `git ls-files -- bin/*`.split("\n").map{ |f| File.basename(f) }
s.require_paths = ["lib"]
s.add_runtime_dependency 'amqp', '>= 0.7.0'
s.add_development_dependency 'rake', '>= 0.8.7'
s.add_development_dependency 'rack-test', '>= 0.5.6'
s.add_development_dependency 'rspec', '>= 2.1.0'
s.add_development_dependency 'yard', '>= 0.6.3'
s.add_development_dependency 'json', '>= 1.5.0'
s.add_development_dependency 'ruby-debug'
end

View File

@@ -1,5 +0,0 @@
module Amqp
module Failover
# Your code goes here...
end
end

View File

@@ -1,5 +0,0 @@
module Amqp
module Failover
VERSION = "0.0.1"
end
end

111
lib/amqp/failover.rb Normal file
View File

@@ -0,0 +1,111 @@
# encoding: utf-8
require 'amqp/failover_client'
require 'amqp/failover/config'
require 'amqp/failover/configurations'
require 'amqp/failover/logger'
require 'amqp/failover/server_discovery'
require 'amqp/failover/version'
require 'amqp/failover/ext/amqp/client.rb'
module AMQP
class Failover
attr_reader :latest_failed
attr_accessor :primary
attr_accessor :retry_timeout
attr_accessor :fallback
def initialize(confs = nil, opts = {})
@configs = Failover::Configurations.new(confs)
@options = default_options.merge(opts)
@configs.primary_ref = @options[:primary_config]
end
class << self
# pluggable logger specifically for tracking failover and fallbacks
def logger
@logger ||= Logger.new
end
attr_writer :logger
end
def default_options
{ :primary_config => 0,
:retry_timeout => 1,
:selection => :sequential, #TODO: Implement next server selection algorithm
:fallback => false, #TODO: Enable by default once a sane implementation is figured out
:fallback_interval => 10 }
end
def options
@options ||= {}
end
def fallback_interval
options[:fallback_interval] ||= default_options[:fallback_interval]
end
def primary
configs[:primary]
end
def refs
@refs ||= {}
end
def configs
@configs ||= Configurations.new
end
def add_config(conf = {}, ref = nil)
index = configs.index(conf)
configs.set(conf) if index.nil?
refs[ref] = (index || configs.index(conf)) if !ref.nil?
end
def failover_from(conf = {}, time = nil)
failed_with(conf, nil, time)
next_config
end
alias :from :failover_from
def failed_with(conf = {}, ref = nil, time = nil)
time ||= Time.now
if !(index = configs.index(conf)).nil?
configs[index].last_fail = time
@latest_failed = configs[index]
else
@latest_failed = configs.set(conf)
configs.last.last_fail = time
end
refs[ref] = (index || configs.index(conf)) if !ref.nil?
end
def next_config(retry_timeout = nil, after = nil)
return nil if configs.size <= 1
retry_timeout ||= @options[:retry_timeout]
after ||= @latest_failed
index = configs.index(after)
available = (index > 0) ? configs[index+1..-1] + configs[0..index-1] : configs[1..-1]
available.each do |conf|
return conf if conf.last_fail.nil? || (conf.last_fail.to_i + retry_timeout) < Time.now.to_i
end
return nil
end
def last_fail_of(match)
((match.is_a?(Hash) ? get_by_conf(match) : get_by_ref(match)) || Config::Failed.new).last_fail
end
def get_by_conf(conf = {})
configs[configs.index(conf)]
end
def get_by_ref(ref = nil)
configs[refs[ref]] if refs[ref]
end
end # Failover
end # AMQP

View File

@@ -0,0 +1,42 @@
# encoding: utf-8
module AMQP
class Failover
class Config < ::Hash
attr_accessor :last_fail
def initialize(hash = {}, last_fail_date = nil)
self.replace(symbolize_keys(defaults.merge(hash)))
self.last_fail = last_fail_date if last_fail_date
end
def defaults
AMQP.settings
end
def symbolize_keys(hash = {})
hash.inject({}) do |result, (key, value)|
result[key.is_a?(String) ? key.to_sym : key] = value
result
end
end
# order by latest fail, potentially useful if random config selection is used
def <=>(other)
if self.respond_to?(:last_fail) && other.respond_to?(:last_fail)
if self.last_fail.nil? && other.last_fail.nil?
return 0
elsif self.last_fail.nil? && !other.last_fail.nil?
return 1
elsif !self.last_fail.nil? && other.last_fail.nil?
return -1
end
return other.last_fail <=> self.last_fail
end
return 0
end
end # Config
end # Failover
end # AMQP

View File

@@ -0,0 +1,92 @@
# encoding: utf-8
module AMQP
class Failover
class Configurations < Array
def initialize(confs = nil)
load(confs)
end
def [](*args)
if args[0].is_a?(Symbol)
return primary if args[0] == :primary
get(args[0])
else
super(*args)
end
end
def []=(*args)
if args[0].is_a?(Symbol)
return primary = args.last if args[0] == :primary
set(args.last, args[0])
end
super(*args)
end
def refs
@refs ||= {}
end
def primary_ref
@primary_ref ||= 0
end
def primary_ref=(ref)
@primary_ref = ref
end
def primary
get(primary_ref) || AMQP.settings
end
def primary=(conf = {})
set(conf, primary_ref)
end
def get(ref = nil)
return self[ref] if ref.is_a?(Fixnum)
self[refs[ref]] if refs[ref]
end
def set(conf = {}, ref = nil)
conf = Failover::Config.new(conf) if !conf.is_a?(Failover::Config)
if (index = self.index(conf)).nil?
self << conf
else
conf = self[index]
end
refs[ref] = (index || self.index(conf)) if ref
conf
end
def find_next(conf = {})
current = self.index(conf)
self[(current+1 == self.size) ? 0 : current+1] if current
end
def load(conf)
if conf.is_a?(Array)
load_array(conf)
elsif conf.is_a?(Hash)
load_hash(conf)
end
end
def load_array(confs = [])
self.clear
refs = {}
confs.each do |conf|
conf = AMQP::Client.parse_amqp_url(conf) if conf.is_a?(String)
load_hash(conf)
end
end
def load_hash(conf = {})
set(conf)
end
end # Config
end # Failover
end # AMQP

View File

@@ -0,0 +1,67 @@
# encoding: utf-8
AMQP.client = AMQP::FailoverClient
module AMQP
module Client
class << self
# Connect with Failover supports specifying multiple AMQP servers and configurations.
#
# Argument Examples:
# - "amqp://guest:guest@host:5672,amqp://guest:guest@host:5673"
# - ["amqp://guest:guest@host:5672", "amqp://guest:guest@host:5673"]
# - [{:host => "host", :port => 5672}, {:host => "host", :port => 5673}]
# - {:hosts => ["amqp://user:pass@host:5672", "amqp://user:pass@host:5673"]}
# - {:hosts => [{:host => "host", :port => 5672}, {:host => "host", :port => 5673}]}
#
# The last two examples are by far the most flexible, cause they also let you specify
# failover and fallback specific options. Like so:
# - {:hosts => ["amqp://localhost:5672"], :fallback => false}
#
# Available failover options are:
# - :retry_timeout, time to wait before retrying a specific AMQP config after failure.
# - :primary_config, specify which of the supplied configurations is it the primary one. The default
# value is 0, the first item in the config array. Use 1 for the second and so on.
# - :fallback, check for the return of the primary server, and fallback to it if and when it returns.
# - :fallback_interval, seconds between each check for original server if :fallback is true.
# - :selection, not yet implimented.
#
def connect_with_failover(opts = nil)
opts = parse_amqp_url_or_opts(opts)
connect_without_failover(opts)
end
alias :connect_without_failover :connect
alias :connect :connect_with_failover
def parse_amqp_url_or_opts(opts = nil)
if opts.is_a?(String) && opts.index(',').nil?
opts = init_failover(opts.split(','))
elsif opts.is_a?(Array)
opts = init_failover(opts)
elsif opts.is_a?(Hash) && opts[:hosts].is_a?(Array)
confs = opts.delete(:hosts)
opts = init_failover(confs, opts)
end
opts
end
def init_failover(confs = nil, opts = {})
if !confs.nil? && confs.size > 0
failover = Failover.new(confs, opts)
failover.primary.merge({ :failover => failover })
end
end
end # << self
def disconnected_with_failover
return failover_switch if @failover
disconnected_without_failover
end
alias :disconnected_without_failover :disconnected
alias :disconnected :disconnected_with_failover
end # Client
end # AMQP

View File

@@ -0,0 +1,31 @@
# encoding: utf-8
module AMQP
class Failover
class Logger
attr_accessor :enabled
def initialize(enabled = nil)
@enabled = enabled || true
end
def error(*msg)
msg[0] = "[ERROR]: " + msg[0] if msg[0].is_a?(String)
write(*msg)
end
def info(*msg)
write(*msg)
end
private
def write(*msg)
return if !@enabled
puts *msg
end
end # Logger
end # Failover
end # AMQP

View File

@@ -0,0 +1,45 @@
# encoding: utf-8
module AMQP
class Failover
class ServerDiscovery < EM::Connection
class << self
attr_accessor :connection
end
def self.monitor(conf = {}, retry_interval = nil, &block)
if EM.reactor_running?
start_monitoring(conf, retry_interval, &block)
else
EM.run { start_monitoring(conf, retry_interval, &block) }
end
end
def initialize(args)
@done = args[:done]
@timer = args[:timer]
end
def connection_completed
@done.call
@timer.cancel
close_connection
end
def self.start_monitoring(conf = {}, retry_interval = nil, &block)
conf = conf.clone
retry_interval ||= 5
conf[:done] = block
conf[:timer] = EM::PeriodicTimer.new(retry_interval) do
@connection = connect(conf)
end
end
def self.connect(conf)
EM.connect(conf[:host], conf[:port], self, conf)
end
end # ServerDiscovery
end # Failover
end # AMQP

View File

@@ -0,0 +1,7 @@
# encoding: utf-8
module AMQP
class Failover
VERSION = "0.0.1"
end
end

View File

@@ -0,0 +1,83 @@
# encoding: utf-8
module AMQP
module FailoverClient
include AMQP::BasicClient
attr_accessor :failover
attr_reader :fallback_monitor
attr_accessor :settings
attr_accessor :on_disconnect
def self.extended(base)
if (base.failover = base.settings.delete(:failover))
base.on_disconnect = base.method(:disconnected)
end
end
def failover_switch
if (new_settings = @failover.from(@settings))
log_message = "Could not connect to or lost connection to server #{@settings[:host]}:#{@settings[:port]}. " +
"Attempting connection to: #{new_settings[:host]}:#{new_settings[:port]}"
logger.error(log_message)
logger.info(log_message)
if @failover.options[:fallback] && @failover.primary == @settings
fallback(@failover.primary, @failover.fallback_interval)
end
@settings = new_settings
reconnect
else
raise Error, "Could not connect to server #{@settings[:host]}:#{@settings[:port]}"
end
end
def logger
Failover.logger
end
def configs
@failover.configs if @failover
end
def clean_exit(msg = nil)
msg ||= "clean exit"
logger.info(msg)
logger.error(msg)
Process.exit
end
def fallback(conf = {}, retry_interval = nil)
@fallback_monitor = Failover::ServerDiscovery.monitor(conf, retry_interval) do
fallback_callback.call(conf, retry_interval)
end
end
def fallback_callback
#TODO: Figure out a way to artificially trigger EM to disconnect on fallback without channels being closed.
@fallback_callback ||= proc { |conf, retry_interval|
clean_exit("Primary server (#{conf[:host]}:#{conf[:port]}) is back. " +
"Performing clean exit to be relaunched with primary config.")
}
end
attr_writer :fallback_callback
#TODO: Figure out why I originally needed this
# def process_frame(frame)
# if mq = channels[frame.channel]
# mq.process_frame(frame)
# return
# end
#
# if frame.is_a?(AMQP::Frame::Method) && (method = frame.payload).is_a?(AMQP::Protocol::Connection::Close)
# if method.reply_text =~ /^NOT_ALLOWED/
# raise AMQP::Error, "#{method.reply_text} in #{::AMQP::Protocol.classes[method.class_id].methods[method.method_id]}"
# end
# end
# super(frame)
# end
end # FailoverClient
end # AMQP

View File

@@ -0,0 +1,59 @@
# encoding: utf-8
$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__))
require 'spec_helper'
require 'mq'
require 'amqp'
require 'amqp/server'
require 'server_helper'
describe "Basic AMQP connection with FailoverClient loaded" do
after(:each) do
ServerHelper.clear_logs
end
it "should be using FailoverClient" do
AMQP.client.should == AMQP::FailoverClient
end
it "should be able to connect" do
EM.run {
port = 15672
timeout = 2
serv = start_server(port)
EM.add_timer(1.5) {
conn = AMQP.connect(:host => 'localhost', :port => 15672)
EM.add_timer(0.1) {
conn.should be_connected
serv.stop
log = serv.log
log.size.should == 3
(0..2).each { |i| log[i]['method'].should == "send" }
log[0]['class'].should == 'AMQP::Protocol::Connection::Start'
log[1]['class'].should == 'AMQP::Protocol::Connection::Tune'
log[2]['class'].should == 'AMQP::Protocol::Connection::OpenOk'
EM.stop
}
}
}
end
it "should be able to connect and get disconnected" do
EM.run {
serv = start_server(25672)
EM.add_timer(0.1) {
conn = AMQP.connect(:host => 'localhost', :port => 25672)
EM.add_timer(0.1) {
conn.should be_connected
serv.stop
EM.add_timer(0.1) {
conn.should_not be_connected
EM.stop
}
}
}
}
end
end

View File

@@ -0,0 +1,141 @@
# encoding: utf-8
$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__))
require 'spec_helper'
require 'amqp/server'
require 'server_helper'
require 'logger_helper'
describe "Failover support loaded into AMQP gem" do
before(:each) do
@flog = LoggerHelper.new
AMQP::Failover.logger = @flog
end
after(:each) do
ServerHelper.clear_logs
AMQP::Failover.logger = nil
end
it "should be able to connect" do
port1 = 15672
EM.run {
serv = start_server(port1)
EM.add_timer(0.1) {
conn = AMQP.connect(:host => 'localhost', :port => port1)
conn.failover.should be_nil
EM.add_timer(0.1) {
conn.should be_connected
EM.stop
}
}
}
end
it "should be able to connect and failover" do
port1 = 25672
port2 = 35672
EM.run {
# start mock amqp servers
serv1 = start_server(port1)
serv2 = start_server(port2)
EM.add_timer(0.1) {
# start amqp client connection and make sure it's picked the right config
conn = AMQP.connect({:hosts => [{:port => port1}, {:port => port2}]})
conn.failover.primary[:port].should == port1
conn.settings[:port].should == port1
conn.settings.should == conn.failover.primary
EM.add_timer(0.1) {
# make sure client connected to the correct server, then kill server
conn.should be_connected
serv1.log.should have(3).items
serv2.log.should have(0).items
serv1.stop
EM.add_timer(0.1) {
# make sure client performed a failover when primary server died
conn.should be_connected
[:error, :info].each do |i|
@flog.send("#{i}_log").should have(1).item
@flog.send("#{i}_log")[0][0].should match(/connect to or lost connection.+#{port1}.+attempting connection.+#{port2}/i)
end
conn.settings[:port].should == port2
serv1.log.should have(3).items
serv2.log.should have(3).items
conn.close
EM.add_timer(0.1) {
serv2.stop
EM.stop
}
}
}
}
}
end
it "should be able to fallback when primary server returns" do
port1 = 45672
port2 = 55672
lambda {
EM.run {
# start mock amqp servers
serv1 = start_server(port1)
serv2 = start_server(port2)
EM.add_timer(0.1) {
# start amqp client connection and make sure it's picked the right config
conn = AMQP.connect({:hosts => [{:port => port1}, {:port => port2}], :fallback => true, :fallback_interval => 0.1})
conn.failover.primary[:port].should == port1
conn.settings[:port].should == port1
conn.settings.should == conn.failover.primary
EM.add_timer(0.1) {
# make sure client connected to the correct server, then kill server
conn.should be_connected
serv1.log.should have(3).items
serv2.log.should have(0).items
serv1.stop
EM.add_timer(0.1) {
# make sure client performed a failover when primary server died
conn.should be_connected
[:error, :info].each do |i|
@flog.send("#{i}_log").should have(1).item
@flog.send("#{i}_log")[0][0].should match(/connect to or lost connection.+#{port1}.+attempting connection.+#{port2}/i)
end
conn.settings[:port].should == port2
serv1.log.should have(3).items
serv2.log.should have(3).items
serv3 = start_server(port1)
EM.add_timer(0.2) {
# by this point client should have raised a SystemExit exception
serv2.stop
EM.stop
}
}
}
}
}
}.should raise_error(SystemExit, "exit")
[:error, :info].each do |i|
@flog.send("#{i}_log").should have(2).item
@flog.send("#{i}_log")[1][0].should match(/primary server.+45672.+performing clean exit/i)
end
end
it "should abide to :primary_config option" do
port1 = 75672
port2 = 65672
EM.run {
serv = start_server(port1)
EM.add_timer(0.1) {
conn = AMQP.connect({:hosts => [{:port => port1}, {:port => port2}], :primary_config => 1})
conn.failover.primary[:port].should == port2
conn.settings[:port].should == port2
conn.settings.should == conn.failover.primary
EM.add_timer(0.1) {
conn.should be_connected
EM.stop
}
}
}
end
end

18
spec/logger_helper.rb Normal file
View File

@@ -0,0 +1,18 @@
# encoding: utf-8
class LoggerHelper
attr_accessor :error_log
attr_accessor :info_log
def info(*args)
@info_log ||= []
@info_log << args
end
def error(*args)
@error_log ||= []
@error_log << args
end
end

77
spec/server_helper.rb Normal file
View File

@@ -0,0 +1,77 @@
# encoding: utf-8
require 'rubygems'
require 'mq'
require 'amqp'
require 'amqp/server'
require 'json'
class ServerHelper
attr_accessor :stdin
attr_accessor :stdout
attr_accessor :stderr
attr_accessor :pid
def initialize(port = nil, timeout = nil)
@port = port
@timout = timeout
File.open(log_file, 'w') {}
@pid = start(port, timeout)
end
def self.clear_logs
Dir.glob(File.expand_path('server_helper*.log', File.dirname(__FILE__))).each do |file|
File.delete(file)
end
end
def start(port = nil, timeout = nil)
port ||= 15672
timeout ||= 2
EM.fork_reactor {
$PORT = port
EM.start_server('localhost', port, AmqpServer)
EM.add_timer(timeout) { EM.stop }
}
end
def stop
Process.kill('TERM', @pid)
end
def kill
Process.kill('KILL', @pid)
end
def log
File.open(log_file).to_a.map{ |l| JSON.parse(l) }
end
def log_file
File.expand_path("server_helper-port#{@port}.log", File.dirname(__FILE__))
end
end
module AmqpServer
include AMQP::Server
# customize log output
def log(*args)
args = {:method => args[0], :class => args[1].payload.class, :pid => Process.pid}
filename = File.expand_path("server_helper-port#{$PORT}.log", File.dirname(__FILE__))
File.open(filename, 'a') do |f|
f.write("#{args.to_json}\n")
end
end
end
#
# Helper methods
#
def start_server(port = nil, timeout = nil)
ServerHelper.new(port, timeout)
end

49
spec/spec_helper.rb Normal file
View File

@@ -0,0 +1,49 @@
# encoding: utf-8
# add project-relative load paths
$LOAD_PATH.unshift File.expand_path(File.join(File.dirname(__FILE__), '..', 'lib'))
$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__))
# require stuff
require 'rubygems'
begin
require 'mq'
rescue LoadError => e
require 'amqp'
end
require 'amqp/failover'
require 'rspec'
require 'rspec/autorun'
#
# Helper methods
#
def wait_while(timeout = 10, retry_interval = 0.1, &block)
start = Time.now
while block.call
break if (Time.now - start).to_i >= timeout
sleep(retry_interval)
end
end
# stolen from Pid::running? from daemons gem
def pid_running?(pid)
return false unless pid
# Check if process is in existence
# The simplest way to do this is to send signal '0'
# (which is a single system call) that doesn't actually
# send a signal
begin
Process.kill(0, pid)
return true
rescue Errno::ESRCH
return false
rescue ::Exception # for example on EPERM (process exists but does not belong to us)
return true
end
end

View File

@@ -0,0 +1,67 @@
# encoding: utf-8
$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__))
require 'spec_helper'
describe 'AMQP::Failover::Config' do
before(:each) do
configs = [
{:host => 'rabbit0.local'},
{:host => 'rabbit1.local'},
{:host => 'rabbit2.local', :port => 5673}
]
@configs = configs.map { |conf| AMQP.settings.merge(conf) }
@fail = AMQP::Failover.new(@configs)
end
it "should initialize" do
fail = AMQP::Failover::Config.new(@configs[0])
fail.should == @configs[0]
fail.last_fail.should be_nil
now = Time.now
fail = AMQP::Failover::Config.new(@configs[1], now)
fail.should == @configs[1]
fail.last_fail.should == now
end
it "should order properly with #<=>" do
one_hour_ago = (Time.now - 3600)
two_hours_ago = (Time.now - 7200)
fail = [ AMQP::Failover::Config.new(@configs[0]),
AMQP::Failover::Config.new(@configs[1], one_hour_ago),
AMQP::Failover::Config.new(@configs[2], two_hours_ago) ]
(fail[1] <=> fail[0]).should == -1
(fail[0] <=> fail[0]).should == 0
(fail[0] <=> fail[1]).should == 1
(fail[1] <=> fail[2]).should == -1
(fail[1] <=> fail[1]).should == 0
(fail[2] <=> fail[1]).should == 1
fail.sort[0].last_fail.should == one_hour_ago
fail.sort[1].last_fail.should == two_hours_ago
fail.sort[2].last_fail.should == nil
end
it "should be ordered by last_fail" do
result = [ AMQP::Failover::Config.new(@configs[1], (Time.now - 60)),
AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))),
AMQP::Failover::Config.new(@configs[0], (Time.now - 3600)) ]
origin = [ AMQP::Failover::Config.new(@configs[0], (Time.now - 3600)),
AMQP::Failover::Config.new(@configs[1], (Time.now - 60)),
AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))) ]
origin.sort.should == result
origin = [ AMQP::Failover::Config.new(@configs[0]),
AMQP::Failover::Config.new(@configs[1], (Time.now - 60)),
AMQP::Failover::Config.new(@configs[2], (Time.now - (60*25))) ]
origin.sort.should == result
end
end

View File

@@ -0,0 +1,79 @@
# encoding: utf-8
$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__))
require 'spec_helper'
describe 'AMQP::Failover::Configurations' do
before(:each) do
@conf = AMQP::Failover::Configurations.new
@raw_configs = [
{:host => 'rabbit0.local'},
{:host => 'rabbit1.local'},
{:host => 'rabbit2.local', :port => 5673}
]
@configs = @raw_configs.map { |conf| AMQP.settings.merge(conf) }
end
it "should initialize" do
confs = AMQP::Failover::Configurations.new(@raw_configs)
confs.each_with_index do |conf, i|
conf.should be_a(AMQP::Failover::Config)
conf.should == @configs[i]
end
end
it "should set and get configs" do
@conf.primary_ref.should == 0
@conf.should have(0).items
@conf.set(@raw_configs[0])
@conf.should have(1).items
@conf.get(0).should == @configs[0]
@conf[0].should == @configs[0]
@conf.set(@raw_configs[1])
@conf.should have(2).items
@conf.get(1).should == @configs[1]
@conf[1].should == @configs[1]
# should just create a ref, as config exists
@conf.set(@raw_configs[1], :the_one)
@conf.should have(2).items
@conf.get(1).should == @configs[1]
@conf[:the_one].should == @configs[1]
@conf.load_array(@raw_configs)
@conf.should have(3).items
@conf.primary.should == @configs[0]
@conf.primary_ref = 1
@conf.primary.should == @configs[1]
@conf[:primary].should == @configs[1]
end
it "should #find_next" do
@conf.load(@raw_configs)
@conf.should have(3).items
@conf.find_next(@configs[0]).should == @configs[1]
@conf.find_next(@configs[1]).should == @configs[2]
@conf.find_next(@configs[2]).should == @configs[0]
end
it "should #load_hash" do
@conf.should have(0).items
@conf.load_hash(@raw_configs[0])
@conf.should have(1).items
@conf.primary.should == @configs[0]
end
it "should #load_array" do
@conf.load_hash(:host => 'rabbid-rabbit')
@conf.should have(1).items
@conf.load_array(@raw_configs)
@conf.should have(3).items
@conf.should == @configs
@conf.primary.should == @configs[0]
end
end

View File

@@ -0,0 +1,31 @@
class ServerDiscoveryHelper < AMQP::Failover::ServerDiscovery
class << self
alias :real_start_monitoring :start_monitoring
def start_monitoring(*args, &block)
$called << :start_monitoring
real_start_monitoring(*args, &block)
end
end
alias :real_initialize :initialize
def initialize(*args)
$called << :initialize
EM.start_server('127.0.0.1', 9999) if $start_count == 2
$start_count += 1
real_initialize(*args)
end
alias :real_connection_completed :connection_completed
def connection_completed
$called << :connection_completed
real_connection_completed
end
alias :real_close_connection :close_connection
def close_connection
$called << :close_connection
real_close_connection
end
end

View File

@@ -0,0 +1,56 @@
# encoding: utf-8
$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__))
require 'spec_helper'
require 'server_discovery_helper'
describe 'AMQP::Failover::ServerDiscovery' do
before(:each) do
$called = []
$start_count = 0
@args = { :host => 'localhost', :port => 9999 }
@retry_interval = 0.01
end
after(:all) do
$called = nil
$start_count = nil
end
it "should initialize" do
EM.run {
EM.start_server('127.0.0.1', 9999)
@mon = ServerDiscoveryHelper.monitor(@args, @retry_interval) do
$called << :done_block
EM.stop_event_loop
end
}
$start_count.should == 1
$called.should have(5).items
$called.uniq.should have(5).items
$called.should include(:start_monitoring)
$called.should include(:initialize)
$called.should include(:connection_completed)
$called.should include(:close_connection)
$called.should include(:done_block)
end
it "should retry on error" do
EM.run {
@mon = ServerDiscoveryHelper.monitor(@args, @retry_interval) do
$called << :done_block
EM.stop_event_loop
end
}
$start_count.should >= 3
$called.should have($start_count + 4).items
$called.uniq.should have(5).items
$called.should include(:start_monitoring)
$called.should include(:initialize)
$called.should include(:connection_completed)
$called.should include(:close_connection)
$called.should include(:done_block)
end
end

View File

@@ -0,0 +1,69 @@
# encoding: utf-8
$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__))
require 'spec_helper'
describe 'AMQP::Failover' do
before(:each) do
configs = [
{:host => 'rabbit0.local'},
{:host => 'rabbit1.local'},
{:host => 'rabbit2.local', :port => 5673}
]
@configs = configs.map { |conf| AMQP.settings.merge(conf) }
@fail = AMQP::Failover.new(@configs)
end
it "should initialize" do
@fail.configs.should == @configs
end
it "should #add_config" do
@fail.instance_variable_set("@configs", nil)
@fail.configs.should == []
@fail.add_config(@configs[0])
@fail.configs.should have(1).item
@fail.configs.should == [@configs[0]]
@fail.refs.should == {}
@fail.add_config(@configs[1], :hello)
@fail.configs.should have(2).items
@fail.configs.should include(@configs[1])
@fail.get_by_ref(:hello).should == @configs[1]
end
it "should #get_by_conf" do
fetched = @fail.get_by_conf(@configs[1])
fetched.should == @configs[1]
fetched.class.should == AMQP::Failover::Config
fetched.last_fail.should be_nil
end
it "should #fail_with" do
fail = AMQP::Failover.new
now = Time.now
fail.failed_with(@configs[0], 0, now)
fail.latest_failed.should == @configs[0]
fail.last_fail_of(@configs[0]).should == now
fail.last_fail_of(0).should == now
end
it "should find #next_config" do
@fail.failed_with(@configs[1])
@fail.next_config.should == @configs[2]
@fail.next_config.should == @configs[2]
@fail.failed_with(@configs[2])
@fail.next_config.should == @configs[0]
@fail.failed_with(@configs[0])
@fail.next_config.should be_nil
end
it "should #failover_from" do
now = Time.now
@fail.failover_from(@configs[0], now).should == @configs[1]
@fail.latest_failed.should == @configs[0]
@fail.latest_failed.last_fail.should == now
end
end