about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-07-04 22:16:52 +0000
committerEric Wong <normalperson@yhbt.net>2010-07-04 22:34:09 +0000
commit39b178cdebe275cbc8ce19cf269bea7cd15ff4ca (patch)
treeb7628ed278895fcf70ea3206956be586ac9e1ac5
parent75f5aa9a0d6b37a94afbea3121fc2c16e70a2b1d (diff)
downloadrainbows-39b178cdebe275cbc8ce19cf269bea7cd15ff4ca.tar.gz
This hopefully allows the "sendfile" gem to be required
anywhere in the Rainbows!/Unicorn config file, and not
have to be required via RUBYOPT or the '-r' command-line
switch.

We also modularize HttpResponse and avoids singleton methods
in the response path.  This (hopefully) makes it easier for
individual concurrency models to share code and override
individual methods.
-rw-r--r--lib/rainbows.rb16
-rw-r--r--lib/rainbows/base.rb67
-rw-r--r--lib/rainbows/event_machine.rb21
-rw-r--r--lib/rainbows/fiber/base.rb32
-rw-r--r--lib/rainbows/fiber/body.rb36
-rw-r--r--lib/rainbows/fiber/rev.rb3
-rw-r--r--lib/rainbows/fiber_pool.rb2
-rw-r--r--lib/rainbows/fiber_spawn.rb2
-rw-r--r--lib/rainbows/http_response.rb21
-rw-r--r--lib/rainbows/http_response/body.rb118
-rw-r--r--lib/rainbows/rev/client.rb39
-rw-r--r--lib/rainbows/rev/core.rb1
-rw-r--r--lib/rainbows/rev/deferred_response.rb38
-rw-r--r--lib/rainbows/rev/thread.rb2
-rw-r--r--lib/rainbows/rev_fiber_spawn.rb2
-rw-r--r--lib/rainbows/revactor.rb3
-rw-r--r--lib/rainbows/sendfile.rb25
-rw-r--r--lib/rainbows/writer_thread_pool.rb12
-rw-r--r--lib/rainbows/writer_thread_spawn.rb9
-rwxr-xr-xt/t0020-large-sendfile-response.sh6
20 files changed, 265 insertions, 190 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 9e5b8a9..906806f 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -122,20 +122,4 @@ module Rainbows
   end
   # :startdoc:
   autoload :Fiber, 'rainbows/fiber' # core class
-
-  # to_io is not part of the Rack spec, but make an exception here
-  # since we can conserve path lookups and file descriptors
-  # \Rainbows! will never get here without checking for the existence
-  # of body.to_path first.
-  def self.body_to_io(body)
-    if body.respond_to?(:to_io)
-      body.to_io
-    else
-      # try to take advantage of Rainbows::DevFdResponse, calling File.open
-      # is a last resort
-      path = body.to_path
-      path =~ %r{\A/dev/fd/(\d+)\z} ? IO.new($1.to_i) : File.open(path, 'rb')
-    end
-  end
-
 end
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index 24924cb..cd719d2 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -10,17 +10,18 @@ module Rainbows::Base
 
   # :stopdoc:
   include Rainbows::Const
+  include Rainbows::HttpResponse
 
   # shortcuts...
   G = Rainbows::G
   NULL_IO = Unicorn::HttpRequest::NULL_IO
   TeeInput = Rainbows::TeeInput
-  HttpResponse = Rainbows::HttpResponse
   HttpParser = Unicorn::HttpParser
 
   # this method is called by all current concurrency models
   def init_worker_process(worker)
     super(worker)
+    Rainbows::HttpResponse.setup(self.class)
     Rainbows::MaxBody.setup
     G.tmp = worker.tmp
 
@@ -39,57 +40,6 @@ module Rainbows::Base
     logger.info "Rainbows! #@use worker_connections=#@worker_connections"
   end
 
