From 4372cf8ef8203c93632cdaf163a1c923075e7d0f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 4 Mar 2011 16:00:38 -0800 Subject: Aggregate support via POSIX message queues --- lib/raindrops.rb | 1 + lib/raindrops/aggregate.rb | 5 ++ lib/raindrops/aggregate/pmq.rb | 137 +++++++++++++++++++++++++++++++++++++++++ test/test_aggregate_pmq.rb | 61 ++++++++++++++++++ 4 files changed, 204 insertions(+) create mode 100644 lib/raindrops/aggregate.rb create mode 100644 lib/raindrops/aggregate/pmq.rb create mode 100644 test/test_aggregate_pmq.rb diff --git a/lib/raindrops.rb b/lib/raindrops.rb index f4d6b18..a35a158 100644 --- a/lib/raindrops.rb +++ b/lib/raindrops.rb @@ -30,5 +30,6 @@ class Raindrops autoload :Linux, 'raindrops/linux' autoload :Struct, 'raindrops/struct' autoload :Middleware, 'raindrops/middleware' + autoload :Aggregate, 'raindrops/aggregate' end require 'raindrops_ext' diff --git a/lib/raindrops/aggregate.rb b/lib/raindrops/aggregate.rb new file mode 100644 index 0000000..4f217de --- /dev/null +++ b/lib/raindrops/aggregate.rb @@ -0,0 +1,5 @@ +# -*- encoding: binary -*- +require "aggregate" +module Raindrops::Aggregate + autoload :PMQ, "raindrops/aggregate/pmq" +end diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb new file mode 100644 index 0000000..26b35f7 --- /dev/null +++ b/lib/raindrops/aggregate/pmq.rb @@ -0,0 +1,137 @@ +# -*- encoding: binary -*- +require "tempfile" +require "aggregate" +require "posix_mq" +require "fcntl" +require "io/extra" + +# Aggregate + POSIX message queues support +class Raindrops::Aggregate::PMQ + RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256") + WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256") + UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256") + + attr_reader :nr_dropped + + def initialize(params) + opts = { + :queue => "/raindrops", + :worker_interval => 10, + :master_interval => 5, + :lossy => false, + :mq_attr => nil, + :mq_umask => 0666, + :aggregate => Aggregate.new, + }.merge! params + @master_interval = opts[:master_interval] + @worker_interval = opts[:worker_interval] + @aggregate = opts[:aggregate] + @worker_queue = @worker_interval ? [] : nil + + @mq_name = opts[:queue] + mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr] + Tempfile.open("raindrops_pmq") do |t| + @wr = File.open(t.path, "wb") + @rd = File.open(t.path, "rb") + end + @cached_aggregate = @aggregate + flush_master + @mq_send = if opts[:lossy] + @nr_dropped = 0 + mq.nonblock = true + mq.method :trysend + else + mq.method :send + end + end + + def << val + if q = @worker_queue + q << val + if q.size >= @worker_interval + mq_send(q) or @nr_dropped += 1 + q.clear + end + else + mq_send(val) or @nr_dropped += 1 + end + end + + def mq_send(val) + @cached_aggregate = nil + @mq_send.call Marshal.dump(val) + end + + def master_loop + buf = "" + a = @aggregate + nr = 0 + mq = POSIX_MQ.new @mq_name, :r # this one is always blocking + begin + if (nr -= 1) < 0 + nr = @master_interval + flush_master + end + data = Marshal.load(mq.shift(buf)) or return + Array === data ? data.each { |x| a << x } : a << data + end while true + ensure + flush_master + end + + def aggregate + @cached_aggregate ||= begin + flush + Marshal.load(synchronize(@rd, RDLOCK) do |rd| + IO.pread rd.fileno, rd.stat.size, 0 + end) + end + end + + def flush_master + dump = Marshal.dump @aggregate + synchronize(@wr, WRLOCK) do |wr| + wr.truncate 0 + IO.pwrite wr.fileno, dump, 0 + end + end + + def stop_master_loop + sleep 0.1 until mq_send(false) + rescue Errno::EINTR + retry + end + + def lock! io, type + io.fcntl Fcntl::F_SETLKW, type + rescue Errno::EINTR + retry + end + + def synchronize io, type + lock! io, type + yield io + ensure + lock! io, UNLOCK + end + + def flush + if q = @local_queue && ! q.empty? + mq_send q + q.clear + end + nil + end + + def count; aggregate.count; end + def max; aggregate.max; end + def min; aggregate.min; end + def sum; aggregate.sum; end + def mean; aggregate.mean; end + def std_dev; aggregate.std_dev; end + def outliers_low; aggregate.outliers_low; end + def outliers_high; aggregate.outliers_high; end + def to_s(*args); aggregate.to_s *args; end + def each; aggregate.each { |*args| yield *args }; end + def each_nonzero; aggregate.each_nonzero { |*args| yield *args }; end +end diff --git a/test/test_aggregate_pmq.rb b/test/test_aggregate_pmq.rb new file mode 100644 index 0000000..ffa36cf --- /dev/null +++ b/test/test_aggregate_pmq.rb @@ -0,0 +1,61 @@ +require "test/unit" +require "raindrops" +pmq = begin + Raindrops::Aggregate::PMQ +rescue => LoadError + warn "W: #{e} skipping test" + false +end + +Thread.abort_on_exception = true + +class TestAggregatePMQ < Test::Unit::TestCase + + def setup + @queue = "/test.#{rand}" + end + + def teardown + POSIX_MQ.unlink @queue + end + + def test_run + pmq = Raindrops::Aggregate::PMQ.new :queue => @queue + thr = Thread.new { pmq.master_loop } + agg = Aggregate.new + (1..10).each { |i| pmq << i; agg << i } + pmq.stop_master_loop + assert thr.join + assert_equal agg.count, pmq.count + assert_equal agg.mean, pmq.mean + assert_equal agg.std_dev, pmq.std_dev + assert_equal agg.min, pmq.min + assert_equal agg.max, pmq.max + assert_equal agg.to_s, pmq.to_s + end + + def test_multi_process + nr_workers = 4 + nr = 100 + pmq = Raindrops::Aggregate::PMQ.new :queue => @queue + pid = fork { pmq.master_loop } + workers = (1..nr_workers).map { + fork { + (1..nr).each { |i| pmq << i } + pmq.flush + } + } + workers.each { |pid| assert Process.waitpid2(pid).last.success? } + pmq.stop_master_loop + assert Process.waitpid2(pid).last.success? + assert_equal 400, pmq.count + agg = Aggregate.new + (1..nr_workers).map { (1..nr).each { |i| agg << i } } + assert_equal agg.to_s, pmq.to_s + assert_equal agg.mean, pmq.mean + assert_equal agg.std_dev, pmq.std_dev + assert_equal agg.min, pmq.min + assert_equal agg.max, pmq.max + assert_equal agg.to_s, pmq.to_s + end +end if pmq -- cgit v1.2.3-24-ge0c7