about summary refs log tree commit homepage
path: root/lib/unicorn.rb
diff options
context:
space:
mode:
author Eric Wong <normalperson@yhbt.net> 2009-02-09 14:26:34 -0800
committer Eric Wong <normalperson@yhbt.net> 2009-02-09 19:52:20 -0800
commit 101fb9ad1372e97ddf998c7fd677e352719c90e8 (patch)
tree 354b0cd16fa1ca81b2f24b843e7bedf2f1eeef97 /lib/unicorn.rb
parent 0b9dac5de7ecf8111dd3d9fa621edc759c9c47e3 (diff)
download unicorn-101fb9ad1372e97ddf998c7fd677e352719c90e8.tar.gz
Along with worker process management.  This is nginx-style
in-place upgrading (I don't know of another web server that does
this).  Basically we can preserve our open listen sockets
across entire executable upgrades.

Signals:

  USR2 - Sending USR2 to the master unicorn process will cause
  it to exec a new master and keep the original workers running.
  This is useful to validate that the new code changes took
  effect, are valid, and don't immediately die.  Once the changes
  are validated (manually), you may send QUIT to the original
  master process to have it gracefully exit.

  HUP - Sending this to the master will make it immediately exec
  a new binary and cause the old workers to gracefully exit.
  Use this if you're certain the latest changes to Unicorn (and
  your app) are ready and don't need validating.

Unlike nginx, re-execing a new binary will pick up any and all
configuration changes.  However, listener sockets cannot be
removed when exec-ing; they can only be added (for now).

I apologize for making such a big change in one commit, but once
I got the ability to replace the entire codebase while preserving
connections, it was too tempting to continue working.

So I wrote a large chunk of this while hitting
the unicorn-hello-world app with the following loop:

   while curl -vSsfN http://0:8080; do date +%N; done

_Zero_ requests lost across multiple restarts.
Diffstat (limited to 'lib/unicorn.rb')
-rw-r--r-- lib/unicorn.rb 394
1 files changed, 269 insertions, 125 deletions
diff --git a/lib/unicorn.rb b/lib/unicorn.rb
index dc0b339..9c6aab7 100644
--- a/lib/unicorn.rb
+++ b/lib/unicorn.rb
@@ -1,15 +1,4 @@
-# Standard libraries
-require 'socket'
-require 'tempfile'
-require 'time'
-require 'uri'
-require 'stringio'
-require 'fcntl'
 require 'logger'
-require 'io/nonblock'
-
-# Compiled extension
-require 'http11'
 
 require 'unicorn/socket'
 require 'unicorn/const'
@@ -21,70 +10,247 @@ require 'unicorn/http_response'
 # functionality to service web application requests fast as possible.
 module Unicorn
   class << self
-    # A logger instance that conforms to the API of stdlib's Logger.
-    attr_accessor :logger
-    
     def run(app, options = {})
       HttpServer.new(app, options).start.join
     end
   end
 
-  # We do this to be compatible with the existing API
-  class WorkerTable < Hash
-    def join
-      begin
-        pid = Process.wait
-        self.delete(pid)
-      rescue Errno::ECHLD
-        return
-      end
-    end
-  end
-
-  # This is the main driver of Unicorn, while the Unicorn::HttpParser
-  # and make up the majority of how the server functions.  It forks off
-  # :nr_workers and has the workers accepting connections on a shared
-  # socket and a simple HttpServer.process_client function to
-  # do the heavy lifting with the IO and Ruby.
+  # This is the process manager of Unicorn. This manages worker
+  # processes which in turn handle the I/O and application process.
+  # Listener sockets are started in the master process and shared with
+  # forked worker children.
   class HttpServer
-    attr_reader :workers, :logger, :listeners, :timeout, :nr_workers
-    
+    attr_reader :logger
+    include Process
+    include ::Unicorn::SocketHelper
+
+    DEFAULT_START_CTX = {
+      :argv => ARGV.map { |arg| arg.dup },
+      :cwd => (ENV['PWD'] || Dir.pwd),
+      :zero => $0.dup,
+      :environ => {}.merge!(ENV),
+      :umask => File.umask,
+    }.freeze
+
     DEFAULTS = {
       :timeout => 60,
       :listeners => %w(0.0.0.0:8080),
       :logger => Logger.new(STDERR),
-      :nr_workers => 1
+      :nr_workers => 1,
+      :after_fork => lambda { |server, worker_nr|
+          server.logger.info("worker=#{worker_nr} spawned pid=#{$$}")
+        },
+      :before_fork => lambda { |server, worker_nr|
+          server.logger.info("worker=#{worker_nr} spawning...")
+        },
     }