-  # TODO: move write_body_* stuff out of Base
-  def write_body_each(client, body)
-    body.each { |chunk| client.write(chunk) }
-    ensure
-      body.respond_to?(:close) and body.close
-  end
-
-  # The sendfile 1.0.0 RubyGem includes IO#sendfile and
-  # IO#sendfile_nonblock, previous versions didn't have
-  # IO#sendfile_nonblock, and IO#sendfile in previous versions
-  # could other threads under 1.8 with large files
-  #
-  # IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with
-  # non-Linux support and large files on 32-bit.  We still fall back to
-  # IO.copy_stream (if available) if we're dealing with DevFdResponse
-  # objects, though.
-  if IO.method_defined?(:sendfile_nonblock)
-    def write_body_path(client, body)
-      file = Rainbows.body_to_io(body)
-      file.stat.file? ? client.sendfile(file, 0) :
-                        write_body_stream(client, file)
-    end
-  end
-
-  if IO.respond_to?(:copy_stream)
-    unless method_defined?(:write_body_path)
-      def write_body_path(client, body)
-        IO.copy_stream(Rainbows.body_to_io(body), client)
-      end
-    end
-
-    def write_body_stream(client, body)
-      IO.copy_stream(body, client)
-    end
-  else
-    alias write_body_stream write_body_each
-  end
-
-  if method_defined?(:write_body_path)
-    def write_body(client, body)
-      body.respond_to?(:to_path) ?
-        write_body_path(client, body) :
-        write_body_each(client, body)
-    end
-  else
-    alias write_body write_body_each
-  end
-
-  module_function :write_body, :write_body_each, :write_body_stream
-  method_defined?(:write_body_path) and module_function(:write_body_path)
-
   def wait_headers_readable(client)
     IO.select([client], nil, nil, G.kato)
   end
@@ -115,20 +65,17 @@ module Rainbows::Base
       env[RACK_INPUT] = 0 == hp.content_length ?
                         NULL_IO : TeeInput.new(client, env, hp, buf)
       env[REMOTE_ADDR] = remote_addr
-      status, headers, body = app.call(env.update(RACK_DEFAULTS))
+      response = app.call(env.update(RACK_DEFAULTS))
 
-      if 100 == status.to_i
+      if 100 == response[0].to_i
         client.write(EXPECT_100_RESPONSE)
         env.delete(HTTP_EXPECT)
-        status, headers, body = app.call(env)
+        response = app.call(env)
       end
 
       alive = hp.keepalive? && G.alive
-      if hp.headers?
-        out = [ alive ? CONN_ALIVE : CONN_CLOSE ]
-        client.write(HttpResponse.header_string(status, headers, out))
-      end
-      write_body(client, body)
+      out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
+      write_response(client, response, out)
     end while alive and hp.reset.nil? and env.clear
   # if we get any error, try to write something back to the client
   # assuming we haven't closed the socket, but don't get hung up
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 6ba536b..0ad604e 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -50,6 +50,7 @@ module Rainbows
 
     class Client < EM::Connection
       include Rainbows::EvCore
+      include Rainbows::HttpResponse
       G = Rainbows::G
 
       def initialize(io)
@@ -103,23 +104,23 @@ module Rainbows
         if body.respond_to?(:errback) && body.respond_to?(:callback)
           body.callback { quit }
           body.errback { quit }
-          HttpResponse.write(self, response, out)
+          write_header(self, response, out)
+          write_body_each(self, body)
           return
         elsif ! body.respond_to?(:to_path)
-          HttpResponse.write(self, response, out)
+          write_response(self, response, out)
           quit unless alive
           return
         end
 
         headers = Rack::Utils::HeaderHash.new(response[1])
-        io = Rainbows.body_to_io(body)
+        io = body_to_io(body)
         st = io.stat
 
         if st.file?
           headers.delete('Transfer-Encoding')
           headers['Content-Length'] ||= st.size.to_s
-          response = [ response[0], headers, [] ]
-          HttpResponse.write(self, response, out)
+          write_header(self, [ response[0], headers ], out)
           stream = stream_file_data(body.to_path)
           stream.callback { quit } unless alive
         elsif st.socket? || st.pipe?
@@ -130,15 +131,14 @@ module Rainbows
           else
             out[0] = CONN_CLOSE
           end
-          response = [ response[0], headers, [] ]
-          HttpResponse.write(self, response, out)
+          write_header(self, [ response[0], headers ], out)
           if do_chunk
             EM.watch(io, ResponseChunkPipe, self).notify_readable = true
           else
             EM.enable_proxy(EM.attach(io, ResponsePipe, self), self, 16384)
           end
         else
