about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows.rb8
-rw-r--r--lib/rainbows/base.rb6
-rw-r--r--lib/rainbows/client.rb5
-rw-r--r--lib/rainbows/coolio.rb2
-rw-r--r--lib/rainbows/coolio/client.rb28
-rw-r--r--lib/rainbows/ev_core.rb5
-rw-r--r--lib/rainbows/event_machine.rb2
-rw-r--r--lib/rainbows/event_machine/client.rb17
-rw-r--r--lib/rainbows/fiber/base.rb4
-rw-r--r--lib/rainbows/fiber/body.rb18
-rw-r--r--lib/rainbows/fiber/coolio/server.rb3
-rw-r--r--lib/rainbows/process_client.rb43
-rw-r--r--lib/rainbows/rack_input.rb6
-rw-r--r--lib/rainbows/response.rb197
-rw-r--r--lib/rainbows/response/body.rb122
-rw-r--r--lib/rainbows/response/range.rb34
-rw-r--r--lib/rainbows/revactor.rb65
-rw-r--r--lib/rainbows/revactor/client.rb59
-rw-r--r--lib/rainbows/revactor/client/methods.rb (renamed from lib/rainbows/revactor/body.rb)29
-rw-r--r--lib/rainbows/revactor/client/tee_socket.rb (renamed from lib/rainbows/revactor/tee_socket.rb)2
-rw-r--r--lib/rainbows/thread_pool.rb4
-rw-r--r--lib/rainbows/thread_spawn.rb2
-rw-r--r--lib/rainbows/writer_thread_pool.rb36
-rw-r--r--lib/rainbows/writer_thread_pool/client.rb45
-rw-r--r--lib/rainbows/writer_thread_spawn.rb13
-rw-r--r--lib/rainbows/writer_thread_spawn/client.rb63
26 files changed, 405 insertions, 413 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 643bdd2..909e97e 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -39,9 +39,11 @@ module Rainbows
   require 'rainbows/const'
   require 'rainbows/http_parser'
   require 'rainbows/http_server'
-  require 'rainbows/response'
-  require 'rainbows/client'
-  require 'rainbows/process_client'
+  autoload :RackInput, 'rainbows/rack_input'
+  autoload :Response, 'rainbows/response'
+  autoload :ProcessClient, 'rainbows/process_client'
+  autoload :TimedRead, 'rainbows/timed_read'
+  autoload :Client, 'rainbows/client'
   autoload :Base, 'rainbows/base'
   autoload :Sendfile, 'rainbows/sendfile'
   autoload :AppPool, 'rainbows/app_pool'
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index bf9ef87..5d56063 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -6,9 +6,7 @@
 # not intended for production use, as keepalive with a pure prefork
 # concurrency model is extremely expensive.
 module Rainbows::Base
-
   # :stopdoc:
-  include Rainbows::ProcessClient
 
   # shortcuts...
   G = Rainbows::G
@@ -34,6 +32,10 @@ module Rainbows::Base
     logger.info "Rainbows! #@use worker_connections=#@worker_connections"
   end
 
+  def process_client(client)
+    client.process_loop
+  end
+
   def self.included(klass) # :nodoc:
     klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS
     klass.const_set :G, Rainbows::G
diff --git a/lib/rainbows/client.rb b/lib/rainbows/client.rb
index dc6d95e..8425e9e 100644
--- a/lib/rainbows/client.rb
+++ b/lib/rainbows/client.rb
@@ -1,9 +1,8 @@
 # -*- encoding: binary -*-
 # :enddoc:
 
-require 'rainbows/timed_read'
-
+# this class is used for most synchronous concurrency models
 class Rainbows::Client < Kgio::Socket
   include Rainbows::TimedRead
+  include Rainbows::ProcessClient
 end
-Kgio.accept_class = Rainbows::Client
diff --git a/lib/rainbows/coolio.rb b/lib/rainbows/coolio.rb
index 463bf0a..d0b8b2e 100644
--- a/lib/rainbows/coolio.rb
+++ b/lib/rainbows/coolio.rb
@@ -31,6 +31,7 @@ module Rainbows::Coolio
     KATO.compare_by_identity
   end
 
+  autoload :Client, 'rainbows/coolio/client'
   autoload :Master, 'rainbows/coolio/master'
   autoload :ThreadClient, 'rainbows/coolio/thread_client'
   autoload :ResponsePipe, 'rainbows/coolio/response_pipe'
@@ -41,5 +42,4 @@ end
 require 'rainbows/coolio/heartbeat'
 require 'rainbows/coolio/server'
 require 'rainbows/coolio/core'
-require 'rainbows/coolio/client'
 Rainbows::Coolio.__send__ :include, Rainbows::Coolio::Core
diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb
index 6360e2d..d0b17a9 100644
--- a/lib/rainbows/coolio/client.rb
+++ b/lib/rainbows/coolio/client.rb
@@ -84,39 +84,37 @@ class Rainbows::Coolio::Client < Coolio::IO
   end
 
   # used for streaming sockets and pipes
-  def stream_response(status, headers, io, body)
-    c = stream_response_headers(status, headers) if headers
+  def stream_response_body(body, io, chunk)
     # we only want to attach to the Coolio::Loop belonging to the
     # main thread in Ruby 1.9
-    io = (c ? ResponseChunkPipe : ResponsePipe).new(io, self, body)
+    io = (chunk ? ResponseChunkPipe : ResponsePipe).new(io, self, body)
     defer_body(io.attach(LOOP))
   end
 
   def coolio_write_response(response, alive)
     status, headers, body = response
-    headers = @hp.headers? ? HH.new(headers) : nil
 
