about summary refs log tree commit homepage
path: root/lib/rainbows
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-12-27 20:25:39 -0800
committerEric Wong <normalperson@yhbt.net>2010-12-28 17:12:28 -0800
commit3495d59763e6159975debf32728dc53fc41c5ea1 (patch)
tree949415250af66075cc9ca98040a85ddc2ad84380 /lib/rainbows
parent53afe0b23fc67c5b25541cddbd68f905c649e756 (diff)
downloadrainbows-3495d59763e6159975debf32728dc53fc41c5ea1.tar.gz
Some middlewares require the Rack env to be preserved all
the way through to close, so we'll ensure all request models
preserve it.

We also need to better response body wrappers/proxies always get
fired properly when returning.  IO.copy_stream and "sendfile"
gem users could hit cases where wrappers did not fire properly.
Diffstat (limited to 'lib/rainbows')
-rw-r--r--lib/rainbows/event_machine/client.rb27
-rw-r--r--lib/rainbows/event_machine/response_pipe.rb3
-rw-r--r--lib/rainbows/fiber/body.rb4
-rw-r--r--lib/rainbows/response/body.rb37
-rw-r--r--lib/rainbows/rev/client.rb1
-rw-r--r--lib/rainbows/rev/deferred_response.rb2
-rw-r--r--lib/rainbows/revactor/body.rb3
-rw-r--r--lib/rainbows/sync_close.rb37
-rw-r--r--lib/rainbows/writer_thread_pool.rb8
-rw-r--r--lib/rainbows/writer_thread_spawn.rb6
10 files changed, 93 insertions, 35 deletions
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
index fab1dbc..49552f3 100644
--- a/lib/rainbows/event_machine/client.rb
+++ b/lib/rainbows/event_machine/client.rb
@@ -16,11 +16,13 @@ class Rainbows::EventMachine::Client < EM::Connection
     # (often a static file), we do not attempt to process another
     # request on the same connection until the first is complete
     if @body
-      @buf << data
-      @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
-      EM.next_tick { receive_data('') }
+      if data
+        @buf << data
+        @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
+      end
+      EM.next_tick { receive_data(nil) } unless @buf.empty?
     else
-      on_read(data)
+      on_read(data || "") if (@buf.size > 0) || data
     end
   end
 
@@ -43,15 +45,16 @@ class Rainbows::EventMachine::Client < EM::Connection
     # long-running async response
     (response.nil? || -1 == response[0]) and return @state = :close
 
-    alive = @hp.next? && G.alive && G.kato > 0
-    em_write_response(response, alive)
-    if alive
+    if @hp.next? && G.alive && G.kato > 0
       @state = :headers
+      em_write_response(response, true)
       if @buf.empty?
         set_comm_inactivity_timeout(G.kato)
-      else
-        EM.next_tick { receive_data('') }
+      elsif @body.nil?
+        EM.next_tick { receive_data(nil) }
       end
+    else
+      em_write_response(response, false)
     end
   end
 
@@ -84,7 +87,7 @@ class Rainbows::EventMachine::Client < EM::Connection
         @body.callback do
           body.close if body.respond_to?(:close)
           @body = nil
-          alive ? receive_data('') : quit
+          alive ? receive_data(nil) : quit
         end
         return
       elsif st.socket? || st.pipe?
@@ -102,6 +105,10 @@ class Rainbows::EventMachine::Client < EM::Connection
     quit unless alive
   end
 
+  def next!
+    @hp.keepalive? ? receive_data(@body = nil) : quit
+  end
+
   def unbind
     async_close = @env[ASYNC_CLOSE] and async_close.succeed
     @body.respond_to?(:fail) and @body.fail
diff --git a/lib/rainbows/event_machine/response_pipe.rb b/lib/rainbows/event_machine/response_pipe.rb
index 2417dbe..3da2417 100644
--- a/lib/rainbows/event_machine/response_pipe.rb
+++ b/lib/rainbows/event_machine/response_pipe.rb
@@ -22,9 +22,8 @@ module Rainbows::EventMachine::ResponsePipe
   end
 
   def unbind
-    @client.body = nil
-    @alive ? @client.on_read('') : @client.quit
     @body.close if @body.respond_to?(:close)
+    @client.next!
     @io.close unless @io.closed?
   end
 end
diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb
index 0fe2ec6..29926c6 100644
--- a/lib/rainbows/fiber/body.rb
+++ b/lib/rainbows/fiber/body.rb
@@ -13,7 +13,7 @@ module Rainbows::Fiber::Body # :nodoc:
   # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
   if IO.method_defined?(:sendfile_nonblock)
     def write_body_file(client, body, range)
-      sock, n = client.to_io, nil
+      sock, n, body = client.to_io, nil, body_to_io(body)
       offset, count = range ? range : [ 0, body.stat.size ]
       begin
         offset += (n = sock.sendfile_nonblock(body, offset, count))
@@ -23,6 +23,8 @@ module Rainbows::Fiber::Body # :nodoc:
       rescue EOFError
         break
       end while (count -= n) > 0
+      ensure
+        close_if_private(body)
     end
   else
     ALIASES[:write_body] = :write_body_each
diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb
index 2535374..e80217d 100644
--- a/lib/rainbows/response/body.rb
+++ b/lib/rainbows/response/body.rb
@@ -32,8 +32,14 @@ module Rainbows::Response::Body # :nodoc:
 
   FD_MAP = Rainbows::FD_MAP
 
+  class F < File; end
+
+  def close_if_private(io)
+    io.close if F === io
+  end
+
   def io_for_fd(fd)
-    FD_MAP.delete(fd) || IO.new(fd)
+    FD_MAP.delete(fd) || F.for_fd(fd)
   end
 
   # to_io is not part of the Rack spec, but make an exception here
