diff options
Diffstat (limited to 'lib/rainbows/rev.rb')
-rw-r--r-- | lib/rainbows/rev.rb | 89 |
1 files changed, 86 insertions, 3 deletions
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index fd25200..c73228a 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -40,6 +40,12 @@ module Rainbows include Rainbows::Const G = Rainbows::G + # queued, optional response bodies, it should only be unpollable "fast" + # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk + # are also part of this. We'll also stick AsyncResponse bodies in + # here to prevent connections from being closed on us. + attr_reader :deferred_bodies + def initialize(io) G.cur += 1 super(io) @@ -48,10 +54,17 @@ module Rainbows @hp = HttpParser.new @state = :headers # [ :body [ :trailers ] ] :app_call :close @buf = "" + @deferred_bodies = [] # for (fast) regular files only end - def handle_error(e) + # graceful exit, like SIGQUIT + def quit + @deferred_bodies.clear @state = :close + end + + def handle_error(e) + quit msg = case e when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF ERROR_500_RESPONSE @@ -73,7 +86,12 @@ module Rainbows response = G.app.call(@env.update(RACK_DEFAULTS)) alive &&= G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - HttpResponse.write(self, response, out) + + if response.last.respond_to?(:to_path) + AsyncResponse.new(self, response, out) + else + HttpResponse.write(self, response, out) + end if alive @env.clear @hp.reset @@ -88,7 +106,21 @@ module Rainbows end def on_write_complete - :close == @state and close + if body = @deferred_bodies.first + return if AsyncResponse === body + begin + begin + write(body.sysread(CHUNK_SIZE)) + rescue EOFError # expected at file EOF + @deferred_bodies.shift + body.close + end + rescue Object => e + handle_error(e) + end + else + close if :close == @state + end end def on_close @@ -156,6 +188,57 @@ module Rainbows end + class AsyncResponse < ::Rev::IO + include Unicorn + include Rainbows::Const + G = Rainbows::G + + def initialize(client, response, out) + @client = client + @body = response.last # have to consider response being frozen + + # to_io is not part of the Rack spec, but make an exception + # here since we can't get here without checking to_path first + io = @body.to_io if @body.respond_to?(:to_io) + io ||= ::IO.new($1.to_i) if @body.to_path =~ %r{\A/dev/fd/(\d+)\z} + io ||= File.open(@body.to_path, 'rb') # could be a FIFO + + headers = Rack::Utils::HeaderHash.new(response[1]) + @do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + @do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + + st = io.stat + if st.socket? || st.pipe? + super(io) + client.deferred_bodies << attach(::Rev::Loop.default) + + # too tricky to support keepalive/pipelining when a response can + # take an indeterminate amount of time here. + out = [ CONN_CLOSE ] if out + elsif st.file? + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + client.deferred_bodies << io + else # char/block device, directory, whatever... nobody cares + return HttpResponse.write(@client, response, out) + end + response = [ response.first, headers.to_hash, [] ] + HttpResponse.write(@client, response, out) + end + + def on_read(data) + @do_chunk and @client.write(sprintf("%x\r\n", data.size)) + @client.write(data) + @do_chunk and @client.write("\r\n") + end + + def on_close + @do_chunk and @client.write("0\r\n\r\n") + @client.quit + @body.respond_to?(:close) and @body.close + end + end + # This timer handles the fchmod heartbeat to prevent our master # from killing us. class Heartbeat < ::Rev::TimerWatcher |