about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/raindrops.rb1
-rw-r--r--lib/raindrops/aggregate.rb1
-rw-r--r--lib/raindrops/aggregate/last_data_recv.rb53
-rw-r--r--lib/raindrops/aggregate/pmq.rb15
-rw-r--r--lib/raindrops/last_data_recv.rb100
-rw-r--r--lib/raindrops/middleware.rb1
6 files changed, 168 insertions, 3 deletions
diff --git a/lib/raindrops.rb b/lib/raindrops.rb
index a35a158..88d65f6 100644
--- a/lib/raindrops.rb
+++ b/lib/raindrops.rb
@@ -31,5 +31,6 @@ class Raindrops
   autoload :Struct, 'raindrops/struct'
   autoload :Middleware, 'raindrops/middleware'
   autoload :Aggregate, 'raindrops/aggregate'
+  autoload :LastDataRecv, 'raindrops/last_data_recv'
 end
 require 'raindrops_ext'
diff --git a/lib/raindrops/aggregate.rb b/lib/raindrops/aggregate.rb
index 4f217de..5bb7c04 100644
--- a/lib/raindrops/aggregate.rb
+++ b/lib/raindrops/aggregate.rb
@@ -2,4 +2,5 @@
 require "aggregate"
 module Raindrops::Aggregate
   autoload :PMQ, "raindrops/aggregate/pmq"
+  autoload :LastDataRecv, "raindrops/aggregate/last_data_recv"
 end
diff --git a/lib/raindrops/aggregate/last_data_recv.rb b/lib/raindrops/aggregate/last_data_recv.rb
new file mode 100644
index 0000000..2935927
--- /dev/null
+++ b/lib/raindrops/aggregate/last_data_recv.rb
@@ -0,0 +1,53 @@
+# -*- encoding: binary -*-
+require "socket"
+#
+# Used to aggregate last_data_recv times
+module Raindrops::Aggregate::LastDataRecv
+  TCP_Info = Raindrops::TCP_Info
+  attr_accessor :raindrops_aggregate
+  @@default_aggregate = nil
+
+  def self.default_aggregate
+    @@default_aggregate ||= Raindrops::Aggregate::PMQ.new
+  end
+
+  def self.default_aggregate=(agg)
+    @@default_aggregate = agg
+  end
+
+  def self.cornify!
+    Unicorn::HttpServer::LISTENERS.each do |sock|
+      sock.extend(self) if TCPServer === sock
+    end
+  end
+
+  def self.extended(obj)
+    obj.raindrops_aggregate = default_aggregate
+    obj.setsockopt Socket::SOL_TCP, tcp_defer_accept = 9, seconds = 60
+  end
+
+  def kgio_tryaccept(*args)
+    count! super
+  end
+
+  def kgio_accept(*args)
+    count! super
+  end
+
+  def accept
+    count! super
+  end
+
+  def accept_nonblock
+    count! super
+  end
+
+  def count!(io)
+    if io
+      x = TCP_Info.new(io)
+      @raindrops_aggregate << x.last_data_recv
+    end
+    io
+  end
+end
+
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb
index 26b35f7..14f73be 100644
--- a/lib/raindrops/aggregate/pmq.rb
+++ b/lib/raindrops/aggregate/pmq.rb
@@ -13,9 +13,9 @@ class Raindrops::Aggregate::PMQ
 
   attr_reader :nr_dropped
 
