about summary refs log tree commit homepage
path: root/lib/rainbows/coolio/client.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/coolio/client.rb')
-rw-r--r--lib/rainbows/coolio/client.rb87
1 files changed, 50 insertions, 37 deletions
diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb
index b5430f6..b4c7b5a 100644
--- a/lib/rainbows/coolio/client.rb
+++ b/lib/rainbows/coolio/client.rb
@@ -2,11 +2,8 @@
 # :enddoc:
 class Rainbows::Coolio::Client < Coolio::IO
   include Rainbows::EvCore
-  SF = Rainbows::StreamFile
   CONN = Rainbows::Coolio::CONN
   KATO = Rainbows::Coolio::KATO
-  ResponsePipe = Rainbows::Coolio::ResponsePipe
-  ResponseChunkPipe = Rainbows::Coolio::ResponseChunkPipe
 
   def initialize(io)
     CONN[self] = false
@@ -21,7 +18,7 @@ class Rainbows::Coolio::Client < Coolio::IO
 
   def quit
     super
-    close if @deferred.nil? && @_write_buffer.empty?
+    close if nil == @deferred && @_write_buffer.empty?
   end
 
   # override the Coolio::IO#write method try to write directly to the
@@ -58,15 +55,6 @@ class Rainbows::Coolio::Client < Coolio::IO
     close
   end
 
-  # queued, optional response bodies, it should only be unpollable "fast"
-  # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
-  # are also part of this.  We'll also stick ResponsePipe bodies in
-  # here to prevent connections from being closed on us.
-  def defer_body(io)
-    @deferred = io
-    enable_write_watcher
-  end
-
   # allows enabling of write watcher even when read watcher is disabled
   def evloop
     LOOP # this constant is set in when a worker starts
@@ -79,15 +67,16 @@ class Rainbows::Coolio::Client < Coolio::IO
   end
 
   def timeout?
-    @deferred.nil? && @_write_buffer.empty? and close.nil?
+    nil == @deferred && @_write_buffer.empty? and close.nil?
   end
 
   # used for streaming sockets and pipes
   def stream_response_body(body, io, chunk)
     # we only want to attach to the Coolio::Loop belonging to the
     # main thread in Ruby 1.9
-    io = (chunk ? ResponseChunkPipe : ResponsePipe).new(io, self, body)
-    defer_body(io.attach(LOOP))
+    (chunk ? Rainbows::Coolio::ResponseChunkPipe :
+             Rainbows::Coolio::ResponsePipe).new(io, self, body).attach(LOOP)
+    @deferred = true
   end
 
   def coolio_write_response(response, alive)
@@ -98,15 +87,7 @@ class Rainbows::Coolio::Client < Coolio::IO
       st = io.stat
 
       if st.file?
-        if respond_to?(:sendfile_range) && r = sendfile_range(status, headers)
-          status, headers, range = r
-          write_headers(status, headers, alive)
-          defer_body(SF.new(range[0], range[1], io, body)) if range
-        else
-          write_headers(status, headers, alive)
-          defer_body(SF.new(0, st.size, io, body))
-        end
-        return
+        return defer_file(status, headers, body, alive, io, st)
       elsif st.socket? || st.pipe?
         chunk = stream_response_headers(status, headers, alive)
         return stream_response_body(body, io, chunk)
@@ -130,11 +111,11 @@ class Rainbows::Coolio::Client < Coolio::IO
 
   def on_write_complete
     case @deferred
-    when ResponsePipe then return
-    when NilClass # fall through
+    when true then return
+    when nil # fall through
     else
       begin
-        return rev_sendfile(@deferred)
+        return stream_file_chunk(@deferred)
       rescue EOFError # expected at file EOF
         close_deferred
       end
@@ -168,16 +149,13 @@ class Rainbows::Coolio::Client < Coolio::IO
   end
 
   def close_deferred
-    case @deferred
-    when ResponsePipe, NilClass
-    else
-      begin
-        @deferred.close
-      rescue => e
-        Rainbows.server.logger.error("closing #@deferred: #{e}")
-      end
-      @deferred = nil
+    @deferred.respond_to?(:close) or return
+    begin
+      @deferred.close
+    rescue => e
+      Rainbows.server.logger.error("closing #@deferred: #{e}")
     end
+    @deferred = nil
   end
 
   def on_close
@@ -185,4 +163,39 @@ class Rainbows::Coolio::Client < Coolio::IO
     CONN.delete(self)
     KATO.delete(self)
   end
+
+  if IO.method_defined?(:sendfile_nonblock)
+    def defer_file(status, headers, body, alive, io, st)
+      if r = sendfile_range(status, headers)
+        status, headers, range = r
+        write_headers(status, headers, alive)
+        range and defer_file_stream(range[0], range[1], io, body)
+      else
+        write_headers(status, headers, alive)
+        defer_file_stream(0, st.size, io, body)
+      end
+    end
+
+    def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
+      sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count))
+      0 == (sf.count -= n) and raise EOFError
+      enable_write_watcher
+      rescue Errno::EAGAIN
+        enable_write_watcher
+    end
+  else
+    def defer_file(status, headers, body, alive, io, st)
+      write_headers(status, headers, alive)
+      defer_file_stream(0, st.size, io, body)
+    end
+
+    def stream_file_chunk(body)
+      write(body.to_io.sysread(0x4000))
+    end
+  end
+
+  def defer_file_stream(offset, count, io, body)
+    @deferred = Rainbows::StreamFile.new(offset, count, io, body)
+    enable_write_watcher
+  end
 end