From 7b01d94dd9287ac402d91451f1e93c9faaf913c4 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 18 Oct 2009 15:59:29 -0700 Subject: rev: async response bodies with DevFdResponse middleware This new middleware should be a no-op for non-Rev concurrency models (or by explicitly setting env['rainbows.autochunk'] to false). Setting env['rainbows.autochunk'] to true (the default when Rev is used) allows (e)poll-able IO objects (sockets, pipes) to be sent asynchronously after app.call(env) returns. This also has a fortunate side effect of introducing a code path which allows large, static files to be sent without slurping them into a Rev IO::Buffer, too. This new change works even without the DevFdResponse middleware, so you won't have to reconfigure your app. This lets us epoll on response bodies that come in from a pipe or even a socket and send them either straight through or with chunked encoding. --- lib/rainbows.rb | 1 + lib/rainbows/const.rb | 4 + lib/rainbows/dev_fd_response.rb | 69 +++++++++++++++++ lib/rainbows/http_server.rb | 1 + lib/rainbows/rev.rb | 89 +++++++++++++++++++++- t/async-response-no-autochunk.ru | 24 ++++++ t/async-response.ru | 13 ++++ t/large-file-response.ru | 13 ++++ t/lib-async-response-no-autochunk.sh | 6 ++ t/lib-async-response.sh | 45 +++++++++++ t/lib-large-file-response.sh | 45 +++++++++++ t/t1004-thread-pool-async-response.sh | 1 + t/t1005-thread-pool-large-file-response.sh | 1 + t/t1006-thread-pool-async-response-no-autochunk.sh | 1 + t/t2004-thread-spawn-async-response.sh | 1 + t/t2005-thread-spawn-large-file-response.sh | 1 + ...006-thread-spawn-async-response-no-autochunk.sh | 1 + t/t3004-revactor-async-response.sh | 1 + t/t3005-revactor-large-file-response.sh | 2 + t/t3006-revactor-async-response-no-autochunk.sh | 1 + t/t4004-rev-async-response.sh | 1 + t/t4005-rev-large-file-response.sh | 2 + t/t4006-rev-async-response-no-autochunk.sh | 1 + 23 files changed, 321 insertions(+), 3 deletions(-) create mode 100644 lib/rainbows/dev_fd_response.rb create mode 100644 t/async-response-no-autochunk.ru create mode 100644 t/async-response.ru create mode 100644 t/large-file-response.ru create mode 100644 t/lib-async-response-no-autochunk.sh create mode 100644 t/lib-async-response.sh create mode 100644 t/lib-large-file-response.sh create mode 120000 t/t1004-thread-pool-async-response.sh create mode 120000 t/t1005-thread-pool-large-file-response.sh create mode 120000 t/t1006-thread-pool-async-response-no-autochunk.sh create mode 120000 t/t2004-thread-spawn-async-response.sh create mode 120000 t/t2005-thread-spawn-large-file-response.sh create mode 120000 t/t2006-thread-spawn-async-response-no-autochunk.sh create mode 120000 t/t3004-revactor-async-response.sh create mode 100755 t/t3005-revactor-large-file-response.sh create mode 120000 t/t3006-revactor-async-response-no-autochunk.sh create mode 120000 t/t4004-rev-async-response.sh create mode 100755 t/t4005-rev-large-file-response.sh create mode 120000 t/t4006-rev-async-response-no-autochunk.sh diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 096f700..aa58fab 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -14,6 +14,7 @@ module Rainbows require 'rainbows/http_response' require 'rainbows/base' autoload :AppPool, 'rainbows/app_pool' + autoload :DevFdResponse, 'rainbows/dev_fd_response' class << self diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb index 417a5de..403a18a 100644 --- a/lib/rainbows/const.rb +++ b/lib/rainbows/const.rb @@ -9,6 +9,10 @@ module Rainbows RACK_DEFAULTS = ::Unicorn::HttpRequest::DEFAULTS.merge({ "SERVER_SOFTWARE" => "Rainbows! #{RAINBOWS_VERSION}", + + # using the Rev model, we'll automatically chunk pipe and socket objects + # if they're the response body + 'rainbows.autochunk' => false, }) CONN_CLOSE = "Connection: close\r\n" diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb new file mode 100644 index 0000000..e4e5f0c --- /dev/null +++ b/lib/rainbows/dev_fd_response.rb @@ -0,0 +1,69 @@ +# -*- encoding: binary -*- + +module Rainbows + + # Rack response middleware wrapping any IO-like object with an + # OS-level file descriptor associated with it. May also be used to + # create responses from integer file descriptors or existing +IO+ + # objects. This may be used in conjunction with the #to_path method + # on servers that support it to pass arbitrary file descriptors into + # the HTTP response without additional open(2) syscalls + + class DevFdResponse < Struct.new(:app, :to_io, :to_path) + include Rack::Utils + + # Rack middleware entry point, we'll just pass through responses + # unless they respond to +to_io+ or +to_path+ + def call(env) + status, headers, body = response = app.call(env) + + # totally uninteresting to us if there's no body + return response if STATUS_WITH_NO_ENTITY_BODY.include?(status) + + io = body.to_io if body.respond_to?(:to_io) + io ||= File.open(body.to_path, 'rb') if body.respond_to?(:to_path) + return response if io.nil? + + headers = HeaderHash.new(headers) + st = io.stat + if st.file? + headers['Content-Length'] ||= st.size.to_s + headers.delete('Transfer-Encoding') + elsif st.pipe? || st.socket? # epoll-able things + if env['rainbows.autochunk'] + headers['Transfer-Encoding'] = 'chunked' + headers.delete('Content-Length') + else + headers['X-Rainbows-Autochunk'] = 'no' + end + else # unlikely, char/block device file, directory, ... + return response + end + resp = dup # be reentrant here + resp.to_path = "/dev/fd/#{io.fileno}" + resp.to_io = io + [ status, headers.to_hash, resp ] + end + + # called by the webserver or other middlewares if they can't + # handle #to_path + def each(&block) + to_io.each(&block) + end + + # remain Rack::Lint-compatible for people with wonky systems :P + unless File.exist?("/dev/fd/0") + alias to_path_orig to_path + undef_method :to_path + end + + # called by the web server after #each + def close + begin + to_io.close if to_io.respond_to?(:close) + rescue IOError # could've been IO::new()'ed and closed + end + end + + end # class +end diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb index 6d61228..5521513 100644 --- a/lib/rainbows/http_server.rb +++ b/lib/rainbows/http_server.rb @@ -33,6 +33,7 @@ module Rainbows extend(mod) Const::RACK_DEFAULTS['rainbows.model'] = @use = model Const::RACK_DEFAULTS['rack.multithread'] = !!(/Thread/ =~ model.to_s) + Const::RACK_DEFAULTS['rainbows.autochunk'] = (model.to_s == "Rev") end def worker_connections(*args) diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index fd25200..c73228a 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -40,6 +40,12 @@ module Rainbows include Rainbows::Const G = Rainbows::G + # queued, optional response bodies, it should only be unpollable "fast" + # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk + # are also part of this. We'll also stick AsyncResponse bodies in + # here to prevent connections from being closed on us. + attr_reader :deferred_bodies + def initialize(io) G.cur += 1 super(io) @@ -48,10 +54,17 @@ module Rainbows @hp = HttpParser.new @state = :headers # [ :body [ :trailers ] ] :app_call :close @buf = "" + @deferred_bodies = [] # for (fast) regular files only end - def handle_error(e) + # graceful exit, like SIGQUIT + def quit + @deferred_bodies.clear @state = :close + end + + def handle_error(e) + quit msg = case e when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF ERROR_500_RESPONSE @@ -73,7 +86,12 @@ module Rainbows response = G.app.call(@env.update(RACK_DEFAULTS)) alive &&= G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - HttpResponse.write(self, response, out) + + if response.last.respond_to?(:to_path) + AsyncResponse.new(self, response, out) + else + HttpResponse.write(self, response, out) + end if alive @env.clear @hp.reset @@ -88,7 +106,21 @@ module Rainbows end def on_write_complete - :close == @state and close + if body = @deferred_bodies.first + return if AsyncResponse === body + begin + begin + write(body.sysread(CHUNK_SIZE)) + rescue EOFError # expected at file EOF + @deferred_bodies.shift + body.close + end + rescue Object => e + handle_error(e) + end + else + close if :close == @state + end end def on_close @@ -156,6 +188,57 @@ module Rainbows end + class AsyncResponse < ::Rev::IO + include Unicorn + include Rainbows::Const + G = Rainbows::G + + def initialize(client, response, out) + @client = client + @body = response.last # have to consider response being frozen + + # to_io is not part of the Rack spec, but make an exception + # here since we can't get here without checking to_path first + io = @body.to_io if @body.respond_to?(:to_io) + io ||= ::IO.new($1.to_i) if @body.to_path =~ %r{\A/dev/fd/(\d+)\z} + io ||= File.open(@body.to_path, 'rb') # could be a FIFO + + headers = Rack::Utils::HeaderHash.new(response[1]) + @do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + @do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + + st = io.stat + if st.socket? || st.pipe? + super(io) + client.deferred_bodies << attach(::Rev::Loop.default) + + # too tricky to support keepalive/pipelining when a response can + # take an indeterminate amount of time here. + out = [ CONN_CLOSE ] if out + elsif st.file? + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + client.deferred_bodies << io + else # char/block device, directory, whatever... nobody cares + return HttpResponse.write(@client, response, out) + end + response = [ response.first, headers.to_hash, [] ] + HttpResponse.write(@client, response, out) + end + + def on_read(data) + @do_chunk and @client.write(sprintf("%x\r\n", data.size)) + @client.write(data) + @do_chunk and @client.write("\r\n") + end + + def on_close + @do_chunk and @client.write("0\r\n\r\n") + @client.quit + @body.respond_to?(:close) and @body.close + end + end + # This timer handles the fchmod heartbeat to prevent our master # from killing us. class Heartbeat < ::Rev::TimerWatcher diff --git a/t/async-response-no-autochunk.ru b/t/async-response-no-autochunk.ru new file mode 100644 index 0000000..67c6403 --- /dev/null +++ b/t/async-response-no-autochunk.ru @@ -0,0 +1,24 @@ +use Rack::Chunked +use Rainbows::DevFdResponse +script = <<-EOF +for i in 0 1 2 3 4 5 6 7 8 9 +do + printf '1\r\n%s\r\n' $i + sleep 1 +done +printf '0\r\n\r\n' +EOF + +run lambda { |env| + env['rainbows.autochunk'] = false + io = IO.popen(script, 'rb') + io.sync = true + [ + 200, + { + 'Content-Type' => 'text/plain', + 'Transfer-Encoding' => 'chunked', + }, + io + ].freeze +} diff --git a/t/async-response.ru b/t/async-response.ru new file mode 100644 index 0000000..ef76504 --- /dev/null +++ b/t/async-response.ru @@ -0,0 +1,13 @@ +use Rack::Chunked +use Rainbows::DevFdResponse +run lambda { |env| + io = IO.popen('for i in 0 1 2 3 4 5 6 7 8 9; do date; sleep 1; done', 'rb') + io.sync = true + [ + 200, + { + 'Content-Type' => 'text/plain', + }, + io + ].freeze +} diff --git a/t/large-file-response.ru b/t/large-file-response.ru new file mode 100644 index 0000000..90dc6c5 --- /dev/null +++ b/t/large-file-response.ru @@ -0,0 +1,13 @@ +# lib-large-file-response will stop running if we're not on Linux here +use Rack::ContentLength +use Rack::ContentType +map "/rss" do + run lambda { |env| + # on Linux, this is in kilobytes + ::File.read("/proc/self/status") =~ /^VmRSS:\s+(\d+)/ + [ 200, {}, [ ($1.to_i * 1024).to_s ] ] + } +end +map "/" do + run Rack::File.new(Dir.pwd) +end diff --git a/t/lib-async-response-no-autochunk.sh b/t/lib-async-response-no-autochunk.sh new file mode 100644 index 0000000..66be85e --- /dev/null +++ b/t/lib-async-response-no-autochunk.sh @@ -0,0 +1,6 @@ +#!/bin/sh +CONFIG_RU=async-response-no-autochunk.ru +. ./lib-async-response.sh +test x"$(cat $a)" = x0123456789 +test x"$(cat $b)" = x0123456789 +test x"$(cat $c)" = x0123456789 diff --git a/t/lib-async-response.sh b/t/lib-async-response.sh new file mode 100644 index 0000000..925455b --- /dev/null +++ b/t/lib-async-response.sh @@ -0,0 +1,45 @@ +CONFIG_RU=${CONFIG_RU-'async-response.ru'} +. ./test-lib.sh +echo "async response for model=$model" +eval $(unused_listen) +rtmpfiles unicorn_config a b c r_err r_out pid curl_err + +cat > $unicorn_config <> $curl_err | utee $a) & +( curl --no-buffer -sSf http://$listen/ 2>> $curl_err | utee $b) & +( curl --no-buffer -sSf http://$listen/ 2>> $curl_err | utee $c) & +wait +t1=$(date +%s) + +rainbows_pid=$(cat $pid) +kill -QUIT $rainbows_pid +elapsed=$(( $t1 - $t0 )) +echo "elapsed=$elapsed < 30" +test $elapsed -lt 30 + +dbgcat a +dbgcat b +dbgcat c +dbgcat r_err +dbgcat curl_err +test ! -s $curl_err +grep Error $r_err && die "errors in $r_err" + +while kill -0 $rainbows_pid >/dev/null 2>&1 +do + sleep 1 +done + +dbgcat r_err diff --git a/t/lib-large-file-response.sh b/t/lib-large-file-response.sh new file mode 100644 index 0000000..830812a --- /dev/null +++ b/t/lib-large-file-response.sh @@ -0,0 +1,45 @@ +. ./test-lib.sh +test -r random_blob || die "random_blob required, run with 'make $0'" +if ! grep -v ^VmRSS: /proc/self/status >/dev/null 2>&1 +then + echo >&2 "skipping, can't read RSS from /proc/self/status" + exit 0 +fi +echo "large file response slurp avoidance for model=$model" +eval $(unused_listen) +rtmpfiles unicorn_config tmp r_err r_out pid ok + +cat > $unicorn_config < $ok) | wc -c) + test $size -eq $random_blob_size + test xok = x$(cat $ok) +done + +dbgcat r_err +curl -v http://$listen/rss +rss_after=$(curl -sSfv http://$listen/rss) +echo "rss_after=$rss_after" +diff=$(( $rss_after - $rss_before )) +echo "test diff=$diff < orig=$random_blob_size" +kill -QUIT $(cat $pid) +test $diff -le $random_blob_size +dbgcat r_err diff --git a/t/t1004-thread-pool-async-response.sh b/t/t1004-thread-pool-async-response.sh new file mode 120000 index 0000000..15c27db --- /dev/null +++ b/t/t1004-thread-pool-async-response.sh @@ -0,0 +1 @@ +lib-async-response.sh \ No newline at end of file diff --git a/t/t1005-thread-pool-large-file-response.sh b/t/t1005-thread-pool-large-file-response.sh new file mode 120000 index 0000000..37d2877 --- /dev/null +++ b/t/t1005-thread-pool-large-file-response.sh @@ -0,0 +1 @@ +lib-large-file-response.sh \ No newline at end of file diff --git a/t/t1006-thread-pool-async-response-no-autochunk.sh b/t/t1006-thread-pool-async-response-no-autochunk.sh new file mode 120000 index 0000000..bb87ca9 --- /dev/null +++ b/t/t1006-thread-pool-async-response-no-autochunk.sh @@ -0,0 +1 @@ +lib-async-response-no-autochunk.sh \ No newline at end of file diff --git a/t/t2004-thread-spawn-async-response.sh b/t/t2004-thread-spawn-async-response.sh new file mode 120000 index 0000000..15c27db --- /dev/null +++ b/t/t2004-thread-spawn-async-response.sh @@ -0,0 +1 @@ +lib-async-response.sh \ No newline at end of file diff --git a/t/t2005-thread-spawn-large-file-response.sh b/t/t2005-thread-spawn-large-file-response.sh new file mode 120000 index 0000000..37d2877 --- /dev/null +++ b/t/t2005-thread-spawn-large-file-response.sh @@ -0,0 +1 @@ +lib-large-file-response.sh \ No newline at end of file diff --git a/t/t2006-thread-spawn-async-response-no-autochunk.sh b/t/t2006-thread-spawn-async-response-no-autochunk.sh new file mode 120000 index 0000000..bb87ca9 --- /dev/null +++ b/t/t2006-thread-spawn-async-response-no-autochunk.sh @@ -0,0 +1 @@ +lib-async-response-no-autochunk.sh \ No newline at end of file diff --git a/t/t3004-revactor-async-response.sh b/t/t3004-revactor-async-response.sh new file mode 120000 index 0000000..15c27db --- /dev/null +++ b/t/t3004-revactor-async-response.sh @@ -0,0 +1 @@ +lib-async-response.sh \ No newline at end of file diff --git a/t/t3005-revactor-large-file-response.sh b/t/t3005-revactor-large-file-response.sh new file mode 100755 index 0000000..ef1a4a3 --- /dev/null +++ b/t/t3005-revactor-large-file-response.sh @@ -0,0 +1,2 @@ +#!/bin/sh +. ./lib-large-file-response.sh diff --git a/t/t3006-revactor-async-response-no-autochunk.sh b/t/t3006-revactor-async-response-no-autochunk.sh new file mode 120000 index 0000000..bb87ca9 --- /dev/null +++ b/t/t3006-revactor-async-response-no-autochunk.sh @@ -0,0 +1 @@ +lib-async-response-no-autochunk.sh \ No newline at end of file diff --git a/t/t4004-rev-async-response.sh b/t/t4004-rev-async-response.sh new file mode 120000 index 0000000..15c27db --- /dev/null +++ b/t/t4004-rev-async-response.sh @@ -0,0 +1 @@ +lib-async-response.sh \ No newline at end of file diff --git a/t/t4005-rev-large-file-response.sh b/t/t4005-rev-large-file-response.sh new file mode 100755 index 0000000..ef1a4a3 --- /dev/null +++ b/t/t4005-rev-large-file-response.sh @@ -0,0 +1,2 @@ +#!/bin/sh +. ./lib-large-file-response.sh diff --git a/t/t4006-rev-async-response-no-autochunk.sh b/t/t4006-rev-async-response-no-autochunk.sh new file mode 120000 index 0000000..bb87ca9 --- /dev/null +++ b/t/t4006-rev-async-response-no-autochunk.sh @@ -0,0 +1 @@ +lib-async-response-no-autochunk.sh \ No newline at end of file -- cgit v1.2.3-24-ge0c7