about summary refs log tree commit homepage
path: root/lib/raindrops/aggregate/pmq.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-03-08 14:18:11 -0800
committerEric Wong <normalperson@yhbt.net>2011-03-08 14:18:11 -0800
commit90726e5187a9053c6eb7caf90ebec1d38d4372ea (patch)
treec1f7b53ca7f57a4c5ba2dee6160895a7abd23e8c /lib/raindrops/aggregate/pmq.rb
parent96c8be2ea8830e2eb3a9108f501df52c21b42546 (diff)
downloadraindrops-90726e5187a9053c6eb7caf90ebec1d38d4372ea.tar.gz
Seems to basically work
Diffstat (limited to 'lib/raindrops/aggregate/pmq.rb')
-rw-r--r--lib/raindrops/aggregate/pmq.rb15
1 files changed, 12 insertions, 3 deletions
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb
index 26b35f7..14f73be 100644
--- a/lib/raindrops/aggregate/pmq.rb
+++ b/lib/raindrops/aggregate/pmq.rb
@@ -13,9 +13,9 @@ class Raindrops::Aggregate::PMQ
 
   attr_reader :nr_dropped
 
-  def initialize(params)
+  def initialize(params = {})
     opts = {
-      :queue => "/raindrops",
+      :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
       :worker_interval => 10,
       :master_interval => 5,
       :lossy => false,
@@ -72,8 +72,17 @@ class Raindrops::Aggregate::PMQ
         nr = @master_interval
         flush_master
       end
-      data = Marshal.load(mq.shift(buf)) or return
+      mq.shift(buf)
+      data = begin
+        Marshal.load(buf) or return
+      rescue ArgumentError, TypeError
+        next
+      end
       Array === data ? data.each { |x| a << x } : a << data
+    rescue Errno::EINTR
+    rescue => e
+      warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
+      break
     end while true
     ensure
       flush_master