summary refs log tree commit homepage
path: root/lib/rainbows
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-10-22 16:21:03 -0700
committerEric Wong <normalperson@yhbt.net>2010-10-22 16:21:03 -0700
commit180485d49ea858f83ef2a28a9e07224aa514edc7 (patch)
treeb4c649d2118c0010bf3876a49dadfe3e4cbc3f86 /lib/rainbows
parent41145ed4d335718ac43aec9313b7571a12fe96ee (diff)
This simplifies and disambiguates most constant resolution
issues as well as lowering our indentation level.  Hopefully
this makes code easier to understand.
Diffstat (limited to 'lib/rainbows')
-rw-r--r--lib/rainbows/actor_spawn.rb42
-rw-r--r--lib/rainbows/app_pool.rb182
-rw-r--r--lib/rainbows/configurator.rb76
-rw-r--r--lib/rainbows/const.rb38
-rw-r--r--lib/rainbows/error.rb72
-rw-r--r--lib/rainbows/ev_core.rb211
-rw-r--r--lib/rainbows/event_machine.rb382
-rw-r--r--lib/rainbows/fiber.rb1
-rw-r--r--lib/rainbows/http_server.rb163
-rw-r--r--lib/rainbows/max_body.rb18
-rw-r--r--lib/rainbows/queue_pool.rb41
-rw-r--r--lib/rainbows/rev/core.rb2
-rw-r--r--lib/rainbows/rev_fiber_spawn.rb45
13 files changed, 619 insertions, 654 deletions
diff --git a/lib/rainbows/actor_spawn.rb b/lib/rainbows/actor_spawn.rb
index 8cb839d..2b42b68 100644
--- a/lib/rainbows/actor_spawn.rb
+++ b/lib/rainbows/actor_spawn.rb
@@ -1,29 +1,27 @@
 # -*- encoding: binary -*-
 
 require 'actor'
-module Rainbows
 
-  # Actor concurrency model for Rubinius.  We can't seem to get message
-  # passing working right, so we're throwing a Mutex into the mix for
-  # now.  Hopefully somebody can fix things for us.  Currently, this is
-  # exactly the same as the ThreadSpawn model since we don't use the
-  # message passing capabilities of the Actor model (and even then
-  # it wouldn't really make sense since Actors in Rubinius are just
-  # Threads underneath and our ThreadSpawn model is one layer of
-  # complexity less.
-  #
-  # This is different from the Revactor one which is not prone to race
-  # conditions within the same process at all (since it uses Fibers).
-  module ActorSpawn
-    include ThreadSpawn
+# Actor concurrency model for Rubinius.  We can't seem to get message
+# passing working right, so we're throwing a Mutex into the mix for
+# now.  Hopefully somebody can fix things for us.  Currently, this is
+# exactly the same as the ThreadSpawn model since we don't use the
+# message passing capabilities of the Actor model (and even then
+# it wouldn't really make sense since Actors in Rubinius are just
+# Threads underneath and our ThreadSpawn model is one layer of
+# complexity less.
+#
+# This is different from the Revactor one which is not prone to race
+# conditions within the same process at all (since it uses Fibers).
+module Rainbows::ActorSpawn
+  include Rainbows::ThreadSpawn
 
-    # runs inside each forked worker, this sits around and waits
-    # for connections and doesn't die until the parent dies (or is
-    # given a INT, QUIT, or TERM signal)
-    def worker_loop(worker) # :nodoc:
-      Const::RACK_DEFAULTS["rack.multithread"] = true # :(
-      init_worker_process(worker)
-      accept_loop(Actor)
-    end
+  # runs inside each forked worker, this sits around and waits
+  # for connections and doesn't die until the parent dies (or is
+  # given a INT, QUIT, or TERM signal)
+  def worker_loop(worker) # :nodoc:
+    Rainbows::Const::RACK_DEFAULTS["rack.multithread"] = true # :(
+    init_worker_process(worker)
+    accept_loop(Actor)
   end
 end
diff --git a/lib/rainbows/app_pool.rb b/lib/rainbows/app_pool.rb
index 7996e2b..b406b32 100644
--- a/lib/rainbows/app_pool.rb
+++ b/lib/rainbows/app_pool.rb
@@ -2,105 +2,101 @@
 
 require 'thread'
 
-module Rainbows
+# Rack middleware to limit application-level concurrency independently
+# of network concurrency in \Rainbows!   Since the +worker_connections+
+# option in \Rainbows! is only intended to limit the number of
+# simultaneous clients, this middleware may be used to limit the
+# number of concurrent application dispatches independently of
+# concurrent clients.
+#
+# Instead of using M:N concurrency in \Rainbows!, this middleware
+# allows M:N:P concurrency where +P+ is the AppPool +:size+ while
+# +M+ remains the number of +worker_processes+ and +N+ remains the
+# number of +worker_connections+.
+#
+#   rainbows master
+#    \_ rainbows worker[0]
+#    |  \_ client[0,0]------\      ___app[0]
+#    |  \_ client[0,1]-------\    /___app[1]
+#    |  \_ client[0,2]-------->--<       ...
+#    |  ...                __/    `---app[P]
+#    |  \_ client[0,N]----/
+#    \_ rainbows worker[1]
+#    |  \_ client[1,0]------\      ___app[0]
+#    |  \_ client[1,1]-------\    /___app[1]
+#    |  \_ client[1,2]-------->--<       ...
+#    |  ...                __/    `---app[P]
+#    |  \_ client[1,N]----/
+#    \_ rainbows worker[M]
+#       \_ client[M,0]------\      ___app[0]
+#       \_ client[M,1]-------\    /___app[1]
+#       \_ client[M,2]-------->--<       ...
+#       ...                __/    `---app[P]
+#       \_ client[M,N]----/
+#
+# AppPool should be used if you want to enforce a lower value of +P+
+# than +N+.
+#
+# AppPool has no effect on the Rev or EventMachine concurrency models
+# as those are single-threaded/single-instance as far as application
+# concurrency goes.  In other words, +P+ is always +one+ when using
+# Rev or EventMachine.  As of \Rainbows! 0.7.0, it is safe to use with
+# Revactor and the new FiberSpawn and FiberPool concurrency models.
+#
+# Since this is Rack middleware, you may load this in your Rack
+# config.ru file and even use it in threaded servers other than
+# \Rainbows!
+#
+#   use Rainbows::AppPool, :size => 30
+#   map "/lobster" do
+#     run Rack::Lobster.new
+#   end
+#
+# You may want to load this earlier or later in your middleware chain
+# depending on the concurrency/copy-friendliness of your middleware(s).
+class Rainbows::AppPool < Struct.new(:pool, :re)
 
