about summary refs log tree commit homepage
path: root/lib/unicorn.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/unicorn.rb')
-rw-r--r--lib/unicorn.rb394
1 files changed, 269 insertions, 125 deletions
diff --git a/lib/unicorn.rb b/lib/unicorn.rb
index dc0b339..9c6aab7 100644
--- a/lib/unicorn.rb
+++ b/lib/unicorn.rb
@@ -1,15 +1,4 @@
-# Standard libraries
-require 'socket'
-require 'tempfile'
-require 'time'
-require 'uri'
-require 'stringio'
-require 'fcntl'
 require 'logger'
-require 'io/nonblock'
-
-# Compiled extension
-require 'http11'
 
 require 'unicorn/socket'
 require 'unicorn/const'
@@ -21,70 +10,247 @@ require 'unicorn/http_response'
 # functionality to service web application requests fast as possible.
 module Unicorn
   class << self
-    # A logger instance that conforms to the API of stdlib's Logger.
-    attr_accessor :logger
-    
     def run(app, options = {})
       HttpServer.new(app, options).start.join
     end
   end
 
-  # We do this to be compatible with the existing API
-  class WorkerTable < Hash
-    def join
-      begin
-        pid = Process.wait
-        self.delete(pid)
-      rescue Errno::ECHLD
-        return
-      end
-    end
-  end
-
-  # This is the main driver of Unicorn, while the Unicorn::HttpParser
-  # and make up the majority of how the server functions.  It forks off
-  # :nr_workers and has the workers accepting connections on a shared
-  # socket and a simple HttpServer.process_client function to
-  # do the heavy lifting with the IO and Ruby.
+  # This is the process manager of Unicorn. This manages worker
+  # processes which in turn handle the I/O and application process.
+  # Listener sockets are started in the master process and shared with
+  # forked worker children.
   class HttpServer
-    attr_reader :workers, :logger, :listeners, :timeout, :nr_workers
-    
+    attr_reader :logger
+    include Process
+    include ::Unicorn::SocketHelper
+
+    DEFAULT_START_CTX = {
+      :argv => ARGV.map { |arg| arg.dup },
+      :cwd => (ENV['PWD'] || Dir.pwd),
+      :zero => $0.dup,
+      :environ => {}.merge!(ENV),
+      :umask => File.umask,
+    }.freeze
+
     DEFAULTS = {
       :timeout => 60,
       :listeners => %w(0.0.0.0:8080),
       :logger => Logger.new(STDERR),
-      :nr_workers => 1
+      :nr_workers => 1,
+      :after_fork => lambda { |server, worker_nr|
+          server.logger.info("worker=#{worker_nr} spawned pid=#{$$}")
+        },
+      :before_fork => lambda { |server, worker_nr|
+          server.logger.info("worker=#{worker_nr} spawning...")
+        },
     }
-
     # Creates a working server on host:port (strange things happen if
     # port isn't a Number).  Use HttpServer::run to start the server and
     # HttpServer.workers.join to join the thread that's processing
     # incoming requests on the socket.
     def initialize(app, options = {})
-      @app = app
-      @workers = WorkerTable.new
-
       (DEFAULTS.to_a + options.to_a).each do |key, value|
         instance_variable_set("@#{key.to_s.downcase}", value)
       end
 
