about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <normalperson@yhbt.net> 2009-10-18 15:59:29 -0700
committer Eric Wong <normalperson@yhbt.net> 2009-10-18 21:25:47 -0700
commit 7b01d94dd9287ac402d91451f1e93c9faaf913c4 (patch)
tree 8f4005f4e92108748af53b8cbf709522f33419db
parent d0103759ae63b0ed1084f6a9d2b7ede538e8c871 (diff)
download rainbows-7b01d94dd9287ac402d91451f1e93c9faaf913c4.tar.gz
The new DevFdResponse middleware should be a no-op for non-Rev
concurrency models (or when env['rainbows.autochunk'] is explicitly
set to false).

Setting env['rainbows.autochunk'] to true (the default when Rev
is used) allows (e)poll-able IO objects (sockets, pipes) to be
sent asynchronously after app.call(env) returns.

This also has a fortunate side effect of introducing a code path
which allows large, static files to be sent without slurping
them into a Rev IO::Buffer, too.  This new change works even
without the DevFdResponse middleware, so you won't have to
reconfigure your app.

This lets us epoll on response bodies that come in from a pipe
or even a socket and send them either straight through or with
chunked encoding.
-rw-r--r-- lib/rainbows.rb | 1
-rw-r--r-- lib/rainbows/const.rb | 4
-rw-r--r-- lib/rainbows/dev_fd_response.rb | 69
-rw-r--r-- lib/rainbows/http_server.rb | 1
-rw-r--r-- lib/rainbows/rev.rb | 89
-rw-r--r-- t/async-response-no-autochunk.ru | 24
-rw-r--r-- t/async-response.ru | 13
-rw-r--r-- t/large-file-response.ru | 13
-rw-r--r-- t/lib-async-response-no-autochunk.sh | 6
-rw-r--r-- t/lib-async-response.sh | 45
-rw-r--r-- t/lib-large-file-response.sh | 45
l--------- t/t1004-thread-pool-async-response.sh | 1
l--------- t/t1005-thread-pool-large-file-response.sh | 1
l--------- t/t1006-thread-pool-async-response-no-autochunk.sh | 1
l--------- t/t2004-thread-spawn-async-response.sh | 1
l--------- t/t2005-thread-spawn-large-file-response.sh | 1
l--------- t/t2006-thread-spawn-async-response-no-autochunk.sh | 1
l--------- t/t3004-revactor-async-response.sh | 1
-rwxr-xr-x t/t3005-revactor-large-file-response.sh | 2
l--------- t/t3006-revactor-async-response-no-autochunk.sh | 1
l--------- t/t4004-rev-async-response.sh | 1
-rwxr-xr-x t/t4005-rev-large-file-response.sh | 2
l--------- t/t4006-rev-async-response-no-autochunk.sh | 1
23 files changed, 321 insertions, 3 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 096f700..aa58fab 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -14,6 +14,7 @@ module Rainbows
   require 'rainbows/http_response'
   require 'rainbows/base'
   autoload :AppPool, 'rainbows/app_pool'
+  autoload :DevFdResponse, 'rainbows/dev_fd_response'
 
   class << self
 
diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb
index 417a5de..403a18a 100644
--- a/lib/rainbows/const.rb
+++ b/lib/rainbows/const.rb
@@ -9,6 +9,10 @@ module Rainbows
 
     RACK_DEFAULTS = ::Unicorn::HttpRequest::DEFAULTS.merge({
       "SERVER_SOFTWARE" => "Rainbows! #{RAINBOWS_VERSION}",
+
+      # using the Rev model, we'll automatically chunk pipe and socket objects
+      # if they're the response body
+      'rainbows.autochunk' => false,
     })
 
     CONN_CLOSE = "Connection: close\r\n"
diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb
new file mode 100644
index 0000000..e4e5f0c
--- /dev/null
+++ b/lib/rainbows/dev_fd_response.rb
@@ -0,0 +1,69 @@
+# -*- encoding: binary -*-
+
+module Rainbows
+
+  # Rack response middleware wrapping any IO-like object with an
+  # OS-level file descriptor associated with it.  May also be used to
+  # create responses from integer file descriptors or existing +IO+
+  # objects.  This may be used in conjunction with the #to_path method
+  # on servers that support it to pass arbitrary file descriptors into
+  # the HTTP response without additional open(2) syscalls
+
+  class DevFdResponse < Struct.new(:app, :to_io, :to_path)
+    include Rack::Utils
+
+    # Rack middleware entry point, we'll just pass through responses
+    # unless they respond to +to_io+ or +to_path+
+    def call(env)
+      status, headers, body = response = app.call(env)
+
+      # totally uninteresting to us if there's no body
+      return response if STATUS_WITH_NO_ENTITY_BODY.include?(status)
+
+      io = body.to_io if body.respond_to?(:to_io)
+      io ||= File.open(body.to_path, 'rb') if body.respond_to?(:to_path)
+      return response if io.nil?
+
+      headers = HeaderHash.new(headers)
+      st = io.stat
+      if st.file?
+        headers['Content-Length'] ||= st.size.to_s
+        headers.delete('Transfer-Encoding')
+      elsif st.pipe? || st.socket? # epoll-able things
+        if env['rainbows.autochunk']
+          headers['Transfer-Encoding'] = 'chunked'
+          headers.delete('Content-Length')
+        else
+          headers['X-Rainbows-Autochunk'] = 'no'
+        end
+      else # unlikely, char/block device file, directory, ...
+        return response
+      end
+      resp = dup # be reentrant here
+      resp.to_path = "/dev/fd/#{io.fileno}"
+      resp.to_io = io
+      [ status, headers.to_hash, resp ]
+    end
+
+    # called by the webserver or other middlewares if they can't
+    # handle #to_path
+    def each(&block)
+      to_io.each(&block)
+    end
+
+    # remain Rack::Lint-compatible for people with wonky systems :P
+    unless File.exist?("/dev/fd/0")
+      alias to_path_orig to_path
+      undef_method :to_path
+    end
+
+    # called by the web server after #each
+    def close
+      begin
+        to_io.close if to_io.respond_to?(:close)
+      rescue IOError # could've been IO::new()'ed and closed
+      end
+    end
+
+  end # class
+end
diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb
index 6d61228..5521513 100644
--- a/lib/rainbows/http_server.rb
+++ b/lib/rainbows/http_server.rb
@@ -33,6 +33,7 @@ module Rainbows
       extend(mod)
       Const::RACK_DEFAULTS['rainbows.model'] = @use = model
       Const::RACK_DEFAULTS['rack.multithread'] = !!(/Thread/ =~ model.to_s)
+      Const::RACK_DEFAULTS['rainbows.autochunk'] = (model.to_s == "Rev")
     end
 
     def worker_connections(*args)
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
index fd25200..c73228a 100644
--- a/lib/rainbows/rev.rb
+++ b/lib/rainbows/rev.rb
@@ -40,6 +40,12 @@ module Rainbows
       include Rainbows::Const
       G = Rainbows::G
 
+      # queued, optional response bodies.  These should only be unpollable
+      # "fast" devices where read(2) is uninterruptible.  Unfortunately, NFS
+      # and its ilk are also part of this.  We'll also stick AsyncResponse
+      # bodies in here to prevent connections from being closed on us.
+      attr_reader :deferred_bodies
+
       def initialize(io)
         G.cur += 1
         super(io)
@@ -48,10 +54,17 @@ module Rainbows
         @hp = HttpParser.new
         @state = :headers # [ :body [ :trailers ] ] :app_call :close
         @buf = ""
+        @deferred_bodies = [] # for (fast) regular files only
       end
 
-      def handle_error(e)
+      # graceful exit, like SIGQUIT
+      def quit
+        @deferred_bodies.clear
         @state = :close
+      end
+
+      def handle_error(e)
+        quit
         msg = case e
         when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
           ERROR_500_RESPONSE
@@ -73,7 +86,12 @@ module Rainbows
           response = G.app.call(@env.update(RACK_DEFAULTS))
           alive &&= G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
-          HttpResponse.write(self, response, out)
+
+          if response.last.respond_to?(:to_path)
+            AsyncResponse.new(self, response, out)
+          else
+            HttpResponse.write(self, response, out)
+          end
           if alive
             @env.clear
             @hp.reset
@@ -88,7 +106,21 @@ module Rainbows
       end
 
       def on_write_complete
