From 35a72997899616b6d453e6dbf6a7d4477103dd1a Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 22 Oct 2013 01:23:23 +0000 Subject: rework acceptor thread shutdown (again) Calling close on the socket while accept4 is running on a different thread can be problematic on Rubinius and some versions of MRI. So we will use a thread-local variable, continuously issue new connections to our own socket and rely on the round-robin behavior of accept4 to eventually get our acceptor thread to wakeup. We must continuously issue new connections because some connections may be going to a new processes (from SIGUSR2). We cannot use shutdown for the same reason. --- lib/yahns/server.rb | 51 ++++++++++++++++----------------------------------- 1 file changed, 16 insertions(+), 35 deletions(-) (limited to 'lib/yahns/server.rb') diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index 63b45e9..3bdb2c0 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -24,7 +24,6 @@ class Yahns::Server # :nodoc: @user = nil @queues = [] @wthr = [] - @athr = [] end def sqwakeup(sig) @@ -59,8 +58,8 @@ class Yahns::Server # :nodoc: self end - def drop_listeners - @listeners.each(&:close).clear + def drop_acceptors + @listeners.delete_if(&:ac_quit) end # replaces current listener set with +listeners+. This will @@ -77,16 +76,23 @@ class Yahns::Server # :nodoc: end set_names = listener_names(listeners) dead_names.concat(cur_names - set_names).uniq! - + dying = [] @listeners.delete_if do |io| if dead_names.include?(sock_name(io)) - (io.close rescue nil).nil? # true + if ac_quit + true + else + dying << io + false + end else set_server_sockopt(io, sock_opts(io)) false end end + dying.delete_if(&:ac_quit) while dying[0] + (set_names - cur_names).each { |addr| listen(addr) } end @@ -308,7 +314,7 @@ class Yahns::Server # :nodoc: qegg = ctx.qegg || @config.qeggs[:default] # acceptors feed the the queues - @athr << l.spawn_acceptor(@logger, ctx, queues[qegg]) + l.spawn_acceptor(@logger, ctx, queues[qegg]) end fdmap end @@ -321,7 +327,7 @@ class Yahns::Server # :nodoc: def quit_enter(alive) @logger.info "#{alive ? "grace" : "force"}fully exiting" - drop_listeners # close acceptors, we close epolls in quit_done + drop_acceptors # stop acceptors, we close epolls in quit_done exit(0) unless alive # drop connections immediately if signaled twice @config.config_listeners.each_value do |opts| ctx = opts[:yahns_app_ctx] or next @@ -349,8 +355,8 @@ class Yahns::Server # :nodoc: rescue => e Yahns::Log.exception(@logger, "quit finish", e) ensure - if (@wthr.size + @athr.size) > 0 - abort "BUG: still active wthr=#{@wthr.size} athr=#{@athr.size}" + if (@wthr.size + @listeners.size) > 0 + abort "BUG: still active wthr=#{@wthr.size} listeners=#{@listeners.size}" end end @@ -373,31 +379,6 @@ class Yahns::Server # :nodoc: alive end - # return true if any acceptor is still alive, false otherwise - def acceptors_alive - case RUBY_ENGINE - when "rbx" - false # XXX this needs to be diagnosed and fixed for rbx - else - # We should stop acceptors before stopping queues. Closing the - # acceptor socket is not sufficient to stop the last client of - # each thread fom being accepted and thrown into the queue. - @athr.delete_if do |t| - # blocking accept() does not wake up on, close() only EINTR and - # new connections. So Thread#run will send SIGVTALRM via - # pthread_kill, which will break it out of blocking syscalls - # like accept() - t.run rescue nil # if .run fails, thread is dead and joinable - begin - t.join(0.1) - rescue => e - Yahns::Log.exception(@logger, "acceptor shutdown", e) - true - end - end.size > 0 - end - end - # single-threaded only, this is overriden if @worker_processes is non-nil def join daemon_ready @@ -407,7 +388,7 @@ class Yahns::Server # :nodoc: alive = sp_sig_handle(alive) rescue => e Yahns::Log.exception(@logger, "main loop", e) - end while alive || acceptors_alive || fdmap.size > 0 + end while alive || drop_acceptors[0] || fdmap.size > 0 unlink_pid_safe(@pid) if @pid ensure quit_finish -- cgit v1.2.3-24-ge0c7