about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-03-08 14:18:11 -0800
committerEric Wong <normalperson@yhbt.net>2011-03-08 14:18:11 -0800
commit90726e5187a9053c6eb7caf90ebec1d38d4372ea (patch)
treec1f7b53ca7f57a4c5ba2dee6160895a7abd23e8c
parent96c8be2ea8830e2eb3a9108f501df52c21b42546 (diff)
downloadraindrops-90726e5187a9053c6eb7caf90ebec1d38d4372ea.tar.gz
Seems to basically work
-rw-r--r--lib/raindrops.rb1
-rw-r--r--lib/raindrops/aggregate.rb1
-rw-r--r--lib/raindrops/aggregate/last_data_recv.rb53
-rw-r--r--lib/raindrops/aggregate/pmq.rb15
-rw-r--r--lib/raindrops/last_data_recv.rb100
-rw-r--r--lib/raindrops/middleware.rb1
-rw-r--r--test/test_last_data_recv_unicorn.rb65
7 files changed, 233 insertions, 3 deletions
diff --git a/lib/raindrops.rb b/lib/raindrops.rb
index a35a158..88d65f6 100644
--- a/lib/raindrops.rb
+++ b/lib/raindrops.rb
@@ -31,5 +31,6 @@ class Raindrops
   autoload :Struct, 'raindrops/struct'
   autoload :Middleware, 'raindrops/middleware'
   autoload :Aggregate, 'raindrops/aggregate'
+  autoload :LastDataRecv, 'raindrops/last_data_recv'
 end
 require 'raindrops_ext'
diff --git a/lib/raindrops/aggregate.rb b/lib/raindrops/aggregate.rb
index 4f217de..5bb7c04 100644
--- a/lib/raindrops/aggregate.rb
+++ b/lib/raindrops/aggregate.rb
@@ -2,4 +2,5 @@
 require "aggregate"
 module Raindrops::Aggregate
   autoload :PMQ, "raindrops/aggregate/pmq"
+  autoload :LastDataRecv, "raindrops/aggregate/last_data_recv"
 end
diff --git a/lib/raindrops/aggregate/last_data_recv.rb b/lib/raindrops/aggregate/last_data_recv.rb
new file mode 100644
index 0000000..2935927
--- /dev/null
+++ b/lib/raindrops/aggregate/last_data_recv.rb
@@ -0,0 +1,53 @@
+# -*- encoding: binary -*-
+require "socket"
+#
+# Used to aggregate last_data_recv times
+module Raindrops::Aggregate::LastDataRecv
+  TCP_Info = Raindrops::TCP_Info
+  attr_accessor :raindrops_aggregate
+  @@default_aggregate = nil
+
+  def self.default_aggregate
+    @@default_aggregate ||= Raindrops::Aggregate::PMQ.new
+  end
+
+  def self.default_aggregate=(agg)
+    @@default_aggregate = agg
+  end
+
+  def self.cornify!
+    Unicorn::HttpServer::LISTENERS.each do |sock|
+      sock.extend(self) if TCPServer === sock
+    end
+  end
+
+  def self.extended(obj)
+    obj.raindrops_aggregate = default_aggregate
+    obj.setsockopt Socket::SOL_TCP, tcp_defer_accept = 9, seconds = 60
+  end
+
+  def kgio_tryaccept(*args)
+    count! super
+  end
+
+  def kgio_accept(*args)
+    count! super
+  end
+
+  def accept
+    count! super
+  end
+
+  def accept_nonblock
+    count! super
+  end
+
+  def count!(io)
+    if io
+      x = TCP_Info.new(io)
+      @raindrops_aggregate << x.last_data_recv
+    end
+    io
+  end
+end
+
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb
index 26b35f7..14f73be 100644
--- a/lib/raindrops/aggregate/pmq.rb
+++ b/lib/raindrops/aggregate/pmq.rb
@@ -13,9 +13,9 @@ class Raindrops::Aggregate::PMQ
 
   attr_reader :nr_dropped
 