-        :close == @state and close
+        if body = @deferred_bodies.first
+          return if AsyncResponse === body
+          begin
+            begin
+              write(body.sysread(CHUNK_SIZE))
+            rescue EOFError # expected at file EOF
+              @deferred_bodies.shift
+              body.close
+            end
+          rescue Object => e
+            handle_error(e)
+          end
+        else
+          close if :close == @state
+        end
       end
 
       def on_close
@@ -156,6 +188,57 @@ module Rainbows
 
     end
 
+    class AsyncResponse < ::Rev::IO
+      include Unicorn
+      include Rainbows::Const
+      G = Rainbows::G
+
+      def initialize(client, response, out)
+        @client = client
+        @body = response.last # have to consider response being frozen
+
+        # to_io is not part of the Rack spec, but make an exception
+        # here since we can't get here without checking to_path first
+        io = @body.to_io if @body.respond_to?(:to_io)
+        io ||= ::IO.new($1.to_i) if @body.to_path =~ %r{\A/dev/fd/(\d+)\z}
+        io ||= File.open(@body.to_path, 'rb') # could be a FIFO
+
+        headers = Rack::Utils::HeaderHash.new(response[1])
+        @do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+        @do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
+
+        st = io.stat
+        if st.socket? || st.pipe?
+          super(io)
+          client.deferred_bodies << attach(::Rev::Loop.default)
+
+          # too tricky to support keepalive/pipelining when a response can
+          # take an indeterminate amount of time here.
+          out = [ CONN_CLOSE ] if out
+        elsif st.file?
+          headers.delete('Transfer-Encoding')
+          headers['Content-Length'] ||= st.size.to_s
+          client.deferred_bodies << io
+        else # char/block device, directory, whatever... nobody cares
+          return HttpResponse.write(@client, response, out)
+        end
+        response = [ response.first, headers.to_hash, [] ]
+        HttpResponse.write(@client, response, out)
+      end
+
+      def on_read(data)
+        @do_chunk and @client.write(sprintf("%x\r\n", data.size))
+        @client.write(data)
+        @do_chunk and @client.write("\r\n")
+      end
+
+      def on_close
+        @do_chunk and @client.write("0\r\n\r\n")
+        @client.quit
+        @body.respond_to?(:close) and @body.close
+      end
+    end
+
     # This timer handles the fchmod heartbeat to prevent our master
     # from killing us.
     class Heartbeat < ::Rev::TimerWatcher