-    headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
     if body.respond_to?(:to_path)
       io = body_to_io(body)
       st = io.stat
 
       if st.file?
-        offset, count = 0, st.size
-        if headers
-          if range = make_range!(@env, status, headers)
-            status, offset, count = range
-          end
-          write(response_header(status, headers))
+        if respond_to?(:sendfile_range) && r = sendfile_range(status, headers)
+          status, headers, range = r
+          write_headers(status, headers, alive)
+          defer_body(SF.new(range[0], range[1], io, body)) if range
+        else
+          write_headers(status, headers, alive)
+          defer_body(SF.new(0, st.size, io, body))
         end
-        return defer_body(SF.new(offset, count, io, body))
+        return
       elsif st.socket? || st.pipe?
-        return stream_response(status, headers, io, body)
+        chunk = stream_response_headers(status, headers, alive)
+        return stream_response_body(body, io, chunk)
       end
       # char or block device... WTF? fall through to body.each
     end
-    write(response_header(status, headers)) if headers
-    write_body_each(self, body, nil)
+    write_response(status, headers, body, alive)
   end
 
   def app_call
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
index 60fbdca..471f6a3 100644
--- a/lib/rainbows/ev_core.rb
+++ b/lib/rainbows/ev_core.rb
@@ -36,14 +36,15 @@ module Rainbows::EvCore
   end
 
   # returns whether to enable response chunking for autochunk models
-  def stream_response_headers(status, headers)
+  def stream_response_headers(status, headers, alive)
+    headers = Rack::Utils::HeaderHash.new(headers)
     if headers['Content-Length']
       rv = false
     else
       rv = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
       rv = false if headers.delete('X-Rainbows-Autochunk') == 'no'
     end
-    write(response_header(status, headers))
+    write_headers(status, headers, alive)
     rv
   end
 
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index b226cab..cb76669 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -44,6 +44,7 @@ module Rainbows::EventMachine
   autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
   autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
   autoload :TryDefer, 'rainbows/event_machine/try_defer'
+  autoload :Client, 'rainbows/event_machine/client'
 
   include Rainbows::Base
 
@@ -89,5 +90,4 @@ module Rainbows::EventMachine
   end
 end
 # :enddoc:
-require 'rainbows/event_machine/client'
 require 'rainbows/event_machine/server'
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
index 6863be0..2fc9d03 100644
--- a/lib/rainbows/event_machine/client.rb
+++ b/lib/rainbows/event_machine/client.rb
@@ -58,27 +58,20 @@ class Rainbows::EventMachine::Client < EM::Connection
     end
   end
 
+  # don't change this method signature, "async.callback" relies on it
   def em_write_response(response, alive = false)
     status, headers, body = response
-    if @hp.headers?
-      headers = HH.new(headers)
-      headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
-    else
-      headers = nil
-    end
 
     if body.respond_to?(:errback) && body.respond_to?(:callback)
       @body = body
       body.callback { quit }
       body.errback { quit }
-      # async response, this could be a trickle as is in comet-style apps
-      headers[CONNECTION] = CLOSE if headers
       alive = true
     elsif body.respond_to?(:to_path)
       st = File.stat(path = body.to_path)
 
       if st.file?
-        write(response_header(status, headers)) if headers
+        write_headers(status, headers, alive)
         @body = stream_file_data(path)
         @body.errback do
           body.close if body.respond_to?(:close)
@@ -92,16 +85,14 @@ class Rainbows::EventMachine::Client < EM::Connection
         return
       elsif st.socket? || st.pipe?
         io = body_to_io(@body = body)
-        chunk = stream_response_headers(status, headers) if headers
+        chunk = stream_response_headers(status, headers, alive)
         m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
                     Rainbows::EventMachine::ResponsePipe
         return EM.watch(io, m, self).notify_readable = true
       end
       # char or block device... WTF? fall through to body.each
     end
-
-    write(response_header(status, headers)) if headers
-    write_body_each(self, body)
+    write_response(status, headers, body, alive)
     quit unless alive
   end
 
diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb
index b693451..ae885b6 100644
--- a/lib/rainbows/fiber/base.rb
+++ b/lib/rainbows/fiber/base.rb
@@ -57,7 +57,7 @@ module Rainbows::Fiber::Base
 
   def process(client)
     G.cur += 1
-    process_client(client)
+    client.process_loop
   ensure
     G.cur -= 1
     ZZ.delete(client.f)
@@ -65,7 +65,7 @@ module Rainbows::Fiber::Base
 
   def self.setup(klass, app)
     require 'rainbows/fiber/body'
-    klass.__send__(:include, Rainbows::Fiber::Body)
+    Rainbows::Client.__send__(:include, Rainbows::Fiber::Body)
     self.const_set(:APP, app)
   end
 end
diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb
index 1d7d325..872b1df 100644
--- a/lib/rainbows/fiber/body.rb
+++ b/lib/rainbows/fiber/body.rb
@@ -5,20 +5,15 @@
 # this is meant to be included _after_ Rainbows::Response::Body
 module Rainbows::Fiber::Body # :nodoc:
 
-  # TODO non-blocking splice(2) under Linux
-  ALIASES = {
-    :write_body_stream => :write_body_each
-  }
-
   # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
   if IO.method_defined?(:sendfile_nonblock)
-    def write_body_file_sendfile_fiber(client, body, range)
-      sock, n, body = client.to_io, nil, body_to_io(body)
+    def write_body_file(body, range)
+      sock, n, body = to_io, nil, body_to_io(body)
       offset, count = range ? range : [ 0, body.stat.size ]
       begin
         offset += (n = sock.sendfile_nonblock(body, offset, count))
       rescue Errno::EAGAIN
