diff options
Diffstat (limited to 'lib/unicorn.rb')
-rw-r--r-- | lib/unicorn.rb | 394 |
1 files changed, 269 insertions, 125 deletions
diff --git a/lib/unicorn.rb b/lib/unicorn.rb index dc0b339..9c6aab7 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -1,15 +1,4 @@ -# Standard libraries -require 'socket' -require 'tempfile' -require 'time' -require 'uri' -require 'stringio' -require 'fcntl' require 'logger' -require 'io/nonblock' - -# Compiled extension -require 'http11' require 'unicorn/socket' require 'unicorn/const' @@ -21,70 +10,247 @@ require 'unicorn/http_response' # functionality to service web application requests fast as possible. module Unicorn class << self - # A logger instance that conforms to the API of stdlib's Logger. - attr_accessor :logger - def run(app, options = {}) HttpServer.new(app, options).start.join end end - # We do this to be compatible with the existing API - class WorkerTable < Hash - def join - begin - pid = Process.wait - self.delete(pid) - rescue Errno::ECHLD - return - end - end - end - - # This is the main driver of Unicorn, while the Unicorn::HttpParser - # and make up the majority of how the server functions. It forks off - # :nr_workers and has the workers accepting connections on a shared - # socket and a simple HttpServer.process_client function to - # do the heavy lifting with the IO and Ruby. + # This is the process manager of Unicorn. This manages worker + # processes which in turn handle the I/O and application process. + # Listener sockets are started in the master process and shared with + # forked worker children. class HttpServer - attr_reader :workers, :logger, :listeners, :timeout, :nr_workers - + attr_reader :logger + include Process + include ::Unicorn::SocketHelper + + DEFAULT_START_CTX = { + :argv => ARGV.map { |arg| arg.dup }, + :cwd => (ENV['PWD'] || Dir.pwd), + :zero => $0.dup, + :environ => {}.merge!(ENV), + :umask => File.umask, + }.freeze + DEFAULTS = { :timeout => 60, :listeners => %w(0.0.0.0:8080), :logger => Logger.new(STDERR), - :nr_workers => 1 + :nr_workers => 1, + :after_fork => lambda { |server, worker_nr| + server.logger.info("worker=#{worker_nr} spawned pid=#{$$}") + }, + :before_fork => lambda { |server, worker_nr| + server.logger.info("worker=#{worker_nr} spawning...") + }, } - # Creates a working server on host:port (strange things happen if # port isn't a Number). Use HttpServer::run to start the server and # HttpServer.workers.join to join the thread that's processing # incoming requests on the socket. def initialize(app, options = {}) - @app = app - @workers = WorkerTable.new - (DEFAULTS.to_a + options.to_a).each do |key, value| instance_variable_set("@#{key.to_s.downcase}", value) end - @listeners.map! { |address| Socket.unicorn_server_new(address, 1024) } + @app = app + @mode = :idle + @master_pid = $$ + @workers = Hash.new + @request = HttpRequest.new(logger) # shared between all worker processes + @start_ctx = DEFAULT_START_CTX.dup + @start_ctx.merge!(options[:start_ctx]) if options[:start_ctx] + @purgatory = [] # prevents objects in here from being GC-ed + end + + # Runs the thing. Returns self so you can run join on it + def start + BasicSocket.do_not_reverse_lookup = true + + # inherit sockets from parents, they need to be plain Socket objects + # before they become UNIXServer or TCPServer + inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd| + io = Socket.for_fd(fd.to_i) + set_server_sockopt(io) + logger.info "inherited: #{io} fd=#{fd} addr=#{sock_name(io)}" + io + end + + # avoid binding inherited sockets, probably not perfect for TCPSockets + # but it works for UNIXSockets + @listeners -= inherited.map { |io| sock_name(io) } + + # try binding new listeners + @listeners.map! do |addr| + if sock = bind_listen(addr, 1024) + sock + elsif inherited.empty? || addr[0..0] == "/" + raise Errno::EADDRINUSE, "couldn't bind #{addr}" + else + logger.info "couldn't bind #{addr}, inherited?" + nil + end + end + @listeners += inherited + @listeners.compact! + @listeners.empty? and raise ArgumentError, 'No listener sockets' + + # we start out with generic Socket objects that get cast to either + # TCPServer or UNIXServer objects; but since the Socket objects + # share the same OS-level file descriptor as the higher-level *Server + # objects; we need to prevent Socket objects from being garbage-collected + @purgatory += @listeners + @listeners.map! { |io| server_cast(io) } + @listeners.each do |io| + logger.info "#{io} listening on fd=#{io.fileno} addr=#{sock_name(io)}" + end + spawn_missing_workers + self + end + + # monitors children and receives signals forever + # (or until a termination signal is sent) + def join + %w(QUIT INT TERM USR1 USR2 HUP).each { |sig| trap_deferred(sig) } + begin + loop do + reap_all_workers + case @mode + when :idle + kill_each_worker(0) # ensure they're running + spawn_missing_workers + when 'QUIT' # graceful shutdown + break + when 'TERM', 'INT' # immediate shutdown + stop(false) + break + when 'USR1' # user-defined (probably something like log reopening) + kill_each_worker('USR1') + @mode = :idle + trap_deferred('USR1') + when 'USR2' # exec binary, stay alive in case something went wrong + reexec + @mode = :idle + trap_deferred('USR2') + when 'HUP' # exec binary and exit + reexec + break + else + logger.error "master process in unknown mode: #{@mode}, resetting" + @mode = :idle + end + reap_all_workers + sleep 1 + end + rescue Errno::EINTR + retry + rescue Object => e + logger.error "Unhandled master loop exception #{e.inspect}." + logger.error e.backtrace.join("\n") + sleep 1 rescue nil + retry + end + stop # gracefully shutdown all workers on our way out + logger.info "master pid=#{$$} exit" + end + + # Terminates all workers, but does not exit master process + def stop(graceful = true) + kill_each_worker(graceful ? 'QUIT' : 'TERM') + timeleft = @timeout + step = 0.2 + reap_all_workers + until @workers.empty? + sleep(step) + reap_all_workers + (timeleft -= step) > 0 and next + kill_each_worker('KILL') + end + ensure + @listeners.each { |sock| sock.close rescue nil } + @listeners.clear + end + + private + + # defer a signal for later processing + def trap_deferred(signal) + trap(signal) do |sig_nr| + trap(signal, 'IGNORE') # prevent double signalling + @mode = signal if Symbol === @mode + end end - def process_client(client) + # reaps all unreaped workers + def reap_all_workers + begin + loop do + pid = waitpid(-1, WNOHANG) or break + worker_nr = @workers.delete(pid) + logger.info "reaped pid=#{pid} worker=#{worker_nr || 'unknown'} " \ + "status=#{$?.exitstatus}" + end + rescue Errno::ECHILD + end + end + + # Forks, sets current environment, sets the umask, chdirs to the desired + # start directory, and execs the command line originally passed to us to + # start Unicorn. + # Returns the pid of the forked process + def spawn_start_ctx(check = nil) + fork do + ENV.replace(@start_ctx[:environ]) + ENV['UNICORN_FD'] = @listeners.map { |sock| sock.fileno }.join(',') + File.umask(@start_ctx[:umask]) + Dir.chdir(@start_ctx[:cwd]) + cmd = [ @start_ctx[:zero] ] + @start_ctx[:argv] + cmd << 'check' if check + logger.info "executing #{cmd.inspect}" + exec *cmd + end + end + + # ensures @start_ctx is reusable for re-execution + def check_reexec + pid = waitpid(spawn_start_ctx(:check)) + $?.success? and return true + logger.error "exec check failed with #{$?.exitstatus}" + end + + # reexecutes the @start_ctx with a new binary + def reexec + check_reexec or return false + pid = spawn_start_ctx + if waitpid(pid, WNOHANG) + logger.error "rexec pid=#{pid} died with #{$?.exitstatus}" + end + end + + def spawn_missing_workers + return if @workers.size == @nr_workers + (0...@nr_workers).each do |worker_nr| + @workers.values.include?(worker_nr) and next + @before_fork.call(self, worker_nr) + pid = fork { worker_loop(worker_nr) } + @workers[pid] = worker_nr + end + end + + # once a client is accepted, it is processed in its entirety here + # in 3 easy steps: read request, call app, write app response + def process_client(client, client_nr) env = @request.read(client) or return app_response = @app.call(env) HttpResponse.write(client, app_response) rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF - client.close rescue nil + client.closed? or client.close rescue nil rescue Object => e logger.error "Read error: #{e.inspect}" logger.error e.backtrace.join("\n") ensure begin - client.close - rescue IOError - # Already closed + client.closed? or client.close rescue Object => e logger.error "Client error: #{e.inspect}" logger.error e.backtrace.join("\n") @@ -92,75 +258,73 @@ module Unicorn @request.reset end - # Runs the thing. Returns a hash keyed by pid with worker number values - # for which to wait on. Access the HttpServer.workers attribute - # to get this hash later. - def start - BasicSocket.do_not_reverse_lookup = true - @listeners.each do |sock| - sock.unicorn_server_init if sock.respond_to?(:unicorn_server_init) - end - - (1..@nr_workers).each do |worker_nr| - pid = fork do - nr = 0 - alive = true - listeners = @listeners - @request = HttpRequest.new(logger) - trap('TERM') { exit 0 } - trap('QUIT') do - alive = false - @listeners.each { |sock| sock.close rescue nil } - end + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies + def worker_loop(worker_nr) + # allow @after_fork to override these signals: + %w(USR1 USR2 HUP).each { |sig| trap(sig, 'IGNORE') } + @after_fork.call(self, worker_nr) if @after_fork - while alive - begin - nr_before = nr - listeners.each do |sock| - begin - client, addr = begin - sock.accept_nonblock - rescue Errno::EAGAIN - next - end - nr += 1 - client.unicorn_client_init - process_client(client) - rescue Errno::ECONNABORTED - # client closed the socket even before accept - client.close rescue nil - end - alive or exit(0) - end + if defined?(Fcntl::FD_CLOEXEC) + @listeners.each { |s| s.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) } + end + nr_before = nr = 0 + client = nil + alive = true + ready = @listeners + %w(TERM INT).each { |sig| trap(sig) { exit(0) } } # instant shutdown + trap('QUIT') do + alive = false + @listeners.each { |sock| sock.close rescue nil } # break IO.select + end - # make the following bet: if we accepted clients this round, - # we're probably reasonably busy, so avoid calling select(2) - # and try to do a blind non-blocking accept(2) on everything - # before we sleep again in select - if nr > nr_before - listeners = @listeners - else - begin - ret = IO.select(@listeners, nil, nil, nil) or next - listeners = ret[0] - rescue Errno::EBADF - exit(alive ? 1 : 0) - end + while alive && @master_pid == ppid + begin + nr_before = nr + ready.each do |sock| + begin + client = begin + sock.accept_nonblock + rescue Errno::EAGAIN + next end - rescue Object => e - if alive - logger.error "Unhandled listen loop exception #{e.inspect}." - logger.error e.backtrace.join("\n") + client.sync = true + client.nonblock = false + set_client_sockopt(client) if client.class == TCPSocket + nr += 1 + process_client(client, nr) + rescue Errno::ECONNABORTED + # client closed the socket even before accept + if client && !client.closed? + client.close rescue nil end end end - exit 0 - end # fork - @workers[pid] = worker_nr + # make the following bet: if we accepted clients this round, + # we're probably reasonably busy, so avoid calling select(2) + # and try to do a blind non-blocking accept(2) on everything + # before we sleep again in select + if nr != nr_before + ready = @listeners + else + begin + # timeout used so we can detect parent death: + ret = IO.select(@listeners, nil, nil, @timeout) or next + ready = ret[0] + rescue Errno::EBADF => e + exit(alive ? 1 : 0) + end + end + rescue SystemExit => e + exit(e.status) + rescue Object => e + if alive + logger.error "Unhandled listen loop exception #{e.inspect}." + logger.error e.backtrace.join("\n") + end + end end - - @workers end # delivers a signal to each worker @@ -174,25 +338,5 @@ module Unicorn end end - # Terminates all workers - def stop(graceful = true) - old_chld_handler = trap('CHLD') do - pid = Process.waitpid(-1, Process::WNOHANG) and @workers.delete(pid) - end - - kill_each_worker(graceful ? 'QUIT' : 'TERM') - - timeleft = @timeout - until @workers.empty? - sleep(1) - (timeleft -= 1) > 0 and next - kill_each_worker('KILL') - end - - ensure - trap('CHLD', old_chld_handler) - @listeners.each { |sock| sock.close rescue nil } - end - end end |