-  # Rack middleware to limit application-level concurrency independently
-  # of network conncurrency in \Rainbows!   Since the +worker_connections+
-  # option in \Rainbows! is only intended to limit the number of
-  # simultaneous clients, this middleware may be used to limit the
-  # number of concurrent application dispatches independently of
-  # concurrent clients.
-  #
-  # Instead of using M:N concurrency in \Rainbows!, this middleware
-  # allows M:N:P concurrency where +P+ is the AppPool +:size+ while
-  # +M+ remains the number of +worker_processes+ and +N+ remains the
-  # number of +worker_connections+.
-  #
-  #   rainbows master
-  #    \_ rainbows worker[0]
-  #    |  \_ client[0,0]------\      ___app[0]
-  #    |  \_ client[0,1]-------\    /___app[1]
-  #    |  \_ client[0,2]-------->--<       ...
-  #    |  ...                __/    `---app[P]
-  #    |  \_ client[0,N]----/
-  #    \_ rainbows worker[1]
-  #    |  \_ client[1,0]------\      ___app[0]
-  #    |  \_ client[1,1]-------\    /___app[1]
-  #    |  \_ client[1,2]-------->--<       ...
-  #    |  ...                __/    `---app[P]
-  #    |  \_ client[1,N]----/
-  #    \_ rainbows worker[M]
-  #       \_ client[M,0]------\      ___app[0]
-  #       \_ client[M,1]-------\    /___app[1]
-  #       \_ client[M,2]-------->--<       ...
-  #       ...                __/    `---app[P]
-  #       \_ client[M,N]----/
-  #
-  # AppPool should be used if you want to enforce a lower value of +P+
-  # than +N+.
-  #
-  # AppPool has no effect on the Rev or EventMachine concurrency models
-  # as those are single-threaded/single-instance as far as application
-  # concurrency goes.  In other words, +P+ is always +one+ when using
-  # Rev or EventMachine.  As of \Rainbows! 0.7.0, it is safe to use with
-  # Revactor and the new FiberSpawn and FiberPool concurrency models.
-  #
-  # Since this is Rack middleware, you may load this in your Rack
-  # config.ru file and even use it in threaded servers other than
-  # \Rainbows!
-  #
-  #   use Rainbows::AppPool, :size => 30
-  #   map "/lobster" do
-  #     run Rack::Lobster.new
-  #   end
-  #
-  # You may to load this earlier or later in your middleware chain
-  # depending on the concurrency/copy-friendliness of your middleware(s).
-
-  class AppPool < Struct.new(:pool, :re)
-
-    # +opt+ is a hash, +:size+ is the size of the pool (default: 6)
-    # meaning you can have up to 6 concurrent instances of +app+
-    # within one \Rainbows! worker process.  We support various
-    # methods of the +:copy+ option: +dup+, +clone+, +deep+ and +none+.
-    # Depending on your +app+, one of these options should be set.
-    # The default +:copy+ is +:dup+ as is commonly seen in existing
-    # Rack middleware.
-    def initialize(app, opt = {})
-      self.pool = Queue.new
-      (1...(opt[:size] || 6)).each do
-        pool << case (opt[:copy] || :dup)
-        when :none then app
-        when :dup then app.dup
-        when :clone then app.clone
-        when :deep then Marshal.load(Marshal.dump(app)) # unlikely...
-        else
-          raise ArgumentError, "unsupported copy method: #{opt[:copy].inspect}"
-        end
+  # +opt+ is a hash, +:size+ is the size of the pool (default: 6)
+  # meaning you can have up to 6 concurrent instances of +app+
+  # within one \Rainbows! worker process.  We support various
+  # methods of the +:copy+ option: +dup+, +clone+, +deep+ and +none+.
+  # Depending on your +app+, one of these options should be set.
+  # The default +:copy+ is +:dup+ as is commonly seen in existing
+  # Rack middleware.
+  def initialize(app, opt = {})
+    self.pool = Queue.new
+    (1...(opt[:size] || 6)).each do
+      pool << case (opt[:copy] || :dup)
+      when :none then app
+      when :dup then app.dup
+      when :clone then app.clone
+      when :deep then Marshal.load(Marshal.dump(app)) # unlikely...
+      else
+        raise ArgumentError, "unsupported copy method: #{opt[:copy].inspect}"
       end
-      pool << app # the original
     end
+    pool << app # the original
+  end
 
-    # Rack application endpoint, +env+ is the Rack environment
-    def call(env) # :nodoc:
+  # Rack application endpoint, +env+ is the Rack environment
+  def call(env) # :nodoc:
 
-      # we have to do this check at call time (and not initialize)
-      # because of preload_app=true and models being changeable with SIGHUP
-      # fortunately this is safe for all the reentrant (but not multithreaded)
-      # classes that depend on it and a safe no-op for multithreaded
-      # concurrency models
-      self.re ||= begin
-        case env["rainbows.model"]
-        when :FiberSpawn, :FiberPool, :Revactor, :NeverBlock, :RevFiberSpawn
-          self.pool = Rainbows::Fiber::Queue.new(pool)
-        end
-        true
+    # we have to do this check at call time (and not initialize)
+    # because of preload_app=true and models being changeable with SIGHUP
+    # fortunately this is safe for all the reentrant (but not multithreaded)
+    # classes that depend on it and a safe no-op for multithreaded
+    # concurrency models
+    self.re ||= begin
+      case env["rainbows.model"]
+      when :FiberSpawn, :FiberPool, :Revactor, :NeverBlock, :RevFiberSpawn
+        self.pool = Rainbows::Fiber::Queue.new(pool)
       end
-
-      app = pool.shift
-      app.call(env)
-      ensure
-        pool << app
+      true
     end
+
+    app = pool.shift
+    app.call(env)
+    ensure
+      pool << app
   end
 end
diff --git a/lib/rainbows/configurator.rb b/lib/rainbows/configurator.rb
index 7add1f8..e69a3fb 100644
--- a/lib/rainbows/configurator.rb
+++ b/lib/rainbows/configurator.rb
@@ -1,46 +1,44 @@
 # -*- encoding: binary -*-
-module Rainbows
 
-  # This module adds \Rainbows! to the
-  # {Unicorn::Configurator}[http://unicorn.bogomips.org/Unicorn/Configurator.html]
-  module Configurator
-
-    # configures \Rainbows! with a given concurrency model to +use+ and
-    # a +worker_connections+ upper-bound.  This method may be called
-    # inside a Unicorn/\Rainbows! configuration file:
-    #
-    #   Rainbows! do
-    #     use :ThreadSpawn # concurrency model to use
-    #     worker_connections 400
-    #     keepalive_timeout 0 # zero disables keepalives entirely
-    #     client_max_body_size 5*1024*1024 # 5 megabytes
-    #   end
-    #
-    #   # the rest of the Unicorn configuration
-    #   worker_processes 8
-    #
-    # See the documentation for the respective Revactor, ThreadSpawn,
-    # and ThreadPool classes for descriptions and recommendations for
-    # each of them.  The total number of clients we're able to serve is
-    # +worker_processes+ * +worker_connections+, so in the above example
-    # we can serve 8 * 400 = 3200 clients concurrently.
-    #
-    # The default is +keepalive_timeout+ is 5 seconds, which should be
-    # enough under most conditions for browsers to render the page and
-    # start retrieving extra elements for.  Increasing this beyond 5
-    # seconds is not recommended.  Zero disables keepalive entirely
-    # (but pipelining fully-formed requests is still works).
-    #
-    # The default +client_max_body_size+ is 1 megabyte (1024 * 1024 bytes),
-    # setting this to +nil+ will disable body size checks and allow any
-    # size to be specified.
-    def Rainbows!(&block)
-      block_given? or raise ArgumentError, "Rainbows! requires a block"
-      HttpServer.setup(block)
-    end
+# This module adds \Rainbows! to the
+# {Unicorn::Configurator}[http://unicorn.bogomips.org/Unicorn/Configurator.html]
+module Rainbows::Configurator
 
