From 7b01d94dd9287ac402d91451f1e93c9faaf913c4 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 18 Oct 2009 15:59:29 -0700 Subject: rev: async response bodies with DevFdResponse middleware This new middleware should be a no-op for non-Rev concurrency models (or by explicitly setting env['rainbows.autochunk'] to false). Setting env['rainbows.autochunk'] to true (the default when Rev is used) allows (e)poll-able IO objects (sockets, pipes) to be sent asynchronously after app.call(env) returns. This also has a fortunate side effect of introducing a code path which allows large, static files to be sent without slurping them into a Rev IO::Buffer, too. This new change works even without the DevFdResponse middleware, so you won't have to reconfigure your app. This lets us epoll on response bodies that come in from a pipe or even a socket and send them either straight through or with chunked encoding. --- lib/rainbows/const.rb | 4 ++ lib/rainbows/dev_fd_response.rb | 69 ++++++++++++++++++++++++++++++++ lib/rainbows/http_server.rb | 1 + lib/rainbows/rev.rb | 89 +++++++++++++++++++++++++++++++++++++++-- 4 files changed, 160 insertions(+), 3 deletions(-) create mode 100644 lib/rainbows/dev_fd_response.rb (limited to 'lib/rainbows') diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb index 417a5de..403a18a 100644 --- a/lib/rainbows/const.rb +++ b/lib/rainbows/const.rb @@ -9,6 +9,10 @@ module Rainbows RACK_DEFAULTS = ::Unicorn::HttpRequest::DEFAULTS.merge({ "SERVER_SOFTWARE" => "Rainbows! #{RAINBOWS_VERSION}", + + # using the Rev model, we'll automatically chunk pipe and socket objects + # if they're the response body + 'rainbows.autochunk' => false, }) CONN_CLOSE = "Connection: close\r\n" diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb new file mode 100644 index 0000000..e4e5f0c --- /dev/null +++ b/lib/rainbows/dev_fd_response.rb @@ -0,0 +1,69 @@ +# -*- encoding: binary -*- + +module Rainbows + + # Rack response middleware wrapping any IO-like object with an + # OS-level file descriptor associated with it. May also be used to + # create responses from integer file descriptors or existing +IO+ + # objects. This may be used in conjunction with the #to_path method + # on servers that support it to pass arbitrary file descriptors into + # the HTTP response without additional open(2) syscalls + + class DevFdResponse < Struct.new(:app, :to_io, :to_path) + include Rack::Utils + + # Rack middleware entry point, we'll just pass through responses + # unless they respond to +to_io+ or +to_path+ + def call(env) + status, headers, body = response = app.call(env) + + # totally uninteresting to us if there's no body + return response if STATUS_WITH_NO_ENTITY_BODY.include?(status) + + io = body.to_io if body.respond_to?(:to_io) + io ||= File.open(body.to_path, 'rb') if body.respond_to?(:to_path) + return response if io.nil? + + headers = HeaderHash.new(headers) + st = io.stat + if st.file? + headers['Content-Length'] ||= st.size.to_s + headers.delete('Transfer-Encoding') + elsif st.pipe? || st.socket? # epoll-able things + if env['rainbows.autochunk'] + headers['Transfer-Encoding'] = 'chunked' + headers.delete('Content-Length') + else + headers['X-Rainbows-Autochunk'] = 'no' + end + else # unlikely, char/block device file, directory, ... + return response + end + resp = dup # be reentrant here + resp.to_path = "/dev/fd/#{io.fileno}" + resp.to_io = io + [ status, headers.to_hash, resp ] + end + + # called by the webserver or other middlewares if they can't + # handle #to_path + def each(&block) + to_io.each(&block) + end + + # remain Rack::Lint-compatible for people with wonky systems :P + unless File.exist?("/dev/fd/0") + alias to_path_orig to_path + undef_method :to_path + end + + # called by the web server after #each + def close + begin + to_io.close if to_io.respond_to?(:close) + rescue IOError # could've been IO::new()'ed and closed + end + end + + end # class +end diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb index 6d61228..5521513 100644 --- a/lib/rainbows/http_server.rb +++ b/lib/rainbows/http_server.rb @@ -33,6 +33,7 @@ module Rainbows extend(mod) Const::RACK_DEFAULTS['rainbows.model'] = @use = model Const::RACK_DEFAULTS['rack.multithread'] = !!(/Thread/ =~ model.to_s) + Const::RACK_DEFAULTS['rainbows.autochunk'] = (model.to_s == "Rev") end def worker_connections(*args) diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index fd25200..c73228a 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -40,6 +40,12 @@ module Rainbows include Rainbows::Const G = Rainbows::G + # queued, optional response bodies, it should only be unpollable "fast" + # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk + # are also part of this. We'll also stick AsyncResponse bodies in + # here to prevent connections from being closed on us. + attr_reader :deferred_bodies + def initialize(io) G.cur += 1 super(io) @@ -48,10 +54,17 @@ module Rainbows @hp = HttpParser.new @state = :headers # [ :body [ :trailers ] ] :app_call :close @buf = "" + @deferred_bodies = [] # for (fast) regular files only end - def handle_error(e) + # graceful exit, like SIGQUIT + def quit + @deferred_bodies.clear @state = :close + end + + def handle_error(e) + quit msg = case e when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF ERROR_500_RESPONSE @@ -73,7 +86,12 @@ module Rainbows response = G.app.call(@env.update(RACK_DEFAULTS)) alive &&= G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - HttpResponse.write(self, response, out) + + if response.last.respond_to?(:to_path) + AsyncResponse.new(self, response, out) + else + HttpResponse.write(self, response, out) + end if alive @env.clear @hp.reset @@ -88,7 +106,21 @@ module Rainbows end def on_write_complete - :close == @state and close + if body = @deferred_bodies.first + return if AsyncResponse === body + begin + begin + write(body.sysread(CHUNK_SIZE)) + rescue EOFError # expected at file EOF + @deferred_bodies.shift + body.close + end + rescue Object => e + handle_error(e) + end + else + close if :close == @state + end end def on_close @@ -156,6 +188,57 @@ module Rainbows end + class AsyncResponse < ::Rev::IO + include Unicorn + include Rainbows::Const + G = Rainbows::G + + def initialize(client, response, out) + @client = client + @body = response.last # have to consider response being frozen + + # to_io is not part of the Rack spec, but make an exception + # here since we can't get here without checking to_path first + io = @body.to_io if @body.respond_to?(:to_io) + io ||= ::IO.new($1.to_i) if @body.to_path =~ %r{\A/dev/fd/(\d+)\z} + io ||= File.open(@body.to_path, 'rb') # could be a FIFO + + headers = Rack::Utils::HeaderHash.new(response[1]) + @do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + @do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' + + st = io.stat + if st.socket? || st.pipe? + super(io) + client.deferred_bodies << attach(::Rev::Loop.default) + + # too tricky to support keepalive/pipelining when a response can + # take an indeterminate amount of time here. + out = [ CONN_CLOSE ] if out + elsif st.file? + headers.delete('Transfer-Encoding') + headers['Content-Length'] ||= st.size.to_s + client.deferred_bodies << io + else # char/block device, directory, whatever... nobody cares + return HttpResponse.write(@client, response, out) + end + response = [ response.first, headers.to_hash, [] ] + HttpResponse.write(@client, response, out) + end + + def on_read(data) + @do_chunk and @client.write(sprintf("%x\r\n", data.size)) + @client.write(data) + @do_chunk and @client.write("\r\n") + end + + def on_close + @do_chunk and @client.write("0\r\n\r\n") + @client.quit + @body.respond_to?(:close) and @body.close + end + end + # This timer handles the fchmod heartbeat to prevent our master # from killing us. class Heartbeat < ::Rev::TimerWatcher -- cgit v1.2.3-24-ge0c7