about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--config.ru2
-rw-r--r--examples/zbatery.config.rb6
-rw-r--r--lib/raindrops.rb1
-rw-r--r--lib/raindrops/watcher.rb338
-rw-r--r--test/test_watcher.rb85
5 files changed, 432 insertions, 0 deletions
diff --git a/config.ru b/config.ru
new file mode 100644
index 0000000..b8f6160
--- /dev/null
+++ b/config.ru
@@ -0,0 +1,2 @@
+require "raindrops"
+run Raindrops::Watcher.new
diff --git a/examples/zbatery.config.rb b/examples/zbatery.config.rb
new file mode 100644
index 0000000..38aa4b6
--- /dev/null
+++ b/examples/zbatery.config.rb
@@ -0,0 +1,6 @@
+# Used for running Raindrops::Watcher, which requires a multi-threaded
+# Rack server capable of streaming a response.  Threads must be used,
+# so Zbatery is recommended: http://zbatery.bogomip.org/
+Rainbows! do
+  use :ThreadSpawn
+end
diff --git a/lib/raindrops.rb b/lib/raindrops.rb
index 073b117..296dc94 100644
--- a/lib/raindrops.rb
+++ b/lib/raindrops.rb
@@ -43,5 +43,6 @@ class Raindrops
   autoload :Middleware, 'raindrops/middleware'
   autoload :Aggregate, 'raindrops/aggregate'
   autoload :LastDataRecv, 'raindrops/last_data_recv'
+  autoload :Watcher, 'raindrops/watcher'
 end
 require 'raindrops_ext'
