From 052e2b3608071d9cd9d6b1d12f8cb69ac29124af Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 25 Jul 2010 09:28:22 +0000 Subject: rev*: properly handle pipelined responses w/sendfile With sendfile enabled, we must avoid writing headers (or normal, non-file responses) while a file is deferred for sending. This means we must disable processing of new requests while a file is deferred for sending and use the on_write_complete callback less aggressively. --- lib/rainbows/rev/client.rb | 118 ++++++++++++++++++++-------------- lib/rainbows/rev/deferred_response.rb | 2 +- lib/rainbows/rev/thread.rb | 12 +--- 3 files changed, 74 insertions(+), 58 deletions(-) (limited to 'lib') diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 4d88b7b..64784eb 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -14,12 +14,12 @@ module Rainbows CONN[self] = false super(io) post_init - @deferred_bodies = [] # for (fast) regular files only + @deferred = nil end def quit super - close if @deferred_bodies.empty? && @_write_buffer.empty? + close if @deferred.nil? && @_write_buffer.empty? end # override the ::Rev::IO#write method try to write directly to the @@ -29,16 +29,14 @@ module Rainbows if @_write_buffer.empty? begin w = @_io.write_nonblock(buf) - if w == Rack::Utils.bytesize(buf) - return on_write_complete - end + return enable_write_watcher if w == Rack::Utils.bytesize(buf) # we never care for the return value, but yes, we may return # a "fake" short write from super(buf) if anybody cares. buf = byte_slice(buf, w..-1) rescue Errno::EAGAIN break # fall through to super(buf) - rescue - return close + rescue => e + return handle_error(e) end while true end super(buf) @@ -49,19 +47,22 @@ module Rainbows # are also part of this. We'll also stick DeferredResponse bodies in # here to prevent connections from being closed on us. def defer_body(io) - @deferred_bodies << io - @_write_buffer.empty? ? on_write_complete : schedule_write + @deferred = io + enable_write_watcher end - def next - @deferred_bodies.shift - if :close == @state && @deferred_bodies.empty? && @_write_buffer.empty? - close - end + # allows enabling of write watcher even when read watcher is disabled + def evloop + Rainbows::Rev::Server::LOOP + end + + def next! + @deferred = nil + on_write_complete end def timeout? - @_write_buffer.empty? && @deferred_bodies.empty? and close.nil? + @deferred.nil? && @_write_buffer.empty? and close.nil? end # used for streaming sockets and pipes @@ -101,50 +102,73 @@ module Rainbows end def app_call - begin - KATO.delete(self) - @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @remote_addr - response = APP.call(@env.update(RACK_DEFAULTS)) - - rev_write_response(response, alive = @hp.keepalive? && G.alive) - if alive - @env.clear - @hp.reset - @state = :headers - # keepalive requests are always body-less, so @input is unchanged - @hp.headers(@env, @buf) and next - KATO[self] = Time.now + KATO.delete(self) + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @remote_addr + response = APP.call(@env.update(RACK_DEFAULTS)) + + rev_write_response(response, alive = @hp.keepalive? && G.alive) + return quit unless alive && :close != @state + @env.clear + @hp.reset + @state = :headers + disable if enabled? + end + + def on_write_complete + case @deferred + when DeferredResponse then return + when NilClass # fall through + else + begin + return rev_sendfile(@deferred) + rescue EOFError # expected at file EOF + close_deferred + end + end + + case @state + when :close + close if @_write_buffer.empty? + when :headers + if @hp.headers(@env, @buf) + app_call else - quit + unless enabled? + enable + KATO[self] = Time.now + end end - return - end while true + end + rescue => e + handle_error(e) end - def on_write_complete - if body = @deferred_bodies[0] - # no socket or pipes, body must be a regular file to continue here - return if DeferredResponse === body + def handle_error(e) + close_deferred + if msg = Error.response(e) + @_io.write_nonblock(msg) rescue nil + end + @_write_buffer.clear + ensure + quit + end + def close_deferred + case @deferred + when DeferredResponse, NilClass + else begin - rev_sendfile(body) - rescue EOFError # expected at file EOF - @deferred_bodies.shift - body.close - close if :close == @state && @deferred_bodies.empty? + @deferred.close rescue => e - handle_error(e) + G.server.logger.error("closing #@deferred: #{e}") end - else - close if :close == @state + @deferred = nil end end def on_close - while f = @deferred_bodies.shift - DeferredResponse === f or f.close - end + close_deferred CONN.delete(self) end diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb index 13396d8..7e00918 100644 --- a/lib/rainbows/rev/deferred_response.rb +++ b/lib/rainbows/rev/deferred_response.rb @@ -14,7 +14,7 @@ class Rainbows::Rev::DeferredResponse < ::Rev::IO end def on_close - @client.next + @client.next! @body.respond_to?(:close) and @body.close end end diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb index 2dbaa84..cce3e92 100644 --- a/lib/rainbows/rev/thread.rb +++ b/lib/rainbows/rev/thread.rb @@ -13,28 +13,20 @@ module Rainbows def app_call KATO.delete(self) - disable + disable if enabled? @env[RACK_INPUT] = @input app_dispatch # must be implemented by subclass end # this is only called in the master thread def response_write(response) - enable alive = @hp.keepalive? && G.alive rev_write_response(response, alive) - return quit unless alive + return quit unless alive && :close != @state @env.clear @hp.reset @state = :headers - # keepalive requests are always body-less, so @input is unchanged - if @hp.headers(@env, @buf) - @input = HttpRequest::NULL_IO - app_call - else - KATO[self] = Time.now - end end # fails-safe application dispatch, we absolutely cannot -- cgit v1.2.3-24-ge0c7