about summary refs log tree commit homepage
path: root/test/test_aggregate_pmq.rb
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_aggregate_pmq.rb')
-rw-r--r--test/test_aggregate_pmq.rb61
1 files changed, 61 insertions, 0 deletions
diff --git a/test/test_aggregate_pmq.rb b/test/test_aggregate_pmq.rb
new file mode 100644
index 0000000..ffa36cf
--- /dev/null
+++ b/test/test_aggregate_pmq.rb
@@ -0,0 +1,61 @@
+require "test/unit"
+require "raindrops"
+pmq = begin
+  Raindrops::Aggregate::PMQ
+rescue => LoadError
+  warn "W: #{e} skipping test"
+  false
+end
+
+Thread.abort_on_exception = true
+
+class TestAggregatePMQ < Test::Unit::TestCase
+
+  def setup
+    @queue = "/test.#{rand}"
+  end
+
+  def teardown
+    POSIX_MQ.unlink @queue
+  end
+
+  def test_run
+    pmq = Raindrops::Aggregate::PMQ.new :queue => @queue
+    thr = Thread.new { pmq.master_loop }
+    agg = Aggregate.new
+    (1..10).each { |i| pmq << i; agg << i }
+    pmq.stop_master_loop
+    assert thr.join
+    assert_equal agg.count, pmq.count
+    assert_equal agg.mean, pmq.mean
+    assert_equal agg.std_dev, pmq.std_dev
+    assert_equal agg.min, pmq.min
+    assert_equal agg.max, pmq.max
+    assert_equal agg.to_s, pmq.to_s
+  end
+
+  def test_multi_process
+    nr_workers = 4
+    nr = 100
+    pmq = Raindrops::Aggregate::PMQ.new :queue => @queue
+    pid = fork { pmq.master_loop }
+    workers = (1..nr_workers).map {
+      fork {
+        (1..nr).each { |i| pmq << i }
+        pmq.flush
+      }
+    }
+    workers.each { |pid| assert Process.waitpid2(pid).last.success? }
+    pmq.stop_master_loop
+    assert Process.waitpid2(pid).last.success?
+    assert_equal 400, pmq.count
+    agg = Aggregate.new
+    (1..nr_workers).map { (1..nr).each { |i| agg << i } }
+    assert_equal agg.to_s, pmq.to_s
+    assert_equal agg.mean, pmq.mean
+    assert_equal agg.std_dev, pmq.std_dev
+    assert_equal agg.min, pmq.min
+    assert_equal agg.max, pmq.max
+    assert_equal agg.to_s, pmq.to_s
+  end
+end if pmq