From f155f6db056257faf454e349c893825de57dd37a Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 14 Apr 2011 16:50:43 +0100 Subject: [PATCH 01/17] cleaned up Connection spec a bit --- spec/connection_spec.rb | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/spec/connection_spec.rb b/spec/connection_spec.rb index e5c1c24..04aae55 100644 --- a/spec/connection_spec.rb +++ b/spec/connection_spec.rb @@ -3,33 +3,34 @@ include Redistat describe Redistat::Connection do + before(:each) do + @redis = Redistat.redis + end + it "should have a valid Redis client instance" do Redistat.redis.should_not be_nil end it "should have initialized custom testing connection" do - redis = Redistat.redis - redis.client.host.should == '127.0.0.1' - redis.client.port.should == 8379 - redis.client.db.should == 15 + @redis.client.host.should == '127.0.0.1' + @redis.client.port.should == 8379 + @redis.client.db.should == 15 end it "should be able to set and get data" do - redis = Redistat.redis - redis.set("hello", "world") - redis.get("hello").should == "world" - redis.del("hello").should be_true + @redis.set("hello", "world") + @redis.get("hello").should == "world" + @redis.del("hello").should be_true end it "should be able to store hashes to Redis" do - redis = Redistat.redis - redis.hset("hash", "field", "1") - redis.hget("hash", "field").should == "1" - redis.hincrby("hash", "field", 1) - redis.hget("hash", "field").should == "2" - redis.hincrby("hash", "field", -1) - redis.hget("hash", "field").should == "1" - redis.del("hash") + @redis.hset("hash", "field", "1") + @redis.hget("hash", "field").should == "1" + @redis.hincrby("hash", "field", 1) + @redis.hget("hash", "field").should == "2" + @redis.hincrby("hash", "field", -1) + @redis.hget("hash", "field").should == "1" + @redis.del("hash") end it "should be accessible from Redistat module" do From 0a7abe935ed9cb7f1ee32c1c95e517a2920c04c4 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Thu, 14 Apr 2011 16:53:29 +0100 Subject: [PATCH 02/17] thread-safe connection handler --- lib/redistat/connection.rb | 48 +++++++++++++++++++++++++++----------- spec/connection_spec.rb | 5 ++++ 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/lib/redistat/connection.rb b/lib/redistat/connection.rb index 23d9141..10d4c40 100644 --- a/lib/redistat/connection.rb +++ b/lib/redistat/connection.rb @@ -1,29 +1,41 @@ +require 'monitor' + module Redistat module Connection REQUIRED_SERVER_VERSION = "1.3.10" + # TODO: Create a ConnectionPool instance object to replace Connection class + class << self + # TODO: clean/remove all ref-less connections + def get(ref = nil) ref ||= :default - connections[references[ref]] || create + synchronize do + connections[references[ref]] || create + end end def add(conn, ref = nil) ref ||= :default - check_redis_version(conn) - references[ref] = conn.client.id - connections[conn.client.id] = conn + synchronize do + check_redis_version(conn) + references[ref] = conn.client.id + connections[conn.client.id] = conn + end end def create(options = {}) - #TODO clean/remove all ref-less connections - ref = options.delete(:ref) || :default - options.reverse_merge!(default_options) - conn = (connections[connection_id(options)] ||= connection(options)) - references[ref] = conn.client.id - conn + synchronize do + options = options.clone + ref = options.delete(:ref) || :default + options.reverse_merge!(default_options) + conn = (connections[connection_id(options)] ||= connection(options)) + references[ref] = conn.client.id + conn + end end def connections @@ -36,9 +48,12 @@ module Redistat private - def check_redis_version(conn) - raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION - conn + def monitor + @monitor ||= Monitor.new + end + + def synchronize(&block) + monitor.synchronize(&block) end def connection(options) @@ -46,10 +61,15 @@ module Redistat end def connection_id(options = {}) - options.reverse_merge!(default_options) + options = options.reverse_merge(default_options) "redis://#{options[:host]}:#{options[:port]}/#{options[:db]}" end + def check_redis_version(conn) + raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION + conn + end + def default_options { :host => '127.0.0.1', diff --git a/spec/connection_spec.rb b/spec/connection_spec.rb index 04aae55..907a7d1 100644 --- a/spec/connection_spec.rb +++ b/spec/connection_spec.rb @@ -59,4 +59,9 @@ describe Redistat::Connection do Redistat.connect(:port => 8379, :db => 15) end + # TODO: Test thread-safety + it "should be thread-safe" do + pending("need to figure out a way to test thread-safety") + end + end \ No newline at end of file From 5d92c1dbae1397ee138d4cd678bb427465a9566b Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 14:03:26 +0100 Subject: [PATCH 03/17] created Redistat::Synchronize mixin to help with thread-safety --- lib/redistat.rb | 10 ++++++ lib/redistat/synchronize.rb | 51 +++++++++++++++++++++++++++++ spec/synchronize_spec.rb | 64 +++++++++++++++++++++++++++++++++++++ 3 files changed, 125 insertions(+) create mode 100644 lib/redistat/synchronize.rb create mode 100644 spec/synchronize_spec.rb diff --git a/lib/redistat.rb b/lib/redistat.rb index 11d63db..a2505bd 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -3,6 +3,7 @@ require 'rubygems' require 'date' require 'time' require 'digest/sha1' +require 'monitor' # Active Support 2.x or 3.x require 'active_support' @@ -16,6 +17,7 @@ require 'redis' require 'json' require 'redistat/options' +require 'redistat/synchronize' require 'redistat/connection' require 'redistat/database' require 'redistat/collection' @@ -47,6 +49,14 @@ module Redistat class << self + def thread_safe + Synchronize.thread_safe + end + + def thread_safe=(value) + Synchronize.thread_safe = value + end + def connection(ref = nil) Connection.get(ref) end diff --git a/lib/redistat/synchronize.rb b/lib/redistat/synchronize.rb new file mode 100644 index 0000000..2d20b89 --- /dev/null +++ b/lib/redistat/synchronize.rb @@ -0,0 +1,51 @@ +require 'monitor' + +module Redistat + module Synchronize + + class << self + def included(base) + base.send(:include, InstanceMethods) + end + + def monitor + @monitor ||= Monitor.new + end + + def thread_safe + monitor.synchronize do + @thread_safe ||= false + end + end + + def thread_safe=(value) + monitor.synchronize do + @thread_safe = value + end + end + end # << self + + module InstanceMethods + def thread_safe + Synchronize.thread_safe + end + + def thread_safe=(value) + Synchronize.thread_safe = value + end + + def monitor + Synchronize.monitor + end + + def synchronize(&block) + if thread_safe + monitor.synchronize(&block) + else + block.call + end + end + end # InstanceMethods + + end +end \ No newline at end of file diff --git a/spec/synchronize_spec.rb b/spec/synchronize_spec.rb new file mode 100644 index 0000000..56e8f26 --- /dev/null +++ b/spec/synchronize_spec.rb @@ -0,0 +1,64 @@ +require "spec_helper" + +describe Redistat::Synchronize do + it { should respond_to(:monitor) } + it { should respond_to(:thread_safe) } + it { should respond_to(:thread_safe=) } + + describe "instanciated class with Redistat::Synchronize included" do + subject { SynchronizeSpecHelper.new } + it { should respond_to(:monitor) } + it { should respond_to(:thread_safe) } + it { should respond_to(:thread_safe=) } + it { should respond_to(:synchronize) } + + end + + describe "#synchronize method" do + + before(:each) do + Redistat::Synchronize.instance_variable_set("@thread_safe", nil) + @obj = SynchronizeSpecHelper.new + end + + it "should share single Monitor object across all objects" do + @obj.monitor.should == Redistat::Synchronize.monitor + end + + it "should share thread_safe option across all objects" do + obj2 = SynchronizeSpecHelper.new + Redistat::Synchronize.thread_safe.should be_false + @obj.thread_safe.should be_false + obj2.thread_safe.should be_false + @obj.thread_safe = true + Redistat::Synchronize.thread_safe.should be_true + @obj.thread_safe.should be_true + obj2.thread_safe.should be_true + end + + it "should not synchronize when thread_safe is disabled" do + # monitor receives :synchronize twice cause #thread_safe is _always_ synchronized + Redistat::Synchronize.monitor.should_receive(:synchronize).twice + @obj.thread_safe.should be_false # first #synchronize call + @obj.synchronize { 'foo' } # one #synchronize call while checking #thread_safe + end + + it "should synchronize when thread_safe is enabled" do + Monitor.class_eval { + # we're stubbing synchronize to ensure it's being called correctly, but still need it :P + alias :real_synchronize :synchronize + } + Redistat::Synchronize.monitor.should_receive(:synchronize).with.exactly(4).times.and_return { |block| + Redistat::Synchronize.monitor.real_synchronize(&block) + } + @obj.thread_safe.should be_false # first synchronize call + Redistat::Synchronize.thread_safe = true # second synchronize call + @obj.synchronize { 'foo' } # two synchronize calls, once while checking thread_safe, once to call black + end + end + +end + +class SynchronizeSpecHelper + include Redistat::Synchronize +end From a197a04ce89fa8f636215b6324dc238e913c68eb Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 14:10:51 +0100 Subject: [PATCH 04/17] moved all internal mixin modules to lib/redistat/mixins to tidy up the file structure a bit --- lib/redistat.rb | 8 ++++---- lib/redistat/{ => mixins}/database.rb | 0 lib/redistat/{ => mixins}/date_helper.rb | 0 lib/redistat/{ => mixins}/options.rb | 0 lib/redistat/{ => mixins}/synchronize.rb | 0 5 files changed, 4 insertions(+), 4 deletions(-) rename lib/redistat/{ => mixins}/database.rb (100%) rename lib/redistat/{ => mixins}/date_helper.rb (100%) rename lib/redistat/{ => mixins}/options.rb (100%) rename lib/redistat/{ => mixins}/synchronize.rb (100%) diff --git a/lib/redistat.rb b/lib/redistat.rb index a2505bd..881ec73 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -16,13 +16,13 @@ require 'time_ext' require 'redis' require 'json' -require 'redistat/options' -require 'redistat/synchronize' +require 'redistat/mixins/options' +require 'redistat/mixins/synchronize' +require 'redistat/mixins/database' +require 'redistat/mixins/date_helper' require 'redistat/connection' -require 'redistat/database' require 'redistat/collection' require 'redistat/date' -require 'redistat/date_helper' require 'redistat/event' require 'redistat/finder' require 'redistat/key' diff --git a/lib/redistat/database.rb b/lib/redistat/mixins/database.rb similarity index 100% rename from lib/redistat/database.rb rename to lib/redistat/mixins/database.rb diff --git a/lib/redistat/date_helper.rb b/lib/redistat/mixins/date_helper.rb similarity index 100% rename from lib/redistat/date_helper.rb rename to lib/redistat/mixins/date_helper.rb diff --git a/lib/redistat/options.rb b/lib/redistat/mixins/options.rb similarity index 100% rename from lib/redistat/options.rb rename to lib/redistat/mixins/options.rb diff --git a/lib/redistat/synchronize.rb b/lib/redistat/mixins/synchronize.rb similarity index 100% rename from lib/redistat/synchronize.rb rename to lib/redistat/mixins/synchronize.rb From 61231a8b5753c0e18cf14948a70f04549eda86b6 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 14:14:17 +0100 Subject: [PATCH 05/17] updated Redistat::Summary to make it easier to plugin the buffer interception code --- lib/redistat/summary.rb | 107 +++++++++++++++++++++------------------- spec/finder_spec.rb | 8 +-- spec/summary_spec.rb | 8 +-- 3 files changed, 65 insertions(+), 58 deletions(-) diff --git a/lib/redistat/summary.rb b/lib/redistat/summary.rb index e089b5a..d54bd18 100644 --- a/lib/redistat/summary.rb +++ b/lib/redistat/summary.rb @@ -2,66 +2,73 @@ module Redistat class Summary include Database - def self.default_options - { :enable_grouping => true, - :label_indexing => true, - :connection_ref => nil } - end - - def self.update_all(key, stats = {}, depth_limit = nil, opts = {}) - stats ||= {} - return nil if stats.size == 0 + class << self - options = default_options.merge((opts || {}).reject { |k,v| v.nil? }) + def default_options + { :enable_grouping => true, + :label_indexing => true, + :connection_ref => nil } + end - depth_limit ||= key.depth + def update_all(key, stats = {}, depth_limit = nil, opts = {}) + stats ||= {} + return nil if stats.size == 0 + + options = default_options.merge((opts || {}).reject { |k,v| v.nil? }) + + depth_limit ||= key.depth + + update(key, stats, depth_limit, options[:enable_grouping], options[:connection_ref], options) + end - if options[:enable_grouping] - stats = inject_group_summaries(stats) - key.groups.each do |k| - update_key(k, stats, depth_limit, options[:connection_ref]) - k.update_index if options[:label_indexing] + private + + def update(key, stats, depth_limit, enable_grouping, connection_ref, options) + if enable_grouping + stats = inject_group_summaries(stats) + key.groups.each do |k| + update_key(k, stats, depth_limit, connection_ref) + k.update_index if options[:label_indexing] + end + else + update_key(key, stats, depth_limit, connection_ref) end - else - update_key(key, stats, depth_limit, options[:connection_ref]) end - end - - private - - def self.update_key(key, stats, depth_limit, connection_ref) - Date::DEPTHS.each do |depth| - update(key, stats, depth, connection_ref) - break if depth == depth_limit + + def update_key(key, stats, depth_limit, connection_ref) + Date::DEPTHS.each do |depth| + update_fields(key, stats, depth, connection_ref) + break if depth == depth_limit + end end - end - - def self.update(key, stats, depth, connection_ref = nil) - stats.each do |field, value| - db(connection_ref).hincrby key.to_s(depth), field, value + + def update_fields(key, stats, depth, connection_ref = nil) + stats.each do |field, value| + db(connection_ref).hincrby key.to_s(depth), field, value + end end - end - - def self.inject_group_summaries!(stats) - summaries = {} - stats.each do |key, value| - parts = key.to_s.split(GROUP_SEPARATOR) - parts.pop - if parts.size > 0 - sum_parts = [] - parts.each do |part| - sum_parts << part - sum_key = sum_parts.join(GROUP_SEPARATOR) - (summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value + + def inject_group_summaries!(stats) + summaries = {} + stats.each do |key, value| + parts = key.to_s.split(GROUP_SEPARATOR) + parts.pop + if parts.size > 0 + sum_parts = [] + parts.each do |part| + sum_parts << part + sum_key = sum_parts.join(GROUP_SEPARATOR) + (summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value + end end end + stats.merge_and_incr!(summaries) end - stats.merge_and_incr!(summaries) + + def inject_group_summaries(stats) + inject_group_summaries!(stats.clone) + end + end - - def self.inject_group_summaries(stats) - inject_group_summaries!(stats.clone) - end - end end \ No newline at end of file diff --git a/spec/finder_spec.rb b/spec/finder_spec.rb index 0af91a6..d8748bf 100644 --- a/spec/finder_spec.rb +++ b/spec/finder_spec.rb @@ -194,13 +194,13 @@ describe Redistat::Finder do def create_example_stats key = Redistat::Key.new(@scope, @label, (first = Time.parse("2010-05-14 13:43"))) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 13:53")) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 14:52")) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) key = Redistat::Key.new(@scope, @label, (last = Time.parse("2010-05-14 15:02"))) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) [first - 1.hour, last + 1.hour] end diff --git a/spec/summary_spec.rb b/spec/summary_spec.rb index ebf33af..fb032c7 100644 --- a/spec/summary_spec.rb +++ b/spec/summary_spec.rb @@ -13,19 +13,19 @@ describe Redistat::Summary do end it "should update a single summary properly" do - Redistat::Summary.update(@key, @stats, :hour) + Redistat::Summary.send(:update_fields, @key, @stats, :hour) summary = db.hgetall(@key.to_s(:hour)) summary.should have(2).items summary["views"].should == "3" summary["visitors"].should == "2" - Redistat::Summary.update(@key, @stats, :hour) + Redistat::Summary.send(:update_fields, @key, @stats, :hour) summary = db.hgetall(@key.to_s(:hour)) summary.should have(2).items summary["views"].should == "6" summary["visitors"].should == "4" - Redistat::Summary.update(@key, {"views" => -4, "visitors" => -3}, :hour) + Redistat::Summary.send(:update_fields, @key, {"views" => -4, "visitors" => -3}, :hour) summary = db.hgetall(@key.to_s(:hour)) summary.should have(2).items summary["views"].should == "2" @@ -48,7 +48,7 @@ describe Redistat::Summary do it "should update summaries even if no label is set" do key = Redistat::Key.new(@scope, nil, @date, {:depth => :day}) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) summary = db.hgetall(key.to_s(:hour)) summary.should have(2).items summary["views"].should == "3" From 3a25fcc788b81d6d3fac666ee8a6440327e0c73d Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 16:45:21 +0100 Subject: [PATCH 06/17] created Redistat::Buffer, mainly feature complete, still needs a few more specs --- lib/redistat.rb | 13 +++++++ lib/redistat/buffer.rb | 88 ++++++++++++++++++++++++++++++++++++++++++ spec/buffer_spec.rb | 67 ++++++++++++++++++++++++++++++++ 3 files changed, 168 insertions(+) create mode 100644 lib/redistat/buffer.rb create mode 100644 spec/buffer_spec.rb diff --git a/lib/redistat.rb b/lib/redistat.rb index 881ec73..23ca423 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -21,6 +21,7 @@ require 'redistat/mixins/synchronize' require 'redistat/mixins/database' require 'redistat/mixins/date_helper' require 'redistat/connection' +require 'redistat/buffer' require 'redistat/collection' require 'redistat/date' require 'redistat/event' @@ -49,6 +50,18 @@ module Redistat class << self + def buffer + Buffer.instance + end + + def buffer_size + buffer.size + end + + def buffer_size=(size) + buffer.size = size + end + def thread_safe Synchronize.thread_safe end diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb new file mode 100644 index 0000000..13bc3b3 --- /dev/null +++ b/lib/redistat/buffer.rb @@ -0,0 +1,88 @@ +require 'redistat/core_ext/hash' + +module Redistat + class Buffer + include Synchronize + + def self.instance + @instance ||= self.new + end + + def size + @size ||= 0 + end + attr_writer :size + + def queue + @queue ||= {} + end + + def store(key, stats, depth_limit, opts) + return false unless should_buffer? + + to_flush = {} + buffkey = buffer_key(key, opts) + + synchronize do + if !queue.has_key?(buffkey) + queue[buffkey] = { :key => key, + :stats => {}, + :depth_limit => depth_limit, + :opts => opts } + end + + queue[buffkey][:stats].merge_and_incr!(hash) + + # return items to be flushed if buffer size limit has been reached + to_flush = reset_queue + end + + # flush any data that's been cleared from the queue + flush_data(to_flush) + true + end + + def flush(force = false) + to_flush = {} + synchronize do + to_flush = reset_queue(force) + end + flush_data(to_flush) + end + + private + + def should_buffer? + size > 1 # buffer size of 1 would be equal to not using buffer + end + + # should always be called from within a synchronize block + def should_flush? + (!queue.blank? && queue.size >= size) + end + + # returns items to be flushed if buffer size limit has been reached + # should always be called from within a synchronize block + def reset_queue(force = false) + return {} if !force || should_flush? + data = queue + @queue = {} + data + end + + def flush_data(buffer_data) + buffer_data.each do |k, item| + Summary.update(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + end + end + + def buffer_key(key, opts) + # depth_limit is not needed as it's evident in key.to_s + opts_index = Summary.default_options.keys.sort { |a,b| a.to_s <=> b.to_s }.map do |k| + opts[k] if opts.has_key?(k) + end + "#{key.to_s}:#{opts_index.join(':')}" + end + + end +end diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb new file mode 100644 index 0000000..e202a2f --- /dev/null +++ b/spec/buffer_spec.rb @@ -0,0 +1,67 @@ +require "spec_helper" + +describe Redistat::Buffer do + + before(:each) do + @class = Redistat::Buffer + @buffer = Redistat::Buffer.instance + end + + # let's cleanup after ourselves for the other specs + after(:each) do + @class.instance_variable_set("@instance", nil) + @buffer.size = 0 + end + + it "should provide instance of itself" do + @buffer.should be_a(@class) + end + + it "should only buffer if buffer size setting is greater than 1" do + @buffer.size.should == 0 + @buffer.send(:should_buffer?).should be_false + @buffer.size = 1 + @buffer.size.should == 1 + @buffer.send(:should_buffer?).should be_false + @buffer.size = 2 + @buffer.size.should == 2 + @buffer.send(:should_buffer?).should be_true + end + + it "should only flush buffer if buffer size is greater than or equal to buffer size setting" do + @buffer.size.should == 0 + @buffer.queue.size.should == 0 + @buffer.send(:should_flush?).should be_false + @buffer.queue[:hello] = 'world' + @buffer.send(:should_flush?).should be_true + @buffer.size = 5 + @buffer.send(:should_flush?).should be_false + 3.times { |i| @buffer.queue[i] = i.to_s } + @buffer.send(:should_flush?).should be_false + @buffer.queue[4] = '4' + @buffer.send(:should_flush?).should be_true + end + + it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do + @buffer.queue[:hello] = 'world' + @buffer.send(:should_flush?).should be_true + @buffer.size = 2 + @buffer.send(:should_flush?).should be_false + @buffer.send(:reset_queue).should == {} + @buffer.send(:reset_queue, true).should == {:hello => 'world'} + end + + it "should flush data into Summary.update properly" do + # the root level key value doesn't actually matter, but it's something like this... + data = {'ScopeName/label/goes/here:2011:nil:true:true' => { + :key => mock("Key"), + :stats => {}, + :depth_limit => :year, + :opts => {:heh => false} + }} + item = data.first[1] + Redistat::Summary.should_receive(:update).with(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + @buffer.send(:flush_data, data) + end + +end From 6c63843cd5fdff8b9c0d5ec20ed0b7485bcf9556 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 16:45:56 +0100 Subject: [PATCH 07/17] updated Redistat::Summary to incorporate use of write Buffer --- lib/redistat/summary.rb | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/lib/redistat/summary.rb b/lib/redistat/summary.rb index d54bd18..cc0bce3 100644 --- a/lib/redistat/summary.rb +++ b/lib/redistat/summary.rb @@ -10,31 +10,39 @@ module Redistat :connection_ref => nil } end + def buffer + Redistat.buffer + end + def update_all(key, stats = {}, depth_limit = nil, opts = {}) stats ||= {} - return nil if stats.size == 0 + return if stats.empty? options = default_options.merge((opts || {}).reject { |k,v| v.nil? }) depth_limit ||= key.depth - update(key, stats, depth_limit, options[:enable_grouping], options[:connection_ref], options) + update_through_buffer(key, stats, depth_limit, options) + end + + def update_through_buffer(*args) + update(*args) unless buffer.store(*args) + end + + def update(key, stats, depth_limit, opts) + if opts[:enable_grouping] + stats = inject_group_summaries(stats) + key.groups.each do |k| + update_key(k, stats, depth_limit, opts[:connection_ref]) + k.update_index if opts[:label_indexing] + end + else + update_key(key, stats, depth_limit, opts[:connection_ref]) + end end private - def update(key, stats, depth_limit, enable_grouping, connection_ref, options) - if enable_grouping - stats = inject_group_summaries(stats) - key.groups.each do |k| - update_key(k, stats, depth_limit, connection_ref) - k.update_index if options[:label_indexing] - end - else - update_key(key, stats, depth_limit, connection_ref) - end - end - def update_key(key, stats, depth_limit, connection_ref) Date::DEPTHS.each do |depth| update_fields(key, stats, depth, connection_ref) From 2ca5aae4b8277cf78c8a6d51c41512116a89a336 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 16:46:12 +0100 Subject: [PATCH 08/17] require required libraries, just cause --- lib/redistat/result.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/redistat/result.rb b/lib/redistat/result.rb index 7a91dff..d1a2de3 100644 --- a/lib/redistat/result.rb +++ b/lib/redistat/result.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module Redistat class Result < HashWithIndifferentAccess From 4b06513813c3a5389752aabbbcd3e9c3c08de628 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Fri, 15 Apr 2011 17:42:48 +0100 Subject: [PATCH 09/17] additional specs for Redistat::Buffer, still a few more needed --- spec/buffer_spec.rb | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb index e202a2f..9158464 100644 --- a/spec/buffer_spec.rb +++ b/spec/buffer_spec.rb @@ -51,7 +51,7 @@ describe Redistat::Buffer do @buffer.send(:reset_queue, true).should == {:hello => 'world'} end - it "should flush data into Summary.update properly" do + it "should #flush_data into Summary.update properly" do # the root level key value doesn't actually matter, but it's something like this... data = {'ScopeName/label/goes/here:2011:nil:true:true' => { :key => mock("Key"), @@ -64,4 +64,12 @@ describe Redistat::Buffer do @buffer.send(:flush_data, data) end + it "should build #buffer_key correctly" do + key = mock('Key', :to_s => "Scope/label:2011") + opts = {:enable_grouping => true, :label_indexing => false, :connection_ref => nil} + @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}::true:false" + opts = {:enable_grouping => false, :label_indexing => true, :connection_ref => :omg} + @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}:omg:false:true" + end + end From b129074cd7250f4615fac8873995e81cd5850bd9 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 10:00:22 +0100 Subject: [PATCH 10/17] make Buffer#queue a private method as it's never supposed to be modified or read from outside of the Buffer object --- lib/redistat/buffer.rb | 8 ++++---- spec/buffer_spec.rb | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb index 13bc3b3..b78c4e7 100644 --- a/lib/redistat/buffer.rb +++ b/lib/redistat/buffer.rb @@ -13,10 +13,6 @@ module Redistat end attr_writer :size - def queue - @queue ||= {} - end - def store(key, stats, depth_limit, opts) return false unless should_buffer? @@ -52,6 +48,10 @@ module Redistat private + def queue + @queue ||= {} + end + def should_buffer? size > 1 # buffer size of 1 would be equal to not using buffer end diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb index 9158464..43bd73a 100644 --- a/spec/buffer_spec.rb +++ b/spec/buffer_spec.rb @@ -30,20 +30,20 @@ describe Redistat::Buffer do it "should only flush buffer if buffer size is greater than or equal to buffer size setting" do @buffer.size.should == 0 - @buffer.queue.size.should == 0 + @buffer.send(:queue).size.should == 0 @buffer.send(:should_flush?).should be_false - @buffer.queue[:hello] = 'world' + @buffer.send(:queue)[:hello] = 'world' @buffer.send(:should_flush?).should be_true @buffer.size = 5 @buffer.send(:should_flush?).should be_false - 3.times { |i| @buffer.queue[i] = i.to_s } + 3.times { |i| @buffer.send(:queue)[i] = i.to_s } @buffer.send(:should_flush?).should be_false - @buffer.queue[4] = '4' + @buffer.send(:queue)[4] = '4' @buffer.send(:should_flush?).should be_true end it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do - @buffer.queue[:hello] = 'world' + @buffer.send(:queue)[:hello] = 'world' @buffer.send(:should_flush?).should be_true @buffer.size = 2 @buffer.send(:should_flush?).should be_false From eb1d607a6125e4fd705f0a49b66c6646e36ac23c Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 12:36:19 +0100 Subject: [PATCH 11/17] a number of issues fixed with Buffer class, and specs updated accordingly --- lib/redistat/buffer.rb | 20 ++++++-- spec/buffer_spec.rb | 102 +++++++++++++++++++++++++++++++++++++---- 2 files changed, 108 insertions(+), 14 deletions(-) diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb index b78c4e7..0caa508 100644 --- a/lib/redistat/buffer.rb +++ b/lib/redistat/buffer.rb @@ -13,6 +13,10 @@ module Redistat end attr_writer :size + def count + @count ||= 0 + end + def store(key, stats, depth_limit, opts) return false unless should_buffer? @@ -27,7 +31,8 @@ module Redistat :opts => opts } end - queue[buffkey][:stats].merge_and_incr!(hash) + queue[buffkey][:stats].merge_and_incr!(stats) + incr_count # return items to be flushed if buffer size limit has been reached to_flush = reset_queue @@ -48,6 +53,12 @@ module Redistat private + # should always be called from within a synchronize block + def incr_count + @count ||= 0 + @count += 1 + end + def queue @queue ||= {} end @@ -58,21 +69,22 @@ module Redistat # should always be called from within a synchronize block def should_flush? - (!queue.blank? && queue.size >= size) + (!queue.blank? && count >= size) end # returns items to be flushed if buffer size limit has been reached # should always be called from within a synchronize block def reset_queue(force = false) - return {} if !force || should_flush? + return {} if !force && !should_flush? data = queue @queue = {} + @count = 0 data end def flush_data(buffer_data) buffer_data.each do |k, item| - Summary.update(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + Summary.update(item[:key], item[:stats], item[:depth_limit], item[:opts]) end end diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb index 43bd73a..cf6ce0b 100644 --- a/spec/buffer_spec.rb +++ b/spec/buffer_spec.rb @@ -5,6 +5,10 @@ describe Redistat::Buffer do before(:each) do @class = Redistat::Buffer @buffer = Redistat::Buffer.instance + @key = mock('Key', :to_s => "Scope/label:2011") + @stats = {:count => 1, :views => 3} + @depth_limit = :hour + @opts = {:enable_grouping => true} end # let's cleanup after ourselves for the other specs @@ -33,43 +37,121 @@ describe Redistat::Buffer do @buffer.send(:queue).size.should == 0 @buffer.send(:should_flush?).should be_false @buffer.send(:queue)[:hello] = 'world' + @buffer.send(:incr_count) @buffer.send(:should_flush?).should be_true @buffer.size = 5 @buffer.send(:should_flush?).should be_false - 3.times { |i| @buffer.send(:queue)[i] = i.to_s } + 3.times { |i| + @buffer.send(:queue)[i] = i.to_s + @buffer.send(:incr_count) + } @buffer.send(:should_flush?).should be_false @buffer.send(:queue)[4] = '4' + @buffer.send(:incr_count) @buffer.send(:should_flush?).should be_true end it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do @buffer.send(:queue)[:hello] = 'world' + @buffer.send(:incr_count) @buffer.send(:should_flush?).should be_true @buffer.size = 2 @buffer.send(:should_flush?).should be_false @buffer.send(:reset_queue).should == {} + @buffer.instance_variable_get("@count").should == 1 @buffer.send(:reset_queue, true).should == {:hello => 'world'} + @buffer.instance_variable_get("@count").should == 0 end it "should #flush_data into Summary.update properly" do # the root level key value doesn't actually matter, but it's something like this... - data = {'ScopeName/label/goes/here:2011:nil:true:true' => { - :key => mock("Key"), - :stats => {}, - :depth_limit => :year, - :opts => {:heh => false} + data = {'ScopeName/label/goes/here:2011::true:true' => { + :key => @key, + :stats => @stats, + :depth_limit => @depth_limit, + :opts => @opts }} item = data.first[1] - Redistat::Summary.should_receive(:update).with(item[:key], item[:stats], item[:depth_limit], item[:connection_ref]) + Redistat::Summary.should_receive(:update).with(@key, @stats, @depth_limit, @opts) @buffer.send(:flush_data, data) end it "should build #buffer_key correctly" do - key = mock('Key', :to_s => "Scope/label:2011") opts = {:enable_grouping => true, :label_indexing => false, :connection_ref => nil} - @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}::true:false" + @buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}::true:false" opts = {:enable_grouping => false, :label_indexing => true, :connection_ref => :omg} - @buffer.send(:buffer_key, key, opts).should == "#{key.to_s}:omg:false:true" + @buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}:omg:false:true" + end + + describe "Buffering" do + it "should store items on buffer queue" do + @buffer.store(@key, @stats, @depth_limit, @opts).should be_false + @buffer.size = 5 + @buffer.store(@key, @stats, @depth_limit, @opts).should be_true + @buffer.send(:queue).should have(1).item + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 1 + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 3 + @buffer.store(@key, @stats, @depth_limit, @opts).should be_true + @buffer.send(:queue).should have(1).items + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 2 + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 6 + end + + it "should flush buffer queue when size is reached" do + key = mock('Key', :to_s => "Scope/labelx:2011") + @buffer.size = 10 + Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts| + depth_limit.should == @depth_limit + opts.should == @opts + if k == @key + stats[:count].should == 6 + stats[:views].should == 18 + elsif k == key + stats[:count].should == 4 + stats[:views].should == 12 + end + end + 6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true } + 4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true } + end + end + + describe "Thread-Safety" do + it "should read/write to buffer queue in a thread-safe manner" do + + # This spec passes wether thread safety is enabled or not. In short I need + # better specs for thread-safety, and personally a better understanding of + # thread-safety in general. + Redistat.thread_safe = true + + key = mock('Key', :to_s => "Scope/labelx:2011") + @buffer.size = 100 + + Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts| + depth_limit.should == @depth_limit + opts.should == @opts + if k == @key + stats[:count].should == 60 + stats[:views].should == 180 + elsif k == key + stats[:count].should == 40 + stats[:views].should == 120 + end + end + + threads = [] + 10.times do + threads << Thread.new { + 6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true } + 4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true } + } + end + + threads.each { |t| t.join } + end + + it "should have better specs that actually fail when thread-safety is off" + end end From 7b5c3089609f59a12f712def583f05bbcb4b4bd3 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 12:36:43 +0100 Subject: [PATCH 12/17] model spec updated to test write buffer --- spec/model_spec.rb | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/spec/model_spec.rb b/spec/model_spec.rb index 347bfd9..7d8404f 100644 --- a/spec/model_spec.rb +++ b/spec/model_spec.rb @@ -156,4 +156,49 @@ describe Redistat::Model do stats.total[:weight].should == 617 end + describe "Write Buffer" do + before(:each) do + Redistat.buffer_size = 20 + end + + after(:each) do + Redistat.buffer_size = 0 + end + + it "should buffer calls in memory before committing to Redis" do + 14.times do + ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4)) + end + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + + 5.times do + ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time) + end + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + + ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time) + stats = ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)) + stats.total["count"].should == 20 + stats.total["weight"].should == 7390 + end + + it "should force flush buffer when #flush(true) is called" do + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + 14.times do + ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4)) + end + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + Redistat.buffer.flush(true) + + stats = ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)) + stats.total["count"].should == 14 + stats.total["weight"].should == 6454 + end + end + end + + + + + From b13da6f3327caf114a9941fd89d79b61a06b6bc9 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 12:37:24 +0100 Subject: [PATCH 13/17] create a flush buffer #at_exit callback to ensure any buffered messages are flushed to Redis on process exit --- lib/redistat.rb | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/lib/redistat.rb b/lib/redistat.rb index 23ca423..4ff2389 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -20,6 +20,7 @@ require 'redistat/mixins/options' require 'redistat/mixins/synchronize' require 'redistat/mixins/database' require 'redistat/mixins/date_helper' + require 'redistat/connection' require 'redistat/buffer' require 'redistat/collection' @@ -36,6 +37,7 @@ require 'redistat/version' require 'redistat/core_ext' + module Redistat KEY_NEXT_ID = ".next_id" @@ -91,3 +93,9 @@ module Redistat end end + + +# ensure buffer is flushed on program exit +Kernel.at_exit do + Redistat.buffer.flush(true) +end From b2c31a0e87b89a24892bd4e7f15e9cfb1d9bebd2 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 12:40:49 +0100 Subject: [PATCH 14/17] ensure buffer size value is read/written to in a thread-safe manner --- lib/redistat/buffer.rb | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb index 0caa508..66e4f3c 100644 --- a/lib/redistat/buffer.rb +++ b/lib/redistat/buffer.rb @@ -9,9 +9,16 @@ module Redistat end def size - @size ||= 0 + synchronize do + @size ||= 0 + end + end + + def size=(value) + synchronize do + @size = value + end end - attr_writer :size def count @count ||= 0 From d9a8aefcc58e58d4e9e0a68686779caf5ad8310a Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 14:25:38 +0100 Subject: [PATCH 15/17] updated readme with thread_safe and buffer info --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 7ff82b6..c01fba5 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,10 @@ view\_stats.rb: class ViewStats include Redistat::Model end + + # if using Redistat in multiple threads set this + # somewhere in the beginning of the execution stack + Redistat.thread_safe = true ### Simple Example ### @@ -187,6 +191,12 @@ By default when fetching statistics, Redistat will figure out how to do the leas It is also intelligent enough to not fetch each day from 3-31 of a month, instead it would fetch the data for the whole month and the first two days, which are then removed from the summary of the whole month. This means three calls to `hgetall` instead of 29 if each whole day was fetched. +### Buffer ### + +The buffer is a new, still semi-beta, feature aimed to reduce the number of Redis `hincrby` that Redistat sends. This should only really be useful when you're hitting north of 30,000 Redis requests per second, if your Redis server has limited resources, or against my recommendation you've opted to use 10, 20, or more label grouping levels. + +Buffering tries to fold together multiple `store` calls into as few as possible by merging the statistics hashes from all calls and groups them based on scope, label, date depth, and more. You configure the the buffer by setting `Redistat.buffer_size` to an integer higher than 1. This basically tells Redistat how many `store` calls to buffer in memory before writing all data to Redis. + ## Todo ## From c53c7116dd1bf0a852f79787fe6db0c014a6a38e Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 14:26:52 +0100 Subject: [PATCH 16/17] updated Connection TODO comment --- lib/redistat/connection.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/redistat/connection.rb b/lib/redistat/connection.rb index 10d4c40..97e84a4 100644 --- a/lib/redistat/connection.rb +++ b/lib/redistat/connection.rb @@ -5,7 +5,7 @@ module Redistat REQUIRED_SERVER_VERSION = "1.3.10" - # TODO: Create a ConnectionPool instance object to replace Connection class + # TODO: Create a ConnectionPool instance object using Sychronize mixin to replace Connection class class << self From ae5a39101249dfcf3a829ec294f02600181723f9 Mon Sep 17 00:00:00 2001 From: Jim Myhrberg Date: Mon, 18 Apr 2011 14:27:40 +0100 Subject: [PATCH 17/17] started release v0.3.0 --- lib/redistat/version.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/redistat/version.rb b/lib/redistat/version.rb index cee60b3..1df48e6 100644 --- a/lib/redistat/version.rb +++ b/lib/redistat/version.rb @@ -1,3 +1,3 @@ module Redistat - VERSION = "0.2.6" + VERSION = "0.3.0" end