-        client.kgio_wait_writable
+        kgio_wait_writable
         retry
       rescue EOFError
         break
@@ -26,14 +21,9 @@ module Rainbows::Fiber::Body # :nodoc:
       ensure
         close_if_private(body)
     end
-    ALIASES[:write_body_file] = :write_body_file_sendfile_fiber
-  else
-    ALIASES[:write_body] = :write_body_each
   end
 
   def self.included(klass)
-    ALIASES.each do |new_method, orig_method|
-      klass.__send__(:alias_method, new_method, orig_method)
-    end
+    klass.__send__ :alias_method, :write_body_stream, :write_body_each
   end
 end
diff --git a/lib/rainbows/fiber/coolio/server.rb b/lib/rainbows/fiber/coolio/server.rb
index 0de1ab3..b064953 100644
--- a/lib/rainbows/fiber/coolio/server.rb
+++ b/lib/rainbows/fiber/coolio/server.rb
@@ -2,7 +2,6 @@
 # :enddoc:
 class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher
   G = Rainbows::G
-  include Rainbows::ProcessClient
 
   def to_io
     @io
@@ -25,7 +24,7 @@ class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher
 
   def process(io)
     G.cur += 1
-    process_client(io)
+    io.process_loop
   ensure
     G.cur -= 1
   end
diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb
index 54e59e8..d840778 100644
--- a/lib/rainbows/process_client.rb
+++ b/lib/rainbows/process_client.rb
@@ -1,54 +1,41 @@
 # -*- encoding: binary -*-
-# :enddoc:
-require 'rainbows/rack_input'
 module Rainbows::ProcessClient
-  G = Rainbows::G
   include Rainbows::Response
-  HttpParser = Rainbows::HttpParser
   include Rainbows::RackInput
   include Rainbows::Const
 
-  # once a client is accepted, it is processed in its entirety here
-  # in 3 easy steps: read request, call app, write app response
-  # this is used by synchronous concurrency models
-  #   Base, ThreadSpawn, ThreadPool
-  def process_client(client) # :nodoc:
-    hp = HttpParser.new
-    client.kgio_read!(16384, buf = hp.buf)
-    remote_addr = client.kgio_addr
-    alive = false
+  def process_loop
+    @hp = hp = Rainbows::HttpParser.new
+    kgio_read!(16384, buf = hp.buf) or return
 
     begin # loop
       until env = hp.parse
-        client.timed_read(buf2 ||= "") or return
+        timed_read(buf2 ||= "") or return
         buf << buf2
       end
 
-      set_input(env, hp, client)
-      env[REMOTE_ADDR] = remote_addr
-      status, headers, body = APP.call(env.update(RACK_DEFAULTS))
+      set_input(env, hp)
+      env[REMOTE_ADDR] = kgio_addr
+      status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
 
       if 100 == status.to_i
-        client.write(EXPECT_100_RESPONSE)
+        write(EXPECT_100_RESPONSE)
         env.delete(HTTP_EXPECT)
         status, headers, body = APP.call(env)
       end
-
-      if hp.headers?
-        headers = HH.new(headers)
-        range = make_range!(env, status, headers) and status = range.shift
-        headers[CONNECTION] = (alive = hp.next?) ? KEEP_ALIVE : CLOSE
-        client.write(response_header(status, headers))
-      end
-      write_body(client, body, range)
+      write_response(status, headers, body, alive = @hp.next?)
     end while alive
   # if we get any error, try to write something back to the client
   # assuming we haven't closed the socket, but don't get hung up
   # if the socket is already closed or broken.  We'll always ensure
   # the socket is closed at the end of this function
   rescue => e
-    Rainbows::Error.write(client, e)
+    handle_error(e)
   ensure
-    client.close unless client.closed?
+    close unless closed?
+  end
+
+  def handle_error(e)
+    Rainbows::Error.write(self, e)
   end
 end
diff --git a/lib/rainbows/rack_input.rb b/lib/rainbows/rack_input.rb
index df51ac1..bc68ed1 100644
--- a/lib/rainbows/rack_input.rb
+++ b/lib/rainbows/rack_input.rb
@@ -10,8 +10,8 @@ module Rainbows::RackInput
     const_set(:IC, Unicorn::HttpRequest.input_class)
   end
 
-  def set_input(env, hp, client)
-    env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(client, hp)
-    env[CLIENT_IO] = client
+  def set_input(env, hp)
+    env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp)
+    env[CLIENT_IO] = self
   end
 end
diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb
index ca381b8..c0d0740 100644
--- a/lib/rainbows/response.rb
+++ b/lib/rainbows/response.rb
@@ -3,60 +3,179 @@
 require 'time' # for Time#httpdate
 
 module Rainbows::Response
-  autoload :Body, 'rainbows/response/body'
-  autoload :Range, 'rainbows/response/range'
-
+  CRLF = Unicorn::HttpResponse::CRLF
   CODES = Unicorn::HttpResponse::CODES
-  CRLF = "\r\n"
+  Close = "close"
+  KeepAlive = "keep-alive"
+
+  # private file class for IO objects opened by Rainbows! itself (and not
+  # the app or middleware)
+  class F < File; end
 
-  # freeze headers we may set as hash keys for a small speedup
-  CONNECTION = "Connection".freeze
-  CLOSE = "close"
-  KEEP_ALIVE = "keep-alive"
-  HH = Rack::Utils::HeaderHash
+  # called after forking
+  def self.setup(klass)
+    Kgio.accept_class = Rainbows::Client
+    0 == Rainbows::G.kato and Rainbows::HttpParser.keepalive_requests = 0
+  end
 
