about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-14 17:39:08 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-14 17:39:51 -0700
commit0a47b9209b6c677ad03ad2075f671883ca2b7474 (patch)
tree2c2fc415f527c9f3f8b96912b20a76bbe0420647
parenta42148fe4d62f812bc57418daecdb95f3c4d18cd (diff)
downloadrainbows-0a47b9209b6c677ad03ad2075f671883ca2b7474.tar.gz
There is no TeeInput (streaming request body) support, yet,
as that does not seem fun nor easy to do (or even possible
without using Threads or Fibers or something to save/restore
the stack...)
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/rev.rb150
-rwxr-xr-xt/bin/content-md5-put36
-rw-r--r--t/content-md5.ru23
-rwxr-xr-xt/t4000-rev-basic.sh51
-rw-r--r--t/t4000.ru3
-rwxr-xr-xt/t4002-rev-graceful.sh52
-rwxr-xr-xt/t4100-rev-rack-input.sh44
-rwxr-xr-xt/t4101-rev-rack-input-trailer.sh51
-rw-r--r--t/test-lib.sh8
10 files changed, 419 insertions, 0 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 94246f5..c2813a5 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -47,6 +47,7 @@ module Rainbows
     :Revactor => 50,
     :ThreadSpawn => 30,
     :ThreadPool => 10,
