From db9162575b885add7c3b7ab06f9c03a2ebc44a1f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 16 Mar 2011 15:22:42 -0700 Subject: add Watcher Rack application It does streaming! --- config.ru | 2 + examples/zbatery.config.rb | 6 + lib/raindrops.rb | 1 + lib/raindrops/watcher.rb | 338 +++++++++++++++++++++++++++++++++++++++++++++ test/test_watcher.rb | 85 ++++++++++++ 5 files changed, 432 insertions(+) create mode 100644 config.ru create mode 100644 examples/zbatery.config.rb create mode 100644 lib/raindrops/watcher.rb create mode 100644 test/test_watcher.rb diff --git a/config.ru b/config.ru new file mode 100644 index 0000000..b8f6160 --- /dev/null +++ b/config.ru @@ -0,0 +1,2 @@ +require "raindrops" +run Raindrops::Watcher.new diff --git a/examples/zbatery.config.rb b/examples/zbatery.config.rb new file mode 100644 index 0000000..38aa4b6 --- /dev/null +++ b/examples/zbatery.config.rb @@ -0,0 +1,6 @@ +# Used for running Raindrops::Watcher, which requires a multi-threaded +# Rack server capable of streaming a response. Threads must be used, +# so Zbatery is recommended: http://zbatery.bogomip.org/ +Rainbows! do + use :ThreadSpawn +end diff --git a/lib/raindrops.rb b/lib/raindrops.rb index 073b117..296dc94 100644 --- a/lib/raindrops.rb +++ b/lib/raindrops.rb @@ -43,5 +43,6 @@ class Raindrops autoload :Middleware, 'raindrops/middleware' autoload :Aggregate, 'raindrops/aggregate' autoload :LastDataRecv, 'raindrops/last_data_recv' + autoload :Watcher, 'raindrops/watcher' end require 'raindrops_ext' diff --git a/lib/raindrops/watcher.rb b/lib/raindrops/watcher.rb new file mode 100644 index 0000000..51e1b19 --- /dev/null +++ b/lib/raindrops/watcher.rb @@ -0,0 +1,338 @@ +# -*- encoding: binary -*- +require "thread" +require "time" +require "socket" +require "rack" +require "aggregate" + +# Raindrops::Watcher is a stand-alone Rack application for watching +# any number of TCP and UNIX listeners (all of them by default). +# +# It depends on the {Aggregate RubyGem}[http://rubygems.org/gems/aggregate] +# +# In your Rack config.ru: +# +# run Raindrops::Watcher(options = {}) +# +# It takes the following options hash: +# +# - :listeners - an array of listener names, (e.g. %w(0.0.0.0:80 /tmp/sock)) +# - :delay - interval between stats updates in seconds (default: 1) +# +# Raindrops::Watcher is compatible any thread-safe/thread-aware Rack +# middleware. It does not work well with multi-process web servers +# but can be used to monitor them. It consumes minimal resources +# with the default :delay. +# +# == HTTP endpoints +# +# === GET / +# +# Returns an HTML summary listing of all listen interfaces watched on +# +# === GET /active/$LISTENER.txt +# +# Returns a plain text summary + histogram with X-* HTTP headers for +# active connections. +# +# e.g.: curl http://example.com/active/0.0.0.0%3A80.txt +# +# === GET /active/$LISTENER.html +# +# Returns an HTML summary + histogram with X-* HTTP headers for +# active connections. +# +# e.g.: curl http://example.com/active/0.0.0.0%3A80.html +# +# === GET /queued/$LISTENER.txt +# +# Returns a plain text summary + histogram with X-* HTTP headers for +# queued connections. +# +# e.g.: curl http://example.com/queued/0.0.0.0%3A80.txt +# +# === GET /queued/$LISTENER.html +# +# Returns an HTML summary + histogram with X-* HTTP headers for +# queued connections. +# +# e.g.: curl http://example.com/queued/0.0.0.0%3A80.html +# +# === GET /tail/$LISTENER.txt?active_min=1&queued_min=1 +# +# Streams chunked a response to the client. +# Interval is the preconfigured +:delay+ of the application (default 1 second) +# +# The response is plain text in the following format: +# +# ISO8601_TIMESTAMP LISTENER_NAME ACTIVE_COUNT QUEUED_COUNT LINEFEED +# +# Query parameters: +# +# - active_min - do not stream a line until this active count is reached +# - queued_min - do not stream a line until this queued count is reached +# +# == Response headers (mostly the same as Raindrops::LastDataRecv) +# +# - X-Count - number of requests received +# - X-Last-Reset - date since the last reset +# +# The following headers are only present if X-Count is greater than one. +# +# - X-Min - lowest last_data_recv time recorded (in milliseconds) +# - X-Max - highest last_data_recv time recorded (in milliseconds) +# - X-Mean - mean last_data_recv time recorded (rounded, in milliseconds) +# - X-Std-Dev - standard deviation of last_data_recv times +# - X-Outliers-Low - number of low outliers (hopefully many!) +# - X-Outliers-High - number of high outliers (hopefully zero!) +# +class Raindrops::Watcher + # :stopdoc: + attr_reader :snapshot + include Rack::Utils + include Raindrops::Linux + + def initialize(opts = {}) + @tcp_listeners = @unix_listeners = nil + if l = opts[:listeners] + tcp, unix = [], [] + Array(l).each { |addr| (addr =~ %r{\A/} ? unix : tcp) << addr } + unless tcp.empty? && unix.empty? + @tcp_listeners = tcp + @unix_listeners = unix + end + end + + agg_class = opts[:agg_class] || Aggregate + start = Time.now.utc + @active = Hash.new { |h,k| h[k] = agg_class.new } + @queued = Hash.new { |h,k| h[k] = agg_class.new } + @resets = Hash.new { |h,k| h[k] = start } + @snapshot = [ start, {} ] + @delay = opts[:delay] || 1 + @lock = Mutex.new + @start = Mutex.new + @cond = ConditionVariable.new + @thr = nil + end + + def hostname + Socket.gethostname + end + + # rack endpoint + def call(env) + @start.synchronize { @thr ||= aggregator_thread(env["rack.logger"]) } + case env["REQUEST_METHOD"] + when "HEAD", "GET" + get env + when "POST" + post env + else + Rack::Response.new(["Method Not Allowed"], 405).finish + end + end + + def aggregator_thread(logger) # :nodoc: + @socket = sock = Raindrops::InetDiagSocket.new + thr = Thread.new do + begin + combined = tcp_listener_stats(@tcp_listeners, sock) + combined.merge!(unix_listener_stats(@unix_listeners)) + @lock.synchronize do + combined.each do |addr,stats| + @active[addr] << stats.active + @queued[addr] << stats.queued + end + @snapshot = [ Time.now.utc, combined ] + @cond.broadcast + end + rescue => e + logger.error "#{e.class} #{e.inspect}" + end while sleep(@delay) && @socket + sock.close + end + wait_snapshot + thr + end + + def active_stats(addr) # :nodoc: + @lock.synchronize do + tmp = @active[addr] or return + [ @resets[addr], tmp.dup ] + end + end + + def queued_stats(addr) # :nodoc: + @lock.synchronize do + tmp = @queued[addr] or return + [ @resets[addr], tmp.dup ] + end + end + + def wait_snapshot + @lock.synchronize do + @cond.wait @lock + @snapshot + end + end + + def agg_to_hash(reset_at, agg) + { + "X-Count" => agg.count.to_s, + "X-Min" => agg.min.to_s, + "X-Max" => agg.max.to_s, + "X-Mean" => agg.mean.to_s, + "X-Std-Dev" => agg.std_dev.to_s, + "X-Outliers-Low" => agg.outliers_low.to_s, + "X-Outliers-High" => agg.outliers_high.to_s, + "X-Last-Reset" => reset_at.httpdate, + } + end + + def histogram_txt(agg) + reset_at, agg = *agg + headers = agg_to_hash(reset_at, agg) + body = agg.to_s + headers["Content-Type"] = "text/plain" + headers["Content-Length"] = bytesize(body).to_s + [ 200, headers, [ body ] ] + end + + def histogram_html(agg, addr) + reset_at, agg = *agg + headers = agg_to_hash(reset_at, agg) + body = "" \ + "#{hostname} - #{escape_html addr}" \ + "" << + headers.map { |k,v| + "" + }.join << "
#{k.gsub(/^X-/, '')}#{v}
#{escape_html agg}
" \ + "
" \ + "
" \ + "" + headers["Content-Type"] = "text/html" + headers["Content-Length"] = bytesize(body).to_s + [ 200, headers, [ body ] ] + end + + def get(env) + case env["PATH_INFO"] + when "/" + index + when %r{\A/active/(.+)\.txt\z} + histogram_txt(active_stats(unescape($1))) + when %r{\A/active/(.+)\.html\z} + addr = unescape $1 + histogram_html(active_stats(addr), addr) + when %r{\A/queued/(.+)\.txt\z} + histogram_txt(queued_stats(unescape($1))) + when %r{\A/queued/(.+)\.html\z} + addr = unescape $1 + histogram_html(queued_stats(addr), addr) + when %r{\A/tail/(.+)\.txt\z} + tail(unescape($1), env) + else + not_found + end + end + + def not_found + Rack::Response.new(["Not Found"], 404).finish + end + + def post(env) + case env["PATH_INFO"] + when %r{\A/reset/(.+)\z} + reset!(env, unescape($1)) + else + Rack::Response.new(["Not Found"], 404).finish + end + end + + def reset!(env, addr) + @lock.synchronize do + @active.include?(addr) or return not_found + @active.delete addr + @queued.delete addr + @resets[addr] = Time.now.utc + @cond.wait @lock + end + req = Rack::Request.new(env) + res = Rack::Response.new + url = req.referer || "#{req.host_with_port}/" + res.redirect(url) + res.content_type.replace "text/plain" + res.write "Redirecting to #{url}" + res.finish + end + + def index + updated_at, all = snapshot + headers = { + "Content-Type" => "text/html", + "Last-Modified" => updated_at.httpdate, + } + body = "" \ + "#{hostname} - all interfaces" \ + "