+  # configures \Rainbows! with a given concurrency model to +use+ and
+  # a +worker_connections+ upper-bound.  This method may be called
+  # inside a Unicorn/\Rainbows! configuration file:
+  #
+  #   Rainbows! do
+  #     use :ThreadSpawn # concurrency model to use
+  #     worker_connections 400
+  #     keepalive_timeout 0 # zero disables keepalives entirely
+  #     client_max_body_size 5*1024*1024 # 5 megabytes
+  #   end
+  #
+  #   # the rest of the Unicorn configuration
+  #   worker_processes 8
+  #
+  # See the documentation for the respective Revactor, ThreadSpawn,
+  # and ThreadPool classes for descriptions and recommendations for
+  # each of them.  The total number of clients we're able to serve is
+  # +worker_processes+ * +worker_connections+, so in the above example
+  # we can serve 8 * 400 = 3200 clients concurrently.
+  #
+  # The default +keepalive_timeout+ is 5 seconds, which should be
+  # enough under most conditions for browsers to render the page and
+  # start retrieving extra elements for.  Increasing this beyond 5
+  # seconds is not recommended.  Zero disables keepalive entirely
+  # (but pipelining fully-formed requests still works).
+  #
+  # The default +client_max_body_size+ is 1 megabyte (1024 * 1024 bytes),
+  # setting this to +nil+ will disable body size checks and allow any
+  # size to be specified.
+  def Rainbows!(&block)
+    block_given? or raise ArgumentError, "Rainbows! requires a block"
+    Rainbows::HttpServer.setup(block)
   end
 end
 
+# :enddoc:
 # inject the Rainbows! method into Unicorn::Configurator
-Unicorn::Configurator.class_eval { include Rainbows::Configurator }
+Unicorn::Configurator.__send__(:include, Rainbows::Configurator)
diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb
index 1c77c76..ecdafab 100644
--- a/lib/rainbows/const.rb
+++ b/lib/rainbows/const.rb
@@ -1,30 +1,28 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows
+module Rainbows::Const
 
-  module Const
-    RAINBOWS_VERSION = '0.97.0'
+  RAINBOWS_VERSION = '0.97.0'
 
-    include Unicorn::Const
+  include Unicorn::Const
 
-    RACK_DEFAULTS = Unicorn::HttpRequest::DEFAULTS.update({
-      "SERVER_SOFTWARE" => "Rainbows! #{RAINBOWS_VERSION}",
+  RACK_DEFAULTS = Unicorn::HttpRequest::DEFAULTS.update({
+    "SERVER_SOFTWARE" => "Rainbows! #{RAINBOWS_VERSION}",
 
-      # using the Rev model, we'll automatically chunk pipe and socket objects
-      # if they're the response body.  Unset by default.
-      # "rainbows.autochunk" => false,
-    })
+    # using the Rev model, we'll automatically chunk pipe and socket objects
+    # if they're the response body.  Unset by default.
+    # "rainbows.autochunk" => false,
+  })
 
-    # client IO object that supports reading and writing directly
-    # without filtering it through the HTTP chunk parser.
-    # Maybe we can get this renamed to "rack.io" if it becomes part
-    # of the official spec, but for now it is "hack.io"
-    CLIENT_IO = "hack.io".freeze
+  # client IO object that supports reading and writing directly
+  # without filtering it through the HTTP chunk parser.
+  # Maybe we can get this renamed to "rack.io" if it becomes part
+  # of the official spec, but for now it is "hack.io"
+  CLIENT_IO = "hack.io".freeze
 
-    ERROR_413_RESPONSE = "HTTP/1.1 413 Request Entity Too Large\r\n\r\n"
-    ERROR_416_RESPONSE = "HTTP/1.1 416 Requested Range Not Satisfiable\r\n\r\n"
+  ERROR_413_RESPONSE = "HTTP/1.1 413 Request Entity Too Large\r\n\r\n"
+  ERROR_416_RESPONSE = "HTTP/1.1 416 Requested Range Not Satisfiable\r\n\r\n"
 
-    RACK_INPUT = Unicorn::HttpRequest::RACK_INPUT
-    REMOTE_ADDR = Unicorn::HttpRequest::REMOTE_ADDR
-  end
+  RACK_INPUT = Unicorn::HttpRequest::RACK_INPUT
+  REMOTE_ADDR = Unicorn::HttpRequest::REMOTE_ADDR
 end
diff --git a/lib/rainbows/error.rb b/lib/rainbows/error.rb
index 7c91050..bdbfdc5 100644
--- a/lib/rainbows/error.rb
+++ b/lib/rainbows/error.rb
@@ -1,48 +1,44 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows
+module Rainbows::Error
 
-  class Error
-    class << self
+  G = Rainbows::G
 
-      # if we get any error, try to write something back to the client
-      # assuming we haven't closed the socket, but don't get hung up
-      # if the socket is already closed or broken.  We'll always ensure
-      # the socket is closed at the end of this function
-      def write(io, e)
-        msg = Error.response(e) and io.write_nonblock(msg)
-        rescue
-      end
-
-      def app(e)
-        G.server.logger.error "app error: #{e.inspect}"
-        G.server.logger.error e.backtrace.join("\n")
-        rescue
-      end
+  # if we get any error, try to write something back to the client
+  # assuming we haven't closed the socket, but don't get hung up
+  # if the socket is already closed or broken.  We'll always ensure
+  # the socket is closed at the end of this function
+  def self.write(io, e)
+    msg = response(e) and io.write_nonblock(msg)
+    rescue
+  end
 
-      def listen_loop(e)
-        G.alive or return
-        G.server.logger.error "listen loop error: #{e.inspect}."
-        G.server.logger.error e.backtrace.join("\n")
-        rescue
-      end
+  def self.app(e)
+    G.server.logger.error "app error: #{e.inspect}"
+    G.server.logger.error e.backtrace.join("\n")
+    rescue
+  end
 
-      def response(e)
-        case e
-        when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL,
-             Errno::EBADF, Errno::ENOTCONN
-          # swallow error if client shuts down one end or disconnects
-        when Rainbows::Response416
-          Const::ERROR_416_RESPONSE
-        when Unicorn::HttpParserError
-          Const::ERROR_400_RESPONSE # try to tell the client they're bad
-        when IOError # HttpParserError is an IOError
-        else
-          app(e)
-          Const::ERROR_500_RESPONSE
-        end
-      end
+  def self.listen_loop(e)
+    G.alive or return
+    G.server.logger.error "listen loop error: #{e.inspect}."
+    G.server.logger.error e.backtrace.join("\n")
+    rescue
+  end
 
+  def self.response(e)
+    case e
+    when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL,
+         Errno::EBADF, Errno::ENOTCONN
+      # swallow error if client shuts down one end or disconnects
+    when Rainbows::Response416
+      Rainbows::Const::ERROR_416_RESPONSE
+    when Unicorn::HttpParserError
+      Rainbows::Const::ERROR_400_RESPONSE # try to tell the client they're bad
+    when IOError # HttpParserError is an IOError
+    else
+      app(e)
+      Rainbows::Const::ERROR_500_RESPONSE
     end
   end
 end
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
index 9761144..200bf79 100644
--- a/lib/rainbows/ev_core.rb
+++ b/lib/rainbows/ev_core.rb
@@ -1,130 +1,125 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows
-
-  # base module for evented models like Rev and EventMachine
-  module EvCore
-    include Unicorn
-    include Rainbows::Const
-    include Rainbows::Response
-    G = Rainbows::G
-    NULL_IO = Unicorn::HttpRequest::NULL_IO
-
-    # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ]
-    ASYNC_CALLBACK = "async.callback".freeze
-
-    ASYNC_CLOSE = "async.close".freeze
-
-    def post_init
-      @env = {}
-      @hp = HttpParser.new
-      @state = :headers # [ :body [ :trailers ] ] :app_call :close
-      @buf = ""
-    end
+# base module for evented models like Rev and EventMachine
+module Rainbows::EvCore
+  include Rainbows::Const
+  include Rainbows::Response
+  G = Rainbows::G
+  NULL_IO = Unicorn::HttpRequest::NULL_IO
+  HttpParser = Unicorn::HttpParser
+
+  # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ]
+  ASYNC_CALLBACK = "async.callback".freeze
+
+  ASYNC_CLOSE = "async.close".freeze
+
+  def post_init
+    @env = {}
+    @hp = HttpParser.new
+    @state = :headers # [ :body [ :trailers ] ] :app_call :close
+    @buf = ""
+  end
 
-    # graceful exit, like SIGQUIT
-    def quit
-      @state = :close
-    end
+  # graceful exit, like SIGQUIT
+  def quit
+    @state = :close
+  end
 
-    def handle_error(e)
-      msg = Error.response(e) and write(msg)
-      ensure
-        quit
-    end
+  def handle_error(e)
+    msg = Rainbows::Error.response(e) and write(msg)
+    ensure
+      quit
+  end
 
-    # returns whether to enable response chunking for autochunk models
-    def stream_response_headers(status, headers)
-      if headers['Content-Length']
-        rv = false
-      else
-        rv = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
-        rv = false if headers.delete('X-Rainbows-Autochunk') == 'no'
-      end
-      write(response_header(status, headers))
-      rv
+  # returns whether to enable response chunking for autochunk models
+  def stream_response_headers(status, headers)
+    if headers['Content-Length']
+      rv = false
+    else
+      rv = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+      rv = false if headers.delete('X-Rainbows-Autochunk') == 'no'
     end
+    write(response_header(status, headers))
+    rv
+  end
 
-    # TeeInput doesn't map too well to this right now...
-    def on_read(data)
-      case @state
-      when :headers
-        @hp.headers(@env, @buf << data) or return
-        @state = :body
-        len = @hp.content_length
-        if len == 0
-          @input = NULL_IO
-          app_call # common case
-        else # nil or len > 0
-          # since we don't do streaming input, we have no choice but
-          # to take over 100-continue handling from the Rack application
-          if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
-            write(EXPECT_100_RESPONSE)
-            @env.delete(HTTP_EXPECT)
-          end
-          @input = CapInput.new(len, self)
-          @hp.filter_body(@buf2 = "", @buf)
-          @input << @buf2
-          on_read("")
-        end
-      when :body
-        if @hp.body_eof?
-          @state = :trailers
-          on_read(data)
-        elsif data.size > 0
-          @hp.filter_body(@buf2, @buf << data)
-          @input << @buf2
-          on_read("")
-        end
-      when :trailers
-        if @hp.trailers(@env, @buf << data)
-          @input.rewind
-          app_call
+  # TeeInput doesn't map too well to this right now...
+  def on_read(data)
+    case @state
+    when :headers
+      @hp.headers(@env, @buf << data) or return
+      @state = :body
+      len = @hp.content_length
+      if len == 0
+        @input = NULL_IO
+        app_call # common case
+      else # nil or len > 0
+        # since we don't do streaming input, we have no choice but
+        # to take over 100-continue handling from the Rack application
+        if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
+          write(EXPECT_100_RESPONSE)
+          @env.delete(HTTP_EXPECT)
         end
+        @input = CapInput.new(len, self)
+        @hp.filter_body(@buf2 = "", @buf)
+        @input << @buf2
+        on_read("")
+      end
+    when :body
+      if @hp.body_eof?
+        @state = :trailers
+        on_read(data)
+      elsif data.size > 0
+        @hp.filter_body(@buf2, @buf << data)
+        @input << @buf2
+        on_read("")
+      end
+    when :trailers
+      if @hp.trailers(@env, @buf << data)
+        @input.rewind
+        app_call
       end
-      rescue => e
-        handle_error(e)
     end
+    rescue => e
+      handle_error(e)
+  end
 
-    class CapInput < Struct.new(:io, :client, :bytes_left)
-      MAX_BODY = Unicorn::Const::MAX_BODY
-      TmpIO = Unicorn::TmpIO
+  class CapInput < Struct.new(:io, :client, :bytes_left)
+    MAX_BODY = Unicorn::Const::MAX_BODY
+    TmpIO = Unicorn::TmpIO
 
-      def self.err(client, msg)
-        client.write(Const::ERROR_413_RESPONSE)
-        client.quit
+    def self.err(client, msg)
+      client.write(Rainbows::Const::ERROR_413_RESPONSE)
+      client.quit
 
-        # zip back up the stack
-        raise IOError, msg, []
-      end
+      # zip back up the stack
+      raise IOError, msg, []
+    end
 
-      def self.new(len, client)
-        max = Rainbows.max_bytes
-        if len
-          if max && (len > max)
-            err(client, "Content-Length too big: #{len} > #{max}")
-          end
-          len <= MAX_BODY ? StringIO.new("") : TmpIO.new
-        else
-          max ? super(TmpIO.new, client, max) : TmpIO.new
+    def self.new(len, client)
+      max = Rainbows.max_bytes
+      if len
+        if max && (len > max)
+          err(client, "Content-Length too big: #{len} > #{max}")
         end
+        len <= MAX_BODY ? StringIO.new("") : TmpIO.new
+      else
+        max ? super(TmpIO.new, client, max) : TmpIO.new
       end
+    end
 
-      def <<(buf)
-        if (self.bytes_left -= buf.size) < 0
-          io.close
-          CapInput.err(client, "chunked request body too big")
-        end
-        io << buf
+    def <<(buf)
+      if (self.bytes_left -= buf.size) < 0
+        io.close
+        CapInput.err(client, "chunked request body too big")
       end
