mirror of
https://github.com/jimeh/redistat.git
synced 2026-02-18 21:06:40 +00:00
Merge branch 'feature/buffer' into dev
This commit is contained in:
@@ -3,6 +3,7 @@ require 'rubygems'
|
||||
require 'date'
|
||||
require 'time'
|
||||
require 'digest/sha1'
|
||||
require 'monitor'
|
||||
|
||||
# Active Support 2.x or 3.x
|
||||
require 'active_support'
|
||||
@@ -15,12 +16,15 @@ require 'time_ext'
|
||||
require 'redis'
|
||||
require 'json'
|
||||
|
||||
require 'redistat/options'
|
||||
require 'redistat/mixins/options'
|
||||
require 'redistat/mixins/synchronize'
|
||||
require 'redistat/mixins/database'
|
||||
require 'redistat/mixins/date_helper'
|
||||
|
||||
require 'redistat/connection'
|
||||
require 'redistat/database'
|
||||
require 'redistat/buffer'
|
||||
require 'redistat/collection'
|
||||
require 'redistat/date'
|
||||
require 'redistat/date_helper'
|
||||
require 'redistat/event'
|
||||
require 'redistat/finder'
|
||||
require 'redistat/key'
|
||||
@@ -33,6 +37,7 @@ require 'redistat/version'
|
||||
|
||||
require 'redistat/core_ext'
|
||||
|
||||
|
||||
module Redistat
|
||||
|
||||
KEY_NEXT_ID = ".next_id"
|
||||
@@ -47,6 +52,26 @@ module Redistat
|
||||
|
||||
class << self
|
||||
|
||||
def buffer
|
||||
Buffer.instance
|
||||
end
|
||||
|
||||
def buffer_size
|
||||
buffer.size
|
||||
end
|
||||
|
||||
def buffer_size=(size)
|
||||
buffer.size = size
|
||||
end
|
||||
|
||||
def thread_safe
|
||||
Synchronize.thread_safe
|
||||
end
|
||||
|
||||
def thread_safe=(value)
|
||||
Synchronize.thread_safe = value
|
||||
end
|
||||
|
||||
def connection(ref = nil)
|
||||
Connection.get(ref)
|
||||
end
|
||||
@@ -68,3 +93,9 @@ module Redistat
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
# ensure buffer is flushed on program exit
|
||||
Kernel.at_exit do
|
||||
Redistat.buffer.flush(true)
|
||||
end
|
||||
|
||||
107
lib/redistat/buffer.rb
Normal file
107
lib/redistat/buffer.rb
Normal file
@@ -0,0 +1,107 @@
|
||||
require 'redistat/core_ext/hash'
|
||||
|
||||
module Redistat
|
||||
class Buffer
|
||||
include Synchronize
|
||||
|
||||
def self.instance
|
||||
@instance ||= self.new
|
||||
end
|
||||
|
||||
def size
|
||||
synchronize do
|
||||
@size ||= 0
|
||||
end
|
||||
end
|
||||
|
||||
def size=(value)
|
||||
synchronize do
|
||||
@size = value
|
||||
end
|
||||
end
|
||||
|
||||
def count
|
||||
@count ||= 0
|
||||
end
|
||||
|
||||
def store(key, stats, depth_limit, opts)
|
||||
return false unless should_buffer?
|
||||
|
||||
to_flush = {}
|
||||
buffkey = buffer_key(key, opts)
|
||||
|
||||
synchronize do
|
||||
if !queue.has_key?(buffkey)
|
||||
queue[buffkey] = { :key => key,
|
||||
:stats => {},
|
||||
:depth_limit => depth_limit,
|
||||
:opts => opts }
|
||||
end
|
||||
|
||||
queue[buffkey][:stats].merge_and_incr!(stats)
|
||||
incr_count
|
||||
|
||||
# return items to be flushed if buffer size limit has been reached
|
||||
to_flush = reset_queue
|
||||
end
|
||||
|
||||
# flush any data that's been cleared from the queue
|
||||
flush_data(to_flush)
|
||||
true
|
||||
end
|
||||
|
||||
def flush(force = false)
|
||||
to_flush = {}
|
||||
synchronize do
|
||||
to_flush = reset_queue(force)
|
||||
end
|
||||
flush_data(to_flush)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# should always be called from within a synchronize block
|
||||
def incr_count
|
||||
@count ||= 0
|
||||
@count += 1
|
||||
end
|
||||
|
||||
def queue
|
||||
@queue ||= {}
|
||||
end
|
||||
|
||||
def should_buffer?
|
||||
size > 1 # buffer size of 1 would be equal to not using buffer
|
||||
end
|
||||
|
||||
# should always be called from within a synchronize block
|
||||
def should_flush?
|
||||
(!queue.blank? && count >= size)
|
||||
end
|
||||
|
||||
# returns items to be flushed if buffer size limit has been reached
|
||||
# should always be called from within a synchronize block
|
||||
def reset_queue(force = false)
|
||||
return {} if !force && !should_flush?
|
||||
data = queue
|
||||
@queue = {}
|
||||
@count = 0
|
||||
data
|
||||
end
|
||||
|
||||
def flush_data(buffer_data)
|
||||
buffer_data.each do |k, item|
|
||||
Summary.update(item[:key], item[:stats], item[:depth_limit], item[:opts])
|
||||
end
|
||||
end
|
||||
|
||||
def buffer_key(key, opts)
|
||||
# depth_limit is not needed as it's evident in key.to_s
|
||||
opts_index = Summary.default_options.keys.sort { |a,b| a.to_s <=> b.to_s }.map do |k|
|
||||
opts[k] if opts.has_key?(k)
|
||||
end
|
||||
"#{key.to_s}:#{opts_index.join(':')}"
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
@@ -1,29 +1,41 @@
|
||||
require 'monitor'
|
||||
|
||||
module Redistat
|
||||
module Connection
|
||||
|
||||
REQUIRED_SERVER_VERSION = "1.3.10"
|
||||
|
||||
# TODO: Create a ConnectionPool instance object to replace Connection class
|
||||
|
||||
class << self
|
||||
|
||||
# TODO: clean/remove all ref-less connections
|
||||
|
||||
def get(ref = nil)
|
||||
ref ||= :default
|
||||
connections[references[ref]] || create
|
||||
synchronize do
|
||||
connections[references[ref]] || create
|
||||
end
|
||||
end
|
||||
|
||||
def add(conn, ref = nil)
|
||||
ref ||= :default
|
||||
check_redis_version(conn)
|
||||
references[ref] = conn.client.id
|
||||
connections[conn.client.id] = conn
|
||||
synchronize do
|
||||
check_redis_version(conn)
|
||||
references[ref] = conn.client.id
|
||||
connections[conn.client.id] = conn
|
||||
end
|
||||
end
|
||||
|
||||
def create(options = {})
|
||||
#TODO clean/remove all ref-less connections
|
||||
ref = options.delete(:ref) || :default
|
||||
options.reverse_merge!(default_options)
|
||||
conn = (connections[connection_id(options)] ||= connection(options))
|
||||
references[ref] = conn.client.id
|
||||
conn
|
||||
synchronize do
|
||||
options = options.clone
|
||||
ref = options.delete(:ref) || :default
|
||||
options.reverse_merge!(default_options)
|
||||
conn = (connections[connection_id(options)] ||= connection(options))
|
||||
references[ref] = conn.client.id
|
||||
conn
|
||||
end
|
||||
end
|
||||
|
||||
def connections
|
||||
@@ -36,9 +48,12 @@ module Redistat
|
||||
|
||||
private
|
||||
|
||||
def check_redis_version(conn)
|
||||
raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION
|
||||
conn
|
||||
def monitor
|
||||
@monitor ||= Monitor.new
|
||||
end
|
||||
|
||||
def synchronize(&block)
|
||||
monitor.synchronize(&block)
|
||||
end
|
||||
|
||||
def connection(options)
|
||||
@@ -46,10 +61,15 @@ module Redistat
|
||||
end
|
||||
|
||||
def connection_id(options = {})
|
||||
options.reverse_merge!(default_options)
|
||||
options = options.reverse_merge(default_options)
|
||||
"redis://#{options[:host]}:#{options[:port]}/#{options[:db]}"
|
||||
end
|
||||
|
||||
def check_redis_version(conn)
|
||||
raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION
|
||||
conn
|
||||
end
|
||||
|
||||
def default_options
|
||||
{
|
||||
:host => '127.0.0.1',
|
||||
|
||||
51
lib/redistat/mixins/synchronize.rb
Normal file
51
lib/redistat/mixins/synchronize.rb
Normal file
@@ -0,0 +1,51 @@
|
||||
require 'monitor'
|
||||
|
||||
module Redistat
|
||||
module Synchronize
|
||||
|
||||
class << self
|
||||
def included(base)
|
||||
base.send(:include, InstanceMethods)
|
||||
end
|
||||
|
||||
def monitor
|
||||
@monitor ||= Monitor.new
|
||||
end
|
||||
|
||||
def thread_safe
|
||||
monitor.synchronize do
|
||||
@thread_safe ||= false
|
||||
end
|
||||
end
|
||||
|
||||
def thread_safe=(value)
|
||||
monitor.synchronize do
|
||||
@thread_safe = value
|
||||
end
|
||||
end
|
||||
end # << self
|
||||
|
||||
module InstanceMethods
|
||||
def thread_safe
|
||||
Synchronize.thread_safe
|
||||
end
|
||||
|
||||
def thread_safe=(value)
|
||||
Synchronize.thread_safe = value
|
||||
end
|
||||
|
||||
def monitor
|
||||
Synchronize.monitor
|
||||
end
|
||||
|
||||
def synchronize(&block)
|
||||
if thread_safe
|
||||
monitor.synchronize(&block)
|
||||
else
|
||||
block.call
|
||||
end
|
||||
end
|
||||
end # InstanceMethods
|
||||
|
||||
end
|
||||
end
|
||||
@@ -1,3 +1,5 @@
|
||||
require 'active_support/core_ext/hash/indifferent_access'
|
||||
|
||||
module Redistat
|
||||
class Result < HashWithIndifferentAccess
|
||||
|
||||
|
||||
@@ -2,66 +2,81 @@ module Redistat
|
||||
class Summary
|
||||
include Database
|
||||
|
||||
def self.default_options
|
||||
{ :enable_grouping => true,
|
||||
:label_indexing => true,
|
||||
:connection_ref => nil }
|
||||
end
|
||||
|
||||
def self.update_all(key, stats = {}, depth_limit = nil, opts = {})
|
||||
stats ||= {}
|
||||
return nil if stats.size == 0
|
||||
class << self
|
||||
|
||||
options = default_options.merge((opts || {}).reject { |k,v| v.nil? })
|
||||
def default_options
|
||||
{ :enable_grouping => true,
|
||||
:label_indexing => true,
|
||||
:connection_ref => nil }
|
||||
end
|
||||
|
||||
depth_limit ||= key.depth
|
||||
def buffer
|
||||
Redistat.buffer
|
||||
end
|
||||
|
||||
if options[:enable_grouping]
|
||||
stats = inject_group_summaries(stats)
|
||||
key.groups.each do |k|
|
||||
update_key(k, stats, depth_limit, options[:connection_ref])
|
||||
k.update_index if options[:label_indexing]
|
||||
def update_all(key, stats = {}, depth_limit = nil, opts = {})
|
||||
stats ||= {}
|
||||
return if stats.empty?
|
||||
|
||||
options = default_options.merge((opts || {}).reject { |k,v| v.nil? })
|
||||
|
||||
depth_limit ||= key.depth
|
||||
|
||||
update_through_buffer(key, stats, depth_limit, options)
|
||||
end
|
||||
|
||||
def update_through_buffer(*args)
|
||||
update(*args) unless buffer.store(*args)
|
||||
end
|
||||
|
||||
def update(key, stats, depth_limit, opts)
|
||||
if opts[:enable_grouping]
|
||||
stats = inject_group_summaries(stats)
|
||||
key.groups.each do |k|
|
||||
update_key(k, stats, depth_limit, opts[:connection_ref])
|
||||
k.update_index if opts[:label_indexing]
|
||||
end
|
||||
else
|
||||
update_key(key, stats, depth_limit, opts[:connection_ref])
|
||||
end
|
||||
else
|
||||
update_key(key, stats, depth_limit, options[:connection_ref])
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def self.update_key(key, stats, depth_limit, connection_ref)
|
||||
Date::DEPTHS.each do |depth|
|
||||
update(key, stats, depth, connection_ref)
|
||||
break if depth == depth_limit
|
||||
|
||||
private
|
||||
|
||||
def update_key(key, stats, depth_limit, connection_ref)
|
||||
Date::DEPTHS.each do |depth|
|
||||
update_fields(key, stats, depth, connection_ref)
|
||||
break if depth == depth_limit
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.update(key, stats, depth, connection_ref = nil)
|
||||
stats.each do |field, value|
|
||||
db(connection_ref).hincrby key.to_s(depth), field, value
|
||||
|
||||
def update_fields(key, stats, depth, connection_ref = nil)
|
||||
stats.each do |field, value|
|
||||
db(connection_ref).hincrby key.to_s(depth), field, value
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def self.inject_group_summaries!(stats)
|
||||
summaries = {}
|
||||
stats.each do |key, value|
|
||||
parts = key.to_s.split(GROUP_SEPARATOR)
|
||||
parts.pop
|
||||
if parts.size > 0
|
||||
sum_parts = []
|
||||
parts.each do |part|
|
||||
sum_parts << part
|
||||
sum_key = sum_parts.join(GROUP_SEPARATOR)
|
||||
(summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value
|
||||
|
||||
def inject_group_summaries!(stats)
|
||||
summaries = {}
|
||||
stats.each do |key, value|
|
||||
parts = key.to_s.split(GROUP_SEPARATOR)
|
||||
parts.pop
|
||||
if parts.size > 0
|
||||
sum_parts = []
|
||||
parts.each do |part|
|
||||
sum_parts << part
|
||||
sum_key = sum_parts.join(GROUP_SEPARATOR)
|
||||
(summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value
|
||||
end
|
||||
end
|
||||
end
|
||||
stats.merge_and_incr!(summaries)
|
||||
end
|
||||
stats.merge_and_incr!(summaries)
|
||||
|
||||
def inject_group_summaries(stats)
|
||||
inject_group_summaries!(stats.clone)
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def self.inject_group_summaries(stats)
|
||||
inject_group_summaries!(stats.clone)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
157
spec/buffer_spec.rb
Normal file
157
spec/buffer_spec.rb
Normal file
@@ -0,0 +1,157 @@
|
||||
require "spec_helper"
|
||||
|
||||
describe Redistat::Buffer do
|
||||
|
||||
before(:each) do
|
||||
@class = Redistat::Buffer
|
||||
@buffer = Redistat::Buffer.instance
|
||||
@key = mock('Key', :to_s => "Scope/label:2011")
|
||||
@stats = {:count => 1, :views => 3}
|
||||
@depth_limit = :hour
|
||||
@opts = {:enable_grouping => true}
|
||||
end
|
||||
|
||||
# let's cleanup after ourselves for the other specs
|
||||
after(:each) do
|
||||
@class.instance_variable_set("@instance", nil)
|
||||
@buffer.size = 0
|
||||
end
|
||||
|
||||
it "should provide instance of itself" do
|
||||
@buffer.should be_a(@class)
|
||||
end
|
||||
|
||||
it "should only buffer if buffer size setting is greater than 1" do
|
||||
@buffer.size.should == 0
|
||||
@buffer.send(:should_buffer?).should be_false
|
||||
@buffer.size = 1
|
||||
@buffer.size.should == 1
|
||||
@buffer.send(:should_buffer?).should be_false
|
||||
@buffer.size = 2
|
||||
@buffer.size.should == 2
|
||||
@buffer.send(:should_buffer?).should be_true
|
||||
end
|
||||
|
||||
it "should only flush buffer if buffer size is greater than or equal to buffer size setting" do
|
||||
@buffer.size.should == 0
|
||||
@buffer.send(:queue).size.should == 0
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
@buffer.send(:queue)[:hello] = 'world'
|
||||
@buffer.send(:incr_count)
|
||||
@buffer.send(:should_flush?).should be_true
|
||||
@buffer.size = 5
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
3.times { |i|
|
||||
@buffer.send(:queue)[i] = i.to_s
|
||||
@buffer.send(:incr_count)
|
||||
}
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
@buffer.send(:queue)[4] = '4'
|
||||
@buffer.send(:incr_count)
|
||||
@buffer.send(:should_flush?).should be_true
|
||||
end
|
||||
|
||||
it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do
|
||||
@buffer.send(:queue)[:hello] = 'world'
|
||||
@buffer.send(:incr_count)
|
||||
@buffer.send(:should_flush?).should be_true
|
||||
@buffer.size = 2
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
@buffer.send(:reset_queue).should == {}
|
||||
@buffer.instance_variable_get("@count").should == 1
|
||||
@buffer.send(:reset_queue, true).should == {:hello => 'world'}
|
||||
@buffer.instance_variable_get("@count").should == 0
|
||||
end
|
||||
|
||||
it "should #flush_data into Summary.update properly" do
|
||||
# the root level key value doesn't actually matter, but it's something like this...
|
||||
data = {'ScopeName/label/goes/here:2011::true:true' => {
|
||||
:key => @key,
|
||||
:stats => @stats,
|
||||
:depth_limit => @depth_limit,
|
||||
:opts => @opts
|
||||
}}
|
||||
item = data.first[1]
|
||||
Redistat::Summary.should_receive(:update).with(@key, @stats, @depth_limit, @opts)
|
||||
@buffer.send(:flush_data, data)
|
||||
end
|
||||
|
||||
it "should build #buffer_key correctly" do
|
||||
opts = {:enable_grouping => true, :label_indexing => false, :connection_ref => nil}
|
||||
@buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}::true:false"
|
||||
opts = {:enable_grouping => false, :label_indexing => true, :connection_ref => :omg}
|
||||
@buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}:omg:false:true"
|
||||
end
|
||||
|
||||
describe "Buffering" do
|
||||
it "should store items on buffer queue" do
|
||||
@buffer.store(@key, @stats, @depth_limit, @opts).should be_false
|
||||
@buffer.size = 5
|
||||
@buffer.store(@key, @stats, @depth_limit, @opts).should be_true
|
||||
@buffer.send(:queue).should have(1).item
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 1
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 3
|
||||
@buffer.store(@key, @stats, @depth_limit, @opts).should be_true
|
||||
@buffer.send(:queue).should have(1).items
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 2
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 6
|
||||
end
|
||||
|
||||
it "should flush buffer queue when size is reached" do
|
||||
key = mock('Key', :to_s => "Scope/labelx:2011")
|
||||
@buffer.size = 10
|
||||
Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts|
|
||||
depth_limit.should == @depth_limit
|
||||
opts.should == @opts
|
||||
if k == @key
|
||||
stats[:count].should == 6
|
||||
stats[:views].should == 18
|
||||
elsif k == key
|
||||
stats[:count].should == 4
|
||||
stats[:views].should == 12
|
||||
end
|
||||
end
|
||||
6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true }
|
||||
4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true }
|
||||
end
|
||||
end
|
||||
|
||||
describe "Thread-Safety" do
|
||||
it "should read/write to buffer queue in a thread-safe manner" do
|
||||
|
||||
# This spec passes wether thread safety is enabled or not. In short I need
|
||||
# better specs for thread-safety, and personally a better understanding of
|
||||
# thread-safety in general.
|
||||
Redistat.thread_safe = true
|
||||
|
||||
key = mock('Key', :to_s => "Scope/labelx:2011")
|
||||
@buffer.size = 100
|
||||
|
||||
Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts|
|
||||
depth_limit.should == @depth_limit
|
||||
opts.should == @opts
|
||||
if k == @key
|
||||
stats[:count].should == 60
|
||||
stats[:views].should == 180
|
||||
elsif k == key
|
||||
stats[:count].should == 40
|
||||
stats[:views].should == 120
|
||||
end
|
||||
end
|
||||
|
||||
threads = []
|
||||
10.times do
|
||||
threads << Thread.new {
|
||||
6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true }
|
||||
4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true }
|
||||
}
|
||||
end
|
||||
|
||||
threads.each { |t| t.join }
|
||||
end
|
||||
|
||||
it "should have better specs that actually fail when thread-safety is off"
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
@@ -59,4 +59,9 @@ describe Redistat::Connection do
|
||||
Redistat.connect(:port => 8379, :db => 15)
|
||||
end
|
||||
|
||||
# TODO: Test thread-safety
|
||||
it "should be thread-safe" do
|
||||
pending("need to figure out a way to test thread-safety")
|
||||
end
|
||||
|
||||
end
|
||||
@@ -194,13 +194,13 @@ describe Redistat::Finder do
|
||||
|
||||
def create_example_stats
|
||||
key = Redistat::Key.new(@scope, @label, (first = Time.parse("2010-05-14 13:43")))
|
||||
Redistat::Summary.update(key, @stats, :hour)
|
||||
Redistat::Summary.send(:update_fields, key, @stats, :hour)
|
||||
key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 13:53"))
|
||||
Redistat::Summary.update(key, @stats, :hour)
|
||||
Redistat::Summary.send(:update_fields, key, @stats, :hour)
|
||||
key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 14:52"))
|
||||
Redistat::Summary.update(key, @stats, :hour)
|
||||
Redistat::Summary.send(:update_fields, key, @stats, :hour)
|
||||
key = Redistat::Key.new(@scope, @label, (last = Time.parse("2010-05-14 15:02")))
|
||||
Redistat::Summary.update(key, @stats, :hour)
|
||||
Redistat::Summary.send(:update_fields, key, @stats, :hour)
|
||||
[first - 1.hour, last + 1.hour]
|
||||
end
|
||||
|
||||
|
||||
@@ -156,4 +156,49 @@ describe Redistat::Model do
|
||||
stats.total[:weight].should == 617
|
||||
end
|
||||
|
||||
describe "Write Buffer" do
|
||||
before(:each) do
|
||||
Redistat.buffer_size = 20
|
||||
end
|
||||
|
||||
after(:each) do
|
||||
Redistat.buffer_size = 0
|
||||
end
|
||||
|
||||
it "should buffer calls in memory before committing to Redis" do
|
||||
14.times do
|
||||
ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4))
|
||||
end
|
||||
ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {}
|
||||
|
||||
5.times do
|
||||
ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time)
|
||||
end
|
||||
ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {}
|
||||
|
||||
ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time)
|
||||
stats = ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1))
|
||||
stats.total["count"].should == 20
|
||||
stats.total["weight"].should == 7390
|
||||
end
|
||||
|
||||
it "should force flush buffer when #flush(true) is called" do
|
||||
ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {}
|
||||
14.times do
|
||||
ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4))
|
||||
end
|
||||
ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {}
|
||||
Redistat.buffer.flush(true)
|
||||
|
||||
stats = ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1))
|
||||
stats.total["count"].should == 14
|
||||
stats.total["weight"].should == 6454
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -13,19 +13,19 @@ describe Redistat::Summary do
|
||||
end
|
||||
|
||||
it "should update a single summary properly" do
|
||||
Redistat::Summary.update(@key, @stats, :hour)
|
||||
Redistat::Summary.send(:update_fields, @key, @stats, :hour)
|
||||
summary = db.hgetall(@key.to_s(:hour))
|
||||
summary.should have(2).items
|
||||
summary["views"].should == "3"
|
||||
summary["visitors"].should == "2"
|
||||
|
||||
Redistat::Summary.update(@key, @stats, :hour)
|
||||
Redistat::Summary.send(:update_fields, @key, @stats, :hour)
|
||||
summary = db.hgetall(@key.to_s(:hour))
|
||||
summary.should have(2).items
|
||||
summary["views"].should == "6"
|
||||
summary["visitors"].should == "4"
|
||||
|
||||
Redistat::Summary.update(@key, {"views" => -4, "visitors" => -3}, :hour)
|
||||
Redistat::Summary.send(:update_fields, @key, {"views" => -4, "visitors" => -3}, :hour)
|
||||
summary = db.hgetall(@key.to_s(:hour))
|
||||
summary.should have(2).items
|
||||
summary["views"].should == "2"
|
||||
@@ -48,7 +48,7 @@ describe Redistat::Summary do
|
||||
|
||||
it "should update summaries even if no label is set" do
|
||||
key = Redistat::Key.new(@scope, nil, @date, {:depth => :day})
|
||||
Redistat::Summary.update(key, @stats, :hour)
|
||||
Redistat::Summary.send(:update_fields, key, @stats, :hour)
|
||||
summary = db.hgetall(key.to_s(:hour))
|
||||
summary.should have(2).items
|
||||
summary["views"].should == "3"
|
||||
|
||||
64
spec/synchronize_spec.rb
Normal file
64
spec/synchronize_spec.rb
Normal file
@@ -0,0 +1,64 @@
|
||||
require "spec_helper"
|
||||
|
||||
describe Redistat::Synchronize do
|
||||
it { should respond_to(:monitor) }
|
||||
it { should respond_to(:thread_safe) }
|
||||
it { should respond_to(:thread_safe=) }
|
||||
|
||||
describe "instanciated class with Redistat::Synchronize included" do
|
||||
subject { SynchronizeSpecHelper.new }
|
||||
it { should respond_to(:monitor) }
|
||||
it { should respond_to(:thread_safe) }
|
||||
it { should respond_to(:thread_safe=) }
|
||||
it { should respond_to(:synchronize) }
|
||||
|
||||
end
|
||||
|
||||
describe "#synchronize method" do
|
||||
|
||||
before(:each) do
|
||||
Redistat::Synchronize.instance_variable_set("@thread_safe", nil)
|
||||
@obj = SynchronizeSpecHelper.new
|
||||
end
|
||||
|
||||
it "should share single Monitor object across all objects" do
|
||||
@obj.monitor.should == Redistat::Synchronize.monitor
|
||||
end
|
||||
|
||||
it "should share thread_safe option across all objects" do
|
||||
obj2 = SynchronizeSpecHelper.new
|
||||
Redistat::Synchronize.thread_safe.should be_false
|
||||
@obj.thread_safe.should be_false
|
||||
obj2.thread_safe.should be_false
|
||||
@obj.thread_safe = true
|
||||
Redistat::Synchronize.thread_safe.should be_true
|
||||
@obj.thread_safe.should be_true
|
||||
obj2.thread_safe.should be_true
|
||||
end
|
||||
|
||||
it "should not synchronize when thread_safe is disabled" do
|
||||
# monitor receives :synchronize twice cause #thread_safe is _always_ synchronized
|
||||
Redistat::Synchronize.monitor.should_receive(:synchronize).twice
|
||||
@obj.thread_safe.should be_false # first #synchronize call
|
||||
@obj.synchronize { 'foo' } # one #synchronize call while checking #thread_safe
|
||||
end
|
||||
|
||||
it "should synchronize when thread_safe is enabled" do
|
||||
Monitor.class_eval {
|
||||
# we're stubbing synchronize to ensure it's being called correctly, but still need it :P
|
||||
alias :real_synchronize :synchronize
|
||||
}
|
||||
Redistat::Synchronize.monitor.should_receive(:synchronize).with.exactly(4).times.and_return { |block|
|
||||
Redistat::Synchronize.monitor.real_synchronize(&block)
|
||||
}
|
||||
@obj.thread_safe.should be_false # first synchronize call
|
||||
Redistat::Synchronize.thread_safe = true # second synchronize call
|
||||
@obj.synchronize { 'foo' } # two synchronize calls, once while checking thread_safe, once to call black
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
class SynchronizeSpecHelper
|
||||
include Redistat::Synchronize
|
||||
end
|
||||
Reference in New Issue
Block a user