-      @listeners.map! { |address| Socket.unicorn_server_new(address, 1024) }
+      @app = app
+      @mode = :idle
+      @master_pid = $$
+      @workers = Hash.new
+      @request = HttpRequest.new(logger) # shared between all worker processes
+      @start_ctx = DEFAULT_START_CTX.dup
+      @start_ctx.merge!(options[:start_ctx]) if options[:start_ctx]
+      @purgatory = [] # prevents objects in here from being GC-ed
+    end
+
+    # Runs the thing.  Returns self so you can run join on it
+    def start
+      BasicSocket.do_not_reverse_lookup = true
+
+      # inherit sockets from parents, they need to be plain Socket objects
+      # before they become UNIXServer or TCPServer
+      inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd|
+        io = Socket.for_fd(fd.to_i)
+        set_server_sockopt(io)
+        logger.info "inherited: #{io} fd=#{fd} addr=#{sock_name(io)}"
+        io
+      end
+
+      # avoid binding inherited sockets, probably not perfect for TCPSockets
+      # but it works for UNIXSockets
+      @listeners -= inherited.map { |io| sock_name(io) }
+
+      # try binding new listeners
+      @listeners.map! do |addr|
+        if sock = bind_listen(addr, 1024)
+          sock
+        elsif inherited.empty? || addr[0..0] == "/"
+          raise Errno::EADDRINUSE, "couldn't bind #{addr}"
+        else
+          logger.info "couldn't bind #{addr}, inherited?"
+          nil
+        end
+      end
+      @listeners += inherited
+      @listeners.compact!
+      @listeners.empty? and raise ArgumentError, 'No listener sockets'
+
+      # we start out with generic Socket objects that get cast to either
+      # TCPServer or UNIXServer objects; but since the Socket objects
+      # share the same OS-level file descriptor as the higher-level *Server
+      # objects; we need to prevent Socket objects from being garbage-collected
+      @purgatory += @listeners
+      @listeners.map! { |io| server_cast(io) }
+      @listeners.each do |io|
+        logger.info "#{io} listening on fd=#{io.fileno} addr=#{sock_name(io)}"
+      end
+      spawn_missing_workers
+      self
+    end
+
+    # monitors children and receives signals forever
+    # (or until a termination signal is sent)
+    def join
+      %w(QUIT INT TERM USR1 USR2 HUP).each { |sig| trap_deferred(sig) }
+      begin
+        loop do
+          reap_all_workers
+          case @mode
+          when :idle
+            kill_each_worker(0) # ensure they're running
+            spawn_missing_workers
+          when 'QUIT' # graceful shutdown
+            break
+          when 'TERM', 'INT' # immediate shutdown
+            stop(false)
+            break
+          when 'USR1' # user-defined (probably something like log reopening)
+            kill_each_worker('USR1')
+            @mode = :idle
+            trap_deferred('USR1')
+          when 'USR2' # exec binary, stay alive in case something went wrong
+            reexec
+            @mode = :idle
+            trap_deferred('USR2')
+          when 'HUP' # exec binary and exit
+            reexec
+            break
+          else
+            logger.error "master process in unknown mode: #{@mode}, resetting"
+            @mode = :idle
+          end
+          reap_all_workers
+          sleep 1
+        end
+      rescue Errno::EINTR
+        retry
+      rescue Object => e
+        logger.error "Unhandled master loop exception #{e.inspect}."
+        logger.error e.backtrace.join("\n")
+        sleep 1 rescue nil
+        retry
+      end
+      stop # gracefully shutdown all workers on our way out
+      logger.info "master pid=#{$$} exit"
+    end
+
+    # Terminates all workers, but does not exit master process
+    def stop(graceful = true)
+      kill_each_worker(graceful ? 'QUIT' : 'TERM')
+      timeleft = @timeout
+      step = 0.2
+      reap_all_workers
+      until @workers.empty?
+        sleep(step)
+        reap_all_workers
+        (timeleft -= step) > 0 and next
+        kill_each_worker('KILL')
+      end
+    ensure
+      @listeners.each { |sock| sock.close rescue nil }
+      @listeners.clear
+    end
+
+    private
+
+    # defer a signal for later processing
+    def trap_deferred(signal)
+      trap(signal) do |sig_nr|
+        trap(signal, 'IGNORE') # prevent double signalling
+        @mode = signal if Symbol === @mode
+      end
     end
 
-    def process_client(client)
+    # reaps all unreaped workers
+    def reap_all_workers
+      begin
+        loop do
+          pid = waitpid(-1, WNOHANG) or break
+          worker_nr = @workers.delete(pid)
+          logger.info "reaped pid=#{pid} worker=#{worker_nr || 'unknown'} " \
+                      "status=#{$?.exitstatus}"
+        end
+      rescue Errno::ECHILD
+      end
+    end
+
+    # Forks, sets current environment, sets the umask, chdirs to the desired
+    # start directory, and execs the command line originally passed to us to
+    # start Unicorn.
+    # Returns the pid of the forked process
+    def spawn_start_ctx(check = nil)
+      fork do
+        ENV.replace(@start_ctx[:environ])
+        ENV['UNICORN_FD'] = @listeners.map { |sock| sock.fileno }.join(',')
+        File.umask(@start_ctx[:umask])
+        Dir.chdir(@start_ctx[:cwd])
+        cmd = [ @start_ctx[:zero] ] + @start_ctx[:argv]
+        cmd << 'check' if check
+        logger.info "executing #{cmd.inspect}"
+        exec *cmd
+      end
+    end
+
+    # ensures @start_ctx is reusable for re-execution
+    def check_reexec
+      pid = waitpid(spawn_start_ctx(:check))
+      $?.success? and return true
+      logger.error "exec check failed with #{$?.exitstatus}"
+    end
+
+    # reexecutes the @start_ctx with a new binary
+    def reexec
+      check_reexec or return false
+      pid = spawn_start_ctx
+      if waitpid(pid, WNOHANG)
+        logger.error "rexec pid=#{pid} died with #{$?.exitstatus}"
+      end
+    end
+
+    def spawn_missing_workers
+      return if @workers.size == @nr_workers
+      (0...@nr_workers).each do |worker_nr|
+        @workers.values.include?(worker_nr) and next
+        @before_fork.call(self, worker_nr)
+        pid = fork { worker_loop(worker_nr) }
+        @workers[pid] = worker_nr
+      end
+    end
+
+    # once a client is accepted, it is processed in its entirety here
+    # in 3 easy steps: read request, call app, write app response
+    def process_client(client, client_nr)
       env = @request.read(client) or return
       app_response = @app.call(env)
       HttpResponse.write(client, app_response)
     rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
-      client.close rescue nil
+      client.closed? or client.close rescue nil
     rescue Object => e
       logger.error "Read error: #{e.inspect}"
       logger.error e.backtrace.join("\n")
     ensure
       begin