+    :Rev => 50,
   }.each do |model, _|
     u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" }
     autoload model, "rainbows/#{u.downcase!}"
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
new file mode 100644
index 0000000..8cd76ae
--- /dev/null
+++ b/lib/rainbows/rev.rb
@@ -0,0 +1,150 @@
+# -*- encoding: binary -*-
+require 'rev'
+
+# workaround revactor 0.1.4 still using the old Rev::Buffer
+# ref: http://rubyforge.org/pipermail/revactor-talk/2009-October/000034.html
+defined?(Rev::Buffer) or Rev::Buffer = IO::Buffer
+
+module Rainbows
+
+  module Rev
+
+    # global vars because class/instance variables are confusing me :<
+    # this struct is only accessed inside workers and thus private to each
+    G = Struct.new(:nr, :max, :logger, :alive, :app).new # every field is assigned in worker_loop
+
+    include Base
+
+    class Client < ::Rev::IO
+      include Unicorn
+      include Rainbows::Const
+      G = Rainbows::Rev::G
+
+      def initialize(io) # wraps an accepted socket and sets up per-connection parser state
+        G.nr += 1 # count this client; on_close takes it back off
+        super(io)
+        @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST # non-TCP sockets get LOCALHOST
+        @env = {}
+        @hp = HttpParser.new
+        @state = :headers # [ :body [ :trailers ] ] :app_call :close
+        @buf = "" # bytes received so far that on_read appends to
+      end
+
+      def handle_error(e) # picks a canned response for exception e, writes it, and marks the client :close
+        msg = case e
+        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+          ERROR_500_RESPONSE
+        when HttpParserError # try to tell the client they're bad
+          ERROR_400_RESPONSE
+        else # anything else is logged with its backtrace before answering with a 500
+          G.logger.error "Read error: #{e.inspect}"
+          G.logger.error e.backtrace.join("\n")
+          ERROR_500_RESPONSE
+        end
+        write(msg)
+        ensure
+          @state = :close # set even when write raises; on_write_complete closes once it sees :close
+      end
+
+      def app_call # hands the buffered request to the Rack app and writes its response
+        @input.rewind # the app reads the body from the beginning
+        @env[RACK_INPUT] = @input
+        @env[REMOTE_ADDR] = @remote_addr
+        response = G.app.call(@env.update(RACK_DEFAULTS))
+        alive = @hp.keepalive? && G.alive # keep the connection only if both the parser and G.alive allow it
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? # out stays nil when @hp.headers? is false
+        HttpResponse.write(self, response, out)
+        if alive
+          @env.clear # start the next request on this connection from a clean env and parser
+          @hp.reset
+          @state = :headers
+        else
+          @state = :close
+        end
+        rescue Object => e # NOTE(review): matches every exception class, not only StandardError
+          handle_error(e)
+      end
+
+      def on_write_complete # presumably invoked by Rev once pending writes are flushed -- TODO confirm
+        :close == @state and close # only close when a previous step marked the client :close
+      end
+
+      def on_close
+        G.nr -= 1 # undo the increment made in initialize
+      end
+
+      def tmpio # returns a Util.tmpio object extended with a singleton #size
+        io = Util.tmpio
+        def io.size # defined on this one object only
+          # already sync=true at creation, so no need to flush before stat
+          stat.size
+        end
+        io
+      end
+
+      # TeeInput doesn't map too well to this right now...
+      def on_read(data) # feeds incoming bytes through the :headers -> :body -> :trailers states
+        case @state
+        when :headers
+          @hp.headers(@env, @buf << data) or return # falsy result: stop here, data stays appended to @buf
+          @state = :body
+          len = @hp.content_length
+          if len == 0
+            @input = HttpRequest::NULL_IO
+            app_call # common case
+          else # nil or len > 0
+            # since we don't do streaming input, we have no choice but
+            # to take over 100-continue handling from the Rack application
+            if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
+              write(EXPECT_100_RESPONSE)
+              @env.delete(HTTP_EXPECT) # already answered here, so the app does not see the header
+            end
+            @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio # known length within MAX_BODY: StringIO, else temp file
+            @hp.filter_body(@buf2 = @buf.dup, @buf)
+            @input << @buf2
+            on_read("") # re-dispatch on the new state without adding data
+          end
+        when :body
+          if @hp.body_eof?
+            @state = :trailers
+            on_read(data) # hand the same data to the :trailers branch
+          elsif data.size > 0
+            @hp.filter_body(@buf2, @buf << data)
+            @input << @buf2
+            on_read("") # check body_eof? again without adding data
+          end
+        when :trailers
+          @hp.trailers(@env, @buf << data) and app_call # call the app once trailers() returns a truthy value
+        end
+      end
+    end
+
+    class Server < ::Rev::IO
+      G = Rainbows::Rev::G
+
+      def on_readable
+        return if G.nr >= G.max
+        begin
+          Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default)
+        rescue Errno::EAGAIN, Errno::ECONNBORTED
+        end
+      end
+
+    end
+
+    # runs inside each forked worker, this sits around and waits
+    # for connections and doesn't die until the parent dies (or is
+    # given an INT, QUIT, or TERM signal)
+    def worker_loop(worker)
+      init_worker_process(worker)
+      G.nr = 0 # no clients connected yet
+      G.max = worker_connections # limit checked by Server#on_readable
+      G.alive = true # read by Client#app_call when deciding on keep-alive
+      G.logger = logger
+      G.app = app
+      LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) } # each entry is replaced by the result of attach
+      ::Rev::Loop.default.run
+    end
+
+  end
+end
diff --git a/t/bin/content-md5-put b/t/bin/content-md5-put
new file mode 100755
index 0000000..c35c92c
--- /dev/null
+++ b/t/bin/content-md5-put
@@ -0,0 +1,36 @@
+#!/usr/bin/env ruby
+# simple chunked HTTP PUT request generator (and just that),
+# it reads stdin and writes to stdout so socat can write to a
+# UNIX or TCP socket (or to another filter or file) along with
+# a Content-MD5 trailer.
+# -*- encoding: binary -*-
+require 'digest/md5'
+$stdout.sync = $stderr.sync = true
+$stdout.binmode
+$stdin.binmode
+
+bs = ENV['bs'] ? ENV['bs'].to_i : 4096
+
+if ARGV.grep("--no-headers").empty?
+  $stdout.write(
+      "PUT / HTTP/1.1\r\n" \
+      "Host: example.com\r\n" \
+      "Transfer-Encoding: chunked\r\n" \
+      "Trailer: Content-MD5\r\n" \
+      "\r\n"
+    )
+end
+
+digest = Digest::MD5.new
+if buf = $stdin.read(bs)
+  begin
+    digest.update(buf)
+    $stdout.write("%x\r\n" % [ buf.size ])
+    $stdout.write(buf)
+    $stdout.write("\r\n")
+  end while $stdin.read(bs, buf)
+end
+
+digest = [ digest.digest ].pack('m').strip
+$stdout.write("0\r\n")
+$stdout.write("Content-MD5: #{digest}\r\n\r\n")
diff --git a/t/content-md5.ru b/t/content-md5.ru
new file mode 100644
index 0000000..e3ce4d3
--- /dev/null
+++ b/t/content-md5.ru
@@ -0,0 +1,23 @@
+# MD5 checksum verifier: hashes the request body and compares it to Content-MD5
+bs = ENV['bs'] ? ENV['bs'].to_i : 4096 # read size in bytes, overridable through the "bs" environment variable
+require 'digest/md5'
+use Rack::ContentLength
+app = lambda do |env|
+  /\A100-continue\z/i =~ env['HTTP_EXPECT'] and
+    return [ 100, {}, [] ] # answer Expect: 100-continue without reading the body
+  digest = Digest::MD5.new
+  input = env['rack.input']
+  if buf = input.read(bs)
+    begin
+      digest.update(buf)
+    end while input.read(bs, buf) # refill buf in place until the input is exhausted
+  end
+
+  expect = env['HTTP_CONTENT_MD5'] # digest supplied with the request
+  readed = [ digest.digest ].pack('m').strip # base64 of the MD5 computed over the body read above
+  body = "expect=#{expect}\nreaded=#{readed}\n"
+  status = expect == readed ? 200 : 500 # 500 signals a mismatch
+
+  [ status, {'Content-Type' => 'text/plain'}, [ body ] ]
+end
+run app
diff --git a/t/t4000-rev-basic.sh b/t/t4000-rev-basic.sh
new file mode 100755
index 0000000..e5cfcad
--- /dev/null
+++ b/t/t4000-rev-basic.sh
@@ -0,0 +1,51 @@
+#!/bin/sh
+. ./test-lib.sh
+require_rev # exits 0 (skip) when Rev cannot be loaded
+
+eval $(unused_listen)
+rtmpfiles unicorn_config pid r_err r_out tmp fifo ok
+rm -f $fifo
+mkfifo $fifo # replace the temporary file with a named pipe of the same name
+
+nr_client=30 # NOTE(review): not referenced again in this script
+
+cat > $unicorn_config <<EOF
+listen "$listen"
+pid "$pid"
+stderr_path "$r_err"
+stdout_path "$r_out"
+Rainbows! do
+  use :Rev
+  worker_connections 50
+end
+EOF
+
+rainbows -D t4000.ru -c $unicorn_config
+wait_for_pid $pid
+
+echo "single request"
+curl -sSfv http://$listen/
+
+echo "two requests with keepalive"
+curl -sSfv http://$listen/a http://$listen/b > $tmp 2>&1
+grep 'Re-using existing connection' < $tmp # curl's verbose output must show the connection being reused
+
+echo "pipelining partial requests"
+req='GET / HTTP/1.1\r\nHost: example.com\r\n'
+(
+        printf "$req"'\r\n'"$req" # one complete request followed by a second one left unterminated
+        cat $fifo > $tmp &
+        sleep 1
+        printf 'Connection: close\r\n\r\n' # finish the second request and ask for the connection to close
+        wait
+        echo ok > $ok
+) | socat - TCP:$listen > $fifo
+
+kill $(cat $pid)
+
+test 2 -eq $(grep '^HTTP/1.1' $tmp | wc -l) # exactly two responses came back
+test 2 -eq $(grep '^HTTP/1.1 200 OK' $tmp | wc -l)
+test 1 -eq $(grep '^Connection: keep-alive' $tmp | wc -l)
+test 1 -eq $(grep '^Connection: close' $tmp | wc -l)
+test x"$(cat $ok)" = xok
+! grep Error $r_err # the server's stderr log must not contain "Error"
diff --git a/t/t4000.ru b/t/t4000.ru
new file mode 100644
index 0000000..c2355da
--- /dev/null
+++ b/t/t4000.ru
@@ -0,0 +1,3 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env| [ 200, {}, [ env.inspect << "\n" ] ] } # responds 200 with the inspected Rack env as the body
diff --git a/t/t4002-rev-graceful.sh b/t/t4002-rev-graceful.sh
new file mode 100755
index 0000000..e286886
--- /dev/null
+++ b/t/t4002-rev-graceful.sh
@@ -0,0 +1,52 @@
+#!/bin/sh
+. ./test-lib.sh
+require_rev # exits 0 (skip) when Rev cannot be loaded
+
+eval $(unused_listen)
+rtmpfiles unicorn_config tmp pid r_err r_out out
+nr_client=10 # number of concurrent slow clients started below
+cat > $unicorn_config <<EOF
+listen "$listen"
+stderr_path "$r_err"
+stdout_path "$r_out"
+pid "$pid"
+Rainbows! do
+  use :Rev
+end
+EOF
+
+rainbows -D sleep.ru -c $unicorn_config
+wait_for_pid $pid
+
+for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null)
+do
+        (
+                rtmpfiles fifo tmp
+                rm -f $fifo
+                mkfifo $fifo
+                (
+                        printf 'GET /0 HTTP/1.1\r\n' # the request is sent one line at a time, a second apart
+                        cat $fifo > $tmp &
+                        sleep 1
+                        printf 'Host: example.com\r\n'
+                        sleep 1
+                        printf 'Connection: close\r\n'
+                        sleep 1
+                        printf '\r\n'
+                        wait
+                ) | socat - TCP:$listen > $fifo
+                fgrep 'Hello' $tmp >> $out || :
+                rm -f $fifo $tmp
+        ) &
+done
+
+sleep 2 # potentially racy :<
+kill -QUIT $(cat $pid) # sent while the clients above are still in the middle of their requests
+wait
+
+test x"$(wc -l < $out)" = x$nr_client # every client must have recorded a matching line
+nr=$(sort < $out | uniq | wc -l)
+test "$nr" -eq 1
+
+test x$(sort < $out | uniq) = xHello
+! grep Error $r_err # the server's stderr log must not contain "Error"
diff --git a/t/t4100-rev-rack-input.sh b/t/t4100-rev-rack-input.sh
new file mode 100755
index 0000000..2a37fed
--- /dev/null
+++ b/t/t4100-rev-rack-input.sh
@@ -0,0 +1,44 @@
+#!/bin/sh
+nr_client=${nr_client-25} # number of concurrent uploads, overridable from the environment
+nr=${nr-50} # worker_connections value written into the config below
+
+. ./test-lib.sh
+require_rev # exits 0 (skip) when Rev cannot be loaded
+test -r random_blob || die "random_blob required, run with 'make $0'"
+
+eval $(unused_listen)
+rtmpfiles unicorn_config curl_out curl_err r_err r_out pid
+
+cat > $unicorn_config <<EOF
+listen "$listen"
+pid "$pid"
+stderr_path "$r_err"
+stdout_path "$r_out"
+Rainbows! do
+  use :Rev
+  worker_connections $nr
+end
+EOF
+
+echo pid=$pid
+rainbows -D sha1.ru -c $unicorn_config
+wait_for_pid $pid
+
+start=$(date +%s)
+for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null)
+do
+        (
+                curl -sSf -T- http://$listen/$i \
+                  < random_blob >> $curl_out 2>> $curl_err
+        ) &
+done
+wait
+echo elapsed=$(( $(date +%s) - $start ))
+
+kill $(cat $pid)
+test $nr_client -eq $(wc -l < $curl_out) # one output line per client
+test 1 -eq $(sort < $curl_out | uniq | wc -l) # and every line is identical
+blob_sha1=$( expr "$(sha1sum < random_blob)" : '\([a-f0-9]\+\)')
+echo blob_sha1=$blob_sha1
+test x"$blob_sha1" = x"$(sort < $curl_out | uniq)" # server output equals the locally computed SHA1
+! grep Error $r_err # the server's stderr log must not contain "Error"
diff --git a/t/t4101-rev-rack-input-trailer.sh b/t/t4101-rev-rack-input-trailer.sh
new file mode 100755
index 0000000..9dffc43
--- /dev/null
+++ b/t/t4101-rev-rack-input-trailer.sh
@@ -0,0 +1,51 @@
+#!/bin/sh
+nr_client=${nr_client-25}
+nr=${nr-50}
+
+. ./test-lib.sh
+require_rev # exits 0 (skip) when Rev cannot be loaded
+test -r random_blob || die "random_blob required, run with 'make $0'"
+
+eval $(unused_listen)
+rtmpfiles unicorn_config tmp r_err r_out pid fifo ok
+rm -f $fifo
+mkfifo $fifo # replace the temporary file with a named pipe of the same name
+
+cat > $unicorn_config <<EOF
+listen "$listen"
+pid "$pid"
+stderr_path "$r_err"
+stdout_path "$r_out"
+Rainbows! do
+  use :Rev
+end
+EOF
+
+rainbows -D content-md5.ru -c $unicorn_config
+wait_for_pid $pid
+
+echo "small blob"
+(
+        echo hello world | content-md5-put # chunked PUT of a short body with a Content-MD5 trailer
+        cat $fifo > $tmp &
+        wait
+        echo ok > $ok
+) | socat - TCP:$listen | tee $fifo
+
+fgrep 'HTTP/1.1 200 OK' $tmp # content-md5.ru answers 200 only when the digests match
+test xok = x"$(cat $ok)"
+! grep Error $r_err
+
+
+echo "big blob"
+(
+        content-md5-put < random_blob # same request shape, body taken from random_blob
+        cat $fifo > $tmp &
+        wait
+        echo ok > $ok
+) | socat - TCP:$listen | tee $fifo
+
+fgrep 'HTTP/1.1 200 OK' $tmp
+test xok = x"$(cat $ok)"
+! grep Error $r_err # the server's stderr log must not contain "Error"
+kill $(cat $pid)
diff --git a/t/test-lib.sh b/t/test-lib.sh
index d278329..26adfc9 100644
--- a/t/test-lib.sh
+++ b/t/test-lib.sh
@@ -42,6 +42,14 @@ require_revactor () {
         fi
 }
 
+require_rev() { # skips the calling test (exit 0) when $ruby cannot load the rev library
+        if ! $ruby -rrev -e "puts Rev::VERSION" >/dev/null 2>&1
+        then
+                echo >&2 "skipping $T since we don't have Rev"
+                exit 0
+        fi
+}
+
 # given a list of variable names, create temporary files and assign
 # the pathnames to those variables
 rtmpfiles () {