-  def initialize(params)
+  def initialize(params = {})
     opts = {
-      :queue => "/raindrops",
+      :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops",
       :worker_interval => 10,
       :master_interval => 5,
       :lossy => false,
@@ -72,8 +72,17 @@ class Raindrops::Aggregate::PMQ
         nr = @master_interval
         flush_master
       end
-      data = Marshal.load(mq.shift(buf)) or return
+      mq.shift(buf)
+      data = begin
+        Marshal.load(buf) or return
+      rescue ArgumentError, TypeError
+        next
+      end
       Array === data ? data.each { |x| a << x } : a << data
+    rescue Errno::EINTR
+    rescue => e
+      warn "Unhandled exception in #{__FILE__}:#{__LINE__}: #{e}"
+      break
     end while true
     ensure
       flush_master
diff --git a/lib/raindrops/last_data_recv.rb b/lib/raindrops/last_data_recv.rb
new file mode 100644
index 0000000..3ec056e
--- /dev/null
+++ b/lib/raindrops/last_data_recv.rb
@@ -0,0 +1,100 @@
+# -*- encoding: binary -*-
+require "raindrops"
+
+# This is highly experimental!
+#
+# A self-contained Rack application for aggregating in the
+# +tcpi_last_data_recv+ field in +struct tcp_info+ if
+# /usr/include/linux/tcp.h.  This is only useful for Linux 2.6 and later.
+# This primarily supports Unicorn and derived servers, but may also be
+# used with any Ruby web server using the core TCPServer class in Ruby.
+#
+# Hitting the Rack endpoint configured for this application will return
+# a an ASCII histogram response body with the following headers:
+#
+# - X-Count   - number of requests received
+#
+# The following headers are only present if X-Count is greater than one.
+#
+# - X-Min     - lowest last_data_recv time recorded (in milliseconds)
+# - X-Max     - highest last_data_recv time recorded (in milliseconds)
+# - X-Mean    - mean last_data_recv time recorded (rounded, in milliseconds)
+# - X-Std-Dev - standard deviation of last_data_recv times
+# - X-Outliers-Low - number of low outliers (hopefully many!)
+# - X-Outliers-High - number of high outliers (hopefully zero!)
+#
+# == To use with Unicorn and derived servers (preload_app=false):
+#
+# Put the following in our Unicorn config file (not config.ru):
+#
+#   require "raindrops/last_data_recv"
+#
+# Then follow the instructions below for config.ru:
+#
+# == To use with any Rack server using TCPServer
+#
+# Setup a route for Raindrops::LastDataRecv in your Rackup config file
+# (typically config.ru):
+#
+#   require "raindrops"
+#   map "/raindrops/last_data_recv" do
+#     run Raindrops::LastDataRecv.new
+#   end
+#   map "/" do
+#     use SomeMiddleware
+#     use MoreMiddleware
+#     # ...
+#     run YourAppHere.new
+#   end
+#
+# == To use with any other Ruby web server that uses TCPServer
+#
+# Put the following in any piece of Ruby code loaded after the server has
+# bound its TCP listeners:
+#
+#   ObjectSpace.each_object(TCPServer) do |s|
+#     s.extend Raindrops::Aggregate::LastDataRecv
+#   end
+#
+#   Thread.new do
+#     Raindrops::Aggregate::LastDataRecv.default_aggregate.master_loop
+#   end
+#
+# Then follow the above instructions for config.ru
+#
+class Raindrops::LastDataRecv
+  # :stopdoc:
+
+  # trigger autoloads
+  if defined?(Unicorn)
+    agg = Raindrops::Aggregate::LastDataRecv.default_aggregate
+    AGGREGATE_THREAD = Thread.new { agg.master_loop }
+  end
+  # :startdoc
+
+  def initialize(opts = {})
+    Raindrops::Aggregate::LastDataRecv.cornify! if defined?(Unicorn)
+    @aggregate =
+      opts[:aggregate] || Raindrops::Aggregate::LastDataRecv.default_aggregate
+  end
+
+  def call(_)
+    a = @aggregate
+    count = a.count
+    headers = {
+      "Content-Type" => "text/plain",
+      "X-Count" => count.to_s,
+    }
+    if count > 1
+      headers["X-Min"] = a.min.to_s
+      headers["X-Max"] = a.max.to_s
+      headers["X-Mean"] = a.mean.round.to_s
+      headers["X-Std-Dev"] = a.std_dev.round.to_s
+      headers["X-Outliers-Low"] = a.outliers_low.to_s
+      headers["X-Outliers-High"] = a.outliers_high.to_s
+    end
+    body = a.to_s
+    headers["Content-Length"] = body.size.to_s
+    [ 200, headers, [ body ] ]
+  end
+end
diff --git a/lib/raindrops/middleware.rb b/lib/raindrops/middleware.rb
index 1ea4863..d45fa1a 100644
--- a/lib/raindrops/middleware.rb
+++ b/lib/raindrops/middleware.rb
@@ -10,6 +10,7 @@ class Raindrops::Middleware
   Stats = Raindrops::Struct.new(:calling, :writing)
   PATH_INFO = "PATH_INFO"
   require "raindrops/middleware/proxy"
+  autoload :TCP, "raindrops/middleware/tcp"
   # :startdoc:
 
   def initialize(app, opts = {})
diff --git a/test/test_last_data_recv_unicorn.rb b/test/test_last_data_recv_unicorn.rb
new file mode 100644
index 0000000..19aba0d
--- /dev/null
+++ b/test/test_last_data_recv_unicorn.rb
@@ -0,0 +1,65 @@
+# -*- encoding: binary -*-
+require "./test/rack_unicorn"
+require "tempfile"
+require "net/http"
+
+$stderr.sync = $stdout.sync = true
+pmq = begin
+  Raindrops::Aggregate::PMQ
+rescue => LoadError
+  warn "W: #{e} skipping test"
+  false
+end
+
+class TestLastDataRecvUnicorn < Test::Unit::TestCase
+  def setup
+    @queue = "/test.#{rand}"
+    @host = ENV["UNICORN_TEST_ADDR"] || "127.0.0.1"
+    @sock = TCPServer.new @host, 0
+    @port = @sock.addr[1]
+    ENV["UNICORN_FD"] = @sock.fileno.to_s
+    @host_with_port = "#@host:#@port"
+    @cfg = Tempfile.new 'unicorn_config_file'
+    @cfg.puts "require 'raindrops'"
+    @cfg.puts "preload_app true"
+      ENV['RAINDROPS_MQUEUE'] = @queue
+    # @cfg.puts "worker_processes 4"
+    @opts = { :listeners => [ @host_with_port ], :config_file => @cfg.path }
+  end
+
+  def test_auto_listener
+    @srv = fork {
+      Thread.abort_on_exception = true
+      app = %q!Rack::Builder.new do
+        map("/ldr") { run Raindrops::LastDataRecv.new }
+        map("/") { run Rack::Lobster.new }
+      end.to_app!
+      def app.arity; 0; end
+      def app.call; eval self; end
+      Unicorn.run(app, @opts)
+    }
+    400.times { assert_kind_of Net::HTTPSuccess, get("/") }
+    resp = get("/ldr")
+    # # p(resp.methods - Object.methods)
+    # resp.each_header { |k,v| p [k, "=" , v] }
+    assert resp.header["x-count"]
+    assert resp.header["x-min"]
+    assert resp.header["x-max"]
+    assert resp.header["x-mean"]
+    assert resp.header["x-std-dev"]
+    assert resp.header["x-outliers-low"]
+    assert resp.header["x-outliers-high"]
+    assert resp.body.size > 0
+  end
+
+  def get(path)
+    Net::HTTP.start(@host, @port) { |http| http.get path }
+  end
+
+  def teardown
+    Process.kill :QUIT, @srv
+    _, status = Process.waitpid2 @srv
+    assert status.success?
+    POSIX_MQ.unlink @queue
+  end
+end if defined?(Unicorn) && RUBY_PLATFORM =~ /linux/ && pmq