-
-      def gets; io.gets; end
-      def each(&block); io.each(&block); end
-      def size; io.size; end
-      def rewind; io.rewind; end
-      def read(*args); io.read(*args); end
-
+      io << buf
     end
 
+    def gets; io.gets; end
+    def each(&block); io.each(&block); end
+    def size; io.size; end
+    def rewind; io.rewind; end
+    def read(*args); io.read(*args); end
   end
 end
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 96d9a9e..2f363a1 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -3,222 +3,216 @@ require 'eventmachine'
 EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
 require 'rainbows/ev_core'
 
-module Rainbows
-
-  # Implements a basic single-threaded event model with
-  # {EventMachine}[http://rubyeventmachine.com/].  It is capable of
-  # handling thousands of simultaneous client connections, but with only
-  # a single-threaded app dispatch.  It is suited for slow clients,
-  # and can work with slow applications via asynchronous libraries such as
-  # {async_sinatra}[http://github.com/raggi/async_sinatra],
-  # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp],
-  # and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
-  #
-  # It does not require your Rack application to be thread-safe,
-  # reentrancy is only required for the DevFdResponse body
-  # generator.
-  #
-  # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
-  # support, currently Ruby 1.8/1.9.
-  #
-  # This model is compatible with users of "async.callback" in the Rack
-  # environment such as
-  # {async_sinatra}[http://github.com/raggi/async_sinatra].
-  #
-  # For a complete asynchronous framework,
-  # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully
-  # supported when using this concurrency model.
-  #
-  # This model is fully-compatible with
-  # {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
-  # which allows each request to run inside its own \Fiber after
-  # all request processing is complete.
-  #
-  # Merb (and other frameworks/apps) supporting +deferred?+ execution as
-  # documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin
-  # will also get the ability to conditionally defer request processing
-  # to a separate thread.
-  #
-  # This model does not implement as streaming "rack.input" which allows
-  # the Rack application to process data as it arrives.  This means
-  # "rack.input" will be fully buffered in memory or to a temporary file
-  # before the application is entered.
-
-  module EventMachine
-
-    include Base
-    autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
-    autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
-    autoload :TryDefer, 'rainbows/event_machine/try_defer'
-
-    class Client < EM::Connection # :nodoc: all
-      attr_writer :body
-      include Rainbows::EvCore
-      G = Rainbows::G
-
-      def initialize(io)
-        @_io = io
-        @body = nil
+# Implements a basic single-threaded event model with
+# {EventMachine}[http://rubyeventmachine.com/].  It is capable of
+# handling thousands of simultaneous client connections, but with only
+# a single-threaded app dispatch.  It is suited for slow clients,
+# and can work with slow applications via asynchronous libraries such as
+# {async_sinatra}[http://github.com/raggi/async_sinatra],
+# {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp],
+# and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
+#
+# It does not require your Rack application to be thread-safe,
+# reentrancy is only required for the DevFdResponse body
+# generator.
+#
+# Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
+# support, currently Ruby 1.8/1.9.
+#
+# This model is compatible with users of "async.callback" in the Rack
+# environment such as
+# {async_sinatra}[http://github.com/raggi/async_sinatra].
+#
+# For a complete asynchronous framework,
+# {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully
+# supported when using this concurrency model.
+#
+# This model is fully-compatible with
+# {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
+# which allows each request to run inside its own \Fiber after
+# all request processing is complete.
+#
+# Merb (and other frameworks/apps) supporting +deferred?+ execution as
+# documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin
+# will also get the ability to conditionally defer request processing
+# to a separate thread.
+#
+# This model does not implement a streaming "rack.input" which allows
+# the Rack application to process data as it arrives.  This means
+# "rack.input" will be fully buffered in memory or to a temporary file
+# before the application is entered.
+module Rainbows::EventMachine
+
+  include Rainbows::Base
+  autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
+  autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
+  autoload :TryDefer, 'rainbows/event_machine/try_defer'
+
+  class Client < EM::Connection # :nodoc: all
+    attr_writer :body
+    include Rainbows::EvCore
+
+    def initialize(io)
+      @_io = io
+      @body = nil
+    end
+
+    alias write send_data
+
+    def receive_data(data)
+      # To avoid clobbering the current streaming response
+      # (often a static file), we do not attempt to process another
+      # request on the same connection until the first is complete
+      if @body
+        @buf << data
+        @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
+        EM.next_tick { receive_data('') }
+      else
+        on_read(data)
       end
+    end
 
-      alias write send_data
+    def quit
+      super
+      close_connection_after_writing
+    end
 
-      def receive_data(data)
-        # To avoid clobbering the current streaming response
-        # (often a static file), we do not attempt to process another
-        # request on the same connection until the first is complete
-        if @body
-          @buf << data
-          @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
-          EM.next_tick { receive_data('') }
+    def app_call
+      set_comm_inactivity_timeout 0
+      @env[RACK_INPUT] = @input
+      @env[REMOTE_ADDR] = @_io.kgio_addr
+      @env[ASYNC_CALLBACK] = method(:em_write_response)
+      @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+
+      response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
+
+      # too tricky to support pipelining with :async since the
+      # second (pipelined) request could be stuck behind a
+      # long-running async response
+      (response.nil? || -1 == response[0]) and return @state = :close
+
+      alive = @hp.keepalive? && G.alive && G.kato > 0
+      em_write_response(response, alive)
+      if alive
+        @env.clear
+        @hp.reset
+        @state = :headers
+        if @buf.empty?
+          set_comm_inactivity_timeout(G.kato)
         else
-          on_read(data)
+          EM.next_tick { receive_data('') }
         end
       end
+    end
 
-      def quit
-        super
-        close_connection_after_writing
+    def em_write_response(response, alive = false)
+      status, headers, body = response
+      if @hp.headers?
+        headers = HH.new(headers)
+        headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
+      else
+        headers = nil
       end
 
-      def app_call
-        set_comm_inactivity_timeout 0
-        @env[RACK_INPUT] = @input
-        @env[REMOTE_ADDR] = @_io.kgio_addr
-        @env[ASYNC_CALLBACK] = method(:em_write_response)
-        @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
-
-        response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
-
-        # too tricky to support pipelining with :async since the
-        # second (pipelined) request could be a stuck behind a
-        # long-running async response
-        (response.nil? || -1 == response[0]) and return @state = :close
-
-        alive = @hp.keepalive? && G.alive && G.kato > 0
-        em_write_response(response, alive)
-        if alive
-          @env.clear
-          @hp.reset
-          @state = :headers
-          if @buf.empty?
-            set_comm_inactivity_timeout(G.kato)
-          else
-            EM.next_tick { receive_data('') }
+      if body.respond_to?(:errback) && body.respond_to?(:callback)
+        @body = body
+        body.callback { quit }
+        body.errback { quit }
+        # async response, this could be a trickle as is in comet-style apps
+        headers[CONNECTION] = CLOSE if headers
+        alive = true
+      elsif body.respond_to?(:to_path)
+        st = File.stat(path = body.to_path)
+
+        if st.file?
+          write(response_header(status, headers)) if headers
+          @body = stream_file_data(path)
+          @body.errback do
+            body.close if body.respond_to?(:close)
+            quit
           end