diff --git a/lib/raindrops/watcher.rb b/lib/raindrops/watcher.rb
new file mode 100644
index 0000000..51e1b19
--- /dev/null
+++ b/lib/raindrops/watcher.rb
@@ -0,0 +1,338 @@
+# -*- encoding: binary -*-
+require "thread"
+require "time"
+require "socket"
+require "rack"
+require "aggregate"
+
+# Raindrops::Watcher is a stand-alone Rack application for watching
+# any number of TCP and UNIX listeners (all of them by default).
+#
+# It depends on the {Aggregate RubyGem}[http://rubygems.org/gems/aggregate]
+#
+# In your Rack config.ru:
+#
+#    run Raindrops::Watcher(options = {})
+#
+# It takes the following options hash:
+#
+# - :listeners - an array of listener names, (e.g. %w(0.0.0.0:80 /tmp/sock))
+# - :delay - interval between stats updates in seconds (default: 1)
+#
+# Raindrops::Watcher is compatible any thread-safe/thread-aware Rack
+# middleware.  It does not work well with multi-process web servers
+# but can be used to monitor them.  It consumes minimal resources
+# with the default :delay.
+#
+# == HTTP endpoints
+#
+# === GET /
+#
+# Returns an HTML summary listing of all listen interfaces watched on
+#
+# === GET /active/$LISTENER.txt
+#
+# Returns a plain text summary + histogram with X-* HTTP headers for
+# active connections.
+#
+# e.g.: curl http://example.com/active/0.0.0.0%3A80.txt
+#
+# === GET /active/$LISTENER.html
+#
+# Returns an HTML summary + histogram with X-* HTTP headers for
+# active connections.
+#
+# e.g.: curl http://example.com/active/0.0.0.0%3A80.html
+#
+# === GET /queued/$LISTENER.txt
+#
+# Returns a plain text summary + histogram with X-* HTTP headers for
+# queued connections.
+#
+# e.g.: curl http://example.com/queued/0.0.0.0%3A80.txt
+#
+# === GET /queued/$LISTENER.html
+#
+# Returns an HTML summary + histogram with X-* HTTP headers for
+# queued connections.
+#
+# e.g.: curl http://example.com/queued/0.0.0.0%3A80.html
+#
+# === GET /tail/$LISTENER.txt?active_min=1&queued_min=1
+#
+# Streams chunked a response to the client.
+# Interval is the preconfigured +:delay+ of the application (default 1 second)
+#
+# The response is plain text in the following format:
+#
+#   ISO8601_TIMESTAMP LISTENER_NAME ACTIVE_COUNT QUEUED_COUNT LINEFEED
+#
+# Query parameters:
+#
+# - active_min - do not stream a line until this active count is reached
+# - queued_min - do not stream a line until this queued count is reached
+#
+# == Response headers (mostly the same as Raindrops::LastDataRecv)
+#
+# - X-Count   - number of requests received
+# - X-Last-Reset - date since the last reset
+#
+# The following headers are only present if X-Count is greater than one.
+#
+# - X-Min     - lowest last_data_recv time recorded (in milliseconds)
+# - X-Max     - highest last_data_recv time recorded (in milliseconds)
+# - X-Mean    - mean last_data_recv time recorded (rounded, in milliseconds)
+# - X-Std-Dev - standard deviation of last_data_recv times
+# - X-Outliers-Low - number of low outliers (hopefully many!)
+# - X-Outliers-High - number of high outliers (hopefully zero!)
+#
+class Raindrops::Watcher
+  # :stopdoc:
+  attr_reader :snapshot
+  include Rack::Utils
+  include Raindrops::Linux
+
+  def initialize(opts = {})
+    @tcp_listeners = @unix_listeners = nil
+    if l = opts[:listeners]
+      tcp, unix = [], []
+      Array(l).each { |addr| (addr =~ %r{\A/} ? unix : tcp) << addr }
+      unless tcp.empty? && unix.empty?
+        @tcp_listeners = tcp
+        @unix_listeners = unix
+      end
+    end
+
+    agg_class = opts[:agg_class] || Aggregate
+    start = Time.now.utc
+    @active = Hash.new { |h,k| h[k] = agg_class.new }
+    @queued = Hash.new { |h,k| h[k] = agg_class.new }
+    @resets = Hash.new { |h,k| h[k] = start }
+    @snapshot = [ start, {} ]
+    @delay = opts[:delay] || 1
+    @lock = Mutex.new
+    @start = Mutex.new
+    @cond = ConditionVariable.new
+    @thr = nil
+  end
+
+  def hostname
+    Socket.gethostname
+  end
+
+  # rack endpoint
+  def call(env)
+    @start.synchronize { @thr ||= aggregator_thread(env["rack.logger"]) }
+    case env["REQUEST_METHOD"]
+    when "HEAD", "GET"
+      get env
+    when "POST"
+      post env
+    else
+      Rack::Response.new(["Method Not Allowed"], 405).finish
+    end
+  end
+
+  def aggregator_thread(logger) # :nodoc:
+    @socket = sock = Raindrops::InetDiagSocket.new
+    thr = Thread.new do
+      begin
+        combined = tcp_listener_stats(@tcp_listeners, sock)
+        combined.merge!(unix_listener_stats(@unix_listeners))
+        @lock.synchronize do
+          combined.each do |addr,stats|
+            @active[addr] << stats.active
+            @queued[addr] << stats.queued
+          end
+          @snapshot = [ Time.now.utc, combined ]
+          @cond.broadcast
+        end
+      rescue => e
+        logger.error "#{e.class} #{e.inspect}"
+      end while sleep(@delay) && @socket
+      sock.close
+    end
+    wait_snapshot
+    thr
+  end
+
+  def active_stats(addr) # :nodoc:
+    @lock.synchronize do
+      tmp = @active[addr] or return
+      [ @resets[addr], tmp.dup ]
+    end
+  end
+
+  def queued_stats(addr) # :nodoc:
+    @lock.synchronize do
+      tmp = @queued[addr] or return
+      [ @resets[addr], tmp.dup ]
+    end
+  end
+
+  def wait_snapshot
+    @lock.synchronize do
+      @cond.wait @lock
+      @snapshot
+    end
+  end
+
+  def agg_to_hash(reset_at, agg)
+    {
+      "X-Count" => agg.count.to_s,
+      "X-Min" => agg.min.to_s,
+      "X-Max" => agg.max.to_s,
+      "X-Mean" => agg.mean.to_s,
+      "X-Std-Dev" => agg.std_dev.to_s,
+      "X-Outliers-Low" => agg.outliers_low.to_s,
+      "X-Outliers-High" => agg.outliers_high.to_s,
+      "X-Last-Reset" => reset_at.httpdate,
+    }
+  end
+
+  def histogram_txt(agg)
+    reset_at, agg = *agg
+    headers = agg_to_hash(reset_at, agg)
+    body = agg.to_s
+    headers["Content-Type"] = "text/plain"
+    headers["Content-Length"] = bytesize(body).to_s
+    [ 200, headers, [ body ] ]
+  end
+
+  def histogram_html(agg, addr)
+    reset_at, agg = *agg
+    headers = agg_to_hash(reset_at, agg)
+    body = "<html>" \
+      "<head><title>#{hostname} - #{escape_html addr}</title></head>" \
+      "<body><table>" <<
+      headers.map { |k,v|
+        "<tr><td>#{k.gsub(/^X-/, '')}</td><td>#{v}</td></tr>"
+      }.join << "</table><pre>#{escape_html agg}</pre>" \
+      "<form action='/reset/#{escape addr}' method='post'>" \
+      "<input type='submit' name='x' value='reset' /></form>" \
+      "</body>"
+    headers["Content-Type"] = "text/html"
+    headers["Content-Length"] = bytesize(body).to_s
+    [ 200, headers, [ body ] ]
+  end
+
+  def get(env)
+    case env["PATH_INFO"]
+    when "/"
+      index
+    when %r{\A/active/(.+)\.txt\z}
+      histogram_txt(active_stats(unescape($1)))
+    when %r{\A/active/(.+)\.html\z}
+      addr = unescape $1
+      histogram_html(active_stats(addr), addr)
+    when %r{\A/queued/(.+)\.txt\z}
+      histogram_txt(queued_stats(unescape($1)))
+    when %r{\A/queued/(.+)\.html\z}
+      addr = unescape $1
+      histogram_html(queued_stats(addr), addr)
+    when %r{\A/tail/(.+)\.txt\z}
+      tail(unescape($1), env)
+    else
+      not_found
+    end
+  end
+
+  def not_found
+    Rack::Response.new(["Not Found"], 404).finish
+  end
+
+  def post(env)
+    case env["PATH_INFO"]
+    when %r{\A/reset/(.+)\z}
+      reset!(env, unescape($1))
+    else
+      Rack::Response.new(["Not Found"], 404).finish
+    end
+  end
+
+  def reset!(env, addr)
+    @lock.synchronize do
+      @active.include?(addr) or return not_found
+      @active.delete addr
+      @queued.delete addr
+      @resets[addr] = Time.now.utc
+      @cond.wait @lock
+    end
+    req = Rack::Request.new(env)
+    res = Rack::Response.new
+    url = req.referer || "#{req.host_with_port}/"
+    res.redirect(url)
+    res.content_type.replace "text/plain"
+    res.write "Redirecting to #{url}"
+    res.finish
+  end
+
+  def index
+    updated_at, all = snapshot
+    headers = {
+      "Content-Type" => "text/html",
+      "Last-Modified" => updated_at.httpdate,
+    }
+    body = "<html><head>" \
+      "<title>#{hostname} - all interfaces</title>" \
+      "</head><body><h3>Updated at #{updated_at.iso8601}</h3>" \
+      "<table><tr>" \
+        "<th>address</th><th>active</th><th>queued</th><th>reset</th>" \
+      "</tr>" <<
+      all.map do |addr,stats|
+        e_addr = escape addr
+        "<tr>" \
+          "<td><a href='/tail/#{e_addr}.txt'>#{escape_html addr}</a></td>" \
+          "<td><a href='/active/#{e_addr}.html'>#{stats.active}</a></td>" \
+          "<td><a href='/queued/#{e_addr}.html'>#{stats.queued}</a></td>" \
+          "<td><form action='/reset/#{e_addr}' method='post'>" \
+            "<input type='submit' name='x' value='x' /></form></td>" \
+        "</tr>" \
+      end.join << "</table></body></html>"
+    headers["Content-Length"] = bytesize(body).to_s
+    [ 200, headers, [ body ] ]
+  end
+
+  def tail(addr, env)
+    [ 200,
+      { "Transfer-Encoding" => "chunked", "Content-Type" => "text/plain" },
+      Tailer.new(self, addr, env) ]
+  end
+  # :startdoc:
+
+  # This is the response body returned for "/tail/$ADDRESS.txt".  This
+  # must use a multi-threaded Rack server with streaming response support.
+  # It is an internal class and not expected to be used directly
+  class Tailer
+    def initialize(rdmon, addr, env) # :nodoc:
+      @rdmon = rdmon
+      @addr = addr
+      q = Rack::Utils.parse_query env["QUERY_STRING"]
+      @active_min = q["active_min"].to_i
+      @queued_min = q["queued_min"].to_i
+      len = Rack::Utils.bytesize(addr)
+      len = 35 if len > 35
+      @fmt = "%20s % #{len}s % 10u % 10u\n"
+    end
+
+    # called by the Rack server
+    def each # :nodoc:
+      begin
+        time, all = @rdmon.wait_snapshot
+        stats = all[@addr] or next
+        stats.queued >= @queued_min or next
+        stats.active >= @active_min or next
+        body = sprintf(@fmt, time.iso8601, @addr, stats.active, stats.queued)
+        yield "#{body.size.to_s(16)}\r\n#{body}\r\n"
+      end while true
+      yield "0\r\n\r\n"
+    end
+  end
+
+  # shuts down the background thread
+  def shutdown
+    @socket = nil
+    @thr.join if @thr
+    @thr = nil
+  end
+end
diff --git a/test/test_watcher.rb b/test/test_watcher.rb
new file mode 100644
index 0000000..f353862
--- /dev/null
+++ b/test/test_watcher.rb
@@ -0,0 +1,85 @@
+# -*- encoding: binary -*-
+require "test/unit"
+require "rack"
+require "raindrops"
+
+class TestWatcher < Test::Unit::TestCase
+  TEST_ADDR = ENV['UNICORN_TEST_ADDR'] || '127.0.0.1'
+  def check_headers(headers)
+    %w(X-Count X-Std-Dev X-Min X-Max X-Mean
+       X-Outliers-Low X-Outliers-Low X-Last-Reset).each { |x|
+      assert_kind_of String, headers[x], "#{x} missing"
+    }
+  end
+
+  def teardown
+    @app.shutdown
+    @ios.each { |io| io.close unless io.closed? }
+  end
+
+  def setup
+    @ios = []
+    @srv = TCPServer.new TEST_ADDR, 0
+    @ios << @srv
+    @port = @srv.addr[1]
+    @client = TCPSocket.new TEST_ADDR, @port
+    @addr = "#{TEST_ADDR}:#{@port}"
+    @ios << @client
+    @app = Raindrops::Watcher.new :delay => 0.001
+    @req = Rack::MockRequest.new @app
+  end
+
+  def test_index
+    resp = @req.get "/"
+    assert_equal 200, resp.status.to_i
+    t = Time.parse resp.headers["Last-Modified"]
+    assert_in_delta Time.now.to_f, t.to_f, 2.0
+  end
+
+  def test_active_txt
+    resp = @req.get "/active/#@addr.txt"
+    assert_equal 200, resp.status.to_i
+    assert_equal "text/plain", resp.headers["Content-Type"]
+    check_headers(resp.headers)
+  end
+
+  def test_active_html
+    resp = @req.get "/active/#@addr.html"
+    assert_equal 200, resp.status.to_i
+    assert_equal "text/html", resp.headers["Content-Type"]
+    check_headers(resp.headers)
+  end
+
+  def test_reset
+    resp = @req.post "/reset/#@addr"
+    assert_equal 302, resp.status.to_i
+  end
+
+  def test_tail
+    env = @req.class.env_for "/tail/#@addr.txt"
+    status, headers, body = @app.call env
+    assert_equal "text/plain", headers["Content-Type"]
+    assert_equal 200, status.to_i
+    tmp = []
+    body.each do |x|
+      assert_kind_of String, x
+      tmp << x
+      break if tmp.size > 1
+    end
+  end
+
+  def test_tail_queued_min
+    env = @req.class.env_for "/tail/#@addr.txt?queued_min=1"
+    status, headers, body = @app.call env
+    assert_equal "text/plain", headers["Content-Type"]
+    assert_equal 200, status.to_i
+    tmp = []
+    body.each do |x|
+      tmp = TCPSocket.new TEST_ADDR, @port
+      @ios << tmp
+      assert_kind_of String, x
+      assert_equal 1, x.strip.split(/\s+/).last.to_i
+      break
+    end
+  end
+end if RUBY_PLATFORM =~ /linux/