diff options
Diffstat (limited to 'lib/yahns/server_mp.rb')
-rw-r--r-- | lib/yahns/server_mp.rb | 184 |
1 files changed, 184 insertions, 0 deletions
diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb new file mode 100644 index 0000000..8818bac --- /dev/null +++ b/lib/yahns/server_mp.rb @@ -0,0 +1,184 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +module Yahns::ServerMP # :nodoc: + EXIT_SIGS = [ :QUIT, :TERM, :INT ] + + def mp_init + trap(:CHLD) { @sev.sev_signal } + end + + # reaps all unreaped workers + def reap_all_workers + begin + wpid, status = Process.waitpid2(-1, Process::WNOHANG) + wpid or return + if @reexec_pid == wpid + @logger.error "reaped #{status.inspect} exec()-ed" + @reexec_pid = 0 + self.pid = @pid.chomp('.oldbin') if @pid + proc_name 'master' + else + worker = @workers.delete(wpid) + worker_id = worker ? worker.nr : "(unknown)" + m = "reaped #{status.inspect} worker=#{worker_id}" + status.success? ? @logger.info(m) : @logger.error(m) + end + rescue Errno::ECHILD + return + end while true + end + + def maintain_worker_count + (off = @workers.size - @worker_processes) == 0 and return + off < 0 and return spawn_missing_workers + @workers.each_pair do |wpid, worker| + worker.nr >= @worker_processes and Process.kill(:QUIT, wpid) + end + end + + # delivers a signal to each worker + def kill_each_worker(signal) + @workers.each_key { |wpid| Process.kill(signal, wpid) } + end + + # this is the first thing that runs after forking in a child + # gets rid of stuff the worker has no business keeping track of + # to free some resources and drops all sig handlers. + # traps for USR1, USR2, and HUP may be set in the after_fork Proc + # by the user. + def after_fork_internal(worker) + worker.atfork_child + + # daemon_pipe may be true for non-initial workers + @daemon_pipe = @daemon_pipe.close if @daemon_pipe.respond_to?(:close) + + srand # in case this pops up again: https://bugs.ruby-lang.org/issues/4338 + + # The OpenSSL PRNG is seeded with only the pid, and apps with frequently + # dying workers can recycle pids + OpenSSL::Random.seed(rand.to_s) if defined?(OpenSSL::Random) + # we'll re-trap EXIT_SIGS later for graceful shutdown iff we accept clients + EXIT_SIGS.each { |sig| trap(sig) { exit!(0) } } + exit!(0) if (@sig_queue & EXIT_SIGS)[0] # did we inherit sigs from parent? + @sig_queue = [] + + # ignore WINCH, TTIN, TTOU, HUP in the workers + (Yahns::Server::QUEUE_SIGS - EXIT_SIGS).each { |sig| trap(sig, nil) } + trap(:CHLD, 'DEFAULT') + @logger.info("worker=#{worker.nr} spawned pid=#$$") + proc_name "worker[#{worker.nr}]" + Yahns::START.clear + @sev.close + @sev = Yahns::Sigevent.new + worker.user(*@user) if @user + @user = @workers = nil + end + + def spawn_missing_workers + worker_nr = -1 + until (worker_nr += 1) == @worker_processes + @workers.value?(worker_nr) and next + worker = Yahns::Worker.new(worker_nr) + @logger.info("worker=#{worker_nr} spawning...") + if pid = fork + @workers[pid] = worker.atfork_parent + else + after_fork_internal(worker) + run_mp_worker(worker) + end + end + rescue => e + Yahns::Log.exception(@logger, "spawning worker", e) + exit! + end + + # monitors children and receives signals forever + # (or until a termination signal is sent). This handles signals + # one-at-a-time time and we'll happily drop signals in case somebody + # is signalling us too often. + def join + spawn_missing_workers + state = :respawn # :QUIT, :WINCH + proc_name 'master' + @logger.info "master process ready" + daemon_ready + begin + @sev.kgio_wait_readable + @sev.yahns_step + reap_all_workers + case @sig_queue.shift + when *EXIT_SIGS # graceful shutdown (twice for non graceful) + self.listeners = [] + kill_each_worker(:QUIT) + state = :QUIT + when :USR1 # rotate logs + usr1_reopen("master ") + kill_each_worker(:USR1) + when :USR2 # exec binary, stay alive in case something went wrong + reexec + when :WINCH + if @daemon_pipe + state = :WINCH + @logger.info "gracefully stopping all workers" + kill_each_worker(:QUIT) + @worker_processes = 0 + else + @logger.info "SIGWINCH ignored because we're not daemonized" + end + when :TTIN + state = :respawn unless state == :QUIT + @worker_processes += 1 + when :TTOU + @worker_processes -= 1 if @worker_processes > 0 + when :HUP + state = :respawn unless state == :QUIT + if @config.config_file + load_config! + else # exec binary and exit if there's no config file + @logger.info "config_file not present, reexecuting binary" + reexec + end + end while @sig_queue[0] + maintain_worker_count if state == :respawn + rescue => e + Yahns::Log.exception(@logger, "master loop error", e) + end while state != :QUIT || @workers.size > 0 + @logger.info "master complete" + unlink_pid_safe(@pid) if @pid + end + + def fdmap_init_mp + fdmap = fdmap_init # builds apps (if not preloading) + EXIT_SIGS.each { |sig| trap(sig) { sqwakeup(sig) } } + @config = nil + fdmap + end + + def run_mp_worker(worker) + fdmap = fdmap_init_mp + alive = true + begin + alive = mp_sig_handle(worker, alive) + rescue => e + Yahns::Log.exception(@logger, "main worker loop", e) + end while alive || fdmap.size > 0 + exit + end + + def mp_sig_handle(worker, alive) + # not performance critical + r = IO.select([worker, @sev], nil, nil, alive ? nil : 0.01) and + r[0].each { |io| io.yahns_step } + case sig = @sig_queue.shift + when *EXIT_SIGS + self.listeners = [] + exit(0) unless alive # drop connections immediately if signaled twice + @logger.info("received SIG#{sig}, gracefully exiting") + return false + when :USR1 + usr1_reopen("worker ") + end + alive + end +end |