-          HttpResponse.write(self, response, out)
+          write_response(self, response, out)
         end
       end
 
@@ -226,6 +226,11 @@ module Rainbows
       end
     end
 
+    def init_worker_process(worker)
+      Rainbows::HttpResponse.setup(Rainbows::EventMachine::Client)
+      super
+    end
+
     # runs inside each forked worker, this sits around and waits
     # for connections and doesn't die until the parent dies (or is
     # given a INT, QUIT, or TERM signal)
diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb
index 7e39441..9ac3b72 100644
--- a/lib/rainbows/fiber/base.rb
+++ b/lib/rainbows/fiber/base.rb
@@ -72,33 +72,6 @@ module Rainbows
         max.nil? || max > (now + 1) ? 1 : max - now
       end
 
-      # TODO: IO.splice under Linux
-      alias write_body_stream write_body_each
-
-      # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
-      if ::IO.method_defined?(:sendfile_nonblock)
-        def write_body_path(client, body)
-          file = Rainbows.body_to_io(body)
-          if file.stat.file?
-            sock, off = client.to_io, 0
-            begin
-              off += sock.sendfile_nonblock(file, off, 0x10000)
-            rescue Errno::EAGAIN
-              client.wait_writable
-            rescue EOFError
-              break
-            rescue => e
-              Rainbows::Error.app(e)
-              break
-            end while true
-          else
-            write_body_stream(client, body)
-          end
-        end
-      else
-        alias write_body write_body_each
-      end
-
       def wait_headers_readable(client)
         io = client.to_io
         expire = nil
@@ -120,6 +93,11 @@ module Rainbows
         ZZ.delete(client.f)
       end
 
+      def self.setup(klass, app)
+        require 'rainbows/fiber/body'
+        klass.__send__(:include, Rainbows::Fiber::Body)
+        self.const_set(:APP, app)
+      end
     end
   end
 end
diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb
new file mode 100644
index 0000000..cd6c55c
--- /dev/null
+++ b/lib/rainbows/fiber/body.rb
@@ -0,0 +1,36 @@
+# -*- encoding: binary -*-
+# non-portable body handling for Fiber-based concurrency goes here
+# this module is required and included in worker processes only
+# this is meant to be included _after_ Rainbows::HttpResponse::Body
+module Rainbows::Fiber::Body # :nodoc:
+
+  # TODO non-blocking splice(2) under Linux
+  ALIASES = {
+    :write_body_stream => :write_body_each
+  }
+
+  # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
+  if ::IO.method_defined?(:sendfile_nonblock)
+    def write_body_file(client, body)
+      sock, off = client.to_io, 0
+      begin
+        off += sock.sendfile_nonblock(body, off, 0x10000)
+      rescue Errno::EAGAIN
+        client.wait_writable
+      rescue EOFError
+        break
+      rescue => e
+        Rainbows::Error.app(e)
+        break
+      end while true
+    end
+  else
+    ALIASES[:write_body] = :write_body_each
+  end
+
+  def self.included(klass)
+    ALIASES.each do |new_method, orig_method|
+      klass.__send__(:alias_method, new_method, orig_method)
+    end
+  end
+end
diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb
index b8ec56b..2e8f076 100644
--- a/lib/rainbows/fiber/rev.rb
+++ b/lib/rainbows/fiber/rev.rb
@@ -52,6 +52,7 @@ module Rainbows::Fiber
       include Unicorn
       include Rainbows
       include Rainbows::Const
+      include Rainbows::HttpResponse
       FIO = Rainbows::Fiber::IO
 
       def to_io
@@ -99,7 +100,7 @@ module Rainbows::Fiber
 
           alive = hp.keepalive? && G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
-          HttpResponse.write(client, response, out)
+          write_response(client, response, out)
         end while alive and hp.reset.nil? and env.clear
       rescue => e
         Error.write(io, e)
diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb
index 2a1c5f7..745e2a5 100644
--- a/lib/rainbows/fiber_pool.rb
+++ b/lib/rainbows/fiber_pool.rb
@@ -24,7 +24,7 @@ module Rainbows
           process_client(::Fiber.yield) while pool << ::Fiber.current
         }.resume # resume to hit ::Fiber.yield so it waits on a client
       }
