diff options
Diffstat (limited to 'lib/yahns/server.rb')
-rw-r--r-- | lib/yahns/server.rb | 49 |
1 files changed, 40 insertions, 9 deletions
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index 0a964b3..9bb8067 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -1,6 +1,8 @@ # -*- encoding: binary -*- # Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative 'queue_quitter' + class Yahns::Server # :nodoc: QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP, :TTIN, :TTOU ] attr_accessor :daemon_pipe @@ -21,7 +23,8 @@ class Yahns::Server # :nodoc: @worker_processes = nil @user = nil @queues = [] - @thr = [] + @wthr = [] + @athr = [] end def sqwakeup(sig) @@ -56,6 +59,10 @@ class Yahns::Server # :nodoc: self end + def drop_listeners + @listeners.each(&:close).clear + end + # replaces current listener set with +listeners+. This will # close the socket if it will not exist in the new listener set def listeners=(listeners) @@ -267,7 +274,7 @@ class Yahns::Server # :nodoc: @config.qeggs.each do |name, qe| queue = qe.vivify(fdmap) qe.worker_threads.times do - @thr << queue.worker_thread(@logger, qe.max_events) + @wthr << queue.worker_thread(@logger, qe.max_events) end @queues << queue queues[qe] = queue @@ -282,7 +289,7 @@ class Yahns::Server # :nodoc: qegg = ctx.qegg || @config.qeggs[:default] # acceptors feed the the queues - @thr << l.spawn_acceptor(@logger, ctx, queues[qegg]) + @athr << l.spawn_acceptor(@logger, ctx, queues[qegg]) end fdmap end @@ -294,7 +301,8 @@ class Yahns::Server # :nodoc: end def quit_enter(alive) - self.listeners = [] # close acceptors, we close epolls in quit_done + @logger.info "#{alive ? :grace : :force}fully exiting" + drop_listeners # close acceptors, we close epolls in quit_done exit(0) unless alive # drop connections immediately if signaled twice @config.config_listeners.each_value do |opts| ctx = opts[:yahns_app_ctx] or next @@ -304,10 +312,21 @@ class Yahns::Server # :nodoc: end # drops all the the IO objects we have threads waiting on before exiting + # This just injects the QueueQuitter object which acts like a + # monkey wrench thrown into a perfectly good engine :) def quit_finish - @queues.each(&:close) - self.listeners = [] # just in case, this is used in ensure - @thr.each(&:join) + quitter = Yahns::QueueQuitter.new + quitter.sev_signal # this will be level trigger for sustained damage! + + # throw the monkey wrench into the worker threads + @queues.each { |q| q.queue_add(quitter, Yahns::Queue::QEV_QUIT) } + + # watch the monkey wrench destroy all the threads! + @wthr.delete_if { |t| t.join(0.01) } while @wthr[0] + + # cleanup, our job is done + @queues.each(&:close).clear + quitter.close end def sp_sig_handle(alive) @@ -322,13 +341,25 @@ class Yahns::Server # :nodoc: reexec when :HUP reexec - return false + return quit_enter(alive) when :TTIN, :TTOU, :WINCH @logger.info("SIG#{sig} ignored in single-process mode") end alive end + def acceptors_alive + @athr.delete_if do |t| + # blocking accept() does not wake up on, close() only EINTR and + # new connections. So Thread#run will send SIGVTALRM via + # pthread_kill, which will break it out of blocking syscalls + # like accept() + # if t.run fails, thread is dead + t.run rescue nil + t.join(1) + end.size > 0 + end + # single-threaded only, this is overriden if @worker_processes is non-nil def join daemon_ready @@ -338,7 +369,7 @@ class Yahns::Server # :nodoc: alive = sp_sig_handle(alive) rescue => e Yahns::Log.exception(@logger, "main loop", e) - end while alive || fdmap.size > 0 + end while alive || acceptors_alive || fdmap.size > 0 unlink_pid_safe(@pid) if @pid ensure quit_finish |