diff --git a/t/async-response-no-autochunk.ru b/t/async-response-no-autochunk.ru
new file mode 100644
index 0000000..67c6403
--- /dev/null
+++ b/t/async-response-no-autochunk.ru
@@ -0,0 +1,24 @@
+use Rack::Chunked
+use Rainbows::DevFdResponse
+script = <<-EOF
+for i in 0 1 2 3 4 5 6 7 8 9
+do
+        printf '1\r\n%s\r\n' $i
+        sleep 1
+done
+printf '0\r\n\r\n'
+EOF
+
+run lambda { |env|
+  env['rainbows.autochunk'] = false
+  io = IO.popen(script, 'rb')
+  io.sync = true
+  [
+    200,
+    {
+      'Content-Type' => 'text/plain',
+      'Transfer-Encoding' => 'chunked',
+    },
+    io
+  ].freeze
+}
diff --git a/t/async-response.ru b/t/async-response.ru
new file mode 100644
index 0000000..ef76504
--- /dev/null
+++ b/t/async-response.ru
@@ -0,0 +1,13 @@
+use Rack::Chunked
+use Rainbows::DevFdResponse
+run lambda { |env|
+  io = IO.popen('for i in 0 1 2 3 4 5 6 7 8 9; do date; sleep 1; done', 'rb')
+  io.sync = true
+  [
+    200,
+    {
+      'Content-Type' => 'text/plain',
+    },
+    io
+  ].freeze
+}
diff --git a/t/large-file-response.ru b/t/large-file-response.ru
new file mode 100644
index 0000000..90dc6c5
--- /dev/null
+++ b/t/large-file-response.ru
@@ -0,0 +1,13 @@
+# lib-large-file-response will stop running if we're not on Linux here
+use Rack::ContentLength
+use Rack::ContentType
+map "/rss" do
+  run lambda { |env|
+    # on Linux, this is in kilobytes
+    ::File.read("/proc/self/status") =~ /^VmRSS:\s+(\d+)/
+    [ 200, {}, [ ($1.to_i * 1024).to_s ] ]
+  }
+end
+map "/" do
+  run Rack::File.new(Dir.pwd)
+end
diff --git a/t/lib-async-response-no-autochunk.sh b/t/lib-async-response-no-autochunk.sh
new file mode 100644
index 0000000..66be85e
--- /dev/null
+++ b/t/lib-async-response-no-autochunk.sh
@@ -0,0 +1,6 @@
+#!/bin/sh
+CONFIG_RU=async-response-no-autochunk.ru
+. ./lib-async-response.sh
+test x"$(cat $a)" = x0123456789
+test x"$(cat $b)" = x0123456789
+test x"$(cat $c)" = x0123456789
diff --git a/t/lib-async-response.sh b/t/lib-async-response.sh
new file mode 100644
index 0000000..925455b
--- /dev/null
+++ b/t/lib-async-response.sh
@@ -0,0 +1,45 @@
+CONFIG_RU=${CONFIG_RU-'async-response.ru'}
+. ./test-lib.sh
+echo "async response for model=$model"
+eval $(unused_listen)
+rtmpfiles unicorn_config a b c r_err r_out pid curl_err
+
+cat > $unicorn_config <<EOF
+listen "$listen"
+stderr_path "$r_err"
+stdout_path "$r_out"
+pid "$pid"
+Rainbows! { use :$model }
+EOF
+
+# can't load Rack::Lint here since it'll cause Rev to slurp
+rainbows -E none -D $CONFIG_RU -c $unicorn_config
+wait_for_pid $pid
+
+t0=$(date +%s)
+( curl --no-buffer -sSf http://$listen/ 2>> $curl_err | utee $a) &
+( curl --no-buffer -sSf http://$listen/ 2>> $curl_err | utee $b) &
+( curl --no-buffer -sSf http://$listen/ 2>> $curl_err | utee $c) &
+wait
+t1=$(date +%s)
+
+rainbows_pid=$(cat $pid)
+kill -QUIT $rainbows_pid
+elapsed=$(( $t1 - $t0 ))
+echo "elapsed=$elapsed < 30"
+test $elapsed -lt 30
+
+dbgcat a
+dbgcat b
+dbgcat c
+dbgcat r_err
+dbgcat curl_err
+test ! -s $curl_err
+grep Error $r_err && die "errors in $r_err"
+
+while kill -0 $rainbows_pid >/dev/null 2>&1
+do
+        sleep 1
+done
+
+dbgcat r_err
diff --git a/t/lib-large-file-response.sh b/t/lib-large-file-response.sh
new file mode 100644
index 0000000..830812a
--- /dev/null
+++ b/t/lib-large-file-response.sh
@@ -0,0 +1,45 @@
+. ./test-lib.sh
+test -r random_blob || die "random_blob required, run with 'make $0'"
+# skip unless /proc/self/status really has a VmRSS: line (no -v: match it)
+if ! grep '^VmRSS:' /proc/self/status >/dev/null 2>&1; then
+        echo >&2 "skipping, can't read RSS from /proc/self/status"
+        exit 0
+fi
+echo "large file response slurp avoidance for model=$model"
+eval $(unused_listen)
+rtmpfiles unicorn_config tmp r_err r_out pid ok
+
+cat > $unicorn_config <<EOF
+listen "$listen"
+stderr_path "$r_err"
+stdout_path "$r_out"
+pid "$pid"
+Rainbows! { use :$model }
+EOF
+
+# can't load Rack::Lint here since it'll cause Rev to slurp
+rainbows -E none -D large-file-response.ru -c $unicorn_config
+wait_for_pid $pid
+
+random_blob_size=$(wc -c < random_blob)
+curl -v http://$listen/rss
+dbgcat r_err
+rss_before=$(curl -sSfv http://$listen/rss)
+echo "rss_before=$rss_before"
+
+for i in a b c
+do
+        size=$( (curl -sSfv http://$listen/random_blob && echo ok > $ok) | wc -c)
+        test $size -eq $random_blob_size
+        test xok = x$(cat $ok)
+done
+
+dbgcat r_err
+curl -v http://$listen/rss
+rss_after=$(curl -sSfv http://$listen/rss)
+echo "rss_after=$rss_after"
+diff=$(( $rss_after - $rss_before ))
+echo "test diff=$diff < orig=$random_blob_size"
+kill -QUIT $(cat $pid)
+test $diff -le $random_blob_size
+dbgcat r_err
diff --git a/t/t1004-thread-pool-async-response.sh b/t/t1004-thread-pool-async-response.sh
new file mode 120000
index 0000000..15c27db
--- /dev/null
+++ b/t/t1004-thread-pool-async-response.sh
@@ -0,0 +1 @@
+lib-async-response.sh
\ No newline at end of file
diff --git a/t/t1005-thread-pool-large-file-response.sh b/t/t1005-thread-pool-large-file-response.sh
new file mode 120000
index 0000000..37d2877
--- /dev/null
+++ b/t/t1005-thread-pool-large-file-response.sh
@@ -0,0 +1 @@
+lib-large-file-response.sh
\ No newline at end of file
diff --git a/t/t1006-thread-pool-async-response-no-autochunk.sh b/t/t1006-thread-pool-async-response-no-autochunk.sh
new file mode 120000
index 0000000..bb87ca9
--- /dev/null
+++ b/t/t1006-thread-pool-async-response-no-autochunk.sh
@@ -0,0 +1 @@
+lib-async-response-no-autochunk.sh
\ No newline at end of file
diff --git a/t/t2004-thread-spawn-async-response.sh b/t/t2004-thread-spawn-async-response.sh
new file mode 120000
index 0000000..15c27db
--- /dev/null
+++ b/t/t2004-thread-spawn-async-response.sh
@@ -0,0 +1 @@
+lib-async-response.sh
\ No newline at end of file
diff --git a/t/t2005-thread-spawn-large-file-response.sh b/t/t2005-thread-spawn-large-file-response.sh
new file mode 120000
index 0000000..37d2877
--- /dev/null
+++ b/t/t2005-thread-spawn-large-file-response.sh
@@ -0,0 +1 @@
+lib-large-file-response.sh
\ No newline at end of file
diff --git a/t/t2006-thread-spawn-async-response-no-autochunk.sh b/t/t2006-thread-spawn-async-response-no-autochunk.sh
new file mode 120000
index 0000000..bb87ca9
--- /dev/null
+++ b/t/t2006-thread-spawn-async-response-no-autochunk.sh
@@ -0,0 +1 @@
+lib-async-response-no-autochunk.sh
\ No newline at end of file
diff --git a/t/t3004-revactor-async-response.sh b/t/t3004-revactor-async-response.sh
new file mode 120000
index 0000000..15c27db
--- /dev/null
+++ b/t/t3004-revactor-async-response.sh
@@ -0,0 +1 @@
+lib-async-response.sh
\ No newline at end of file
diff --git a/t/t3005-revactor-large-file-response.sh b/t/t3005-revactor-large-file-response.sh
new file mode 100755
index 0000000..ef1a4a3
--- /dev/null
+++ b/t/t3005-revactor-large-file-response.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+. ./lib-large-file-response.sh
diff --git a/t/t3006-revactor-async-response-no-autochunk.sh b/t/t3006-revactor-async-response-no-autochunk.sh
new file mode 120000
index 0000000..bb87ca9
--- /dev/null
+++ b/t/t3006-revactor-async-response-no-autochunk.sh
@@ -0,0 +1 @@
+lib-async-response-no-autochunk.sh
\ No newline at end of file
diff --git a/t/t4004-rev-async-response.sh b/t/t4004-rev-async-response.sh
new file mode 120000
index 0000000..15c27db
--- /dev/null
+++ b/t/t4004-rev-async-response.sh
@@ -0,0 +1 @@
+lib-async-response.sh
\ No newline at end of file
diff --git a/t/t4005-rev-large-file-response.sh b/t/t4005-rev-large-file-response.sh
new file mode 100755
index 0000000..ef1a4a3
--- /dev/null
+++ b/t/t4005-rev-large-file-response.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+. ./lib-large-file-response.sh
diff --git a/t/t4006-rev-async-response-no-autochunk.sh b/t/t4006-rev-async-response-no-autochunk.sh
new file mode 120000
index 0000000..bb87ca9
--- /dev/null
+++ b/t/t4006-rev-async-response-no-autochunk.sh
@@ -0,0 +1 @@
+lib-async-response-no-autochunk.sh
\ No newline at end of file