-        end
-      end
-
-      def em_write_response(response, alive = false)
-        status, headers, body = response
-        if @hp.headers?
-          headers = HH.new(headers)
-          headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
-        else
-          headers = nil
-        end
-
-        if body.respond_to?(:errback) && body.respond_to?(:callback)
-          @body = body
-          body.callback { quit }
-          body.errback { quit }
-          # async response, this could be a trickle as is in comet-style apps
-          headers[CONNECTION] = CLOSE if headers
-          alive = true
-        elsif body.respond_to?(:to_path)
-          st = File.stat(path = body.to_path)
-
-          if st.file?
-            write(response_header(status, headers)) if headers
-            @body = stream_file_data(path)
-            @body.errback do
-              body.close if body.respond_to?(:close)
-              quit
-            end
-            @body.callback do
-              body.close if body.respond_to?(:close)
-              @body = nil
-              alive ? receive_data('') : quit
-            end
-            return
-          elsif st.socket? || st.pipe?
-            @body = io = body_to_io(body)
-            chunk = stream_response_headers(status, headers) if headers
-            m = chunk ? ResponseChunkPipe : ResponsePipe
-            return EM.watch(io, m, self, alive, body).notify_readable = true
+          @body.callback do
+            body.close if body.respond_to?(:close)
+            @body = nil
+            alive ? receive_data('') : quit
           end
-          # char or block device... WTF? fall through to body.each
+          return
+        elsif st.socket? || st.pipe?
+          @body = io = body_to_io(body)
+          chunk = stream_response_headers(status, headers) if headers
+          m = chunk ? ResponseChunkPipe : ResponsePipe
+          return EM.watch(io, m, self, alive, body).notify_readable = true
         end
-
-        write(response_header(status, headers)) if headers
-        write_body_each(self, body)
-        quit unless alive
+        # char or block device... WTF? fall through to body.each
       end
 
-      def unbind
-        async_close = @env[ASYNC_CLOSE] and async_close.succeed
-        @body.respond_to?(:fail) and @body.fail
-        begin
-          @_io.close
-        rescue Errno::EBADF
-          # EventMachine's EventableDescriptor::Close() may close
-          # the underlying file descriptor without invalidating the
-          # associated IO object on errors, so @_io.closed? isn't
-          # sufficient.
-        end
-      end
+      write(response_header(status, headers)) if headers
+      write_body_each(self, body)
+      quit unless alive
     end
 
-    module Server # :nodoc: all
-      def close
-        detach
-        @io.close
-      end
-
-      def notify_readable
-        return if CUR.size >= MAX
-        io = @io.kgio_tryaccept or return
-        sig = EM.attach_fd(io.fileno, false)
-        CUR[sig] = CL.new(sig, io)
+    def unbind
+      async_close = @env[ASYNC_CLOSE] and async_close.succeed
+      @body.respond_to?(:fail) and @body.fail
+      begin
+        @_io.close
+      rescue Errno::EBADF
+        # EventMachine's EventableDescriptor::Close() may close
+        # the underlying file descriptor without invalidating the
+        # associated IO object on errors, so @_io.closed? isn't
+        # sufficient.
       end
     end
+  end
 
-    def init_worker_process(worker) # :nodoc:
-      Rainbows::Response.setup(Rainbows::EventMachine::Client)
-      super
+  module Server # :nodoc: all
+    def close
+      detach
+      @io.close
     end
 
-    # runs inside each forked worker, this sits around and waits
-    # for connections and doesn't die until the parent dies (or is
-    # given a INT, QUIT, or TERM signal)
-    def worker_loop(worker) # :nodoc:
-      init_worker_process(worker)
-      G.server.app.respond_to?(:deferred?) and
-        G.server.app = TryDefer[G.server.app]
-
-      # enable them both, should be non-fatal if not supported
-      EM.epoll
-      EM.kqueue
-      logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
-      client_class = Rainbows.const_get(@use).const_get(:Client)
-      Server.const_set(:MAX, worker_connections + LISTENERS.size)
-      Server.const_set(:CL, client_class)
-      client_class.const_set(:APP, G.server.app)
-      EM.run {
-        conns = EM.instance_variable_get(:@conns) or
-          raise RuntimeError, "EM @conns instance variable not accessible!"
-        Server.const_set(:CUR, conns)
-        EM.add_periodic_timer(1) do
-          unless G.tick
-            conns.each_value { |c| client_class === c and c.quit }
-            EM.stop if conns.empty? && EM.reactor_running?
-          end
-        end
-        LISTENERS.map! do |s|
-          EM.watch(s, Server) { |c| c.notify_readable = true }
-        end
-      }
+    def notify_readable
+      return if CUR.size >= MAX
+      io = @io.kgio_tryaccept or return
+      sig = EM.attach_fd(io.fileno, false)
+      CUR[sig] = CL.new(sig, io)
     end
+  end
+
+  def init_worker_process(worker) # :nodoc:
+    Rainbows::Response.setup(Rainbows::EventMachine::Client)
+    super
+  end
 
+  # runs inside each forked worker, this sits around and waits
+  # for connections and doesn't die until the parent dies (or is
+  # given a INT, QUIT, or TERM signal)
+  def worker_loop(worker) # :nodoc:
+    init_worker_process(worker)
+    G.server.app.respond_to?(:deferred?) and
+      G.server.app = TryDefer[G.server.app]
+
+    # enable them both, should be non-fatal if not supported
+    EM.epoll
+    EM.kqueue
+    logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
+    client_class = Rainbows.const_get(@use).const_get(:Client)
+    Server.const_set(:MAX, worker_connections + LISTENERS.size)
+    Server.const_set(:CL, client_class)
+    client_class.const_set(:APP, G.server.app)
+    EM.run {
+      conns = EM.instance_variable_get(:@conns) or
+        raise RuntimeError, "EM @conns instance variable not accessible!"
+      Server.const_set(:CUR, conns)
+      EM.add_periodic_timer(1) do
+        unless G.tick
+          conns.each_value { |c| client_class === c and c.quit }
+          EM.stop if conns.empty? && EM.reactor_running?
+        end
+      end
+      LISTENERS.map! do |s|
+        EM.watch(s, Server) { |c| c.notify_readable = true }
+      end
+    }
   end
 end
diff --git a/lib/rainbows/fiber.rb b/lib/rainbows/fiber.rb
index 94f7d94..b516fb9 100644
--- a/lib/rainbows/fiber.rb
+++ b/lib/rainbows/fiber.rb
@@ -30,4 +30,5 @@ module Rainbows::Fiber
 
   autoload :Base, 'rainbows/fiber/base'
   autoload :Queue, 'rainbows/fiber/queue'
+  autoload :IO, 'rainbows/fiber/io'
 end
diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb
index 0ed6717..3826a23 100644
--- a/lib/rainbows/http_server.rb
+++ b/lib/rainbows/http_server.rb
@@ -1,100 +1,97 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows
 