-
     # Creates a working server on host:port (strange things happen if
     # port isn't a Number).  Use HttpServer::run to start the server and
     # HttpServer.workers.join to join the thread that's processing
     # incoming requests on the socket.
     def initialize(app, options = {})
-      @app = app
-      @workers = WorkerTable.new
-
       (DEFAULTS.to_a + options.to_a).each do |key, value|
         instance_variable_set("@#{key.to_s.downcase}", value)
       end
 
-      @listeners.map! { |address| Socket.unicorn_server_new(address, 1024) }
+      @app = app
+      @mode = :idle
+      @master_pid = $$
+      @workers = Hash.new
+      @request = HttpRequest.new(logger) # shared between all worker processes
+      @start_ctx = DEFAULT_START_CTX.dup
+      @start_ctx.merge!(options[:start_ctx]) if options[:start_ctx]
+      @purgatory = [] # prevents objects in here from being GC-ed
+    end
+
+    # Runs the thing.  Returns self so you can run join on it
+    def start
+      BasicSocket.do_not_reverse_lookup = true
+
+      # inherit sockets from parents, they need to be plain Socket objects
+      # before they become UNIXServer or TCPServer
+      inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd|
+        io = Socket.for_fd(fd.to_i)
+        set_server_sockopt(io)
+        logger.info "inherited: #{io} fd=#{fd} addr=#{sock_name(io)}"
+        io
+      end
+
+      # avoid binding inherited sockets, probably not perfect for TCPSockets
+      # but it works for UNIXSockets
+      @listeners -= inherited.map { |io| sock_name(io) }
+
+      # try binding new listeners
+      @listeners.map! do |addr|
+        if sock = bind_listen(addr, 1024)
+          sock
+        elsif inherited.empty? || addr[0..0] == "/"
+          raise Errno::EADDRINUSE, "couldn't bind #{addr}"
+        else
+          logger.info "couldn't bind #{addr}, inherited?"
+          nil
+        end
+      end
+      @listeners += inherited
+      @listeners.compact!
+      @listeners.empty? and raise ArgumentError, 'No listener sockets'
+
+      # we start out with generic Socket objects that get cast to either
+      # TCPServer or UNIXServer objects; but since the Socket objects
+      # share the same OS-level file descriptor as the higher-level *Server
+      # objects; we need to prevent Socket objects from being garbage-collected
+      @purgatory += @listeners
+      @listeners.map! { |io| server_cast(io) }
+      @listeners.each do |io|
+        logger.info "#{io} listening on fd=#{io.fileno} addr=#{sock_name(io)}"
+      end
+      spawn_missing_workers
+      self
+    end
+
+    # monitors children and receives signals forever
+    # (or until a termination signal is sent)
+    def join
+      %w(QUIT INT TERM USR1 USR2 HUP).each { |sig| trap_deferred(sig) }
+      begin
+        loop do
+          reap_all_workers
+          case @mode
+          when :idle
+            kill_each_worker(0) # ensure they're running
+            spawn_missing_workers
+          when 'QUIT' # graceful shutdown
+            break
+          when 'TERM', 'INT' # immediate shutdown
+            stop(false)
+            break
+          when 'USR1' # user-defined (probably something like log reopening)
+            kill_each_worker('USR1')
+            @mode = :idle
+            trap_deferred('USR1')
+          when 'USR2' # exec binary, stay alive in case something went wrong
+            reexec
+            @mode = :idle
+            trap_deferred('USR2')
+          when 'HUP' # exec binary and exit
+            reexec
+            break
+          else
+            logger.error "master process in unknown mode: #{@mode}, resetting"
+            @mode = :idle
+          end
+          reap_all_workers
+          sleep 1
+        end
+      rescue Errno::EINTR
+        retry
+      rescue Object => e
+        logger.error "Unhandled master loop exception #{e.inspect}."
+        logger.error e.backtrace.join("\n")
+        sleep 1 rescue nil
+        retry
+      end
+      stop # gracefully shutdown all workers on our way out
+      logger.info "master pid=#{$$} exit"
+    end
+
+    # Terminates all workers, but does not exit master process
+    def stop(graceful = true)
+      kill_each_worker(graceful ? 'QUIT' : 'TERM')
+      timeleft = @timeout
+      step = 0.2
+      reap_all_workers
+      until @workers.empty?
+        sleep(step)
+        reap_all_workers
+        (timeleft -= step) > 0 and next
+        kill_each_worker('KILL')
+      end
+    ensure
+      @listeners.each { |sock| sock.close rescue nil }
+      @listeners.clear
+    end
+
+    private
+
+    # defer a signal for later processing
+    def trap_deferred(signal)
+      trap(signal) do |sig_nr|
+        trap(signal, 'IGNORE') # prevent double signalling
+        @mode = signal if Symbol === @mode
+      end
     end
 
