From 3a25fcc788b81d6d3fac666ee8a6440327e0c73d Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 16:45:21 +0100 Subject: [PATCH] created Redistat::Buffer, mainly feature complete, still needs a few more specs --- lib/redistat.rb | 13 +++++++ lib/redistat/buffer.rb | 88 ++++++++++++++++++++++++++++++++++++++++++ spec/buffer_spec.rb | 67 ++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+) create mode 100644 lib/redistat/buffer.rb create mode 100644 spec/buffer_spec.rb diff --git a/lib/redistat.rb b/lib/redistat.rb index 881ec73..23ca423 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -21,6 +21,7 @@ require 'redistat/mixins/synchronize' require 'redistat/mixins/database' require 'redistat/mixins/date_helper' require 'redistat/connection' +require 'redistat/buffer' require 'redistat/collection' require 'redistat/date' require 'redistat/event' @@ -49,6 +50,18 @@ module Redistat class << self + def buffer + Buffer.instance + end + + def buffer_size + buffer.size + end + + def buffer_size=(size) + buffer.size = size + end + def thread_safe Synchronize.thread_safe end diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb new file mode 100644 index 0000000..13bc3b3 --- /dev/null +++ b/lib/redistat/buffer.rb @@ -0,0 +1,88 @@ +require 'redistat/core_ext/hash' + +module Redistat + class Buffer + include Synchronize + + def self.instance + @instance ||= self.new + end + + def size + @size ||= 0 + end + attr_writer :size + + def queue + @queue ||= {} + end + + def store(key, stats, depth_limit, opts) + return false unless should_buffer? + + to_flush = {} + buffkey = buffer_key(key, opts) + + synchronize do + if !queue.has_key?(buffkey) + queue[buffkey] = { :key => key, + :stats => {}, + :depth_limit => depth_limit, + :opts => opts } + end + + queue[buffkey][:stats].merge_and_incr!(hash) + + # return items to be flushed if buffer size limit has been reached + to_flush = reset_queue + end + + # flush any data that's been cleared from the queue + flush_data(to_flush) + true + end + + def flush(force = false) + to_flush = {} + synchronize do + to_flush = reset_queue(force) + end + flush_data(to_flush) + end + + private + + def should_buffer? + size > 1 # buffer size of 1 would be equal to not using buffer + end + + # should always be called from within a synchronize block + def should_flush? + (!queue.blank? && queue.size >= size) + end + + # returns items to be flushed if buffer size limit has been reached + # should always be called from within a synchronize block + def reset_queue(force = false) + return {} if !force || should_flush? + data = queue + @queue = {} + data + end + + def flush_data(buffer_data) + buffer_data.each do |k, item| + Summary.update(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + end + end + + def buffer_key(key, opts) + # depth_limit is not needed as it's evident in key.to_s + opts_index = Summary.default_options.keys.sort { |a,b| a.to_s <=> b.to_s }.map do |k| + opts[k] if opts.has_key?(k) + end + "#{key.to_s}:#{opts_index.join(':')}" + end + + end +end diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb new file mode 100644 index 0000000..e202a2f --- /dev/null +++ b/spec/buffer_spec.rb @@ -0,0 +1,67 @@ +require "spec_helper" + +describe Redistat::Buffer do + + before(:each) do + @class = Redistat::Buffer + @buffer = Redistat::Buffer.instance + end + + # let's cleanup after ourselves for the other specs + after(:each) do + @class.instance_variable_set("@instance", nil) + @buffer.size = 0 + end + + it "should provide instance of itself" do + @buffer.should be_a(@class) + end + + it "should only buffer if buffer size setting is greater than 1" do + @buffer.size.should == 0 + @buffer.send(:should_buffer?).should be_false + @buffer.size = 1 + @buffer.size.should == 1 + @buffer.send(:should_buffer?).should be_false + @buffer.size = 2 + @buffer.size.should == 2 + @buffer.send(:should_buffer?).should be_true + end + + it "should only flush buffer if buffer size is greater than or equal to buffer size setting" do + @buffer.size.should == 0 + @buffer.queue.size.should == 0 + @buffer.send(:should_flush?).should be_false + @buffer.queue[:hello] = 'world' + @buffer.send(:should_flush?).should be_true + @buffer.size = 5 + @buffer.send(:should_flush?).should be_false + 3.times { |i| @buffer.queue[i] = i.to_s } + @buffer.send(:should_flush?).should be_false + @buffer.queue[4] = '4' + @buffer.send(:should_flush?).should be_true + end + + it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do + @buffer.queue[:hello] = 'world' + @buffer.send(:should_flush?).should be_true + @buffer.size = 2 + @buffer.send(:should_flush?).should be_false + @buffer.send(:reset_queue).should == {} + @buffer.send(:reset_queue, true).should == {:hello => 'world'} + end + + it "should flush data into Summary.update properly" do + # the root level key value doesn't actually matter, but it's something like this... + data = {'ScopeName/label/goes/here:2011:nil:true:true' => { + :key => mock("Key"), + :stats => {}, + :depth_limit => :year, + :opts => {:heh => false} + }} + item = data.first[1] + Redistat::Summary.should_receive(:update).with(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + @buffer.send(:flush_data, data) + end + +end