about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/yahns.rb66
-rw-r--r--lib/yahns/acceptor.rb28
-rw-r--r--lib/yahns/client_expire.rb40
-rw-r--r--lib/yahns/client_expire_portable.rb39
-rw-r--r--lib/yahns/config.rb341
-rw-r--r--lib/yahns/daemon.rb51
-rw-r--r--lib/yahns/fdmap.rb90
-rw-r--r--lib/yahns/http_client.rb196
-rw-r--r--lib/yahns/http_context.rb66
-rw-r--r--lib/yahns/http_response.rb183
-rw-r--r--lib/yahns/log.rb73
-rw-r--r--lib/yahns/queue.rb7
-rw-r--r--lib/yahns/queue_egg.rb23
-rw-r--r--lib/yahns/queue_epoll.rb57
-rw-r--r--lib/yahns/rack.rb80
-rw-r--r--lib/yahns/server.rb328
-rw-r--r--lib/yahns/server_mp.rb184
-rw-r--r--lib/yahns/sigevent.rb7
-rw-r--r--lib/yahns/sigevent_efd.rb18
-rw-r--r--lib/yahns/sigevent_pipe.rb29
-rw-r--r--lib/yahns/socket_helper.rb117
-rw-r--r--lib/yahns/stream_file.rb34
-rw-r--r--lib/yahns/stream_input.rb150
-rw-r--r--lib/yahns/tee_input.rb114
-rw-r--r--lib/yahns/tiny_input.rb7
-rw-r--r--lib/yahns/tmpio.rb27
-rw-r--r--lib/yahns/wbuf.rb36
-rw-r--r--lib/yahns/wbuf_common.rb32
-rw-r--r--lib/yahns/worker.rb58
29 files changed, 2481 insertions, 0 deletions
diff --git a/lib/yahns.rb b/lib/yahns.rb
new file mode 100644
index 0000000..373fca0
--- /dev/null
+++ b/lib/yahns.rb
@@ -0,0 +1,66 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'unicorn' # pulls in raindrops, kgio, fcntl, etc, stringio, and logger
+require 'sleepy_penguin'
+
+# yahns exposes no user-visible API outside of the config file
+# Internals are subject to change.
+module Yahns # :nodoc:
+  # We populate this at startup so we can figure out how to reexecute
+  # and upgrade the currently running instance of yahns
+  # Unlike unicorn, this Hash is NOT a stable/public interface.
+  #
+  # * 0 - the path to the yahns executable
+  # * :argv - a deep copy of the ARGV array the executable originally saw
+  # * :cwd - the working directory of the application, this is where
+  # you originally started yahns.
+  #
+  # To change your yahns executable to a different path without downtime,
+  # you can set the following in your yahns config file, HUP and then
+  # continue with the traditional USR2 + QUIT upgrade steps:
+  #
+  #   Yahns::START[0] = "/home/bofh/2.0.0/bin/yahns"
+  START = {
+    :argv => ARGV.map { |arg| arg.dup },
+    0 => $0.dup,
+  }
+
+  # We favor ENV['PWD'] since it is (usually) symlink aware for Capistrano
+  # and like systems
+  START[:cwd] = begin
+    a = File.stat(pwd = ENV['PWD'])
+    b = File.stat(Dir.pwd)
+    a.ino == b.ino && a.dev == b.dev ? pwd : Dir.pwd
+  rescue
+    Dir.pwd
+  end
+
+  # Raised inside TeeInput when a client closes the socket inside the
+  # application dispatch.  This is always raised with an empty backtrace
+  # since there is nothing in the application stack that is responsible
+  # for client shutdowns/disconnects.
+  class ClientShutdown < EOFError # :nodoc:
+  end
+end
+
+# FIXME: require lazily
+require_relative 'yahns/log'
+require_relative 'yahns/queue_epoll'
+require_relative 'yahns/stream_input'
+require_relative 'yahns/tee_input'
+require_relative 'yahns/queue_egg'
+require_relative 'yahns/client_expire'
+require_relative 'yahns/http_response'
+require_relative 'yahns/http_client'
+require_relative 'yahns/http_context'
+require_relative 'yahns/queue'
+require_relative 'yahns/config'
+require_relative 'yahns/tmpio'
+require_relative 'yahns/worker'
+require_relative 'yahns/sigevent'
+require_relative 'yahns/daemon'
+require_relative 'yahns/socket_helper'
+require_relative 'yahns/server'
+require_relative 'yahns/fdmap'
+require_relative 'yahns/acceptor'
+require_relative 'yahns/wbuf'
diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb
new file mode 100644
index 0000000..b5e7b0e
--- /dev/null
+++ b/lib/yahns/acceptor.rb
@@ -0,0 +1,28 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv3 or later (see COPYING for details)
+module Yahns::Acceptor # :nodoc:
+  def spawn_acceptor(logger, client_class, queue)
+    accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC
+    Thread.new do
+      Thread.current.abort_on_exception = true
+      qev_flags = client_class.superclass::QEV_FLAGS
+      begin
+        # We want the accept/accept4 syscall to be _blocking_
+        # so it can distribute work evenly between processes
+        if client = kgio_accept(client_class, accept_flags)
+          client.yahns_init
+
+          # it is not safe to touch client in this thread after this,
+          # a worker thread may grab client right away
+          queue.queue_add(client, qev_flags)
+        end
+      rescue Errno::EMFILE, Errno::ENFILE => e
+        logger.error("#{e.message}, consider raising open file limits")
+        queue.fdmap.desperate_expire_for(self, 5)
+        sleep 1 # let other threads do some work
+      rescue => e
+        Yahns::Log.exception(logger, "accept loop error", e) unless closed?
+      end until closed?
+    end
+  end
+end
diff --git a/lib/yahns/client_expire.rb b/lib/yahns/client_expire.rb
new file mode 100644
index 0000000..7da9498
--- /dev/null
+++ b/lib/yahns/client_expire.rb
@@ -0,0 +1,40 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# included in Yahns::HttpClient
+#
+# this provides the ability to expire idle clients once we hit a soft limit
+# on idle clients
+#
+# we absolutely DO NOT issue IO#close in here, only BasicSocket#shutdown
+module Yahns::ClientExpire # :nodoc:
+  def yahns_expire(timeout) # rarely called
+    return 0 if closed? # still racy, but avoid the exception in most cases
+
+    info = Raindrops::TCP_Info.new(self)
+    return 0 if info.tcpi_state != 1 # TCP_ESTABLISHED == 1
+
+    # Linux struct tcp_info timers are in milliseconds
+    timeout *= 1000
+
+    send_timedout = !!(info.tcpi_last_data_sent > timeout)
+
+    # tcpi_last_data_recv is not valid unless tcpi_ato (ACK timeout) is set
+    if 0 == info.tcpi_ato
+      sd = send_timedout && (info.tcpi_last_ack_recv > timeout)
+    else
+      sd = send_timedout && (info.tcpi_last_data_recv > timeout)
+    end
+    if sd
+      shutdown
+      1
+    else
+      0
+    end
+  # we also do not expire UNIX domain sockets
+  # (since those are the most trusted of local clients)
+  # the IO#closed? check is racy
+  rescue
+    0
+  end
+end
diff --git a/lib/yahns/client_expire_portable.rb b/lib/yahns/client_expire_portable.rb
new file mode 100644
index 0000000..2ea7706
--- /dev/null
+++ b/lib/yahns/client_expire_portable.rb
@@ -0,0 +1,39 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+module Yahns::ClientExpire # :nodoc:
+  def __timestamp
+    Time.now.to_f
+  end
+
+  def yahns_expire(timeout)
+    return 0 if closed? # still racy, but avoid the exception in most cases
+    if (__timestamp - @last_io_at) > timeout
+      shutdown
+      1
+    else
+      0
+    end
+  rescue # the IO#closed? check is racy
+    0
+  end
+
+  def kgio_read(*args)
+    @last_io_at = __timestamp
+    super
+  end
+
+  def kgio_write(*args)
+    @last_io_at = __timestamp
+    super
+  end
+
+  def kgio_trywrite(*args)
+    @last_io_at = __timestamp
+    super
+  end
+
+  def kgio_tryread(*args)
+    @last_io_at = __timestamp
+    super
+  end
+end
diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb
new file mode 100644
index 0000000..1d4d110
--- /dev/null
+++ b/lib/yahns/config.rb
@@ -0,0 +1,341 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# Implements a DSL for configuring a yahns server.
+# See http://yahns.yhbt.net/examples/yahns_multi.conf.rb for a full
+# example configuration file.
+class Yahns::Config # :nodoc:
+  APP_CLASS = {} # public, see yahns/rack for usage example
+  CfgBlock = Struct.new(:type, :ctx) # :nodoc:
+  attr_reader :config_file, :config_listeners, :set
+  attr_reader :qeggs, :app_ctx
+
+  def initialize(config_file = nil)
+    @config_file = config_file
+    @block = nil
+    config_reload!
+  end
+
+  def _check_in_block(ctx, var)
+    if ctx == nil
+      return var if @block == nil
+      msg = "#{var} must be called outside of #{@block.type}"
+    else
+      return var if @block && ctx == @block.type
+      msg = @block ? "may not be used inside a #{@block.type} block" :
+                     "must be used with a #{ctx} block"
+    end
+    raise ArgumentError, msg
+  end
+
+  def config_reload! #:nodoc:
+    # app_instance:app_ctx is a 1:N relationship
+    @config_listeners = {} # name/address -> options
+    @app_ctx = []
+    @set = Hash.new(:unset)
+    @qeggs = {}
+    @app_instances = {}
+
+    # set defaults:
+    client_expire_threshold(0.5) # default is half of the open file limit
+
+    instance_eval(File.read(@config_file), @config_file) if @config_file
+
+    # working_directory binds immediately (easier error checking that way),
+    # now ensure any paths we changed are correctly set.
+    [ :pid, :stderr_path, :stdout_path ].each do |var|
+      String === (path = @set[var]) or next
+      path = File.expand_path(path)
+      File.writable?(path) || File.writable?(File.dirname(path)) or \
+            raise ArgumentError, "directory for #{var}=#{path} not writable"
+    end
+  end
+
+  def logger(obj)
+    var = :logger
+    %w(debug info warn error fatal).each do |m|
+      obj.respond_to?(m) and next
+      raise ArgumentError, "#{var}=#{obj} does not respond to method=#{m}"
+    end
+    if @block
+      if @block.ctx.respond_to?(:logger=)
+        @block.ctx.logger = obj
+      else
+        raise ArgumentError, "#{var} not valid inside #{@block.type}"
+      end
+    else
+      @set[var] = obj
+    end
+  end
+
+  def worker_processes(nr)
+    # TODO: allow zero
+    var = _check_in_block(nil, :worker_processes)
+    @set[var] = _check_int(var, nr, 1)
+  end
+
+  # sets the +path+ for the PID file of the yahns master process
+  def pid(path)
+    _set_path(:pid, path)
+  end
+
+  def stderr_path(path)
+    _set_path(:stderr_path, path)
+  end
+
+  def stdout_path(path)
+    _set_path(:stdout_path, path)
+  end
+
+  def value(var)
+    val = @set[var]
+    val == :unset ? nil : val
+  end
+
+  # sets the working directory for yahns.  This ensures SIGUSR2 will
+  # start a new instance of yahns in this directory.  This may be
+  # a symlink, a common scenario for Capistrano users.  Unlike
+  # all other yahns configuration directives, this binds immediately
+  # for error checking and cannot be undone by unsetting it in the
+  # configuration file and reloading.
+  def working_directory(path)
+    var = :working_directory
+    @app_ctx.empty? or
+      raise ArgumentError, "#{var} must be declared before any apps"
+
+    # just let chdir raise errors
+    path = File.expand_path(path)
+    if @config_file &&
+       @config_file[0] != ?/ &&
+       ! File.readable?("#{path}/#@config_file")
+      raise ArgumentError,
+            "config_file=#@config_file would not be accessible in" \
+            " #{var}=#{path}"
+    end
+    Dir.chdir(path)
+    @set[var] = ENV["PWD"] = path
+  end
+
+  # Runs worker processes as the specified +user+ and +group+.
+  # The master process always stays running as the user who started it.
+  # This switch will occur after calling the after_fork hooks, and only
+  # if the Worker#user method is not called in the after_fork hooks
+  # +group+ is optional and will not change if unspecified.
+  def user(user, group = nil)
+    var = :user
+    @block and raise "#{var} is not valid inside #{@block.type}"
+    # raises ArgumentError on invalid user/group
+    Etc.getpwnam(user)
+    Etc.getgrnam(group) if group
+    @set[var] = [ user, group ]
+  end
+
+  def _set_path(var, path) #:nodoc:
+    _check_in_block(nil, var)
+    case path
+    when NilClass, String
+      @set[var] = path
+    else
+      raise ArgumentError
+    end
+  end
+
+  def listen(address, options = {})
+    options = options.dup
+    var = _check_in_block(:app, :listen)
+    address = expand_addr(address)
+    String === address or
+      raise ArgumentError, "address=#{address.inspect} must be a string"
+    [ :umask, :backlog, :sndbuf, :rcvbuf ].each do |key|
+      value = options[key] or next
+      Integer === value or
+        raise ArgumentError, "#{var}: not an integer: #{key}=#{value.inspect}"
+    end
+    [ :ipv6only ].each do |key|
+      (value = options[key]).nil? and next
+      [ true, false ].include?(value) or
+        raise ArgumentError, "#{var}: not boolean: #{key}=#{value.inspect}"
+    end
+
+    options[:yahns_app_ctx] = @block.ctx
+    @config_listeners.include?(address) and
+      raise ArgumentError, "listen #{address} already in use"
+    @config_listeners[address] = options
+  end
+
+  # expands "unix:path/to/foo" to a socket relative to the current path
+  # expands pathnames of sockets if relative to "~" or "~username"
+  # expands "*:port and ":port" to "0.0.0.0:port"
+  def expand_addr(address) #:nodoc:
+    return "0.0.0.0:#{address}" if Integer === address
+    return address unless String === address
+
+    case address
+    when %r{\Aunix:(.*)\z}
+      File.expand_path($1)
+    when %r{\A~}
+      File.expand_path(address)
+    when %r{\A(?:\*:)?(\d+)\z}
+      "0.0.0.0:#$1"
+    when %r{\A\[([a-fA-F0-9:]+)\]\z}, %r/\A((?:\d+\.){3}\d+)\z/
+      canonicalize_tcp($1, 80)
+    when %r{\A\[([a-fA-F0-9:]+)\]:(\d+)\z}, %r{\A(.*):(\d+)\z}
+      canonicalize_tcp($1, $2.to_i)
+    else
+      address
+    end
+  end
+
+  def canonicalize_tcp(addr, port)
+    packed = Socket.pack_sockaddr_in(port, addr)
+    port, addr = Socket.unpack_sockaddr_in(packed)
+    /:/ =~ addr ? "[#{addr}]:#{port}" : "#{addr}:#{port}"
+  end
+
+  def queue(name = :default, &block)
+    var = :queue
+    qegg = @qeggs[name] ||= Yahns::QueueEgg.new
+    prev_block = @block
+    _check_in_block(:app, var) if prev_block
+    if block_given?
+      @block = CfgBlock.new(:queue, qegg)
+      instance_eval(&block)
+      @block = prev_block
+    end
+    prev_block.ctx.qegg = qegg if prev_block
+  end
+
+  # queue parameters (Yahns::QueueEgg)
+  %w(max_events worker_threads).each do |_v|
+    eval(
+    %Q(def #{_v}(val);) <<
+    %Q(  _check_in_block(:queue, :#{_v});) <<
+    %Q(  @block.ctx.__send__("#{_v}=", _check_int(:#{_v}, val, 1));) <<
+    %Q(end)
+    )
+  end
+
+  def _check_int(var, n, min)
+    Integer === n or raise ArgumentError, "not an integer: #{var}=#{n.inspect}"
+    n >= min or raise ArgumentError, "too low (< #{min}): #{var}=#{n.inspect}"
+    n
+  end
+
+  # global
+  def client_expire_threshold(val)
+    var = _check_in_block(nil, :client_expire_threshold)
+    val > 0 or raise ArgumentError, "#{var} must be > 0"
+    case val
+    when Float
+      val <= 1.0 or raise ArgumentError, "#{var} must be <= 1.0 if a ratio"
+    when Integer
+    else
+      raise ArgumentError, "#{var} must be a float or integer"
+    end
+    @set[var] = val
+  end
+
+  # type = :rack
+  def app(type, *args, &block)
+    var = _check_in_block(nil, :app)
+    file = "yahns/#{type.to_s}"
+    begin
+      require file
+    rescue LoadError => e
+      raise ArgumentError, "#{type.inspect} is not a supported app type",
+            e.backtrace
+    end
+    klass = APP_CLASS[type] or
+      raise TypeError,
+        "#{var}: #{file} did not register #{type} in #{self.class}::APP_CLASS"
+
+    # apps may have multiple configurator contexts
+    app = @app_instances[klass.instance_key(*args)] = klass.new(*args)
+    ctx = app.config_context
+    if block_given?
+      @block = CfgBlock.new(:app, ctx)
+      instance_eval(&block)
+      @block = nil
+    end
+    @app_ctx << ctx
+  end
+
+  def _check_bool(var, val)
+    return val if [ true, false ].include?(val)
+    raise ArgumentError, "#{var} must be boolean"
+  end
+
+  # boolean config directives for app
+  %w(check_client_connection
+     output_buffering
+     persistent_connections).each do |_v|
+    eval(
+    %Q(def #{_v}(bool);) <<
+    %Q(  _check_in_block(:app, :#{_v});) <<
+    %Q(  @block.ctx.__send__("#{_v}=", _check_bool(:#{_v}, bool));) <<
+    %Q(end)
+    )
+  end
+
+  # integer config directives for app
+  {
+    # config name, minimum value
+    client_body_buffer_size: 1,
+    client_max_body_size: 0,
+    client_header_buffer_size: 1,
+    client_max_header_size: 1,
+    client_timeout: 0,
+  }.each do |_v,minval|
+    eval(
+    %Q(def #{_v}(val);) <<
+    %Q(  _check_in_block(:app, :#{_v});) <<
+    %Q(  @block.ctx.__send__("#{_v}=", _check_int(:#{_v}, val, #{minval}));) <<
+    %Q(end)
+    )
+  end
+
+  def input_buffering(val)
+    var = _check_in_block(:app, :input_buffering)
+    ok = [ :lazy, true, false ]
+    ok.include?(val) or
+      raise ArgumentError, "`#{var}' must be one of: #{ok.inspect}"
+    @block.ctx.__send__("#{var}=", val)
+  end
+
+  # used to configure rack.errors destination
+  def errors(val)
+    var = _check_in_block(:app, :errors)
+    if String === val
+      # we've already bound working_directory by the time we get here
+      val = File.open(File.expand_path(val), "a")
+      val.binmode
+      val.sync = true
+    else
+      rt = %w(puts write flush).map(&:to_sym) # match Rack::Lint
+      rt.all? { |m| val.respond_to?(m) } or raise ArgumentError,
+                   "`#{var}' destination must respond to all of: #{rt.inspect}"
+    end
+    @block.ctx.__send__("#{var}=", val)
+  end
+
+  def commit!(server)
+    # redirect IOs
+    { stdout_path: $stdout, stderr_path: $stderr }.each do |key, io|
+      path = @set[key]
+      if path == :unset && server.daemon_pipe
+        @set[key] = path = "/dev/null"
+      end
+      File.open(path, 'a') { |fp| io.reopen(fp) } if String === path
+      io.sync = true
+    end
+
+    [ :logger, :pid, :worker_processes ].each do |var|
+      val = @set[var]
+      server.__send__("#{var}=", val) if val != :unset
+    end
+    queue(:default) if @qeggs.empty?
+    @qeggs.each_value { |qegg| qegg.logger ||= server.logger }
+    @app_ctx.each { |app| app.logger ||= server.logger }
+  end
+end
diff --git a/lib/yahns/daemon.rb b/lib/yahns/daemon.rb
new file mode 100644
index 0000000..d12ff69
--- /dev/null
+++ b/lib/yahns/daemon.rb
@@ -0,0 +1,51 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+module Yahns::Daemon # :nodoc:
+  # We don't do a lot of standard daemonization stuff:
+  #   * umask is whatever was set by the parent process at startup
+  #     and can be set in config.ru and config_file, so making it
+  #     0000 and potentially exposing sensitive log data can be bad
+  #     policy.
+  #   * don't bother to chdir("/") here since yahns is designed to
+  #     run inside APP_ROOT.  Yahns will also re-chdir() to
+  #     the directory it was started in when being re-executed
+  #     to pickup code changes if the original deployment directory
+  #     is a symlink or otherwise got replaced.
+  def self.daemon(yahns_server)
+    $stdin.reopen("/dev/null")
+
+    # We only start a new process group if we're not being reexecuted
+    # and inheriting file descriptors from our parent
+    unless ENV['YAHNS_FD']
+      # grandparent - reads pipe, exits when master is ready
+      #  \_ parent  - exits immediately ASAP
+      #      \_ yahns master - writes to pipe when ready
+
+      # We cannot use Yahns::Sigevent (eventfd) here because we need
+      # to detect EOF on unexpected death, not just read/write
+      rd, wr = IO.pipe
+      grandparent = $$
+      if fork
+        wr.close # grandparent does not write
+      else
+        rd.close # yahns master does not read
+        Process.setsid
+        exit if fork # parent dies now
+      end
+
+      if grandparent == $$
+        # this will block until Server#join runs (or it dies)
+        master_pid = (rd.readpartial(16) rescue nil).to_i
+        unless master_pid > 1
+          warn "master failed to start, check stderr log for details"
+          exit!(1)
+        end
+        exit 0
+      else # yahns master process
+        yahns_server.daemon_pipe = wr
+      end
+    end
+    # $stderr/$stderr can/will be redirected separately in the Yahns config
+  end
+end
diff --git a/lib/yahns/fdmap.rb b/lib/yahns/fdmap.rb
new file mode 100644
index 0000000..0272421
--- /dev/null
+++ b/lib/yahns/fdmap.rb
@@ -0,0 +1,90 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'thread'
+
+# only initialize this after forking, this is highly volatile and won't
+# be able to share data across processes at all.
+# This is really a singleton
+
+class Yahns::Fdmap # :nodoc:
+  def initialize(logger, client_expire_threshold)
+    @logger = logger
+
+    if Float === client_expire_threshold
+      client_expire_threshold *= Process.getrlimit(:NOFILE)[0]
+    elsif client_expire_treshhold < 0
+      client_expire_threshold = Process.getrlimit(:NOFILE)[0] -
+                                client_expire_threshold
+    end
+    @client_expire_threshold = client_expire_threshold.to_i
+
+    # This is an array because any sane OS will frequently reuse FDs
+    # to keep this tightly-packed and favor lower FD numbers
+    # (consider select(2) performance (not that we use select))
+    # An (unpacked) Hash (in MRI) uses 5 more words per entry than an Array,
+    # and we should expect this array to have around 60K elements
+    @fdmap_ary = []
+    @fdmap_mtx = Mutex.new
+    @last_expire = 0.0
+    @count = 0
+  end
+
+  # called immediately after accept()
+  def add(io)
+    fd = io.fileno
+    @fdmap_mtx.synchronize do
+      if (@count += 1) > @client_expire_threshold
+        __expire_for(io)
+      else
+        @fdmap_ary[fd] = io
+      end
+    end
+  end
+
+  # this is only called in Errno::EMFILE/Errno::ENFILE situations
+  def desperate_expire_for(io, timeout)
+    @fdmap_mtx.synchronize { __expire_for(io, timeout) }
+  end
+
+  # called before IO#close
+  def decr
+    # don't bother clearing the element in @fdmap_ary, it'll just be
+    # overwritten when another client connects (soon).  We must not touch
+    # @fdmap_ary[io.fileno] after IO#close on io
+    @fdmap_mtx.synchronize { @count -= 1 }
+  end
+
+  def delete(io) # use with rack.hijack (via yahns)
+    fd = io.fileno
+    @fdmap_mtx.synchronize do
+      @fdmap_ary[fd] = nil
+      @count -= 1
+    end
+  end
+
+  # expire a bunch of idle clients and register the current one
+  # We should not be calling this too frequently, it is expensive
+  # This is called while @fdmap_mtx is held
+  def __expire_for(io, timeout = nil)
+    nr = 0
+    now = Time.now.to_f
+    (now - @last_expire) >= 1.0 or return # don't expire too frequently
+
+    # @fdmap_ary may be huge, so always expire a bunch at once to
+    # avoid getting to this method too frequently
+    @fdmap_ary.each do |c|
+      c.respond_to?(:yahns_expire) or next
+      nr += c.yahns_expire(timeout || c.class.client_timeout)
+    end
+
+    @fdmap_ary[io.fileno] = io
+    @last_expire = Time.now.to_f
+    msg = timeout ? "timeout=#{timeout})" : "client_timeout"
+    @logger.info("dropping #{nr} of #@count clients for #{msg}")
+  end
+
+  # used for graceful shutdown
+  def size
+    @fdmap_mtx.synchronize { @count }
+  end
+end
diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb
new file mode 100644
index 0000000..8171460
--- /dev/null
+++ b/lib/yahns/http_client.rb
@@ -0,0 +1,196 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'yahns/tiny_input'
+class Yahns::HttpClient < Kgio::Socket # :nodoc:
+  NULL_IO = Yahns::TinyInput.new("")
+
+  include Yahns::HttpResponse
+  include Yahns::ClientExpire
+  QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor
+  HTTP_RESPONSE_START = [ 'HTTP', '/1.1 ' ]
+
+  # A frozen format for this is about 15% faster (note from Mongrel)
+  REMOTE_ADDR = 'REMOTE_ADDR'.freeze
+  RACK_INPUT = 'rack.input'.freeze
+  RACK_HIJACK = 'rack.hijack'.freeze
+  RACK_HIJACK_IO = "rack.hijack_io".freeze
+
+  # called from acceptor thread
+  def yahns_init
+    @hs = Unicorn::HttpRequest.new
+    @response_start_sent = false
+    @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile
+    @input = nil
+  end
+
+  # use if writes are deferred by buffering, this return value goes to
+  # the main epoll/kqueue worker loop
+  # returns :wait_readable, :wait_writable, or nil
+  def step_write
+    case rv = @state.wbuf_flush(self)
+    when :wait_writable, :wait_readable
+      return rv # tell epoll/kqueue to wait on this more
+    when :delete # :delete on hijack
+      @state = :delete
+      return :delete
+    when Yahns::StreamFile
+      @state = rv # continue looping
+    when true, false # done
+      return http_response_done(rv)
+    else
+      raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
+    end while true
+  end
+
+  def mkinput_preread
+    @state = :body
+    @input = self.class.tmpio_for(@hs.content_length)
+    rbuf = Thread.current[:yahns_rbuf]
+    @hs.filter_body(rbuf, @hs.buf)
+    @input.write(rbuf)
+  end
+
+  def input_ready
+    empty_body = 0 == @hs.content_length
+    k = self.class
+    case k.input_buffering
+    when true
+      # common case is an empty body
+      return NULL_IO if empty_body
+
+      # content_length is nil (chunked) or len > 0
+      mkinput_preread # keep looping
+      false
+    else # :lazy, false
+      empty_body ? NULL_IO : k.mkinput(self, @hs)
+    end
+  end
+
+  # the main entry point of the epoll/kqueue worker loop
+  def yahns_step
+    # always write unwritten data first if we have any
+    return step_write if Yahns::WbufCommon === @state
+
+    # only read if we had nothing to write in this event loop iteration
+    k = self.class
+    rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads
+
+    case @state
+    when :pipelined
+      if @hs.parse
+        input = input_ready and return app_call(input)
+        # @state == :body if we get here point (input_ready -> mkinput_preread)
+      else
+        @state = :headers
+      end
+      # continue to outer loop
+    when :headers
+      case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
+      when String
+        if @hs.add_parse(rv)
+          input = input_ready and return app_call(input)
+          break # to outer loop to reevaluate @state == :body
+        end
+        # keep looping on kgio_tryread
+      when :wait_readable, :wait_writable, nil
+        return rv
+      end while true
+    when :body
+      if @hs.body_eof?
+        if @hs.content_length || @hs.parse # hp.parse == trailers done!
+          @input.rewind
+          return app_call(@input)
+        else # possible Transfer-Encoding:chunked, keep looping
+          @state = :trailers
+        end
+      else
+        case rv = kgio_tryread(k.client_body_buffer_size, rbuf)
+        when String
+          @hs.filter_body(rbuf, @hs.buf << rbuf)
+          @input.write(rbuf)
+          # keep looping on kgio_tryread...
+        when :wait_readable, :wait_writable
+          return rv # have epoll/kqueue wait for more
+        when nil # unexpected EOF
+          return @input.close # nil
+        end # continue to outer loop (case @state)
+      end
+    when :trailers
+      case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
+      when String
+        if @hs.add_parse(rbuf)
+          @input.rewind
+          return app_call(@input)
+        end
+        # keep looping on kgio_tryread...
+      when :wait_readable, :wait_writable
+        return rv # wait for more
+      when nil # unexpected EOF
+        return @input.close # nil
+      end while true
+    end while true # outer loop
+  rescue => e
+    handle_error(e)
+  end
+
+  def app_call(input)
+    env = @hs.env
+    env[REMOTE_ADDR] = @kgio_addr
+    env[RACK_HIJACK] = hijack_proc(env)
+    env[RACK_INPUT] = @input ||= input
+    k = self.class
+
+    if k.check_client_connection && @hs.headers?
+      @response_start_sent = true
+      # FIXME: we should buffer this just in case
+      HTTP_RESPONSE_START.each { |c| kgio_write(c) }
+    end
+
+    # run the rack app
+    response = k.app.call(env.merge!(k.app_defaults))
+    return :delete if env.include?(RACK_HIJACK_IO)
+
+    # this returns :wait_readable, :wait_writable, :delete, or nil:
+    http_response_write(*response)
+  end
+
+  def hijack_proc(env)
+    proc { env[RACK_HIJACK_IO] = self }
+  end
+
+  # called automatically by kgio_write
+  def kgio_wait_writable(timeout = self.class.client_timeout)
+    super timeout
+  end
+
+  # called automatically by kgio_read
+  def kgio_wait_readable(timeout = self.class.client_timeout)
+    super timeout
+  end
+
+  # if we get any error, try to write something back to the client
+  # assuming we haven't closed the socket, but don't get hung up
+  # if the socket is already closed or broken.  We'll always return
+  # nil to ensure the socket is closed at the end of this function
+  def handle_error(e)
+    code = case e
+    when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN
+      return # don't send response, drop the connection
+    when Unicorn::RequestURITooLongError
+      414
+    when Unicorn::RequestEntityTooLargeError
+      413
+    when Unicorn::HttpParserError # try to tell the client they're bad
+      400
+    else
+      Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
+      500
+    end
+    kgio_trywrite(err_response(code))
+  rescue
+  ensure
+    shutdown rescue nil
+    return # always drop the connection on uncaught errors
+  end
+end
diff --git a/lib/yahns/http_context.rb b/lib/yahns/http_context.rb
new file mode 100644
index 0000000..1af41df
--- /dev/null
+++ b/lib/yahns/http_context.rb
@@ -0,0 +1,66 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'yahns/tiny_input'
+
+# subclasses of Yahns::HttpClient will class extend this
+
+module Yahns::HttpContext # :nodoc:
+  attr_accessor :check_client_connection
+  attr_accessor :client_body_buffer_size
+  attr_accessor :client_header_buffer_size
+  attr_accessor :client_max_body_size
+  attr_accessor :client_max_header_size
+  attr_accessor :input_buffering  # :lazy, true, false
+  attr_accessor :output_buffering # true, false
+  attr_accessor :persistent_connections # true or false only
+  attr_accessor :client_timeout
+  attr_accessor :qegg
+  attr_reader :app
+  attr_reader :app_defaults
+
+  def http_ctx_init(yahns_rack)
+    @yahns_rack = yahns_rack
+    @app_defaults = yahns_rack.app_defaults
+    @check_client_connection = false
+    @client_body_buffer_size = 112 * 1024
+    @client_header_buffer_size = 4000
+    @client_max_body_size = 1024 * 1024
+    @input_buffering = true
+    @output_buffering = true
+    @persistent_connections = true
+    @client_timeout = 15
+    @qegg = nil
+  end
+
+  # call this after forking
+  def after_fork_init
+    @app = @yahns_rack.app_after_fork
+  end
+
+  # call this immediately after successful accept()/accept4()
+  def logger=(l) # cold
+    @logger = @app_defaults["rack.logger"] = l
+  end
+
+  def logger
+    @app_defaults["rack.logger"]
+  end
+
+  def mkinput(client, hs)
+    (@input_buffering ? Yahns::TeeInput : Yahns::StreamInput).new(client, hs)
+  end
+
+  def errors=(dest)
+    @app_defaults["rack.errors"] = dest
+  end
+
+  def errors
+    @app_defaults["rack.errors"]
+  end
+
+  def tmpio_for(len)
+    len && len <= @client_body_buffer_size ?
+      Yahns::TinyInput.new("") : Yahns::TmpIO.new
+  end
+end
diff --git a/lib/yahns/http_response.rb b/lib/yahns/http_response.rb
new file mode 100644
index 0000000..aad2762
--- /dev/null
+++ b/lib/yahns/http_response.rb
@@ -0,0 +1,183 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'stream_file'
+
+# Writes a Rack response to your client using the HTTP/1.1 specification.
+# You use it by simply doing:
+#
+#   status, headers, body = rack_app.call(env)
+#   http_response_write(status, headers, body)
+#
+# Most header correctness (including Content-Length and Content-Type)
+# is the job of Rack, with the exception of the "Date" header.
+module Yahns::HttpResponse # :nodoc:
+  include Unicorn::HttpResponse
+
+  # avoid GC overhead for frequently used-strings:
+  CONN_KA = "Connection: keep-alive\r\n\r\n"
+  CONN_CLOSE = "Connection: close\r\n\r\n"
+  Z = ""
+  RESPONSE_START = "HTTP/1.1 "
+
+  def response_start
+    @response_start_sent ? Z : RESPONSE_START
+  end
+
+  def response_wait_write(rv)
+    # call the kgio_wait_readable or kgio_wait_writable method
+    ok = __send__("kgio_#{rv}") and return ok
+    k = self.class
+    k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\
+                  "#{k.client_timeout}s")
+    nil
+  end
+
+  def err_response(code)
+    "#{response_start}#{CODES[code]}\r\n\r\n"
+  end
+
+  def response_header_blocked(ret, header, body, alive, offset, count)
+    if body.respond_to?(:to_path)
+      alive = Yahns::StreamFile.new(body, alive, offset, count)
+      body = nil
+    end
+    wbuf = Yahns::Wbuf.new(body, alive)
+    rv = wbuf.wbuf_write(self, header)
+    body.each { |chunk| rv = wbuf.wbuf_write(self, chunk) } if body
+    wbuf_maybe(wbuf, rv, alive)
+  end
+
+  def wbuf_maybe(wbuf, rv, alive)
+    case rv # trysendfile return value
+    when nil
+      case alive
+      when :delete
+        @state = :delete
+      when true, false
+        http_response_done(alive)
+      end
+    else
+      @state = wbuf
+      rv
+    end
+  end
+
+  def http_response_done(alive)
+    @input = @input.discard if @input
+    if alive
+      @response_start_sent = false
+      # @hs.buf will have data if the client pipelined
+      if @hs.buf.empty?
+        @state = :headers
+        :wait_readable
+      else
+        @state = :pipelined
+        # may need to wait for readability if SSL,
+        # only need writability if plain TCP
+        :wait_readwrite
+      end
+    else
+      # shutdown is needed in case the app forked, we rescue here since
+      # StreamInput may issue shutdown as well
+      shutdown rescue nil
+      nil # trigger close
+    end
+  end
+
+  # writes the rack_response to socket as an HTTP response
+  # returns :wait_readable, :wait_writable, :forget, or nil
+  def http_response_write(status, headers, body)
+    status = CODES[status.to_i] || status
+    offset = 0
+    count = hijack = nil
+    alive = @hs.next?
+
+    if @hs.headers?
+      buf = "#{response_start}#{status}\r\nDate: #{httpdate}\r\n"
+      headers.each do |key, value|
+        case key
+        when %r{\ADate\z}
+          next
+        when %r{\AContent-Range\z}i
+          if %r{\Abytes (\d+)-(\d+)/\d+\z} =~ value
+            offset = $1.to_i
+            count = $2.to_i - offset + 1
+          end
+        when %r{\AConnection\z}i
+          # allow Rack apps to tell us they want to drop the client
+          alive = !!(value =~ /\bclose\b/i)
+        when "rack.hijack"
+          hijack = value
+          body = nil # ensure we do not close body
+        else
+          if value =~ /\n/
+            # avoiding blank, key-only cookies with /\n+/
+            buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join
+          else
+            buf << "#{key}: #{value}\r\n"
+          end
+        end
+      end
+      buf << (alive ? CONN_KA : CONN_CLOSE)
+      case rv = kgio_trywrite(buf)
+      when nil # all done, likely
+        break
+      when String
+        buf = rv # hope the skb grows
+      when :wait_writable, :wait_readable
+        if self.class.output_buffering
+          alive = hijack ? hijack : alive
+          rv = response_header_blocked(rv, buf, body, alive, offset, count)
+          body = nil # ensure we do not close body in ensure
+          return rv
+        else
+          response_wait_write(rv) or return
+        end
+      end while true
+    end
+
+    if hijack
+      hijack.call(self)
+      return :delete # trigger EPOLL_CTL_DEL
+    end
+
+    if body.respond_to?(:to_path)
+      @state = body = Yahns::StreamFile.new(body, alive, offset, count)
+      return step_write
+    end
+
+    wbuf = rv = nil
+    body.each do |chunk|
+      if wbuf
+        rv = wbuf.wbuf_write(self, chunk)
+      else
+        case rv = kgio_trywrite(chunk)
+        when nil # all done, likely and good!
+          break
+        when String
+          chunk = rv # hope the skb grows when we loop into the trywrite
+        when :wait_writable, :wait_readable
+          if self.class.output_buffering
+            wbuf = Yahns::Wbuf.new(body, alive)
+            rv = wbuf.wbuf_write(self, chunk)
+            break
+          else
+            response_wait_write(rv) or return
+          end
+        end while true
+      end
+    end
+
+    # if we buffered the write body, we must return :wait_writable
+    # (or :wait_readable for SSL) and hit Yahns::HttpClient#step_write
+    if wbuf
+      body = nil # ensure we do not close the body in ensure
+      wbuf_maybe(wbuf, rv, alive)
+    else
+      http_response_done(alive)
+    end
+  ensure
+    body.respond_to?(:close) and body.close
+  end
+end
diff --git a/lib/yahns/log.rb b/lib/yahns/log.rb
new file mode 100644
index 0000000..59baa85
--- /dev/null
+++ b/lib/yahns/log.rb
@@ -0,0 +1,73 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# logging-related utility functions for all of yahns
+module Yahns::Log # :nodoc:
+  def self.exception(logger, prefix, exc)
+    message = exc.message
+    message = message.dump if /[[:cntrl:]]/ =~ message # prevent code injection
+    logger.error "#{prefix}: #{message} (#{exc.class})"
+    exc.backtrace.each { |line| logger.error(line) }
+  end
+
+  def self.is_log?(fp)
+    append_flags = IO::WRONLY | IO::APPEND
+
+    ! fp.closed? &&
+      fp.stat.file? &&
+      fp.sync &&
+      (fp.fcntl(Fcntl::F_GETFL) & append_flags) == append_flags
+  rescue IOError, Errno::EBADF
+    false
+  end
+
+  def self.chown_all(uid, gid)
+    ObjectSpace.each_object(File) do |fp|
+      fp.chown(uid, gid) if is_log?(fp)
+    end
+  end
+
+  # This reopens ALL logfiles in the process that have been rotated
+  # using logrotate(8) (without copytruncate) or similar tools.
+  # A +File+ object is considered for reopening if it is:
+  #   1) opened with the O_APPEND and O_WRONLY flags
+  #   2) the current open file handle does not match its original open path
+  #   3) unbuffered (as far as userspace buffering goes, not O_SYNC)
+  # Returns the number of files reopened
+  def self.reopen_all
+    to_reopen = []
+    nr = 0
+    ObjectSpace.each_object(File) { |fp| is_log?(fp) and to_reopen << fp }
+
+    to_reopen.each do |fp|
+      orig_st = begin
+        fp.stat
+      rescue IOError, Errno::EBADF
+        next
+      end
+
+      begin
+        b = File.stat(fp.path)
+        next if orig_st.ino == b.ino && orig_st.dev == b.dev
+      rescue Errno::ENOENT
+      end
+
+      begin
+        File.open(fp.path, 'a') { |tmpfp| fp.reopen(tmpfp) }
+        fp.sync = true
+        new_st = fp.stat
+
+        # this should only happen in the master:
+        if orig_st.uid != new_st.uid || orig_st.gid != new_st.gid
+          fp.chown(orig_st.uid, orig_st.gid)
+        end
+
+        nr += 1
+      rescue IOError, Errno::EBADF
+        # not much we can do...
+      end
+    end
+    nr
+  end
+end
diff --git a/lib/yahns/queue.rb b/lib/yahns/queue.rb
new file mode 100644
index 0000000..add4f78
--- /dev/null
+++ b/lib/yahns/queue.rb
@@ -0,0 +1,7 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+if SleepyPenguin.const_defined?(:Epoll)
+  require_relative 'queue_epoll'
+else
+  require_relative 'queue_kqueue' # TODO
+end
diff --git a/lib/yahns/queue_egg.rb b/lib/yahns/queue_egg.rb
new file mode 100644
index 0000000..a2abc2f
--- /dev/null
+++ b/lib/yahns/queue_egg.rb
@@ -0,0 +1,23 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# this represents a Yahns::Queue before its vivified.  This only
+# lives in the parent process and should be clobbered after qc_vivify
+class Yahns::QueueEgg # :nodoc:
+  attr_writer :max_events, :worker_threads
+  attr_accessor :logger
+
+  def initialize
+    @max_events = 1 # 1 is good if worker_threads > 1
+    @worker_threads = 7 # any default is wrong for most apps...
+    @logger = nil
+  end
+
+  # only call after forking
+  def qc_vivify(fdmap)
+    queue = Yahns::Queue.new
+    queue.fdmap = fdmap
+    queue.spawn_worker_threads(@logger, @worker_threads, @max_events)
+    queue
+  end
+end
diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb
new file mode 100644
index 0000000..c9febc4
--- /dev/null
+++ b/lib/yahns/queue_epoll.rb
@@ -0,0 +1,57 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc:
+  include SleepyPenguin
+  attr_accessor :fdmap # Yahns::Fdmap
+
+  # public
+  QEV_RD = Epoll::IN | Epoll::ONESHOT
+  QEV_WR = Epoll::OUT | Epoll::ONESHOT
+  QEV_RDWR = QEV_RD | QEV_WR
+
+  def self.new
+    super(SleepyPenguin::Epoll::CLOEXEC)
+  end
+
+  # for HTTP and HTTPS servers, we rely on the io writing to us, first
+  # flags: QEV_RD/QEV_WR (usually QEV_RD)
+  def queue_add(io, flags)
+    @fdmap.add(io)
+    epoll_ctl(Epoll::CTL_ADD, io, flags)
+  end
+
+  # returns an array of infinitely running threads
+  def spawn_worker_threads(logger, worker_threads, max_events)
+    worker_threads.times do
+      Thread.new do
+        Thread.current[:yahns_rbuf] = ""
+        begin
+          epoll_wait(max_events) do |_, io| # don't care for flags for now
+            case rv = io.yahns_step
+            when :wait_readable
+              epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
+            when :wait_writable
+              epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
+            when :wait_readwrite
+              epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
+            when :delete # only used by rack.hijack
+              epoll_ctl(Epoll::CTL_DEL, io, 0)
+              @fdmap.delete(io)
+            when nil
+              # this is be the ONLY place where we call IO#close on
+              # things inside the queue
+              io.close
+              @fdmap.decr
+            else
+              raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
+            end
+          end
+        rescue => e
+          break if (IOError === e || Errno::EBADF === e) && closed?
+          Yahns::Log.exception(logger, 'queue loop', e)
+        end while true
+      end
+    end
+  end
+end
diff --git a/lib/yahns/rack.rb b/lib/yahns/rack.rb
new file mode 100644
index 0000000..27857ec
--- /dev/null
+++ b/lib/yahns/rack.rb
@@ -0,0 +1,80 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'rack'
+class Yahns::Rack # :nodoc:
+  attr_reader :preload
+
+  # enforce a single instance for the identical config.ru
+  def self.instance_key(*args)
+    ru = args[0]
+
+    # it's safe to expand_path now since we enforce working_directory in the
+    # top-level config is called before any apps are created
+    # ru may also be a Rack::Builder object or any already-built Rack app
+    ru.respond_to?(:call) ? ru.object_id : File.expand_path(ru)
+  end
+
+  def initialize(ru, opts = {})
+    # always called after config file parsing, may be called after forking
+    @app = lambda do
+      ENV["RACK_ENV"] ||= "none"
+      if ru.respond_to?(:call)
+        inner_app = ru.respond_to?(:to_app) ? ru.to_app : ru
+      else
+        inner_app = case ru
+        when /\.ru$/
+          raw = File.read(ru)
+          raw.sub!(/^__END__\n.*/, '')
+          eval("Rack::Builder.new {(\n#{raw}\n)}.to_app", TOPLEVEL_BINDING, ru)
+        else
+          require ru
+          Object.const_get(File.basename(ru, '.rb').capitalize)
+        end
+      end
+      inner_app
+    end
+    @ru = ru
+    @preload = opts[:preload]
+    build_app! if @preload
+  end
+
+  def config_context
+    ctx_class = Class.new(Yahns::HttpClient)
+    ctx_class.extend(Yahns::HttpContext)
+    ctx_class.http_ctx_init(self)
+    ctx_class
+  end
+
+  def build_app!
+    if @app.respond_to?(:arity) && @app.arity == 0
+      Gem.refresh if defined?(Gem) && Gem.respond_to?(:refresh)
+      @app = @app.call
+    end
+  end
+
+  # allow different HttpContext instances to have different Rack defaults
+  def app_defaults
+    {
+      # logger is set in http_context
+      "rack.errors" => $stderr,
+      "rack.multiprocess" => true,
+      "rack.multithread" => true,
+      "rack.run_once" => false,
+      "rack.hijack?" => true,
+      "rack.version" => [ 1, 2 ],
+      "SCRIPT_NAME" => "",
+
+      # this is not in the Rack spec, but some apps may rely on it
+      "SERVER_SOFTWARE" => "yahns"
+    }
+  end
+
+  def app_after_fork
+    build_app! unless @preload
+    @app
+  end
+end
+
+# register ourselves
+Yahns::Config::APP_CLASS[:rack] = Yahns::Rack
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
new file mode 100644
index 0000000..c7a5a57
--- /dev/null
+++ b/lib/yahns/server.rb
@@ -0,0 +1,328 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class Yahns::Server # :nodoc:
+  QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP, :TTIN, :TTOU ]
+  attr_accessor :daemon_pipe
+  attr_accessor :logger
+  attr_writer :worker_processes
+  include Yahns::SocketHelper
+
+  def initialize(config)
+    @reexec_pid = 0
+    @daemon_pipe = nil # writable IO or true
+    @config = config
+    @workers = {} # pid -> workers
+    @sig_queue = [] # nil in forked workers
+    @logger = Logger.new($stderr)
+    @sev = Yahns::Sigevent.new
+    @listeners = []
+    @pid = nil
+    @worker_processes = nil
+    @user = nil
+  end
+
+  def sqwakeup(sig)
+    @sig_queue << sig
+    @sev.sev_signal
+  end
+
+  def start
+    @config.commit!(self)
+    inherit_listeners!
+    # we try inheriting listeners first, so we bind them later.
+    # we don't write the pid file until we've bound listeners in case
+    # yahns was started twice by mistake.  Even though our #pid= method
+    # checks for stale/existing pid files, race conditions are still
+    # possible (and difficult/non-portable to avoid) and can be likely
+    # to clobber the pid if the second start was in quick succession
+    # after the first, so we rely on the listener binding to fail in
+    # that case.  Some tests (in and outside of this source tree) and
+    # monitoring tools may also rely on pid files existing before we
+    # attempt to connect to the listener(s)
+
+    # setup signal handlers before writing pid file in case people get
+    # trigger happy and send signals as soon as the pid file exists.
+    QUEUE_SIGS.each { |sig| trap(sig) { sqwakeup(sig) } }
+    self.pid = @config.value(:pid) # write pid file
+    bind_new_listeners!
+    if @worker_processes
+      require 'yahns/server_mp'
+      extend Yahns::ServerMP
+      mp_init
+    end
+    self
+  end
+
+  # replaces current listener set with +listeners+.  This will
+  # close the socket if it will not exist in the new listener set
+  def listeners=(listeners)
+    cur_names, dead_names = [], []
+    listener_names.each do |name|
+      if ?/ == name[0]
+        # mark unlinked sockets as dead so we can rebind them
+        (File.socket?(name) ? cur_names : dead_names) << name
+      else
+        cur_names << name
+      end
+    end
+    set_names = listener_names(listeners)
+    dead_names.concat(cur_names - set_names).uniq!
+
+    @listeners.delete_if do |io|
+      if dead_names.include?(sock_name(io))
+        (io.close rescue nil).nil? # true
+      else
+        set_server_sockopt(io, sock_opts(io))
+        false
+      end
+    end
+
+    (set_names - cur_names).each { |addr| listen(addr) }
+  end
+
+  # sets the path for the PID file of the master process
+  def pid=(path)
+    if path
+      if x = valid_pid?(path)
+        return path if @pid && path == @pid && x == $$
+        if x == @reexec_pid && @pid =~ /\.oldbin\z/
+          @logger.warn("will not set pid=#{path} while reexec-ed "\
+                       "child is running PID:#{x}")
+          return
+        end
+        raise ArgumentError, "Already running on PID:#{x} " \
+                             "(or pid=#{path} is stale)"
+      end
+    end
+    unlink_pid_safe(@pid) if @pid
+
+    if path
+      fp = begin
+        tmp = "#{File.dirname(path)}/#{rand}.#$$"
+        File.open(tmp, File::RDWR|File::CREAT|File::EXCL, 0644)
+      rescue Errno::EEXIST
+        retry
+      end
+      fp.syswrite("#$$\n")
+      File.rename(fp.path, path)
+      fp.close
+    end
+    @pid = path
+  end
+
+  # add a given address to the +listeners+ set, idempotently
+  # Allows workers to add a private, per-process listener via the
+  # after_fork hook.  Very useful for debugging and testing.
+  # +:tries+ may be specified as an option for the number of times
+  # to retry, and +:delay+ may be specified as the time in seconds
+  # to delay between retries.
+  # A negative value for +:tries+ indicates the listen will be
+  # retried indefinitely, this is useful when workers belonging to
+  # different masters are spawned during a transparent upgrade.
+  def listen(address)
+    address = @config.expand_addr(address)
+    return if String === address && listener_names.include?(address)
+
+    begin
+      io = bind_listen(address, sock_opts(address))
+      unless Kgio::TCPServer === io || Kgio::UNIXServer === io
+        io = server_cast(io)
+      end
+      @logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
+      @listeners << io
+      io
+    rescue Errno::EADDRINUSE => err
+      @logger.error "adding listener failed addr=#{address} (in use)"
+    rescue => err
+      @logger.fatal "error adding listener addr=#{address}"
+      raise err
+    end
+  end
+
+  def daemon_ready
+    @daemon_pipe or return
+    @daemon_pipe.syswrite("#$$")
+    @daemon_pipe.close
+    @daemon_pipe = true # for SIGWINCH
+  end
+
+  # reexecutes the Yahns::START with a new binary
+  def reexec
+    if @reexec_pid > 0
+      begin
+        Process.kill(0, @reexec_pid)
+        @logger.error "reexec-ed child already running PID:#@reexec_pid"
+        return
+      rescue Errno::ESRCH
+        @reexec_pid = 0
+      end
+    end
+
+    if @pid
+      old_pid = "#@pid.oldbin"
+      begin
+        self.pid = old_pid  # clear the path for a new pid file
+      rescue ArgumentError
+        @logger.error "old PID:#{valid_pid?(old_pid)} running with " \
+                      "existing pid=#{old_pid}, refusing rexec"
+        return
+      rescue => e
+        @logger.error "error writing pid=#{old_pid} #{e.class} #{e.message}"
+        return
+      end
+    end
+
+    @reexec_pid = fork do
+      redirects = {}
+      listeners.each do |sock|
+        sock.close_on_exec = false
+        redirects[sock.fileno] = sock
+      end
+      ENV['YAHNS_FD'] = redirects.keys.map(&:to_s).join(',')
+      Dir.chdir(@config.value(:working_directory) || Yahns::START[:cwd])
+      cmd = [ Yahns::START[0] ].concat(Yahns::START[:argv])
+      @logger.info "executing #{cmd.inspect} (in #{Dir.pwd})"
+      cmd << redirects
+      exec(*cmd)
+    end
+    proc_name 'master (old)'
+  end
+
+  # unlinks a PID file at given +path+ if it contains the current PID
+  # still potentially racy without locking the directory (which is
+  # non-portable and may interact badly with other programs), but the
+  # window for hitting the race condition is small
+  def unlink_pid_safe(path)
+    (File.read(path).to_i == $$ and File.unlink(path)) rescue nil
+  end
+
+  # returns a PID if a given path contains a non-stale PID file,
+  # nil otherwise.
+  def valid_pid?(path)
+    wpid = File.read(path).to_i
+    wpid <= 0 and return
+    Process.kill(0, wpid)
+    wpid
+  rescue Errno::EPERM
+    @logger.info "pid=#{path} possibly stale, got EPERM signalling PID:#{wpid}"
+    nil
+  rescue Errno::ESRCH, Errno::ENOENT
+    # don't unlink stale pid files, racy without non-portable locking...
+  end
+
+  def load_config!
+    @logger.info "reloading config_file=#{@config.config_file}"
+    @config.config_reload!
+    @config.commit!(self)
+    kill_each_worker(:QUIT)
+    Yahns::Log.reopen_all
+    @logger.info "done reloading config_file=#{@config.config_file}"
+  rescue StandardError, LoadError, SyntaxError => e
+    Yahns::Log.exception(@logger,
+                     "error reloading config_file=#{@config.config_file}", e)
+  end
+
+  # returns an array of string names for the given listener array
+  def listener_names(listeners = @listeners)
+    listeners.map { |io| sock_name(io) }
+  end
+
+  def sock_opts(io)
+    @config.config_listeners[sock_name(io)]
+  end
+
+  def inherit_listeners!
+    # inherit sockets from parents, they need to be plain Socket objects
+    # before they become Kgio::UNIXServer or Kgio::TCPServer
+    inherited = ENV['YAHNS_FD'].to_s.split(/,/).map do |fd|
+      io = Socket.for_fd(fd.to_i)
+      set_server_sockopt(io, sock_opts(io))
+      @logger.info "inherited addr=#{sock_name(io)} fd=#{fd}"
+      server_cast(io)
+    end
+
+    @listeners.replace(inherited)
+  end
+
+  # call only after calling inherit_listeners!
+  # This binds any listeners we did NOT inherit from the parent
+  def bind_new_listeners!
+    self.listeners = @config.config_listeners.keys
+    raise ArgumentError, "no listeners" if @listeners.empty?
+    @listeners.each { |l| l.extend(Yahns::Acceptor) }
+  end
+
+  def proc_name(tag)
+    s = Yahns::START
+    $0 = ([ File.basename(s[0]), tag ]).concat(s[:argv]).join(' ')
+  end
+
+  # spins up processing threads of the server
+  def fdmap_init
+    thresh = @config.value(:client_expire_threshold)
+
+    # keeps track of all connections, like ObjectSpace, but only for IOs
+    fdmap = Yahns::Fdmap.new(@logger, thresh)
+
+    # initialize queues (epoll/kqueue) and associated worker threads
+    queues = {}
+    @config.qeggs.each do |name, qegg|
+      queue = qegg.qc_vivify(fdmap) # worker threads run after this
+      queues[qegg] = queue
+    end
+
+    # spin up applications (which are preload: false)
+    @config.app_ctx.each { |ctx| ctx.after_fork_init }
+
+    # spin up acceptors, clients flow into worker queues after this
+    @listeners.each do |l|
+      ctx = sock_opts(l)[:yahns_app_ctx]
+      qegg = ctx.qegg || @config.qeggs[:default]
+
+      # acceptors feed the the queues
+      l.spawn_acceptor(@logger, ctx, queues[qegg])
+    end
+    fdmap
+  end
+
+  def usr1_reopen(prefix)
+    @logger.info "#{prefix}reopening logs..."
+    Yahns::Log.reopen_all
+    @logger.info "#{prefix}done reopening logs"
+  end
+
+  def sp_sig_handle(alive)
+    @sev.kgio_wait_readable(alive ? nil : 0.01)
+    @sev.yahns_step
+    case sig = @sig_queue.shift
+    when :QUIT, :TERM, :INT
+      self.listeners = [] # stop accepting new connections
+      exit(0) unless alive
+      return false
+    when :USR1
+      usr1_reopen('')
+    when :USR2
+      reexec
+    when :HUP
+      reexec
+      return false
+    when :TTIN, :TTOU, :WINCH
+      @logger.info("SIG#{sig} ignored in single-process mode")
+    end
+    alive
+  end
+
+  # single-threaded only, this is overriden if @worker_processes is non-nil
+  def join
+    daemon_ready
+    fdmap = fdmap_init
+    alive = true
+    begin
+      alive = sp_sig_handle(alive)
+    rescue => e
+      Yahns::Log.exception(@logger, "main loop", e)
+    end while alive || fdmap.size > 0
+    unlink_pid_safe(@pid) if @pid
+  end
+end
diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb
new file mode 100644
index 0000000..8818bac
--- /dev/null
+++ b/lib/yahns/server_mp.rb
@@ -0,0 +1,184 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+module Yahns::ServerMP # :nodoc:
+  EXIT_SIGS = [ :QUIT, :TERM, :INT ]
+
+  def mp_init
+    trap(:CHLD) { @sev.sev_signal }
+  end
+
+  # reaps all unreaped workers
+  def reap_all_workers
+    begin
+      wpid, status = Process.waitpid2(-1, Process::WNOHANG)
+      wpid or return
+      if @reexec_pid == wpid
+        @logger.error "reaped #{status.inspect} exec()-ed"
+        @reexec_pid = 0
+        self.pid = @pid.chomp('.oldbin') if @pid
+        proc_name 'master'
+      else
+        worker = @workers.delete(wpid)
+        worker_id = worker ? worker.nr : "(unknown)"
+        m = "reaped #{status.inspect} worker=#{worker_id}"
+        status.success? ? @logger.info(m) : @logger.error(m)
+      end
+    rescue Errno::ECHILD
+      return
+    end while true
+  end
+
+  def maintain_worker_count
+    (off = @workers.size - @worker_processes) == 0 and return
+    off < 0 and return spawn_missing_workers
+    @workers.each_pair do |wpid, worker|
+      worker.nr >= @worker_processes and Process.kill(:QUIT, wpid)
+    end
+  end
+
+  # delivers a signal to each worker
+  def kill_each_worker(signal)
+    @workers.each_key { |wpid| Process.kill(signal, wpid) }
+  end
+
+  # this is the first thing that runs after forking in a child
+  # gets rid of stuff the worker has no business keeping track of
+  # to free some resources and drops all sig handlers.
+  # traps for USR1, USR2, and HUP may be set in the after_fork Proc
+  # by the user.
+  def after_fork_internal(worker)
+    worker.atfork_child
+
+    # daemon_pipe may be true for non-initial workers
+    @daemon_pipe = @daemon_pipe.close if @daemon_pipe.respond_to?(:close)
+
+    srand # in case this pops up again: https://bugs.ruby-lang.org/issues/4338
+
+    # The OpenSSL PRNG is seeded with only the pid, and apps with frequently
+    # dying workers can recycle pids
+    OpenSSL::Random.seed(rand.to_s) if defined?(OpenSSL::Random)
+    # we'll re-trap EXIT_SIGS later for graceful shutdown iff we accept clients
+    EXIT_SIGS.each { |sig| trap(sig) { exit!(0) } }
+    exit!(0) if (@sig_queue & EXIT_SIGS)[0] # did we inherit sigs from parent?
+    @sig_queue = []
+
+    # ignore WINCH, TTIN, TTOU, HUP in the workers
+    (Yahns::Server::QUEUE_SIGS - EXIT_SIGS).each { |sig| trap(sig, nil) }
+    trap(:CHLD, 'DEFAULT')
+    @logger.info("worker=#{worker.nr} spawned pid=#$$")
+    proc_name "worker[#{worker.nr}]"
+    Yahns::START.clear
+    @sev.close
+    @sev = Yahns::Sigevent.new
+    worker.user(*@user) if @user
+    @user = @workers = nil
+  end
+
+  def spawn_missing_workers
+    worker_nr = -1
+    until (worker_nr += 1) == @worker_processes
+      @workers.value?(worker_nr) and next
+      worker = Yahns::Worker.new(worker_nr)
+      @logger.info("worker=#{worker_nr} spawning...")
+      if pid = fork
+        @workers[pid] = worker.atfork_parent
+      else
+        after_fork_internal(worker)
+        run_mp_worker(worker)
+      end
+    end
+  rescue => e
+    Yahns::Log.exception(@logger, "spawning worker", e)
+    exit!
+  end
+
+  # monitors children and receives signals forever
+  # (or until a termination signal is sent).  This handles signals
+  # one-at-a-time time and we'll happily drop signals in case somebody
+  # is signalling us too often.
+  def join
+    spawn_missing_workers
+    state = :respawn # :QUIT, :WINCH
+    proc_name 'master'
+    @logger.info "master process ready"
+    daemon_ready
+    begin
+      @sev.kgio_wait_readable
+      @sev.yahns_step
+      reap_all_workers
+      case @sig_queue.shift
+      when *EXIT_SIGS # graceful shutdown (twice for non graceful)
+        self.listeners = []
+        kill_each_worker(:QUIT)
+        state = :QUIT
+      when :USR1 # rotate logs
+        usr1_reopen("master ")
+        kill_each_worker(:USR1)
+      when :USR2 # exec binary, stay alive in case something went wrong
+        reexec
+      when :WINCH
+        if @daemon_pipe
+          state = :WINCH
+          @logger.info "gracefully stopping all workers"
+          kill_each_worker(:QUIT)
+          @worker_processes = 0
+        else
+          @logger.info "SIGWINCH ignored because we're not daemonized"
+        end
+      when :TTIN
+        state = :respawn unless state == :QUIT
+        @worker_processes += 1
+      when :TTOU
+        @worker_processes -= 1 if @worker_processes > 0
+      when :HUP
+        state = :respawn unless state == :QUIT
+        if @config.config_file
+          load_config!
+        else # exec binary and exit if there's no config file
+          @logger.info "config_file not present, reexecuting binary"
+          reexec
+        end
+      end while @sig_queue[0]
+      maintain_worker_count if state == :respawn
+    rescue => e
+      Yahns::Log.exception(@logger, "master loop error", e)
+    end while state != :QUIT || @workers.size > 0
+    @logger.info "master complete"
+    unlink_pid_safe(@pid) if @pid
+  end
+
+  def fdmap_init_mp
+    fdmap = fdmap_init # builds apps (if not preloading)
+    EXIT_SIGS.each { |sig| trap(sig) { sqwakeup(sig) } }
+    @config = nil
+    fdmap
+  end
+
+  def run_mp_worker(worker)
+    fdmap = fdmap_init_mp
+    alive = true
+    begin
+      alive = mp_sig_handle(worker, alive)
+    rescue => e
+      Yahns::Log.exception(@logger, "main worker loop", e)
+    end while alive || fdmap.size > 0
+    exit
+  end
+
+  def mp_sig_handle(worker, alive)
+    # not performance critical
+    r = IO.select([worker, @sev], nil, nil, alive ? nil : 0.01) and
+      r[0].each { |io| io.yahns_step }
+    case sig = @sig_queue.shift
+    when *EXIT_SIGS
+      self.listeners = []
+      exit(0) unless alive # drop connections immediately if signaled twice
+      @logger.info("received SIG#{sig}, gracefully exiting")
+      return false
+    when :USR1
+      usr1_reopen("worker ")
+    end
+    alive
+  end
+end
diff --git a/lib/yahns/sigevent.rb b/lib/yahns/sigevent.rb
new file mode 100644
index 0000000..aa95f4b
--- /dev/null
+++ b/lib/yahns/sigevent.rb
@@ -0,0 +1,7 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+if SleepyPenguin.const_defined?(:EventFD)
+  require_relative 'sigevent_efd'
+else
+  require_relative 'sigevent_pipe'
+end
diff --git a/lib/yahns/sigevent_efd.rb b/lib/yahns/sigevent_efd.rb
new file mode 100644
index 0000000..8f10ad6
--- /dev/null
+++ b/lib/yahns/sigevent_efd.rb
@@ -0,0 +1,18 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class Yahns::Sigevent < SleepyPenguin::EventFD # :nodoc:
+  include Kgio::DefaultWaiters
+  def self.new
+    super(0, SleepyPenguin::EventFD::CLOEXEC)
+  end
+
+  def sev_signal
+    incr(1) # eventfd_write
+  end
+
+  def yahns_step
+    value(true) # eventfd_read, we ignore this data
+    :wait_readable
+  end
+end
diff --git a/lib/yahns/sigevent_pipe.rb b/lib/yahns/sigevent_pipe.rb
new file mode 100644
index 0000000..6e1be53
--- /dev/null
+++ b/lib/yahns/sigevent_pipe.rb
@@ -0,0 +1,29 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class Yahns::Sigevent # :nodoc:
+  attr_reader :to_io
+  def initialize
+    @to_io, @wr = Kgio::Pipe.new
+  end
+
+  def kgio_wait_readable
+    @to_io.kgio_wait_readable
+  end
+
+  def sev_signal
+    @wr.kgio_write(".")
+  end
+
+  def yahns_step
+    # 11 byte strings -> no malloc on YARV
+    while String === @to_io.kgio_tryread(11)
+    end
+    :wait_readable
+  end
+
+  def close
+    @to_io.close
+    @wr.close
+  end
+end
diff --git a/lib/yahns/socket_helper.rb b/lib/yahns/socket_helper.rb
new file mode 100644
index 0000000..61f2b0f
--- /dev/null
+++ b/lib/yahns/socket_helper.rb
@@ -0,0 +1,117 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+# this is only meant for Yahns::Server
+module Yahns::SocketHelper # :nodoc:
+  def set_server_sockopt(sock, opt)
+    opt = {backlog: 1024}.merge!(opt) if opt
+
+    TCPSocket === sock and sock.setsockopt(:IPPROTO_TCP, :TCP_NODELAY, 1)
+    sock.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1)
+
+    if opt[:rcvbuf] || opt[:sndbuf]
+      log_buffer_sizes(sock, "before: ")
+      { SO_RCVBUF: :rcvbuf, SO_SNDBUF: :sndbuf }.each do |optname,cfgname|
+        val = opt[cfgname] and sock.setsockopt(:SOL_SOCKET, optname, val)
+      end
+      log_buffer_sizes(sock, " after: ")
+    end
+    sock.listen(opt[:backlog])
+  rescue => e
+    Yahns::Log.exception(@logger, "#{sock_name(sock)} #{opt.inspect}", e)
+  end
+
+  def log_buffer_sizes(sock, pfx = '')
+    rcvbuf = sock.getsockopt(:SOL_SOCKET, :SO_RCVBUF).int
+    sndbuf = sock.getsockopt(:SOL_SOCKET, :SO_SNDBUF).int
+    @logger.info("#{pfx}#{sock_name(sock)} rcvbuf=#{rcvbuf} sndbuf=#{sndbuf}")
+  end
+
+  # creates a new server, socket. address may be a HOST:PORT or
+  # an absolute path to a UNIX socket.  address can even be a Socket
+  # object in which case it is immediately returned
+  def bind_listen(address, opt)
+    return address unless String === address
+    opt ||= {}
+
+    sock = if address[0] == ?/
+      if File.exist?(address)
+        if File.socket?(address)
+          begin
+            UNIXSocket.new(address).close
+            # fall through, try to bind(2) and fail with EADDRINUSE
+            # (or succeed from a small race condition we can't sanely avoid).
+          rescue Errno::ECONNREFUSED
+            @logger.info "unlinking existing socket=#{address}"
+            File.unlink(address)
+          end
+        else
+          raise ArgumentError,
+                "socket=#{address} specified but it is not a socket!"
+        end
+      end
+      old_umask = File.umask(opt[:umask] || 0)
+      begin
+        Kgio::UNIXServer.new(address)
+      ensure
+        File.umask(old_umask)
+      end
+    elsif /\A\[([a-fA-F0-9:]+)\]:(\d+)\z/ =~ address
+      new_ipv6_server($1, $2.to_i, opt)
+    elsif /\A(\d+\.\d+\.\d+\.\d+):(\d+)\z/ =~ address
+      Kgio::TCPServer.new($1, $2.to_i)
+    else
+      raise ArgumentError, "Don't know how to bind: #{address}"
+    end
+    set_server_sockopt(sock, opt)
+    sock
+  end
+
+  def new_ipv6_server(addr, port, opt)
+    opt.key?(:ipv6only) or return Kgio::TCPServer.new(addr, port)
+    sock = Socket.new(:AF_INET6, :SOCK_STREAM, 0)
+    sock.setsockopt(:IPPROTO_IPV6, :IPV6_V6ONLY, opt[:ipv6only] ? 1 : 0)
+    sock.setsockopt(:SOL_SOCKET, :SO_REUSEADDR, 1)
+    sock.bind(Socket.pack_sockaddr_in(port, addr))
+    sock.autoclose = false
+    Kgio::TCPServer.for_fd(sock.fileno)
+  end
+
+  # returns rfc2732-style (e.g. "[::1]:666") addresses for IPv6
+  def tcp_name(sock)
+    port, addr = Socket.unpack_sockaddr_in(sock.getsockname)
+    /:/ =~ addr ? "[#{addr}]:#{port}" : "#{addr}:#{port}"
+  end
+
+  # Returns the configuration name of a socket as a string.  sock may
+  # be a string value, in which case it is returned as-is
+  # Warning: TCP sockets may not always return the name given to it.
+  def sock_name(sock)
+    case sock
+    when String then sock
+    when UNIXServer
+      Socket.unpack_sockaddr_un(sock.getsockname)
+    when TCPServer
+      tcp_name(sock)
+    when Socket
+      begin
+        tcp_name(sock)
+      rescue ArgumentError
+        Socket.unpack_sockaddr_un(sock.getsockname)
+      end
+    else
+      raise ArgumentError, "Unhandled class #{sock.class}: #{sock.inspect}"
+    end
+  end
+
+  # casts a given Socket to be a TCPServer or UNIXServer
+  def server_cast(sock)
+    sock.autoclose = false
+    begin
+      Socket.unpack_sockaddr_in(sock.getsockname)
+      Kgio::TCPServer.for_fd(sock.fileno)
+    rescue ArgumentError
+      Kgio::UNIXServer.for_fd(sock.fileno)
+    end
+  end
+end
diff --git a/lib/yahns/stream_file.rb b/lib/yahns/stream_file.rb
new file mode 100644
index 0000000..eba9632
--- /dev/null
+++ b/lib/yahns/stream_file.rb
@@ -0,0 +1,34 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'wbuf_common'
+
+class Yahns::StreamFile # :nodoc:
+  include Yahns::WbufCommon
+
+  def initialize(body, persist, offset, count)
+    if body.respond_to?(:to_io)
+      @tmpio = body.to_io
+    else
+      path = body.to_path
+      if path =~ %r{\A/dev/fd/(\d+)\z}
+        @tmpio = IO.for_fd($1.to_i)
+        @tmpio.autoclose = false
+      else
+        @tmpio = File.open(path)
+      end
+    end
+    @sf_offset = offset
+    @sf_count = count || @tmpio.stat.size
+    @wbuf_persist = persist # whether or not we keep the connection alive
+    @body = body
+  end
+
+  # called by last wbuf_flush
+  def wbuf_close(client)
+    if File === @tmpio && @tmpio != @body
+      @tmpio.close
+    end
+    wbuf_close_common(client)
+  end
+end
diff --git a/lib/yahns/stream_input.rb b/lib/yahns/stream_input.rb
new file mode 100644
index 0000000..f0a43b3
--- /dev/null
+++ b/lib/yahns/stream_input.rb
@@ -0,0 +1,150 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv2 or later (https://www.gnu.org/licenses/gpl-2.0.txt)
+
+# When processing uploads, Yahns may expose a StreamInput object under
+# "rack.input" of the (future) Rack (2.x) environment.
+class Yahns::StreamInput # :nodoc:
+  # Initializes a new StreamInput object.  You normally do not have to call
+  # this unless you are writing an HTTP server.
+  def initialize(client, request)
+    @chunked = request.content_length.nil?
+    @client = client
+    @parser = request
+    @buf = request.buf
+    @rbuf = ''
+    @bytes_read = 0
+    filter_body(@rbuf, @buf) unless @buf.empty?
+  end
+
+  # :call-seq:
+  #   ios.read([length [, buffer ]]) => string, buffer, or nil
+  #
+  # Reads at most length bytes from the I/O stream, or to the end of
+  # file if length is omitted or is nil. length must be a non-negative
+  # integer or nil. If the optional buffer argument is present, it
+  # must reference a String, which will receive the data.
+  #
+  # At end of file, it returns nil or '' depend on length.
+  # ios.read() and ios.read(nil) returns ''.
+  # ios.read(length [, buffer]) returns nil.
+  #
+  # If the Content-Length of the HTTP request is known (as is the common
+  # case for POST requests), then ios.read(length [, buffer]) will block
+  # until the specified length is read (or it is the last chunk).
+  # Otherwise, for uncommon "Transfer-Encoding: chunked" requests,
+  # ios.read(length [, buffer]) will return immediately if there is
+  # any data and only block when nothing is available (providing
+  # IO#readpartial semantics).
+  def read(length = nil, rv = '')
+    if length
+      if length <= @rbuf.size
+        length < 0 and raise ArgumentError, "negative length #{length} given"
+        rv.replace(@rbuf.slice!(0, length))
+      else
+        to_read = length - @rbuf.size
+        rv.replace(@rbuf.slice!(0, @rbuf.size))
+        until to_read == 0 || eof? || (rv.size > 0 && @chunked)
+          @client.kgio_read(to_read, @buf) or eof!
+          filter_body(@rbuf, @buf)
+          rv << @rbuf
+          to_read -= @rbuf.size
+        end
+        @rbuf.replace('')
+      end
+      rv = nil if rv.empty? && length != 0
+    else
+      read_all(rv)
+    end
+    rv
+  end
+
+  def __rsize
+    @client.class.client_body_buffer_size
+  end
+
+  # :call-seq:
+  #   ios.gets   => string or nil
+  #
+  # Reads the next ``line'' from the I/O stream; lines are separated
+  # by the global record separator ($/, typically "\n"). A global
+  # record separator of nil reads the entire unread contents of ios.
+  # Returns nil if called at the end of file.
+  # This takes zero arguments for strict Rack::Lint compatibility,
+  # unlike IO#gets.
+  def gets
+    sep = $/
+    if sep.nil?
+      read_all(rv = '')
+      return rv.empty? ? nil : rv
+    end
+    re = /\A(.*?#{Regexp.escape(sep)})/
+    rsize = __rsize
+    begin
+      @rbuf.sub!(re, '') and return $1
+      return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size) if eof?
+      @client.kgio_read(rsize, @buf) or eof!
+      filter_body(once = '', @buf)
+      @rbuf << once
+    end while true
+  end
+
+  # :call-seq:
+  #   ios.each { |line| block }  => ios
+  #
+  # Executes the block for every ``line'' in *ios*, where lines are
+  # separated by the global record separator ($/, typically "\n").
+  def each
+    while line = gets
+      yield line
+    end
+
+    self # Rack does not specify what the return value is here
+  end
+
+  def eof?
+    if @parser.body_eof?
+      rsize = __rsize
+      while @chunked && ! @parser.parse
+        once = @client.kgio_read(rsize) or eof!
+        @buf << once
+      end
+      @client = nil
+      true
+    else
+      false
+    end
+  end
+
+  def filter_body(dst, src)
+    rv = @parser.filter_body(dst, src)
+    @bytes_read += dst.size
+    rv
+  end
+
+  def read_all(dst)
+    dst.replace(@rbuf)
+    @client or return
+    rsize = @client.class.client_body_buffer_size
+    until eof?
+      @client.kgio_read(rsize, @buf) or eof!
+      filter_body(@rbuf, @buf)
+      dst << @rbuf
+    end
+  ensure
+    @rbuf.replace('')
+  end
+
+  def eof!
+    # in case client only did a premature shutdown(SHUT_WR)
+    # we do support clients that shutdown(SHUT_WR) after the
+    # _entire_ request has been sent, and those will not have
+    # raised EOFError on us.
+    @client.shutdown if @client
+  ensure
+    raise Yahns::ClientShutdown, "bytes_read=#{@bytes_read}", []
+  end
+
+  def discard # return nil
+  end
+end
diff --git a/lib/yahns/tee_input.rb b/lib/yahns/tee_input.rb
new file mode 100644
index 0000000..0d91a89
--- /dev/null
+++ b/lib/yahns/tee_input.rb
@@ -0,0 +1,114 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv2 or later (https://www.gnu.org/licenses/gpl-2.0.txt)
+
+# acts like tee(1) on an input input to provide a input-like stream
+# while providing rewindable semantics through a File/StringIO backing
+# store.  On the first pass, the input is only read on demand so your
+# Rack application can use input notification (upload progress and
+# like).  This should fully conform to the Rack::Lint::InputWrapper
+# specification on the public API.  This class is intended to be a
+# strict interpretation of Rack::Lint::InputWrapper functionality and
+# will not support any deviations from it.
+#
+# When processing uploads, Yahns exposes a TeeInput object under
+# "rack.input" of the Rack environment.
+class Yahns::TeeInput < Yahns::StreamInput # :nodoc:
+  # Initializes a new TeeInput object.  You normally do not have to call
+  # this unless you are writing an HTTP server.
+  def initialize(client, request)
+    @len = request.content_length
+    super
+    @tmp = client.class.tmpio_for(@len)
+  end
+
+  # :call-seq:
+  #   ios.size  => Integer
+  #
+  # Returns the size of the input.  For requests with a Content-Length
+  # header value, this will not read data off the socket and just return
+  # the value of the Content-Length header as an Integer.
+  #
+  # For Transfer-Encoding:chunked requests, this requires consuming
+  # all of the input stream before returning since there's no other
+  # way to determine the size of the request body beforehand.
+  #
+  # This method is no longer part of the Rack specification as of
+  # Rack 1.2, so its use is not recommended.  This method only exists
+  # for compatibility with Rack applications designed for Rack 1.1 and
+  # earlier.  Most applications should only need to call +read+ with a
+  # specified +length+ in a loop until it returns +nil+.
+  def size
+    @len and return @len
+    pos = @tmp.pos
+    consume!
+    @tmp.pos = pos
+    @len = @tmp.size
+  end
+
+  # :call-seq:
+  #   ios.read([length [, buffer ]]) => string, buffer, or nil
+  #
+  # Reads at most length bytes from the I/O stream, or to the end of
+  # file if length is omitted or is nil. length must be a non-negative
+  # integer or nil. If the optional buffer argument is present, it
+  # must reference a String, which will receive the data.
+  #
+  # At end of file, it returns nil or "" depend on length.
+  # ios.read() and ios.read(nil) returns "".
+  # ios.read(length [, buffer]) returns nil.
+  #
+  # If the Content-Length of the HTTP request is known (as is the common
+  # case for POST requests), then ios.read(length [, buffer]) will block
+  # until the specified length is read (or it is the last chunk).
+  # Otherwise, for uncommon "Transfer-Encoding: chunked" requests,
+  # ios.read(length [, buffer]) will return immediately if there is
+  # any data and only block when nothing is available (providing
+  # IO#readpartial semantics).
+  def read(*args)
+    @client ? tee(super) : @tmp.read(*args)
+  end
+
+  # :call-seq:
+  #   ios.gets   => string or nil
+  #
+  # Reads the next ``line'' from the I/O stream; lines are separated
+  # by the global record separator ($/, typically "\n"). A global
+  # record separator of nil reads the entire unread contents of ios.
+  # Returns nil if called at the end of file.
+  # This takes zero arguments for strict Rack::Lint compatibility,
+  # unlike IO#gets.
+  def gets
+    @client ? tee(super) : @tmp.gets
+  end
+
+  # :call-seq:
+  #   ios.rewind    => 0
+  #
+  # Positions the *ios* pointer to the beginning of input, returns
+  # the offset (zero) of the +ios+ pointer.  Subsequent reads will
+  # start from the beginning of the previously-buffered input.
+  def rewind
+    return 0 if 0 == @tmp.size
+    consume! if @client
+    @tmp.rewind # Rack does not specify what the return value is here
+  end
+
+  # consumes the stream of the socket
+  def consume!
+    junk = ""
+    rsize = __rsize
+    nil while read(rsize, junk)
+  end
+
+  def tee(buffer)
+    if buffer && buffer.size > 0
+      @tmp.write(buffer)
+    end
+    buffer
+  end
+
+  def discard
+    @tmp = @tmp.close
+  end
+end
diff --git a/lib/yahns/tiny_input.rb b/lib/yahns/tiny_input.rb
new file mode 100644
index 0000000..55bdd03
--- /dev/null
+++ b/lib/yahns/tiny_input.rb
@@ -0,0 +1,7 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class Yahns::TinyInput < StringIO # :nodoc:
+  def discard # just returns nil
+  end
+end
diff --git a/lib/yahns/tmpio.rb b/lib/yahns/tmpio.rb
new file mode 100644
index 0000000..60751c0
--- /dev/null
+++ b/lib/yahns/tmpio.rb
@@ -0,0 +1,27 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv2 or later (https://www.gnu.org/licenses/gpl-2.0.txt)
+require 'tmpdir'
+
+# some versions of Ruby had a broken Tempfile which didn't work
+# well with unlinked files.  This one is much shorter, easier
+# to understand, and slightly faster (no delegation).
+class Yahns::TmpIO < File # :nodoc:
+
+  # creates and returns a new File object.  The File is unlinked
+  # immediately, switched to binary mode, and userspace output
+  # buffering is disabled
+  def self.new
+    fp = begin
+      super("#{Dir.tmpdir}/#{rand}", RDWR|CREAT|EXCL, 0600)
+    rescue Errno::EEXIST
+      retry
+    end
+    unlink(fp.path)
+    fp.binmode
+    fp.sync = true
+    fp
+  end
+
+  alias discard close
+end
diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb
new file mode 100644
index 0000000..4828056
--- /dev/null
+++ b/lib/yahns/wbuf.rb
@@ -0,0 +1,36 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'wbuf_common'
+
+class Yahns::Wbuf # :nodoc:
+  include Yahns::WbufCommon
+
+  def initialize(body, persist)
+    @tmpio = Yahns::TmpIO.new
+    @sf_offset = @sf_count = 0
+    @wbuf_persist = persist # whether or not we keep the connection alive
+    @body = body
+  end
+
+  def wbuf_write(client, buf)
+    @sf_count += @tmpio.write(buf)
+    case rv = client.trysendfile(@tmpio, @sf_offset, @sf_count)
+    when Integer
+      @sf_count -= rv
+      @sf_offset += rv
+    when :wait_writable, :wait_readable
+      return rv
+    else
+      raise "BUG: #{rv.nil ? "EOF" : rv.inspect} on tmpio " \
+            "sf_offset=#@sf_offset sf_count=#@sf_count"
+    end while @sf_count > 0
+    nil
+  end
+
+  # called by last wbuf_flush
+  def wbuf_close(client)
+    @tmpio = @tmpio.close
+    wbuf_close_common(client)
+  end
+end
diff --git a/lib/yahns/wbuf_common.rb b/lib/yahns/wbuf_common.rb
new file mode 100644
index 0000000..e621311
--- /dev/null
+++ b/lib/yahns/wbuf_common.rb
@@ -0,0 +1,32 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'sendfile'
+module Yahns::WbufCommon # :nodoc:
+  # returns nil on success, :wait_*able when blocked
+  # currently, we rely on each thread having exclusive access to the
+  # client socket, so this is never called concurrently with wbuf_write
+  def wbuf_flush(client)
+    case rv = client.trysendfile(@tmpio, @sf_offset, @sf_count)
+    when Integer
+      return wbuf_close(client) if (@sf_count -= rv) == 0 # all sent!
+
+      @sf_offset += rv # keep going otherwise
+    when :wait_writable, :wait_readable
+      return rv
+    else
+      raise "BUG: #{rv.nil? ? "EOF" : rv.inspect} on tmpio=#{@tmpio.inspect} " \
+            "sf_offset=#@sf_offset sf_count=#@sf_count"
+    end while true
+  end
+
+  def wbuf_close_common(client)
+    @body.close if @body.respond_to?(:close)
+    if @wbuf_persist.respond_to?(:call) # hijack
+      @wbuf_persist.call(client)
+      :delete
+    else
+      @wbuf_persist # true or false or Yahns::StreamFile
+    end
+  end
+end
diff --git a/lib/yahns/worker.rb b/lib/yahns/worker.rb
new file mode 100644
index 0000000..980f7bd
--- /dev/null
+++ b/lib/yahns/worker.rb
@@ -0,0 +1,58 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class Yahns::Worker # :nodoc:
+  attr_accessor :nr
+  attr_reader :to_io
+
+  def initialize(nr)
+    @nr = nr
+    @to_io, @wr = Kgio::Pipe.new
+  end
+
+  def atfork_child
+    @wr = @wr.close # nil @wr to save space in worker process
+  end
+
+  def atfork_parent
+    @to_io = @to_io.close
+    self
+  end
+
+  # used in the worker process.
+  # This causes the worker to gracefully exit if the master
+  # dies unexpectedly.
+  def yahns_step
+    @to_io.kgio_tryread(11) == nil and Process.kill(:QUIT, $$)
+    :wait_readable
+  end
+
+  # worker objects may be compared to just plain Integers
+  def ==(other_nr) # :nodoc:
+    @nr == other_nr
+  end
+
+  # Changes the worker process to the specified +user+ and +group+
+  # This is only intended to be called from within the worker
+  # process from the +after_fork+ hook.  This should be called in
+  # the +after_fork+ hook after any privileged functions need to be
+  # run (e.g. to set per-worker CPU affinity, niceness, etc)
+  #
+  # Any and all errors raised within this method will be propagated
+  # directly back to the caller (usually the +after_fork+ hook.
+  # These errors commonly include ArgumentError for specifying an
+  # invalid user/group and Errno::EPERM for insufficient privileges
+  def user(user, group = nil)
+    # we do not protect the caller, checking Process.euid == 0 is
+    # insufficient because modern systems have fine-grained
+    # capabilities.  Let the caller handle any and all errors.
+    uid = Etc.getpwnam(user).uid
+    gid = Etc.getgrnam(group).gid if group
+    Yahns::Log.chown_all(uid, gid)
+    if gid && Process.egid != gid
+      Process.initgroups(user, gid)
+      Process::GID.change_privilege(gid)
+    end
+    Process.euid != uid and Process::UID.change_privilege(uid)
+  end
+end