From 38141ffdd3bda01dabfdd8ff8f065c783053c86a Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 20 Feb 2009 19:55:10 -0800 Subject: revamp configuration with Configurator DSL The Configurator includes error checking and opens the way for better reloading/error-checking abilities. This also renames many of the config settings with something nginx-like to minimize the learning/setup curve since nginx is the only recommended reverse-proxy for this. s/pid_file/pid/ => blech!, more confusing :< s/listen_backlog/backlog/ => maybe more confusing to some, or less... s/nr_workers/worker_processes/ => less confusing to non-AWKers for sure s/hot_config_file/config_file/ => the config file is now general purpose, not just hot reloads --- lib/unicorn.rb | 286 ++++++++++++++++++++------------------------ lib/unicorn/configurator.rb | 157 ++++++++++++++++++++++++ lib/unicorn/socket.rb | 7 ++ 3 files changed, 291 insertions(+), 159 deletions(-) create mode 100644 lib/unicorn/configurator.rb (limited to 'lib') diff --git a/lib/unicorn.rb b/lib/unicorn.rb index bb2b170..838ab11 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -4,6 +4,7 @@ require 'unicorn/socket' require 'unicorn/const' require 'unicorn/http_request' require 'unicorn/http_response' +require 'unicorn/configurator' # Unicorn module containing all of the classes (include C extensions) for running # a Unicorn web server. 
It contains a minimalist HTTP server with just enough @@ -32,27 +33,6 @@ module Unicorn :umask => File.umask, }.freeze - DEFAULT_LOGGER = Logger.new(STDERR) - - DEFAULTS = { - :timeout => 60, - :listeners => [ Const::DEFAULT_LISTEN ], - :logger => DEFAULT_LOGGER, - :nr_workers => 1, - :hot_config_file => nil, - :after_fork => lambda { |server, worker_nr| - server.logger.info("worker=#{worker_nr} spawned pid=#{$$}") - - # per-process listener ports for debugging/admin: - # server.add_listener("127.0.0.1:#{8081 + worker_nr}") - }, - :before_fork => lambda { |server, worker_nr| - server.logger.info("worker=#{worker_nr} spawning...") - }, - :pid_file => nil, - :listen_backlog => 1024, - } - Worker = Struct.new(:nr, :tempfile) unless defined?(Worker) class Worker # worker objects may be compared to just plain numbers @@ -66,20 +46,19 @@ module Unicorn # HttpServer.workers.join to join the thread that's processing # incoming requests on the socket. def initialize(app, options = {}) - (DEFAULTS.to_a + options.to_a).each do |key, value| - instance_variable_set("@#{key.to_s.downcase}", value) - end - + start_ctx = options.delete(:start_ctx) + @start_ctx = DEFAULT_START_CTX.dup + @start_ctx.merge!(start_ctx) if start_ctx @app = app @mode = :idle @master_pid = $$ @workers = Hash.new - @request = HttpRequest.new(logger) # shared between all worker processes - @start_ctx = DEFAULT_START_CTX.dup - @start_ctx.merge!(options[:start_ctx]) if options[:start_ctx] - @purgatory = [] # prevents objects in here from being GC-ed - @rd_sig = @wr_sig = nil - load_hot_config! if @hot_config_file + @io_purgatory = [] # prevents IO objects in here from being GC-ed + @request = @rd_sig = @wr_sig = nil + @reexec_pid = 0 + @config = Configurator.new(options.merge(:use_defaults => true)) + @config.commit!(self, :skip => [:listeners, :pid]) + @listeners = [] end # Runs the thing. 
Returns self so you can run join on it @@ -91,58 +70,73 @@ module Unicorn inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd| io = Socket.for_fd(fd.to_i) set_server_sockopt(io) + @io_purgatory << io logger.info "inherited: #{io} fd=#{fd} addr=#{sock_name(io)}" - io - end - - if @pid_file - if pid = pid_file_valid?(@pid_file) - raise ArgumentError, "Already running on pid=#{pid} ", - "(or pid_file=#{@pid_file} is stale)" - end - File.open(@pid_file, 'wb') { |fp| fp.syswrite("#{$$}\n") } - at_exit { unlink_pid_file_safe(@pid_file) } + server_cast(io) end - # avoid binding inherited sockets, probably not perfect for TCPSockets - # but it works for UNIXSockets - @listeners -= inherited.map { |io| sock_name(io) } - - # try binding new listeners - @listeners.map! do |addr| - if sock = bind_listen(addr, @listen_backlog) - sock - elsif inherited.empty? || addr[0..0] == "/" - raise Errno::EADDRINUSE, "couldn't bind #{addr}" - else - logger.info "couldn't bind #{addr}, inherited?" - nil - end - end - @listeners += inherited - @listeners.compact! - @listeners.empty? and raise ArgumentError, 'No listener sockets' + config_listeners = @config[:listeners].dup + @listeners.replace(inherited) # we start out with generic Socket objects that get cast to either # TCPServer or UNIXServer objects; but since the Socket objects # share the same OS-level file descriptor as the higher-level *Server # objects; we need to prevent Socket objects from being garbage-collected - @purgatory += @listeners - @listeners.map! { |io| server_cast(io) } - @listeners.each do |io| - logger.info "#{io} listening on fd=#{io.fileno} addr=#{sock_name(io)}" - end + config_listeners -= listener_names + config_listeners.each { |addr| listen(addr) } + listen(Const::DEFAULT_LISTENER) if @listeners.empty? + self.pid = @config[:pid] spawn_missing_workers self end + # replaces current listener set with +listeners+. 
This will + # close the socket if it will not exist in the new listener set + def listeners=(listeners) + cur_names = listener_names + set_names = listener_names(listeners) + dead_names = cur_names - set_names + + @listeners.delete_if do |io| + if dead_names.include?(sock_name(io)) + @io_purgatory.delete_if { |pio| pio.fileno == io.fileno } + destroy_safely(io) + true + else + false + end + end + + (set_names - cur_names).each { |addr| listen(addr) } + end + + # sets the path for the PID file of the master process + def pid=(path) + if path + if x = valid_pid?(path) + return path if @pid && path == @pid && x == $$ + raise ArgumentError, "Already running on PID:#{x} " \ + "(or pid=#{path} is stale)" + end + File.open(path, 'wb') { |fp| fp.syswrite("#{$$}\n") } + at_exit { unlink_pid_safe(path) } + end + unlink_pid_safe(@pid) if @pid && @pid != path + @pid = path + end + + # add a given address to the +listeners+ set, idempotently # Allows workers to add a private, per-process listener via the # @after_fork hook. Very useful for debugging and testing. - def add_listener(address) - if io = bind_listen(address, @listen_backlog) - @purgatory << io - io = server_cast(io) - logger.info "#{io} listening on pid=#{$$} " \ + def listen(address) + return if String === address && listener_names.include?(address) + + if io = bind_listen(address, @backlog) + if Socket == io.class + @io_purgatory << io + io = server_cast(io) + end + logger.info "#{io} listening on PID:#{$$} " \ "fd=#{io.fileno} addr=#{sock_name(io)}" @listeners << io else @@ -163,6 +157,7 @@ module Unicorn %w(QUIT INT TERM USR1 USR2 HUP).each { |sig| trap_deferred(sig) } $0 = "unicorn master" + logger.info "master process ready" # test relies on this message begin loop do reap_all_workers @@ -184,12 +179,13 @@ module Unicorn @mode = :idle trap_deferred('USR2') when 'HUP' - if @hot_config_file - load_hot_config! + if @config.config_file + load_config! 
@mode = :idle trap_deferred('HUP') redo # immediate reaping since we may have QUIT workers - else # exec binary and exit + else # exec binary and exit if there's no config file + logger.info "config_file not present, reexecutingn binary" reexec break end @@ -217,7 +213,7 @@ module Unicorn retry end stop # gracefully shutdown all workers on our way out - logger.info "master pid=#{$$} join complete" + logger.info "master PID:#{$$} join complete" end # Terminates all workers, but does not exit master process @@ -233,8 +229,7 @@ module Unicorn kill_each_worker('KILL') end ensure - @listeners.each { |sock| sock.close rescue nil } - @listeners.clear + self.listeners = [] end private @@ -260,22 +255,47 @@ module Unicorn begin loop do pid = waitpid(-1, WNOHANG) or break - worker = @workers.delete(pid) - worker.tempfile.close rescue nil - logger.info "reaped pid=#{pid} " \ - "worker=#{worker && worker.nr || 'unknown'} " \ - "status=#{$?.exitstatus}" + if @reexec_pid == pid + logger.error "reaped exec()-ed PID:#{pid} status=#{$?.exitstatus}" + @reexec_pid = 0 + self.pid = @pid.chomp('.oldbin') if @pid + else + worker = @workers.delete(pid) + worker.tempfile.close rescue nil + logger.info "reaped PID:#{pid} " \ + "worker=#{worker.nr rescue 'unknown'} " \ + "status=#{$?.exitstatus}" + end end rescue Errno::ECHILD end end - # Forks, sets current environment, sets the umask, chdirs to the desired - # start directory, and execs the command line originally passed to us to - # start Unicorn. 
- # Returns the pid of the forked process - def spawn_start_ctx(check = nil) - fork do + # reexecutes the @start_ctx with a new binary + def reexec + if @reexec_pid > 0 + begin + Process.kill(0, @reexec_pid) + logger.error "reexec-ed child already running PID:#{@reexec_pid}" + return + rescue Errno::ESRCH + @reexec_pid = 0 + end + end + + if @pid + old_pid = "#{@pid}.oldbin" + prev_pid = @pid.dup + begin + self.pid = old_pid # clear the path for a new pid file + rescue ArgumentError + logger.error "old PID:#{valid_pid?(old_pid)} running with " \ + "existing pid=#{old_pid}, refusing rexec" + return + end + end + + @reexec_pid = fork do @rd_sig.close if @rd_sig @wr_sig.close if @wr_sig @workers.values.each { |other| other.tempfile.close rescue nil } @@ -285,38 +305,8 @@ module Unicorn File.umask(@start_ctx[:umask]) Dir.chdir(@start_ctx[:cwd]) cmd = [ @start_ctx[:zero] ] + @start_ctx[:argv] - cmd << 'check' if check - logger.info "executing #{cmd.inspect}" - exec *cmd - end - end - - # ensures @start_ctx is reusable for re-execution - def check_reexec - pid = waitpid(spawn_start_ctx(:check)) - $?.success? 
and return true - logger.error "exec check failed with #{$?.exitstatus}" - end - - # reexecutes the @start_ctx with a new binary - def reexec - check_reexec or return false - - if @pid_file # clear the path for a new pid file - old_pid_file = "#{@pid_file}.oldbin" - if old_pid = pid_file_valid?(old_pid_file) - logger.error "old pid=#{old_pid} running with " \ - "existing pid_file=#{old_pid_file}, refusing rexec" - return - end - File.open(old_pid_file, 'wb') { |fp| fp.syswrite("#{$$}\n") } - at_exit { unlink_pid_file_safe(old_pid_file) } - File.unlink(@pid_file) if File.exist?(@pid_file) - end - - pid = spawn_start_ctx - if waitpid(pid, WNOHANG) - logger.error "rexec pid=#{pid} died with #{$?.exitstatus}" + logger.info "executing #{cmd.inspect} (in #{Dir.pwd})" + exec(*cmd) end end @@ -330,15 +320,15 @@ module Unicorn now = Time.now @workers.each_pair do |pid, worker| (now - worker.tempfile.ctime) <= @timeout and next - logger.error "worker=#{worker.nr} pid=#{pid} is too old, killing" + logger.error "worker=#{worker.nr} PID:#{pid} is too old, killing" kill_worker('KILL', pid) # take no prisoners for @timeout violations worker.tempfile.close rescue nil end end def spawn_missing_workers - return if @workers.size == @nr_workers - (0...@nr_workers).each do |worker_nr| + return if @workers.size == @worker_processes + (0...@worker_processes).each do |worker_nr| @workers.values.include?(worker_nr) and next tempfile = Tempfile.new('') # as short as possible to save dir space tempfile.unlink # don't allow other processes to find or see it @@ -386,9 +376,9 @@ module Unicorn @start_ctx.clear @mode = @start_ctx = @workers = @rd_sig = @wr_sig = nil @listeners.each { |sock| set_cloexec(sock) } - ENV.delete('UNICORN_DAEMONIZE') ENV.delete('UNICORN_FD') @after_fork.call(self, worker.nr) if @after_fork + @request = HttpRequest.new(logger) end # runs inside each forked worker, this sits around and waits @@ -484,13 +474,13 @@ module Unicorn # unlinks a PID file at given +path+ if it 
contains the current PID # useful as an at_exit handler. - def unlink_pid_file_safe(path) + def unlink_pid_safe(path) (File.read(path).to_i == $$ and File.unlink(path)) rescue nil end # returns a PID if a given path contains a non-stale PID file, # nil otherwise. - def pid_file_valid?(path) + def valid_pid?(path) if File.exist?(path) && (pid = File.read(path).to_i) > 1 begin kill(0, pid) @@ -501,45 +491,23 @@ module Unicorn nil end - # only do minimal validation, assume the user knows what they're doing - def load_hot_config! - log_pfx = "hot_config_file=#{@hot_config_file}" + def load_config! begin - unless File.readable?(@hot_config_file) - logger.error "#{log_pfx} not readable" - return - end - hot_config = File.read(@hot_config_file) - nr_workers, timeout = @nr_workers, @timeout - eval(hot_config) - if Numeric === @timeout - if timeout != @timeout - logger.info "#{log_pfx} set: timeout=#{@timeout}" - if timeout > @timeout # we don't want to have to KILL them later - logger.info "restarting all workers because timeout got lowered" - kill_each_worker('QUIT') - end - end - else - logger.info "#{log_pfx} invalid: timeout=#{@timeout.inspect}" - @timeout = timeout - end - if Integer === @nr_workers - to_kill = nr_workers - @nr_workers - if to_kill != 0 - logger.info "#{log_pfx} set: nr_workers=#{@nr_workers}" - if to_kill > 0 - @workers.keys[0...to_kill].each { |pid| kill_worker('QUIT', pid) } - end - end - else - logger.info "#{log_pfx} invalid: nr_workers=#{@nr_workers.inspect}" - @nr_workers = nr_workers - end + logger.info "reloading config_file=#{@config.config_file}" + @config.reload + @config.commit!(self) + kill_each_worker('QUIT') + logger.info "done reloading config_file=#{@config.config_file}" rescue Object => e - logger.error "#{log_pfx} error: #{e.message}" + logger.error "error reloading config_file=#{@config.config_file}: " \ + "#{e.class} #{e.message}" end end + # returns an array of string names for the given listener array + def 
listener_names(listeners = @listeners) + listeners.map { |io| sock_name(io) } + end + end end diff --git a/lib/unicorn/configurator.rb b/lib/unicorn/configurator.rb new file mode 100644 index 0000000..9457480 --- /dev/null +++ b/lib/unicorn/configurator.rb @@ -0,0 +1,157 @@ +module Unicorn + + # Implements a simple DSL for configuring a Unicorn server. + class Configurator + include ::Unicorn::SocketHelper + + DEFAULT_LOGGER = Logger.new(STDERR) unless defined?(DEFAULT_LOGGER) + + DEFAULTS = { + :timeout => 60, + :listeners => [ Const::DEFAULT_LISTEN ], + :logger => DEFAULT_LOGGER, + :worker_processes => 1, + :after_fork => lambda { |server, worker_nr| + server.logger.info("worker=#{worker_nr} spawned pid=#{$$}") + + # per-process listener ports for debugging/admin: + # server.add_listener("127.0.0.1:#{8081 + worker_nr}") + }, + :before_fork => lambda { |server, worker_nr| + server.logger.info("worker=#{worker_nr} spawning...") + }, + :pid => nil, + :backlog => 1024, + } + + attr_reader :config_file + + def initialize(defaults = {}) + @set = Hash.new(:unset) + use_defaults = defaults.delete(:use_defaults) + @config_file = defaults.delete(:config_file) + @config_file.freeze + @set.merge!(DEFAULTS) if use_defaults + defaults.each { |key, value| self.send(key, value) } + reload + end + + def reload + instance_eval(File.read(@config_file)) if @config_file + end + + def commit!(server, options = {}) + skip = options[:skip] || [] + @set.each do |key, value| + (Symbol === value && value == :unset) and next + skip.include?(key) and next + setter = "#{key}=" + if server.respond_to?(setter) + server.send(setter, value) + else + server.instance_variable_set("@#{key}", value) + end + end + end + + def [](key) + @set[key] + end + + # Changes the listen() syscall backlog to +nr+ for yet-to-be-created + # sockets. 
Due to limitations of the OS, this cannot affect + # existing listener sockets in any way, sockets must be completely + # closed and rebound (inherited sockets preserve their existing + # backlog setting). Some operating systems allow negative values + # here to specify the maximum allowable value. + def backlog(nr) + Integer === nr or raise ArgumentError, + "not an integer: backlog=#{nr.inspect}" + @set[:backlog] = nr + end + + # sets object to the +new+ Logger-like object. The new logger-like + # object must respond to the following methods: + # +debug+, +info+, +warn+, +error+, +fatal+, +close+ + def logger(new) + %w(debug info warn error fatal close).each do |m| + new.respond_to?(m) and next + raise ArgumentError, "logger=#{new} does not respond to method=#{m}" + end + + @set[:logger] = new + end + + # sets after_fork hook to a given block. This block + # will be called by the worker after forking + def after_fork(&block) + set_hook(:after_fork, block) + end + + # sets before_fork to be a given Proc object. This Proc + # object will be called by the master process before forking + # each worker.
+ def before_fork(&block) + set_hook(:before_fork, block) + end + + # sets the timeout of worker processes to +seconds+ + # This will gracefully restart all workers if the value is lowered + # to prevent them from being timed out according to new timeout rules + def timeout(seconds) + Numeric === seconds or raise ArgumentError, + "not numeric: timeout=#{seconds.inspect}" + seconds > 0 or raise ArgumentError, + "not positive: timeout=#{seconds.inspect}" + @set[:timeout] = seconds + end + + # sets the current number of worker_processes to +nr+ + def worker_processes(nr) + Integer === nr or raise ArgumentError, + "not an integer: worker_processes=#{nr.inspect}" + nr >= 0 or raise ArgumentError, + "not non-negative: worker_processes=#{nr.inspect}" + @set[:worker_processes] = nr + end + + # sets listeners to the given +addresses+, replacing the current set + def listeners(addresses) + Array === addresses or addresses = Array(addresses) + @set[:listeners] = addresses + end + + # adds an +address+ to the existing listener set + def listen(address) + @set[:listeners] = [] unless Array === @set[:listeners] + @set[:listeners] << address + end + + # sets the +path+ for the PID file of the unicorn master process + def pid(path) + if path + path = File.expand_path(path) + File.writable?(File.dirname(path)) or raise ArgumentError, + "directory for pid=#{path} not writable" + end + @set[:pid] = path + end + + private + + def set_hook(var, my_proc) #:nodoc: + case my_proc + when Proc + arity = my_proc.arity + (arity == 2 || arity < 0) or raise ArgumentError, + "#{var}=#{my_proc.inspect} has invalid arity: #{arity}" + when NilClass + my_proc = DEFAULTS[var] + else + raise ArgumentError, "invalid type: #{var}=#{my_proc.inspect}" + end + @set[var] = my_proc + end + + end +end diff --git a/lib/unicorn/socket.rb b/lib/unicorn/socket.rb index 9519448..4913261 100644 --- a/lib/unicorn/socket.rb +++ b/lib/unicorn/socket.rb @@ -62,6 +62,13 @@ module Unicorn end end + def destroy_safely(io) 
+ if io.respond_to?(:path) && File.stat(io.path).ino == io.stat.ino + File.unlink(io.path) rescue nil + end + io.close rescue nil + end + # creates a new server, socket. address may be a HOST:PORT or # an absolute path to a UNIX socket. address can even be a Socket # object in which case it is immediately returned -- cgit v1.2.3-24-ge0c7