-  class HttpServer < ::Unicorn::HttpServer
-    include Rainbows
+class Rainbows::HttpServer < Unicorn::HttpServer
+  G = Rainbows::G
 
-    class << self
-      def setup(block)
-        G.server.instance_eval(&block)
-      end
-    end
-
-    def initialize(app, options)
-      G.server = self
-      @logger = Unicorn::Configurator::DEFAULTS[:logger]
-      rv = super(app, options)
-      defined?(@use) or use(:Base)
-      @worker_connections ||= MODEL_WORKER_CONNECTIONS[@use]
-    end
+  def self.setup(block)
+    G.server.instance_eval(&block)
+  end
 
-    def reopen_worker_logs(worker_nr)
-      logger.info "worker=#{worker_nr} reopening logs..."
-      Unicorn::Util.reopen_logs
-      logger.info "worker=#{worker_nr} done reopening logs"
-      rescue
-        G.quit! # let the master reopen and refork us
-    end
+  def initialize(app, options)
+    G.server = self
+    @logger = Unicorn::Configurator::DEFAULTS[:logger]
+    rv = super(app, options)
+    defined?(@use) or use(:Base)
+    @worker_connections ||= Rainbows::MODEL_WORKER_CONNECTIONS[@use]
+  end
 
-    #:stopdoc:
-    #
-    # Add one second to the timeout since our fchmod heartbeat is less
-    # precise (and must be more conservative) than Unicorn does.  We
-    # handle many clients per process and can't chmod on every
-    # connection we accept without wasting cycles.  That added to the
-    # fact that we let clients keep idle connections open for long
-    # periods of time means we have to chmod at a fixed interval.
-    def timeout=(nr)
-      @timeout = nr + 1
-    end
-    #:startdoc:
+  def reopen_worker_logs(worker_nr)
+    logger.info "worker=#{worker_nr} reopening logs..."
+    Unicorn::Util.reopen_logs
+    logger.info "worker=#{worker_nr} done reopening logs"
+    rescue
+      G.quit! # let the master reopen and refork us
+  end
 
-    def use(*args)
-      model = args.shift or return @use
-      mod = begin
-        Rainbows.const_get(model)
-      rescue NameError => e
-        logger.error "error loading #{model.inspect}: #{e}"
-        e.backtrace.each { |l| logger.error l }
-        raise ArgumentError, "concurrency model #{model.inspect} not supported"
-      end
+  #:stopdoc:
+  #
+  # Add one second to the timeout since our fchmod heartbeat is less
+  # precise (and must be more conservative) than Unicorn's.  We
+  # handle many clients per process and can't chmod on every
+  # connection we accept without wasting cycles.  That added to the
+  # fact that we let clients keep idle connections open for long
+  # periods of time means we have to chmod at a fixed interval.
+  def timeout=(nr)
+    @timeout = nr + 1
+  end
+  #:startdoc:
 
-      Module === mod or
-        raise ArgumentError, "concurrency model #{model.inspect} not supported"
-      extend(mod)
-      args.each do |opt|
-        case opt
-        when Hash; O.update(opt)
-        when Symbol; O[opt] = true
-        else; raise ArgumentError, "can't handle option: #{opt.inspect}"
-        end
-      end
-      mod.setup if mod.respond_to?(:setup)
-      Const::RACK_DEFAULTS['rainbows.model'] = @use = model.to_sym
-      Const::RACK_DEFAULTS['rack.multithread'] = !!(model.to_s =~ /Thread/)
+  def use(*args)
+    model = args.shift or return @use
+    mod = begin
+      Rainbows.const_get(model)
+    rescue NameError => e
+      logger.error "error loading #{model.inspect}: #{e}"
+      e.backtrace.each { |l| logger.error l }
+      raise ArgumentError, "concurrency model #{model.inspect} not supported"
+    end
 
-      case @use
-      when :Rev, :EventMachine, :NeverBlock
-        Const::RACK_DEFAULTS['rainbows.autochunk'] = true
+    Module === mod or
+      raise ArgumentError, "concurrency model #{model.inspect} not supported"
+    extend(mod)
+    args.each do |opt|
+      case opt
+      when Hash; O.update(opt)
+      when Symbol; O[opt] = true
+      else; raise ArgumentError, "can't handle option: #{opt.inspect}"
       end
     end
-
-    def worker_connections(*args)
-      return @worker_connections if args.empty?
-      nr = args[0]
-      (Integer === nr && nr > 0) or
-        raise ArgumentError, "worker_connections must be a positive Integer"
-      @worker_connections = nr
+    mod.setup if mod.respond_to?(:setup)
+    new_defaults = {
+      'rainbows.model' => (@use = model.to_sym),
+      'rack.multithread' => !!(model.to_s =~ /Thread/),
+    }
+    case @use
+    when :Rev, :EventMachine, :NeverBlock
+      new_defaults['rainbows.autochunk'] = true
     end
+    Rainbows::Const::RACK_DEFAULTS.update(new_defaults)
+  end
 
-    def keepalive_timeout(nr)
-      (Integer === nr && nr >= 0) or
-        raise ArgumentError, "keepalive must be a non-negative Integer"
-      G.kato = nr
-    end
+  def worker_connections(*args)
+    return @worker_connections if args.empty?
+    nr = args[0]
+    (Integer === nr && nr > 0) or
+      raise ArgumentError, "worker_connections must be a positive Integer"
+    @worker_connections = nr
+  end
 
-    def client_max_body_size(nr)
-      err = "client_max_body_size must be nil or a non-negative Integer"
-      case nr
-      when nil
-      when Integer
-        nr >= 0 or raise ArgumentError, err
-      else
-        raise ArgumentError, err
-      end
-      Rainbows.max_bytes = nr
-    end
+  def keepalive_timeout(nr)
+    (Integer === nr && nr >= 0) or
+      raise ArgumentError, "keepalive must be a non-negative Integer"
+    G.kato = nr
   end
 
+  def client_max_body_size(nr)
+    err = "client_max_body_size must be nil or a non-negative Integer"
+    case nr
+    when nil
+    when Integer
+      nr >= 0 or raise ArgumentError, err
+    else
+      raise ArgumentError, err
+    end
+    Rainbows.max_bytes = nr
+  end
 end
diff --git a/lib/rainbows/max_body.rb b/lib/rainbows/max_body.rb
index d825d2f..223c57e 100644
--- a/lib/rainbows/max_body.rb
+++ b/lib/rainbows/max_body.rb
@@ -1,17 +1,17 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows
 
 # middleware used to enforce client_max_body_size for TeeInput users,
 # there is no need to configure this middleware manually, it will
 # automatically be configured for you based on the client_max_body_size
 # setting
-class MaxBody < Struct.new(:app)
+class Rainbows::MaxBody < Struct.new(:app)
 
   # this is meant to be included in Rainbows::TeeInput (and derived
   # classes) to limit body sizes
   module Limit
     TmpIO = Unicorn::TmpIO
+    MAX_BODY = Rainbows::Const::MAX_BODY
 
     def initialize(socket, request)
       @parser = request
