diff --git a/lib/redistat.rb b/lib/redistat.rb index 11d63db..4ff2389 100644 --- a/lib/redistat.rb +++ b/lib/redistat.rb @@ -3,6 +3,7 @@ require 'rubygems' require 'date' require 'time' require 'digest/sha1' +require 'monitor' # Active Support 2.x or 3.x require 'active_support' @@ -15,12 +16,15 @@ require 'time_ext' require 'redis' require 'json' -require 'redistat/options' +require 'redistat/mixins/options' +require 'redistat/mixins/synchronize' +require 'redistat/mixins/database' +require 'redistat/mixins/date_helper' + require 'redistat/connection' -require 'redistat/database' +require 'redistat/buffer' require 'redistat/collection' require 'redistat/date' -require 'redistat/date_helper' require 'redistat/event' require 'redistat/finder' require 'redistat/key' @@ -33,6 +37,7 @@ require 'redistat/version' require 'redistat/core_ext' + module Redistat KEY_NEXT_ID = ".next_id" @@ -47,6 +52,26 @@ module Redistat class << self + def buffer + Buffer.instance + end + + def buffer_size + buffer.size + end + + def buffer_size=(size) + buffer.size = size + end + + def thread_safe + Synchronize.thread_safe + end + + def thread_safe=(value) + Synchronize.thread_safe = value + end + def connection(ref = nil) Connection.get(ref) end @@ -68,3 +93,9 @@ module Redistat end end + + +# ensure buffer is flushed on program exit +Kernel.at_exit do + Redistat.buffer.flush(true) +end diff --git a/lib/redistat/buffer.rb b/lib/redistat/buffer.rb new file mode 100644 index 0000000..66e4f3c --- /dev/null +++ b/lib/redistat/buffer.rb @@ -0,0 +1,107 @@ +require 'redistat/core_ext/hash' + +module Redistat + class Buffer + include Synchronize + + def self.instance + @instance ||= self.new + end + + def size + synchronize do + @size ||= 0 + end + end + + def size=(value) + synchronize do + @size = value + end + end + + def count + @count ||= 0 + end + + def store(key, stats, depth_limit, opts) + return false unless should_buffer? + + to_flush = {} + buffkey = buffer_key(key, opts) + + synchronize do + if !queue.has_key?(buffkey) + queue[buffkey] = { :key => key, + :stats => {}, + :depth_limit => depth_limit, + :opts => opts } + end + + queue[buffkey][:stats].merge_and_incr!(stats) + incr_count + + # return items to be flushed if buffer size limit has been reached + to_flush = reset_queue + end + + # flush any data that's been cleared from the queue + flush_data(to_flush) + true + end + + def flush(force = false) + to_flush = {} + synchronize do + to_flush = reset_queue(force) + end + flush_data(to_flush) + end + + private + + # should always be called from within a synchronize block + def incr_count + @count ||= 0 + @count += 1 + end + + def queue + @queue ||= {} + end + + def should_buffer? + size > 1 # buffer size of 1 would be equal to not using buffer + end + + # should always be called from within a synchronize block + def should_flush? + (!queue.blank? && count >= size) + end + + # returns items to be flushed if buffer size limit has been reached + # should always be called from within a synchronize block + def reset_queue(force = false) + return {} if !force && !should_flush? + data = queue + @queue = {} + @count = 0 + data + end + + def flush_data(buffer_data) + buffer_data.each do |k, item| + Summary.update(item[:key], item[:stats], item[:depth_limit], item[:opts]) + end + end + + def buffer_key(key, opts) + # depth_limit is not needed as it's evident in key.to_s + opts_index = Summary.default_options.keys.sort { |a,b| a.to_s <=> b.to_s }.map do |k| + opts[k] if opts.has_key?(k) + end + "#{key.to_s}:#{opts_index.join(':')}" + end + + end +end diff --git a/lib/redistat/connection.rb b/lib/redistat/connection.rb index 23d9141..10d4c40 100644 --- a/lib/redistat/connection.rb +++ b/lib/redistat/connection.rb @@ -1,29 +1,41 @@ +require 'monitor' + module Redistat module Connection REQUIRED_SERVER_VERSION = "1.3.10" + # TODO: Create a ConnectionPool instance object to replace Connection class + class << self + # TODO: clean/remove all ref-less connections + def get(ref = nil) ref ||= :default - connections[references[ref]] || create + synchronize do + connections[references[ref]] || create + end end def add(conn, ref = nil) ref ||= :default - check_redis_version(conn) - references[ref] = conn.client.id - connections[conn.client.id] = conn + synchronize do + check_redis_version(conn) + references[ref] = conn.client.id + connections[conn.client.id] = conn + end end def create(options = {}) - #TODO clean/remove all ref-less connections - ref = options.delete(:ref) || :default - options.reverse_merge!(default_options) - conn = (connections[connection_id(options)] ||= connection(options)) - references[ref] = conn.client.id - conn + synchronize do + options = options.clone + ref = options.delete(:ref) || :default + options.reverse_merge!(default_options) + conn = (connections[connection_id(options)] ||= connection(options)) + references[ref] = conn.client.id + conn + end end def connections @@ -36,9 +48,12 @@ module Redistat private - def check_redis_version(conn) - raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION - conn + def monitor + @monitor ||= Monitor.new + end + + def synchronize(&block) + monitor.synchronize(&block) end def connection(options) @@ -46,10 +61,15 @@ module Redistat end def connection_id(options = {}) - options.reverse_merge!(default_options) + options = options.reverse_merge(default_options) "redis://#{options[:host]}:#{options[:port]}/#{options[:db]}" end + def check_redis_version(conn) + raise RedisServerIsTooOld if conn.info["redis_version"] < REQUIRED_SERVER_VERSION + conn + end + def default_options { :host => '127.0.0.1', diff --git a/lib/redistat/database.rb b/lib/redistat/mixins/database.rb similarity index 100% rename from lib/redistat/database.rb rename to lib/redistat/mixins/database.rb diff --git a/lib/redistat/date_helper.rb b/lib/redistat/mixins/date_helper.rb similarity index 100% rename from lib/redistat/date_helper.rb rename to lib/redistat/mixins/date_helper.rb diff --git a/lib/redistat/options.rb b/lib/redistat/mixins/options.rb similarity index 100% rename from lib/redistat/options.rb rename to lib/redistat/mixins/options.rb diff --git a/lib/redistat/mixins/synchronize.rb b/lib/redistat/mixins/synchronize.rb new file mode 100644 index 0000000..2d20b89 --- /dev/null +++ b/lib/redistat/mixins/synchronize.rb @@ -0,0 +1,51 @@ +require 'monitor' + +module Redistat + module Synchronize + + class << self + def included(base) + base.send(:include, InstanceMethods) + end + + def monitor + @monitor ||= Monitor.new + end + + def thread_safe + monitor.synchronize do + @thread_safe ||= false + end + end + + def thread_safe=(value) + monitor.synchronize do + @thread_safe = value + end + end + end # << self + + module InstanceMethods + def thread_safe + Synchronize.thread_safe + end + + def thread_safe=(value) + Synchronize.thread_safe = value + end + + def monitor + Synchronize.monitor + end + + def synchronize(&block) + if thread_safe + monitor.synchronize(&block) + else + block.call + end + end + end # InstanceMethods + + end +end \ No newline at end of file diff --git a/lib/redistat/result.rb b/lib/redistat/result.rb index 7a91dff..d1a2de3 100644 --- a/lib/redistat/result.rb +++ b/lib/redistat/result.rb @@ -1,3 +1,5 @@ +require 'active_support/core_ext/hash/indifferent_access' + module Redistat class Result < HashWithIndifferentAccess diff --git a/lib/redistat/summary.rb b/lib/redistat/summary.rb index e089b5a..cc0bce3 100644 --- a/lib/redistat/summary.rb +++ b/lib/redistat/summary.rb @@ -2,66 +2,81 @@ module Redistat class Summary include Database - def self.default_options - { :enable_grouping => true, - :label_indexing => true, - :connection_ref => nil } - end - - def self.update_all(key, stats = {}, depth_limit = nil, opts = {}) - stats ||= {} - return nil if stats.size == 0 + class << self - options = default_options.merge((opts || {}).reject { |k,v| v.nil? }) + def default_options + { :enable_grouping => true, + :label_indexing => true, + :connection_ref => nil } + end - depth_limit ||= key.depth + def buffer + Redistat.buffer + end - if options[:enable_grouping] - stats = inject_group_summaries(stats) - key.groups.each do |k| - update_key(k, stats, depth_limit, options[:connection_ref]) - k.update_index if options[:label_indexing] + def update_all(key, stats = {}, depth_limit = nil, opts = {}) + stats ||= {} + return if stats.empty? + + options = default_options.merge((opts || {}).reject { |k,v| v.nil? }) + + depth_limit ||= key.depth + + update_through_buffer(key, stats, depth_limit, options) + end + + def update_through_buffer(*args) + update(*args) unless buffer.store(*args) + end + + def update(key, stats, depth_limit, opts) + if opts[:enable_grouping] + stats = inject_group_summaries(stats) + key.groups.each do |k| + update_key(k, stats, depth_limit, opts[:connection_ref]) + k.update_index if opts[:label_indexing] + end + else + update_key(key, stats, depth_limit, opts[:connection_ref]) end - else - update_key(key, stats, depth_limit, options[:connection_ref]) end - end - - private - - def self.update_key(key, stats, depth_limit, connection_ref) - Date::DEPTHS.each do |depth| - update(key, stats, depth, connection_ref) - break if depth == depth_limit + + private + + def update_key(key, stats, depth_limit, connection_ref) + Date::DEPTHS.each do |depth| + update_fields(key, stats, depth, connection_ref) + break if depth == depth_limit + end end - end - - def self.update(key, stats, depth, connection_ref = nil) - stats.each do |field, value| - db(connection_ref).hincrby key.to_s(depth), field, value + + def update_fields(key, stats, depth, connection_ref = nil) + stats.each do |field, value| + db(connection_ref).hincrby key.to_s(depth), field, value + end end - end - - def self.inject_group_summaries!(stats) - summaries = {} - stats.each do |key, value| - parts = key.to_s.split(GROUP_SEPARATOR) - parts.pop - if parts.size > 0 - sum_parts = [] - parts.each do |part| - sum_parts << part - sum_key = sum_parts.join(GROUP_SEPARATOR) - (summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value + + def inject_group_summaries!(stats) + summaries = {} + stats.each do |key, value| + parts = key.to_s.split(GROUP_SEPARATOR) + parts.pop + if parts.size > 0 + sum_parts = [] + parts.each do |part| + sum_parts << part + sum_key = sum_parts.join(GROUP_SEPARATOR) + (summaries.has_key?(sum_key)) ? summaries[sum_key] += value : summaries[sum_key] = value + end end end + stats.merge_and_incr!(summaries) end - stats.merge_and_incr!(summaries) + + def inject_group_summaries(stats) + inject_group_summaries!(stats.clone) + end + end - - def self.inject_group_summaries(stats) - inject_group_summaries!(stats.clone) - end - end end \ No newline at end of file diff --git a/spec/buffer_spec.rb b/spec/buffer_spec.rb new file mode 100644 index 0000000..cf6ce0b --- /dev/null +++ b/spec/buffer_spec.rb @@ -0,0 +1,157 @@ +require "spec_helper" + +describe Redistat::Buffer do + + before(:each) do + @class = Redistat::Buffer + @buffer = Redistat::Buffer.instance + @key = mock('Key', :to_s => "Scope/label:2011") + @stats = {:count => 1, :views => 3} + @depth_limit = :hour + @opts = {:enable_grouping => true} + end + + # let's cleanup after ourselves for the other specs + after(:each) do + @class.instance_variable_set("@instance", nil) + @buffer.size = 0 + end + + it "should provide instance of itself" do + @buffer.should be_a(@class) + end + + it "should only buffer if buffer size setting is greater than 1" do + @buffer.size.should == 0 + @buffer.send(:should_buffer?).should be_false + @buffer.size = 1 + @buffer.size.should == 1 + @buffer.send(:should_buffer?).should be_false + @buffer.size = 2 + @buffer.size.should == 2 + @buffer.send(:should_buffer?).should be_true + end + + it "should only flush buffer if buffer size is greater than or equal to buffer size setting" do + @buffer.size.should == 0 + @buffer.send(:queue).size.should == 0 + @buffer.send(:should_flush?).should be_false + @buffer.send(:queue)[:hello] = 'world' + @buffer.send(:incr_count) + @buffer.send(:should_flush?).should be_true + @buffer.size = 5 + @buffer.send(:should_flush?).should be_false + 3.times { |i| + @buffer.send(:queue)[i] = i.to_s + @buffer.send(:incr_count) + } + @buffer.send(:should_flush?).should be_false + @buffer.send(:queue)[4] = '4' + @buffer.send(:incr_count) + @buffer.send(:should_flush?).should be_true + end + + it "should force flush queue irregardless of result of #should_flush? when #reset_queue is called with true" do + @buffer.send(:queue)[:hello] = 'world' + @buffer.send(:incr_count) + @buffer.send(:should_flush?).should be_true + @buffer.size = 2 + @buffer.send(:should_flush?).should be_false + @buffer.send(:reset_queue).should == {} + @buffer.instance_variable_get("@count").should == 1 + @buffer.send(:reset_queue, true).should == {:hello => 'world'} + @buffer.instance_variable_get("@count").should == 0 + end + + it "should #flush_data into Summary.update properly" do + # the root level key value doesn't actually matter, but it's something like this... + data = {'ScopeName/label/goes/here:2011::true:true' => { + :key => @key, + :stats => @stats, + :depth_limit => @depth_limit, + :opts => @opts + }} + item = data.first[1] + Redistat::Summary.should_receive(:update).with(@key, @stats, @depth_limit, @opts) + @buffer.send(:flush_data, data) + end + + it "should build #buffer_key correctly" do + opts = {:enable_grouping => true, :label_indexing => false, :connection_ref => nil} + @buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}::true:false" + opts = {:enable_grouping => false, :label_indexing => true, :connection_ref => :omg} + @buffer.send(:buffer_key, @key, opts).should == "#{@key.to_s}:omg:false:true" + end + + describe "Buffering" do + it "should store items on buffer queue" do + @buffer.store(@key, @stats, @depth_limit, @opts).should be_false + @buffer.size = 5 + @buffer.store(@key, @stats, @depth_limit, @opts).should be_true + @buffer.send(:queue).should have(1).item + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 1 + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 3 + @buffer.store(@key, @stats, @depth_limit, @opts).should be_true + @buffer.send(:queue).should have(1).items + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:count].should == 2 + @buffer.send(:queue)[@buffer.send(:queue).keys.first][:stats][:views].should == 6 + end + + it "should flush buffer queue when size is reached" do + key = mock('Key', :to_s => "Scope/labelx:2011") + @buffer.size = 10 + Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts| + depth_limit.should == @depth_limit + opts.should == @opts + if k == @key + stats[:count].should == 6 + stats[:views].should == 18 + elsif k == key + stats[:count].should == 4 + stats[:views].should == 12 + end + end + 6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true } + 4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true } + end + end + + describe "Thread-Safety" do + it "should read/write to buffer queue in a thread-safe manner" do + + # This spec passes wether thread safety is enabled or not. In short I need + # better specs for thread-safety, and personally a better understanding of + # thread-safety in general. + Redistat.thread_safe = true + + key = mock('Key', :to_s => "Scope/labelx:2011") + @buffer.size = 100 + + Redistat::Summary.should_receive(:update).exactly(2).times.and_return do |k, stats, depth_limit, opts| + depth_limit.should == @depth_limit + opts.should == @opts + if k == @key + stats[:count].should == 60 + stats[:views].should == 180 + elsif k == key + stats[:count].should == 40 + stats[:views].should == 120 + end + end + + threads = [] + 10.times do + threads << Thread.new { + 6.times { @buffer.store(@key, @stats, @depth_limit, @opts).should be_true } + 4.times { @buffer.store(key, @stats, @depth_limit, @opts).should be_true } + } + end + + threads.each { |t| t.join } + end + + it "should have better specs that actually fail when thread-safety is off" + + end + +end diff --git a/spec/connection_spec.rb b/spec/connection_spec.rb index 04aae55..907a7d1 100644 --- a/spec/connection_spec.rb +++ b/spec/connection_spec.rb @@ -59,4 +59,9 @@ describe Redistat::Connection do Redistat.connect(:port => 8379, :db => 15) end + # TODO: Test thread-safety + it "should be thread-safe" do + pending("need to figure out a way to test thread-safety") + end + end \ No newline at end of file diff --git a/spec/finder_spec.rb b/spec/finder_spec.rb index 0af91a6..d8748bf 100644 --- a/spec/finder_spec.rb +++ b/spec/finder_spec.rb @@ -194,13 +194,13 @@ describe Redistat::Finder do def create_example_stats key = Redistat::Key.new(@scope, @label, (first = Time.parse("2010-05-14 13:43"))) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 13:53")) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) key = Redistat::Key.new(@scope, @label, Time.parse("2010-05-14 14:52")) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) key = Redistat::Key.new(@scope, @label, (last = Time.parse("2010-05-14 15:02"))) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) [first - 1.hour, last + 1.hour] end diff --git a/spec/model_spec.rb b/spec/model_spec.rb index 347bfd9..7d8404f 100644 --- a/spec/model_spec.rb +++ b/spec/model_spec.rb @@ -156,4 +156,49 @@ describe Redistat::Model do stats.total[:weight].should == 617 end + describe "Write Buffer" do + before(:each) do + Redistat.buffer_size = 20 + end + + after(:each) do + Redistat.buffer_size = 0 + end + + it "should buffer calls in memory before committing to Redis" do + 14.times do + ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4)) + end + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + + 5.times do + ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time) + end + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + + ModelHelper1.store("sheep.black", {:count => 1, :weight => 156}, @time) + stats = ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)) + stats.total["count"].should == 20 + stats.total["weight"].should == 7390 + end + + it "should force flush buffer when #flush(true) is called" do + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + 14.times do + ModelHelper1.store("sheep.black", {:count => 1, :weight => 461}, @time.hours_ago(4)) + end + ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)).total.should == {} + Redistat.buffer.flush(true) + + stats = ModelHelper1.fetch("sheep.black", @time.hours_ago(5), @time.hours_since(1)) + stats.total["count"].should == 14 + stats.total["weight"].should == 6454 + end + end + end + + + + + diff --git a/spec/summary_spec.rb b/spec/summary_spec.rb index ebf33af..fb032c7 100644 --- a/spec/summary_spec.rb +++ b/spec/summary_spec.rb @@ -13,19 +13,19 @@ describe Redistat::Summary do end it "should update a single summary properly" do - Redistat::Summary.update(@key, @stats, :hour) + Redistat::Summary.send(:update_fields, @key, @stats, :hour) summary = db.hgetall(@key.to_s(:hour)) summary.should have(2).items summary["views"].should == "3" summary["visitors"].should == "2" - Redistat::Summary.update(@key, @stats, :hour) + Redistat::Summary.send(:update_fields, @key, @stats, :hour) summary = db.hgetall(@key.to_s(:hour)) summary.should have(2).items summary["views"].should == "6" summary["visitors"].should == "4" - Redistat::Summary.update(@key, {"views" => -4, "visitors" => -3}, :hour) + Redistat::Summary.send(:update_fields, @key, {"views" => -4, "visitors" => -3}, :hour) summary = db.hgetall(@key.to_s(:hour)) summary.should have(2).items summary["views"].should == "2" @@ -48,7 +48,7 @@ describe Redistat::Summary do it "should update summaries even if no label is set" do key = Redistat::Key.new(@scope, nil, @date, {:depth => :day}) - Redistat::Summary.update(key, @stats, :hour) + Redistat::Summary.send(:update_fields, key, @stats, :hour) summary = db.hgetall(key.to_s(:hour)) summary.should have(2).items summary["views"].should == "3" diff --git a/spec/synchronize_spec.rb b/spec/synchronize_spec.rb new file mode 100644 index 0000000..56e8f26 --- /dev/null +++ b/spec/synchronize_spec.rb @@ -0,0 +1,64 @@ +require "spec_helper" + +describe Redistat::Synchronize do + it { should respond_to(:monitor) } + it { should respond_to(:thread_safe) } + it { should respond_to(:thread_safe=) } + + describe "instanciated class with Redistat::Synchronize included" do + subject { SynchronizeSpecHelper.new } + it { should respond_to(:monitor) } + it { should respond_to(:thread_safe) } + it { should respond_to(:thread_safe=) } + it { should respond_to(:synchronize) } + + end + + describe "#synchronize method" do + + before(:each) do + Redistat::Synchronize.instance_variable_set("@thread_safe", nil) + @obj = SynchronizeSpecHelper.new + end + + it "should share single Monitor object across all objects" do + @obj.monitor.should == Redistat::Synchronize.monitor + end + + it "should share thread_safe option across all objects" do + obj2 = SynchronizeSpecHelper.new + Redistat::Synchronize.thread_safe.should be_false + @obj.thread_safe.should be_false + obj2.thread_safe.should be_false + @obj.thread_safe = true + Redistat::Synchronize.thread_safe.should be_true + @obj.thread_safe.should be_true + obj2.thread_safe.should be_true + end + + it "should not synchronize when thread_safe is disabled" do + # monitor receives :synchronize twice cause #thread_safe is _always_ synchronized + Redistat::Synchronize.monitor.should_receive(:synchronize).twice + @obj.thread_safe.should be_false # first #synchronize call + @obj.synchronize { 'foo' } # one #synchronize call while checking #thread_safe + end + + it "should synchronize when thread_safe is enabled" do + Monitor.class_eval { + # we're stubbing synchronize to ensure it's being called correctly, but still need it :P + alias :real_synchronize :synchronize + } + Redistat::Synchronize.monitor.should_receive(:synchronize).with.exactly(4).times.and_return { |block| + Redistat::Synchronize.monitor.real_synchronize(&block) + } + @obj.thread_safe.should be_false # first synchronize call + Redistat::Synchronize.thread_safe = true # second synchronize call + @obj.synchronize { 'foo' } # two synchronize calls, once while checking thread_safe, once to call black + end + end + +end + +class SynchronizeSpecHelper + include Redistat::Synchronize +end