about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/event_machine/client.rb27
-rw-r--r--lib/rainbows/event_machine/response_pipe.rb3
-rw-r--r--lib/rainbows/fiber/body.rb4
-rw-r--r--lib/rainbows/response/body.rb37
-rw-r--r--lib/rainbows/rev/client.rb1
-rw-r--r--lib/rainbows/rev/deferred_response.rb2
-rw-r--r--lib/rainbows/revactor/body.rb3
-rw-r--r--lib/rainbows/sync_close.rb37
-rw-r--r--lib/rainbows/writer_thread_pool.rb8
-rw-r--r--lib/rainbows/writer_thread_spawn.rb6
-rw-r--r--t/close-has-env.ru65
-rw-r--r--t/t0050-response-body-close-has-env.sh109
13 files changed, 268 insertions, 35 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index dd5a5b2..951c3e5 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -118,6 +118,7 @@ module Rainbows
   autoload :HttpResponse, 'rainbows/http_response' # deprecated
   autoload :ThreadTimeout, 'rainbows/thread_timeout'
   autoload :WorkerYield, 'rainbows/worker_yield'
+  autoload :SyncClose, 'rainbows/sync_close'
 end
 
 require 'rainbows/error'
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
index fab1dbc..49552f3 100644
--- a/lib/rainbows/event_machine/client.rb
+++ b/lib/rainbows/event_machine/client.rb
@@ -16,11 +16,13 @@ class Rainbows::EventMachine::Client < EM::Connection
     # (often a static file), we do not attempt to process another
     # request on the same connection until the first is complete
     if @body
-      @buf << data
-      @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
-      EM.next_tick { receive_data('') }
+      if data
+        @buf << data
+        @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
+      end
+      EM.next_tick { receive_data(nil) } unless @buf.empty?
     else
-      on_read(data)
+      on_read(data || "") if (@buf.size > 0) || data
     end
   end
 
@@ -43,15 +45,16 @@ class Rainbows::EventMachine::Client < EM::Connection
     # long-running async response
     (response.nil? || -1 == response[0]) and return @state = :close
 
-    alive = @hp.next? && G.alive && G.kato > 0
-    em_write_response(response, alive)
-    if alive
+    if @hp.next? && G.alive && G.kato > 0
       @state = :headers
+      em_write_response(response, true)
       if @buf.empty?
         set_comm_inactivity_timeout(G.kato)
-      else
-        EM.next_tick { receive_data('') }
+      elsif @body.nil?
+        EM.next_tick { receive_data(nil) }
       end
+    else
+      em_write_response(response, false)
     end
   end
 
@@ -84,7 +87,7 @@ class Rainbows::EventMachine::Client < EM::Connection
         @body.callback do
           body.close if body.respond_to?(:close)
           @body = nil
-          alive ? receive_data('') : quit
+          alive ? receive_data(nil) : quit
         end
         return
       elsif st.socket? || st.pipe?
@@ -102,6 +105,10 @@ class Rainbows::EventMachine::Client < EM::Connection
     quit unless alive
   end
 
+  def next!
+    @hp.keepalive? ? receive_data(@body = nil) : quit
+  end
+
   def unbind
     async_close = @env[ASYNC_CLOSE] and async_close.succeed
     @body.respond_to?(:fail) and @body.fail
diff --git a/lib/rainbows/event_machine/response_pipe.rb b/lib/rainbows/event_machine/response_pipe.rb
index 2417dbe..3da2417 100644
--- a/lib/rainbows/event_machine/response_pipe.rb
+++ b/lib/rainbows/event_machine/response_pipe.rb
@@ -22,9 +22,8 @@ module Rainbows::EventMachine::ResponsePipe
   end
 
   def unbind
-    @client.body = nil
-    @alive ? @client.on_read('') : @client.quit
     @body.close if @body.respond_to?(:close)
+    @client.next!
     @io.close unless @io.closed?
   end
 end
diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb
index 0fe2ec6..29926c6 100644
--- a/lib/rainbows/fiber/body.rb
+++ b/lib/rainbows/fiber/body.rb
@@ -13,7 +13,7 @@ module Rainbows::Fiber::Body # :nodoc:
   # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
   if IO.method_defined?(:sendfile_nonblock)
     def write_body_file(client, body, range)
-      sock, n = client.to_io, nil
+      sock, n, body = client.to_io, nil, body_to_io(body)
       offset, count = range ? range : [ 0, body.stat.size ]
       begin
         offset += (n = sock.sendfile_nonblock(body, offset, count))
@@ -23,6 +23,8 @@ module Rainbows::Fiber::Body # :nodoc:
       rescue EOFError
         break
       end while (count -= n) > 0
+      ensure
+        close_if_private(body)
     end
   else
     ALIASES[:write_body] = :write_body_each
diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb
index 2535374..e80217d 100644
--- a/lib/rainbows/response/body.rb
+++ b/lib/rainbows/response/body.rb
@@ -32,8 +32,14 @@ module Rainbows::Response::Body # :nodoc:
 
   FD_MAP = Rainbows::FD_MAP
 
+  class F < File; end
+
+  def close_if_private(io)
+    io.close if F === io
+  end
+
   def io_for_fd(fd)
-    FD_MAP.delete(fd) || IO.new(fd)
+    FD_MAP.delete(fd) || F.for_fd(fd)
   end
 
   # to_io is not part of the Rack spec, but make an exception here
@@ -47,13 +53,16 @@ module Rainbows::Response::Body # :nodoc:
       # try to take advantage of Rainbows::DevFdResponse, calling File.open
       # is a last resort
       path = body.to_path
-      path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : File.open(path)
+      path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : F.open(path)
     end
   end
 
   if IO.method_defined?(:sendfile_nonblock)
     def write_body_file(sock, body, range)
-      range ? sock.sendfile(body, range[0], range[1]) : sock.sendfile(body, 0)
+      io = body_to_io(body)
+      range ? sock.sendfile(io, range[0], range[1]) : sock.sendfile(io, 0)
+      ensure
+        close_if_private(io)
     end
   end
 
@@ -70,8 +79,6 @@ module Rainbows::Response::Body # :nodoc:
     # pread() semantics
     def write_body_stream(sock, body, range)
       IO.copy_stream(body, sock)
-      ensure
-        body.respond_to?(:close) and body.close
     end
   else
     # fall back to body#each, which is a Rack standard
@@ -79,27 +86,19 @@ module Rainbows::Response::Body # :nodoc:
   end
 
   if method_defined?(:write_body_file)
-
     # middlewares/apps may return with a body that responds to +to_path+
     def write_body_path(sock, body, range)
-      inp = body_to_io(body)
-      if inp.stat.file?
-        begin
-          write_body_file(sock, inp, range)
-        ensure
-          inp.close if inp != body
-        end
-      else
-        write_body_stream(sock, inp, range)
-      end
+      stat = File.stat(body.to_path)
+      stat.file? ? write_body_file(sock, body, range) :
+                   write_body_stream(sock, body, range)
       ensure
-        body.respond_to?(:close) && inp != body and body.close
+        body.respond_to?(:close) and body.close
     end
   elsif method_defined?(:write_body_stream)
     def write_body_path(sock, body, range)
-      write_body_stream(sock, inp = body_to_io(body), range)
+      write_body_stream(sock, body, range)
       ensure
-        body.respond_to?(:close) && inp != body and body.close
+        body.respond_to?(:close) and body.close
     end
   end
 
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 00df4d3..e0bccf0 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -74,6 +74,7 @@ class Rainbows::Rev::Client < Rev::IO
   end
 
   def next!
+    attached? or return
     @deferred = nil
     enable_write_watcher
   end
diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb
index 146f505..4a92ee4 100644
--- a/lib/rainbows/rev/deferred_response.rb
+++ b/lib/rainbows/rev/deferred_response.rb
@@ -14,7 +14,7 @@ class Rainbows::Rev::DeferredResponse < Rev::IO
   end
 
   def on_close
-    @client.next! if @client.attached? # attached? is false if write fails
     @body.respond_to?(:close) and @body.close