Updated at #{updated_at.iso8601}

" \ + "" \ + "" \ + "" << + all.map do |addr,stats| + e_addr = escape addr + "" \ + "" \ + "" \ + "" \ + "" \ + "" \ + end.join << "
addressactivequeuedreset
#{escape_html addr}#{stats.active}#{stats.queued}
" \ + "
" + headers["Content-Length"] = bytesize(body).to_s + [ 200, headers, [ body ] ] + end + + def tail(addr, env) + [ 200, + { "Transfer-Encoding" => "chunked", "Content-Type" => "text/plain" }, + Tailer.new(self, addr, env) ] + end + # :startdoc: + + # This is the response body returned for "/tail/$ADDRESS.txt". This + # must use a multi-threaded Rack server with streaming response support. + # It is an internal class and not expected to be used directly + class Tailer + def initialize(rdmon, addr, env) # :nodoc: + @rdmon = rdmon + @addr = addr + q = Rack::Utils.parse_query env["QUERY_STRING"] + @active_min = q["active_min"].to_i + @queued_min = q["queued_min"].to_i + len = Rack::Utils.bytesize(addr) + len = 35 if len > 35 + @fmt = "%20s % #{len}s % 10u % 10u\n" + end + + # called by the Rack server + def each # :nodoc: + begin + time, all = @rdmon.wait_snapshot + stats = all[@addr] or next + stats.queued >= @queued_min or next + stats.active >= @active_min or next + body = sprintf(@fmt, time.iso8601, @addr, stats.active, stats.queued) + yield "#{body.size.to_s(16)}\r\n#{body}\r\n" + end while true + yield "0\r\n\r\n" + end + end + + # shuts down the background thread + def shutdown + @socket = nil + @thr.join if @thr + @thr = nil + end +end diff --git a/test/test_watcher.rb b/test/test_watcher.rb new file mode 100644 index 0000000..f353862 --- /dev/null +++ b/test/test_watcher.rb @@ -0,0 +1,85 @@ +# -*- encoding: binary -*- +require "test/unit" +require "rack" +require "raindrops" + +class TestWatcher < Test::Unit::TestCase + TEST_ADDR = ENV['UNICORN_TEST_ADDR'] || '127.0.0.1' + def check_headers(headers) + %w(X-Count X-Std-Dev X-Min X-Max X-Mean + X-Outliers-Low X-Outliers-Low X-Last-Reset).each { |x| + assert_kind_of String, headers[x], "#{x} missing" + } + end + + def teardown + @app.shutdown + @ios.each { |io| io.close unless io.closed? } + end + + def setup + @ios = [] + @srv = TCPServer.new TEST_ADDR, 0 + @ios << @srv + @port = @srv.addr[1] + @client = TCPSocket.new TEST_ADDR, @port + @addr = "#{TEST_ADDR}:#{@port}" + @ios << @client + @app = Raindrops::Watcher.new :delay => 0.001 + @req = Rack::MockRequest.new @app + end + + def test_index + resp = @req.get "/" + assert_equal 200, resp.status.to_i + t = Time.parse resp.headers["Last-Modified"] + assert_in_delta Time.now.to_f, t.to_f, 2.0 + end + + def test_active_txt + resp = @req.get "/active/#@addr.txt" + assert_equal 200, resp.status.to_i + assert_equal "text/plain", resp.headers["Content-Type"] + check_headers(resp.headers) + end + + def test_active_html + resp = @req.get "/active/#@addr.html" + assert_equal 200, resp.status.to_i + assert_equal "text/html", resp.headers["Content-Type"] + check_headers(resp.headers) + end + + def test_reset + resp = @req.post "/reset/#@addr" + assert_equal 302, resp.status.to_i + end + + def test_tail + env = @req.class.env_for "/tail/#@addr.txt" + status, headers, body = @app.call env + assert_equal "text/plain", headers["Content-Type"] + assert_equal 200, status.to_i + tmp = [] + body.each do |x| + assert_kind_of String, x + tmp << x + break if tmp.size > 1 + end + end + + def test_tail_queued_min + env = @req.class.env_for "/tail/#@addr.txt?queued_min=1" + status, headers, body = @app.call env + assert_equal "text/plain", headers["Content-Type"] + assert_equal 200, status.to_i + tmp = [] + body.each do |x| + tmp = TCPSocket.new TEST_ADDR, @port + @ios << tmp + assert_kind_of String, x + assert_equal 1, x.strip.split(/\s+/).last.to_i + break + end + end +end if RUBY_PLATFORM =~ /linux/ -- cgit v1.2.3-24-ge0c7