-  def response_header(status, headers)
+  def write_headers(status, headers, alive)
+    @hp.headers? or return
     status = CODES[status.to_i] || status
-    rv = "HTTP/1.1 #{status}\r\n" \
-         "Date: #{Time.now.httpdate}\r\n" \
-         "Status: #{status}\r\n"
+    buf = "HTTP/1.1 #{status}\r\n" \
+          "Date: #{Time.now.httpdate}\r\n" \
+          "Status: #{status}\r\n" \
+          "Connection: #{alive ? KeepAlive : Close}\r\n"
     headers.each do |key, value|
-      next if %r{\A(?:X-Rainbows-|Date\z|Status\z)}i =~ key
+      next if %r{\A(?:X-Rainbows-|Date\z|Status\z\|Connection\z)}i =~ key
       if value =~ /\n/
         # avoiding blank, key-only cookies with /\n+/
-        rv << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join('')
+        buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join
       else
-        rv << "#{key}: #{value}\r\n"
+        buf << "#{key}: #{value}\r\n"
       end
     end
-    rv << CRLF
+    write(buf << CRLF)
   end
 
-  # called after forking
-  def self.setup(klass)
-    if 0 == Rainbows::G.kato
-      KEEP_ALIVE.replace(CLOSE)
-      Rainbows::HttpParser.keepalive_requests = 0
-    end
-    range_class = body_class = klass
-    case Rainbows::Const::RACK_DEFAULTS['rainbows.model']
-    when :WriterThreadSpawn
-      body_class = Rainbows::WriterThreadSpawn::Client
-      range_class = Rainbows::HttpServer
-    when :EventMachine, :NeverBlock
-      range_class = nil # :<
-    end
-    return if body_class.included_modules.include?(Body)
-    body_class.__send__(:include, Body)
-    sf = IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
-    if range_class
-      range_class.__send__(:include, sf ? Range : NoRange)
+  def close_if_private(io)
+    io.close if F === io
+  end
+
+  def io_for_fd(fd)
+    Rainbows::FD_MAP.delete(fd) || F.for_fd(fd)
+  end
+
+  # to_io is not part of the Rack spec, but make an exception here
+  # since we can conserve path lookups and file descriptors.
+  # \Rainbows! will never get here without checking for the existence
+  # of body.to_path first.
+  def body_to_io(body)
+    if body.respond_to?(:to_io)
+      body.to_io
+    else
+      # try to take advantage of Rainbows::DevFdResponse, calling F.open
+      # is a last resort
+      path = body.to_path
+      %r{\A/dev/fd/(\d+)\z} =~ path ? io_for_fd($1.to_i) : F.open(path)
     end
   end
 
-  module NoRange
-    # dummy method if we can't send range responses
-    def make_range!(env, status, headers)
+  module Each
+    # generic body writer, used for most dynamically-generated responses
+    def write_body_each(body)
+      body.each { |chunk| write(chunk) }
+    end
+
+    # generic response writer, used for most dynamically-generated responses
+    # and also when IO.copy_stream and/or IO#sendfile_nonblock is unavailable
+    def write_response(status, headers, body, alive)
+      write_headers(status, headers, alive)
+      write_body_each(body)
+      ensure
+        body.close if body.respond_to?(:close)
     end
   end
+  include Each
+
+  if IO.method_defined?(:sendfile_nonblock)
+    module Sendfile
+      def write_body_file(body, range)
+        io = body_to_io(body)
+        range ? sendfile(io, range[0], range[1]) : sendfile(io, 0)
+        ensure
+          close_if_private(io)
+      end
+    end
+    include Sendfile
+  end
+
+  if IO.respond_to?(:copy_stream)
+    unless IO.method_defined?(:sendfile_nonblock)
+      module CopyStream
+        def write_body_file(body, range)
+          range ? IO.copy_stream(body, self, range[1], range[0]) :
+                  IO.copy_stream(body, self, nil, 0)
+        end
+      end
+      include CopyStream
+    end
+
+    # write_body_stream is an alias for write_body_each if IO.copy_stream
+    # isn't used or available.
+    def write_body_stream(body)
+      IO.copy_stream(io = body_to_io(body), self)
+      ensure
+        close_if_private(io)
+    end
+  else # ! IO.respond_to?(:copy_stream)
+    alias write_body_stream write_body_each
+  end  # ! IO.respond_to?(:copy_stream)
+
+  if IO.method_defined?(:sendfile_nonblock) || IO.respond_to?(:copy_stream)
+    HTTP_RANGE = 'HTTP_RANGE'
+    Content_Range = 'Content-Range'.freeze
+    Content_Length = 'Content-Length'.freeze
+
+    # This does not support multipart responses (does anybody actually
+    # use those?)
+    def sendfile_range(status, headers)
+      200 == status.to_i &&
+      /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ @hp.env[HTTP_RANGE] or
+        return
+      a, b = $1.split(/-/)
+
+      headers = Rack::Utils::HeaderHash.new(headers)
+      clen = headers[Content_Length] or return
+      size = clen.to_i
+
+      if b.nil? # bytes=M-
+        offset = a.to_i
+        count = size - offset
+      elsif a.empty? # bytes=-N
+        offset = size - b.to_i
+        count = size - offset
+      else  # bytes=M-N
+        offset = a.to_i
+        count = b.to_i + 1 - offset
+      end
+
+      if 0 > count || offset >= size
+        return 416, headers, nil
+      else
+        count = size if count > size
+        headers[Content_Length] = count.to_s
+        headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}"
+        return 206, headers, [ offset, count ]
+      end
+    end
+
+    def write_response_path(status, headers, body, alive)
+      if File.file?(body.to_path)
+        if r = sendfile_range(status, headers)
+          status, headers, range = r
+          write_headers(status, headers, alive)
+          write_body_file(body, range) if range
+        else
+          write_headers(status, headers, alive)
+          write_body_file(body, nil)
+        end
+      else
+        write_headers(status, headers, alive)
+        write_body_stream(body)
+      end
+      ensure
+        body.close if body.respond_to?(:close)
+    end
+
+    module ToPath
+      def write_response(status, headers, body, alive)
+        if body.respond_to?(:to_path)
+          write_response_path(status, headers, body, alive)
+        else
+          super
+        end
+      end
+    end
+    include ToPath
+  end # IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
 end
diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb
deleted file mode 100644
index a5d04dd..0000000
--- a/lib/rainbows/response/body.rb
+++ /dev/null
@@ -1,122 +0,0 @@
-# -*- encoding: binary -*-
-# :enddoc:
-# non-portable body response stuff goes here
-#
-# The sendfile 1.0.0 RubyGem includes IO#sendfile and
-# IO#sendfile_nonblock.   Previous versions of "sendfile" didn't have
-# IO#sendfile_nonblock, and IO#sendfile in previous versions could
-# block other threads under 1.8 with large files
-#
-# IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with
-# non-Linux support and large files on 32-bit.  We still fall back to
-# IO.copy_stream (if available) if we're dealing with DevFdResponse
-# objects, though.
-#
-# Linux-only splice(2) support via the "io_splice" gem will eventually
-# be added for streaming sockets/pipes, too.
-#
-# * write_body_file - regular files (sendfile or pread+write)
-# * write_body_stream - socket/pipes (read+write, splice later)
-# * write_body_each - generic fallback
-#
-# callgraph is as follows:
-#
-#         write_body
-#         `- write_body_each
-#         `- write_body_path
-#            `- write_body_file
-#            `- write_body_stream
-#
-module Rainbows::Response::Body # :nodoc:
-  ALIASES = {}
-
-  FD_MAP = Rainbows::FD_MAP
-
-  class F < File; end
-
-  def close_if_private(io)
-    io.close if F === io
-  end
-
-  def io_for_fd(fd)
-    FD_MAP.delete(fd) || F.for_fd(fd)
-  end
-
-  # to_io is not part of the Rack spec, but make an exception here
-  # since we can conserve path lookups and file descriptors.
-  # \Rainbows! will never get here without checking for the existence
-  # of body.to_path first.
-  def body_to_io(body)
-    if body.respond_to?(:to_io)
-      body.to_io
-    else
-      # try to take advantage of Rainbows::DevFdResponse, calling File.open
-      # is a last resort
-      path = body.to_path
-      path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : F.open(path)
-    end
-  end
-
-  if IO.method_defined?(:sendfile_nonblock)
-    def write_body_file_sendfile(sock, body, range)
-      io = body_to_io(body)
-      range ? sock.sendfile(io, range[0], range[1]) : sock.sendfile(io, 0)
-      ensure
-        close_if_private(io)
-    end
-    ALIASES[:write_body_file] = :write_body_file_sendfile
-  end
-
-  if IO.respond_to?(:copy_stream)
-    unless method_defined?(:write_body_file_sendfile)
-      # try to use sendfile() via IO.copy_stream, otherwise pread()+write()
-      def write_body_file_copy_stream(sock, body, range)
-        range ? IO.copy_stream(body, sock, range[1], range[0]) :
-                IO.copy_stream(body, sock, nil, 0)
-      end
-      ALIASES[:write_body_file] = :write_body_file_copy_stream
-    end
-
-    # only used when body is a pipe or socket that can't handle
-    # pread() semantics
-    def write_body_stream(sock, body)
-      IO.copy_stream(body, sock)
-    end
-  else
-    # fall back to body#each, which is a Rack standard
-    ALIASES[:write_body_stream] = :write_body_each
-  end
-
-  if ALIASES[:write_body_file]
-    # middlewares/apps may return with a body that responds to +to_path+
-    def write_body_path(sock, body, range)
-      File.file?(body.to_path) ? write_body_file(sock, body, range) :
-                                 write_body_stream(sock, body)
-      ensure
-        body.respond_to?(:close) and body.close
-    end
-  end
-
-  if method_defined?(:write_body_path)
-    def write_body(client, body, range)
-      body.respond_to?(:to_path) ?
-        write_body_path(client, body, range) :
-        write_body_each(client, body)
-    end
-  else
-    ALIASES[:write_body] = :write_body_each
-  end
-
-  # generic body writer, used for most dynamically generated responses
-  def write_body_each(socket, body, range = nil)
-    body.each { |chunk| socket.write(chunk) }
-    ensure
-      body.respond_to?(:close) and body.close
-  end
-
-  def self.included(klass)
-    ALIASES.each do |new_method, orig_method|
-      klass.__send__(:alias_method, new_method, orig_method)
-    end
-  end
-end
diff --git a/lib/rainbows/response/range.rb b/lib/rainbows/response/range.rb
deleted file mode 100644
index b383587..0000000
--- a/lib/rainbows/response/range.rb
+++ /dev/null
@@ -1,34 +0,0 @@
-# -*- encoding: binary -*-
-# :enddoc:
-module Rainbows::Response::Range
-  HTTP_RANGE = 'HTTP_RANGE'
-  Content_Range = 'Content-Range'.freeze
-  Content_Length = 'Content-Length'.freeze
-
-  # This does not support multipart responses (does anybody actually
-  # use those?) +headers+ is always a Rack::Utils::HeaderHash
-  def make_range!(env, status, headers)
-    if 200 == status.to_i &&
-        (clen = headers[Content_Length]) &&
-        /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ env[HTTP_RANGE]
-      a, b = $1.split(/-/)
-      clen = clen.to_i
-      if b.nil? # bytes=M-
-        offset = a.to_i
-        count = clen - offset
-      elsif a.empty? # bytes=-N
-        offset = clen - b.to_i
-        count = clen - offset
-      else  # bytes=M-N
-        offset = a.to_i
-        count = b.to_i + 1 - offset
-      end
-      raise Rainbows::Response416 if count <= 0 || offset >= clen
-      count = clen if count > clen
-      headers[Content_Length] = count.to_s
-      headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}"
-      [ 206, offset, count ]
-    end
-    # nil if no status
-  end
-end
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index f4e8fca..be4badf 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -19,76 +19,17 @@ Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
 # \Revactor library as well, to take advantage of the networking
 # concurrency features this model provides.
 module Rainbows::Revactor