-  def initialize(params)
+  def initialize(params = {})
     opts = {
-      :queue => "/raindrops",
+      :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
       :worker_interval => 10,
       :master_interval => 5,
       :lossy => false,
@@ -72,8 +72,17 @@ class Raindrops::Aggregate::PMQ
         nr = @master_interval
         flush_master
       end
-      data = Marshal.load(mq.shift(buf)) or return
+      mq.shift(buf)
+      data = begin
+        Marshal.load(buf) or return
+      rescue ArgumentError, TypeError
+        next
+      end
       Array === data ? data.each { |x| a << x } : a << data
+    rescue Errno::EINTR
+    rescue => e
+      warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
+      break
     end while true
     ensure
       flush_master
diff --git a/lib/raindrops/last_data_recv.rb b/lib/raindrops/last_data_recv.rb
new file mode 100644
index 0000000..3ec056e
--- /dev/null
+++ b/lib/raindrops/last_data_recv.rb
@@ -0,0 +1,100 @@
+# -*- encoding: binary -*-
+require "raindrops"
+
+# This is highly experimental!
+#
+# A self-contained Rack application for aggregating in the
+# +tcpi_last_data_recv+ field in +struct tcp_info+ if
+# /usr/include/linux/tcp.h.  This is only useful for Linux 2.6 and later.
+# This primarily supports Unicorn and derived servers, but may also be
+# used with any Ruby web server using the core TCPServer class in Ruby.
+#
+# Hitting the Rack endpoint configured for this application will return
+# a an ASCII histogram response body with the following headers:
+#
+# - X-Count   - number of requests received
+#
+# The following headers are only present if X-Count is greater than one.
+#
+# - X-Min     - lowest last_data_recv time recorded (in milliseconds)
+# - X-Max     - highest last_data_recv time recorded (in milliseconds)
+# - X-Mean    - mean last_data_recv time recorded (rounded, in milliseconds)
+# - X-Std-Dev - standard deviation of last_data_recv times
+# - X-Outliers-Low - number of low outliers (hopefully many!)
+# - X-Outliers-High - number of high outliers (hopefully zero!)
+#
+# == To use with Unicorn and derived servers (preload_app=false):
+#
+# Put the following in our Unicorn config file (not config.ru):
+#
+#   require "raindrops/last_data_recv"
+#
+# Then follow the instructions below for config.ru:
+#
+# == To use with any Rack server using TCPServer
+#
+# Setup a route for Raindrops::LastDataRecv in your Rackup config file
+# (typically config.ru):
+#
+#   require "raindrops"
+#   map "/raindrops/last_data_recv" do
+#     run Raindrops::LastDataRecv.new
+#   end
+#   map "/" do
+#     use SomeMiddleware
+#     use MoreMiddleware
+#     # ...
+#     run YourAppHere.new
+#   end
+#
+# == To use with any other Ruby web server that uses TCPServer
+#
+# Put the following in any piece of Ruby code loaded after the server has
+# bound its TCP listeners:
+#
+#   ObjectSpace.each_object(TCPServer) do |s|
+#     s.extend Raindrops::Aggregate::LastDataRecv
+#   end
+#
+#   Thread.new do
+#     Raindrops::Aggregate::LastDataRecv.default_aggregate.master_loop
+#   end
+#
+# Then follow the above instructions for config.ru
+#
+class Raindrops::LastDataRecv
+  # :stopdoc:
+
+  # trigger autoloads
+  if defined?(Unicorn)
+    agg = Raindrops::Aggregate::LastDataRecv.default_aggregate
+    AGGREGATE_THREAD = Thread.new { agg.master_loop }
+  end
+  # :startdoc
+
+  def initialize(opts = {})
+    Raindrops::Aggregate::LastDataRecv.cornify! if defined?(Unicorn)
+    @aggregate =
+      opts[:aggregate] || Raindrops::Aggregate::LastDataRecv.default_aggregate
+  end
+
+  def call(_)
+    a = @aggregate
+    count = a.count
+    headers = {
+      "Content-Type" => "text/plain",
+      "X-Count" => count.to_s,
+    }
+    if count > 1
+      headers["X-Min"] = a.min.to_s
+      headers["X-Max"] = a.max.to_s
+      headers["X-Mean"] = a.mean.round.to_s
+      headers["X-Std-Dev"] = a.std_dev.round.to_s
+      headers["X-Outliers-Low"] = a.outliers_low.to_s
+      headers["X-Outliers-High"] = a.outliers_high.to_s
+    end
+    body = a.to_s
+    headers["Content-Length"] = body.size.to_s
+    [ 200, headers, [ body ] ]
+  end
+end
diff --git a/lib/raindrops/middleware.rb b/lib/raindrops/middleware.rb
index 1ea4863..d45fa1a 100644
--- a/lib/raindrops/middleware.rb
+++ b/lib/raindrops/middleware.rb
@@ -10,6 +10,7 @@ class Raindrops::Middleware
   Stats = Raindrops::Struct.new(:calling, :writing)
   PATH_INFO = "PATH_INFO"
   require "raindrops/middleware/proxy"
+  autoload :TCP, "raindrops/middleware/tcp"
   # :startdoc:
 
   def initialize(app, opts = {})