about summary refs log tree commit homepage
path: root/lib/raindrops/aggregate
diff options
context:
space:
mode:
Diffstat (limited to 'lib/raindrops/aggregate')
-rw-r--r--lib/raindrops/aggregate/last_data_recv.rb53
-rw-r--r--lib/raindrops/aggregate/pmq.rb15
2 files changed, 65 insertions, 3 deletions
diff --git a/lib/raindrops/aggregate/last_data_recv.rb b/lib/raindrops/aggregate/last_data_recv.rb
new file mode 100644
index 0000000..2935927
--- /dev/null
+++ b/lib/raindrops/aggregate/last_data_recv.rb
@@ -0,0 +1,53 @@
+# -*- encoding: binary -*-
+require "socket"
+#
+# Used to aggregate last_data_recv times
+module Raindrops::Aggregate::LastDataRecv
+  TCP_Info = Raindrops::TCP_Info
+  attr_accessor :raindrops_aggregate
+  @@default_aggregate = nil
+
+  def self.default_aggregate
+    @@default_aggregate ||= Raindrops::Aggregate::PMQ.new
+  end
+
+  def self.default_aggregate=(agg)
+    @@default_aggregate = agg
+  end
+
+  def self.cornify!
+    Unicorn::HttpServer::LISTENERS.each do |sock|
+      sock.extend(self) if TCPServer === sock
+    end
+  end
+
+  def self.extended(obj)
+    obj.raindrops_aggregate = default_aggregate
+    obj.setsockopt Socket::SOL_TCP, tcp_defer_accept = 9, seconds = 60
+  end
+
+  def kgio_tryaccept(*args)
+    count! super
+  end
+
+  def kgio_accept(*args)
+    count! super
+  end
+
+  def accept
+    count! super
+  end
+
+  def accept_nonblock
+    count! super
+  end
+
+  def count!(io)
+    if io
+      x = TCP_Info.new(io)
+      @raindrops_aggregate << x.last_data_recv
+    end
+    io
+  end
+end
+
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb
index 26b35f7..14f73be 100644
--- a/lib/raindrops/aggregate/pmq.rb
+++ b/lib/raindrops/aggregate/pmq.rb
@@ -13,9 +13,9 @@ class Raindrops::Aggregate::PMQ
 
   attr_reader :nr_dropped
 
-  def initialize(params)
+  def initialize(params = {})
     opts = {
-      :queue => "/raindrops",
+      :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
       :worker_interval => 10,
       :master_interval => 5,
       :lossy => false,
@@ -72,8 +72,17 @@ class Raindrops::Aggregate::PMQ
         nr = @master_interval
         flush_master
       end
-      data = Marshal.load(mq.shift(buf)) or return
+      mq.shift(buf)
+      data = begin
+        Marshal.load(buf) or return
+      rescue ArgumentError, TypeError
+        next
+      end
       Array === data ? data.each { |x| a << x } : a << data
+    rescue Errno::EINTR
+    rescue => e
+      warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
+      break
     end while true
     ensure
       flush_master