about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-18 15:59:29 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-18 21:25:47 -0700
commit7b01d94dd9287ac402d91451f1e93c9faaf913c4 (patch)
tree8f4005f4e92108748af53b8cbf709522f33419db /lib
parentd0103759ae63b0ed1084f6a9d2b7ede538e8c871 (diff)
downloadrainbows-7b01d94dd9287ac402d91451f1e93c9faaf913c4.tar.gz
This new middleware should be a no-op for non-Rev concurrency
models (or by explicitly setting env['rainbows.autochunk'] to
false).

Setting env['rainbows.autochunk'] to true (the default when Rev
is used) allows (e)poll-able IO objects (sockets, pipes) to be
sent asynchronously after app.call(env) returns.

This also has a fortunate side effect of introducing a code path
which allows large, static files to be sent without slurping
them into a Rev IO::Buffer, too.  This new change works even
without the DevFdResponse middleware, so you won't have to
reconfigure your app.

This lets us epoll on response bodies that come in from a pipe
or even a socket and send them either straight through or with
chunked encoding.
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/const.rb4
-rw-r--r--lib/rainbows/dev_fd_response.rb69
-rw-r--r--lib/rainbows/http_server.rb1
-rw-r--r--lib/rainbows/rev.rb89
5 files changed, 161 insertions, 3 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 096f700..aa58fab 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -14,6 +14,7 @@ module Rainbows
   require 'rainbows/http_response'
   require 'rainbows/base'
   autoload :AppPool, 'rainbows/app_pool'
+  autoload :DevFdResponse, 'rainbows/dev_fd_response'
 
   class << self
 
diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb
index 417a5de..403a18a 100644
--- a/lib/rainbows/const.rb
+++ b/lib/rainbows/const.rb
@@ -9,6 +9,10 @@ module Rainbows
 
     RACK_DEFAULTS = ::Unicorn::HttpRequest::DEFAULTS.merge({
       "SERVER_SOFTWARE" => "Rainbows! #{RAINBOWS_VERSION}",
+
+      # using the Rev model, we'll automatically chunk pipe and socket objects
+      # if they're the response body
+      'rainbows.autochunk' => false,
     })
 
     CONN_CLOSE = "Connection: close\r\n"
diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb
new file mode 100644
index 0000000..e4e5f0c
--- /dev/null
+++ b/lib/rainbows/dev_fd_response.rb
@@ -0,0 +1,69 @@
+# -*- encoding: binary -*-
+
+module Rainbows
+
+  # Rack response middleware wrapping any IO-like object with an
+  # OS-level file descriptor associated with it.  May also be used to
+  # create responses from integer file descriptors or existing +IO+
+  # objects.  This may be used in conjunction with the #to_path method
+  # on servers that support it to pass arbitrary file descriptors into
+  # the HTTP response without additional open(2) syscalls
+
+  class DevFdResponse < Struct.new(:app, :to_io, :to_path)
+    include Rack::Utils
+
+    # Rack middleware entry point, we'll just pass through responses
+    # unless they respond to +to_io+ or +to_path+
+    def call(env)
+      status, headers, body = response = app.call(env)
+
+      # totally uninteresting to us if there's no body
+      return response if STATUS_WITH_NO_ENTITY_BODY.include?(status)
+
+      io = body.to_io if body.respond_to?(:to_io)
+      io ||= File.open(body.to_path, 'rb') if body.respond_to?(:to_path)
+      return response if io.nil?
+
+      headers = HeaderHash.new(headers)
+      st = io.stat
+      if st.file?
+        headers['Content-Length'] ||= st.size.to_s
+        headers.delete('Transfer-Encoding')
+      elsif st.pipe? || st.socket? # epoll-able things
+        if env['rainbows.autochunk']
+          headers['Transfer-Encoding'] = 'chunked'
+          headers.delete('Content-Length')
+        else
+          headers['X-Rainbows-Autochunk'] = 'no'
+        end
+      else # unlikely, char/block device file, directory, ...
+        return response
+      end
+      resp = dup # be reentrant here
+      resp.to_path = "/dev/fd/#{io.fileno}"
+      resp.to_io = io
+      [ status, headers.to_hash, resp ]
+    end
+
+    # called by the webserver or other middlewares if they can't
+    # handle #to_path
+    def each(&block)
+      to_io.each(&block)
+    end
+
+    # remain Rack::Lint-compatible for people with wonky systems :P
+    unless File.exist?("/dev/fd/0")
+      alias to_path_orig to_path
+      undef_method :to_path
+    end
+
+    # called by the web server after #each
+    def close
+      begin
+        to_io.close if to_io.respond_to?(:close)
+      rescue IOError # could've been IO::new()'ed and closed
+      end
+    end
+
+  end # class
+end
diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb
index 6d61228..5521513 100644
--- a/lib/rainbows/http_server.rb
+++ b/lib/rainbows/http_server.rb
@@ -33,6 +33,7 @@ module Rainbows
       extend(mod)
       Const::RACK_DEFAULTS['rainbows.model'] = @use = model
       Const::RACK_DEFAULTS['rack.multithread'] = !!(/Thread/ =~ model.to_s)