-        client.close
-      rescue IOError
-        # Already closed
+        client.closed? or client.close
       rescue Object => e
         logger.error "Client error: #{e.inspect}"
         logger.error e.backtrace.join("\n")
@@ -92,75 +258,73 @@ module Unicorn
       @request.reset
     end
 
-    # Runs the thing.  Returns a hash keyed by pid with worker number values
-    # for which to wait on.  Access the HttpServer.workers attribute
-    # to get this hash later.
-    def start
-      BasicSocket.do_not_reverse_lookup = true
-      @listeners.each do |sock|
-        sock.unicorn_server_init if sock.respond_to?(:unicorn_server_init)
-      end
-
-      (1..@nr_workers).each do |worker_nr|
-        pid = fork do
-          nr = 0
-          alive = true
-          listeners = @listeners
-          @request = HttpRequest.new(logger)
-          trap('TERM') { exit 0 }
-          trap('QUIT') do
-            alive = false
-            @listeners.each { |sock| sock.close rescue nil }
-          end
+    # runs inside each forked worker, this sits around and waits
+    # for connections and doesn't die until the parent dies
+    def worker_loop(worker_nr)
+      # allow @after_fork to override these signals:
+      %w(USR1 USR2 HUP).each { |sig| trap(sig, 'IGNORE') }
+      @after_fork.call(self, worker_nr) if @after_fork
 
-          while alive
-            begin
-              nr_before = nr
-              listeners.each do |sock|
-                begin
-                  client, addr = begin
-                    sock.accept_nonblock
-                  rescue Errno::EAGAIN
-                    next
-                  end
-                  nr += 1
-                  client.unicorn_client_init
-                  process_client(client)
-                rescue Errno::ECONNABORTED
-                  # client closed the socket even before accept
-                  client.close rescue nil
-                end
-                alive or exit(0)
-              end
+      if defined?(Fcntl::FD_CLOEXEC)
+        @listeners.each { |s| s.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
+      end
+      nr_before = nr = 0
+      client = nil
+      alive = true
+      ready = @listeners
+      %w(TERM INT).each { |sig| trap(sig) { exit(0) } } # instant shutdown
+      trap('QUIT') do
+        alive = false
+        @listeners.each { |sock| sock.close rescue nil } # break IO.select
+      end
 
-              # make the following bet: if we accepted clients this round,
-              # we're probably reasonably busy, so avoid calling select(2)
-              # and try to do a blind non-blocking accept(2) on everything
-              # before we sleep again in select
-              if nr > nr_before
-                listeners = @listeners
-              else
-                begin
-                  ret = IO.select(@listeners, nil, nil, nil) or next
-                  listeners = ret[0]
-                rescue Errno::EBADF
-                  exit(alive ? 1 : 0)
-                end
+      while alive && @master_pid == ppid
+        begin
+          nr_before = nr
+          ready.each do |sock|
+            begin
+              client = begin
+                sock.accept_nonblock
+              rescue Errno::EAGAIN
+                next
               end
-            rescue Object => e
-              if alive
-                logger.error "Unhandled listen loop exception #{e.inspect}."
-                logger.error e.backtrace.join("\n")
+              client.sync = true
+              client.nonblock = false
+              set_client_sockopt(client) if client.class == TCPSocket
+              nr += 1
+              process_client(client, nr)
+            rescue Errno::ECONNABORTED
+              # client closed the socket even before accept
+              if client && !client.closed?
+                client.close rescue nil
               end
             end
           end
-          exit 0
-        end # fork
 
-        @workers[pid] = worker_nr
+          # make the following bet: if we accepted clients this round,
+          # we're probably reasonably busy, so avoid calling select(2)
+          # and try to do a blind non-blocking accept(2) on everything
+          # before we sleep again in select
+          if nr != nr_before
+            ready = @listeners
+          else
+            begin
+              # timeout used so we can detect parent death:
+              ret = IO.select(@listeners, nil, nil, @timeout) or next
+              ready = ret[0]
+            rescue Errno::EBADF => e
+              exit(alive ? 1 : 0)
+            end
+          end
+        rescue SystemExit => e
+          exit(e.status)
+        rescue Object => e
+          if alive
+            logger.error "Unhandled listen loop exception #{e.inspect}."
+            logger.error e.backtrace.join("\n")
+          end
+        end
       end
-
-      @workers
     end
 
     # delivers a signal to each worker
@@ -174,25 +338,5 @@ module Unicorn
       end
     end
 
-    # Terminates all workers
-    def stop(graceful = true)
-      old_chld_handler = trap('CHLD') do
-        pid = Process.waitpid(-1, Process::WNOHANG) and @workers.delete(pid)
-      end
-
-      kill_each_worker(graceful ? 'QUIT' : 'TERM')
-
-      timeleft = @timeout
-      until @workers.empty?
-        sleep(1)
-        (timeleft -= 1) > 0 and next
-        kill_each_worker('KILL')
-      end
-
-    ensure
-      trap('CHLD', old_chld_handler)
-      @listeners.each { |sock| sock.close rescue nil }
-    end
-
   end
 end