1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
| | require "test/unit"
require "raindrops"
# Resolve the PMQ-backed aggregate class up front.  Referencing the constant
# can raise LoadError (presumably through a lazy require of an optional
# dependency -- confirm against the library), in which case +pmq+ becomes
# false and the test class at the bottom of this file is never defined.
pmq =
  begin
    Raindrops::Aggregate::PMQ
  rescue LoadError => e
    warn "W: #{e} skipping test"
    false
  end

# Interpreters older than 1.9 are not supported by this test for now.
unless RUBY_VERSION.to_f >= 1.9
  warn "W: skipping #{__FILE__}, only Ruby 1.9 supported for now"
  pmq = false
end

# Make an unhandled exception in any background thread abort the run
# instead of terminating only that thread.
Thread.abort_on_exception = true
# Exercises Raindrops::Aggregate::PMQ by feeding it the same samples as a
# plain Aggregate and checking that both report identical statistics.
#
# The class is only defined when the file-level +pmq+ local is truthy.
class TestAggregatePMQ < Test::Unit::TestCase
  # Picks a randomly named queue so each test works on its own queue name.
  def setup
    @queue = "/test.#{rand}"
  end

  # Unlinks the queue name chosen in #setup.
  def teardown
    POSIX_MQ.unlink @queue
  end

  # Single-process case: the master loop runs in a background thread while
  # the current thread pushes the samples 1..10.
  def test_run
    pmq = Raindrops::Aggregate::PMQ.new :queue => @queue
    thr = Thread.new { pmq.master_loop }
    agg = Aggregate.new
    (1..10).each { |i| pmq << i; agg << i }
    pmq.stop_master_loop
    assert thr.join
    assert_same_stats agg, pmq
  end

  # Multi-process case: one forked child runs the master loop and
  # +nr_workers+ forked children each push the samples 1..nr.
  def test_multi_process
    nr_workers = 4
    nr = 100
    pmq = Raindrops::Aggregate::PMQ.new :queue => @queue
    master_pid = fork { pmq.master_loop }
    workers = (1..nr_workers).map do
      fork do
        (1..nr).each { |i| pmq << i }
        # NOTE(review): presumably pushes any samples still buffered in this
        # worker before it exits -- confirm against PMQ#flush.
        pmq.flush
      end
    end

    # Every worker, and then the master, must exit successfully.
    workers.each { |wpid| assert Process.waitpid2(wpid).last.success? }
    pmq.stop_master_loop
    assert Process.waitpid2(master_pid).last.success?

    # Derive the expected total from the locals above so it cannot go stale.
    assert_equal nr_workers * nr, pmq.count

    # Reference aggregate: the same samples, fed in-process.
    agg = Aggregate.new
    nr_workers.times { (1..nr).each { |i| agg << i } }
    assert_same_stats agg, pmq
  end

  private

  # Asserts that +actual+ reports the same count, mean, standard deviation,
  # minimum, maximum and string rendering as +expected+.
  def assert_same_stats(expected, actual)
    assert_equal expected.count, actual.count
    assert_equal expected.mean, actual.mean
    assert_equal expected.std_dev, actual.std_dev
    assert_equal expected.min, actual.min
    assert_equal expected.max, actual.max
    assert_equal expected.to_s, actual.to_s
  end
end if pmq
|