-    def process_client(client)
+    # reaps all unreaped workers
+    def reap_all_workers
+      begin
+        loop do
+          pid = waitpid(-1, WNOHANG) or break
+          worker_nr = @workers.delete(pid)
+          logger.info "reaped pid=#{pid} worker=#{worker_nr || 'unknown'} " \
+                      "status=#{$?.exitstatus}"
+        end
+      rescue Errno::ECHILD
+      end
+    end
+
+    # Forks, sets current environment, sets the umask, chdirs to the desired
+    # start directory, and execs the command line originally passed to us to
+    # start Unicorn.
+    # Returns the pid of the forked process
+    def spawn_start_ctx(check = nil)
+      fork do
+        ENV.replace(@start_ctx[:environ])
+        ENV['UNICORN_FD'] = @listeners.map { |sock| sock.fileno }.join(',')
+        File.umask(@start_ctx[:umask])
+        Dir.chdir(@start_ctx[:cwd])
+        cmd = [ @start_ctx[:zero] ] + @start_ctx[:argv]
+        cmd << 'check' if check
+        logger.info "executing #{cmd.inspect}"
+        exec *cmd
+      end
+    end
+
+    # ensures @start_ctx is reusable for re-execution
+    def check_reexec
+      pid = waitpid(spawn_start_ctx(:check))
+      $?.success? and return true
+      logger.error "exec check failed with #{$?.exitstatus}"
+    end
+
+    # reexecutes the @start_ctx with a new binary
+    def reexec
+      check_reexec or return false
+      pid = spawn_start_ctx
+      if waitpid(pid, WNOHANG)
+        logger.error "rexec pid=#{pid} died with #{$?.exitstatus}"
+      end
+    end
+
+    def spawn_missing_workers
+      return if @workers.size == @nr_workers
+      (0...@nr_workers).each do |worker_nr|
+        @workers.values.include?(worker_nr) and next
+        @before_fork.call(self, worker_nr)
+        pid = fork { worker_loop(worker_nr) }
+        @workers[pid] = worker_nr
+      end
+    end
+
+    # once a client is accepted, it is processed in its entirety here
+    # in 3 easy steps: read request, call app, write app response
+    def process_client(client, client_nr)
       env = @request.read(client) or return
       app_response = @app.call(env)
       HttpResponse.write(client, app_response)
     rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
-      client.close rescue nil
+      client.closed? or client.close rescue nil
     rescue Object => e
       logger.error "Read error: #{e.inspect}"
       logger.error e.backtrace.join("\n")
     ensure
       begin
-        client.close
-      rescue IOError
-        # Already closed
+        client.closed? or client.close
       rescue Object => e
         logger.error "Client error: #{e.inspect}"
         logger.error e.backtrace.join("\n")
@@ -92,75 +258,73 @@ module Unicorn
       @request.reset
     end
 