-      Fiber::Base.const_set(:APP, app)
+      Fiber::Base.setup(self.class, app)
 
       begin
         schedule do |l|
diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb
index 6104a7b..40971e7 100644
--- a/lib/rainbows/fiber_spawn.rb
+++ b/lib/rainbows/fiber_spawn.rb
@@ -15,7 +15,7 @@ module Rainbows
 
     def worker_loop(worker)
       init_worker_process(worker)
-      Fiber::Base.const_set(:APP, app)
+      Fiber::Base.setup(self.class, app)
       limit = worker_connections
       fio = Rainbows::Fiber::IO
 
diff --git a/lib/rainbows/http_response.rb b/lib/rainbows/http_response.rb
index 811a793..677b5a7 100644
--- a/lib/rainbows/http_response.rb
+++ b/lib/rainbows/http_response.rb
@@ -6,7 +6,8 @@ module Rainbows::HttpResponse
 
   CODES = Unicorn::HttpResponse::CODES
 
-  def self.header_string(status, headers, out)
+  def response_header(response, out)
+    status, headers = response
     status = CODES[status.to_i] || status
 
     headers.each do |key, value|
@@ -25,13 +26,19 @@ module Rainbows::HttpResponse
     "#{out.join('')}\r\n"
   end
 
-  def self.write(socket, rack_response, out = [])
-    status, headers, body = rack_response
-    out and socket.write(header_string(status, headers, out))
+  def write_header(socket, response, out)
+    out and socket.write(response_header(response, out))
+  end
+
+  def write_response(socket, response, out)
+    write_header(socket, response, out)
+    write_body(socket, response[2])
+  end
 
-    body.each { |chunk| socket.write(chunk) }
-    ensure
-      body.respond_to?(:close) and body.close
+  # called after forking
+  def self.setup(klass)
+    require('rainbows/http_response/body') and
+      klass.__send__(:include, Rainbows::HttpResponse::Body)
   end
 end
 # :startdoc:
diff --git a/lib/rainbows/http_response/body.rb b/lib/rainbows/http_response/body.rb
new file mode 100644
index 0000000..2ce09da
--- /dev/null
+++ b/lib/rainbows/http_response/body.rb
@@ -0,0 +1,118 @@
+# -*- encoding: binary -*-
+# non-portable body response stuff goes here
+#
+# The sendfile 1.0.0 RubyGem includes IO#sendfile and
+# IO#sendfile_nonblock.   Previous versions of "sendfile" didn't have
+# IO#sendfile_nonblock, and IO#sendfile in previous versions could
+# block other threads under 1.8 with large files
+#
+# IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with
+# non-Linux support and large files on 32-bit.  We still fall back to
+# IO.copy_stream (if available) if we're dealing with DevFdResponse
+# objects, though.
+#
+# Linux-only splice(2) support via the "io_splice" gem will eventually
+# be added for streaming sockets/pipes, too.
+#
+# * write_body_file - regular files (sendfile or pread+write)
+# * write_body_stream - socket/pipes (read+write, splice later)
+# * write_body_each - generic fallback
+#
+# callgraph is as follows:
+#
+#         write_body
+#         `- write_body_each
+#         `- write_body_path
+#            `- write_body_file
+#            `- write_body_stream
+#
+module Rainbows::HttpResponse::Body # :nodoc:
+  ALIASES = {}
+
+  # to_io is not part of the Rack spec, but make an exception here
+  # since we can conserve path lookups and file descriptors.
+  # \Rainbows! will never get here without checking for the existence
+  # of body.to_path first.
+  def body_to_io(body)
+    if body.respond_to?(:to_io)
+      body.to_io
+    else
+      # try to take advantage of Rainbows::DevFdResponse, calling File.open
+      # is a last resort
+      path = body.to_path
+      path =~ %r{\A/dev/fd/(\d+)\z} ? IO.new($1.to_i) : File.open(path, 'rb')
+    end
+  end
+
+  if IO.method_defined?(:sendfile_nonblock)
+    def write_body_file(sock, body)
+      sock.sendfile(body, 0)
+    end
+  end
+
+  if IO.respond_to?(:copy_stream)
+    unless method_defined?(:write_body_file)
+      # try to use sendfile() via IO.copy_stream, otherwise pread()+write()
+      def write_body_file(sock, body)
+        IO.copy_stream(body, sock, nil, 0)
+      end
+    end
+
+    # only used when body is a pipe or socket that can't handle
+    # pread() semantics
+    def write_body_stream(sock, body)
+      IO.copy_stream(body, sock)
+      ensure
+        body.respond_to?(:close) and body.close
+    end
+  else
+    # fall back to body#each, which is a Rack standard
+    ALIASES[:write_body_stream] = :write_body_each
+  end
+
+  if method_defined?(:write_body_file)
+
+    # middlewares/apps may return with a body that responds to +to_path+
+    def write_body_path(sock, body)
+      inp = body_to_io(body)
+      if inp.stat.file?
+        begin
+          write_body_file(sock, inp)
+        ensure
+          inp.close if inp != body
+        end
+      else
+        write_body_stream(sock, inp)
+      end
+      ensure
+        body.respond_to?(:close) && inp != body and body.close
+    end
+  else
+    def write_body_path(sock, body)
+      write_body_stream(sock, body_to_io(body))
+    end
+  end
+
+  if method_defined?(:write_body_path)
+    def write_body(client, body)
+      body.respond_to?(:to_path) ?
+        write_body_path(client, body) :
+        write_body_each(client, body)
+    end
+  else
+    ALIASES[:write_body] = :write_body_each
+  end
+
+  # generic body writer, used for most dynamically generated responses
+  def write_body_each(socket, body)
+    body.each { |chunk| socket.write(chunk) }
+    ensure
+      body.respond_to?(:close) and body.close
+  end
+
+  def self.included(klass)
+    ALIASES.each do |new_method, orig_method|
+      klass.__send__(:alias_method, new_method, orig_method)
+    end
+  end
+end
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 8d3a9c9..ababe50 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -5,7 +5,9 @@ module Rainbows
 
     class Client < ::Rev::IO
       include Rainbows::EvCore
