diff options
Diffstat (limited to 'lib')
29 files changed, 2481 insertions, 0 deletions
diff --git a/lib/yahns.rb b/lib/yahns.rb new file mode 100644 index 0000000..373fca0 --- /dev/null +++ b/lib/yahns.rb @@ -0,0 +1,66 @@ +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'unicorn' # pulls in raindrops, kgio, fcntl, etc, stringio, and logger +require 'sleepy_penguin' + +# yahns exposes no user-visible API outside of the config file +# Internals are subject to change. +module Yahns # :nodoc: + # We populate this at startup so we can figure out how to reexecute + # and upgrade the currently running instance of yahns + # Unlike unicorn, this Hash is NOT a stable/public interface. + # + # * 0 - the path to the yahns executable + # * :argv - a deep copy of the ARGV array the executable originally saw + # * :cwd - the working directory of the application, this is where + # you originally started yahns. + # + # To change your yahns executable to a different path without downtime, + # you can set the following in your yahns config file, HUP and then + # continue with the traditional USR2 + QUIT upgrade steps: + # + # Yahns::START[0] = "/home/bofh/2.0.0/bin/yahns" + START = { + :argv => ARGV.map { |arg| arg.dup }, + 0 => $0.dup, + } + + # We favor ENV['PWD'] since it is (usually) symlink aware for Capistrano + # and like systems + START[:cwd] = begin + a = File.stat(pwd = ENV['PWD']) + b = File.stat(Dir.pwd) + a.ino == b.ino && a.dev == b.dev ? pwd : Dir.pwd + rescue + Dir.pwd + end + + # Raised inside TeeInput when a client closes the socket inside the + # application dispatch. This is always raised with an empty backtrace + # since there is nothing in the application stack that is responsible + # for client shutdowns/disconnects. + class ClientShutdown < EOFError # :nodoc: + end +end + +# FIXME: require lazily +require_relative 'yahns/log' +require_relative 'yahns/queue_epoll' +require_relative 'yahns/stream_input' +require_relative 'yahns/tee_input' +require_relative 'yahns/queue_egg' +require_relative 'yahns/client_expire' +require_relative 'yahns/http_response' +require_relative 'yahns/http_client' +require_relative 'yahns/http_context' +require_relative 'yahns/queue' +require_relative 'yahns/config' +require_relative 'yahns/tmpio' +require_relative 'yahns/worker' +require_relative 'yahns/sigevent' +require_relative 'yahns/daemon' +require_relative 'yahns/socket_helper' +require_relative 'yahns/server' +require_relative 'yahns/fdmap' +require_relative 'yahns/acceptor' +require_relative 'yahns/wbuf' diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb new file mode 100644 index 0000000..b5e7b0e --- /dev/null +++ b/lib/yahns/acceptor.rb @@ -0,0 +1,28 @@ +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv3 or later (see COPYING for details) +module Yahns::Acceptor # :nodoc: + def spawn_acceptor(logger, client_class, queue) + accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC + Thread.new do + Thread.current.abort_on_exception = true + qev_flags = client_class.superclass::QEV_FLAGS + begin + # We want the accept/accept4 syscall to be _blocking_ + # so it can distribute work evenly between processes + if client = kgio_accept(client_class, accept_flags) + client.yahns_init + + # it is not safe to touch client in this thread after this, + # a worker thread may grab client right away + queue.queue_add(client, qev_flags) + end + rescue Errno::EMFILE, Errno::ENFILE => e + logger.error("#{e.message}, consider raising open file limits") + queue.fdmap.desperate_expire_for(self, 5) + sleep 1 # let other threads do some work + rescue => e + Yahns::Log.exception(logger, "accept loop error", e) unless closed? + end until closed? + end + end +end diff --git a/lib/yahns/client_expire.rb b/lib/yahns/client_expire.rb new file mode 100644 index 0000000..7da9498 --- /dev/null +++ b/lib/yahns/client_expire.rb @@ -0,0 +1,40 @@ +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) + +# included in Yahns::HttpClient +# +# this provides the ability to expire idle clients once we hit a soft limit +# on idle clients +# +# we absolutely DO NOT issue IO#close in here, only BasicSocket#shutdown +module Yahns::ClientExpire # :nodoc: + def yahns_expire(timeout) # rarely called + return 0 if closed? # still racy, but avoid the exception in most cases + + info = Raindrops::TCP_Info.new(self) + return 0 if info.tcpi_state != 1 # TCP_ESTABLISHED == 1 + + # Linux struct tcp_info timers are in milliseconds + timeout *= 1000 + + send_timedout = !!(info.tcpi_last_data_sent > timeout) + + # tcpi_last_data_recv is not valid unless tcpi_ato (ACK timeout) is set + if 0 == info.tcpi_ato + sd = send_timedout && (info.tcpi_last_ack_recv > timeout) + else + sd = send_timedout && (info.tcpi_last_data_recv > timeout) + end + if sd + shutdown + 1 + else + 0 + end + # we also do not expire UNIX domain sockets + # (since those are the most trusted of local clients) + # the IO#closed? check is racy + rescue + 0 + end +end diff --git a/lib/yahns/client_expire_portable.rb b/lib/yahns/client_expire_portable.rb new file mode 100644 index 0000000..2ea7706 --- /dev/null +++ b/lib/yahns/client_expire_portable.rb @@ -0,0 +1,39 @@ +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +module Yahns::ClientExpire # :nodoc: + def __timestamp + Time.now.to_f + end + + def yahns_expire(timeout) + return 0 if closed? # still racy, but avoid the exception in most cases + if (__timestamp - @last_io_at) > timeout + shutdown + 1 + else + 0 + end + rescue # the IO#closed? check is racy + 0 + end + + def kgio_read(*args) + @last_io_at = __timestamp + super + end + + def kgio_write(*args) + @last_io_at = __timestamp + super + end + + def kgio_trywrite(*args) + @last_io_at = __timestamp + super + end + + def kgio_tryread(*args) + @last_io_at = __timestamp + super + end +end diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb new file mode 100644 index 0000000..1d4d110 --- /dev/null +++ b/lib/yahns/config.rb @@ -0,0 +1,341 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +# +# Implements a DSL for configuring a yahns server. +# See http://yahns.yhbt.net/examples/yahns_multi.conf.rb for a full +# example configuration file. +class Yahns::Config # :nodoc: + APP_CLASS = {} # public, see yahns/rack for usage example + CfgBlock = Struct.new(:type, :ctx) # :nodoc: + attr_reader :config_file, :config_listeners, :set + attr_reader :qeggs, :app_ctx + + def initialize(config_file = nil) + @config_file = config_file + @block = nil + config_reload! + end + + def _check_in_block(ctx, var) + if ctx == nil + return var if @block == nil + msg = "#{var} must be called outside of #{@block.type}" + else + return var if @block && ctx == @block.type + msg = @block ? "may not be used inside a #{@block.type} block" : + "must be used with a #{ctx} block" + end + raise ArgumentError, msg + end + + def config_reload! #:nodoc: + # app_instance:app_ctx is a 1:N relationship + @config_listeners = {} # name/address -> options + @app_ctx = [] + @set = Hash.new(:unset) + @qeggs = {} + @app_instances = {} + + # set defaults: + client_expire_threshold(0.5) # default is half of the open file limit + + instance_eval(File.read(@config_file), @config_file) if @config_file + + # working_directory binds immediately (easier error checking that way), + # now ensure any paths we changed are correctly set. + [ :pid, :stderr_path, :stdout_path ].each do |var| + String === (path = @set[var]) or next + path = File.expand_path(path) + File.writable?(path) || File.writable?(File.dirname(path)) or \ + raise ArgumentError, "directory for #{var}=#{path} not writable" + end + end + + def logger(obj) + var = :logger + %w(debug info warn error fatal).each do |m| + obj.respond_to?(m) and next + raise ArgumentError, "#{var}=#{obj} does not respond to method=#{m}" + end + if @block + if @block.ctx.respond_to?(:logger=) + @block.ctx.logger = obj + else + raise ArgumentError, "#{var} not valid inside #{@block.type}" + end + else + @set[var] = obj + end + end + + def worker_processes(nr) + # TODO: allow zero + var = _check_in_block(nil, :worker_processes) + @set[var] = _check_int(var, nr, 1) + end + + # sets the +path+ for the PID file of the yahns master process + def pid(path) + _set_path(:pid, path) + end + + def stderr_path(path) + _set_path(:stderr_path, path) + end + + def stdout_path(path) + _set_path(:stdout_path, path) + end + + def value(var) + val = @set[var] + val == :unset ? nil : val + end + + # sets the working directory for yahns. This ensures SIGUSR2 will + # start a new instance of yahns in this directory. This may be + # a symlink, a common scenario for Capistrano users. Unlike + # all other yahns configuration directives, this binds immediately + # for error checking and cannot be undone by unsetting it in the + # configuration file and reloading. + def working_directory(path) + var = :working_directory + @app_ctx.empty? or + raise ArgumentError, "#{var} must be declared before any apps" + + # just let chdir raise errors + path = File.expand_path(path) + if @config_file && + @config_file[0] != ?/ && + ! File.readable?("#{path}/#@config_file") + raise ArgumentError, + "config_file=#@config_file would not be accessible in" \ + " #{var}=#{path}" + end + Dir.chdir(path) + @set[var] = ENV["PWD"] = path + end + + # Runs worker processes as the specified +user+ and +group+. + # The master process always stays running as the user who started it. + # This switch will occur after calling the after_fork hooks, and only + # if the Worker#user method is not called in the after_fork hooks + # +group+ is optional and will not change if unspecified. + def user(user, group = nil) + var = :user + @block and raise "#{var} is not valid inside #{@block.type}" + # raises ArgumentError on invalid user/group + Etc.getpwnam(user) + Etc.getgrnam(group) if group + @set[var] = [ user, group ] + end + + def _set_path(var, path) #:nodoc: + _check_in_block(nil, var) + case path + when NilClass, String + @set[var] = path + else + raise ArgumentError + end + end + + def listen(address, options = {}) + options = options.dup + var = _check_in_block(:app, :listen) + address = expand_addr(address) + String === address or + raise ArgumentError, "address=#{address.inspect} must be a string" + [ :umask, :backlog, :sndbuf, :rcvbuf ].each do |key| + value = options[key] or next + Integer === value or + raise ArgumentError, "#{var}: not an integer: #{key}=#{value.inspect}" + end + [ :ipv6only ].each do |key| + (value = options[key]).nil? and next + [ true, false ].include?(value) or + raise ArgumentError, "#{var}: not boolean: #{key}=#{value.inspect}" + end + + options[:yahns_app_ctx] = @block.ctx + @config_listeners.include?(address) and + raise ArgumentError, "listen #{address} already in use" + @config_listeners[address] = options + end + + # expands "unix:path/to/foo" to a socket relative to the current path + # expands pathnames of sockets if relative to "~" or "~username" + # expands "*:port and ":port" to "0.0.0.0:port" + def expand_addr(address) #:nodoc: + return "0.0.0.0:#{address}" if Integer === address + return address unless String === address + + case address + when %r{\Aunix:(.*)\z} + File.expand_path($1) + when %r{\A~} + File.expand_path(address) + when %r{\A(?:\*:)?(\d+)\z} + "0.0.0.0:#$1" + when %r{\A\[([a-fA-F0-9:]+)\]\z}, %r/\A((?:\d+\.){3}\d+)\z/ + canonicalize_tcp($1, 80) + when %r{\A\[([a-fA-F0-9:]+)\]:(\d+)\z}, %r{\A(.*):(\d+)\z} + canonicalize_tcp($1, $2.to_i) + else + address + end + end + + def canonicalize_tcp(addr, port) + packed = Socket.pack_sockaddr_in(port, addr) + port, addr = Socket.unpack_sockaddr_in(packed) + /:/ =~ addr ? "[#{addr}]:#{port}" : "#{addr}:#{port}" + end + + def queue(name = :default, &block) + var = :queue + qegg = @qeggs[name] ||= Yahns::QueueEgg.new + prev_block = @block + _check_in_block(:app, var) if prev_block + if block_given? + @block = CfgBlock.new(:queue, qegg) + instance_eval(&block) + @block = prev_block + end + prev_block.ctx.qegg = qegg if prev_block + end + + # queue parameters (Yahns::QueueEgg) + %w(max_events worker_threads).each do |_v| + eval( + %Q(def #{_v}(val);) << + %Q( _check_in_block(:queue, :#{_v});) << + %Q( @block.ctx.__send__("#{_v}=", _check_int(:#{_v}, val, 1));) << + %Q(end) + ) + end + + def _check_int(var, n, min) + Integer === n or raise ArgumentError, "not an integer: #{var}=#{n.inspect}" + n >= min or raise ArgumentError, "too low (< #{min}): #{var}=#{n.inspect}" + n + end + + # global + def client_expire_threshold(val) + var = _check_in_block(nil, :client_expire_threshold) + val > 0 or raise ArgumentError, "#{var} must be > 0" + case val + when Float + val <= 1.0 or raise ArgumentError, "#{var} must be <= 1.0 if a ratio" + when Integer + else + raise ArgumentError, "#{var} must be a float or integer" + end + @set[var] = val + end + + # type = :rack + def app(type, *args, &block) + var = _check_in_block(nil, :app) + file = "yahns/#{type.to_s}" + begin + require file + rescue LoadError => e + raise ArgumentError, "#{type.inspect} is not a supported app type", + e.backtrace + end + klass = APP_CLASS[type] or + raise TypeError, + "#{var}: #{file} did not register #{type} in #{self.class}::APP_CLASS" + + # apps may have multiple configurator contexts + app = @app_instances[klass.instance_key(*args)] = klass.new(*args) + ctx = app.config_context + if block_given? + @block = CfgBlock.new(:app, ctx) + instance_eval(&block) + @block = nil + end + @app_ctx << ctx + end + + def _check_bool(var, val) + return val if [ true, false ].include?(val) + raise ArgumentError, "#{var} must be boolean" + end + + # boolean config directives for app + %w(check_client_connection + output_buffering + persistent_connections).each do |_v| + eval( + %Q(def #{_v}(bool);) << + %Q( _check_in_block(:app, :#{_v});) << + %Q( @block.ctx.__send__("#{_v}=", _check_bool(:#{_v}, bool));) << + %Q(end) + ) + end + + # integer config directives for app + { + # config name, minimum value + client_body_buffer_size: 1, + client_max_body_size: 0, + client_header_buffer_size: 1, + client_max_header_size: 1, + client_timeout: 0, + }.each do |_v,minval| + eval( + %Q(def #{_v}(val);) << + %Q( _check_in_block(:app, :#{_v});) << + %Q( @block.ctx.__send__("#{_v}=", _check_int(:#{_v}, val, #{minval}));) << + %Q(end) + ) + end + + def input_buffering(val) + var = _check_in_block(:app, :input_buffering) + ok = [ :lazy, true, false ] + ok.include?(val) or + raise ArgumentError, "`#{var}' must be one of: #{ok.inspect}" + @block.ctx.__send__("#{var}=", val) + end + + # used to configure rack.errors destination + def errors(val) + var = _check_in_block(:app, :errors) + if String === val + # we've already bound working_directory by the time we get here + val = File.open(File.expand_path(val), "a") + val.binmode + val.sync = true + else + rt = %w(puts write flush).map(&:to_sym) # match Rack::Lint + rt.all? { |m| val.respond_to?(m) } or raise ArgumentError, + "`#{var}' destination must respond to all of: #{rt.inspect}" + end + @block.ctx.__send__("#{var}=", val) + end + + def commit!(server) + # redirect IOs + { stdout_path: $stdout, stderr_path: $stderr }.each do |key, io| + path = @set[key] + if path == :unset && server.daemon_pipe + @set[key] = path = "/dev/null" + end + File.open(path, 'a') { |fp| io.reopen(fp) } if String === path + io.sync = true + end + + [ :logger, :pid, :worker_processes ].each do |var| + val = @set[var] + server.__send__("#{var}=", val) if val != :unset + end + queue(:default) if @qeggs.empty? + @qeggs.each_value { |qegg| qegg.logger ||= server.logger } + @app_ctx.each { |app| app.logger ||= server.logger } + end +end diff --git a/lib/yahns/daemon.rb b/lib/yahns/daemon.rb new file mode 100644 index 0000000..d12ff69 --- /dev/null +++ b/lib/yahns/daemon.rb @@ -0,0 +1,51 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +module Yahns::Daemon # :nodoc: + # We don't do a lot of standard daemonization stuff: + # * umask is whatever was set by the parent process at startup + # and can be set in config.ru and config_file, so making it + # 0000 and potentially exposing sensitive log data can be bad + # policy. + # * don't bother to chdir("/") here since yahns is designed to + # run inside APP_ROOT. Yahns will also re-chdir() to + # the directory it was started in when being re-executed + # to pickup code changes if the original deployment directory + # is a symlink or otherwise got replaced. + def self.daemon(yahns_server) + $stdin.reopen("/dev/null") + + # We only start a new process group if we're not being reexecuted + # and inheriting file descriptors from our parent + unless ENV['YAHNS_FD'] + # grandparent - reads pipe, exits when master is ready + # \_ parent - exits immediately ASAP + # \_ yahns master - writes to pipe when ready + + # We cannot use Yahns::Sigevent (eventfd) here because we need + # to detect EOF on unexpected death, not just read/write + rd, wr = IO.pipe + grandparent = $$ + if fork + wr.close # grandparent does not write + else + rd.close # yahns master does not read + Process.setsid + exit if fork # parent dies now + end + + if grandparent == $$ + # this will block until Server#join runs (or it dies) + master_pid = (rd.readpartial(16) rescue nil).to_i + unless master_pid > 1 + warn "master failed to start, check stderr log for details" + exit!(1) + end + exit 0 + else # yahns master process + yahns_server.daemon_pipe = wr + end + end + # $stderr/$stderr can/will be redirected separately in the Yahns config + end +end diff --git a/lib/yahns/fdmap.rb b/lib/yahns/fdmap.rb new file mode 100644 index 0000000..0272421 --- /dev/null +++ b/lib/yahns/fdmap.rb @@ -0,0 +1,90 @@ +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'thread' + +# only initialize this after forking, this is highly volatile and won't +# be able to share data across processes at all. +# This is really a singleton + +class Yahns::Fdmap # :nodoc: + def initialize(logger, client_expire_threshold) + @logger = logger + + if Float === client_expire_threshold + client_expire_threshold *= Process.getrlimit(:NOFILE)[0] + elsif client_expire_treshhold < 0 + client_expire_threshold = Process.getrlimit(:NOFILE)[0] - + client_expire_threshold + end + @client_expire_threshold = client_expire_threshold.to_i + + # This is an array because any sane OS will frequently reuse FDs + # to keep this tightly-packed and favor lower FD numbers + # (consider select(2) performance (not that we use select)) + # An (unpacked) Hash (in MRI) uses 5 more words per entry than an Array, + # and we should expect this array to have around 60K elements + @fdmap_ary = [] + @fdmap_mtx = Mutex.new + @last_expire = 0.0 + @count = 0 + end + + # called immediately after accept() + def add(io) + fd = io.fileno + @fdmap_mtx.synchronize do + if (@count += 1) > @client_expire_threshold + __expire_for(io) + else + @fdmap_ary[fd] = io + end + end + end + + # this is only called in Errno::EMFILE/Errno::ENFILE situations + def desperate_expire_for(io, timeout) + @fdmap_mtx.synchronize { __expire_for(io, timeout) } + end + + # called before IO#close + def decr + # don't bother clearing the element in @fdmap_ary, it'll just be + # overwritten when another client connects (soon). We must not touch + # @fdmap_ary[io.fileno] after IO#close on io + @fdmap_mtx.synchronize { @count -= 1 } + end + + def delete(io) # use with rack.hijack (via yahns) + fd = io.fileno + @fdmap_mtx.synchronize do + @fdmap_ary[fd] = nil + @count -= 1 + end + end + + # expire a bunch of idle clients and register the current one + # We should not be calling this too frequently, it is expensive + # This is called while @fdmap_mtx is held + def __expire_for(io, timeout = nil) + nr = 0 + now = Time.now.to_f + (now - @last_expire) >= 1.0 or return # don't expire too frequently + + # @fdmap_ary may be huge, so always expire a bunch at once to + # avoid getting to this method too frequently + @fdmap_ary.each do |c| + c.respond_to?(:yahns_expire) or next + nr += c.yahns_expire(timeout || c.class.client_timeout) + end + + @fdmap_ary[io.fileno] = io + @last_expire = Time.now.to_f + msg = timeout ? "timeout=#{timeout})" : "client_timeout" + @logger.info("dropping #{nr} of #@count clients for #{msg}") + end + + # used for graceful shutdown + def size + @fdmap_mtx.synchronize { @count } + end +end diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb new file mode 100644 index 0000000..8171460 --- /dev/null +++ b/lib/yahns/http_client.rb @@ -0,0 +1,196 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'yahns/tiny_input' +class Yahns::HttpClient < Kgio::Socket # :nodoc: + NULL_IO = Yahns::TinyInput.new("") + + include Yahns::HttpResponse + include Yahns::ClientExpire + QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor + HTTP_RESPONSE_START = [ 'HTTP', '/1.1 ' ] + + # A frozen format for this is about 15% faster (note from Mongrel) + REMOTE_ADDR = 'REMOTE_ADDR'.freeze + RACK_INPUT = 'rack.input'.freeze + RACK_HIJACK = 'rack.hijack'.freeze + RACK_HIJACK_IO = "rack.hijack_io".freeze + + # called from acceptor thread + def yahns_init + @hs = Unicorn::HttpRequest.new + @response_start_sent = false + @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile + @input = nil + end + + # use if writes are deferred by buffering, this return value goes to + # the main epoll/kqueue worker loop + # returns :wait_readable, :wait_writable, or nil + def step_write + case rv = @state.wbuf_flush(self) + when :wait_writable, :wait_readable + return rv # tell epoll/kqueue to wait on this more + when :delete # :delete on hijack + @state = :delete + return :delete + when Yahns::StreamFile + @state = rv # continue looping + when true, false # done + return http_response_done(rv) + else + raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}" + end while true + end + + def mkinput_preread + @state = :body + @input = self.class.tmpio_for(@hs.content_length) + rbuf = Thread.current[:yahns_rbuf] + @hs.filter_body(rbuf, @hs.buf) + @input.write(rbuf) + end + + def input_ready + empty_body = 0 == @hs.content_length + k = self.class + case k.input_buffering + when true + # common case is an empty body + return NULL_IO if empty_body + + # content_length is nil (chunked) or len > 0 + mkinput_preread # keep looping + false + else # :lazy, false + empty_body ? NULL_IO : k.mkinput(self, @hs) + end + end + + # the main entry point of the epoll/kqueue worker loop + def yahns_step + # always write unwritten data first if we have any + return step_write if Yahns::WbufCommon === @state + + # only read if we had nothing to write in this event loop iteration + k = self.class + rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads + + case @state + when :pipelined + if @hs.parse + input = input_ready and return app_call(input) + # @state == :body if we get here point (input_ready -> mkinput_preread) + else + @state = :headers + end + # continue to outer loop + when :headers + case rv = kgio_tryread(k.client_header_buffer_size, rbuf) + when String + if @hs.add_parse(rv) + input = input_ready and return app_call(input) + break # to outer loop to reevaluate @state == :body + end + # keep looping on kgio_tryread + when :wait_readable, :wait_writable, nil + return rv + end while true + when :body + if @hs.body_eof? + if @hs.content_length || @hs.parse # hp.parse == trailers done! + @input.rewind + return app_call(@input) + else # possible Transfer-Encoding:chunked, keep looping + @state = :trailers + end + else + case rv = kgio_tryread(k.client_body_buffer_size, rbuf) + when String + @hs.filter_body(rbuf, @hs.buf << rbuf) + @input.write(rbuf) + # keep looping on kgio_tryread... + when :wait_readable, :wait_writable + return rv # have epoll/kqueue wait for more + when nil # unexpected EOF + return @input.close # nil + end # continue to outer loop (case @state) + end + when :trailers + case rv = kgio_tryread(k.client_header_buffer_size, rbuf) + when String + if @hs.add_parse(rbuf) + @input.rewind + return app_call(@input) + end + # keep looping on kgio_tryread... + when :wait_readable, :wait_writable + return rv # wait for more + when nil # unexpected EOF + return @input.close # nil + end while true + end while true # outer loop + rescue => e + handle_error(e) + end + + def app_call(input) + env = @hs.env + env[REMOTE_ADDR] = @kgio_addr + env[RACK_HIJACK] = hijack_proc(env) + env[RACK_INPUT] = @input ||= input + k = self.class + + if k.check_client_connection && @hs.headers? + @response_start_sent = true + # FIXME: we should buffer this just in case + HTTP_RESPONSE_START.each { |c| kgio_write(c) } + end + + # run the rack app + response = k.app.call(env.merge!(k.app_defaults)) + return :delete if env.include?(RACK_HIJACK_IO) + + # this returns :wait_readable, :wait_writable, :delete, or nil: + http_response_write(*response) + end + + def hijack_proc(env) + proc { env[RACK_HIJACK_IO] = self } + end + + # called automatically by kgio_write + def kgio_wait_writable(timeout = self.class.client_timeout) + super timeout + end + + # called automatically by kgio_read + def kgio_wait_readable(timeout = self.class.client_timeout) + super timeout + end + + # if we get any error, try to write something back to the client + # assuming we haven't closed the socket, but don't get hung up + # if the socket is already closed or broken. We'll always return + # nil to ensure the socket is closed at the end of this function + def handle_error(e) + code = case e + when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN + return # don't send response, drop the connection + when Unicorn::RequestURITooLongError + 414 + when Unicorn::RequestEntityTooLargeError + 413 + when Unicorn::HttpParserError # try to tell the client they're bad + 400 + else + Yahns::Log.exception(@hs.env["rack.logger"], "app error", e) + 500 + end + kgio_trywrite(err_response(code)) + rescue + ensure + shutdown rescue nil + return # always drop the connection on uncaught errors + end +end diff --git a/lib/yahns/http_context.rb b/lib/yahns/http_context.rb new file mode 100644 index 0000000..1af41df --- /dev/null +++ b/lib/yahns/http_context.rb @@ -0,0 +1,66 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'yahns/tiny_input' + +# subclasses of Yahns::HttpClient will class extend this + +module Yahns::HttpContext # :nodoc: + attr_accessor :check_client_connection + attr_accessor :client_body_buffer_size + attr_accessor :client_header_buffer_size + attr_accessor :client_max_body_size + attr_accessor :client_max_header_size + attr_accessor :input_buffering # :lazy, true, false + attr_accessor :output_buffering # true, false + attr_accessor :persistent_connections # true or false only + attr_accessor :client_timeout + attr_accessor :qegg + attr_reader :app + attr_reader :app_defaults + + def http_ctx_init(yahns_rack) + @yahns_rack = yahns_rack + @app_defaults = yahns_rack.app_defaults + @check_client_connection = false + @client_body_buffer_size = 112 * 1024 + @client_header_buffer_size = 4000 + @client_max_body_size = 1024 * 1024 + @input_buffering = true + @output_buffering = true + @persistent_connections = true + @client_timeout = 15 + @qegg = nil + end + + # call this after forking + def after_fork_init + @app = @yahns_rack.app_after_fork + end + + # call this immediately after successful accept()/accept4() + def logger=(l) # cold + @logger = @app_defaults["rack.logger"] = l + end + + def logger + @app_defaults["rack.logger"] + end + + def mkinput(client, hs) + (@input_buffering ? Yahns::TeeInput : Yahns::StreamInput).new(client, hs) + end + + def errors=(dest) + @app_defaults["rack.errors"] = dest + end + + def errors + @app_defaults["rack.errors"] + end + + def tmpio_for(len) + len && len <= @client_body_buffer_size ? + Yahns::TinyInput.new("") : Yahns::TmpIO.new + end +end diff --git a/lib/yahns/http_response.rb b/lib/yahns/http_response.rb new file mode 100644 index 0000000..aad2762 --- /dev/null +++ b/lib/yahns/http_response.rb @@ -0,0 +1,183 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative 'stream_file' + +# Writes a Rack response to your client using the HTTP/1.1 specification. +# You use it by simply doing: +# +# status, headers, body = rack_app.call(env) +# http_response_write(status, headers, body) +# +# Most header correctness (including Content-Length and Content-Type) +# is the job of Rack, with the exception of the "Date" header. +module Yahns::HttpResponse # :nodoc: + include Unicorn::HttpResponse + + # avoid GC overhead for frequently used-strings: + CONN_KA = "Connection: keep-alive\r\n\r\n" + CONN_CLOSE = "Connection: close\r\n\r\n" + Z = "" + RESPONSE_START = "HTTP/1.1 " + + def response_start + @response_start_sent ? Z : RESPONSE_START + end + + def response_wait_write(rv) + # call the kgio_wait_readable or kgio_wait_writable method + ok = __send__("kgio_#{rv}") and return ok + k = self.class + k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\ + "#{k.client_timeout}s") + nil + end + + def err_response(code) + "#{response_start}#{CODES[code]}\r\n\r\n" + end + + def response_header_blocked(ret, header, body, alive, offset, count) + if body.respond_to?(:to_path) + alive = Yahns::StreamFile.new(body, alive, offset, count) + body = nil + end + wbuf = Yahns::Wbuf.new(body, alive) + rv = wbuf.wbuf_write(self, header) + body.each { |chunk| rv = wbuf.wbuf_write(self, chunk) } if body + wbuf_maybe(wbuf, rv, alive) + end + + def wbuf_maybe(wbuf, rv, alive) + case rv # trysendfile return value + when nil + case alive + when :delete + @state = :delete + when true, false + http_response_done(alive) + end + else + @state = wbuf + rv + end + end + + def http_response_done(alive) + @input = @input.discard if @input + if alive + @response_start_sent = false + # @hs.buf will have data if the client pipelined + if @hs.buf.empty? + @state = :headers + :wait_readable + else + @state = :pipelined + # may need to wait for readability if SSL, + # only need writability if plain TCP + :wait_readwrite + end + else + # shutdown is needed in case the app forked, we rescue here since + # StreamInput may issue shutdown as well + shutdown rescue nil + nil # trigger close + end + end + + # writes the rack_response to socket as an HTTP response + # returns :wait_readable, :wait_writable, :forget, or nil + def http_response_write(status, headers, body) + status = CODES[status.to_i] || status + offset = 0 + count = hijack = nil + alive = @hs.next? + + if @hs.headers? + buf = "#{response_start}#{status}\r\nDate: #{httpdate}\r\n" + headers.each do |key, value| + case key + when %r{\ADate\z} + next + when %r{\AContent-Range\z}i + if %r{\Abytes (\d+)-(\d+)/\d+\z} =~ value + offset = $1.to_i + count = $2.to_i - offset + 1 + end + when %r{\AConnection\z}i + # allow Rack apps to tell us they want to drop the client + alive = !!(value =~ /\bclose\b/i) + when "rack.hijack" + hijack = value + body = nil # ensure we do not close body + else + if value =~ /\n/ + # avoiding blank, key-only cookies with /\n+/ + buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + else + buf << "#{key}: #{value}\r\n" + end + end + end + buf << (alive ? CONN_KA : CONN_CLOSE) + case rv = kgio_trywrite(buf) + when nil # all done, likely + break + when String + buf = rv # hope the skb grows + when :wait_writable, :wait_readable + if self.class.output_buffering + alive = hijack ? hijack : alive + rv = response_header_blocked(rv, buf, body, alive, offset, count) + body = nil # ensure we do not close body in ensure + return rv + else + response_wait_write(rv) or return + end + end while true + end + + if hijack + hijack.call(self) + return :delete # trigger EPOLL_CTL_DEL + end + + if body.respond_to?(:to_path) + @state = body = Yahns::StreamFile.new(body, alive, offset, count) + return step_write + end + + wbuf = rv = nil + body.each do |chunk| + if wbuf + rv = wbuf.wbuf_write(self, chunk) + else + case rv = kgio_trywrite(chunk) + when nil # all done, likely and good! + break + when String + chunk = rv # hope the skb grows when we loop into the trywrite + when :wait_writable, :wait_readable + if self.class.output_buffering + wbuf = Yahns::Wbuf.new(body, alive) + rv = wbuf.wbuf_write(self, chunk) + break + else + response_wait_write(rv) or return + end + end while true + end + end + + # if we buffered the write body, we must return :wait_writable + # (or :wait_readable for SSL) and hit Yahns::HttpClient#step_write + if wbuf + body = nil # ensure we do not close the body in ensure + wbuf_maybe(wbuf, rv, alive) + else + http_response_done(alive) + end + ensure + body.respond_to?(:close) and body.close + end +end diff --git a/lib/yahns/log.rb b/lib/yahns/log.rb new file mode 100644 index 0000000..59baa85 --- /dev/null +++ b/lib/yahns/log.rb @@ -0,0 +1,73 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) + +# logging-related utility functions for all of yahns +module Yahns::Log # :nodoc: + def self.exception(logger, prefix, exc) + message = exc.message + message = message.dump if /[[:cntrl:]]/ =~ message # prevent code injection + logger.error "#{prefix}: #{message} (#{exc.class})" + exc.backtrace.each { |line| logger.error(line) } + end + + def self.is_log?(fp) + append_flags = IO::WRONLY | IO::APPEND + + ! fp.closed? && + fp.stat.file? && + fp.sync && + (fp.fcntl(Fcntl::F_GETFL) & append_flags) == append_flags + rescue IOError, Errno::EBADF + false + end + + def self.chown_all(uid, gid) + ObjectSpace.each_object(File) do |fp| + fp.chown(uid, gid) if is_log?(fp) + end + end + + # This reopens ALL logfiles in the process that have been rotated + # using logrotate(8) (without copytruncate) or similar tools. + # A +File+ object is considered for reopening if it is: + # 1) opened with the O_APPEND and O_WRONLY flags + # 2) the current open file handle does not match its original open path + # 3) unbuffered (as far as userspace buffering goes, not O_SYNC) + # Returns the number of files reopened + def self.reopen_all + to_reopen = [] + nr = 0 + ObjectSpace.each_object(File) { |fp| is_log?(fp) and to_reopen << fp } + + to_reopen.each do |fp| + orig_st = begin + fp.stat + rescue IOError, Errno::EBADF + next + end + + begin + b = File.stat(fp.path) + next if orig_st.ino == b.ino && orig_st.dev == b.dev + rescue Errno::ENOENT + end + + begin + File.open(fp.path, 'a') { |tmpfp| fp.reopen(tmpfp) } + fp.sync = true + new_st = fp.stat + + # this should only happen in the master: + if orig_st.uid != new_st.uid || orig_st.gid != new_st.gid + fp.chown(orig_st.uid, orig_st.gid) + end + + nr += 1 + rescue IOError, Errno::EBADF + # not much we can do... + end + end + nr + end +end diff --git a/lib/yahns/queue.rb b/lib/yahns/queue.rb new file mode 100644 index 0000000..add4f78 --- /dev/null +++ b/lib/yahns/queue.rb @@ -0,0 +1,7 @@ +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +if SleepyPenguin.const_defined?(:Epoll) + require_relative 'queue_epoll' +else + require_relative 'queue_kqueue' # TODO +end diff --git a/lib/yahns/queue_egg.rb b/lib/yahns/queue_egg.rb new file mode 100644 index 0000000..a2abc2f --- /dev/null +++ b/lib/yahns/queue_egg.rb @@ -0,0 +1,23 @@ +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) + +# this represents a Yahns::Queue before its vivified. This only +# lives in the parent process and should be clobbered after qc_vivify +class Yahns::QueueEgg # :nodoc: + attr_writer :max_events, :worker_threads + attr_accessor :logger + + def initialize + @max_events = 1 # 1 is good if worker_threads > 1 + @worker_threads = 7 # any default is wrong for most apps... + @logger = nil + end + + # only call after forking + def qc_vivify(fdmap) + queue = Yahns::Queue.new + queue.fdmap = fdmap + queue.spawn_worker_threads(@logger, @worker_threads, @max_events) + queue + end +end diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb new file mode 100644 index 0000000..c9febc4 --- /dev/null +++ b/lib/yahns/queue_epoll.rb @@ -0,0 +1,57 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc: + include SleepyPenguin + attr_accessor :fdmap # Yahns::Fdmap + + # public + QEV_RD = Epoll::IN | Epoll::ONESHOT + QEV_WR = Epoll::OUT | Epoll::ONESHOT + QEV_RDWR = QEV_RD | QEV_WR + + def self.new + super(SleepyPenguin::Epoll::CLOEXEC) + end + + # for HTTP and HTTPS servers, we rely on the io writing to us, first + # flags: QEV_RD/QEV_WR (usually QEV_RD) + def queue_add(io, flags) + @fdmap.add(io) + epoll_ctl(Epoll::CTL_ADD, io, flags) + end + + # returns an array of infinitely running threads + def spawn_worker_threads(logger, worker_threads, max_events) + worker_threads.times do + Thread.new do + Thread.current[:yahns_rbuf] = "" + begin + epoll_wait(max_events) do |_, io| # don't care for flags for now + case rv = io.yahns_step + when :wait_readable + epoll_ctl(Epoll::CTL_MOD, io, QEV_RD) + when :wait_writable + epoll_ctl(Epoll::CTL_MOD, io, QEV_WR) + when :wait_readwrite + epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR) + when :delete # only used by rack.hijack + epoll_ctl(Epoll::CTL_DEL, io, 0) + @fdmap.delete(io) + when nil + # this is be the ONLY place where we call IO#close on + # things inside the queue + io.close + @fdmap.decr + else + raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}" + end + end + rescue => e + break if (IOError === e || Errno::EBADF === e) && closed? + Yahns::Log.exception(logger, 'queue loop', e) + end while true + end + end + end +end diff --git a/lib/yahns/rack.rb b/lib/yahns/rack.rb new file mode 100644 index 0000000..27857ec --- /dev/null +++ b/lib/yahns/rack.rb @@ -0,0 +1,80 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'rack' +class Yahns::Rack # :nodoc: + attr_reader :preload + + # enforce a single instance for the identical config.ru + def self.instance_key(*args) + ru = args[0] + + # it's safe to expand_path now since we enforce working_directory in the + # top-level config is called before any apps are created + # ru may also be a Rack::Builder object or any already-built Rack app + ru.respond_to?(:call) ? ru.object_id : File.expand_path(ru) + end + + def initialize(ru, opts = {}) + # always called after config file parsing, may be called after forking + @app = lambda do + ENV["RACK_ENV"] ||= "none" + if ru.respond_to?(:call) + inner_app = ru.respond_to?(:to_app) ? ru.to_app : ru + else + inner_app = case ru + when /\.ru$/ + raw = File.read(ru) + raw.sub!(/^__END__\n.*/, '') + eval("Rack::Builder.new {(\n#{raw}\n)}.to_app", TOPLEVEL_BINDING, ru) + else + require ru + Object.const_get(File.basename(ru, '.rb').capitalize) + end + end + inner_app + end + @ru = ru + @preload = opts[:preload] + build_app! if @preload + end + + def config_context + ctx_class = Class.new(Yahns::HttpClient) + ctx_class.extend(Yahns::HttpContext) + ctx_class.http_ctx_init(self) + ctx_class + end + + def build_app! + if @app.respond_to?(:arity) && @app.arity == 0 + Gem.refresh if defined?(Gem) && Gem.respond_to?(:refresh) + @app = @app.call + end + end + + # allow different HttpContext instances to have different Rack defaults + def app_defaults + { + # logger is set in http_context + "rack.errors" => $stderr, + "rack.multiprocess" => true, + "rack.multithread" => true, + "rack.run_once" => false, + "rack.hijack?" => true, + "rack.version" => [ 1, 2 ], + "SCRIPT_NAME" => "", + + # this is not in the Rack spec, but some apps may rely on it + "SERVER_SOFTWARE" => "yahns" + } + end + + def app_after_fork + build_app! unless @preload + @app + end +end + +# register ourselves +Yahns::Config::APP_CLASS[:rack] = Yahns::Rack diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb new file mode 100644 index 0000000..c7a5a57 --- /dev/null +++ b/lib/yahns/server.rb @@ -0,0 +1,328 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +class Yahns::Server # :nodoc: + QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP, :TTIN, :TTOU ] + attr_accessor :daemon_pipe + attr_accessor :logger + attr_writer :worker_processes + include Yahns::SocketHelper + + def initialize(config) + @reexec_pid = 0 + @daemon_pipe = nil # writable IO or true + @config = config + @workers = {} # pid -> workers + @sig_queue = [] # nil in forked workers + @logger = Logger.new($stderr) + @sev = Yahns::Sigevent.new + @listeners = [] + @pid = nil + @worker_processes = nil + @user = nil + end + + def sqwakeup(sig) + @sig_queue << sig + @sev.sev_signal + end + + def start + @config.commit!(self) + inherit_listeners! + # we try inheriting listeners first, so we bind them later. + # we don't write the pid file until we've bound listeners in case + # yahns was started twice by mistake. Even though our #pid= method + # checks for stale/existing pid files, race conditions are still + # possible (and difficult/non-portable to avoid) and can be likely + # to clobber the pid if the second start was in quick succession + # after the first, so we rely on the listener binding to fail in + # that case. Some tests (in and outside of this source tree) and + # monitoring tools may also rely on pid files existing before we + # attempt to connect to the listener(s) + + # setup signal handlers before writing pid file in case people get + # trigger happy and send signals as soon as the pid file exists. + QUEUE_SIGS.each { |sig| trap(sig) { sqwakeup(sig) } } + self.pid = @config.value(:pid) # write pid file + bind_new_listeners! + if @worker_processes + require 'yahns/server_mp' + extend Yahns::ServerMP + mp_init + end + self + end + + # replaces current listener set with +listeners+. This will + # close the socket if it will not exist in the new listener set + def listeners=(listeners) + cur_names, dead_names = [], [] + listener_names.each do |name| + if ?/ == name[0] + # mark unlinked sockets as dead so we can rebind them + (File.socket?(name) ? cur_names : dead_names) << name + else + cur_names << name + end + end + set_names = listener_names(listeners) + dead_names.concat(cur_names - set_names).uniq! + + @listeners.delete_if do |io| + if dead_names.include?(sock_name(io)) + (io.close rescue nil).nil? # true + else + set_server_sockopt(io, sock_opts(io)) + false + end + end + + (set_names - cur_names).each { |addr| listen(addr) } + end + + # sets the path for the PID file of the master process + def pid=(path) + if path + if x = valid_pid?(path) + return path if @pid && path == @pid && x == $$ + if x == @reexec_pid && @pid =~ /\.oldbin\z/ + @logger.warn("will not set pid=#{path} while reexec-ed "\ + "child is running PID:#{x}") + return + end + raise ArgumentError, "Already running on PID:#{x} " \ + "(or pid=#{path} is stale)" + end + end + unlink_pid_safe(@pid) if @pid + + if path + fp = begin + tmp = "#{File.dirname(path)}/#{rand}.#$$" + File.open(tmp, File::RDWR|File::CREAT|File::EXCL, 0644) + rescue Errno::EEXIST + retry + end + fp.syswrite("#$$\n") + File.rename(fp.path, path) + fp.close + end + @pid = path + end + + # add a given address to the +listeners+ set, idempotently + # Allows workers to add a private, per-process listener via the + # after_fork hook. Very useful for debugging and testing. + # +:tries+ may be specified as an option for the number of times + # to retry, and +:delay+ may be specified as the time in seconds + # to delay between retries. + # A negative value for +:tries+ indicates the listen will be + # retried indefinitely, this is useful when workers belonging to + # different masters are spawned during a transparent upgrade. + def listen(address) + address = @config.expand_addr(address) + return if String === address && listener_names.include?(address) + + begin + io = bind_listen(address, sock_opts(address)) + unless Kgio::TCPServer === io || Kgio::UNIXServer === io + io = server_cast(io) + end + @logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}" + @listeners << io + io + rescue Errno::EADDRINUSE => err + @logger.error "adding listener failed addr=#{address} (in use)" + rescue => err + @logger.fatal "error adding listener addr=#{address}" + raise err + end + end + + def daemon_ready + @daemon_pipe or return + @daemon_pipe.syswrite("#$$") + @daemon_pipe.close + @daemon_pipe = true # for SIGWINCH + end + + # reexecutes the Yahns::START with a new binary + def reexec + if @reexec_pid > 0 + begin + Process.kill(0, @reexec_pid) + @logger.error "reexec-ed child already running PID:#@reexec_pid" + return + rescue Errno::ESRCH + @reexec_pid = 0 + end + end + + if @pid + old_pid = "#@pid.oldbin" + begin + self.pid = old_pid # clear the path for a new pid file + rescue ArgumentError + @logger.error "old PID:#{valid_pid?(old_pid)} running with " \ + "existing pid=#{old_pid}, refusing rexec" + return + rescue => e + @logger.error "error writing pid=#{old_pid} #{e.class} #{e.message}" + return + end + end + + @reexec_pid = fork do + redirects = {} + listeners.each do |sock| + sock.close_on_exec = false + redirects[sock.fileno] = sock + end + ENV['YAHNS_FD'] = redirects.keys.map(&:to_s).join(',') + Dir.chdir(@config.value(:working_directory) || Yahns::START[:cwd]) + cmd = [ Yahns::START[0] ].concat(Yahns::START[:argv]) + @logger.info "executing #{cmd.inspect} (in #{Dir.pwd})" + cmd << redirects + exec(*cmd) + end + proc_name 'master (old)' + end + + # unlinks a PID file at given +path+ if it contains the current PID + # still potentially racy without locking the directory (which is + # non-portable and may interact badly with other programs), but the + # window for hitting the race condition is small + def unlink_pid_safe(path) + (File.read(path).to_i == $$ and File.unlink(path)) rescue nil + end + + # returns a PID if a given path contains a non-stale PID file, + # nil otherwise. + def valid_pid?(path) + wpid = File.read(path).to_i + wpid <= 0 and return + Process.kill(0, wpid) + wpid + rescue Errno::EPERM + @logger.info "pid=#{path} possibly stale, got EPERM signalling PID:#{wpid}" + nil + rescue Errno::ESRCH, Errno::ENOENT + # don't unlink stale pid files, racy without non-portable locking... + end + + def load_config! + @logger.info "reloading config_file=#{@config.config_file}" + @config.config_reload! + @config.commit!(self) + kill_each_worker(:QUIT) + Yahns::Log.reopen_all + @logger.info "done reloading config_file=#{@config.config_file}" + rescue StandardError, LoadError, SyntaxError => e + Yahns::Log.exception(@logger, + "error reloading config_file=#{@config.config_file}", e) + end + + # returns an array of string names for the given listener array + def listener_names(listeners = @listeners) + listeners.map { |io| sock_name(io) } + end + + def sock_opts(io) + @config.config_listeners[sock_name(io)] + end + + def inherit_listeners! + # inherit sockets from parents, they need to be plain Socket objects + # before they become Kgio::UNIXServer or Kgio::TCPServer + inherited = ENV['YAHNS_FD'].to_s.split(/,/).map do |fd| + io = Socket.for_fd(fd.to_i) + set_server_sockopt(io, sock_opts(io)) + @logger.info "inherited addr=#{sock_name(io)} fd=#{fd}" + server_cast(io) + end + + @listeners.replace(inherited) + end + + # call only after calling inherit_listeners! + # This binds any listeners we did NOT inherit from the parent + def bind_new_listeners! + self.listeners = @config.config_listeners.keys + raise ArgumentError, "no listeners" if @listeners.empty? + @listeners.each { |l| l.extend(Yahns::Acceptor) } + end + + def proc_name(tag) + s = Yahns::START + $0 = ([ File.basename(s[0]), tag ]).concat(s[:argv]).join(' ') + end + + # spins up processing threads of the server + def fdmap_init + thresh = @config.value(:client_expire_threshold) + + # keeps track of all connections, like ObjectSpace, but only for IOs + fdmap = Yahns::Fdmap.new(@logger, thresh) + + # initialize queues (epoll/kqueue) and associated worker threads + queues = {} + @config.qeggs.each do |name, qegg| + queue = qegg.qc_vivify(fdmap) # worker threads run after this + queues[qegg] = queue + end + + # spin up applications (which are preload: false) + @config.app_ctx.each { |ctx| ctx.after_fork_init } + + # spin up acceptors, clients flow into worker queues after this + @listeners.each do |l| + ctx = sock_opts(l)[:yahns_app_ctx] + qegg = ctx.qegg || @config.qeggs[:default] + + # acceptors feed the the queues + l.spawn_acceptor(@logger, ctx, queues[qegg]) + end + fdmap + end + + def usr1_reopen(prefix) + @logger.info "#{prefix}reopening logs..." + Yahns::Log.reopen_all + @logger.info "#{prefix}done reopening logs" + end + + def sp_sig_handle(alive) + @sev.kgio_wait_readable(alive ? nil : 0.01) + @sev.yahns_step + case sig = @sig_queue.shift + when :QUIT, :TERM, :INT + self.listeners = [] # stop accepting new connections + exit(0) unless alive + return false + when :USR1 + usr1_reopen('') + when :USR2 + reexec + when :HUP + reexec + return false + when :TTIN, :TTOU, :WINCH + @logger.info("SIG#{sig} ignored in single-process mode") + end + alive + end + + # single-threaded only, this is overriden if @worker_processes is non-nil + def join + daemon_ready + fdmap = fdmap_init + alive = true + begin + alive = sp_sig_handle(alive) + rescue => e + Yahns::Log.exception(@logger, "main loop", e) + end while alive || fdmap.size > 0 + unlink_pid_safe(@pid) if @pid + end +end diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb new file mode 100644 index 0000000..8818bac --- /dev/null +++ b/lib/yahns/server_mp.rb @@ -0,0 +1,184 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +module Yahns::ServerMP # :nodoc: + EXIT_SIGS = [ :QUIT, :TERM, :INT ] + + def mp_init + trap(:CHLD) { @sev.sev_signal } + end + + # reaps all unreaped workers + def reap_all_workers + begin + wpid, status = Process.waitpid2(-1, Process::WNOHANG) + wpid or return + if @reexec_pid == wpid + @logger.error "reaped #{status.inspect} exec()-ed" + @reexec_pid = 0 + self.pid = @pid.chomp('.oldbin') if @pid + proc_name 'master' + else + worker = @workers.delete(wpid) + worker_id = worker ? worker.nr : "(unknown)" + m = "reaped #{status.inspect} worker=#{worker_id}" + status.success? ? @logger.info(m) : @logger.error(m) + end + rescue Errno::ECHILD + return + end while true + end + + def maintain_worker_count + (off = @workers.size - @worker_processes) == 0 and return + off < 0 and return spawn_missing_workers + @workers.each_pair do |wpid, worker| + worker.nr >= @worker_processes and Process.kill(:QUIT, wpid) + end + end + + # delivers a signal to each worker + def kill_each_worker(signal) + @workers.each_key { |wpid| Process.kill(signal, wpid) } + end + + # this is the first thing that runs after forking in a child + # gets rid of stuff the worker has no business keeping track of + # to free some resources and drops all sig handlers. + # traps for USR1, USR2, and HUP may be set in the after_fork Proc + # by the user. + def after_fork_internal(worker) + worker.atfork_child + + # daemon_pipe may be true for non-initial workers + @daemon_pipe = @daemon_pipe.close if @daemon_pipe.respond_to?(:close) + + srand # in case this pops up again: https://bugs.ruby-lang.org/issues/4338 + + # The OpenSSL PRNG is seeded with only the pid, and apps with frequently + # dying workers can recycle pids + OpenSSL::Random.seed(rand.to_s) if defined?(OpenSSL::Random) + # we'll re-trap EXIT_SIGS later for graceful shutdown iff we accept clients + EXIT_SIGS.each { |sig| trap(sig) { exit!(0) } } + exit!(0) if (@sig_queue & EXIT_SIGS)[0] # did we inherit sigs from parent? + @sig_queue = [] + + # ignore WINCH, TTIN, TTOU, HUP in the workers + (Yahns::Server::QUEUE_SIGS - EXIT_SIGS).each { |sig| trap(sig, nil) } + trap(:CHLD, 'DEFAULT') + @logger.info("worker=#{worker.nr} spawned pid=#$$") + proc_name "worker[#{worker.nr}]" + Yahns::START.clear + @sev.close + @sev = Yahns::Sigevent.new + worker.user(*@user) if @user + @user = @workers = nil + end + + def spawn_missing_workers + worker_nr = -1 + until (worker_nr += 1) == @worker_processes + @workers.value?(worker_nr) and next + worker = Yahns::Worker.new(worker_nr) + @logger.info("worker=#{worker_nr} spawning...") + if pid = fork + @workers[pid] = worker.atfork_parent + else + after_fork_internal(worker) + run_mp_worker(worker) + end + end + rescue => e + Yahns::Log.exception(@logger, "spawning worker", e) + exit! + end + + # monitors children and receives signals forever + # (or until a termination signal is sent). This handles signals + # one-at-a-time time and we'll happily drop signals in case somebody + # is signalling us too often. + def join + spawn_missing_workers + state = :respawn # :QUIT, :WINCH + proc_name 'master' + @logger.info "master process ready" + daemon_ready + begin + @sev.kgio_wait_readable + @sev.yahns_step + reap_all_workers + case @sig_queue.shift + when *EXIT_SIGS # graceful shutdown (twice for non graceful) + self.listeners = [] + kill_each_worker(:QUIT) + state = :QUIT + when :USR1 # rotate logs + usr1_reopen("master ") + kill_each_worker(:USR1) + when :USR2 # exec binary, stay alive in case something went wrong + reexec + when :WINCH + if @daemon_pipe + state = :WINCH + @logger.info "gracefully stopping all workers" + kill_each_worker(:QUIT) + @worker_processes = 0 + else + @logger.info "SIGWINCH ignored because we're not daemonized" + end + when :TTIN + state = :respawn unless state == :QUIT + @worker_processes += 1 + when :TTOU + @worker_processes -= 1 if @worker_processes > 0 + when :HUP + state = :respawn unless state == :QUIT + if @config.config_file + load_config! + else # exec binary and exit if there's no config file + @logger.info "config_file not present, reexecuting binary" + reexec + end + end while @sig_queue[0] + maintain_worker_count if state == :respawn + rescue => e + Yahns::Log.exception(@logger, "master loop error", e) + end while state != :QUIT || @workers.size > 0 + @logger.info "master complete" + unlink_pid_safe(@pid) if @pid + end + + def fdmap_init_mp + fdmap = fdmap_init # builds apps (if not preloading) + EXIT_SIGS.each { |sig| trap(sig) { sqwakeup(sig) } } + @config = nil + fdmap + end + + def run_mp_worker(worker) + fdmap = fdmap_init_mp + alive = true + begin + alive = mp_sig_handle(worker, alive) + rescue => e + Yahns::Log.exception(@logger, "main worker loop", e) + end while alive || fdmap.size > 0 + exit + end + + def mp_sig_handle(worker, alive) + # not performance critical + r = IO.select([worker, @sev], nil, nil, alive ? nil : 0.01) and + r[0].each { |io| io.yahns_step } + case sig = @sig_queue.shift + when *EXIT_SIGS + self.listeners = [] + exit(0) unless alive # drop connections immediately if signaled twice + @logger.info("received SIG#{sig}, gracefully exiting") + return false + when :USR1 + usr1_reopen("worker ") + end + alive + end +end diff --git a/lib/yahns/sigevent.rb b/lib/yahns/sigevent.rb new file mode 100644 index 0000000..aa95f4b --- /dev/null +++ b/lib/yahns/sigevent.rb @@ -0,0 +1,7 @@ +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +if SleepyPenguin.const_defined?(:EventFD) + require_relative 'sigevent_efd' +else + require_relative 'sigevent_pipe' +end diff --git a/lib/yahns/sigevent_efd.rb b/lib/yahns/sigevent_efd.rb new file mode 100644 index 0000000..8f10ad6 --- /dev/null +++ b/lib/yahns/sigevent_efd.rb @@ -0,0 +1,18 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +class Yahns::Sigevent < SleepyPenguin::EventFD # :nodoc: + include Kgio::DefaultWaiters + def self.new + super(0, SleepyPenguin::EventFD::CLOEXEC) + end + + def sev_signal + incr(1) # eventfd_write + end + + def yahns_step + value(true) # eventfd_read, we ignore this data + :wait_readable + end +end diff --git a/lib/yahns/sigevent_pipe.rb b/lib/yahns/sigevent_pipe.rb new file mode 100644 index 0000000..6e1be53 --- /dev/null +++ b/lib/yahns/sigevent_pipe.rb @@ -0,0 +1,29 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +class Yahns::Sigevent # :nodoc: + attr_reader :to_io + def initialize + @to_io, @wr = Kgio::Pipe.new + end + + def kgio_wait_readable + @to_io.kgio_wait_readable + end + + def sev_signal + @wr.kgio_write(".") + end + + def yahns_step + # 11 byte strings -> no malloc on YARV + while String === @to_io.kgio_tryread(11) + end + :wait_readable + end + + def close + @to_io.close + @wr.close + end +end diff --git a/lib/yahns/socket_helper.rb b/lib/yahns/socket_helper.rb new file mode 100644 index 0000000..61f2b0f --- /dev/null +++ b/lib/yahns/socket_helper.rb @@ -0,0 +1,117 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +# this is only meant for Yahns::Server +module Yahns::SocketHelper # :nodoc: + def set_server_sockopt(sock, opt) + opt = {backlog: 1024}.merge!(opt) if opt + + TCPSocket === sock and sock.setsockopt(:IPPROTO_TCP, :TCP_NODELAY, 1) + sock.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1) + + if opt[:rcvbuf] || opt[:sndbuf] + log_buffer_sizes(sock, "before: ") + { SO_RCVBUF: :rcvbuf, SO_SNDBUF: :sndbuf }.each do |optname,cfgname| + val = opt[cfgname] and sock.setsockopt(:SOL_SOCKET, optname, val) + end + log_buffer_sizes(sock, " after: ") + end + sock.listen(opt[:backlog]) + rescue => e + Yahns::Log.exception(@logger, "#{sock_name(sock)} #{opt.inspect}", e) + end + + def log_buffer_sizes(sock, pfx = '') + rcvbuf = sock.getsockopt(:SOL_SOCKET, :SO_RCVBUF).int + sndbuf = sock.getsockopt(:SOL_SOCKET, :SO_SNDBUF).int + @logger.info("#{pfx}#{sock_name(sock)} rcvbuf=#{rcvbuf} sndbuf=#{sndbuf}") + end + + # creates a new server, socket. address may be a HOST:PORT or + # an absolute path to a UNIX socket. address can even be a Socket + # object in which case it is immediately returned + def bind_listen(address, opt) + return address unless String === address + opt ||= {} + + sock = if address[0] == ?/ + if File.exist?(address) + if File.socket?(address) + begin + UNIXSocket.new(address).close + # fall through, try to bind(2) and fail with EADDRINUSE + # (or succeed from a small race condition we can't sanely avoid). + rescue Errno::ECONNREFUSED + @logger.info "unlinking existing socket=#{address}" + File.unlink(address) + end + else + raise ArgumentError, + "socket=#{address} specified but it is not a socket!" + end + end + old_umask = File.umask(opt[:umask] || 0) + begin + Kgio::UNIXServer.new(address) + ensure + File.umask(old_umask) + end + elsif /\A\[([a-fA-F0-9:]+)\]:(\d+)\z/ =~ address + new_ipv6_server($1, $2.to_i, opt) + elsif /\A(\d+\.\d+\.\d+\.\d+):(\d+)\z/ =~ address + Kgio::TCPServer.new($1, $2.to_i) + else + raise ArgumentError, "Don't know how to bind: #{address}" + end + set_server_sockopt(sock, opt) + sock + end + + def new_ipv6_server(addr, port, opt) + opt.key?(:ipv6only) or return Kgio::TCPServer.new(addr, port) + sock = Socket.new(:AF_INET6, :SOCK_STREAM, 0) + sock.setsockopt(:IPPROTO_IPV6, :IPV6_V6ONLY, opt[:ipv6only] ? 1 : 0) + sock.setsockopt(:SOL_SOCKET, :SO_REUSEADDR, 1) + sock.bind(Socket.pack_sockaddr_in(port, addr)) + sock.autoclose = false + Kgio::TCPServer.for_fd(sock.fileno) + end + + # returns rfc2732-style (e.g. "[::1]:666") addresses for IPv6 + def tcp_name(sock) + port, addr = Socket.unpack_sockaddr_in(sock.getsockname) + /:/ =~ addr ? "[#{addr}]:#{port}" : "#{addr}:#{port}" + end + + # Returns the configuration name of a socket as a string. sock may + # be a string value, in which case it is returned as-is + # Warning: TCP sockets may not always return the name given to it. + def sock_name(sock) + case sock + when String then sock + when UNIXServer + Socket.unpack_sockaddr_un(sock.getsockname) + when TCPServer + tcp_name(sock) + when Socket + begin + tcp_name(sock) + rescue ArgumentError + Socket.unpack_sockaddr_un(sock.getsockname) + end + else + raise ArgumentError, "Unhandled class #{sock.class}: #{sock.inspect}" + end + end + + # casts a given Socket to be a TCPServer or UNIXServer + def server_cast(sock) + sock.autoclose = false + begin + Socket.unpack_sockaddr_in(sock.getsockname) + Kgio::TCPServer.for_fd(sock.fileno) + rescue ArgumentError + Kgio::UNIXServer.for_fd(sock.fileno) + end + end +end diff --git a/lib/yahns/stream_file.rb b/lib/yahns/stream_file.rb new file mode 100644 index 0000000..eba9632 --- /dev/null +++ b/lib/yahns/stream_file.rb @@ -0,0 +1,34 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative 'wbuf_common' + +class Yahns::StreamFile # :nodoc: + include Yahns::WbufCommon + + def initialize(body, persist, offset, count) + if body.respond_to?(:to_io) + @tmpio = body.to_io + else + path = body.to_path + if path =~ %r{\A/dev/fd/(\d+)\z} + @tmpio = IO.for_fd($1.to_i) + @tmpio.autoclose = false + else + @tmpio = File.open(path) + end + end + @sf_offset = offset + @sf_count = count || @tmpio.stat.size + @wbuf_persist = persist # whether or not we keep the connection alive + @body = body + end + + # called by last wbuf_flush + def wbuf_close(client) + if File === @tmpio && @tmpio != @body + @tmpio.close + end + wbuf_close_common(client) + end +end diff --git a/lib/yahns/stream_input.rb b/lib/yahns/stream_input.rb new file mode 100644 index 0000000..f0a43b3 --- /dev/null +++ b/lib/yahns/stream_input.rb @@ -0,0 +1,150 @@ +# -*- encoding: binary -*- +# Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv2 or later (https://www.gnu.org/licenses/gpl-2.0.txt) + +# When processing uploads, Yahns may expose a StreamInput object under +# "rack.input" of the (future) Rack (2.x) environment. +class Yahns::StreamInput # :nodoc: + # Initializes a new StreamInput object. You normally do not have to call + # this unless you are writing an HTTP server. + def initialize(client, request) + @chunked = request.content_length.nil? + @client = client + @parser = request + @buf = request.buf + @rbuf = '' + @bytes_read = 0 + filter_body(@rbuf, @buf) unless @buf.empty? + end + + # :call-seq: + # ios.read([length [, buffer ]]) => string, buffer, or nil + # + # Reads at most length bytes from the I/O stream, or to the end of + # file if length is omitted or is nil. length must be a non-negative + # integer or nil. If the optional buffer argument is present, it + # must reference a String, which will receive the data. + # + # At end of file, it returns nil or '' depend on length. + # ios.read() and ios.read(nil) returns ''. + # ios.read(length [, buffer]) returns nil. + # + # If the Content-Length of the HTTP request is known (as is the common + # case for POST requests), then ios.read(length [, buffer]) will block + # until the specified length is read (or it is the last chunk). + # Otherwise, for uncommon "Transfer-Encoding: chunked" requests, + # ios.read(length [, buffer]) will return immediately if there is + # any data and only block when nothing is available (providing + # IO#readpartial semantics). + def read(length = nil, rv = '') + if length + if length <= @rbuf.size + length < 0 and raise ArgumentError, "negative length #{length} given" + rv.replace(@rbuf.slice!(0, length)) + else + to_read = length - @rbuf.size + rv.replace(@rbuf.slice!(0, @rbuf.size)) + until to_read == 0 || eof? || (rv.size > 0 && @chunked) + @client.kgio_read(to_read, @buf) or eof! + filter_body(@rbuf, @buf) + rv << @rbuf + to_read -= @rbuf.size + end + @rbuf.replace('') + end + rv = nil if rv.empty? && length != 0 + else + read_all(rv) + end + rv + end + + def __rsize + @client.class.client_body_buffer_size + end + + # :call-seq: + # ios.gets => string or nil + # + # Reads the next ``line'' from the I/O stream; lines are separated + # by the global record separator ($/, typically "\n"). A global + # record separator of nil reads the entire unread contents of ios. + # Returns nil if called at the end of file. + # This takes zero arguments for strict Rack::Lint compatibility, + # unlike IO#gets. + def gets + sep = $/ + if sep.nil? + read_all(rv = '') + return rv.empty? ? nil : rv + end + re = /\A(.*?#{Regexp.escape(sep)})/ + rsize = __rsize + begin + @rbuf.sub!(re, '') and return $1 + return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size) if eof? + @client.kgio_read(rsize, @buf) or eof! + filter_body(once = '', @buf) + @rbuf << once + end while true + end + + # :call-seq: + # ios.each { |line| block } => ios + # + # Executes the block for every ``line'' in *ios*, where lines are + # separated by the global record separator ($/, typically "\n"). + def each + while line = gets + yield line + end + + self # Rack does not specify what the return value is here + end + + def eof? + if @parser.body_eof? + rsize = __rsize + while @chunked && ! @parser.parse + once = @client.kgio_read(rsize) or eof! + @buf << once + end + @client = nil + true + else + false + end + end + + def filter_body(dst, src) + rv = @parser.filter_body(dst, src) + @bytes_read += dst.size + rv + end + + def read_all(dst) + dst.replace(@rbuf) + @client or return + rsize = @client.class.client_body_buffer_size + until eof? + @client.kgio_read(rsize, @buf) or eof! + filter_body(@rbuf, @buf) + dst << @rbuf + end + ensure + @rbuf.replace('') + end + + def eof! + # in case client only did a premature shutdown(SHUT_WR) + # we do support clients that shutdown(SHUT_WR) after the + # _entire_ request has been sent, and those will not have + # raised EOFError on us. + @client.shutdown if @client + ensure + raise Yahns::ClientShutdown, "bytes_read=#{@bytes_read}", [] + end + + def discard # return nil + end +end diff --git a/lib/yahns/tee_input.rb b/lib/yahns/tee_input.rb new file mode 100644 index 0000000..0d91a89 --- /dev/null +++ b/lib/yahns/tee_input.rb @@ -0,0 +1,114 @@ +# -*- encoding: binary -*- +# Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv2 or later (https://www.gnu.org/licenses/gpl-2.0.txt) + +# acts like tee(1) on an input input to provide a input-like stream +# while providing rewindable semantics through a File/StringIO backing +# store. On the first pass, the input is only read on demand so your +# Rack application can use input notification (upload progress and +# like). This should fully conform to the Rack::Lint::InputWrapper +# specification on the public API. This class is intended to be a +# strict interpretation of Rack::Lint::InputWrapper functionality and +# will not support any deviations from it. +# +# When processing uploads, Yahns exposes a TeeInput object under +# "rack.input" of the Rack environment. +class Yahns::TeeInput < Yahns::StreamInput # :nodoc: + # Initializes a new TeeInput object. You normally do not have to call + # this unless you are writing an HTTP server. + def initialize(client, request) + @len = request.content_length + super + @tmp = client.class.tmpio_for(@len) + end + + # :call-seq: + # ios.size => Integer + # + # Returns the size of the input. For requests with a Content-Length + # header value, this will not read data off the socket and just return + # the value of the Content-Length header as an Integer. + # + # For Transfer-Encoding:chunked requests, this requires consuming + # all of the input stream before returning since there's no other + # way to determine the size of the request body beforehand. + # + # This method is no longer part of the Rack specification as of + # Rack 1.2, so its use is not recommended. This method only exists + # for compatibility with Rack applications designed for Rack 1.1 and + # earlier. Most applications should only need to call +read+ with a + # specified +length+ in a loop until it returns +nil+. + def size + @len and return @len + pos = @tmp.pos + consume! + @tmp.pos = pos + @len = @tmp.size + end + + # :call-seq: + # ios.read([length [, buffer ]]) => string, buffer, or nil + # + # Reads at most length bytes from the I/O stream, or to the end of + # file if length is omitted or is nil. length must be a non-negative + # integer or nil. If the optional buffer argument is present, it + # must reference a String, which will receive the data. + # + # At end of file, it returns nil or "" depend on length. + # ios.read() and ios.read(nil) returns "". + # ios.read(length [, buffer]) returns nil. + # + # If the Content-Length of the HTTP request is known (as is the common + # case for POST requests), then ios.read(length [, buffer]) will block + # until the specified length is read (or it is the last chunk). + # Otherwise, for uncommon "Transfer-Encoding: chunked" requests, + # ios.read(length [, buffer]) will return immediately if there is + # any data and only block when nothing is available (providing + # IO#readpartial semantics). + def read(*args) + @client ? tee(super) : @tmp.read(*args) + end + + # :call-seq: + # ios.gets => string or nil + # + # Reads the next ``line'' from the I/O stream; lines are separated + # by the global record separator ($/, typically "\n"). A global + # record separator of nil reads the entire unread contents of ios. + # Returns nil if called at the end of file. + # This takes zero arguments for strict Rack::Lint compatibility, + # unlike IO#gets. + def gets + @client ? tee(super) : @tmp.gets + end + + # :call-seq: + # ios.rewind => 0 + # + # Positions the *ios* pointer to the beginning of input, returns + # the offset (zero) of the +ios+ pointer. Subsequent reads will + # start from the beginning of the previously-buffered input. + def rewind + return 0 if 0 == @tmp.size + consume! if @client + @tmp.rewind # Rack does not specify what the return value is here + end + + # consumes the stream of the socket + def consume! + junk = "" + rsize = __rsize + nil while read(rsize, junk) + end + + def tee(buffer) + if buffer && buffer.size > 0 + @tmp.write(buffer) + end + buffer + end + + def discard + @tmp = @tmp.close + end +end diff --git a/lib/yahns/tiny_input.rb b/lib/yahns/tiny_input.rb new file mode 100644 index 0000000..55bdd03 --- /dev/null +++ b/lib/yahns/tiny_input.rb @@ -0,0 +1,7 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +class Yahns::TinyInput < StringIO # :nodoc: + def discard # just returns nil + end +end diff --git a/lib/yahns/tmpio.rb b/lib/yahns/tmpio.rb new file mode 100644 index 0000000..60751c0 --- /dev/null +++ b/lib/yahns/tmpio.rb @@ -0,0 +1,27 @@ +# -*- encoding: binary -*- +# Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv2 or later (https://www.gnu.org/licenses/gpl-2.0.txt) +require 'tmpdir' + +# some versions of Ruby had a broken Tempfile which didn't work +# well with unlinked files. This one is much shorter, easier +# to understand, and slightly faster (no delegation). +class Yahns::TmpIO < File # :nodoc: + + # creates and returns a new File object. The File is unlinked + # immediately, switched to binary mode, and userspace output + # buffering is disabled + def self.new + fp = begin + super("#{Dir.tmpdir}/#{rand}", RDWR|CREAT|EXCL, 0600) + rescue Errno::EEXIST + retry + end + unlink(fp.path) + fp.binmode + fp.sync = true + fp + end + + alias discard close +end diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb new file mode 100644 index 0000000..4828056 --- /dev/null +++ b/lib/yahns/wbuf.rb @@ -0,0 +1,36 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative 'wbuf_common' + +class Yahns::Wbuf # :nodoc: + include Yahns::WbufCommon + + def initialize(body, persist) + @tmpio = Yahns::TmpIO.new + @sf_offset = @sf_count = 0 + @wbuf_persist = persist # whether or not we keep the connection alive + @body = body + end + + def wbuf_write(client, buf) + @sf_count += @tmpio.write(buf) + case rv = client.trysendfile(@tmpio, @sf_offset, @sf_count) + when Integer + @sf_count -= rv + @sf_offset += rv + when :wait_writable, :wait_readable + return rv + else + raise "BUG: #{rv.nil ? "EOF" : rv.inspect} on tmpio " \ + "sf_offset=#@sf_offset sf_count=#@sf_count" + end while @sf_count > 0 + nil + end + + # called by last wbuf_flush + def wbuf_close(client) + @tmpio = @tmpio.close + wbuf_close_common(client) + end +end diff --git a/lib/yahns/wbuf_common.rb b/lib/yahns/wbuf_common.rb new file mode 100644 index 0000000..e621311 --- /dev/null +++ b/lib/yahns/wbuf_common.rb @@ -0,0 +1,32 @@ +# -*- encoding: binary -*- +# Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'sendfile' +module Yahns::WbufCommon # :nodoc: + # returns nil on success, :wait_*able when blocked + # currently, we rely on each thread having exclusive access to the + # client socket, so this is never called concurrently with wbuf_write + def wbuf_flush(client) + case rv = client.trysendfile(@tmpio, @sf_offset, @sf_count) + when Integer + return wbuf_close(client) if (@sf_count -= rv) == 0 # all sent! + + @sf_offset += rv # keep going otherwise + when :wait_writable, :wait_readable + return rv + else + raise "BUG: #{rv.nil? ? "EOF" : rv.inspect} on tmpio=#{@tmpio.inspect} " \ + "sf_offset=#@sf_offset sf_count=#@sf_count" + end while true + end + + def wbuf_close_common(client) + @body.close if @body.respond_to?(:close) + if @wbuf_persist.respond_to?(:call) # hijack + @wbuf_persist.call(client) + :delete + else + @wbuf_persist # true or false or Yahns::StreamFile + end + end +end diff --git a/lib/yahns/worker.rb b/lib/yahns/worker.rb new file mode 100644 index 0000000..980f7bd --- /dev/null +++ b/lib/yahns/worker.rb @@ -0,0 +1,58 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +class Yahns::Worker # :nodoc: + attr_accessor :nr + attr_reader :to_io + + def initialize(nr) + @nr = nr + @to_io, @wr = Kgio::Pipe.new + end + + def atfork_child + @wr = @wr.close # nil @wr to save space in worker process + end + + def atfork_parent + @to_io = @to_io.close + self + end + + # used in the worker process. + # This causes the worker to gracefully exit if the master + # dies unexpectedly. + def yahns_step + @to_io.kgio_tryread(11) == nil and Process.kill(:QUIT, $$) + :wait_readable + end + + # worker objects may be compared to just plain Integers + def ==(other_nr) # :nodoc: + @nr == other_nr + end + + # Changes the worker process to the specified +user+ and +group+ + # This is only intended to be called from within the worker + # process from the +after_fork+ hook. This should be called in + # the +after_fork+ hook after any privileged functions need to be + # run (e.g. to set per-worker CPU affinity, niceness, etc) + # + # Any and all errors raised within this method will be propagated + # directly back to the caller (usually the +after_fork+ hook. + # These errors commonly include ArgumentError for specifying an + # invalid user/group and Errno::EPERM for insufficient privileges + def user(user, group = nil) + # we do not protect the caller, checking Process.euid == 0 is + # insufficient because modern systems have fine-grained + # capabilities. Let the caller handle any and all errors. + uid = Etc.getpwnam(user).uid + gid = Etc.getgrnam(group).gid if group + Yahns::Log.chown_all(uid, gid) + if gid && Process.egid != gid + Process.initgroups(user, gid) + Process::GID.change_privilege(gid) + end + Process.euid != uid and Process::UID.change_privilege(uid) + end +end |