diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-03-08 14:18:11 -0800 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-03-08 14:18:11 -0800 |
commit | 90726e5187a9053c6eb7caf90ebec1d38d4372ea (patch) | |
tree | c1f7b53ca7f57a4c5ba2dee6160895a7abd23e8c /lib/raindrops/aggregate/pmq.rb | |
parent | 96c8be2ea8830e2eb3a9108f501df52c21b42546 (diff) | |
download | raindrops-90726e5187a9053c6eb7caf90ebec1d38d4372ea.tar.gz |
Seems to basically work
Diffstat (limited to 'lib/raindrops/aggregate/pmq.rb')
-rw-r--r-- | lib/raindrops/aggregate/pmq.rb | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 26b35f7..14f73be 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -13,9 +13,9 @@ class Raindrops::Aggregate::PMQ attr_reader :nr_dropped - def initialize(params) + def initialize(params = {}) opts = { - :queue => "/raindrops", + :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", :worker_interval => 10, :master_interval => 5, :lossy => false, @@ -72,8 +72,17 @@ class Raindrops::Aggregate::PMQ nr = @master_interval flush_master end - data = Marshal.load(mq.shift(buf)) or return + mq.shift(buf) + data = begin + Marshal.load(buf) or return + rescue ArgumentError, TypeError + next + end Array === data ? data.each { |x| a << x } : a << data + rescue Errno::EINTR + rescue => e + warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}" + break end while true ensure flush_master |