+      include Rainbows::HttpResponse
       G = Rainbows::G
+      HH = Rack::Utils::HeaderHash
 
       def initialize(io)
         CONN[self] = false
@@ -56,6 +58,41 @@ module Rainbows
         @_write_buffer.empty? && @deferred_bodies.empty? and close.nil?
       end
 
+      def rev_write_response(response, out)
+        status, headers, body = response
+
+        body.respond_to?(:to_path) or
+          return write_response(self, response, out)
+
+        headers = HH.new(headers)
+        io = body_to_io(body)
+        st = io.stat
+
+        if st.socket? || st.pipe?
+          do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+          do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
+          # too tricky to support keepalive/pipelining when a response can
+          # take an indeterminate amount of time here.
+          if out.nil?
+            do_chunk = false
+          else
+            out[0] = CONN_CLOSE
+          end
+
+          # we only want to attach to the Rev::Loop belonging to the
+          # main thread in Ruby 1.9
+          io = DeferredResponse.new(io, self, do_chunk, body).
+                                    attach(Server::LOOP)
+        elsif st.file?
+          headers.delete('Transfer-Encoding')
+          headers['Content-Length'] ||= st.size.to_s
+        else # char/block device, directory, whatever... nobody cares
+          return write_response(self, response, out)
+        end
+        defer_body(io, out)
+        write_header(self, response, out)
+      end
+
       def app_call
         begin
           KATO.delete(self)
@@ -65,7 +102,7 @@ module Rainbows
           alive = @hp.keepalive? && G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
 
-          DeferredResponse.write(self, response, out)
+          rev_write_response(response, out)
           if alive
             @env.clear
             @hp.reset
diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb
index 122d8f4..7457f12 100644
--- a/lib/rainbows/rev/core.rb
+++ b/lib/rainbows/rev/core.rb
@@ -22,6 +22,7 @@ module Rainbows
       # for connections and doesn't die until the parent dies (or is
       # given a INT, QUIT, or TERM signal)
       def worker_loop(worker)
+        Rainbows::HttpResponse.setup(Rainbows::Rev::Client)
         init_worker_process(worker)
         mod = self.class.const_get(@use)
         rloop = Server.const_set(:LOOP, ::Rev::Loop.default)
diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb
index 63af6b4..f710b5b 100644
--- a/lib/rainbows/rev/deferred_response.rb
+++ b/lib/rainbows/rev/deferred_response.rb
@@ -6,44 +6,6 @@ module Rainbows
     # or proxying IO-derived objects
     class DeferredResponse < ::Rev::IO
       include Rainbows::Const
-      G = Rainbows::G
-      HH = Rack::Utils::HeaderHash
-
-      def self.write(client, response, out)
-        status, headers, body = response
-
-        body.respond_to?(:to_path) or
-            return HttpResponse.write(client, response, out)
-
-        headers = HH.new(headers)
-        io = Rainbows.body_to_io(body)
-        st = io.stat
-
-        if st.socket? || st.pipe?
-          do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
-          do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
-          # too tricky to support keepalive/pipelining when a response can
-          # take an indeterminate amount of time here.
-          if out.nil?
-            do_chunk = false
-          else
-            out[0] = CONN_CLOSE
-          end
-
-          # we only want to attach to the Rev::Loop belonging to the
-          # main thread in Ruby 1.9
-          io = new(io, client, do_chunk, body).attach(Server::LOOP)
-        elsif st.file?
-          headers.delete('Transfer-Encoding')
-          headers['Content-Length'] ||= st.size.to_s
-        else # char/block device, directory, whatever... nobody cares
-          return HttpResponse.write(client, response, out)
-        end
-        client.defer_body(io, out)
-        out.nil? or
-          client.write(HttpResponse.header_string(status, headers, out))
-      end
-
       def initialize(io, client, do_chunk, body)
         super(io)
         @client, @do_chunk, @body = client, do_chunk, body
diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb
index 387740c..ba80bb1 100644
--- a/lib/rainbows/rev/thread.rb
+++ b/lib/rainbows/rev/thread.rb
@@ -22,7 +22,7 @@ module Rainbows
         enable
         alive = @hp.keepalive? && G.alive
         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
-        DeferredResponse.write(self, response, out)
+        rev_write_response(response, out)
         return quit unless alive && G.alive
 
         @env.clear
diff --git a/lib/rainbows/rev_fiber_spawn.rb b/lib/rainbows/rev_fiber_spawn.rb
index afaf82a..4d64e39 100644
--- a/lib/rainbows/rev_fiber_spawn.rb
+++ b/lib/rainbows/rev_fiber_spawn.rb
@@ -16,8 +16,10 @@ module Rainbows
     include Fiber::Rev
 
     def worker_loop(worker)
+      Rainbows::HttpResponse.setup(Rainbows::Fiber::Rev::Server)
       init_worker_process(worker)
       Server.const_set(:MAX, @worker_connections)
+      Rainbows::Fiber::Base.setup(Rainbows::Fiber::Rev::Server, nil)
       Server.const_set(:APP, G.server.app)
       Heartbeat.new(1, true).attach(::Rev::Loop.default)
       kato = Kato.new.attach(::Rev::Loop.default)
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index 7a063ab..de423a3 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -60,7 +60,7 @@ module Rainbows::Revactor
 
       alive = hp.keepalive? && G.alive
       out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
-      HttpResponse.write(client, response, out)
+      write_response(client, response, out)
     end while alive and hp.reset.nil? and env.clear
   rescue ::Revactor::TCP::ReadError
   rescue => e
@@ -74,6 +74,7 @@ module Rainbows::Revactor
   # given a INT, QUIT, or TERM signal)
   def worker_loop(worker)
     init_worker_process(worker)
+    self.class.__send__(:alias_method, :write_body, :write_body_each)
     RD_ARGS[:timeout] = G.kato if G.kato > 0
     nr = 0
     limit = worker_connections
diff --git a/lib/rainbows/sendfile.rb b/lib/rainbows/sendfile.rb
index 146c4c5..3f82047 100644
--- a/lib/rainbows/sendfile.rb
+++ b/lib/rainbows/sendfile.rb
@@ -57,34 +57,23 @@ class Sendfile < Struct.new(:app)
   # Body wrapper, this allows us to fall back gracefully to
   # +each+ in case a given concurrency model does not optimize
   # +to_path+ calls.
