diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb index b78c4e7..0caa508 100644 --- a/lib/redistat/buffer.rb +++ b/lib/redistat/buffer.rb @@ -13,6 +13,10 @@ module Redistat end attr_writer :size + def count + @count ||= 0 + end + def store(key, stats, depth_limit, opts) return false unless should_buffer? @@ -27,7 +31,8 @@ module Redistat :opts => opts } end - queue[buffkey][:stats].merge_and_incr!(hash) + queue[buffkey][:stats].merge_and_incr!(stats) + incr_count # return items to be flushed if buffer size limit has been reached to_flush = reset_queue @@ -48,6 +53,12 @@ module Redistat private + # should always be called from within a synchronize block + def incr_count + @count ||= 0 + @count += 1 + end + def queue @queue ||= {} end @@ -58,21 +69,22 @@ module Redistat # should always be called from within a synchronize block def should_flush? - (!queue.blank? && queue.size >= size) + (!queue.blank? && count >= size) end # returns items to be flushed if buffer size limit has been reached # should always be called from within a synchronize block def reset_queue(force = false) - return {} if !force || should_flush? + return {} if !force && !should_flush? data = queue @queue = {} + @count = 0 data end def flush_data(buffer_data) buffer_data.each do |k, item| - Summary.update(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + Summary.update(item[:key], item[:stats], item[:depth_limit], item[:opts]) end end diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb index 43bd73a..cf6ce0b 100644 --- a/spec/buffer_spec.rb +++ b/spec/buffer_spec.rb @@ -5,6 +5,10 @@ describe Redistat::Buffer do before(:each) do @class = Redistat::Buffer @buffer = Redistat::Buffer.instance + @key = mock('Key', :to_s => "Scope/label:2011") + @stats = {:count => 1, :views => 3} + @depth_limit = :hour + @opts = {:enable_grouping => true} end # let's cleanup after ourselves for the other specs @@ -33,43 +37,121 @@ describe Redistat::Buffer do @buffer.send(:queue).size.should == 0 @buffer.send(:should_flush?).should be_false @buffer.send(:queue)[:hello] = 'world' + @buffer.send(:incr_count) @buffer.send(:should_flush?).should be_true @buffer.size = 5 @buffer.send(:should_flush?).should be_false - 3.times { |i| @buffer.send(:queue)[i] = i.to_s } + 3.times { |i| + @buffer.send(:queue)[i] = i.to_s + @buffer.send(:incr_count) + } @buffer.send(:should_flush?).should be_false @buffer.send(:queue)[4] = '4' + @buffer.send(:incr_count) @buffer.send(:should_flush?).should be_true end it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do @buffer.send(:queue)[:hello] = 'world' + @buffer.send(:incr_count) @buffer.send(:should_flush?).should be_true @buffer.size = 2 @buffer.send(:should_flush?).should be_false @buffer.send(:reset_queue).should == {} + @buffer.instance_variable_get("@count").should == 1 @buffer.send(:reset_queue, true).should == {:hello => 'world'} + @buffer.instance_variable_get("@count").should == 0 end it "should #flush_data into Summary.update properly" do # the root level key value doesn't actually matter, but it's something like this... - data = {'ScopeName/label/goes/here:2011:nil:true:true' => { - :key => mock("Key"), - :stats => {}, - :depth_limit => :year, - :opts => {:heh => false} + data = {'ScopeName/label/goes/here:2011::true:true' => { + :key => @key, + :stats => @stats, + :depth_limit => @depth_limit, + :opts => @opts }} item = data.first[1] - Redistat::Summary.should_receive(:update).with(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + Redistat::Summary.should_receive(:update).with(@key, @stats, @depth_limit, @opts) @buffer.send(:flush_data, data) end it "should build #buffer_key correctly" do - key = mock('Key', :to_s => "Scope/label:2011") opts = {:enable_grouping => true, :label_indexing => false, :connection_ref => nil} - @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}::true:false" + @buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}::true:false" opts = {:enable_grouping => false, :label_indexing => true, :connection_ref => :omg} - @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}:omg:false:true" + @buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}:omg:false:true" + end + + describe "Buffering" do + it "should store items on buffer queue" do + @buffer.store(@key, @stats, @depth_limit, @opts).should be_false + @buffer.size = 5 + @buffer.store(@key, @stats, @depth_limit, @opts).should be_true + @buffer.send(:queue).should have(1).item + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 1 + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 3 + @buffer.store(@key, @stats, @depth_limit, @opts).should be_true + @buffer.send(:queue).should have(1).items + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 2 + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 6 + end + + it "should flush buffer queue when size is reached" do + key = mock('Key', :to_s => "Scope/labelx:2011") + @buffer.size = 10 + Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts| + depth_limit.should == @depth_limit + opts.should == @opts + if k == @key + stats[:count].should == 6 + stats[:views].should == 18 + elsif k == key + stats[:count].should == 4 + stats[:views].should == 12 + end + end + 6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true } + 4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true } + end + end + + describe "Thread-Safety" do + it "should read/write to buffer queue in a thread-safe manner" do + + # This spec passes wether thread safety is enabled or not. In short I need + # better specs for thread-safety, and personally a better understanding of + # thread-safety in general. + Redistat.thread_safe = true + + key = mock('Key', :to_s => "Scope/labelx:2011") + @buffer.size = 100 + + Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts| + depth_limit.should == @depth_limit + opts.should == @opts + if k == @key + stats[:count].should == 60 + stats[:views].should == 180 + elsif k == key + stats[:count].should == 40 + stats[:views].should == 120 + end + end + + threads = [] + 10.times do + threads << Thread.new { + 6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true } + 4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true } + } + end + + threads.each { |t| t.join } + end + + it "should have better specs that actually fail when thread-safety is off" + end end