-
-  # :stopdoc:
-  RD_ARGS = {}
-
+  autoload :Client, 'rainbows/revactor/client'
   autoload :Proxy, 'rainbows/revactor/proxy'
-  autoload :TeeSocket, 'rainbows/revactor/tee_socket'
 
   include Rainbows::Base
-  LOCALHOST = Kgio::LOCALHOST
-  TCP = Revactor::TCP::Socket
-
-  # once a client is accepted, it is processed in its entirety here
-  # in 3 easy steps: read request, call app, write app response
-  def process_client(client) # :nodoc:
-    io = client.instance_variable_get(:@_io)
-    io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
-    rd_args = [ nil ]
-    remote_addr = if TCP === client
-      rd_args << RD_ARGS
-      client.remote_addr
-    else
-      LOCALHOST
-    end
-    hp = Rainbows::HttpParser.new
-    buf = hp.buf
-    alive = false
-
-    begin
-      ts = nil
-      until env = hp.parse
-        buf << client.read(*rd_args)
-      end
-
-      env[CLIENT_IO] = client
-      env[RACK_INPUT] = 0 == hp.content_length ?
-               NULL_IO : IC.new(ts = TeeSocket.new(client), hp)
-      env[REMOTE_ADDR] = remote_addr
-      status, headers, body = app.call(env.update(RACK_DEFAULTS))
-
-      if 100 == status.to_i
-        client.write(EXPECT_100_RESPONSE)
-        env.delete(HTTP_EXPECT)
-        status, headers, body = app.call(env)
-      end
-
-      if hp.headers?
-        headers = HH.new(headers)
-        range = make_range!(env, status, headers) and status = range.shift
-        headers[CONNECTION] = (alive = hp.next?) ? KEEP_ALIVE : CLOSE
-        client.write(response_header(status, headers))
-        alive && ts and buf << ts.leftover
-      end
-      write_body(client, body, range)
-    end while alive
-  rescue Revactor::TCP::ReadError
-  rescue => e
-    Rainbows::Error.write(io, e)
-  ensure
-    client.close
-  end
 
   # runs inside each forked worker, this sits around and waits
   # for connections and doesn't die until the parent dies (or is
   # given a INT, QUIT, or TERM signal)
   def worker_loop(worker) #:nodoc:
+    Client.setup
     init_worker_process(worker)
-    require 'rainbows/revactor/body'
-    self.class.__send__(:include, Rainbows::Revactor::Body)
-    self.class.const_set(:IC, Unicorn::HttpRequest.input_class)
-    RD_ARGS[:timeout] = G.kato if G.kato > 0
     nr = 0
     limit = worker_connections
     actor_exit = Case[:exit, Actor, Object]
@@ -114,7 +55,7 @@ module Rainbows::Revactor
             f.when(actor_exit) { nr -= 1 }
             f.when(accept) do |_, _, s|
               nr += 1
-              Actor.spawn_link(s) { |c| process_client(c) }
+              Actor.spawn_link(s) { |c| Client.new(c).process_loop }
             end
           end
         rescue => e
diff --git a/lib/rainbows/revactor/client.rb b/lib/rainbows/revactor/client.rb
new file mode 100644
index 0000000..7c4b53d
--- /dev/null
+++ b/lib/rainbows/revactor/client.rb
@@ -0,0 +1,59 @@
+# -*- encoding: binary -*-
+# :enddoc:
+require 'fcntl'
+class Rainbows::Revactor::Client
+  autoload :TeeSocket, 'rainbows/revactor/client/tee_socket'
+  RD_ARGS = {}
+  RD_ARGS[:timeout] = Rainbows::G.kato if Rainbows::G.kato > 0
+  attr_reader :kgio_addr
+
+  def initialize(client)
+    @client, @rd_args, @ts = client, [ nil ], nil
+    io = client.instance_variable_get(:@_io)
+    io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
+    @kgio_addr = if Revactor::TCP::Socket === client
+      @rd_args << RD_ARGS
+      client.remote_addr
+    else
+      Kgio::LOCALHOST
+    end
+  end
+
+  def kgio_read!(nr, buf)
+    buf.replace(@client.read)
+  end
+
+  def write(buf)
+    @client.write(buf)
+  end
+
+  def write_nonblock(buf) # only used for errors
+    @client.instance_variable_get(:@_io).write_nonblock(buf)
+  end
+
+  def timed_read(buf2)
+    buf2.replace(@client.read(*@rd_args))
+  end
+
+  def set_input(env, hp)
+    env[RACK_INPUT] = 0 == hp.content_length ?
+                      NULL_IO : IC.new(@ts = TeeSocket.new(@client), hp)
+    env[CLIENT_IO] = @client
+  end
+
+  def close
+    @client.close
+    @client = nil
+  end
+
+  def closed?
+    @client.nil?
+  end
+
+  def self.setup
+    self.const_set(:IC, Unicorn::HttpRequest.input_class)
+    include Rainbows::ProcessClient
+    include Methods
+  end
+end
+require 'rainbows/revactor/client/methods'
diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/client/methods.rb
index 9820df3..e9b39a3 100644
--- a/lib/rainbows/revactor/body.rb
+++ b/lib/rainbows/revactor/client/methods.rb
@@ -1,15 +1,10 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows::Revactor::Body
-  # TODO non-blocking splice(2) under Linux
-  ALIASES = {
-    :write_body_stream => :write_body_each
-  }
-
+module Rainbows::Revactor::Client::Methods
   if IO.method_defined?(:sendfile_nonblock)