@@ -21,7 +21,7 @@ class MaxBody < Struct.new(:app)
 
       max = Rainbows.max_bytes # never nil, see MaxBody.setup
       if @len && @len > max
-        socket.write(Const::ERROR_413_RESPONSE)
+        socket.write(Rainbows::Const::ERROR_413_RESPONSE)
         socket.close
         raise IOError, "Content-Length too big: #@len > #{max}", []
       end
@@ -32,7 +32,7 @@ class MaxBody < Struct.new(:app)
         parser.filter_body(@buf2, @buf) and finalize_input
         @buf2.size > max and raise IOError, "chunked request body too big", []
       end
-      @tmp = @len && @len < Const::MAX_BODY ? StringIO.new("") : TmpIO.new
+      @tmp = @len && @len < MAX_BODY ? StringIO.new("") : TmpIO.new
       if @buf2.size > 0
         @tmp.write(@buf2)
         @tmp.rewind
@@ -58,15 +58,15 @@ class MaxBody < Struct.new(:app)
   # if it's reconfigured
   def self.setup
     Rainbows.max_bytes or return
-    case G.server.use
+    case Rainbows::G.server.use
     when :Rev, :EventMachine, :NeverBlock
       return
     end
 
-    TeeInput.class_eval { include Limit }
+    Rainbows::TeeInput.__send__(:include, Limit)
 
     # force ourselves to the outermost middleware layer
-    G.server.app = MaxBody.new(G.server.app)
+    Rainbows::G.server.app = self.new(Rainbows::G.server.app)
   end
 
   # Rack response returned when there's an error
@@ -78,6 +78,4 @@ class MaxBody < Struct.new(:app)
   def call(env)
     catch(:rainbows_EFBIG) { app.call(env) } || err(env)
   end
-
-end # class
-end # module
+end
diff --git a/lib/rainbows/queue_pool.rb b/lib/rainbows/queue_pool.rb
index 3ae899c..4a2ab8c 100644
--- a/lib/rainbows/queue_pool.rb
+++ b/lib/rainbows/queue_pool.rb
@@ -2,32 +2,29 @@
 # :enddoc:
 require 'thread'
 
-module Rainbows
+# Thread pool class based on pulling off a single Ruby Queue.
+# This is NOT used for the ThreadPool class, since that class does not
+# need a userspace Queue.
+class Rainbows::QueuePool < Struct.new(:queue, :threads)
+  G = Rainbows::G
 
-  # Thread pool class based on pulling off a single Ruby Queue.
-  # This is NOT used for the ThreadPool class, since that class does not
-  # need a userspace Queue.
-  class QueuePool < Struct.new(:queue, :threads)
-    G = Rainbows::G
-
-    def initialize(size = 20, &block)
-      q = Queue.new
-      self.threads = (1..size).map do
-        Thread.new do
-          while job = q.shift
-            block.call(job)
-          end
+  def initialize(size = 20, &block)
+    q = Queue.new
+    self.threads = (1..size).map do
+      Thread.new do
+        while job = q.shift
+          block.call(job)
         end
       end
-      self.queue = q
     end
+    self.queue = q
+  end
 
-    def quit!
-      threads.each { |_| queue << nil }
-      threads.delete_if do |t|
-        G.tick
-        t.alive? ? t.join(0.01) : true
-      end until threads.empty?
-    end
+  def quit!
+    threads.each { |_| queue << nil }
+    threads.delete_if do |t|
+      G.tick
+      t.alive? ? t.join(0.01) : true
+    end until threads.empty?
   end
 end
diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb
index 2273b24..abd7ca7 100644
--- a/lib/rainbows/rev/core.rb
+++ b/lib/rainbows/rev/core.rb
@@ -26,7 +26,7 @@ module Rainbows
         require 'rainbows/rev/sendfile'
         Rainbows::Rev::Client.__send__(:include, Rainbows::Rev::Sendfile)
         init_worker_process(worker)
-        mod = self.class.const_get(@use)
+        mod = Rainbows.const_get(@use)
         rloop = Server.const_set(:LOOP, ::Rev::Loop.default)
         Server.const_set(:MAX, @worker_connections)
         Server.const_set(:CL, mod.const_get(:Client))
diff --git a/lib/rainbows/rev_fiber_spawn.rb b/lib/rainbows/rev_fiber_spawn.rb
index 522ae71..39483b3 100644
--- a/lib/rainbows/rev_fiber_spawn.rb
+++ b/lib/rainbows/rev_fiber_spawn.rb
@@ -1,31 +1,28 @@
 # -*- encoding: binary -*-
 require 'rainbows/fiber/rev'
 
-module Rainbows
+# A combination of the Rev and FiberSpawn models.  This allows Ruby
+# 1.9 Fiber-based concurrency for application processing while
+# exposing a synchronous execution model and using scalable network
+# concurrency provided by Rev.  A "rack.input" is exposed as well as
+# being Sunshowers-compatible.  Applications are strongly advised to
+# wrap all slow IO objects (sockets, pipes) using the
+# Rainbows::Fiber::IO or a Rev-compatible class whenever possible.
+module Rainbows::RevFiberSpawn
 
-  # A combination of the Rev and FiberSpawn models.  This allows Ruby
-  # 1.9 Fiber-based concurrency for application processing while
-  # exposing a synchronous execution model and using scalable network
-  # concurrency provided by Rev.  A "rack.input" is exposed as well
-  # being Sunshowers-compatible.  Applications are strongly advised to
-  # wrap all slow IO objects (sockets, pipes) using the
-  # Rainbows::Fiber::IO or a Rev-compatible class whenever possible.
-  module RevFiberSpawn
+  include Rainbows::Base
+  include Rainbows::Fiber::Rev
 
-    include Base
-    include Fiber::Rev
-
-    def worker_loop(worker) # :nodoc:
-      Rainbows::Response.setup(Rainbows::Fiber::Rev::Server)
-      init_worker_process(worker)
-      Server.const_set(:MAX, @worker_connections)
-      Rainbows::Fiber::Base.setup(Rainbows::Fiber::Rev::Server, nil)
-      Server.const_set(:APP, G.server.app)
-      Heartbeat.new(1, true).attach(::Rev::Loop.default)
-      kato = Kato.new.attach(::Rev::Loop.default)
-      Rainbows::Fiber::Rev::Methods.const_set(:KATO, kato)
-      LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) }
-      ::Rev::Loop.default.run
-    end
+  def worker_loop(worker) # :nodoc:
+    Rainbows::Response.setup(Server)
+    init_worker_process(worker)
+    Server.const_set(:MAX, @worker_connections)
+    Rainbows::Fiber::Base.setup(Server, nil)
+    Server.const_set(:APP, G.server.app)
+    Heartbeat.new(1, true).attach(Rev::Loop.default)
+    kato = Kato.new.attach(Rev::Loop.default)
+    Rainbows::Fiber::Rev::Methods.const_set(:KATO, kato)
+    LISTENERS.map! { |s| Server.new(s).attach(Rev::Loop.default) }
+    Rev::Loop.default.run
   end
 end