about summary refs log tree commit homepage
path: root/lib/rainbows/rev.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/rev.rb')
-rw-r--r--lib/rainbows/rev.rb89
1 files changed, 86 insertions, 3 deletions
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
index fd25200..c73228a 100644
--- a/lib/rainbows/rev.rb
+++ b/lib/rainbows/rev.rb
@@ -40,6 +40,12 @@ module Rainbows
       include Rainbows::Const
       G = Rainbows::G
 
+      # queued, optional response bodies, it should only be unpollable "fast"
+      # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
+      # are also part of this.  We'll also stick AsyncResponse bodies in
+      # here to prevent connections from being closed on us.
+      attr_reader :deferred_bodies
+
       def initialize(io)
         G.cur += 1
         super(io)
@@ -48,10 +54,17 @@ module Rainbows
         @hp = HttpParser.new
         @state = :headers # [ :body [ :trailers ] ] :app_call :close
         @buf = ""
+        @deferred_bodies = [] # for (fast) regular files only
       end
 
-      def handle_error(e)
+      # graceful exit, like SIGQUIT
+      def quit
+        @deferred_bodies.clear
         @state = :close
+      end
+
+      def handle_error(e)
+        quit
         msg = case e
         when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
           ERROR_500_RESPONSE
@@ -73,7 +86,12 @@ module Rainbows
           response = G.app.call(@env.update(RACK_DEFAULTS))
           alive &&= G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
-          HttpResponse.write(self, response, out)
+
+          if response.last.respond_to?(:to_path)
+            AsyncResponse.new(self, response, out)
+          else
+            HttpResponse.write(self, response, out)
+          end
           if alive
             @env.clear
             @hp.reset
@@ -88,7 +106,21 @@ module Rainbows
       end
 
       def on_write_complete
-        :close == @state and close
+        if body = @deferred_bodies.first
+          return if AsyncResponse === body
+          begin
+            begin
+              write(body.sysread(CHUNK_SIZE))
+            rescue EOFError # expected at file EOF
+              @deferred_bodies.shift
+              body.close
+            end
+          rescue Object => e
+            handle_error(e)
+          end
+        else
+          close if :close == @state
+        end
       end
 
       def on_close
@@ -156,6 +188,57 @@ module Rainbows
 
     end
 
+    class AsyncResponse < ::Rev::IO
+      include Unicorn
+      include Rainbows::Const
+      G = Rainbows::G
+
+      def initialize(client, response, out)
+        @client = client
+        @body = response.last # have to consider response being frozen
+
+        # to_io is not part of the Rack spec, but make an exception
+        # here since we can't get here without checking to_path first
+        io = @body.to_io if @body.respond_to?(:to_io)
+        io ||= ::IO.new($1.to_i) if @body.to_path =~ %r{\A/dev/fd/(\d+)\z}
+        io ||= File.open(@body.to_path, 'rb') # could be a FIFO
+
+        headers = Rack::Utils::HeaderHash.new(response[1])
+        @do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+        @do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
+
+        st = io.stat
+        if st.socket? || st.pipe?
+          super(io)
+          client.deferred_bodies << attach(::Rev::Loop.default)
+
+          # too tricky to support keepalive/pipelining when a response can
+          # take an indeterminate amount of time here.
+          out = [ CONN_CLOSE ] if out
+        elsif st.file?
+          headers.delete('Transfer-Encoding')
+          headers['Content-Length'] ||= st.size.to_s
+          client.deferred_bodies << io
+        else # char/block device, directory, whatever... nobody cares
+          return HttpResponse.write(@client, response, out)
+        end
+        response = [ response.first, headers.to_hash, [] ]
+        HttpResponse.write(@client, response, out)
+      end
+
+      def on_read(data)
+        @do_chunk and @client.write(sprintf("%x\r\n", data.size))
+        @client.write(data)
+        @do_chunk and @client.write("\r\n")
+      end
+
+      def on_close
+        @do_chunk and @client.write("0\r\n\r\n")
+        @client.quit
+        @body.respond_to?(:close) and @body.close
+      end
+    end
+
     # This timer handles the fchmod heartbeat to prevent our master
     # from killing us.
     class Heartbeat < ::Rev::TimerWatcher