-    def write_body_file_sendfile_revactor(client, body, range)
-      body = body_to_io(body)
-      sock = client.instance_variable_get(:@_io)
+    def write_body_file(body, range)
+      body, client = body_to_io(body), @client
+      sock = @client.instance_variable_get(:@_io)
       pfx = Revactor::TCP::Socket === client ? :tcp : :unix
       write_complete = T[:"#{pfx}_write_complete", client]
       closed = T[:"#{pfx}_closed", client]
@@ -33,14 +28,18 @@ module Rainbows::Revactor::Body
       ensure
         close_if_private(body)
     end
-    ALIASES[:write_body_file] = :write_body_file_sendfile_revactor
-  else
-    ALIASES[:write_body] = :write_body_each
+  end
+
+  def handle_error(e)
+    Revactor::TCP::ReadError === e or super
+  end
+
+  def write_response(status, headers, body, alive)
+    super(status, headers, body, alive)
+    alive && @ts and @hp.buf << @ts.leftover
   end
 
   def self.included(klass)
-    ALIASES.each do |new_method, orig_method|
-      klass.__send__(:alias_method, new_method, orig_method)
-    end
+    klass.__send__ :alias_method, :write_body_stream, :write_body_each
   end
 end
diff --git a/lib/rainbows/revactor/tee_socket.rb b/lib/rainbows/revactor/client/tee_socket.rb
index 71aeb88..2f9f52e 100644
--- a/lib/rainbows/revactor/tee_socket.rb
+++ b/lib/rainbows/revactor/client/tee_socket.rb
@@ -5,7 +5,7 @@
 # enough to avoid mucking with TeeInput internals.  Fortunately
 # this code is not heavily used so we can usually avoid the overhead
 # of adding a userspace buffer.
-class Rainbows::Revactor::TeeSocket
+class Rainbows::Revactor::Client::TeeSocket
   def initialize(socket)
     # IO::Buffer is used internally by Rev which Revactor is based on
     # so we'll always have it available
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
index f243dc5..c82e22a 100644
--- a/lib/rainbows/thread_pool.rb
+++ b/lib/rainbows/thread_pool.rb
@@ -41,7 +41,7 @@ module Rainbows::ThreadPool
   def sync_worker # :nodoc:
     s = LISTENERS[0]
     begin
-      c = s.kgio_accept and process_client(c)
+      c = s.kgio_accept and c.process_loop
     rescue => e
       Rainbows::Error.listen_loop(e)
     end while G.alive
@@ -55,7 +55,7 @@ module Rainbows::ThreadPool
       # problem.  On the other hand, a thundering herd may not
       # even incur as much overhead as an extra Mutex#synchronize
       ret = select(LISTENERS) and ret[0].each do |s|
-        s = s.kgio_tryaccept and process_client(s)
+        s = s.kgio_tryaccept and s.process_loop
       end
     rescue Errno::EINTR
     rescue => e
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index acdaa69..d2d41e8 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -31,7 +31,7 @@ module Rainbows::ThreadSpawn
             klass.new(c) do |c|
               begin
                 lock.synchronize { G.cur += 1 }
-                process_client(c)
+                c.process_loop
               ensure
                 lock.synchronize { G.cur -= 1 }
               end
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index 67c8e83..558827f 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -19,30 +19,14 @@
 module Rainbows::WriterThreadPool
   # :stopdoc:
   include Rainbows::Base
+  autoload :Client, 'rainbows/writer_thread_pool/client'
 
   @@nr = 0
   @@q = nil
 
-  def async_write_body(qclient, body, range)
-    if body.respond_to?(:close)
-      Rainbows::SyncClose.new(body) do |body|
-        qclient.q << [ qclient.to_io, :body, body, range ]
-      end
-    else
-      qclient.q << [ qclient.to_io, :body, body, range ]
-    end
-  end
-
   def process_client(client) # :nodoc:
     @@nr += 1
-    super(Client.new(client, @@q[@@nr %= @@q.size]))
-  end
-
-  def init_worker_process(worker)
-    super
-    self.class.__send__(:alias_method, :sync_write_body, :write_body)
-    Rainbows::WriterThreadPool.__send__(
-                        :alias_method, :write_body, :async_write_body)
+    Client.new(client, @@q[@@nr %= @@q.size]).process_loop
   end
 
   def worker_loop(worker) # :nodoc:
@@ -51,12 +35,16 @@ module Rainbows::WriterThreadPool
     qp = (1..worker_connections).map do |n|
       Rainbows::QueuePool.new(1) do |response|
         begin
-          io, arg1, arg2, arg3 = response
-          case arg1
-          when :body then sync_write_body(io, arg2, arg3)
-          when :close then io.close unless io.closed?
+          io, arg, *rest = response
+          case arg
+          when String
+            io.kgio_write(arg)
+          when :close
+            warn "#{Thread.current} #{io} close"
+            io.close unless io.closed?
           else
