diff options
author | Eric Wong <e@80x24.org> | 2013-10-20 09:25:54 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2013-10-21 04:09:20 +0000 |
commit | d3335cbaee6c47880b3bb5a372b966842edfeade (patch) | |
tree | 8255bc8670535f34ca57e33ac05272bae51c9ace /lib/yahns/server.rb | |
parent | 92172ac2c4e094cb2d8f8669a1f7666501425263 (diff) | |
download | yahns-d3335cbaee6c47880b3bb5a372b966842edfeade.tar.gz |
Acceptors require closing the descriptor, first, and then doing a (nasty) cross-thread exception to kick the thread out of the blocking accept via Thread#run (pthread_kill SIGVTALRM). We cannot do what we're doing with epoll with acceptors because the accept socket is shared across processes. We will also NEVER be using non-blocking accept, as it's more important we fairly distribute connections between tasks when we're not shutting the server down. The queue worker threads are much easier to kill off :) We can simply inject a new QueueQuitter object as a Level-Triggering epoll watch, activate it, and let it wreak havok on all the worker threads from a single event activation. rb_thread_fd_close is convenient, but expensive with many threads, so be prepared for more systems without it. This is for Rubinius compatibility. Yes, we are actually using Level-Triggered epoll here (despite the non-shutdown pieces of our code being based around EPOLLONESHOT).
Diffstat (limited to 'lib/yahns/server.rb')
-rw-r--r-- | lib/yahns/server.rb | 49 |
1 files changed, 40 insertions, 9 deletions
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index 0a964b3..9bb8067 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -1,6 +1,8 @@ # -*- encoding: binary -*- # Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative 'queue_quitter' + class Yahns::Server # :nodoc: QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP, :TTIN, :TTOU ] attr_accessor :daemon_pipe @@ -21,7 +23,8 @@ class Yahns::Server # :nodoc: @worker_processes = nil @user = nil @queues = [] - @thr = [] + @wthr = [] + @athr = [] end def sqwakeup(sig) @@ -56,6 +59,10 @@ class Yahns::Server # :nodoc: self end + def drop_listeners + @listeners.each(&:close).clear + end + # replaces current listener set with +listeners+. This will # close the socket if it will not exist in the new listener set def listeners=(listeners) @@ -267,7 +274,7 @@ class Yahns::Server # :nodoc: @config.qeggs.each do |name, qe| queue = qe.vivify(fdmap) qe.worker_threads.times do - @thr << queue.worker_thread(@logger, qe.max_events) + @wthr << queue.worker_thread(@logger, qe.max_events) end @queues << queue queues[qe] = queue @@ -282,7 +289,7 @@ class Yahns::Server # :nodoc: qegg = ctx.qegg || @config.qeggs[:default] # acceptors feed the the queues - @thr << l.spawn_acceptor(@logger, ctx, queues[qegg]) + @athr << l.spawn_acceptor(@logger, ctx, queues[qegg]) end fdmap end @@ -294,7 +301,8 @@ class Yahns::Server # :nodoc: end def quit_enter(alive) - self.listeners = [] # close acceptors, we close epolls in quit_done + @logger.info "#{alive ? :grace : :force}fully exiting" + drop_listeners # close acceptors, we close epolls in quit_done exit(0) unless alive # drop connections immediately if signaled twice @config.config_listeners.each_value do |opts| ctx = opts[:yahns_app_ctx] or next @@ -304,10 +312,21 @@ class Yahns::Server # :nodoc: end # drops all the the IO objects we have threads waiting on before exiting + # This just injects the QueueQuitter object which acts like a + # monkey wrench thrown into a perfectly good engine :) def quit_finish - @queues.each(&:close) - self.listeners = [] # just in case, this is used in ensure - @thr.each(&:join) + quitter = Yahns::QueueQuitter.new + quitter.sev_signal # this will be level trigger for sustained damage! + + # throw the monkey wrench into the worker threads + @queues.each { |q| q.queue_add(quitter, Yahns::Queue::QEV_QUIT) } + + # watch the monkey wrench destroy all the threads! + @wthr.delete_if { |t| t.join(0.01) } while @wthr[0] + + # cleanup, our job is done + @queues.each(&:close).clear + quitter.close end def sp_sig_handle(alive) @@ -322,13 +341,25 @@ class Yahns::Server # :nodoc: reexec when :HUP reexec - return false + return quit_enter(alive) when :TTIN, :TTOU, :WINCH @logger.info("SIG#{sig} ignored in single-process mode") end alive end + def acceptors_alive + @athr.delete_if do |t| + # blocking accept() does not wake up on, close() only EINTR and + # new connections. So Thread#run will send SIGVTALRM via + # pthread_kill, which will break it out of blocking syscalls + # like accept() + # if t.run fails, thread is dead + t.run rescue nil + t.join(1) + end.size > 0 + end + # single-threaded only, this is overriden if @worker_processes is non-nil def join daemon_ready @@ -338,7 +369,7 @@ class Yahns::Server # :nodoc: alive = sp_sig_handle(alive) rescue => e Yahns::Log.exception(@logger, "main loop", e) - end while alive || fdmap.size > 0 + end while alive || acceptors_alive || fdmap.size > 0 unlink_pid_safe(@pid) if @pid ensure quit_finish |