From 0a7abe935ed9cb7f1ee32c1c95e517a2920c04c4 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 14 Apr 2011 16:53:29 +0100 Subject: [PATCH 01/13] thread-safe connection handler --- lib/redistat/connection.rb | 48 +++++++++++++++++++++++++++----------- spec/connection_spec.rb | 5 ++++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/lib/redistat/connection.rb b/lib/redistat/connection.rb index 23d9141..10d4c40 100644 --- a/lib/redistat/connection.rb +++ b/lib/redistat/connection.rb @@ -1,29 +1,41 @@ +require 'monitor' + module Redistat module Connection REQUIRED_SERVER_VERSION = "1.3.10" + # TODO: Create a ConnectionPool instance object to replace Connection class + class << self + # TODO: clean/remove all ref-less connections + def get(ref = nil) ref ||= :default - connections[references[ref]] || create + synchronize do + connections[references[ref]] || create + end end def add(conn, ref = nil) ref ||= :default - check_redis_version(conn) - references[ref] = conn.client.id - connections[conn.client.id] = conn + synchronize do + check_redis_version(conn) + references[ref] = conn.client.id + connections[conn.client.id] = conn + end end def create(options = {}) - #TODO clean/remove all ref-less connections - ref = options.delete(:ref) || :default - options.reverse_merge!(default_options) - conn = (connections[connection_id(options)] ||= connection(options)) - references[ref] = conn.client.id - conn + synchronize do + options = options.clone + ref = options.delete(:ref) || :default + options.reverse_merge!(default_options) + conn = (connections[connection_id(options)] ||= connection(options)) + references[ref] = conn.client.id + conn + end end def connections @@ -36,9 +48,12 @@ module Redistat private - def check_redis_version(conn) - raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION - conn + def monitor + @monitor ||= Monitor.new + end + + def synchronize(&block) + monitor.synchronize(&block) end def 
connection(options) @@ -46,10 +61,15 @@ module Redistat end def connection_id(options = {}) - options.reverse_merge!(default_options) + options = options.reverse_merge(default_options) "redis://#{options[:host]}:#{options[:port]}/#{options[:db]}" end + def check_redis_version(conn) + raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION + conn + end + def default_options { :host => '127.0.0.1', diff --git a/spec/connection_spec.rb b/spec/connection_spec.rb index 04aae55..907a7d1 100644 --- a/spec/connection_spec.rb +++ b/spec/connection_spec.rb @@ -59,4 +59,9 @@ describe Redistat::Connection do Redistat.connect(:port => 8379, :db => 15) end + # TODO: Test thread-safety + it "should be thread-safe" do + pending("need to figure out a way to test thread-safety") + end + end \ No newline at end of file From 5d92c1dbae1397ee138d4cd678bb427465a9566b Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 14:03:26 +0100 Subject: [PATCH 02/13] created Redistat::Synchronize mixin to help with thread-safety --- lib/redistat.rb | 10 ++++++ lib/redistat/synchronize.rb | 51 +++++++++++++++++++++++++++++ spec/synchronize_spec.rb | 64 +++++++++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+) create mode 100644 lib/redistat/synchronize.rb create mode 100644 spec/synchronize_spec.rb diff --git a/lib/redistat.rb b/lib/redistat.rb index 11d63db..a2505bd 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -3,6 +3,7 @@ require 'rubygems' require 'date' require 'time' require 'digest/sha1' +require 'monitor' # Active Support 2.x or 3.x require 'active_support' @@ -16,6 +17,7 @@ require 'redis' require 'json' require 'redistat/options' +require 'redistat/synchronize' require 'redistat/connection' require 'redistat/database' require 'redistat/collection' @@ -47,6 +49,14 @@ module Redistat class << self + def thread_safe + Synchronize.thread_safe + end + + def thread_safe=(value) + Synchronize.thread_safe = value + end + def 
connection(ref = nil) Connection.get(ref) end diff --git a/lib/redistat/synchronize.rb b/lib/redistat/synchronize.rb new file mode 100644 index 0000000..2d20b89 --- /dev/null +++ b/lib/redistat/synchronize.rb @@ -0,0 +1,51 @@ +require 'monitor' + +module Redistat + module Synchronize + + class << self + def included(base) + base.send(:include, InstanceMethods) + end + + def monitor + @monitor ||= Monitor.new + end + + def thread_safe + monitor.synchronize do + @thread_safe ||= false + end + end + + def thread_safe=(value) + monitor.synchronize do + @thread_safe = value + end + end + end # << self + + module InstanceMethods + def thread_safe + Synchronize.thread_safe + end + + def thread_safe=(value) + Synchronize.thread_safe = value + end + + def monitor + Synchronize.monitor + end + + def synchronize(&block) + if thread_safe + monitor.synchronize(&block) + else + block.call + end + end + end # InstanceMethods + + end +end \ No newline at end of file diff --git a/spec/synchronize_spec.rb b/spec/synchronize_spec.rb new file mode 100644 index 0000000..56e8f26 --- /dev/null +++ b/spec/synchronize_spec.rb @@ -0,0 +1,64 @@ +require "spec_helper" + +describe Redistat::Synchronize do + it { should respond_to(:monitor) } + it { should respond_to(:thread_safe) } + it { should respond_to(:thread_safe=) } + + describe "instanciated class with Redistat::Synchronize included" do + subject { SynchronizeSpecHelper.new } + it { should respond_to(:monitor) } + it { should respond_to(:thread_safe) } + it { should respond_to(:thread_safe=) } + it { should respond_to(:synchronize) } + + end + + describe "#synchronize method" do + + before(:each) do + Redistat::Synchronize.instance_variable_set("@thread_safe", nil) + @obj = SynchronizeSpecHelper.new + end + + it "should share single Monitor object across all objects" do + @obj.monitor.should == Redistat::Synchronize.monitor + end + + it "should share thread_safe option across all objects" do + obj2 = SynchronizeSpecHelper.new + 
Redistat::Synchronize.thread_safe.should be_false + @obj.thread_safe.should be_false + obj2.thread_safe.should be_false + @obj.thread_safe = true + Redistat::Synchronize.thread_safe.should be_true + @obj.thread_safe.should be_true + obj2.thread_safe.should be_true + end + + it "should not synchronize when thread_safe is disabled" do + # monitor receives :synchronize twice cause #thread_safe is _always_ synchronized + Redistat::Synchronize.monitor.should_receive(:synchronize).twice + @obj.thread_safe.should be_false # first #synchronize call + @obj.synchronize { 'foo' } # one #synchronize call while checking #thread_safe + end + + it "should synchronize when thread_safe is enabled" do + Monitor.class_eval { + # we're stubbing synchronize to ensure it's being called correctly, but still need it :P + alias :real_synchronize :synchronize + } + Redistat::Synchronize.monitor.should_receive(:synchronize).with.exactly(4).times.and_return { |block| + Redistat::Synchronize.monitor.real_synchronize(&block) + } + @obj.thread_safe.should be_false # first synchronize call + Redistat::Synchronize.thread_safe = true # second synchronize call + @obj.synchronize { 'foo' } # two synchronize calls, once while checking thread_safe, once to call black + end + end + +end + +class SynchronizeSpecHelper + include Redistat::Synchronize +end From a197a04ce89fa8f636215b6324dc238e913c68eb Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 14:10:51 +0100 Subject: [PATCH 03/13] moved all internal mixin modules to lib/redistat/mixins to tidy up the file structure a bit --- lib/redistat.rb | 8 ++++---- lib/redistat/{ => mixins}/database.rb | 0 lib/redistat/{ => mixins}/date_helper.rb | 0 lib/redistat/{ => mixins}/options.rb | 0 lib/redistat/{ => mixins}/synchronize.rb | 0 5 files changed, 4 insertions(+), 4 deletions(-) rename lib/redistat/{ => mixins}/database.rb (100%) rename lib/redistat/{ => mixins}/date_helper.rb (100%) rename lib/redistat/{ => mixins}/options.rb (100%) rename 
lib/redistat/{ => mixins}/synchronize.rb (100%) diff --git a/lib/redistat.rb b/lib/redistat.rb index a2505bd..881ec73 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -16,13 +16,13 @@ require 'time_ext' require 'redis' require 'json' -require 'redistat/options' -require 'redistat/synchronize' +require 'redistat/mixins/options' +require 'redistat/mixins/synchronize' +require 'redistat/mixins/database' +require 'redistat/mixins/date_helper' require 'redistat/connection' -require 'redistat/database' require 'redistat/collection' require 'redistat/date' -require 'redistat/date_helper' require 'redistat/event' require 'redistat/finder' require 'redistat/key' diff --git a/lib/redistat/database.rb b/lib/redistat/mixins/database.rb similarity index 100% rename from lib/redistat/database.rb rename to lib/redistat/mixins/database.rb diff --git a/lib/redistat/date_helper.rb b/lib/redistat/mixins/date_helper.rb similarity index 100% rename from lib/redistat/date_helper.rb rename to lib/redistat/mixins/date_helper.rb diff --git a/lib/redistat/options.rb b/lib/redistat/mixins/options.rb similarity index 100% rename from lib/redistat/options.rb rename to lib/redistat/mixins/options.rb diff --git a/lib/redistat/synchronize.rb b/lib/redistat/mixins/synchronize.rb similarity index 100% rename from lib/redistat/synchronize.rb rename to lib/redistat/mixins/synchronize.rb From 61231a8b5753c0e18cf14948a70f04549eda86b6 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 14:14:17 +0100 Subject: [PATCH 04/13] updated Redistat::Summary to make it easier to plugin the buffer interception code --- lib/redistat/summary.rb | 107 +++++++++++++++++++++------------------- spec/finder_spec.rb | 8 +-- spec/summary_spec.rb | 8 +-- 3 files changed, 65 insertions(+), 58 deletions(-) diff --git a/lib/redistat/summary.rb b/lib/redistat/summary.rb index e089b5a..d54bd18 100644 --- a/lib/redistat/summary.rb +++ b/lib/redistat/summary.rb @@ -2,66 +2,73 @@ module Redistat class Summary 
include Database - def self.default_options - { :enable_grouping => true, - :label_indexing => true, - :connection_ref => nil } - end - - def self.update_all(key, stats = {}, depth_limit = nil, opts = {}) - stats ||= {} - return nil if stats.size == 0 + class << self - options = default_options.merge((opts || {}).reject { |k,v| v.nil? }) + def default_options + { :enable_grouping => true, + :label_indexing => true, + :connection_ref => nil } + end - depth_limit ||= key.depth + def update_all(key, stats = {}, depth_limit = nil, opts = {}) + stats ||= {} + return nil if stats.size == 0 + + options = default_options.merge((opts || {}).reject { |k,v| v.nil? }) + + depth_limit ||= key.depth + + update(key, stats, depth_limit, options[:enable_grouping], options[:connection_ref], options) + end - if options[:enable_grouping] - stats = inject_group_summaries(stats) - key.groups.each do |k| - update_key(k, stats, depth_limit, options[:connection_ref]) - k.update_index if options[:label_indexing] + private + + def update(key, stats, depth_limit, enable_grouping, connection_ref, options) + if enable_grouping + stats = inject_group_summaries(stats) + key.groups.each do |k| + update_key(k, stats, depth_limit, connection_ref) + k.update_index if options[:label_indexing] + end + else + update_key(key, stats, depth_limit, connection_ref) end - else - update_key(key, stats, depth_limit, options[:connection_ref]) end - end - - private - - def self.update_key(key, stats, depth_limit, connection_ref) - Date::DEPTHS.each do |depth| - update(key, stats, depth, connection_ref) - break if depth == depth_limit + + def update_key(key, stats, depth_limit, connection_ref) + Date::DEPTHS.each do |depth| + update_fields(key, stats, depth, connection_ref) + break if depth == depth_limit + end end - end - - def self.update(key, stats, depth, connection_ref = nil) - stats.each do |field, value| - db(connection_ref).hincrby key.to_s(depth), field, value + + def update_fields(key, stats, depth, 
connection_ref = nil) + stats.each do |field, value| + db(connection_ref).hincrby key.to_s(depth), field, value + end end - end - - def self.inject_group_summaries!(stats) - summaries = {} - stats.each do |key, value| - parts = key.to_s.split(GROUP_SEPARATOR) - parts.pop - if parts.size > 0 - sum_parts = [] - parts.each do |part| - sum_parts << part - sum_key = sum_parts.join(GROUP_SEPARATOR) - (summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value + + def inject_group_summaries!(stats) + summaries = {} + stats.each do |key, value| + parts = key.to_s.split(GROUP_SEPARATOR) + parts.pop + if parts.size > 0 + sum_parts = [] + parts.each do |part| + sum_parts << part + sum_key = sum_parts.join(GROUP_SEPARATOR) + (summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value + end end end + stats.merge_and_incr!(summaries) end - stats.merge_and_incr!(summaries) + + def inject_group_summaries(stats) + inject_group_summaries!(stats.clone) + end + end - - def self.inject_group_summaries(stats) - inject_group_summaries!(stats.clone) - end - end end \ No newline at end of file diff --git a/spec/finder_spec.rb b/spec/finder_spec.rb index 0af91a6..d8748bf 100644 --- a/spec/finder_spec.rb +++ b/spec/finder_spec.rb @@ -194,13 +194,13 @@ describe Redistat::Finder do def create_example_stats key = Redistat::Key.new(@scope, @label, (first = Time.parse("2010-05-14 13:43"))) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 13:53")) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 14:52")) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) key = Redistat::Key.new(@scope, @label, (last = Time.parse("2010-05-14 15:02"))) - 
Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) [first - 1.hour, last + 1.hour] end diff --git a/spec/summary_spec.rb b/spec/summary_spec.rb index ebf33af..fb032c7 100644 --- a/spec/summary_spec.rb +++ b/spec/summary_spec.rb @@ -13,19 +13,19 @@ describe Redistat::Summary do end it "should update a single summary properly" do - Redistat::Summary.update(@key, @stats, :hour) + Redistat::Summary.send(:update_fields, @key, @stats, :hour) summary = db.hgetall(@key.to_s(:hour)) summary.should have(2).items summary["views"].should == "3" summary["visitors"].should == "2" - Redistat::Summary.update(@key, @stats, :hour) + Redistat::Summary.send(:update_fields, @key, @stats, :hour) summary = db.hgetall(@key.to_s(:hour)) summary.should have(2).items summary["views"].should == "6" summary["visitors"].should == "4" - Redistat::Summary.update(@key, {"views" => -4, "visitors" => -3}, :hour) + Redistat::Summary.send(:update_fields, @key, {"views" => -4, "visitors" => -3}, :hour) summary = db.hgetall(@key.to_s(:hour)) summary.should have(2).items summary["views"].should == "2" @@ -48,7 +48,7 @@ describe Redistat::Summary do it "should update summaries even if no label is set" do key = Redistat::Key.new(@scope, nil, @date, {:depth => :day}) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) summary = db.hgetall(key.to_s(:hour)) summary.should have(2).items summary["views"].should == "3" From 3a25fcc788b81d6d3fac666ee8a6440327e0c73d Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 16:45:21 +0100 Subject: [PATCH 05/13] created Redistat::Buffer, mainly feature complete, still needs a few more specs --- lib/redistat.rb | 13 +++++++ lib/redistat/buffer.rb | 88 ++++++++++++++++++++++++++++++++++++++++++ spec/buffer_spec.rb | 67 ++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+) create mode 100644 lib/redistat/buffer.rb create mode 100644 
spec/buffer_spec.rb diff --git a/lib/redistat.rb b/lib/redistat.rb index 881ec73..23ca423 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -21,6 +21,7 @@ require 'redistat/mixins/synchronize' require 'redistat/mixins/database' require 'redistat/mixins/date_helper' require 'redistat/connection' +require 'redistat/buffer' require 'redistat/collection' require 'redistat/date' require 'redistat/event' @@ -49,6 +50,18 @@ module Redistat class << self + def buffer + Buffer.instance + end + + def buffer_size + buffer.size + end + + def buffer_size=(size) + buffer.size = size + end + def thread_safe Synchronize.thread_safe end diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb new file mode 100644 index 0000000..13bc3b3 --- /dev/null +++ b/lib/redistat/buffer.rb @@ -0,0 +1,88 @@ +require 'redistat/core_ext/hash' + +module Redistat + class Buffer + include Synchronize + + def self.instance + @instance ||= self.new + end + + def size + @size ||= 0 + end + attr_writer :size + + def queue + @queue ||= {} + end + + def store(key, stats, depth_limit, opts) + return false unless should_buffer? + + to_flush = {} + buffkey = buffer_key(key, opts) + + synchronize do + if !queue.has_key?(buffkey) + queue[buffkey] = { :key => key, + :stats => {}, + :depth_limit => depth_limit, + :opts => opts } + end + + queue[buffkey][:stats].merge_and_incr!(hash) + + # return items to be flushed if buffer size limit has been reached + to_flush = reset_queue + end + + # flush any data that's been cleared from the queue + flush_data(to_flush) + true + end + + def flush(force = false) + to_flush = {} + synchronize do + to_flush = reset_queue(force) + end + flush_data(to_flush) + end + + private + + def should_buffer? + size > 1 # buffer size of 1 would be equal to not using buffer + end + + # should always be called from within a synchronize block + def should_flush? + (!queue.blank? 
&& queue.size >= size) + end + + # returns items to be flushed if buffer size limit has been reached + # should always be called from within a synchronize block + def reset_queue(force = false) + return {} if !force || should_flush? + data = queue + @queue = {} + data + end + + def flush_data(buffer_data) + buffer_data.each do |k, item| + Summary.update(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + end + end + + def buffer_key(key, opts) + # depth_limit is not needed as it's evident in key.to_s + opts_index = Summary.default_options.keys.sort { |a,b| a.to_s <=> b.to_s }.map do |k| + opts[k] if opts.has_key?(k) + end + "#{key.to_s}:#{opts_index.join(':')}" + end + + end +end diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb new file mode 100644 index 0000000..e202a2f --- /dev/null +++ b/spec/buffer_spec.rb @@ -0,0 +1,67 @@ +require "spec_helper" + +describe Redistat::Buffer do + + before(:each) do + @class = Redistat::Buffer + @buffer = Redistat::Buffer.instance + end + + # let's cleanup after ourselves for the other specs + after(:each) do + @class.instance_variable_set("@instance", nil) + @buffer.size = 0 + end + + it "should provide instance of itself" do + @buffer.should be_a(@class) + end + + it "should only buffer if buffer size setting is greater than 1" do + @buffer.size.should == 0 + @buffer.send(:should_buffer?).should be_false + @buffer.size = 1 + @buffer.size.should == 1 + @buffer.send(:should_buffer?).should be_false + @buffer.size = 2 + @buffer.size.should == 2 + @buffer.send(:should_buffer?).should be_true + end + + it "should only flush buffer if buffer size is greater than or equal to buffer size setting" do + @buffer.size.should == 0 + @buffer.queue.size.should == 0 + @buffer.send(:should_flush?).should be_false + @buffer.queue[:hello] = 'world' + @buffer.send(:should_flush?).should be_true + @buffer.size = 5 + @buffer.send(:should_flush?).should be_false + 3.times { |i| @buffer.queue[i] = i.to_s } + 
@buffer.send(:should_flush?).should be_false + @buffer.queue[4] = '4' + @buffer.send(:should_flush?).should be_true + end + + it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do + @buffer.queue[:hello] = 'world' + @buffer.send(:should_flush?).should be_true + @buffer.size = 2 + @buffer.send(:should_flush?).should be_false + @buffer.send(:reset_queue).should == {} + @buffer.send(:reset_queue, true).should == {:hello => 'world'} + end + + it "should flush data into Summary.update properly" do + # the root level key value doesn't actually matter, but it's something like this... + data = {'ScopeName/label/goes/here:2011:nil:true:true' => { + :key => mock("Key"), + :stats => {}, + :depth_limit => :year, + :opts => {:heh => false} + }} + item = data.first[1] + Redistat::Summary.should_receive(:update).with(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + @buffer.send(:flush_data, data) + end + +end From 6c63843cd5fdff8b9c0d5ec20ed0b7485bcf9556 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 16:45:56 +0100 Subject: [PATCH 06/13] updated Redistat::Summary to incorporate use of write Buffer --- lib/redistat/summary.rb | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/lib/redistat/summary.rb b/lib/redistat/summary.rb index d54bd18..cc0bce3 100644 --- a/lib/redistat/summary.rb +++ b/lib/redistat/summary.rb @@ -10,31 +10,39 @@ module Redistat :connection_ref => nil } end + def buffer + Redistat.buffer + end + def update_all(key, stats = {}, depth_limit = nil, opts = {}) stats ||= {} - return nil if stats.size == 0 + return if stats.empty? options = default_options.merge((opts || {}).reject { |k,v| v.nil? 
}) depth_limit ||= key.depth - update(key, stats, depth_limit, options[:enable_grouping], options[:connection_ref], options) + update_through_buffer(key, stats, depth_limit, options) + end + + def update_through_buffer(*args) + update(*args) unless buffer.store(*args) + end + + def update(key, stats, depth_limit, opts) + if opts[:enable_grouping] + stats = inject_group_summaries(stats) + key.groups.each do |k| + update_key(k, stats, depth_limit, opts[:connection_ref]) + k.update_index if opts[:label_indexing] + end + else + update_key(key, stats, depth_limit, opts[:connection_ref]) + end end private - def update(key, stats, depth_limit, enable_grouping, connection_ref, options) - if enable_grouping - stats = inject_group_summaries(stats) - key.groups.each do |k| - update_key(k, stats, depth_limit, connection_ref) - k.update_index if options[:label_indexing] - end - else - update_key(key, stats, depth_limit, connection_ref) - end - end - def update_key(key, stats, depth_limit, connection_ref) Date::DEPTHS.each do |depth| update_fields(key, stats, depth, connection_ref) From 2ca5aae4b8277cf78c8a6d51c41512116a89a336 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 16:46:12 +0100 Subject: [PATCH 07/13] require required libraries, just cause --- lib/redistat/result.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/redistat/result.rb b/lib/redistat/result.rb index 7a91dff..d1a2de3 100644 --- a/lib/redistat/result.rb +++ b/lib/redistat/result.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module Redistat class Result < HashWithIndifferentAccess From 4b06513813c3a5389752aabbbcd3e9c3c08de628 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 17:42:48 +0100 Subject: [PATCH 08/13] additional specs for Redistat::Buffer, still a few more needed --- spec/buffer_spec.rb | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb index 
e202a2f..9158464 100644 --- a/spec/buffer_spec.rb +++ b/spec/buffer_spec.rb @@ -51,7 +51,7 @@ describe Redistat::Buffer do @buffer.send(:reset_queue, true).should == {:hello => 'world'} end - it "should flush data into Summary.update properly" do + it "should #flush_data into Summary.update properly" do # the root level key value doesn't actually matter, but it's something like this... data = {'ScopeName/label/goes/here:2011:nil:true:true' => { :key => mock("Key"), @@ -64,4 +64,12 @@ describe Redistat::Buffer do @buffer.send(:flush_data, data) end + it "should build #buffer_key correctly" do + key = mock('Key', :to_s => "Scope/label:2011") + opts = {:enable_grouping => true, :label_indexing => false, :connection_ref => nil} + @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}::true:false" + opts = {:enable_grouping => false, :label_indexing => true, :connection_ref => :omg} + @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}:omg:false:true" + end + end From b129074cd7250f4615fac8873995e81cd5850bd9 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 10:00:22 +0100 Subject: [PATCH 09/13] make Buffer#queue a private method as it's never supposed to be modified or read from outside of the Buffer object --- lib/redistat/buffer.rb | 8 ++++---- spec/buffer_spec.rb | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb index 13bc3b3..b78c4e7 100644 --- a/lib/redistat/buffer.rb +++ b/lib/redistat/buffer.rb @@ -13,10 +13,6 @@ module Redistat end attr_writer :size - def queue - @queue ||= {} - end - def store(key, stats, depth_limit, opts) return false unless should_buffer? @@ -52,6 +48,10 @@ module Redistat private + def queue + @queue ||= {} + end + def should_buffer? 
size > 1 # buffer size of 1 would be equal to not using buffer end diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb index 9158464..43bd73a 100644 --- a/spec/buffer_spec.rb +++ b/spec/buffer_spec.rb @@ -30,20 +30,20 @@ describe Redistat::Buffer do it "should only flush buffer if buffer size is greater than or equal to buffer size setting" do @buffer.size.should == 0 - @buffer.queue.size.should == 0 + @buffer.send(:queue).size.should == 0 @buffer.send(:should_flush?).should be_false - @buffer.queue[:hello] = 'world' + @buffer.send(:queue)[:hello] = 'world' @buffer.send(:should_flush?).should be_true @buffer.size = 5 @buffer.send(:should_flush?).should be_false - 3.times { |i| @buffer.queue[i] = i.to_s } + 3.times { |i| @buffer.send(:queue)[i] = i.to_s } @buffer.send(:should_flush?).should be_false - @buffer.queue[4] = '4' + @buffer.send(:queue)[4] = '4' @buffer.send(:should_flush?).should be_true end it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do - @buffer.queue[:hello] = 'world' + @buffer.send(:queue)[:hello] = 'world' @buffer.send(:should_flush?).should be_true @buffer.size = 2 @buffer.send(:should_flush?).should be_false From eb1d607a6125e4fd705f0a49b66c6646e36ac23c Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 12:36:19 +0100 Subject: [PATCH 10/13] a number of issues fixed with Buffer class, and specs updated accordingly --- lib/redistat/buffer.rb | 20 ++++++-- spec/buffer_spec.rb | 102 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 108 insertions(+), 14 deletions(-) diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb index b78c4e7..0caa508 100644 --- a/lib/redistat/buffer.rb +++ b/lib/redistat/buffer.rb @@ -13,6 +13,10 @@ module Redistat end attr_writer :size + def count + @count ||= 0 + end + def store(key, stats, depth_limit, opts) return false unless should_buffer? 
@@ -27,7 +31,8 @@ module Redistat :opts => opts } end - queue[buffkey][:stats].merge_and_incr!(hash) + queue[buffkey][:stats].merge_and_incr!(stats) + incr_count # return items to be flushed if buffer size limit has been reached to_flush = reset_queue @@ -48,6 +53,12 @@ module Redistat private + # should always be called from within a synchronize block + def incr_count + @count ||= 0 + @count += 1 + end + def queue @queue ||= {} end @@ -58,21 +69,22 @@ module Redistat # should always be called from within a synchronize block def should_flush? - (!queue.blank? && queue.size >= size) + (!queue.blank? && count >= size) end # returns items to be flushed if buffer size limit has been reached # should always be called from within a synchronize block def reset_queue(force = false) - return {} if !force || should_flush? + return {} if !force && !should_flush? data = queue @queue = {} + @count = 0 data end def flush_data(buffer_data) buffer_data.each do |k, item| - Summary.update(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + Summary.update(item[:key], item[:stats], item[:depth_limit], item[:opts]) end end diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb index 43bd73a..cf6ce0b 100644 --- a/spec/buffer_spec.rb +++ b/spec/buffer_spec.rb @@ -5,6 +5,10 @@ describe Redistat::Buffer do before(:each) do @class = Redistat::Buffer @buffer = Redistat::Buffer.instance + @key = mock('Key', :to_s => "Scope/label:2011") + @stats = {:count => 1, :views => 3} + @depth_limit = :hour + @opts = {:enable_grouping => true} end # let's cleanup after ourselves for the other specs @@ -33,43 +37,121 @@ describe Redistat::Buffer do @buffer.send(:queue).size.should == 0 @buffer.send(:should_flush?).should be_false @buffer.send(:queue)[:hello] = 'world' + @buffer.send(:incr_count) @buffer.send(:should_flush?).should be_true @buffer.size = 5 @buffer.send(:should_flush?).should be_false - 3.times { |i| @buffer.send(:queue)[i] = i.to_s } + 3.times { |i| + 
@buffer.send(:queue)[i] = i.to_s + @buffer.send(:incr_count) + } @buffer.send(:should_flush?).should be_false @buffer.send(:queue)[4] = '4' + @buffer.send(:incr_count) @buffer.send(:should_flush?).should be_true end it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do @buffer.send(:queue)[:hello] = 'world' + @buffer.send(:incr_count) @buffer.send(:should_flush?).should be_true @buffer.size = 2 @buffer.send(:should_flush?).should be_false @buffer.send(:reset_queue).should == {} + @buffer.instance_variable_get("@count").should == 1 @buffer.send(:reset_queue, true).should == {:hello => 'world'} + @buffer.instance_variable_get("@count").should == 0 end it "should #flush_data into Summary.update properly" do # the root level key value doesn't actually matter, but it's something like this... - data = {'ScopeName/label/goes/here:2011:nil:true:true' => { - :key => mock("Key"), - :stats => {}, - :depth_limit => :year, - :opts => {:heh => false} + data = {'ScopeName/label/goes/here:2011::true:true' => { + :key => @key, + :stats => @stats, + :depth_limit => @depth_limit, + :opts => @opts }} item = data.first[1] - Redistat::Summary.should_receive(:update).with(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + Redistat::Summary.should_receive(:update).with(@key, @stats, @depth_limit, @opts) @buffer.send(:flush_data, data) end it "should build #buffer_key correctly" do - key = mock('Key', :to_s => "Scope/label:2011") opts = {:enable_grouping => true, :label_indexing => false, :connection_ref => nil} - @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}::true:false" + @buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}::true:false" opts = {:enable_grouping => false, :label_indexing => true, :connection_ref => :omg} - @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}:omg:false:true" + @buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}:omg:false:true" + end + + 
describe "Buffering" do + it "should store items on buffer queue" do + @buffer.store(@key, @stats, @depth_limit, @opts).should be_false + @buffer.size = 5 + @buffer.store(@key, @stats, @depth_limit, @opts).should be_true + @buffer.send(:queue).should have(1).item + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 1 + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 3 + @buffer.store(@key, @stats, @depth_limit, @opts).should be_true + @buffer.send(:queue).should have(1).items + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 2 + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 6 + end + + it "should flush buffer queue when size is reached" do + key = mock('Key', :to_s => "Scope/labelx:2011") + @buffer.size = 10 + Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts| + depth_limit.should == @depth_limit + opts.should == @opts + if k == @key + stats[:count].should == 6 + stats[:views].should == 18 + elsif k == key + stats[:count].should == 4 + stats[:views].should == 12 + end + end + 6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true } + 4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true } + end + end + + describe "Thread-Safety" do + it "should read/write to buffer queue in a thread-safe manner" do + + # This spec passes whether thread safety is enabled or not. In short I need + # better specs for thread-safety, and personally a better understanding of + # thread-safety in general.
+ Redistat.thread_safe = true + + key = mock('Key', :to_s => "Scope/labelx:2011") + @buffer.size = 100 + + Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts| + depth_limit.should == @depth_limit + opts.should == @opts + if k == @key + stats[:count].should == 60 + stats[:views].should == 180 + elsif k == key + stats[:count].should == 40 + stats[:views].should == 120 + end + end + + threads = [] + 10.times do + threads << Thread.new { + 6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true } + 4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true } + } + end + + threads.each { |t| t.join } + end + + it "should have better specs that actually fail when thread-safety is off" + end end From 7b5c3089609f59a12f712def583f05bbcb4b4bd3 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 12:36:43 +0100 Subject: [PATCH 11/13] model spec updated to test write buffer --- spec/model_spec.rb | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/spec/model_spec.rb b/spec/model_spec.rb index 347bfd9..7d8404f 100644 --- a/spec/model_spec.rb +++ b/spec/model_spec.rb @@ -156,4 +156,49 @@ describe Redistat::Model do stats.total[:weight].should == 617 end + describe "Write Buffer" do + before(:each) do + Redistat.buffer_size = 20 + end + + after(:each) do + Redistat.buffer_size = 0 + end + + it "should buffer calls in memory before committing to Redis" do + 14.times do + ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4)) + end + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + + 5.times do + ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time) + end + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + + ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time) + stats = 
ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)) + stats.total["count"].should == 20 + stats.total["weight"].should == 7390 + end + + it "should force flush buffer when #flush(true) is called" do + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + 14.times do + ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4)) + end + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + Redistat.buffer.flush(true) + + stats = ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)) + stats.total["count"].should == 14 + stats.total["weight"].should == 6454 + end + end + end + + + + + From b13da6f3327caf114a9941fd89d79b61a06b6bc9 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 12:37:24 +0100 Subject: [PATCH 12/13] create a flush buffer #at_exit callback to ensure any buffered messages are flushed to Redis on process exit --- lib/redistat.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/redistat.rb b/lib/redistat.rb index 23ca423..4ff2389 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -20,6 +20,7 @@ require 'redistat/mixins/options' require 'redistat/mixins/synchronize' require 'redistat/mixins/database' require 'redistat/mixins/date_helper' + require 'redistat/connection' require 'redistat/buffer' require 'redistat/collection' @@ -36,6 +37,7 @@ require 'redistat/version' require 'redistat/core_ext' + module Redistat KEY_NEXT_ID = ".next_id" @@ -91,3 +93,9 @@ module Redistat end end + + +# ensure buffer is flushed on program exit +Kernel.at_exit do + Redistat.buffer.flush(true) +end From b2c31a0e87b89a24892bd4e7f15e9cfb1d9bebd2 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 12:40:49 +0100 Subject: [PATCH 13/13] ensure buffer size value is read/written to in a thread-safe manner --- lib/redistat/buffer.rb | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 
deletions(-) diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb index 0caa508..66e4f3c 100644 --- a/lib/redistat/buffer.rb +++ b/lib/redistat/buffer.rb @@ -9,9 +9,16 @@ module Redistat end def size - @size ||= 0 + synchronize do + @size ||= 0 + end + end + + def size=(value) + synchronize do + @size = value + end end - attr_writer :size def count @count ||= 0