-    # Runs the thing.  Returns a hash keyed by pid with worker number values
-    # for which to wait on.  Access the HttpServer.workers attribute
-    # to get this hash later.
-    def start
-      BasicSocket.do_not_reverse_lookup = true
-      @listeners.each do |sock|
-        sock.unicorn_server_init if sock.respond_to?(:unicorn_server_init)
-      end
-
-      (1..@nr_workers).each do |worker_nr|
-        pid = fork do
-          nr = 0
-          alive = true
-          listeners = @listeners
-          @request = HttpRequest.new(logger)
-          trap('TERM') { exit 0 }
-          trap('QUIT') do
-            alive = false
-            @listeners.each { |sock| sock.close rescue nil }
-          end
+    # runs inside each forked worker, this sits around and waits
+    # for connections and doesn't die until the parent dies
+    def worker_loop(worker_nr)
+      # allow @after_fork to override these signals:
+      %w(USR1 USR2 HUP).each { |sig| trap(sig, 'IGNORE') }
+      @after_fork.call(self, worker_nr) if @after_fork
 
-          while alive
-            begin
-              nr_before = nr
-              listeners.each do |sock|
-                begin
-                  client, addr = begin
-                    sock.accept_nonblock
-                  rescue Errno::EAGAIN
-                    next
-                  end
-                  nr += 1
-                  client.unicorn_client_init
-                  process_client(client)
-                rescue Errno::ECONNABORTED
-                  # client closed the socket even before accept
-                  client.close rescue nil
-                end
-                alive or exit(0)
-              end
+      if defined?(Fcntl::FD_CLOEXEC)
+        @listeners.each { |s| s.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) }
+      end
+      nr_before = nr = 0
+      client = nil
+      alive = true
+      ready = @listeners
+      %w(TERM INT).each { |sig| trap(sig) { exit(0) } } # instant shutdown
+      trap('QUIT') do
+        alive = false
+        @listeners.each { |sock| sock.close rescue nil } # break IO.select
+      end
 
-              # make the following bet: if we accepted clients this round,
-              # we're probably reasonably busy, so avoid calling select(2)
-              # and try to do a blind non-blocking accept(2) on everything
-              # before we sleep again in select
-              if nr > nr_before
-                listeners = @listeners
-              else
-                begin
-                  ret = IO.select(@listeners, nil, nil, nil) or next
-                  listeners = ret[0]
-                rescue Errno::EBADF
-                  exit(alive ? 1 : 0)
-                end
+      while alive && @master_pid == ppid
+        begin
+          nr_before = nr
+          ready.each do |sock|
+            begin
+              client = begin
+                sock.accept_nonblock
+              rescue Errno::EAGAIN
+                next
               end
-            rescue Object => e
-              if alive
-                logger.error "Unhandled listen loop exception #{e.inspect}."
-                logger.error e.backtrace.join("\n")
+              client.sync = true
+              client.nonblock = false
+              set_client_sockopt(client) if client.class == TCPSocket
+              nr += 1
+              process_client(client, nr)
+            rescue Errno::ECONNABORTED
+              # client closed the socket even before accept
+              if client && !client.closed?
+                client.close rescue nil
               end
             end
           end
-          exit 0
-        end # fork
 
-        @workers[pid] = worker_nr
+          # make the following bet: if we accepted clients this round,
+          # we're probably reasonably busy, so avoid calling select(2)
+          # and try to do a blind non-blocking accept(2) on everything
+          # before we sleep again in select
+          if nr != nr_before
+            ready = @listeners
+          else
+            begin
+              # timeout used so we can detect parent death:
+              ret = IO.select(@listeners, nil, nil, @timeout) or next
+              ready = ret[0]
+            rescue Errno::EBADF => e
+              exit(alive ? 1 : 0)
+            end
+          end
+        rescue SystemExit => e
+          exit(e.status)
+        rescue Object => e
+          if alive
+            logger.error "Unhandled listen loop exception #{e.inspect}."
+            logger.error e.backtrace.join("\n")
+          end
+        end
       end
-
-      @workers
     end
 
     # delivers a signal to each worker
@@ -174,25 +338,5 @@ module Unicorn
       end
     end
 
-    # Terminates all workers
-    def stop(graceful = true)
-      old_chld_handler = trap('CHLD') do
-        pid = Process.waitpid(-1, Process::WNOHANG) and @workers.delete(pid)
-      end
-
-      kill_each_worker(graceful ? 'QUIT' : 'TERM')
-
-      timeleft = @timeout
-      until @workers.empty?
-        sleep(1)
-        (timeleft -= 1) > 0 and next
-        kill_each_worker('KILL')
-      end
-
-    ensure
-      trap('CHLD', old_chld_handler)
-      @listeners.each { |sock| sock.close rescue nil }
-    end
-
   end
 end