From 3495d59763e6159975debf32728dc53fc41c5ea1 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 27 Dec 2010 20:25:39 -0800 Subject: several response body#close fixes Some middlewares require the Rack env to be preserved all the way through to close, so we'll ensure all request models preserve it. We also need to better response body wrappers/proxies always get fired properly when returning. IO.copy_stream and "sendfile" gem users could hit cases where wrappers did not fire properly. --- lib/rainbows.rb | 1 + lib/rainbows/event_machine/client.rb | 27 ++++--- lib/rainbows/event_machine/response_pipe.rb | 3 +- lib/rainbows/fiber/body.rb | 4 +- lib/rainbows/response/body.rb | 37 +++++----- lib/rainbows/rev/client.rb | 1 + lib/rainbows/rev/deferred_response.rb | 2 +- lib/rainbows/revactor/body.rb | 3 + lib/rainbows/sync_close.rb | 37 ++++++++++ lib/rainbows/writer_thread_pool.rb | 8 +- lib/rainbows/writer_thread_spawn.rb | 6 +- t/close-has-env.ru | 65 +++++++++++++++++ t/t0050-response-body-close-has-env.sh | 109 ++++++++++++++++++++++++++++ 13 files changed, 268 insertions(+), 35 deletions(-) create mode 100644 lib/rainbows/sync_close.rb create mode 100644 t/close-has-env.ru create mode 100644 t/t0050-response-body-close-has-env.sh diff --git a/lib/rainbows.rb b/lib/rainbows.rb index dd5a5b2..951c3e5 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -118,6 +118,7 @@ module Rainbows autoload :HttpResponse, 'rainbows/http_response' # deprecated autoload :ThreadTimeout, 'rainbows/thread_timeout' autoload :WorkerYield, 'rainbows/worker_yield' + autoload :SyncClose, 'rainbows/sync_close' end require 'rainbows/error' diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb index fab1dbc..49552f3 100644 --- a/lib/rainbows/event_machine/client.rb +++ b/lib/rainbows/event_machine/client.rb @@ -16,11 +16,13 @@ class Rainbows::EventMachine::Client < EM::Connection # (often a static file), we do not attempt to process another # request on the same connection until the first is complete if @body - @buf << data - @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 - EM.next_tick { receive_data('') } + if data + @buf << data + @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 + end + EM.next_tick { receive_data(nil) } unless @buf.empty? else - on_read(data) + on_read(data || "") if (@buf.size > 0) || data end end @@ -43,15 +45,16 @@ class Rainbows::EventMachine::Client < EM::Connection # long-running async response (response.nil? || -1 == response[0]) and return @state = :close - alive = @hp.next? && G.alive && G.kato > 0 - em_write_response(response, alive) - if alive + if @hp.next? && G.alive && G.kato > 0 @state = :headers + em_write_response(response, true) if @buf.empty? set_comm_inactivity_timeout(G.kato) - else - EM.next_tick { receive_data('') } + elsif @body.nil? + EM.next_tick { receive_data(nil) } end + else + em_write_response(response, false) end end @@ -84,7 +87,7 @@ class Rainbows::EventMachine::Client < EM::Connection @body.callback do body.close if body.respond_to?(:close) @body = nil - alive ? receive_data('') : quit + alive ? receive_data(nil) : quit end return elsif st.socket? || st.pipe? @@ -102,6 +105,10 @@ class Rainbows::EventMachine::Client < EM::Connection quit unless alive end + def next! + @hp.keepalive? ? receive_data(@body = nil) : quit + end + def unbind async_close = @env[ASYNC_CLOSE] and async_close.succeed @body.respond_to?(:fail) and @body.fail diff --git a/lib/rainbows/event_machine/response_pipe.rb b/lib/rainbows/event_machine/response_pipe.rb index 2417dbe..3da2417 100644 --- a/lib/rainbows/event_machine/response_pipe.rb +++ b/lib/rainbows/event_machine/response_pipe.rb @@ -22,9 +22,8 @@ module Rainbows::EventMachine::ResponsePipe end def unbind - @client.body = nil - @alive ? @client.on_read('') : @client.quit @body.close if @body.respond_to?(:close) + @client.next! @io.close unless @io.closed? end end diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb index 0fe2ec6..29926c6 100644 --- a/lib/rainbows/fiber/body.rb +++ b/lib/rainbows/fiber/body.rb @@ -13,7 +13,7 @@ module Rainbows::Fiber::Body # :nodoc: # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock if IO.method_defined?(:sendfile_nonblock) def write_body_file(client, body, range) - sock, n = client.to_io, nil + sock, n, body = client.to_io, nil, body_to_io(body) offset, count = range ? range : [ 0, body.stat.size ] begin offset += (n = sock.sendfile_nonblock(body, offset, count)) @@ -23,6 +23,8 @@ module Rainbows::Fiber::Body # :nodoc: rescue EOFError break end while (count -= n) > 0 + ensure + close_if_private(body) end else ALIASES[:write_body] = :write_body_each diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb index 2535374..e80217d 100644 --- a/lib/rainbows/response/body.rb +++ b/lib/rainbows/response/body.rb @@ -32,8 +32,14 @@ module Rainbows::Response::Body # :nodoc: FD_MAP = Rainbows::FD_MAP + class F < File; end + + def close_if_private(io) + io.close if F === io + end + def io_for_fd(fd) - FD_MAP.delete(fd) || IO.new(fd) + FD_MAP.delete(fd) || F.for_fd(fd) end # to_io is not part of the Rack spec, but make an exception here @@ -47,13 +53,16 @@ module Rainbows::Response::Body # :nodoc: # try to take advantage of Rainbows::DevFdResponse, calling File.open # is a last resort path = body.to_path - path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : File.open(path) + path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : F.open(path) end end if IO.method_defined?(:sendfile_nonblock) def write_body_file(sock, body, range) - range ? sock.sendfile(body, range[0], range[1]) : sock.sendfile(body, 0) + io = body_to_io(body) + range ? sock.sendfile(io, range[0], range[1]) : sock.sendfile(io, 0) + ensure + close_if_private(io) end end @@ -70,8 +79,6 @@ module Rainbows::Response::Body # :nodoc: # pread() semantics def write_body_stream(sock, body, range) IO.copy_stream(body, sock) - ensure - body.respond_to?(:close) and body.close end else # fall back to body#each, which is a Rack standard @@ -79,27 +86,19 @@ module Rainbows::Response::Body # :nodoc: end if method_defined?(:write_body_file) - # middlewares/apps may return with a body that responds to +to_path+ def write_body_path(sock, body, range) - inp = body_to_io(body) - if inp.stat.file? - begin - write_body_file(sock, inp, range) - ensure - inp.close if inp != body - end - else - write_body_stream(sock, inp, range) - end + stat = File.stat(body.to_path) + stat.file? ? write_body_file(sock, body, range) : + write_body_stream(sock, body, range) ensure - body.respond_to?(:close) && inp != body and body.close + body.respond_to?(:close) and body.close end elsif method_defined?(:write_body_stream) def write_body_path(sock, body, range) - write_body_stream(sock, inp = body_to_io(body), range) + write_body_stream(sock, body, range) ensure - body.respond_to?(:close) && inp != body and body.close + body.respond_to?(:close) and body.close end end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 00df4d3..e0bccf0 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -74,6 +74,7 @@ class Rainbows::Rev::Client < Rev::IO end def next! + attached? or return @deferred = nil enable_write_watcher end diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb index 146f505..4a92ee4 100644 --- a/lib/rainbows/rev/deferred_response.rb +++ b/lib/rainbows/rev/deferred_response.rb @@ -14,7 +14,7 @@ class Rainbows::Rev::DeferredResponse < Rev::IO end def on_close - @client.next! if @client.attached? # attached? is false if write fails @body.respond_to?(:close) and @body.close + @client.next! end end diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/body.rb index ad2bc55..7bfb5de 100644 --- a/lib/rainbows/revactor/body.rb +++ b/lib/rainbows/revactor/body.rb @@ -8,6 +8,7 @@ module Rainbows::Revactor::Body if IO.method_defined?(:sendfile_nonblock) def write_body_file(client, body, range) + body = body_to_io(body) sock = client.instance_variable_get(:@_io) pfx = Revactor::TCP::Socket === client ? :tcp : :unix write_complete = T[:"#{pfx}_write_complete", client] @@ -29,6 +30,8 @@ module Rainbows::Revactor::Body rescue EOFError break end while (count -= n) > 0 + ensure + close_if_private(body) end else ALIASES[:write_body] = :write_body_each diff --git a/lib/rainbows/sync_close.rb b/lib/rainbows/sync_close.rb new file mode 100644 index 0000000..a336262 --- /dev/null +++ b/lib/rainbows/sync_close.rb @@ -0,0 +1,37 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'thread' +class Rainbows::SyncClose + def initialize(body) + @body = body + @mutex = Mutex.new + @cv = ConditionVariable.new + @mutex.synchronize do + yield self + @cv.wait(@mutex) + end + end + + def respond_to?(m) + @body.respond_to?(m) + end + + def to_path + @body.to_path + end + + def each(&block) + @body.each(&block) + end + + def to_io + @body.to_io + end + + # called by the writer thread to wake up the original thread (in #initialize) + def close + @body.close + ensure + @mutex.synchronize { @cv.signal } + end +end diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index 6896787..67c8e83 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -24,7 +24,13 @@ module Rainbows::WriterThreadPool @@q = nil def async_write_body(qclient, body, range) - qclient.q << [ qclient.to_io, :body, body, range ] + if body.respond_to?(:close) + Rainbows::SyncClose.new(body) do |body| + qclient.q << [ qclient.to_io, :body, body, range ] + end + else + qclient.q << [ qclient.to_io, :body, body, range ] + end end def process_client(client) # :nodoc: diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 0e7d1a7..43e4f2c 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -23,7 +23,11 @@ module Rainbows::WriterThreadSpawn include Rainbows::Base def write_body(my_sock, body, range) # :nodoc: - my_sock.queue_body(body, range) + if body.respond_to?(:close) + Rainbows::SyncClose.new(body) { |body| my_sock.queue_body(body, range) } + else + my_sock.queue_body(body, range) + end end def process_client(client) # :nodoc: diff --git a/t/close-has-env.ru b/t/close-has-env.ru new file mode 100644 index 0000000..471f605 --- /dev/null +++ b/t/close-has-env.ru @@ -0,0 +1,65 @@ +#\ -E none +use Rainbows::DevFdResponse +class ClosablePipe < ::IO + attr_accessor :env + + def self.new(env) + rv = popen "echo hello", "rb" + rv.env = env + rv + end + + def close + super + $stdout.syswrite "path_info=#{@env['PATH_INFO']}\n" + end +end + +class ClosableFile < ::File + attr_accessor :env + alias to_path path + def close + super + $stdout.syswrite "path_info=#{@env['PATH_INFO']}\n" + end +end + +class Blob + def initialize(env) + @env = env + end + + def each(&block) + yield "BLOB\n" + end + + def close + $stdout.syswrite "path_info=#{@env['PATH_INFO']}\n" + end +end + +run(lambda { |env| + case env["PATH_INFO"] + when %r{\A/pipe/} + [ 200, + [ %w(Content-Length 6), %w(Content-Type text/plain)], + ClosablePipe.new(env) + ] + when %r{\A/file/} + f = ClosableFile.open("env.ru", "rb") + f.env = env + [ 200, { + 'X-Req-Path' => env["PATH_INFO"], + 'Content-Length' => f.stat.size.to_s, + 'Content-Type' => 'text/plain' }, + f + ] + when %r{\A/blob/} + [ 200, + [%w(Content-Length 5), %w(Content-Type text/plain)], + Blob.new(env) + ] + else + [ 404, [%w(Content-Length 0), %w(Content-Type text/plain)], [] ] + end +}) diff --git a/t/t0050-response-body-close-has-env.sh b/t/t0050-response-body-close-has-env.sh new file mode 100644 index 0000000..4d0cd6f --- /dev/null +++ b/t/t0050-response-body-close-has-env.sh @@ -0,0 +1,109 @@ +#!/bin/sh +. ./test-lib.sh + +t_plan 29 "keepalive does not clear Rack env prematurely for $model" + +t_begin "setup and start" && { + rainbows_setup + rtmpfiles curl_out curl_err + echo "preload_app true" >> $unicorn_config + rainbows -D close-has-env.ru -c $unicorn_config + rainbows_wait_start +} + +req_pipelined () { + pfx=$1 + t_begin "make pipelined requests to trigger $pfx response body" && { + > $r_out + ( + cat $fifo > $tmp & + printf 'GET /%s/1 HTTP/1.1\r\n' $pfx + printf 'Host: example.com\r\n\r\n' + printf 'GET /%s/2 HTTP/1.1\r\n' $pfx + printf 'Host: example.com\r\n\r\n' + printf 'GET /%s/3 HTTP/1.1\r\n' $pfx + printf 'Host: example.com\r\n' + printf 'Connection: close\r\n\r\n' + wait + echo ok > $ok + ) | socat - TCP4:$listen > $fifo + test xok = x$(cat $ok) + } +} + +reload () { + t_begin 'reloading Rainbows! to ensure writeout' && { + # reload to ensure everything is flushed + kill -HUP $rainbows_pid + test xSTART = x"$(cat $fifo)" + } +} + +check_log () { + pfx="$1" + t_begin "check body close messages" && { + < $r_out awk ' +/^path_info=\/'$pfx'\/[1-3]$/ { next } +{ exit(2) } +END { exit(NR == 3 ? 0 : 1) } +' + } +} + +req_keepalive () { + pfx="$1" + t_begin "make keepalive requests to trigger $pfx response body" && { + > $r_out + rm -f $curl_err $curl_out + curl -vsSf http://$listen/$pfx/[1-3] 2> $curl_err > $curl_out + } +} + +req_keepalive file +reload +check_log file + +req_pipelined file +reload +check_log file + +req_keepalive blob +reload +check_log blob + +req_pipelined blob +reload +check_log blob + +req_keepalive pipe +reload +check_log pipe + +req_pipelined pipe +reload +check_log pipe + +t_begin "enable sendfile gem" && { + echo "require 'sendfile'" >> $unicorn_config + curl http://$listen/ >/dev/null # ensure worker is loaded before HUP +} + +reload + +req_keepalive file +reload +check_log file + +req_pipelined file +reload +check_log file + +t_begin "killing succeeds" && { + kill $rainbows_pid +} + +t_begin "check stderr" && { + check_stderr +} + +t_done -- cgit v1.2.3-24-ge0c7