+    @client.next!
   end
 end
diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/body.rb
index ad2bc55..7bfb5de 100644
--- a/lib/rainbows/revactor/body.rb
+++ b/lib/rainbows/revactor/body.rb
@@ -8,6 +8,7 @@ module Rainbows::Revactor::Body
 
   if IO.method_defined?(:sendfile_nonblock)
     def write_body_file(client, body, range)
+      body = body_to_io(body)
       sock = client.instance_variable_get(:@_io)
       pfx = Revactor::TCP::Socket === client ? :tcp : :unix
       write_complete = T[:"#{pfx}_write_complete", client]
@@ -29,6 +30,8 @@ module Rainbows::Revactor::Body
       rescue EOFError
         break
       end while (count -= n) > 0
+      ensure
+        close_if_private(body)
     end
   else
     ALIASES[:write_body] = :write_body_each
diff --git a/lib/rainbows/sync_close.rb b/lib/rainbows/sync_close.rb
new file mode 100644
index 0000000..a336262
--- /dev/null
+++ b/lib/rainbows/sync_close.rb
@@ -0,0 +1,37 @@
+# -*- encoding: binary -*-
+# :enddoc:
+require 'thread'
+class Rainbows::SyncClose
+  def initialize(body)
+    @body = body
+    @mutex = Mutex.new
+    @cv = ConditionVariable.new
+    @mutex.synchronize do
+      yield self
+      @cv.wait(@mutex)
+    end
+  end
+
+  def respond_to?(m)
+    @body.respond_to?(m)
+  end
+
+  def to_path
+    @body.to_path
+  end
+
+  def each(&block)
+    @body.each(&block)
+  end
+
+  def to_io
+    @body.to_io
+  end
+
+  # called by the writer thread to wake up the original thread (in #initialize)
+  def close
+    @body.close
+    ensure
+      @mutex.synchronize { @cv.signal }
+  end
+end
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index 6896787..67c8e83 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -24,7 +24,13 @@ module Rainbows::WriterThreadPool
   @@q = nil
 
   def async_write_body(qclient, body, range)
-    qclient.q << [ qclient.to_io, :body, body, range ]
+    if body.respond_to?(:close)
+      Rainbows::SyncClose.new(body) do |body|
+        qclient.q << [ qclient.to_io, :body, body, range ]
+      end
+    else
+      qclient.q << [ qclient.to_io, :body, body, range ]
+    end
   end
 
   def process_client(client) # :nodoc:
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 0e7d1a7..43e4f2c 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -23,7 +23,11 @@ module Rainbows::WriterThreadSpawn
   include Rainbows::Base
 
   def write_body(my_sock, body, range) # :nodoc:
-    my_sock.queue_body(body, range)
+    if body.respond_to?(:close)
+      Rainbows::SyncClose.new(body) { |body| my_sock.queue_body(body, range) }
+    else
+      my_sock.queue_body(body, range)
+    end
   end
 
   def process_client(client) # :nodoc:
diff --git a/t/close-has-env.ru b/t/close-has-env.ru
new file mode 100644
index 0000000..471f605
--- /dev/null
+++ b/t/close-has-env.ru
@@ -0,0 +1,65 @@
+#\ -E none
+use Rainbows::DevFdResponse
+class ClosablePipe < ::IO
+  attr_accessor :env
+
+  def self.new(env)
+    rv = popen "echo hello", "rb"
+    rv.env = env
+    rv
+  end
+
+  def close
+    super
+    $stdout.syswrite "path_info=#{@env['PATH_INFO']}\n"
+  end
+end
+
+class ClosableFile < ::File
+  attr_accessor :env
+  alias to_path path
+  def close
+    super
+    $stdout.syswrite "path_info=#{@env['PATH_INFO']}\n"
+  end
+end
+
+class Blob
+  def initialize(env)
+    @env = env
+  end
+
+  def each(&block)
+    yield "BLOB\n"
+  end
+
+  def close
+    $stdout.syswrite "path_info=#{@env['PATH_INFO']}\n"
+  end
+end
+
+run(lambda { |env|
+  case env["PATH_INFO"]
+  when %r{\A/pipe/}
+    [ 200,
+      [ %w(Content-Length 6), %w(Content-Type text/plain)],
+      ClosablePipe.new(env)
+    ]
+  when %r{\A/file/}
+    f = ClosableFile.open("env.ru", "rb")
+    f.env = env
+    [ 200, {
+      'X-Req-Path' => env["PATH_INFO"],
+      'Content-Length' => f.stat.size.to_s,
+      'Content-Type' => 'text/plain' },
+      f
+    ]
+  when %r{\A/blob/}
+    [ 200,
+      [%w(Content-Length 5), %w(Content-Type text/plain)],
+      Blob.new(env)
+    ]
+  else
+    [ 404, [%w(Content-Length 0), %w(Content-Type text/plain)], [] ]
+  end
+})
diff --git a/t/t0050-response-body-close-has-env.sh b/t/t0050-response-body-close-has-env.sh
new file mode 100644
index 0000000..4d0cd6f
--- /dev/null
+++ b/t/t0050-response-body-close-has-env.sh
@@ -0,0 +1,109 @@
+#!/bin/sh
+. ./test-lib.sh
+
+t_plan 29 "keepalive does not clear Rack env prematurely for $model"
+
+t_begin "setup and start" && {
+        rainbows_setup
+        rtmpfiles curl_out curl_err
+        echo "preload_app true" >> $unicorn_config
+        rainbows -D close-has-env.ru -c $unicorn_config
+        rainbows_wait_start
+}
+
+req_pipelined () {
+        pfx=$1
+        t_begin "make pipelined requests to trigger $pfx response body" && {
+                > $r_out
+                (
+                        cat $fifo > $tmp &
+                        printf 'GET /%s/1 HTTP/1.1\r\n' $pfx
+                        printf 'Host: example.com\r\n\r\n'
+                        printf 'GET /%s/2 HTTP/1.1\r\n' $pfx
+                        printf 'Host: example.com\r\n\r\n'
+                        printf 'GET /%s/3 HTTP/1.1\r\n' $pfx
+                        printf 'Host: example.com\r\n'
+                        printf 'Connection: close\r\n\r\n'
+                        wait
+                        echo ok > $ok
+                ) | socat - TCP4:$listen > $fifo
+                test xok = x$(cat $ok)
+        }
+}
+
+reload () {
+        t_begin 'reloading Rainbows! to ensure writeout' && {
+                # reload to ensure everything is flushed
+                kill -HUP $rainbows_pid
+                test xSTART = x"$(cat $fifo)"
+        }
+}
+
+check_log () {
+        pfx="$1"
+        t_begin "check body close messages" && {
+                < $r_out awk '
+/^path_info=\/'$pfx'\/[1-3]$/ { next }
+{ exit(2) }
+END { exit(NR == 3 ? 0 : 1) }
+'
+        }
+}
+
+req_keepalive () {
+        pfx="$1"
+        t_begin "make keepalive requests to trigger $pfx response body" && {
+                > $r_out
+                rm -f $curl_err $curl_out
+                curl -vsSf http://$listen/$pfx/[1-3] 2> $curl_err > $curl_out
+        }
+}
+
+req_keepalive file
+reload
+check_log file
+
+req_pipelined file
+reload
+check_log file
+
+req_keepalive blob
+reload
+check_log blob
+
+req_pipelined blob
+reload
+check_log blob
+
+req_keepalive pipe
+reload
+check_log pipe
+
+req_pipelined pipe
+reload
+check_log pipe
+
+t_begin "enable sendfile gem" && {
+        echo "require 'sendfile'" >> $unicorn_config
+        curl http://$listen/ >/dev/null # ensure worker is loaded before HUP
+}
+
+reload
+
+req_keepalive file
+reload
+check_log file
+
+req_pipelined file
+reload
+check_log file
+
+t_begin "killing succeeds" && {
+        kill $rainbows_pid
+}
+
+t_begin "check stderr" && {
+        check_stderr
+}
+
+t_done