-  class Body < Struct.new(:to_io)
-
-    def initialize(path, headers)
-      # Rainbows! will try #to_io if #to_path exists to avoid unnecessary
-      # open() calls.
-      self.to_io = File.open(path, 'rb')
+  class Body < Struct.new(:to_path)
 
+    def self.new(path, headers)
       unless headers['Content-Length']
-        stat = to_io.stat
+        stat = File.stat(path)
         headers['Content-Length'] = stat.size.to_s if stat.file?
       end
-    end
-
-    def to_path
-      to_io.path
+      super(path)
     end
 
     # fallback in case our +to_path+ doesn't get handled for whatever reason
     def each(&block)
-      buf = ''
-      while to_io.read(0x4000, buf)
-        yield buf
+      File.open(to_path, 'rb') do |fp|
+        buf = ''
+        yield buf while fp.read(0x4000, buf)
       end
     end
-
-    def close
-      to_io.close
-    end
   end
 
   def call(env)
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index f7eb2aa..b6c53e8 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -46,8 +46,10 @@ module Rainbows
       end
     end
 
-    def write_body(qclient, body)
-      qclient.q << [ qclient.to_io, :body, body ]
+    module Response
+      def write_body(qclient, body)
+        qclient.q << [ qclient.to_io, :body, body ]
+      end
     end
 
     @@nr = 0
@@ -59,6 +61,10 @@ module Rainbows
     end
 
     def worker_loop(worker)
+      Rainbows::HttpResponse.setup(self.class)
+      self.class.__send__(:alias_method, :sync_write_body, :write_body)
+      self.class.__send__(:include, Response)
+
       # we have multiple, single-thread queues since we don't want to
       # interleave writes from the same client
       qp = (1..worker_connections).map do |n|
@@ -66,7 +72,7 @@ module Rainbows
           begin
             io, arg1, arg2 = response
             case arg1
-            when :body then Base.write_body(io, arg2)
+            when :body then sync_write_body(io, arg2)
             when :close then io.close unless io.closed?
             else
               io.write(arg1)
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 0a8988f..e1f9e53 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -28,6 +28,8 @@ module Rainbows
     # used to wrap a BasicSocket to use with +q+ for all writes
     # this is compatible with IO.select
     class MySocket < Struct.new(:to_io, :q, :thr)
+      include Rainbows::HttpResponse
+
       def readpartial(size, buf = "")
         to_io.readpartial(size, buf)
       end
@@ -51,7 +53,7 @@ module Rainbows
             begin
               arg1, arg2 = response
               case arg1
-              when :body then Base.write_body(io, arg2)
+              when :body then write_body(io, arg2)
               when :close
                 io.close unless io.closed?
                 break
@@ -71,7 +73,7 @@ module Rainbows
         (self.q ||= queue_writer) << buf
       end
 
-      def write_body(body)
+      def queue_body(body)
         (self.q ||= queue_writer) << [ :body, body ]
       end
 
@@ -89,7 +91,7 @@ module Rainbows
     end
 
     def write_body(my_sock, body)
-      my_sock.write_body(body)
+      my_sock.queue_body(body)
     end
 
     def process_client(client)
@@ -98,6 +100,7 @@ module Rainbows
 
     def worker_loop(worker)
       MySocket.const_set(:MAX, worker_connections)
+      Rainbows::HttpResponse.setup(MySocket)
       super(worker) # accept loop from Unicorn
       CUR.delete_if do |t,q|
         q << nil
diff --git a/t/t0020-large-sendfile-response.sh b/t/t0020-large-sendfile-response.sh
index 822a23f..8e71db1 100755
--- a/t/t0020-large-sendfile-response.sh
+++ b/t/t0020-large-sendfile-response.sh
@@ -14,10 +14,8 @@ t_plan 7 "large sendfile response for $model"
 t_begin "setup and startup" && {
         rtmpfiles curl_out a b c
         rainbows_setup $model 2
-
-        # FIXME: allow "require 'sendfile'" to work in $unicorn_config
-        RUBYOPT="-rsendfile"
-        export RUBYOPT
+        echo 'require "sendfile"' >> $unicorn_config
+        echo 'def (::IO).copy_stream(*x); abort "NO"; end' >> $unicorn_config
 
         # can't load Rack::Lint here since it clobbers body#to_path
         rainbows -E none -D large-file-response.ru -c $unicorn_config