From 0a47b9209b6c677ad03ad2075f671883ca2b7474 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 14 Oct 2009 17:39:08 -0700 Subject: preliminary Rev support There is no TeeInput (streaming request body) support, yet, as that does not seem fun nor easy to do (or even possible without using Threads or Fibers or something to save/restore the stack...) --- lib/rainbows.rb | 1 + lib/rainbows/rev.rb | 150 ++++++++++++++++++++++++++++++++++++++ t/bin/content-md5-put | 36 +++++++++ t/content-md5.ru | 23 ++++++ t/t4000-rev-basic.sh | 51 +++++++++++++ t/t4000.ru | 3 + t/t4002-rev-graceful.sh | 52 +++++++++++++ t/t4100-rev-rack-input.sh | 44 +++++++++++ t/t4101-rev-rack-input-trailer.sh | 51 +++++++++++++ t/test-lib.sh | 8 ++ 10 files changed, 419 insertions(+) create mode 100644 lib/rainbows/rev.rb create mode 100755 t/bin/content-md5-put create mode 100644 t/content-md5.ru create mode 100755 t/t4000-rev-basic.sh create mode 100644 t/t4000.ru create mode 100755 t/t4002-rev-graceful.sh create mode 100755 t/t4100-rev-rack-input.sh create mode 100755 t/t4101-rev-rack-input-trailer.sh diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 94246f5..c2813a5 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -47,6 +47,7 @@ module Rainbows :Revactor => 50, :ThreadSpawn => 30, :ThreadPool => 10, + :Rev => 50, }.each do |model, _| u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" } autoload model, "rainbows/#{u.downcase!}" diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb new file mode 100644 index 0000000..8cd76ae --- /dev/null +++ b/lib/rainbows/rev.rb @@ -0,0 +1,150 @@ +# -*- encoding: binary -*- +require 'rev' + +# workaround revactor 0.1.4 still using the old Rev::Buffer +# ref: http://rubyforge.org/pipermail/revactor-talk/2009-October/000034.html +defined?(Rev::Buffer) or Rev::Buffer = IO::Buffer + +module Rainbows + + module Rev + + # global vars because class/instance variables are confusing me :< + # this struct is only accessed inside workers and 
thus private to each + G = Struct.new(:nr, :max, :logger, :alive, :app).new + + include Base + + class Client < ::Rev::IO + include Unicorn + include Rainbows::Const + G = Rainbows::Rev::G + + def initialize(io) + G.nr += 1 + super(io) + @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST + @env = {} + @hp = HttpParser.new + @state = :headers # [ :body [ :trailers ] ] :app_call :close + @buf = "" + end + + def handle_error(e) + msg = case e + when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF + ERROR_500_RESPONSE + when HttpParserError # try to tell the client they're bad + ERROR_400_RESPONSE + else + G.logger.error "Read error: #{e.inspect}" + G.logger.error e.backtrace.join("\n") + ERROR_500_RESPONSE + end + write(msg) + ensure + @state = :close + end + + def app_call + @input.rewind + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @remote_addr + response = G.app.call(@env.update(RACK_DEFAULTS)) + alive = @hp.keepalive? && G.alive + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? + HttpResponse.write(self, response, out) + if alive + @env.clear + @hp.reset + @state = :headers + else + @state = :close + end + rescue Object => e + handle_error(e) + end + + def on_write_complete + :close == @state and close + end + + def on_close + G.nr -= 1 + end + + def tmpio + io = Util.tmpio + def io.size + # already sync=true at creation, so no need to flush before stat + stat.size + end + io + end + + # TeeInput doesn't map too well to this right now... 
+ def on_read(data) + case @state + when :headers + @hp.headers(@env, @buf << data) or return + @state = :body + len = @hp.content_length + if len == 0 + @input = HttpRequest::NULL_IO + app_call # common case + else # nil or len > 0 + # since we don't do streaming input, we have no choice but + # to take over 100-continue handling from the Rack application + if @env[HTTP_EXPECT] =~ /\A100-continue\z/i + write(EXPECT_100_RESPONSE) + @env.delete(HTTP_EXPECT) + end + @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio + @hp.filter_body(@buf2 = @buf.dup, @buf) + @input << @buf2 + on_read("") + end + when :body + if @hp.body_eof? + @state = :trailers + on_read(data) + elsif data.size > 0 + @hp.filter_body(@buf2, @buf << data) + @input << @buf2 + on_read("") + end + when :trailers + @hp.trailers(@env, @buf << data) and app_call + end + end + end + + class Server < ::Rev::IO + G = Rainbows::Rev::G + + def on_readable + return if G.nr >= G.max + begin + Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) + rescue Errno::EAGAIN, Errno::ECONNABORTED + end + end + + end + + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given an INT, QUIT, or TERM signal) + def worker_loop(worker) + init_worker_process(worker) + G.nr = 0 + G.max = worker_connections + G.alive = true + G.logger = logger + G.app = app + LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) } + ::Rev::Loop.default.run + end + + end +end diff --git a/t/bin/content-md5-put b/t/bin/content-md5-put new file mode 100755 index 0000000..c35c92c --- /dev/null +++ b/t/bin/content-md5-put @@ -0,0 +1,36 @@ +#!/usr/bin/env ruby +# simple chunked HTTP PUT request generator (and just that), +# it reads stdin and writes to stdout so socat can write to a +# UNIX or TCP socket (or to another filter or file) along with +# a Content-MD5 trailer.
+# -*- encoding: binary -*- +require 'digest/md5' +$stdout.sync = $stderr.sync = true +$stdout.binmode +$stdin.binmode + +bs = ENV['bs'] ? ENV['bs'].to_i : 4096 + +if ARGV.grep("--no-headers").empty? + $stdout.write( + "PUT / HTTP/1.1\r\n" \ + "Host: example.com\r\n" \ + "Transfer-Encoding: chunked\r\n" \ + "Trailer: Content-MD5\r\n" \ + "\r\n" + ) +end + +digest = Digest::MD5.new +if buf = $stdin.read(bs) + begin + digest.update(buf) + $stdout.write("%x\r\n" % [ buf.size ]) + $stdout.write(buf) + $stdout.write("\r\n") + end while $stdin.read(bs, buf) +end + +digest = [ digest.digest ].pack('m').strip +$stdout.write("0\r\n") +$stdout.write("Content-MD5: #{digest}\r\n\r\n") diff --git a/t/content-md5.ru b/t/content-md5.ru new file mode 100644 index 0000000..e3ce4d3 --- /dev/null +++ b/t/content-md5.ru @@ -0,0 +1,23 @@ +# MD5 checksum generator +bs = ENV['bs'] ? ENV['bs'].to_i : 4096 +require 'digest/md5' +use Rack::ContentLength +app = lambda do |env| + /\A100-continue\z/i =~ env['HTTP_EXPECT'] and + return [ 100, {}, [] ] + digest = Digest::MD5.new + input = env['rack.input'] + if buf = input.read(bs) + begin + digest.update(buf) + end while input.read(bs, buf) + end + + expect = env['HTTP_CONTENT_MD5'] + readed = [ digest.digest ].pack('m').strip + body = "expect=#{expect}\nreaded=#{readed}\n" + status = expect == readed ? 200 : 500 + + [ status, {'Content-Type' => 'text/plain'}, [ body ] ] +end +run app diff --git a/t/t4000-rev-basic.sh b/t/t4000-rev-basic.sh new file mode 100755 index 0000000..e5cfcad --- /dev/null +++ b/t/t4000-rev-basic.sh @@ -0,0 +1,51 @@ +#!/bin/sh +.
./test-lib.sh +require_rev + +eval $(unused_listen) +rtmpfiles unicorn_config pid r_err r_out tmp fifo ok +rm -f $fifo +mkfifo $fifo + +nr_client=30 + +cat > $unicorn_config < $tmp 2>&1 +grep 'Re-using existing connection' < $tmp + +echo "pipelining partial requests" +req='GET / HTTP/1.1\r\nHost: example.com\r\n' +( + printf "$req"'\r\n'"$req" + cat $fifo > $tmp & + sleep 1 + printf 'Connection: close\r\n\r\n' + wait + echo ok > $ok +) | socat - TCP:$listen > $fifo + +kill $(cat $pid) + +test 2 -eq $(grep '^HTTP/1.1' $tmp | wc -l) +test 2 -eq $(grep '^HTTP/1.1 200 OK' $tmp | wc -l) +test 1 -eq $(grep '^Connection: keep-alive' $tmp | wc -l) +test 1 -eq $(grep '^Connection: close' $tmp | wc -l) +test x"$(cat $ok)" = xok +! grep Error $r_err diff --git a/t/t4000.ru b/t/t4000.ru new file mode 100644 index 0000000..c2355da --- /dev/null +++ b/t/t4000.ru @@ -0,0 +1,3 @@ +use Rack::ContentLength +use Rack::ContentType +run lambda { |env| [ 200, {}, [ env.inspect << "\n" ] ] } diff --git a/t/t4002-rev-graceful.sh b/t/t4002-rev-graceful.sh new file mode 100755 index 0000000..e286886 --- /dev/null +++ b/t/t4002-rev-graceful.sh @@ -0,0 +1,52 @@ +#!/bin/sh +. ./test-lib.sh +require_rev + +eval $(unused_listen) +rtmpfiles unicorn_config tmp pid r_err r_out out +nr_client=10 +cat > $unicorn_config < $tmp & + sleep 1 + printf 'Host: example.com\r\n' + sleep 1 + printf 'Connection: close\r\n' + sleep 1 + printf '\r\n' + wait + ) | socat - TCP:$listen > $fifo + fgrep 'Hello' $tmp >> $out || : + rm -f $fifo $tmp + ) & +done + +sleep 2 # potentially racy :< +kill -QUIT $(cat $pid) +wait + +test x"$(wc -l < $out)" = x$nr_client +nr=$(sort < $out | uniq | wc -l) +test "$nr" -eq 1 + +test x$(sort < $out | uniq) = xHello +! grep Error $r_err diff --git a/t/t4100-rev-rack-input.sh b/t/t4100-rev-rack-input.sh new file mode 100755 index 0000000..2a37fed --- /dev/null +++ b/t/t4100-rev-rack-input.sh @@ -0,0 +1,44 @@ +#!/bin/sh +nr_client=${nr_client-25} +nr=${nr-50} + +. 
./test-lib.sh +require_rev +test -r random_blob || die "random_blob required, run with 'make $0'" + +eval $(unused_listen) +rtmpfiles unicorn_config curl_out curl_err r_err r_out pid + +cat > $unicorn_config <> $curl_out 2>> $curl_err + ) & +done +wait +echo elapsed=$(( $(date +%s) - $start )) + +kill $(cat $pid) +test $nr_client -eq $(wc -l < $curl_out) +test 1 -eq $(sort < $curl_out | uniq | wc -l) +blob_sha1=$( expr "$(sha1sum < random_blob)" : '\([a-f0-9]\+\)') +echo blob_sha1=$blob_sha1 +test x"$blob_sha1" = x"$(sort < $curl_out | uniq)" +! grep Error $r_err diff --git a/t/t4101-rev-rack-input-trailer.sh b/t/t4101-rev-rack-input-trailer.sh new file mode 100755 index 0000000..9dffc43 --- /dev/null +++ b/t/t4101-rev-rack-input-trailer.sh @@ -0,0 +1,51 @@ +#!/bin/sh +nr_client=${nr_client-25} +nr=${nr-50} + +. ./test-lib.sh +require_rev +test -r random_blob || die "random_blob required, run with 'make $0'" + +eval $(unused_listen) +rtmpfiles unicorn_config tmp r_err r_out pid fifo ok +rm -f $fifo +mkfifo $fifo + +cat > $unicorn_config < $tmp & + wait + echo ok > $ok +) | socat - TCP:$listen | tee $fifo + +fgrep 'HTTP/1.1 200 OK' $tmp +test xok = x"$(cat $ok)" +! grep Error $r_err + + +echo "big blob" +( + content-md5-put < random_blob + cat $fifo > $tmp & + wait + echo ok > $ok +) | socat - TCP:$listen | tee $fifo + +fgrep 'HTTP/1.1 200 OK' $tmp +test xok = x"$(cat $ok)" +! grep Error $r_err +kill $(cat $pid) diff --git a/t/test-lib.sh b/t/test-lib.sh index d278329..26adfc9 100644 --- a/t/test-lib.sh +++ b/t/test-lib.sh @@ -42,6 +42,14 @@ require_revactor () { fi } +require_rev() { + if ! $ruby -rrev -e "puts Rev::VERSION" >/dev/null 2>&1 + then + echo >&2 "skipping $T since we don't have Rev" + exit 0 + fi +} + # given a list of variable names, create temporary files and assign # the pathnames to those variables rtmpfiles () { -- cgit v1.2.3-24-ge0c7