mirror of https://github.com/jimeh/redistat.git
synced 2026-02-19 05:16:39 +00:00

Merge branch 'release/v0.3.0'

README.md (10 lines changed)
@@ -25,6 +25,10 @@ view_stats.rb:

class ViewStats
  include Redistat::Model
end

# if using Redistat in multiple threads set this
# somewhere in the beginning of the execution stack
Redistat.thread_safe = true


### Simple Example ###

@@ -187,6 +191,12 @@ By default when fetching statistics, Redistat will figure out how to do the least

It is also intelligent enough not to fetch each day from the 3rd through the 31st of a month; instead it fetches the data for the whole month and the first two days, then subtracts those two days from the month's summary. That means three calls to `hgetall` instead of the 29 needed if each whole day were fetched individually.

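As a rough sketch of what such a range query looks like from the model side (this uses the ViewStats model defined above; the label and dates are only illustrative, not from the repository):

    # illustrative only: one fetch spanning the 3rd through the end of January
    stats = ViewStats.fetch("views", Time.parse("2011-01-03"),
                                     Time.parse("2011-02-01"))
    stats.total  # combined totals, resolved with a handful of hgetall calls
                 # (whole month plus the first two days) rather than one per day
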
### Buffer ###

The buffer is a new, still semi-beta, feature aimed at reducing the number of Redis `hincrby` calls that Redistat sends. It should only really be useful when you're pushing north of 30,000 Redis requests per second, when your Redis server has limited resources, or when, against my recommendation, you've opted to use 10, 20, or more label grouping levels.

Buffering folds multiple `store` calls into as few Redis writes as possible by merging the statistics hashes from all calls and grouping them by scope, label, date depth, and more. You configure the buffer by setting `Redistat.buffer_size` to an integer greater than 1; this tells Redistat how many `store` calls to buffer in memory before writing all data to Redis.

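A minimal sketch of enabling the buffer (the size of 25 is arbitrary; any value above 1 turns buffering on, and the ViewStats model is the one from the example above):

    # buffer 25 store calls in memory before writing them to Redis
    Redistat.buffer_size = 25

    25.times do
      ViewStats.store("views", :count => 1)  # merged in memory, not yet in Redis
    end
    # the 25th call reaches the buffer size and triggers a write to Redis;
    # Redistat.buffer.flush(true) can force an early flush at any point
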
## Todo ##

lib/redistat.rb

@@ -3,6 +3,7 @@ require 'rubygems'
require 'date'
require 'time'
require 'digest/sha1'
require 'monitor'

# Active Support 2.x or 3.x
require 'active_support'

@@ -15,12 +16,15 @@ require 'time_ext'
require 'redis'
require 'json'

require 'redistat/options'
require 'redistat/mixins/options'
require 'redistat/mixins/synchronize'
require 'redistat/mixins/database'
require 'redistat/mixins/date_helper'

require 'redistat/connection'
require 'redistat/database'
require 'redistat/buffer'
require 'redistat/collection'
require 'redistat/date'
require 'redistat/date_helper'
require 'redistat/event'
require 'redistat/finder'
require 'redistat/key'

@@ -33,6 +37,7 @@ require 'redistat/version'

require 'redistat/core_ext'


module Redistat

  KEY_NEXT_ID = ".next_id"

@@ -47,6 +52,26 @@ module Redistat

  class << self

    def buffer
      Buffer.instance
    end

    def buffer_size
      buffer.size
    end

    def buffer_size=(size)
      buffer.size = size
    end

    def thread_safe
      Synchronize.thread_safe
    end

    def thread_safe=(value)
      Synchronize.thread_safe = value
    end

    def connection(ref = nil)
      Connection.get(ref)
    end

@@ -68,3 +93,9 @@ module Redistat

  end
end


# ensure buffer is flushed on program exit
Kernel.at_exit do
  Redistat.buffer.flush(true)
end

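The new module-level accessors simply delegate to `Buffer.instance` and `Synchronize`, so application boot code might look like this sketch (host, port, database, and buffer size are arbitrary example values, not taken from the repository):

    # hypothetical application setup
    Redistat.connect(:host => '127.0.0.1', :port => 6379, :db => 0)
    Redistat.thread_safe = true   # delegates to Redistat::Synchronize
    Redistat.buffer_size = 50     # delegates to Redistat::Buffer.instance
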
lib/redistat/buffer.rb (new file, 107 lines)

@@ -0,0 +1,107 @@
require 'redistat/core_ext/hash'

module Redistat
  class Buffer
    include Synchronize

    def self.instance
      @instance ||= self.new
    end

    def size
      synchronize do
        @size ||= 0
      end
    end

    def size=(value)
      synchronize do
        @size = value
      end
    end

    def count
      @count ||= 0
    end

    def store(key, stats, depth_limit, opts)
      return false unless should_buffer?

      to_flush = {}
      buffkey = buffer_key(key, opts)

      synchronize do
        if !queue.has_key?(buffkey)
          queue[buffkey] = { :key => key,
                             :stats => {},
                             :depth_limit => depth_limit,
                             :opts => opts }
        end

        queue[buffkey][:stats].merge_and_incr!(stats)
        incr_count

        # return items to be flushed if buffer size limit has been reached
        to_flush = reset_queue
      end

      # flush any data that's been cleared from the queue
      flush_data(to_flush)
      true
    end

    def flush(force = false)
      to_flush = {}
      synchronize do
        to_flush = reset_queue(force)
      end
      flush_data(to_flush)
    end

    private

    # should always be called from within a synchronize block
    def incr_count
      @count ||= 0
      @count += 1
    end

    def queue
      @queue ||= {}
    end

    def should_buffer?
      size > 1 # buffer size of 1 would be equal to not using buffer
    end

    # should always be called from within a synchronize block
    def should_flush?
      (!queue.blank? && count >= size)
    end

    # returns items to be flushed if buffer size limit has been reached
    # should always be called from within a synchronize block
    def reset_queue(force = false)
      return {} if !force && !should_flush?
      data = queue
      @queue = {}
      @count = 0
      data
    end

    def flush_data(buffer_data)
      buffer_data.each do |k, item|
        Summary.update(item[:key], item[:stats], item[:depth_limit], item[:opts])
      end
    end

    def buffer_key(key, opts)
      # depth_limit is not needed as it's evident in key.to_s
      opts_index = Summary.default_options.keys.sort { |a,b| a.to_s <=> b.to_s }.map do |k|
        opts[k] if opts.has_key?(k)
      end
      "#{key.to_s}:#{opts_index.join(':')}"
    end

  end
end

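For callers, the important part of the contract is that `Buffer#store` returns false when buffering is disabled (size of 1 or less), so they can fall back to writing directly; that is what `Summary.update_through_buffer` does later in this commit. A hedged illustration of that contract (the key, stats, and size are made up):

    key    = Redistat::Key.new("Scope", "label", Time.now)
    buffer = Redistat::Buffer.instance
    buffer.size = 10  # any value above 1 enables buffering

    stats = {:views => 3}
    unless buffer.store(key, stats, :hour, Redistat::Summary.default_options)
      # store returned false => buffering is off, write straight to Redis
      Redistat::Summary.update(key, stats, :hour, Redistat::Summary.default_options)
    end
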
lib/redistat/connection.rb

@@ -1,29 +1,41 @@
require 'monitor'

module Redistat
  module Connection

    REQUIRED_SERVER_VERSION = "1.3.10"

    # TODO: Create a ConnectionPool instance object using Sychronize mixin to replace Connection class

    class << self

      # TODO: clean/remove all ref-less connections

      def get(ref = nil)
        ref ||= :default
        connections[references[ref]] || create
        synchronize do
          connections[references[ref]] || create
        end
      end

      def add(conn, ref = nil)
        ref ||= :default
        check_redis_version(conn)
        references[ref] = conn.client.id
        connections[conn.client.id] = conn
        synchronize do
          check_redis_version(conn)
          references[ref] = conn.client.id
          connections[conn.client.id] = conn
        end
      end

      def create(options = {})
        #TODO clean/remove all ref-less connections
        ref = options.delete(:ref) || :default
        options.reverse_merge!(default_options)
        conn = (connections[connection_id(options)] ||= connection(options))
        references[ref] = conn.client.id
        conn
        synchronize do
          options = options.clone
          ref = options.delete(:ref) || :default
          options.reverse_merge!(default_options)
          conn = (connections[connection_id(options)] ||= connection(options))
          references[ref] = conn.client.id
          conn
        end
      end

      def connections

@@ -36,9 +48,12 @@ module Redistat

      private

      def check_redis_version(conn)
        raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION
        conn
      def monitor
        @monitor ||= Monitor.new
      end

      def synchronize(&block)
        monitor.synchronize(&block)
      end

      def connection(options)

@@ -46,10 +61,15 @@ module Redistat
      end

      def connection_id(options = {})
        options.reverse_merge!(default_options)
        options = options.reverse_merge(default_options)
        "redis://#{options[:host]}:#{options[:port]}/#{options[:db]}"
      end

      def check_redis_version(conn)
        raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION
        conn
      end

      def default_options
        {
          :host => '127.0.0.1',

lib/redistat/mixins/synchronize.rb (new file, 51 lines)

@@ -0,0 +1,51 @@
require 'monitor'

module Redistat
  module Synchronize

    class << self
      def included(base)
        base.send(:include, InstanceMethods)
      end

      def monitor
        @monitor ||= Monitor.new
      end

      def thread_safe
        monitor.synchronize do
          @thread_safe ||= false
        end
      end

      def thread_safe=(value)
        monitor.synchronize do
          @thread_safe = value
        end
      end
    end # << self

    module InstanceMethods
      def thread_safe
        Synchronize.thread_safe
      end

      def thread_safe=(value)
        Synchronize.thread_safe = value
      end

      def monitor
        Synchronize.monitor
      end

      def synchronize(&block)
        if thread_safe
          monitor.synchronize(&block)
        else
          block.call
        end
      end
    end # InstanceMethods

  end
end

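A minimal sketch of how the mixin is intended to be used (the class name and `persist` method are made up; the pattern mirrors the SynchronizeSpecHelper class in the specs further down):

    class StatsWriter
      include Redistat::Synchronize  # adds #synchronize, #thread_safe, #monitor

      def write(data)
        synchronize do
          # wrapped in the shared Monitor only when thread_safe is true;
          # otherwise the block is simply called directly
          persist(data)  # hypothetical method
        end
      end
    end
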
lib/redistat/result.rb

@@ -1,3 +1,5 @@
require 'active_support/core_ext/hash/indifferent_access'

module Redistat
  class Result < HashWithIndifferentAccess

lib/redistat/summary.rb

@@ -2,66 +2,81 @@ module Redistat
  class Summary
    include Database

    def self.default_options
      { :enable_grouping => true,
        :label_indexing => true,
        :connection_ref => nil }
    end

    def self.update_all(key, stats = {}, depth_limit = nil, opts = {})
      stats ||= {}
      return nil if stats.size == 0
    class << self

      options = default_options.merge((opts || {}).reject { |k,v| v.nil? })
      def default_options
        { :enable_grouping => true,
          :label_indexing => true,
          :connection_ref => nil }
      end

      depth_limit ||= key.depth
      def buffer
        Redistat.buffer
      end

      if options[:enable_grouping]
        stats = inject_group_summaries(stats)
        key.groups.each do |k|
          update_key(k, stats, depth_limit, options[:connection_ref])
          k.update_index if options[:label_indexing]
      def update_all(key, stats = {}, depth_limit = nil, opts = {})
        stats ||= {}
        return if stats.empty?

        options = default_options.merge((opts || {}).reject { |k,v| v.nil? })

        depth_limit ||= key.depth

        update_through_buffer(key, stats, depth_limit, options)
      end

      def update_through_buffer(*args)
        update(*args) unless buffer.store(*args)
      end

      def update(key, stats, depth_limit, opts)
        if opts[:enable_grouping]
          stats = inject_group_summaries(stats)
          key.groups.each do |k|
            update_key(k, stats, depth_limit, opts[:connection_ref])
            k.update_index if opts[:label_indexing]
          end
        else
          update_key(key, stats, depth_limit, opts[:connection_ref])
        end
      else
        update_key(key, stats, depth_limit, options[:connection_ref])
      end
    end

    private

    def self.update_key(key, stats, depth_limit, connection_ref)
      Date::DEPTHS.each do |depth|
        update(key, stats, depth, connection_ref)
        break if depth == depth_limit

      private

      def update_key(key, stats, depth_limit, connection_ref)
        Date::DEPTHS.each do |depth|
          update_fields(key, stats, depth, connection_ref)
          break if depth == depth_limit
        end
      end
    end

    def self.update(key, stats, depth, connection_ref = nil)
      stats.each do |field, value|
        db(connection_ref).hincrby key.to_s(depth), field, value

      def update_fields(key, stats, depth, connection_ref = nil)
        stats.each do |field, value|
          db(connection_ref).hincrby key.to_s(depth), field, value
        end
      end
    end

    def self.inject_group_summaries!(stats)
      summaries = {}
      stats.each do |key, value|
        parts = key.to_s.split(GROUP_SEPARATOR)
        parts.pop
        if parts.size > 0
          sum_parts = []
          parts.each do |part|
            sum_parts << part
            sum_key = sum_parts.join(GROUP_SEPARATOR)
            (summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value

      def inject_group_summaries!(stats)
        summaries = {}
        stats.each do |key, value|
          parts = key.to_s.split(GROUP_SEPARATOR)
          parts.pop
          if parts.size > 0
            sum_parts = []
            parts.each do |part|
              sum_parts << part
              sum_key = sum_parts.join(GROUP_SEPARATOR)
              (summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value
            end
          end
        end
        stats.merge_and_incr!(summaries)
      end
      stats.merge_and_incr!(summaries)

      def inject_group_summaries(stats)
        inject_group_summaries!(stats.clone)
      end

    end

    def self.inject_group_summaries(stats)
      inject_group_summaries!(stats.clone)
    end

  end
end

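The net effect of the restructuring is that all writes funnel through one entry point; a hedged sketch of the call path (the scope, label, and stats are made up):

    key = Redistat::Key.new("Visits", "page/home", Time.now)
    # update_all merges default_options, resolves the depth limit, and calls
    # update_through_buffer, which writes via Summary.update immediately
    # whenever Buffer#store declines (i.e. Redistat.buffer_size <= 1)
    Redistat::Summary.update_all(key, {:views => 3, :visitors => 2}, :hour)
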
lib/redistat/version.rb

@@ -1,3 +1,3 @@
module Redistat
  VERSION = "0.2.6"
  VERSION = "0.3.0"
end

spec/buffer_spec.rb (new file, 157 lines)

@@ -0,0 +1,157 @@
require "spec_helper"
|
||||
|
||||
describe Redistat::Buffer do
|
||||
|
||||
before(:each) do
|
||||
@class = Redistat::Buffer
|
||||
@buffer = Redistat::Buffer.instance
|
||||
@key = mock('Key', :to_s => "Scope/label:2011")
|
||||
@stats = {:count => 1, :views => 3}
|
||||
@depth_limit = :hour
|
||||
@opts = {:enable_grouping => true}
|
||||
end
|
||||
|
||||
# let's cleanup after ourselves for the other specs
|
||||
after(:each) do
|
||||
@class.instance_variable_set("@instance", nil)
|
||||
@buffer.size = 0
|
||||
end
|
||||
|
||||
it "should provide instance of itself" do
|
||||
@buffer.should be_a(@class)
|
||||
end
|
||||
|
||||
it "should only buffer if buffer size setting is greater than 1" do
|
||||
@buffer.size.should == 0
|
||||
@buffer.send(:should_buffer?).should be_false
|
||||
@buffer.size = 1
|
||||
@buffer.size.should == 1
|
||||
@buffer.send(:should_buffer?).should be_false
|
||||
@buffer.size = 2
|
||||
@buffer.size.should == 2
|
||||
@buffer.send(:should_buffer?).should be_true
|
||||
end
|
||||
|
||||
it "should only flush buffer if buffer size is greater than or equal to buffer size setting" do
|
||||
@buffer.size.should == 0
|
||||
@buffer.send(:queue).size.should == 0
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
@buffer.send(:queue)[:hello] = 'world'
|
||||
@buffer.send(:incr_count)
|
||||
@buffer.send(:should_flush?).should be_true
|
||||
@buffer.size = 5
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
3.times { |i|
|
||||
@buffer.send(:queue)[i] = i.to_s
|
||||
@buffer.send(:incr_count)
|
||||
}
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
@buffer.send(:queue)[4] = '4'
|
||||
@buffer.send(:incr_count)
|
||||
@buffer.send(:should_flush?).should be_true
|
||||
end
|
||||
|
||||
it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do
|
||||
@buffer.send(:queue)[:hello] = 'world'
|
||||
@buffer.send(:incr_count)
|
||||
@buffer.send(:should_flush?).should be_true
|
||||
@buffer.size = 2
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
@buffer.send(:reset_queue).should == {}
|
||||
@buffer.instance_variable_get("@count").should == 1
|
||||
@buffer.send(:reset_queue, true).should == {:hello => 'world'}
|
||||
@buffer.instance_variable_get("@count").should == 0
|
||||
end
|
||||
|
||||
it "should #flush_data into Summary.update properly" do
|
||||
# the root level key value doesn't actually matter, but it's something like this...
|
||||
data = {'ScopeName/label/goes/here:2011::true:true' => {
|
||||
:key => @key,
|
||||
:stats => @stats,
|
||||
:depth_limit => @depth_limit,
|
||||
:opts => @opts
|
||||
}}
|
||||
item = data.first[1]
|
||||
Redistat::Summary.should_receive(:update).with(@key, @stats, @depth_limit, @opts)
|
||||
@buffer.send(:flush_data, data)
|
||||
end
|
||||
|
||||
it "should build #buffer_key correctly" do
|
||||
opts = {:enable_grouping => true, :label_indexing => false, :connection_ref => nil}
|
||||
@buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}::true:false"
|
||||
opts = {:enable_grouping => false, :label_indexing => true, :connection_ref => :omg}
|
||||
@buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}:omg:false:true"
|
||||
end
|
||||
|
||||
describe "Buffering" do
|
||||
it "should store items on buffer queue" do
|
||||
@buffer.store(@key, @stats, @depth_limit, @opts).should be_false
|
||||
@buffer.size = 5
|
||||
@buffer.store(@key, @stats, @depth_limit, @opts).should be_true
|
||||
@buffer.send(:queue).should have(1).item
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 1
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 3
|
||||
@buffer.store(@key, @stats, @depth_limit, @opts).should be_true
|
||||
@buffer.send(:queue).should have(1).items
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 2
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 6
|
||||
end
|
||||
|
||||
it "should flush buffer queue when size is reached" do
|
||||
key = mock('Key', :to_s => "Scope/labelx:2011")
|
||||
@buffer.size = 10
|
||||
Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts|
|
||||
depth_limit.should == @depth_limit
|
||||
opts.should == @opts
|
||||
if k == @key
|
||||
stats[:count].should == 6
|
||||
stats[:views].should == 18
|
||||
elsif k == key
|
||||
stats[:count].should == 4
|
||||
stats[:views].should == 12
|
||||
end
|
||||
end
|
||||
6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true }
|
||||
4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true }
|
||||
end
|
||||
end
|
||||
|
||||
describe "Thread-Safety" do
|
||||
it "should read/write to buffer queue in a thread-safe manner" do
|
||||
|
||||
# This spec passes wether thread safety is enabled or not. In short I need
|
||||
# better specs for thread-safety, and personally a better understanding of
|
||||
# thread-safety in general.
|
||||
Redistat.thread_safe = true
|
||||
|
||||
key = mock('Key', :to_s => "Scope/labelx:2011")
|
||||
@buffer.size = 100
|
||||
|
||||
Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts|
|
||||
depth_limit.should == @depth_limit
|
||||
opts.should == @opts
|
||||
if k == @key
|
||||
stats[:count].should == 60
|
||||
stats[:views].should == 180
|
||||
elsif k == key
|
||||
stats[:count].should == 40
|
||||
stats[:views].should == 120
|
||||
end
|
||||
end
|
||||
|
||||
threads = []
|
||||
10.times do
|
||||
threads << Thread.new {
|
||||
6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true }
|
||||
4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true }
|
||||
}
|
||||
end
|
||||
|
||||
threads.each { |t| t.join }
|
||||
end
|
||||
|
||||
it "should have better specs that actually fail when thread-safety is off"
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
spec/connection_spec.rb

@@ -3,33 +3,34 @@ include Redistat

describe Redistat::Connection do

  before(:each) do
    @redis = Redistat.redis
  end

  it "should have a valid Redis client instance" do
    Redistat.redis.should_not be_nil
  end

  it "should have initialized custom testing connection" do
    redis = Redistat.redis
    redis.client.host.should == '127.0.0.1'
    redis.client.port.should == 8379
    redis.client.db.should == 15
    @redis.client.host.should == '127.0.0.1'
    @redis.client.port.should == 8379
    @redis.client.db.should == 15
  end

  it "should be able to set and get data" do
    redis = Redistat.redis
    redis.set("hello", "world")
    redis.get("hello").should == "world"
    redis.del("hello").should be_true
    @redis.set("hello", "world")
    @redis.get("hello").should == "world"
    @redis.del("hello").should be_true
  end

  it "should be able to store hashes to Redis" do
    redis = Redistat.redis
    redis.hset("hash", "field", "1")
    redis.hget("hash", "field").should == "1"
    redis.hincrby("hash", "field", 1)
    redis.hget("hash", "field").should == "2"
    redis.hincrby("hash", "field", -1)
    redis.hget("hash", "field").should == "1"
    redis.del("hash")
    @redis.hset("hash", "field", "1")
    @redis.hget("hash", "field").should == "1"
    @redis.hincrby("hash", "field", 1)
    @redis.hget("hash", "field").should == "2"
    @redis.hincrby("hash", "field", -1)
    @redis.hget("hash", "field").should == "1"
    @redis.del("hash")
  end

  it "should be accessible from Redistat module" do

@@ -58,4 +59,9 @@ describe Redistat::Connection do
    Redistat.connect(:port => 8379, :db => 15)
  end

  # TODO: Test thread-safety
  it "should be thread-safe" do
    pending("need to figure out a way to test thread-safety")
  end

end

spec/finder_spec.rb

@@ -194,13 +194,13 @@ describe Redistat::Finder do

  def create_example_stats
    key = Redistat::Key.new(@scope, @label, (first = Time.parse("2010-05-14 13:43")))
    Redistat::Summary.update(key, @stats, :hour)
    Redistat::Summary.send(:update_fields, key, @stats, :hour)
    key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 13:53"))
    Redistat::Summary.update(key, @stats, :hour)
    Redistat::Summary.send(:update_fields, key, @stats, :hour)
    key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 14:52"))
    Redistat::Summary.update(key, @stats, :hour)
    Redistat::Summary.send(:update_fields, key, @stats, :hour)
    key = Redistat::Key.new(@scope, @label, (last = Time.parse("2010-05-14 15:02")))
    Redistat::Summary.update(key, @stats, :hour)
    Redistat::Summary.send(:update_fields, key, @stats, :hour)
    [first - 1.hour, last + 1.hour]
  end

spec/model_spec.rb

@@ -156,4 +156,49 @@ describe Redistat::Model do
    stats.total[:weight].should == 617
  end

  describe "Write Buffer" do
    before(:each) do
      Redistat.buffer_size = 20
    end

    after(:each) do
      Redistat.buffer_size = 0
    end

    it "should buffer calls in memory before committing to Redis" do
      14.times do
        ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4))
      end
      ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {}

      5.times do
        ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time)
      end
      ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {}

      ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time)
      stats = ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1))
      stats.total["count"].should == 20
      stats.total["weight"].should == 7390
    end

    it "should force flush buffer when #flush(true) is called" do
      ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {}
      14.times do
        ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4))
      end
      ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {}
      Redistat.buffer.flush(true)

      stats = ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1))
      stats.total["count"].should == 14
      stats.total["weight"].should == 6454
    end
  end

end

spec/summary_spec.rb

@@ -13,19 +13,19 @@ describe Redistat::Summary do
  end

  it "should update a single summary properly" do
    Redistat::Summary.update(@key, @stats, :hour)
    Redistat::Summary.send(:update_fields, @key, @stats, :hour)
    summary = db.hgetall(@key.to_s(:hour))
    summary.should have(2).items
    summary["views"].should == "3"
    summary["visitors"].should == "2"

    Redistat::Summary.update(@key, @stats, :hour)
    Redistat::Summary.send(:update_fields, @key, @stats, :hour)
    summary = db.hgetall(@key.to_s(:hour))
    summary.should have(2).items
    summary["views"].should == "6"
    summary["visitors"].should == "4"

    Redistat::Summary.update(@key, {"views" => -4, "visitors" => -3}, :hour)
    Redistat::Summary.send(:update_fields, @key, {"views" => -4, "visitors" => -3}, :hour)
    summary = db.hgetall(@key.to_s(:hour))
    summary.should have(2).items
    summary["views"].should == "2"

@@ -48,7 +48,7 @@ describe Redistat::Summary

  it "should update summaries even if no label is set" do
    key = Redistat::Key.new(@scope, nil, @date, {:depth => :day})
    Redistat::Summary.update(key, @stats, :hour)
    Redistat::Summary.send(:update_fields, key, @stats, :hour)
    summary = db.hgetall(key.to_s(:hour))
    summary.should have(2).items
    summary["views"].should == "3"

spec/synchronize_spec.rb (new file, 64 lines)

@@ -0,0 +1,64 @@
require "spec_helper"
|
||||
|
||||
describe Redistat::Synchronize do
|
||||
it { should respond_to(:monitor) }
|
||||
it { should respond_to(:thread_safe) }
|
||||
it { should respond_to(:thread_safe=) }
|
||||
|
||||
describe "instanciated class with Redistat::Synchronize included" do
|
||||
subject { SynchronizeSpecHelper.new }
|
||||
it { should respond_to(:monitor) }
|
||||
it { should respond_to(:thread_safe) }
|
||||
it { should respond_to(:thread_safe=) }
|
||||
it { should respond_to(:synchronize) }
|
||||
|
||||
end
|
||||
|
||||
describe "#synchronize method" do
|
||||
|
||||
before(:each) do
|
||||
Redistat::Synchronize.instance_variable_set("@thread_safe", nil)
|
||||
@obj = SynchronizeSpecHelper.new
|
||||
end
|
||||
|
||||
it "should share single Monitor object across all objects" do
|
||||
@obj.monitor.should == Redistat::Synchronize.monitor
|
||||
end
|
||||
|
||||
it "should share thread_safe option across all objects" do
|
||||
obj2 = SynchronizeSpecHelper.new
|
||||
Redistat::Synchronize.thread_safe.should be_false
|
||||
@obj.thread_safe.should be_false
|
||||
obj2.thread_safe.should be_false
|
||||
@obj.thread_safe = true
|
||||
Redistat::Synchronize.thread_safe.should be_true
|
||||
@obj.thread_safe.should be_true
|
||||
obj2.thread_safe.should be_true
|
||||
end
|
||||
|
||||
it "should not synchronize when thread_safe is disabled" do
|
||||
# monitor receives :synchronize twice cause #thread_safe is _always_ synchronized
|
||||
Redistat::Synchronize.monitor.should_receive(:synchronize).twice
|
||||
@obj.thread_safe.should be_false # first #synchronize call
|
||||
@obj.synchronize { 'foo' } # one #synchronize call while checking #thread_safe
|
||||
end
|
||||
|
||||
it "should synchronize when thread_safe is enabled" do
|
||||
Monitor.class_eval {
|
||||
# we're stubbing synchronize to ensure it's being called correctly, but still need it :P
|
||||
alias :real_synchronize :synchronize
|
||||
}
|
||||
Redistat::Synchronize.monitor.should_receive(:synchronize).with.exactly(4).times.and_return { |block|
|
||||
Redistat::Synchronize.monitor.real_synchronize(&block)
|
||||
}
|
||||
@obj.thread_safe.should be_false # first synchronize call
|
||||
Redistat::Synchronize.thread_safe = true # second synchronize call
|
||||
@obj.synchronize { 'foo' } # two synchronize calls, once while checking thread_safe, once to call black
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
class SynchronizeSpecHelper
|
||||
include Redistat::Synchronize
|
||||
end
|
||||