about summary refs log tree commit homepage
path: root/lib/yahns/server_mp.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/yahns/server_mp.rb')
-rw-r--r--lib/yahns/server_mp.rb184
1 files changed, 184 insertions, 0 deletions
diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb
new file mode 100644
index 0000000..8818bac
--- /dev/null
+++ b/lib/yahns/server_mp.rb
@@ -0,0 +1,184 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+module Yahns::ServerMP # :nodoc:
+  EXIT_SIGS = [ :QUIT, :TERM, :INT ]
+
+  def mp_init
+    trap(:CHLD) { @sev.sev_signal }
+  end
+
+  # reaps all unreaped workers
+  def reap_all_workers
+    begin
+      wpid, status = Process.waitpid2(-1, Process::WNOHANG)
+      wpid or return
+      if @reexec_pid == wpid
+        @logger.error "reaped #{status.inspect} exec()-ed"
+        @reexec_pid = 0
+        self.pid = @pid.chomp('.oldbin') if @pid
+        proc_name 'master'
+      else
+        worker = @workers.delete(wpid)
+        worker_id = worker ? worker.nr : "(unknown)"
+        m = "reaped #{status.inspect} worker=#{worker_id}"
+        status.success? ? @logger.info(m) : @logger.error(m)
+      end
+    rescue Errno::ECHILD
+      return
+    end while true
+  end
+
+  def maintain_worker_count
+    (off = @workers.size - @worker_processes) == 0 and return
+    off < 0 and return spawn_missing_workers
+    @workers.each_pair do |wpid, worker|
+      worker.nr >= @worker_processes and Process.kill(:QUIT, wpid)
+    end
+  end
+
+  # delivers a signal to each worker
+  def kill_each_worker(signal)
+    @workers.each_key { |wpid| Process.kill(signal, wpid) }
+  end
+
+  # this is the first thing that runs after forking in a child
+  # gets rid of stuff the worker has no business keeping track of
+  # to free some resources and drops all sig handlers.
+  # traps for USR1, USR2, and HUP may be set in the after_fork Proc
+  # by the user.
+  def after_fork_internal(worker)
+    worker.atfork_child
+
+    # daemon_pipe may be true for non-initial workers
+    @daemon_pipe = @daemon_pipe.close if @daemon_pipe.respond_to?(:close)
+
+    srand # in case this pops up again: https://bugs.ruby-lang.org/issues/4338
+
+    # The OpenSSL PRNG is seeded with only the pid, and apps with frequently
+    # dying workers can recycle pids
+    OpenSSL::Random.seed(rand.to_s) if defined?(OpenSSL::Random)
+    # we'll re-trap EXIT_SIGS later for graceful shutdown iff we accept clients
+    EXIT_SIGS.each { |sig| trap(sig) { exit!(0) } }
+    exit!(0) if (@sig_queue & EXIT_SIGS)[0] # did we inherit sigs from parent?
+    @sig_queue = []
+
+    # ignore WINCH, TTIN, TTOU, HUP in the workers
+    (Yahns::Server::QUEUE_SIGS - EXIT_SIGS).each { |sig| trap(sig, nil) }
+    trap(:CHLD, 'DEFAULT')
+    @logger.info("worker=#{worker.nr} spawned pid=#$$")
+    proc_name "worker[#{worker.nr}]"
+    Yahns::START.clear
+    @sev.close
+    @sev = Yahns::Sigevent.new
+    worker.user(*@user) if @user
+    @user = @workers = nil
+  end
+
+  def spawn_missing_workers
+    worker_nr = -1
+    until (worker_nr += 1) == @worker_processes
+      @workers.value?(worker_nr) and next
+      worker = Yahns::Worker.new(worker_nr)
+      @logger.info("worker=#{worker_nr} spawning...")
+      if pid = fork
+        @workers[pid] = worker.atfork_parent
+      else
+        after_fork_internal(worker)
+        run_mp_worker(worker)
+      end
+    end
+  rescue => e
+    Yahns::Log.exception(@logger, "spawning worker", e)
+    exit!
+  end
+
+  # monitors children and receives signals forever
+  # (or until a termination signal is sent).  This handles signals
+  # one-at-a-time time and we'll happily drop signals in case somebody
+  # is signalling us too often.
+  def join
+    spawn_missing_workers
+    state = :respawn # :QUIT, :WINCH
+    proc_name 'master'
+    @logger.info "master process ready"
+    daemon_ready
+    begin
+      @sev.kgio_wait_readable
+      @sev.yahns_step
+      reap_all_workers
+      case @sig_queue.shift
+      when *EXIT_SIGS # graceful shutdown (twice for non graceful)
+        self.listeners = []
+        kill_each_worker(:QUIT)
+        state = :QUIT
+      when :USR1 # rotate logs
+        usr1_reopen("master ")
+        kill_each_worker(:USR1)
+      when :USR2 # exec binary, stay alive in case something went wrong
+        reexec
+      when :WINCH
+        if @daemon_pipe
+          state = :WINCH
+          @logger.info "gracefully stopping all workers"
+          kill_each_worker(:QUIT)
+          @worker_processes = 0
+        else
+          @logger.info "SIGWINCH ignored because we're not daemonized"
+        end
+      when :TTIN
+        state = :respawn unless state == :QUIT
+        @worker_processes += 1
+      when :TTOU
+        @worker_processes -= 1 if @worker_processes > 0
+      when :HUP
+        state = :respawn unless state == :QUIT
+        if @config.config_file
+          load_config!
+        else # exec binary and exit if there's no config file
+          @logger.info "config_file not present, reexecuting binary"
+          reexec
+        end
+      end while @sig_queue[0]
+      maintain_worker_count if state == :respawn
+    rescue => e
+      Yahns::Log.exception(@logger, "master loop error", e)
+    end while state != :QUIT || @workers.size > 0
+    @logger.info "master complete"
+    unlink_pid_safe(@pid) if @pid
+  end
+
+  def fdmap_init_mp
+    fdmap = fdmap_init # builds apps (if not preloading)
+    EXIT_SIGS.each { |sig| trap(sig) { sqwakeup(sig) } }
+    @config = nil
+    fdmap
+  end
+
+  def run_mp_worker(worker)
+    fdmap = fdmap_init_mp
+    alive = true
+    begin
+      alive = mp_sig_handle(worker, alive)
+    rescue => e
+      Yahns::Log.exception(@logger, "main worker loop", e)
+    end while alive || fdmap.size > 0
+    exit
+  end
+
+  def mp_sig_handle(worker, alive)
+    # not performance critical
+    r = IO.select([worker, @sev], nil, nil, alive ? nil : 0.01) and
+      r[0].each { |io| io.yahns_step }
+    case sig = @sig_queue.shift
+    when *EXIT_SIGS
+      self.listeners = []
+      exit(0) unless alive # drop connections immediately if signaled twice
+      @logger.info("received SIG#{sig}, gracefully exiting")
+      return false
+    when :USR1
+      usr1_reopen("worker ")
+    end
+    alive
+  end
+end