+      Const::RACK_DEFAULTS['rainbows.autochunk'] = (model.to_s == "Rev")
     end
 
     def worker_connections(*args)
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
index fd25200..c73228a 100644
--- a/lib/rainbows/rev.rb
+++ b/lib/rainbows/rev.rb
@@ -40,6 +40,12 @@ module Rainbows
       include Rainbows::Const
       G = Rainbows::G
 
+      # queued, optional response bodies, it should only be unpollable "fast"
+      # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
+      # are also part of this.  We'll also stick AsyncResponse bodies in
+      # here to prevent connections from being closed on us.
+      attr_reader :deferred_bodies
+
       def initialize(io)
         G.cur += 1
         super(io)
@@ -48,10 +54,17 @@ module Rainbows
         @hp = HttpParser.new
         @state = :headers # [ :body [ :trailers ] ] :app_call :close
         @buf = ""
+        @deferred_bodies = [] # for (fast) regular files only
       end
 
-      def handle_error(e)
+      # graceful exit, like SIGQUIT
+      def quit
+        @deferred_bodies.clear
         @state = :close
+      end
+
+      def handle_error(e)
+        quit
         msg = case e
         when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
           ERROR_500_RESPONSE
@@ -73,7 +86,12 @@ module Rainbows
           response = G.app.call(@env.update(RACK_DEFAULTS))
           alive &&= G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
-          HttpResponse.write(self, response, out)
+
+          if response.last.respond_to?(:to_path)
+            AsyncResponse.new(self, response, out)
+          else
+            HttpResponse.write(self, response, out)
+          end
           if alive
             @env.clear
             @hp.reset
@@ -88,7 +106,21 @@ module Rainbows
       end
 
       def on_write_complete
-        :close == @state and close
+        if body = @deferred_bodies.first
+          return if AsyncResponse === body
+          begin
+            begin
+              write(body.sysread(CHUNK_SIZE))
+            rescue EOFError # expected at file EOF
+              @deferred_bodies.shift
+              body.close
+            end
+          rescue Object => e
+            handle_error(e)
+          end
+        else
+          close if :close == @state
+        end
       end
 
       def on_close
@@ -156,6 +188,57 @@ module Rainbows
 
     end
 
+    class AsyncResponse < ::Rev::IO
+      include Unicorn
+      include Rainbows::Const
+      G = Rainbows::G
+
+      def initialize(client, response, out)
+        @client = client
+        @body = response.last # have to consider response being frozen
+
+        # to_io is not part of the Rack spec, but make an exception
+        # here since we can't get here without checking to_path first
+        io = @body.to_io if @body.respond_to?(:to_io)
+        io ||= ::IO.new($1.to_i) if @body.to_path =~ %r{\A/dev/fd/(\d+)\z}
+        io ||= File.open(@body.to_path, 'rb') # could be a FIFO
+
+        headers = Rack::Utils::HeaderHash.new(response[1])
+        @do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+        @do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
+
+        st = io.stat
+        if st.socket? || st.pipe?
+          super(io)
+          client.deferred_bodies << attach(::Rev::Loop.default)
+
+          # too tricky to support keepalive/pipelining when a response can
+          # take an indeterminate amount of time here.
+          out = [ CONN_CLOSE ] if out
+        elsif st.file?
+          headers.delete('Transfer-Encoding')
+          headers['Content-Length'] ||= st.size.to_s
+          client.deferred_bodies << io
+        else # char/block device, directory, whatever... nobody cares
+          return HttpResponse.write(@client, response, out)
+        end
+        response = [ response.first, headers.to_hash, [] ]
+        HttpResponse.write(@client, response, out)
+      end
+
+      def on_read(data)
+        @do_chunk and @client.write(sprintf("%x\r\n", data.size))
+        @client.write(data)
+        @do_chunk and @client.write("\r\n")
+      end
+
+      def on_close
+        @do_chunk and @client.write("0\r\n\r\n")
+        @client.quit
+        @body.respond_to?(:close) and @body.close
+      end
+    end
+
     # This timer handles the fchmod heartbeat to prevent our master
     # from killing us.
     class Heartbeat < ::Rev::TimerWatcher