mirror of
https://github.com/jimeh/redistat.git
synced 2026-02-18 21:06:40 +00:00
a number of issues fixed with Buffer class, and specs updated accordingly
This commit is contained in:
@@ -13,6 +13,10 @@ module Redistat
|
||||
end
|
||||
attr_writer :size
|
||||
|
||||
def count
|
||||
@count ||= 0
|
||||
end
|
||||
|
||||
def store(key, stats, depth_limit, opts)
|
||||
return false unless should_buffer?
|
||||
|
||||
@@ -27,7 +31,8 @@ module Redistat
|
||||
:opts => opts }
|
||||
end
|
||||
|
||||
queue[buffkey][:stats].merge_and_incr!(hash)
|
||||
queue[buffkey][:stats].merge_and_incr!(stats)
|
||||
incr_count
|
||||
|
||||
# return items to be flushed if buffer size limit has been reached
|
||||
to_flush = reset_queue
|
||||
@@ -48,6 +53,12 @@ module Redistat
|
||||
|
||||
private
|
||||
|
||||
# should always be called from within a synchronize block
|
||||
def incr_count
|
||||
@count ||= 0
|
||||
@count += 1
|
||||
end
|
||||
|
||||
def queue
|
||||
@queue ||= {}
|
||||
end
|
||||
@@ -58,21 +69,22 @@ module Redistat
|
||||
|
||||
# should always be called from within a synchronize block
|
||||
def should_flush?
|
||||
(!queue.blank? && queue.size >= size)
|
||||
(!queue.blank? && count >= size)
|
||||
end
|
||||
|
||||
# returns items to be flushed if buffer size limit has been reached
|
||||
# should always be called from within a synchronize block
|
||||
def reset_queue(force = false)
|
||||
return {} if !force || should_flush?
|
||||
return {} if !force && !should_flush?
|
||||
data = queue
|
||||
@queue = {}
|
||||
@count = 0
|
||||
data
|
||||
end
|
||||
|
||||
def flush_data(buffer_data)
|
||||
buffer_data.each do |k, item|
|
||||
Summary.update(item[:key], item[:stats], item[:depth_limit], item[:connection_ref])
|
||||
Summary.update(item[:key], item[:stats], item[:depth_limit], item[:opts])
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@@ -5,6 +5,10 @@ describe Redistat::Buffer do
|
||||
before(:each) do
|
||||
@class = Redistat::Buffer
|
||||
@buffer = Redistat::Buffer.instance
|
||||
@key = mock('Key', :to_s => "Scope/label:2011")
|
||||
@stats = {:count => 1, :views => 3}
|
||||
@depth_limit = :hour
|
||||
@opts = {:enable_grouping => true}
|
||||
end
|
||||
|
||||
# let's cleanup after ourselves for the other specs
|
||||
@@ -33,43 +37,121 @@ describe Redistat::Buffer do
|
||||
@buffer.send(:queue).size.should == 0
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
@buffer.send(:queue)[:hello] = 'world'
|
||||
@buffer.send(:incr_count)
|
||||
@buffer.send(:should_flush?).should be_true
|
||||
@buffer.size = 5
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
3.times { |i| @buffer.send(:queue)[i] = i.to_s }
|
||||
3.times { |i|
|
||||
@buffer.send(:queue)[i] = i.to_s
|
||||
@buffer.send(:incr_count)
|
||||
}
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
@buffer.send(:queue)[4] = '4'
|
||||
@buffer.send(:incr_count)
|
||||
@buffer.send(:should_flush?).should be_true
|
||||
end
|
||||
|
||||
it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do
|
||||
@buffer.send(:queue)[:hello] = 'world'
|
||||
@buffer.send(:incr_count)
|
||||
@buffer.send(:should_flush?).should be_true
|
||||
@buffer.size = 2
|
||||
@buffer.send(:should_flush?).should be_false
|
||||
@buffer.send(:reset_queue).should == {}
|
||||
@buffer.instance_variable_get("@count").should == 1
|
||||
@buffer.send(:reset_queue, true).should == {:hello => 'world'}
|
||||
@buffer.instance_variable_get("@count").should == 0
|
||||
end
|
||||
|
||||
it "should #flush_data into Summary.update properly" do
|
||||
# the root level key value doesn't actually matter, but it's something like this...
|
||||
data = {'ScopeName/label/goes/here:2011:nil:true:true' => {
|
||||
:key => mock("Key"),
|
||||
:stats => {},
|
||||
:depth_limit => :year,
|
||||
:opts => {:heh => false}
|
||||
data = {'ScopeName/label/goes/here:2011::true:true' => {
|
||||
:key => @key,
|
||||
:stats => @stats,
|
||||
:depth_limit => @depth_limit,
|
||||
:opts => @opts
|
||||
}}
|
||||
item = data.first[1]
|
||||
Redistat::Summary.should_receive(:update).with(item[:key], item[:stats], item[:depth_limit], item[:connection_ref])
|
||||
Redistat::Summary.should_receive(:update).with(@key, @stats, @depth_limit, @opts)
|
||||
@buffer.send(:flush_data, data)
|
||||
end
|
||||
|
||||
it "should build #buffer_key correctly" do
|
||||
key = mock('Key', :to_s => "Scope/label:2011")
|
||||
opts = {:enable_grouping => true, :label_indexing => false, :connection_ref => nil}
|
||||
@buffer.send(:buffer_key, key, opts).should == "#{key.to_s}::true:false"
|
||||
@buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}::true:false"
|
||||
opts = {:enable_grouping => false, :label_indexing => true, :connection_ref => :omg}
|
||||
@buffer.send(:buffer_key, key, opts).should == "#{key.to_s}:omg:false:true"
|
||||
@buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}:omg:false:true"
|
||||
end
|
||||
|
||||
describe "Buffering" do
|
||||
it "should store items on buffer queue" do
|
||||
@buffer.store(@key, @stats, @depth_limit, @opts).should be_false
|
||||
@buffer.size = 5
|
||||
@buffer.store(@key, @stats, @depth_limit, @opts).should be_true
|
||||
@buffer.send(:queue).should have(1).item
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 1
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 3
|
||||
@buffer.store(@key, @stats, @depth_limit, @opts).should be_true
|
||||
@buffer.send(:queue).should have(1).items
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 2
|
||||
@buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 6
|
||||
end
|
||||
|
||||
it "should flush buffer queue when size is reached" do
|
||||
key = mock('Key', :to_s => "Scope/labelx:2011")
|
||||
@buffer.size = 10
|
||||
Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts|
|
||||
depth_limit.should == @depth_limit
|
||||
opts.should == @opts
|
||||
if k == @key
|
||||
stats[:count].should == 6
|
||||
stats[:views].should == 18
|
||||
elsif k == key
|
||||
stats[:count].should == 4
|
||||
stats[:views].should == 12
|
||||
end
|
||||
end
|
||||
6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true }
|
||||
4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true }
|
||||
end
|
||||
end
|
||||
|
||||
describe "Thread-Safety" do
|
||||
it "should read/write to buffer queue in a thread-safe manner" do
|
||||
|
||||
# This spec passes wether thread safety is enabled or not. In short I need
|
||||
# better specs for thread-safety, and personally a better understanding of
|
||||
# thread-safety in general.
|
||||
Redistat.thread_safe = true
|
||||
|
||||
key = mock('Key', :to_s => "Scope/labelx:2011")
|
||||
@buffer.size = 100
|
||||
|
||||
Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts|
|
||||
depth_limit.should == @depth_limit
|
||||
opts.should == @opts
|
||||
if k == @key
|
||||
stats[:count].should == 60
|
||||
stats[:views].should == 180
|
||||
elsif k == key
|
||||
stats[:count].should == 40
|
||||
stats[:views].should == 120
|
||||
end
|
||||
end
|
||||
|
||||
threads = []
|
||||
10.times do
|
||||
threads << Thread.new {
|
||||
6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true }
|
||||
4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true }
|
||||
}
|
||||
end
|
||||
|
||||
threads.each { |t| t.join }
|
||||
end
|
||||
|
||||
it "should have better specs that actually fail when thread-safety is off"
|
||||
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user