author     Eric Wong <normalperson@yhbt.net>  2009-02-20 19:55:10 -0800
committer  Eric Wong <normalperson@yhbt.net>  2009-02-21 04:23:37 -0800
commit     38141ffdd3bda01dabfdd8ff8f065c783053c86a (patch)
tree       1d05740ef1520111a2fb5ee7852b7f6e091c2d60 /lib
parent     b8697b5fb102049f341e710204dfafeecfedc308 (diff)
download   unicorn-38141ffdd3bda01dabfdd8ff8f065c783053c86a.tar.gz
The Configurator validates settings as they are set and opens the way
for better reloading and error-checking abilities.

This also renames many of the config settings to nginx-like names to
minimize the learning/setup curve, since nginx is the only recommended
reverse proxy for this (a sample config using the new names follows the
list below).

s/pid_file/pid/
  => blech!, more confusing :<

s/listen_backlog/backlog/
  => maybe more confusing to some, or less...

s/nr_workers/worker_processes/
  => less confusing to non-AWKers for sure

s/hot_config_file/config_file/
  => the config file is now general purpose,
     not just hot reloads
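
A minimal sample config using the new names (a config file is plain Ruby
that the Configurator instance_evals; the paths and addresses below are
made up):

  worker_processes 4                 # was nr_workers
  listen "127.0.0.1:8080"            # adds to the listener set
  listen "/tmp/unicorn.sock"         # UNIX domain sockets work, too
  backlog 64                         # was listen_backlog; new sockets only
  timeout 30                         # stuck workers are killed after this
  pid "/tmp/unicorn.pid"             # was pid_file

  after_fork do |server, worker_nr|
    server.logger.info("worker=#{worker_nr} ready in pid=#{$$}")
  end
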
Diffstat (limited to 'lib')
-rw-r--r--  lib/unicorn.rb              | 286
-rw-r--r--  lib/unicorn/configurator.rb | 157
-rw-r--r--  lib/unicorn/socket.rb       |   7
3 files changed, 291 insertions, 159 deletions
diff --git a/lib/unicorn.rb b/lib/unicorn.rb
index bb2b170..838ab11 100644
--- a/lib/unicorn.rb
+++ b/lib/unicorn.rb
@@ -4,6 +4,7 @@ require 'unicorn/socket'
 require 'unicorn/const'
 require 'unicorn/http_request'
 require 'unicorn/http_response'
+require 'unicorn/configurator'
 
 # Unicorn module containing all of the classes (include C extensions) for running
 # a Unicorn web server.  It contains a minimalist HTTP server with just enough
@@ -32,27 +33,6 @@ module Unicorn
       :umask => File.umask,
     }.freeze
 
-    DEFAULT_LOGGER = Logger.new(STDERR)
-
-    DEFAULTS = {
-      :timeout => 60,
-      :listeners => [ Const::DEFAULT_LISTEN ],
-      :logger => DEFAULT_LOGGER,
-      :nr_workers => 1,
-      :hot_config_file => nil,
-      :after_fork => lambda { |server, worker_nr|
-          server.logger.info("worker=#{worker_nr} spawned pid=#{$$}")
-
-          # per-process listener ports for debugging/admin:
-          # server.add_listener("127.0.0.1:#{8081 + worker_nr}")
-        },
-      :before_fork => lambda { |server, worker_nr|
-          server.logger.info("worker=#{worker_nr} spawning...")
-        },
-      :pid_file => nil,
-      :listen_backlog => 1024,
-    }
-
     Worker = Struct.new(:nr, :tempfile) unless defined?(Worker)
     class Worker
       # worker objects may be compared to just plain numbers
@@ -66,20 +46,19 @@ module Unicorn
     # HttpServer.workers.join to join the thread that's processing
     # incoming requests on the socket.
     def initialize(app, options = {})
-      (DEFAULTS.to_a + options.to_a).each do |key, value|
-        instance_variable_set("@#{key.to_s.downcase}", value)
-      end
-
+      start_ctx = options.delete(:start_ctx)
+      @start_ctx = DEFAULT_START_CTX.dup
+      @start_ctx.merge!(start_ctx) if start_ctx
       @app = app
       @mode = :idle
       @master_pid = $$
       @workers = Hash.new
-      @request = HttpRequest.new(logger) # shared between all worker processes
-      @start_ctx = DEFAULT_START_CTX.dup
-      @start_ctx.merge!(options[:start_ctx]) if options[:start_ctx]
-      @purgatory = [] # prevents objects in here from being GC-ed
-      @rd_sig = @wr_sig = nil
-      load_hot_config! if @hot_config_file
+      @io_purgatory = [] # prevents IO objects in here from being GC-ed
+      @request = @rd_sig = @wr_sig = nil
+      @reexec_pid = 0
+      @config = Configurator.new(options.merge(:use_defaults => true))
+      @config.commit!(self, :skip => [:listeners, :pid])
+      @listeners = []
     end
 
     # Runs the thing.  Returns self so you can run join on it
@@ -91,58 +70,73 @@ module Unicorn
       inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd|
         io = Socket.for_fd(fd.to_i)
         set_server_sockopt(io)
+        @io_purgatory << io
         logger.info "inherited: #{io} fd=#{fd} addr=#{sock_name(io)}"
-        io
-      end
-
-      if @pid_file
-        if pid = pid_file_valid?(@pid_file)
-          raise ArgumentError, "Already running on pid=#{pid} ",
-                               "(or pid_file=#{@pid_file} is stale)"
-        end
-        File.open(@pid_file, 'wb') { |fp| fp.syswrite("#{$$}\n") }
-        at_exit { unlink_pid_file_safe(@pid_file) }
+        server_cast(io)
       end
 
-      # avoid binding inherited sockets, probably not perfect for TCPSockets
-      # but it works for UNIXSockets
-      @listeners -= inherited.map { |io| sock_name(io) }
-
-      # try binding new listeners
-      @listeners.map! do |addr|
-        if sock = bind_listen(addr, @listen_backlog)
-          sock
-        elsif inherited.empty? || addr[0..0] == "/"
-          raise Errno::EADDRINUSE, "couldn't bind #{addr}"
-        else
-          logger.info "couldn't bind #{addr}, inherited?"
-          nil
-        end
-      end
-      @listeners += inherited
-      @listeners.compact!
-      @listeners.empty? and raise ArgumentError, 'No listener sockets'
+      config_listeners = @config[:listeners].dup
+      @listeners.replace(inherited)
 
       # we start out with generic Socket objects that get cast to either
       # TCPServer or UNIXServer objects; but since the Socket objects
       # share the same OS-level file descriptor as the higher-level *Server
       # objects; we need to prevent Socket objects from being garbage-collected
-      @purgatory += @listeners
-      @listeners.map! { |io| server_cast(io) }
-      @listeners.each do |io|
-        logger.info "#{io} listening on fd=#{io.fileno} addr=#{sock_name(io)}"
-      end
+      config_listeners -= listener_names
+      config_listeners.each { |addr| listen(addr) }
+      listen(Const::DEFAULT_LISTENER) if @listeners.empty?
+      self.pid = @config[:pid]
       spawn_missing_workers
       self
     end
 
+    # replaces the current listener set with +listeners+.  This will
+    # close any socket that is not in the new listener set
+    def listeners=(listeners)
+      cur_names = listener_names
+      set_names = listener_names(listeners)
+      dead_names = cur_names - set_names
+
+      @listeners.delete_if do |io|
+        if dead_names.include?(sock_name(io))
+          @io_purgatory.delete_if { |pio| pio.fileno == io.fileno }
+          destroy_safely(io)
+          true
+        else
+          false
+        end
+      end
+
+      (set_names - cur_names).each { |addr| listen(addr) }
+    end
+
+    # sets the path for the PID file of the master process
+    def pid=(path)
+      if path
+        if x = valid_pid?(path)
+          return path if @pid && path == @pid && x == $$
+          raise ArgumentError, "Already running on PID:#{x} " \
+                               "(or pid=#{path} is stale)"
+        end
+        File.open(path, 'wb') { |fp| fp.syswrite("#{$$}\n") }
+        at_exit { unlink_pid_safe(path) }
+      end
+      unlink_pid_safe(@pid) if @pid && @pid != path
+      @pid = path
+    end
+
+    # add a given address to the +listeners+ set, idempotently
     # Allows workers to add a private, per-process listener via the
     # @after_fork hook.  Very useful for debugging and testing.
-    def add_listener(address)
-      if io = bind_listen(address, @listen_backlog)
-        @purgatory << io
-        io = server_cast(io)
-        logger.info "#{io} listening on pid=#{$$} " \
+    def listen(address)
+      return if String === address && listener_names.include?(address)
+
+      if io = bind_listen(address, @backlog)
+        if Socket == io.class
+          @io_purgatory << io
+          io = server_cast(io)
+        end
+        logger.info "#{io} listening on PID:#{$$} " \
                     "fd=#{io.fileno} addr=#{sock_name(io)}"
         @listeners << io
       else
@@ -163,6 +157,7 @@ module Unicorn
 
       %w(QUIT INT TERM USR1 USR2 HUP).each { |sig| trap_deferred(sig) }
       $0 = "unicorn master"
+      logger.info "master process ready" # test relies on this message
       begin
         loop do
           reap_all_workers
@@ -184,12 +179,13 @@ module Unicorn
             @mode = :idle
             trap_deferred('USR2')
           when 'HUP'
-            if @hot_config_file
-              load_hot_config!
+            if @config.config_file
+              load_config!
               @mode = :idle
               trap_deferred('HUP')
               redo # immediate reaping since we may have QUIT workers
-            else # exec binary and exit
+            else # exec binary and exit if there's no config file
+              logger.info "config_file not present, reexecuting binary"
               reexec
               break
             end
@@ -217,7 +213,7 @@ module Unicorn
         retry
       end
       stop # gracefully shutdown all workers on our way out
-      logger.info "master pid=#{$$} join complete"
+      logger.info "master PID:#{$$} join complete"
     end
 
     # Terminates all workers, but does not exit master process
@@ -233,8 +229,7 @@ module Unicorn
         kill_each_worker('KILL')
       end
     ensure
-      @listeners.each { |sock| sock.close rescue nil }
-      @listeners.clear
+      self.listeners = []
     end
 
     private
@@ -260,22 +255,47 @@ module Unicorn
       begin
         loop do
           pid = waitpid(-1, WNOHANG) or break
-          worker = @workers.delete(pid)
-          worker.tempfile.close rescue nil
-          logger.info "reaped pid=#{pid} " \
-                      "worker=#{worker && worker.nr || 'unknown'} " \
-                      "status=#{$?.exitstatus}"
+          if @reexec_pid == pid
+            logger.error "reaped exec()-ed PID:#{pid} status=#{$?.exitstatus}"
+            @reexec_pid = 0
+            self.pid = @pid.chomp('.oldbin') if @pid
+          else
+            worker = @workers.delete(pid)
+            worker.tempfile.close rescue nil
+            logger.info "reaped PID:#{pid} " \
+                        "worker=#{worker.nr rescue 'unknown'} " \
+                        "status=#{$?.exitstatus}"
+          end
         end
       rescue Errno::ECHILD
       end
     end
 
-    # Forks, sets current environment, sets the umask, chdirs to the desired
-    # start directory, and execs the command line originally passed to us to
-    # start Unicorn.
-    # Returns the pid of the forked process
-    def spawn_start_ctx(check = nil)
-      fork do
+    # reexecutes the @start_ctx with a new binary
+    def reexec
+      if @reexec_pid > 0
+        begin
+          Process.kill(0, @reexec_pid)
+          logger.error "reexec-ed child already running PID:#{@reexec_pid}"
+          return
+        rescue Errno::ESRCH
+          @reexec_pid = 0
+        end
+      end
+
+      if @pid
+        old_pid = "#{@pid}.oldbin"
+        prev_pid = @pid.dup
+        begin
+          self.pid = old_pid  # clear the path for a new pid file
+        rescue ArgumentError
+          logger.error "old PID:#{valid_pid?(old_pid)} running with " \
+                       "existing pid=#{old_pid}, refusing reexec"
+          return
+        end
+      end
+
+      @reexec_pid = fork do
         @rd_sig.close if @rd_sig
         @wr_sig.close if @wr_sig
         @workers.values.each { |other| other.tempfile.close rescue nil }
@@ -285,38 +305,8 @@ module Unicorn
         File.umask(@start_ctx[:umask])
         Dir.chdir(@start_ctx[:cwd])
         cmd = [ @start_ctx[:zero] ] + @start_ctx[:argv]
-        cmd << 'check' if check
-        logger.info "executing #{cmd.inspect}"
-        exec *cmd
-      end
-    end
-
-    # ensures @start_ctx is reusable for re-execution
-    def check_reexec
-      pid = waitpid(spawn_start_ctx(:check))
-      $?.success? and return true
-      logger.error "exec check failed with #{$?.exitstatus}"
-    end
-
-    # reexecutes the @start_ctx with a new binary
-    def reexec
-      check_reexec or return false
-
-      if @pid_file # clear the path for a new pid file
-        old_pid_file = "#{@pid_file}.oldbin"
-        if old_pid = pid_file_valid?(old_pid_file)
-          logger.error "old pid=#{old_pid} running with " \
-                       "existing pid_file=#{old_pid_file}, refusing rexec"
-          return
-        end
-        File.open(old_pid_file, 'wb') { |fp| fp.syswrite("#{$$}\n") }
-        at_exit { unlink_pid_file_safe(old_pid_file) }
-        File.unlink(@pid_file) if File.exist?(@pid_file)
-      end
-
-      pid = spawn_start_ctx
-      if waitpid(pid, WNOHANG)
-        logger.error "rexec pid=#{pid} died with #{$?.exitstatus}"
+        logger.info "executing #{cmd.inspect} (in #{Dir.pwd})"
+        exec(*cmd)
       end
     end
 
@@ -330,15 +320,15 @@ module Unicorn
       now = Time.now
       @workers.each_pair do |pid, worker|
         (now - worker.tempfile.ctime) <= @timeout and next
-        logger.error "worker=#{worker.nr} pid=#{pid} is too old, killing"
+        logger.error "worker=#{worker.nr} PID:#{pid} is too old, killing"
         kill_worker('KILL', pid) # take no prisoners for @timeout violations
         worker.tempfile.close rescue nil
       end
     end
 
     def spawn_missing_workers
-      return if @workers.size == @nr_workers
-      (0...@nr_workers).each do |worker_nr|
+      return if @workers.size == @worker_processes
+      (0...@worker_processes).each do |worker_nr|
         @workers.values.include?(worker_nr) and next
         tempfile = Tempfile.new('') # as short as possible to save dir space
         tempfile.unlink # don't allow other processes to find or see it
@@ -386,9 +376,9 @@ module Unicorn
       @start_ctx.clear
       @mode = @start_ctx = @workers = @rd_sig = @wr_sig = nil
       @listeners.each { |sock| set_cloexec(sock) }
-      ENV.delete('UNICORN_DAEMONIZE')
       ENV.delete('UNICORN_FD')
       @after_fork.call(self, worker.nr) if @after_fork
+      @request = HttpRequest.new(logger)
     end
 
     # runs inside each forked worker, this sits around and waits
@@ -484,13 +474,13 @@ module Unicorn
 
     # unlinks a PID file at given +path+ if it contains the current PID
     # useful as an at_exit handler.
-    def unlink_pid_file_safe(path)
+    def unlink_pid_safe(path)
       (File.read(path).to_i == $$ and File.unlink(path)) rescue nil
     end
 
     # returns a PID if a given path contains a non-stale PID file,
     # nil otherwise.
-    def pid_file_valid?(path)
+    def valid_pid?(path)
       if File.exist?(path) && (pid = File.read(path).to_i) > 1
         begin
           kill(0, pid)
@@ -501,45 +491,23 @@ module Unicorn
       nil
     end
 
-    # only do minimal validation, assume the user knows what they're doing
-    def load_hot_config!
-      log_pfx = "hot_config_file=#{@hot_config_file}"
+    def load_config!
       begin
-        unless File.readable?(@hot_config_file)
-          logger.error "#{log_pfx} not readable"
-          return
-        end
-        hot_config = File.read(@hot_config_file)
-        nr_workers, timeout = @nr_workers, @timeout
-        eval(hot_config)
-        if Numeric === @timeout
-          if timeout != @timeout
-            logger.info "#{log_pfx} set: timeout=#{@timeout}"
-            if timeout > @timeout # we don't want to have to KILL them later
-              logger.info "restarting all workers because timeout got lowered"
-              kill_each_worker('QUIT')
-            end
-          end
-        else
-          logger.info "#{log_pfx} invalid: timeout=#{@timeout.inspect}"
-          @timeout = timeout
-        end
-        if Integer === @nr_workers
-          to_kill = nr_workers - @nr_workers
-          if to_kill != 0
-            logger.info "#{log_pfx} set: nr_workers=#{@nr_workers}"
-            if to_kill > 0
-              @workers.keys[0...to_kill].each { |pid| kill_worker('QUIT', pid) }
-            end
-          end
-        else
-          logger.info "#{log_pfx} invalid: nr_workers=#{@nr_workers.inspect}"
-          @nr_workers = nr_workers
-        end
+        logger.info "reloading config_file=#{@config.config_file}"
+        @config.reload
+        @config.commit!(self)
+        kill_each_worker('QUIT')
+        logger.info "done reloading config_file=#{@config.config_file}"
       rescue Object => e
-        logger.error "#{log_pfx} error: #{e.message}"
+        logger.error "error reloading config_file=#{@config.config_file}: " \
+                     "#{e.class} #{e.message}"
       end
     end
 
+    # returns an array of string names for the given listener array
+    def listener_names(listeners = @listeners)
+      listeners.map { |io| sock_name(io) }
+    end
+
   end
 end
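
The options hash given to HttpServer.new now flows through the Configurator
(see the initialize hunk above), so embedding a server directly looks roughly
like the sketch below; the app, port and paths are hypothetical:

  require 'unicorn'

  app = lambda { |env|
    [ 200, { 'Content-Type' => 'text/plain' }, [ "hello\n" ] ]
  }

  server = Unicorn::HttpServer.new(app,
    :worker_processes => 2,                 # was :nr_workers
    :listeners => %w(0.0.0.0:8080),
    :pid => "/tmp/unicorn.pid",             # was :pid_file
    :config_file => "/tmp/unicorn.conf.rb") # was :hot_config_file, optional
  server.start.join
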
diff --git a/lib/unicorn/configurator.rb b/lib/unicorn/configurator.rb
new file mode 100644
index 0000000..9457480
--- /dev/null
+++ b/lib/unicorn/configurator.rb
@@ -0,0 +1,157 @@
+module Unicorn
+
+  # Implements a simple DSL for configuring a Unicorn server.
+  class Configurator
+    include ::Unicorn::SocketHelper
+
+    DEFAULT_LOGGER = Logger.new(STDERR) unless defined?(DEFAULT_LOGGER)
+
+    DEFAULTS = {
+      :timeout => 60,
+      :listeners => [ Const::DEFAULT_LISTEN ],
+      :logger => DEFAULT_LOGGER,
+      :worker_processes => 1,
+      :after_fork => lambda { |server, worker_nr|
+          server.logger.info("worker=#{worker_nr} spawned pid=#{$$}")
+
+          # per-process listener ports for debugging/admin:
+          # server.add_listener("127.0.0.1:#{8081 + worker_nr}")
+        },
+      :before_fork => lambda { |server, worker_nr|
+          server.logger.info("worker=#{worker_nr} spawning...")
+        },
+      :pid => nil,
+      :backlog => 1024,
+    }
+
+    attr_reader :config_file
+
+    def initialize(defaults = {})
+      @set = Hash.new(:unset)
+      use_defaults = defaults.delete(:use_defaults)
+      @config_file = defaults.delete(:config_file)
+      @config_file.freeze
+      @set.merge!(DEFAULTS) if use_defaults
+      defaults.each { |key, value| self.send(key, value) }
+      reload
+    end
+
+    def reload
+      instance_eval(File.read(@config_file)) if @config_file
+    end
+
+    def commit!(server, options = {})
+      skip = options[:skip] || []
+      @set.each do |key, value|
+        (Symbol === value && value == :unset) and next
+        skip.include?(key) and next
+        setter = "#{key}="
+        if server.respond_to?(setter)
+          server.send(setter, value)
+        else
+          server.instance_variable_set("@#{key}", value)
+        end
+      end
+    end
+
+    def [](key)
+      @set[key]
+    end
+
+    # Changes the listen() syscall backlog to +nr+ for yet-to-be-created
+    # sockets.  Due to limitations of the OS, this cannot affect
+    # existing listener sockets in any way, sockets must be completely
+    # closed and rebound (inherited sockets preserve their existing
+    # backlog setting).  Some operating systems allow negative values
+    # here to specify the maximum allowable value.
+    def backlog(nr)
+      Integer === nr or raise ArgumentError,
+         "not an integer: backlog=#{nr.inspect}"
+      @set[:backlog] = nr
+    end
+
+    # sets the logger to the +new+ Logger-like object.  The new logger-like
+    # object must respond to the following methods:
+    #  +debug+, +info+, +warn+, +error+, +fatal+, +close+
+    def logger(new)
+      %w(debug info warn error fatal close).each do |m|
+        new.respond_to?(m) and next
+        raise ArgumentError, "logger=#{new} does not respond to method=#{m}"
+      end
+
+      @set[:logger] = new
+    end
+
+    # sets after_fork hook to a given block.  This block
+    # will be called by the worker after forking
+    def after_fork(&block)
+      set_hook(:after_fork, block)
+    end
+
+    # sets before_fork to be a given Proc object.  This Proc
+    # object will be called by the master process before forking
+    # each worker.
+    def before_fork(&block)
+      set_hook(:before_fork, block)
+    end
+
+    # sets the timeout of worker processes to +seconds+
+    # This will gracefully restart all workers if the value is lowered
+    # to prevent them from being timed out according to new timeout rules
+    def timeout(seconds)
+      Numeric === seconds or raise ArgumentError,
+                                  "not numeric: timeout=#{seconds.inspect}"
+      seconds > 0 or raise ArgumentError,
+                                  "not positive: timeout=#{seconds.inspect}"
+      @set[:timeout] = seconds
+    end
+
+    # sets the current number of worker_processes to +nr+
+    def worker_processes(nr)
+      Integer === nr or raise ArgumentError,
+                             "not an integer: worker_processes=#{nr.inspect}"
+      nr >= 0 or raise ArgumentError,
+                             "not non-negative: worker_processes=#{nr.inspect}"
+      @set[:worker_processes] = nr
+    end
+
+    # sets listeners to the given +addresses+, replacing the current set
+    def listeners(addresses)
+      Array === addresses or addresses = Array(addresses)
+      @set[:listeners] = addresses
+    end
+
+    # adds an +address+ to the existing listener set
+    def listen(address)
+      @set[:listeners] = [] unless Array === @set[:listeners]
+      @set[:listeners] << address
+    end
+
+    # sets the +path+ for the PID file of the unicorn master process
+    def pid(path)
+      if path
+        path = File.expand_path(path)
+        File.writable?(File.dirname(path)) or raise ArgumentError,
+                                     "directory for pid=#{path} not writable"
+      end
+      @set[:pid] = path
+    end
+
+    private
+
+    def set_hook(var, my_proc) #:nodoc:
+      case my_proc
+      when Proc
+        arity = my_proc.arity
+        (arity == 2 || arity < 0) or raise ArgumentError,
+                        "#{var}=#{my_proc.inspect} has invalid arity: #{arity}"
+      when NilClass
+        my_proc = DEFAULTS[var]
+      else
+        raise ArgumentError, "invalid type: #{var}=#{my_proc.inspect}"
+      end
+      @set[var] = my_proc
+    end
+
+  end
+end
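
When a config_file is set, sending HUP to the master now reloads it via
Configurator#reload and gracefully restarts the workers (see load_config!
above); without one, HUP falls back to re-executing the binary.  A tiny
example of triggering a reload from another process (the pid path is
hypothetical):

  master_pid = File.read("/tmp/unicorn.pid").to_i
  Process.kill("HUP", master_pid)
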
diff --git a/lib/unicorn/socket.rb b/lib/unicorn/socket.rb
index 9519448..4913261 100644
--- a/lib/unicorn/socket.rb
+++ b/lib/unicorn/socket.rb
@@ -62,6 +62,13 @@ module Unicorn
       end
     end
 
+    def destroy_safely(io)
+      if io.respond_to?(:path) && File.stat(io.path).ino == io.stat.ino
+        File.unlink(io.path) rescue nil
+      end
+      io.close rescue nil
+    end
+
     # creates a new server, socket. address may be a HOST:PORT or
     # an absolute path to a UNIX socket.  address can even be a Socket
     # object in which case it is immediately returned