about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-07-25 09:28:22 +0000
committerEric Wong <normalperson@yhbt.net>2010-07-26 09:23:49 +0000
commit052e2b3608071d9cd9d6b1d12f8cb69ac29124af (patch)
treed1c72552fd8ccc7dd5d3ad3a4dd76fbf68ebea72
parent84ac2eaa8bd16e44420abf660420698f76ad5473 (diff)
downloadrainbows-052e2b3608071d9cd9d6b1d12f8cb69ac29124af.tar.gz
With sendfile enabled, we must avoid writing headers (or normal,
non-file responses) while a file is deferred for sending.  This
means we must disable processing of new requests while a file
is deferred for sending and use the on_write_complete callback
less aggressively.
-rw-r--r--lib/rainbows/rev/client.rb118
-rw-r--r--lib/rainbows/rev/deferred_response.rb2
-rw-r--r--lib/rainbows/rev/thread.rb12
-rwxr-xr-xt/t0024-pipelined-sendfile-response.sh67
-rw-r--r--t/test_isolate.rb1
5 files changed, 142 insertions, 58 deletions
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 4d88b7b..64784eb 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -14,12 +14,12 @@ module Rainbows
         CONN[self] = false
         super(io)
         post_init
-        @deferred_bodies = [] # for (fast) regular files only
+        @deferred = nil
       end
 
       def quit
         super
-        close if @deferred_bodies.empty? && @_write_buffer.empty?
+        close if @deferred.nil? && @_write_buffer.empty?
       end
 
       # override the ::Rev::IO#write method try to write directly to the
@@ -29,16 +29,14 @@ module Rainbows
         if @_write_buffer.empty?
           begin
             w = @_io.write_nonblock(buf)
-            if w == Rack::Utils.bytesize(buf)
-              return on_write_complete
-            end
+            return enable_write_watcher if w == Rack::Utils.bytesize(buf)
             # we never care for the return value, but yes, we may return
             # a "fake" short write from super(buf) if anybody cares.
             buf = byte_slice(buf, w..-1)
           rescue Errno::EAGAIN
             break # fall through to super(buf)
-          rescue
-            return close
+          rescue => e
+            return handle_error(e)
           end while true
         end
         super(buf)
@@ -49,19 +47,22 @@ module Rainbows
       # are also part of this.  We'll also stick DeferredResponse bodies in
       # here to prevent connections from being closed on us.
       def defer_body(io)
-        @deferred_bodies << io
-        @_write_buffer.empty? ? on_write_complete : schedule_write
+        @deferred = io
+        enable_write_watcher
       end
 
-      def next
-        @deferred_bodies.shift
-        if :close == @state && @deferred_bodies.empty? && @_write_buffer.empty?
-          close
-        end
+      # allows enabling of write watcher even when read watcher is disabled
+      def evloop
+        Rainbows::Rev::Server::LOOP
+      end
+
+      def next!
+        @deferred = nil
+        on_write_complete
       end
 
       def timeout?
-        @_write_buffer.empty? && @deferred_bodies.empty? and close.nil?
+        @deferred.nil? && @_write_buffer.empty? and close.nil?
       end
 
       # used for streaming sockets and pipes
@@ -101,50 +102,73 @@ module Rainbows
       end
 
       def app_call
-        begin
-          KATO.delete(self)
-          @env[RACK_INPUT] = @input
-          @env[REMOTE_ADDR] = @remote_addr
-          response = APP.call(@env.update(RACK_DEFAULTS))
-
-          rev_write_response(response, alive = @hp.keepalive? && G.alive)
-          if alive
-            @env.clear
-            @hp.reset
-            @state = :headers
-            # keepalive requests are always body-less, so @input is unchanged
-            @hp.headers(@env, @buf) and next
-            KATO[self] = Time.now
+        KATO.delete(self)
+        @env[RACK_INPUT] = @input
+        @env[REMOTE_ADDR] = @remote_addr
+        response = APP.call(@env.update(RACK_DEFAULTS))
+
+        rev_write_response(response, alive = @hp.keepalive? && G.alive)
+        return quit unless alive && :close != @state
+        @env.clear
+        @hp.reset
+        @state = :headers
+        disable if enabled?
+      end
+
+      def on_write_complete
+        case @deferred
+        when DeferredResponse then return
+        when NilClass # fall through
+        else
+          begin
+            return rev_sendfile(@deferred)
+          rescue EOFError # expected at file EOF
+            close_deferred
+          end
+        end
+
+        case @state
+        when :close
+          close if @_write_buffer.empty?
+        when :headers
+          if @hp.headers(@env, @buf)
+            app_call
           else
-            quit
+            unless enabled?
+              enable
+              KATO[self] = Time.now
+            end
           end
-          return
-        end while true
+        end
+        rescue => e
+          handle_error(e)
       end
 
-      def on_write_complete
-        if body = @deferred_bodies[0]
-          # no socket or pipes, body must be a regular file to continue here
-          return if DeferredResponse === body
+      def handle_error(e)
+        close_deferred
+        if msg = Error.response(e)
+          @_io.write_nonblock(msg) rescue nil
+        end
+        @_write_buffer.clear
+        ensure
+          quit
+      end
 
