diff options
Diffstat (limited to 'lib/raindrops/aggregate/pmq.rb')
-rw-r--r-- | lib/raindrops/aggregate/pmq.rb | 15 |
1 files changed, 12 insertions, 3 deletions
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 26b35f7..14f73be 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -13,9 +13,9 @@ class Raindrops::Aggregate::PMQ attr_reader :nr_dropped - def initialize(params) + def initialize(params = {}) opts = { - :queue => "/raindrops", + :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", :worker_interval => 10, :master_interval => 5, :lossy => false, @@ -72,8 +72,17 @@ class Raindrops::Aggregate::PMQ nr = @master_interval flush_master end - data = Marshal.load(mq.shift(buf)) or return + mq.shift(buf) + data = begin + Marshal.load(buf) or return + rescue ArgumentError, TypeError + next + end Array === data ? data.each { |x| a << x } : a << data + rescue Errno::EINTR + rescue => e + warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}" + break end while true ensure flush_master |