about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/raindrops.rb1
-rw-r--r--lib/raindrops/aggregate.rb5
-rw-r--r--lib/raindrops/aggregate/pmq.rb137
-rw-r--r--test/test_aggregate_pmq.rb61
4 files changed, 204 insertions, 0 deletions
diff --git a/lib/raindrops.rb b/lib/raindrops.rb
index f4d6b18..a35a158 100644
--- a/lib/raindrops.rb
+++ b/lib/raindrops.rb
@@ -30,5 +30,6 @@ class Raindrops
   autoload :Linux, 'raindrops/linux'
   autoload :Struct, 'raindrops/struct'
   autoload :Middleware, 'raindrops/middleware'
+  autoload :Aggregate, 'raindrops/aggregate'
 end
 require 'raindrops_ext'
diff --git a/lib/raindrops/aggregate.rb b/lib/raindrops/aggregate.rb
new file mode 100644
index 0000000..4f217de
--- /dev/null
+++ b/lib/raindrops/aggregate.rb
@@ -0,0 +1,5 @@
+# -*- encoding: binary -*-
+require "aggregate"
+module Raindrops::Aggregate
+  autoload :PMQ, "raindrops/aggregate/pmq"
+end
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb
new file mode 100644
index 0000000..26b35f7
--- /dev/null
+++ b/lib/raindrops/aggregate/pmq.rb
@@ -0,0 +1,137 @@
+# -*- encoding: binary -*-
+require "tempfile"
+require "aggregate"
+require "posix_mq"
+require "fcntl"
+require "io/extra"
+
+# Aggregate + POSIX message queues support
+class Raindrops::Aggregate::PMQ
+  RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256")
+  WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256")
+  UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256")
+
+  attr_reader :nr_dropped
+
+  def initialize(params)
+    opts = {
+      :queue => "/raindrops",
+      :worker_interval => 10,
+      :master_interval => 5,
+      :lossy => false,
+      :mq_attr => nil,
+      :mq_umask => 0666,
+      :aggregate => Aggregate.new,
+    }.merge! params
+    @master_interval = opts[:master_interval]
+    @worker_interval = opts[:worker_interval]
+    @aggregate = opts[:aggregate]
+    @worker_queue = @worker_interval ? [] : nil
+
+    @mq_name = opts[:queue]
+    mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
+    Tempfile.open("raindrops_pmq") do |t|
+      @wr = File.open(t.path, "wb")
+      @rd = File.open(t.path, "rb")
+    end
+    @cached_aggregate = @aggregate
+    flush_master
+    @mq_send = if opts[:lossy]
+      @nr_dropped = 0
+      mq.nonblock = true
+      mq.method :trysend
+    else
+      mq.method :send
+    end
+  end
+
+  def << val
+    if q = @worker_queue
+      q << val
+      if q.size >= @worker_interval
+        mq_send(q) or @nr_dropped += 1
+        q.clear
+      end
+    else
+      mq_send(val) or @nr_dropped += 1
+    end
+  end
+
+  def mq_send(val)
+    @cached_aggregate = nil
+    @mq_send.call Marshal.dump(val)
+  end
+
+  def master_loop
+    buf = ""
+    a = @aggregate
+    nr = 0
+    mq = POSIX_MQ.new @mq_name, :r # this one is always blocking
+    begin
+      if (nr -= 1) < 0
+        nr = @master_interval
+        flush_master
+      end
+      data = Marshal.load(mq.shift(buf)) or return
+      Array === data ? data.each { |x| a << x } : a << data
+    end while true
+    ensure
+      flush_master
+  end
+
+  def aggregate
+    @cached_aggregate ||= begin
+      flush
+      Marshal.load(synchronize(@rd, RDLOCK) do |rd|
+        IO.pread rd.fileno, rd.stat.size, 0
+      end)
+    end
+  end
+
+  def flush_master
+    dump = Marshal.dump @aggregate
+    synchronize(@wr, WRLOCK) do |wr|
+      wr.truncate 0
+      IO.pwrite wr.fileno, dump, 0
+    end
+  end
+
+  def stop_master_loop
+    sleep 0.1 until mq_send(false)
+    rescue Errno::EINTR
+      retry
+  end
+
+  def lock! io, type
+    io.fcntl Fcntl::F_SETLKW, type
+    rescue Errno::EINTR
+      retry
+  end
+
+  def synchronize io, type
+    lock! io, type
+    yield io
+    ensure
+      lock! io, UNLOCK
+  end
+
+  def flush
+    if q = @local_queue && ! q.empty?
+      mq_send q
+      q.clear
+    end
+    nil
+  end
+
+  def count; aggregate.count; end
+  def max; aggregate.max; end
+  def min; aggregate.min; end
+  def sum; aggregate.sum; end
+  def mean; aggregate.mean; end
+  def std_dev; aggregate.std_dev; end
+  def outliers_low; aggregate.outliers_low; end
+  def outliers_high; aggregate.outliers_high; end
+  def to_s(*args); aggregate.to_s *args; end
+  def each; aggregate.each { |*args| yield *args }; end
+  def each_nonzero; aggregate.each_nonzero { |*args| yield *args }; end
+end
diff --git a/test/test_aggregate_pmq.rb b/test/test_aggregate_pmq.rb
new file mode 100644
index 0000000..ffa36cf
--- /dev/null
+++ b/test/test_aggregate_pmq.rb
@@ -0,0 +1,61 @@
+require "test/unit"
+require "raindrops"
+pmq = begin
+  Raindrops::Aggregate::PMQ
+rescue => LoadError
+  warn "W: #{e} skipping test"
+  false
+end
+
+Thread.abort_on_exception = true
+
+class TestAggregatePMQ < Test::Unit::TestCase
+
+  def setup
+    @queue = "/test.#{rand}"
+  end
+
+  def teardown
+    POSIX_MQ.unlink @queue
+  end
+
+  def test_run
+    pmq = Raindrops::Aggregate::PMQ.new :queue => @queue
+    thr = Thread.new { pmq.master_loop }
+    agg = Aggregate.new
+    (1..10).each { |i| pmq << i; agg << i }
+    pmq.stop_master_loop
+    assert thr.join
+    assert_equal agg.count, pmq.count
+    assert_equal agg.mean, pmq.mean
+    assert_equal agg.std_dev, pmq.std_dev
+    assert_equal agg.min, pmq.min
+    assert_equal agg.max, pmq.max
+    assert_equal agg.to_s, pmq.to_s
+  end
+
+  def test_multi_process
+    nr_workers = 4
+    nr = 100
+    pmq = Raindrops::Aggregate::PMQ.new :queue => @queue
+    pid = fork { pmq.master_loop }
+    workers = (1..nr_workers).map {
+      fork {
+        (1..nr).each { |i| pmq << i }
+        pmq.flush
+      }
+    }
+    workers.each { |pid| assert Process.waitpid2(pid).last.success? }
+    pmq.stop_master_loop
+    assert Process.waitpid2(pid).last.success?
+    assert_equal 400, pmq.count
+    agg = Aggregate.new
+    (1..nr_workers).map { (1..nr).each { |i| agg << i } }
+    assert_equal agg.to_s, pmq.to_s
+    assert_equal agg.mean, pmq.mean
+    assert_equal agg.std_dev, pmq.std_dev
+    assert_equal agg.min, pmq.min
+    assert_equal agg.max, pmq.max
+    assert_equal agg.to_s, pmq.to_s
+  end
+end if pmq