+      def close_deferred
+        case @deferred
+        when DeferredResponse, NilClass
+        else
           begin
-            rev_sendfile(body)
-          rescue EOFError # expected at file EOF
-            @deferred_bodies.shift
-            body.close
-            close if :close == @state && @deferred_bodies.empty?
+            @deferred.close
           rescue => e
-            handle_error(e)
+            G.server.logger.error("closing #@deferred: #{e}")
           end
-        else
-          close if :close == @state
+          @deferred = nil
         end
       end
 
       def on_close
-        while f = @deferred_bodies.shift
-          DeferredResponse === f or f.close
-        end
+        close_deferred
         CONN.delete(self)
       end
 
diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb
index 13396d8..7e00918 100644
--- a/lib/rainbows/rev/deferred_response.rb
+++ b/lib/rainbows/rev/deferred_response.rb
@@ -14,7 +14,7 @@ class Rainbows::Rev::DeferredResponse < ::Rev::IO
   end
 
   def on_close
-    @client.next
+    @client.next!
     @body.respond_to?(:close) and @body.close
   end
 end
diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb
index 2dbaa84..cce3e92 100644
--- a/lib/rainbows/rev/thread.rb
+++ b/lib/rainbows/rev/thread.rb
@@ -13,28 +13,20 @@ module Rainbows
 
       def app_call
         KATO.delete(self)
-        disable
+        disable if enabled?
         @env[RACK_INPUT] = @input
         app_dispatch # must be implemented by subclass
       end
 
       # this is only called in the master thread
       def response_write(response)
-        enable
         alive = @hp.keepalive? && G.alive
         rev_write_response(response, alive)
-        return quit unless alive
+        return quit unless alive && :close != @state
 
         @env.clear
         @hp.reset
         @state = :headers
-        # keepalive requests are always body-less, so @input is unchanged
-        if @hp.headers(@env, @buf)
-          @input = HttpRequest::NULL_IO
-          app_call
-        else
-          KATO[self] = Time.now
-        end
       end
 
       # fails-safe application dispatch, we absolutely cannot
diff --git a/t/t0024-pipelined-sendfile-response.sh b/t/t0024-pipelined-sendfile-response.sh
new file mode 100755
index 0000000..2acc243
--- /dev/null
+++ b/t/t0024-pipelined-sendfile-response.sh
@@ -0,0 +1,67 @@
+#!/bin/sh
+. ./test-lib.sh
+
+case $model in
+EventMachine|NeverBlock)
+        t_info "skipping $T since it's not compatible with $model"
+        exit 0
+        ;;
+*) ;;
+esac
+
+t_plan 5 "pipelined sendfile response for $model"
+
+t_begin "setup and startup" && {
+        rtmpfiles err out
+        rainbows_setup $model
+        echo 'require "sendfile"' >> $unicorn_config
+        echo 'def (::IO).copy_stream(*x); abort "NO"; end' >> $unicorn_config
+
+        # can't load Rack::Lint here since it clobbers body#to_path
+        rainbows -E none -D large-file-response.ru -c $unicorn_config
+        rainbows_wait_start
+}
+
+t_begin "read random blob sha1" && {
+        random_blob_sha1=$(rsha1 < random_blob)
+}
+
+script='
+require "digest/sha1"
+require "kcar"
+$stdin.binmode
+expect = ENV["random_blob_sha1"]
+kcar = Kcar::Response.new($stdin, {})
+3.times do
+        nr = 0
+        status, headers, body = kcar.rack
+        dig = Digest::SHA1.new
+        body.each { |buf| dig << buf ; nr += buf.size }
+        sha1 = dig.hexdigest
+        sha1 == expect or abort "mismatch: sha1=#{sha1} != expect=#{expect}"
+        body.close
+end
+$stdout.syswrite("ok\n")
+'
+
+t_begin "pipeline 3 HTTP requests" && {
+        req='GET /random_blob HTTP/1.1\r\nHost: example.com\r\n'
+        req="$req"'\r\n'"$req"'\r\n'"$req"
+        req="$req"'Connection: close\r\n\r\n'
+        (
+                export random_blob_sha1
+                $RUBY -e "$script" < $fifo >> $ok &
+                printf "$req"
+                wait
+                echo ok >> $ok
+        ) | socat - TCP:$listen > $fifo
+        test 2 -eq $(grep '^ok$' $ok |wc -l)
+}
+
+t_begin "shutdown server" && {
+        kill -QUIT $rainbows_pid
+}
+
+t_begin "check stderr" && check_stderr
+
+t_done
diff --git a/t/test_isolate.rb b/t/test_isolate.rb
index f4b4b77..fb911af 100644
--- a/t/test_isolate.rb
+++ b/t/test_isolate.rb
@@ -16,6 +16,7 @@ $stdout.reopen($stderr)
 Isolate.now!(opts) do
   gem 'rack', '1.1.0' # Cramp currently requires ~> 1.1.0
   gem 'unicorn', '1.1.1'
+  gem 'kcar', '0.1.1'
 
   if engine == "ruby"
     gem 'sendfile', '1.0.0' # next Rubinius should support this