about summary refs log tree commit homepage
path: root/lib/yahns/server.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/yahns/server.rb')
-rw-r--r--lib/yahns/server.rb328
1 files changed, 328 insertions, 0 deletions
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
new file mode 100644
index 0000000..c7a5a57
--- /dev/null
+++ b/lib/yahns/server.rb
@@ -0,0 +1,328 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class Yahns::Server # :nodoc:
+  QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP, :TTIN, :TTOU ]
+  attr_accessor :daemon_pipe
+  attr_accessor :logger
+  attr_writer :worker_processes
+  include Yahns::SocketHelper
+
+  def initialize(config)
+    @reexec_pid = 0
+    @daemon_pipe = nil # writable IO or true
+    @config = config
+    @workers = {} # pid -> workers
+    @sig_queue = [] # nil in forked workers
+    @logger = Logger.new($stderr)
+    @sev = Yahns::Sigevent.new
+    @listeners = []
+    @pid = nil
+    @worker_processes = nil
+    @user = nil
+  end
+
+  def sqwakeup(sig)
+    @sig_queue << sig
+    @sev.sev_signal
+  end
+
+  def start
+    @config.commit!(self)
+    inherit_listeners!
+    # we try inheriting listeners first, so we bind them later.
+    # we don't write the pid file until we've bound listeners in case
+    # yahns was started twice by mistake.  Even though our #pid= method
+    # checks for stale/existing pid files, race conditions are still
+    # possible (and difficult/non-portable to avoid) and can be likely
+    # to clobber the pid if the second start was in quick succession
+    # after the first, so we rely on the listener binding to fail in
+    # that case.  Some tests (in and outside of this source tree) and
+    # monitoring tools may also rely on pid files existing before we
+    # attempt to connect to the listener(s)
+
+    # setup signal handlers before writing pid file in case people get
+    # trigger happy and send signals as soon as the pid file exists.
+    QUEUE_SIGS.each { |sig| trap(sig) { sqwakeup(sig) } }
+    self.pid = @config.value(:pid) # write pid file
+    bind_new_listeners!
+    if @worker_processes
+      require 'yahns/server_mp'
+      extend Yahns::ServerMP
+      mp_init
+    end
+    self
+  end
+
+  # replaces current listener set with +listeners+.  This will
+  # close the socket if it will not exist in the new listener set
+  def listeners=(listeners)
+    cur_names, dead_names = [], []
+    listener_names.each do |name|
+      if ?/ == name[0]
+        # mark unlinked sockets as dead so we can rebind them
+        (File.socket?(name) ? cur_names : dead_names) << name
+      else
+        cur_names << name
+      end
+    end
+    set_names = listener_names(listeners)
+    dead_names.concat(cur_names - set_names).uniq!
+
+    @listeners.delete_if do |io|
+      if dead_names.include?(sock_name(io))
+        (io.close rescue nil).nil? # true
+      else
+        set_server_sockopt(io, sock_opts(io))
+        false
+      end
+    end
+
+    (set_names - cur_names).each { |addr| listen(addr) }
+  end
+
+  # sets the path for the PID file of the master process
+  def pid=(path)
+    if path
+      if x = valid_pid?(path)
+        return path if @pid && path == @pid && x == $$
+        if x == @reexec_pid && @pid =~ /\.oldbin\z/
+          @logger.warn("will not set pid=#{path} while reexec-ed "\
+                       "child is running PID:#{x}")
+          return
+        end
+        raise ArgumentError, "Already running on PID:#{x} " \
+                             "(or pid=#{path} is stale)"
+      end
+    end
+    unlink_pid_safe(@pid) if @pid
+
+    if path
+      fp = begin
+        tmp = "#{File.dirname(path)}/#{rand}.#$$"
+        File.open(tmp, File::RDWR|File::CREAT|File::EXCL, 0644)
+      rescue Errno::EEXIST
+        retry
+      end
+      fp.syswrite("#$$\n")
+      File.rename(fp.path, path)
+      fp.close
+    end
+    @pid = path
+  end
+
+  # add a given address to the +listeners+ set, idempotently
+  # Allows workers to add a private, per-process listener via the
+  # after_fork hook.  Very useful for debugging and testing.
+  # +:tries+ may be specified as an option for the number of times
+  # to retry, and +:delay+ may be specified as the time in seconds
+  # to delay between retries.
+  # A negative value for +:tries+ indicates the listen will be
+  # retried indefinitely, this is useful when workers belonging to
+  # different masters are spawned during a transparent upgrade.
+  def listen(address)
+    address = @config.expand_addr(address)
+    return if String === address && listener_names.include?(address)
+
+    begin
+      io = bind_listen(address, sock_opts(address))
+      unless Kgio::TCPServer === io || Kgio::UNIXServer === io
+        io = server_cast(io)
+      end
+      @logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}"
+      @listeners << io
+      io
+    rescue Errno::EADDRINUSE => err
+      @logger.error "adding listener failed addr=#{address} (in use)"
+    rescue => err
+      @logger.fatal "error adding listener addr=#{address}"
+      raise err
+    end
+  end
+
+  def daemon_ready
+    @daemon_pipe or return
+    @daemon_pipe.syswrite("#$$")
+    @daemon_pipe.close
+    @daemon_pipe = true # for SIGWINCH
+  end
+
+  # reexecutes the Yahns::START with a new binary
+  def reexec
+    if @reexec_pid > 0
+      begin
+        Process.kill(0, @reexec_pid)
+        @logger.error "reexec-ed child already running PID:#@reexec_pid"
+        return
+      rescue Errno::ESRCH
+        @reexec_pid = 0
+      end
+    end
+
+    if @pid
+      old_pid = "#@pid.oldbin"
+      begin
+        self.pid = old_pid  # clear the path for a new pid file
+      rescue ArgumentError
+        @logger.error "old PID:#{valid_pid?(old_pid)} running with " \
+                      "existing pid=#{old_pid}, refusing rexec"
+        return
+      rescue => e
+        @logger.error "error writing pid=#{old_pid} #{e.class} #{e.message}"
+        return
+      end
+    end
+
+    @reexec_pid = fork do
+      redirects = {}
+      listeners.each do |sock|
+        sock.close_on_exec = false
+        redirects[sock.fileno] = sock
+      end
+      ENV['YAHNS_FD'] = redirects.keys.map(&:to_s).join(',')
+      Dir.chdir(@config.value(:working_directory) || Yahns::START[:cwd])
+      cmd = [ Yahns::START[0] ].concat(Yahns::START[:argv])
+      @logger.info "executing #{cmd.inspect} (in #{Dir.pwd})"
+      cmd << redirects
+      exec(*cmd)
+    end
+    proc_name 'master (old)'
+  end
+
+  # unlinks a PID file at given +path+ if it contains the current PID
+  # still potentially racy without locking the directory (which is
+  # non-portable and may interact badly with other programs), but the
+  # window for hitting the race condition is small
+  def unlink_pid_safe(path)
+    (File.read(path).to_i == $$ and File.unlink(path)) rescue nil
+  end
+
+  # returns a PID if a given path contains a non-stale PID file,
+  # nil otherwise.
+  def valid_pid?(path)
+    wpid = File.read(path).to_i
+    wpid <= 0 and return
+    Process.kill(0, wpid)
+    wpid
+  rescue Errno::EPERM
+    @logger.info "pid=#{path} possibly stale, got EPERM signalling PID:#{wpid}"
+    nil
+  rescue Errno::ESRCH, Errno::ENOENT
+    # don't unlink stale pid files, racy without non-portable locking...
+  end
+
+  def load_config!
+    @logger.info "reloading config_file=#{@config.config_file}"
+    @config.config_reload!
+    @config.commit!(self)
+    kill_each_worker(:QUIT)
+    Yahns::Log.reopen_all
+    @logger.info "done reloading config_file=#{@config.config_file}"
+  rescue StandardError, LoadError, SyntaxError => e
+    Yahns::Log.exception(@logger,
+                     "error reloading config_file=#{@config.config_file}", e)
+  end
+
+  # returns an array of string names for the given listener array
+  def listener_names(listeners = @listeners)
+    listeners.map { |io| sock_name(io) }
+  end
+
+  def sock_opts(io)
+    @config.config_listeners[sock_name(io)]
+  end
+
+  def inherit_listeners!
+    # inherit sockets from parents, they need to be plain Socket objects
+    # before they become Kgio::UNIXServer or Kgio::TCPServer
+    inherited = ENV['YAHNS_FD'].to_s.split(/,/).map do |fd|
+      io = Socket.for_fd(fd.to_i)
+      set_server_sockopt(io, sock_opts(io))
+      @logger.info "inherited addr=#{sock_name(io)} fd=#{fd}"
+      server_cast(io)
+    end
+
+    @listeners.replace(inherited)
+  end
+
+  # call only after calling inherit_listeners!
+  # This binds any listeners we did NOT inherit from the parent
+  def bind_new_listeners!
+    self.listeners = @config.config_listeners.keys
+    raise ArgumentError, "no listeners" if @listeners.empty?
+    @listeners.each { |l| l.extend(Yahns::Acceptor) }
+  end
+
+  def proc_name(tag)
+    s = Yahns::START
+    $0 = ([ File.basename(s[0]), tag ]).concat(s[:argv]).join(' ')
+  end
+
+  # spins up processing threads of the server
+  def fdmap_init
+    thresh = @config.value(:client_expire_threshold)
+
+    # keeps track of all connections, like ObjectSpace, but only for IOs
+    fdmap = Yahns::Fdmap.new(@logger, thresh)
+
+    # initialize queues (epoll/kqueue) and associated worker threads
+    queues = {}
+    @config.qeggs.each do |name, qegg|
+      queue = qegg.qc_vivify(fdmap) # worker threads run after this
+      queues[qegg] = queue
+    end
+
+    # spin up applications (which are preload: false)
+    @config.app_ctx.each { |ctx| ctx.after_fork_init }
+
+    # spin up acceptors, clients flow into worker queues after this
+    @listeners.each do |l|
+      ctx = sock_opts(l)[:yahns_app_ctx]
+      qegg = ctx.qegg || @config.qeggs[:default]
+
+      # acceptors feed the the queues
+      l.spawn_acceptor(@logger, ctx, queues[qegg])
+    end
+    fdmap
+  end
+
+  def usr1_reopen(prefix)
+    @logger.info "#{prefix}reopening logs..."
+    Yahns::Log.reopen_all
+    @logger.info "#{prefix}done reopening logs"
+  end
+
+  def sp_sig_handle(alive)
+    @sev.kgio_wait_readable(alive ? nil : 0.01)
+    @sev.yahns_step
+    case sig = @sig_queue.shift
+    when :QUIT, :TERM, :INT
+      self.listeners = [] # stop accepting new connections
+      exit(0) unless alive
+      return false
+    when :USR1
+      usr1_reopen('')
+    when :USR2
+      reexec
+    when :HUP
+      reexec
+      return false
+    when :TTIN, :TTOU, :WINCH
+      @logger.info("SIG#{sig} ignored in single-process mode")
+    end
+    alive
+  end
+
+  # single-threaded only, this is overriden if @worker_processes is non-nil
+  def join
+    daemon_ready
+    fdmap = fdmap_init
+    alive = true
+    begin
+      alive = sp_sig_handle(alive)
+    rescue => e
+      Yahns::Log.exception(@logger, "main loop", e)
+    end while alive || fdmap.size > 0
+    unlink_pid_safe(@pid) if @pid
+  end
+end