-            io.write(arg1)
+            warn "#{Thread.current} #{io} #{arg}"
+            io.__send__(arg, *rest)
           end
         rescue => err
           Rainbows::Error.write(io, err)
@@ -70,5 +58,3 @@ module Rainbows::WriterThreadPool
   end
   # :startdoc:
 end
-# :enddoc:
-require 'rainbows/writer_thread_pool/client'
diff --git a/lib/rainbows/writer_thread_pool/client.rb b/lib/rainbows/writer_thread_pool/client.rb
index 3cc3335..526a623 100644
--- a/lib/rainbows/writer_thread_pool/client.rb
+++ b/lib/rainbows/writer_thread_pool/client.rb
@@ -4,6 +4,49 @@
 # this is compatible with IO.select
 class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q)
   include Rainbows::SocketProxy
+  include Rainbows::ProcessClient
+
+  module Methods
+    def write_body_each(body)
+      q << [ to_io, :write_body_each, body ]
+    end
+
+    def write_response_close(status, headers, body, alive)
+      to_io.instance_variable_set(:@hp, @hp) # XXX ugh
+      Rainbows::SyncClose.new(body) { |sync_body|
+        q << [ to_io, :write_response, status, headers, sync_body, alive ]
+      }
+    end
+
+    if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
+      def write_response(status, headers, body, alive)
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        elsif body.respond_to?(:to_path)
+          write_response_path(status, headers, body, alive)
+        else
+          super
+        end
+      end
+
+      def write_body_file(body, range)
+        q << [ to_io, :write_body_file, body, range ]
+      end
+
+      def write_body_stream(body)
+        q << [ to_io, :write_body_stream, body ]
+      end
+    else # each-only body response
+      def write_response(status, headers, body, alive)
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        else
+          super
+        end
+      end
+    end # each-only body response
+  end # module Methods
+  include Methods
 
   def write(buf)
     q << [ to_io, buf ]
@@ -14,6 +57,6 @@ class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q)
   end
 
   def closed?
-    false
+    to_io.closed?
   end
 end
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 43e4f2c..2f264d9 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -19,19 +19,11 @@ require 'thread'
 # vulnerable to slow client denial-of-service attacks.
 
 module Rainbows::WriterThreadSpawn
-  # :stopdoc:
   include Rainbows::Base
-
-  def write_body(my_sock, body, range) # :nodoc:
-    if body.respond_to?(:close)
-      Rainbows::SyncClose.new(body) { |body| my_sock.queue_body(body, range) }
-    else
-      my_sock.queue_body(body, range)
-    end
-  end
+  autoload :Client, 'rainbows/writer_thread_spawn/client'
 
   def process_client(client) # :nodoc:
-    super(Client.new(client))
+    Client.new(client).process_loop
   end
 
   def worker_loop(worker)  # :nodoc:
@@ -42,4 +34,3 @@ module Rainbows::WriterThreadSpawn
   # :startdoc:
 end
 # :enddoc:
-require 'rainbows/writer_thread_spawn/client'
diff --git a/lib/rainbows/writer_thread_spawn/client.rb b/lib/rainbows/writer_thread_spawn/client.rb
index 8f65c19..15264d0 100644
--- a/lib/rainbows/writer_thread_spawn/client.rb
+++ b/lib/rainbows/writer_thread_spawn/client.rb
@@ -3,12 +3,56 @@
 # used to wrap a BasicSocket to use with +q+ for all writes
 # this is compatible with IO.select
 class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
-  include Rainbows::Response
   include Rainbows::SocketProxy
+  include Rainbows::ProcessClient
   include Rainbows::WorkerYield
 
   CUR = {} # :nodoc:
 
+  module Methods
+    def write_body_each(body)
+      q << [ :write_body_each, body ]
+    end
+
+    def write_response_close(status, headers, body, alive)
+      to_io.instance_variable_set(:@hp, @hp) # XXX ugh
+      Rainbows::SyncClose.new(body) { |sync_body|
+        q << [ :write_response, status, headers, sync_body, alive ]
+      }
+    end
+
+    if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
+      def write_response(status, headers, body, alive)
+        self.q ||= queue_writer
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        elsif body.respond_to?(:to_path)
+          write_response_path(status, headers, body, alive)
+        else
+          super
+        end
+      end
+
+      def write_body_file(body, range)
+        q << [ :write_body_file, body, range ]
+      end
+
+      def write_body_stream(body)
+        q << [ :write_body_stream, body ]
+      end
+    else # each-only body response
+      def write_response(status, headers, body, alive)
+        self.q ||= queue_writer
+        if body.respond_to?(:close)
+          write_response_close(status, headers, body, alive)
+        else
+          super
+        end
+      end
+    end # each-only body response
+  end # module Methods
+  include Methods
+
   def self.quit
     g = Rainbows::G
     CUR.delete_if do |t,q|
@@ -27,16 +71,17 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
 
     q = Queue.new
     self.thr = Thread.new(to_io, q) do |io, q|
-      while response = q.shift
+      while op = q.shift
         begin
-          arg1, arg2, arg3 = response
-          case arg1
-          when :body then write_body(io, arg2, arg3)
+          op, *rest = op
+          case op
+          when String
+            io.kgio_write(op)
           when :close
             io.close unless io.closed?
             break
           else
-            io.write(arg1)
+            io.__send__ op, *rest
           end
         rescue => e
           Rainbows::Error.write(io, e)
@@ -51,10 +96,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
     (self.q ||= queue_writer) << buf
   end
 
-  def queue_body(body, range)
-    (self.q ||= queue_writer) << [ :body, body, range ]
-  end
-
   def close
     if q
       q << :close
@@ -64,6 +105,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
   end
 
   def closed?
-    false
+    to_io.closed?
   end
 end