@@ -47,13 +53,16 @@ module Rainbows::Response::Body # :nodoc:
       # try to take advantage of Rainbows::DevFdResponse, calling File.open
       # is a last resort
       path = body.to_path
-      path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : File.open(path)
+      path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : F.open(path)
     end
   end
 
   if IO.method_defined?(:sendfile_nonblock)
     def write_body_file(sock, body, range)
-      range ? sock.sendfile(body, range[0], range[1]) : sock.sendfile(body, 0)
+      io = body_to_io(body)
+      range ? sock.sendfile(io, range[0], range[1]) : sock.sendfile(io, 0)
+      ensure
+        close_if_private(io)
     end
   end
 
@@ -70,8 +79,6 @@ module Rainbows::Response::Body # :nodoc:
     # pread() semantics
     def write_body_stream(sock, body, range)
       IO.copy_stream(body, sock)
-      ensure
-        body.respond_to?(:close) and body.close
     end
   else
     # fall back to body#each, which is a Rack standard
@@ -79,27 +86,19 @@ module Rainbows::Response::Body # :nodoc:
   end
 
   if method_defined?(:write_body_file)
-
     # middlewares/apps may return with a body that responds to +to_path+
     def write_body_path(sock, body, range)
-      inp = body_to_io(body)
-      if inp.stat.file?
-        begin
-          write_body_file(sock, inp, range)
-        ensure
-          inp.close if inp != body
-        end
-      else
-        write_body_stream(sock, inp, range)
-      end
+      stat = File.stat(body.to_path)
+      stat.file? ? write_body_file(sock, body, range) :
+                   write_body_stream(sock, body, range)
       ensure
-        body.respond_to?(:close) && inp != body and body.close
+        body.respond_to?(:close) and body.close
     end
   elsif method_defined?(:write_body_stream)
     def write_body_path(sock, body, range)
-      write_body_stream(sock, inp = body_to_io(body), range)
+      write_body_stream(sock, body, range)
       ensure
-        body.respond_to?(:close) && inp != body and body.close
+        body.respond_to?(:close) and body.close
     end
   end
 
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 00df4d3..e0bccf0 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -74,6 +74,7 @@ class Rainbows::Rev::Client < Rev::IO
   end
 
   def next!
+    attached? or return
     @deferred = nil
     enable_write_watcher
   end
diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb
index 146f505..4a92ee4 100644
--- a/lib/rainbows/rev/deferred_response.rb
+++ b/lib/rainbows/rev/deferred_response.rb
@@ -14,7 +14,7 @@ class Rainbows::Rev::DeferredResponse < Rev::IO
   end
 
   def on_close
-    @client.next! if @client.attached? # attached? is false if write fails
     @body.respond_to?(:close) and @body.close
+    @client.next!
   end
 end
diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/body.rb
index ad2bc55..7bfb5de 100644
--- a/lib/rainbows/revactor/body.rb
+++ b/lib/rainbows/revactor/body.rb
@@ -8,6 +8,7 @@ module Rainbows::Revactor::Body
 
   if IO.method_defined?(:sendfile_nonblock)
     def write_body_file(client, body, range)
+      body = body_to_io(body)
       sock = client.instance_variable_get(:@_io)
       pfx = Revactor::TCP::Socket === client ? :tcp : :unix
       write_complete = T[:"#{pfx}_write_complete", client]
@@ -29,6 +30,8 @@ module Rainbows::Revactor::Body
       rescue EOFError
         break
       end while (count -= n) > 0
+      ensure
+        close_if_private(body)
     end
   else
     ALIASES[:write_body] = :write_body_each
diff --git a/lib/rainbows/sync_close.rb b/lib/rainbows/sync_close.rb
new file mode 100644
index 0000000..a336262
--- /dev/null
+++ b/lib/rainbows/sync_close.rb
@@ -0,0 +1,37 @@
+# -*- encoding: binary -*-
+# :enddoc:
+require 'thread'
+class Rainbows::SyncClose
+  def initialize(body)
+    @body = body
+    @mutex = Mutex.new
+    @cv = ConditionVariable.new
+    @mutex.synchronize do
+      yield self
+      @cv.wait(@mutex)
+    end
+  end
+
+  def respond_to?(m)
+    @body.respond_to?(m)
+  end
+
+  def to_path
+    @body.to_path
+  end
+
+  def each(&block)
+    @body.each(&block)
+  end
+
+  def to_io
+    @body.to_io
+  end
+
+  # called by the writer thread to wake up the original thread (in #initialize)
+  def close
+    @body.close
+    ensure
+      @mutex.synchronize { @cv.signal }
+  end
+end
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index 6896787..67c8e83 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -24,7 +24,13 @@ module Rainbows::WriterThreadPool
   @@q = nil
 
   def async_write_body(qclient, body, range)
-    qclient.q << [ qclient.to_io, :body, body, range ]
+    if body.respond_to?(:close)
+      Rainbows::SyncClose.new(body) do |body|
+        qclient.q << [ qclient.to_io, :body, body, range ]
+      end
+    else
+      qclient.q << [ qclient.to_io, :body, body, range ]
+    end
   end
 
   def process_client(client) # :nodoc:
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 0e7d1a7..43e4f2c 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -23,7 +23,11 @@ module Rainbows::WriterThreadSpawn
   include Rainbows::Base
 
   def write_body(my_sock, body, range) # :nodoc:
-    my_sock.queue_body(body, range)
+    if body.respond_to?(:close)
+      Rainbows::SyncClose.new(body) { |body| my_sock.queue_body(body, range) }
+    else
+      my_sock.queue_body(body, range)
+    end
   end
 
   def process_client(client) # :nodoc: