diff options
Diffstat (limited to 'lib/yahns/server.rb')
-rw-r--r-- | lib/yahns/server.rb | 328 |
1 files changed, 328 insertions, 0 deletions
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb new file mode 100644 index 0000000..c7a5a57 --- /dev/null +++ b/lib/yahns/server.rb @@ -0,0 +1,328 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +class Yahns::Server # :nodoc: + QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP, :TTIN, :TTOU ] + attr_accessor :daemon_pipe + attr_accessor :logger + attr_writer :worker_processes + include Yahns::SocketHelper + + def initialize(config) + @reexec_pid = 0 + @daemon_pipe = nil # writable IO or true + @config = config + @workers = {} # pid -> workers + @sig_queue = [] # nil in forked workers + @logger = Logger.new($stderr) + @sev = Yahns::Sigevent.new + @listeners = [] + @pid = nil + @worker_processes = nil + @user = nil + end + + def sqwakeup(sig) + @sig_queue << sig + @sev.sev_signal + end + + def start + @config.commit!(self) + inherit_listeners! + # we try inheriting listeners first, so we bind them later. + # we don't write the pid file until we've bound listeners in case + # yahns was started twice by mistake. Even though our #pid= method + # checks for stale/existing pid files, race conditions are still + # possible (and difficult/non-portable to avoid) and can be likely + # to clobber the pid if the second start was in quick succession + # after the first, so we rely on the listener binding to fail in + # that case. Some tests (in and outside of this source tree) and + # monitoring tools may also rely on pid files existing before we + # attempt to connect to the listener(s) + + # setup signal handlers before writing pid file in case people get + # trigger happy and send signals as soon as the pid file exists. + QUEUE_SIGS.each { |sig| trap(sig) { sqwakeup(sig) } } + self.pid = @config.value(:pid) # write pid file + bind_new_listeners! + if @worker_processes + require 'yahns/server_mp' + extend Yahns::ServerMP + mp_init + end + self + end + + # replaces current listener set with +listeners+. This will + # close the socket if it will not exist in the new listener set + def listeners=(listeners) + cur_names, dead_names = [], [] + listener_names.each do |name| + if ?/ == name[0] + # mark unlinked sockets as dead so we can rebind them + (File.socket?(name) ? cur_names : dead_names) << name + else + cur_names << name + end + end + set_names = listener_names(listeners) + dead_names.concat(cur_names - set_names).uniq! + + @listeners.delete_if do |io| + if dead_names.include?(sock_name(io)) + (io.close rescue nil).nil? # true + else + set_server_sockopt(io, sock_opts(io)) + false + end + end + + (set_names - cur_names).each { |addr| listen(addr) } + end + + # sets the path for the PID file of the master process + def pid=(path) + if path + if x = valid_pid?(path) + return path if @pid && path == @pid && x == $$ + if x == @reexec_pid && @pid =~ /\.oldbin\z/ + @logger.warn("will not set pid=#{path} while reexec-ed "\ + "child is running PID:#{x}") + return + end + raise ArgumentError, "Already running on PID:#{x} " \ + "(or pid=#{path} is stale)" + end + end + unlink_pid_safe(@pid) if @pid + + if path + fp = begin + tmp = "#{File.dirname(path)}/#{rand}.#$$" + File.open(tmp, File::RDWR|File::CREAT|File::EXCL, 0644) + rescue Errno::EEXIST + retry + end + fp.syswrite("#$$\n") + File.rename(fp.path, path) + fp.close + end + @pid = path + end + + # add a given address to the +listeners+ set, idempotently + # Allows workers to add a private, per-process listener via the + # after_fork hook. Very useful for debugging and testing. + # +:tries+ may be specified as an option for the number of times + # to retry, and +:delay+ may be specified as the time in seconds + # to delay between retries. + # A negative value for +:tries+ indicates the listen will be + # retried indefinitely, this is useful when workers belonging to + # different masters are spawned during a transparent upgrade. + def listen(address) + address = @config.expand_addr(address) + return if String === address && listener_names.include?(address) + + begin + io = bind_listen(address, sock_opts(address)) + unless Kgio::TCPServer === io || Kgio::UNIXServer === io + io = server_cast(io) + end + @logger.info "listening on addr=#{sock_name(io)} fd=#{io.fileno}" + @listeners << io + io + rescue Errno::EADDRINUSE => err + @logger.error "adding listener failed addr=#{address} (in use)" + rescue => err + @logger.fatal "error adding listener addr=#{address}" + raise err + end + end + + def daemon_ready + @daemon_pipe or return + @daemon_pipe.syswrite("#$$") + @daemon_pipe.close + @daemon_pipe = true # for SIGWINCH + end + + # reexecutes the Yahns::START with a new binary + def reexec + if @reexec_pid > 0 + begin + Process.kill(0, @reexec_pid) + @logger.error "reexec-ed child already running PID:#@reexec_pid" + return + rescue Errno::ESRCH + @reexec_pid = 0 + end + end + + if @pid + old_pid = "#@pid.oldbin" + begin + self.pid = old_pid # clear the path for a new pid file + rescue ArgumentError + @logger.error "old PID:#{valid_pid?(old_pid)} running with " \ + "existing pid=#{old_pid}, refusing rexec" + return + rescue => e + @logger.error "error writing pid=#{old_pid} #{e.class} #{e.message}" + return + end + end + + @reexec_pid = fork do + redirects = {} + listeners.each do |sock| + sock.close_on_exec = false + redirects[sock.fileno] = sock + end + ENV['YAHNS_FD'] = redirects.keys.map(&:to_s).join(',') + Dir.chdir(@config.value(:working_directory) || Yahns::START[:cwd]) + cmd = [ Yahns::START[0] ].concat(Yahns::START[:argv]) + @logger.info "executing #{cmd.inspect} (in #{Dir.pwd})" + cmd << redirects + exec(*cmd) + end + proc_name 'master (old)' + end + + # unlinks a PID file at given +path+ if it contains the current PID + # still potentially racy without locking the directory (which is + # non-portable and may interact badly with other programs), but the + # window for hitting the race condition is small + def unlink_pid_safe(path) + (File.read(path).to_i == $$ and File.unlink(path)) rescue nil + end + + # returns a PID if a given path contains a non-stale PID file, + # nil otherwise. + def valid_pid?(path) + wpid = File.read(path).to_i + wpid <= 0 and return + Process.kill(0, wpid) + wpid + rescue Errno::EPERM + @logger.info "pid=#{path} possibly stale, got EPERM signalling PID:#{wpid}" + nil + rescue Errno::ESRCH, Errno::ENOENT + # don't unlink stale pid files, racy without non-portable locking... + end + + def load_config! + @logger.info "reloading config_file=#{@config.config_file}" + @config.config_reload! + @config.commit!(self) + kill_each_worker(:QUIT) + Yahns::Log.reopen_all + @logger.info "done reloading config_file=#{@config.config_file}" + rescue StandardError, LoadError, SyntaxError => e + Yahns::Log.exception(@logger, + "error reloading config_file=#{@config.config_file}", e) + end + + # returns an array of string names for the given listener array + def listener_names(listeners = @listeners) + listeners.map { |io| sock_name(io) } + end + + def sock_opts(io) + @config.config_listeners[sock_name(io)] + end + + def inherit_listeners! + # inherit sockets from parents, they need to be plain Socket objects + # before they become Kgio::UNIXServer or Kgio::TCPServer + inherited = ENV['YAHNS_FD'].to_s.split(/,/).map do |fd| + io = Socket.for_fd(fd.to_i) + set_server_sockopt(io, sock_opts(io)) + @logger.info "inherited addr=#{sock_name(io)} fd=#{fd}" + server_cast(io) + end + + @listeners.replace(inherited) + end + + # call only after calling inherit_listeners! + # This binds any listeners we did NOT inherit from the parent + def bind_new_listeners! + self.listeners = @config.config_listeners.keys + raise ArgumentError, "no listeners" if @listeners.empty? + @listeners.each { |l| l.extend(Yahns::Acceptor) } + end + + def proc_name(tag) + s = Yahns::START + $0 = ([ File.basename(s[0]), tag ]).concat(s[:argv]).join(' ') + end + + # spins up processing threads of the server + def fdmap_init + thresh = @config.value(:client_expire_threshold) + + # keeps track of all connections, like ObjectSpace, but only for IOs + fdmap = Yahns::Fdmap.new(@logger, thresh) + + # initialize queues (epoll/kqueue) and associated worker threads + queues = {} + @config.qeggs.each do |name, qegg| + queue = qegg.qc_vivify(fdmap) # worker threads run after this + queues[qegg] = queue + end + + # spin up applications (which are preload: false) + @config.app_ctx.each { |ctx| ctx.after_fork_init } + + # spin up acceptors, clients flow into worker queues after this + @listeners.each do |l| + ctx = sock_opts(l)[:yahns_app_ctx] + qegg = ctx.qegg || @config.qeggs[:default] + + # acceptors feed the the queues + l.spawn_acceptor(@logger, ctx, queues[qegg]) + end + fdmap + end + + def usr1_reopen(prefix) + @logger.info "#{prefix}reopening logs..." + Yahns::Log.reopen_all + @logger.info "#{prefix}done reopening logs" + end + + def sp_sig_handle(alive) + @sev.kgio_wait_readable(alive ? nil : 0.01) + @sev.yahns_step + case sig = @sig_queue.shift + when :QUIT, :TERM, :INT + self.listeners = [] # stop accepting new connections + exit(0) unless alive + return false + when :USR1 + usr1_reopen('') + when :USR2 + reexec + when :HUP + reexec + return false + when :TTIN, :TTOU, :WINCH + @logger.info("SIG#{sig} ignored in single-process mode") + end + alive + end + + # single-threaded only, this is overriden if @worker_processes is non-nil + def join + daemon_ready + fdmap = fdmap_init + alive = true + begin + alive = sp_sig_handle(alive) + rescue => e + Yahns::Log.exception(@logger, "main loop", e) + end while alive || fdmap.size > 0 + unlink_pid_safe(@pid) if @pid + end +end |