about summary refs log tree commit homepage
path: root/lib/yahns/server.rb
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2013-10-22 01:23:23 +0000
committerEric Wong <e@80x24.org>2013-10-22 01:44:04 +0000
commit35a72997899616b6d453e6dbf6a7d4477103dd1a (patch)
treeae3c0b7b51a6c4067b814e3c399cdbc3303b9305 /lib/yahns/server.rb
parent99b99a01060cba03ef6b8d05e23294d196657865 (diff)
downloadyahns-35a72997899616b6d453e6dbf6a7d4477103dd1a.tar.gz
Calling close on the socket while accept4 is running on a different
thread can be problematic on Rubinius and some versions of MRI.

So we will use a thread-local variable, continuously issue new
connections to our own socket and rely on the round-robin behavior
of accept4 to eventually get our acceptor thread to wakeup.

We must continuously issue new connections because some connections
may be going to a new processes (from SIGUSR2).  We cannot use
shutdown for the same reason.
Diffstat (limited to 'lib/yahns/server.rb')
-rw-r--r--lib/yahns/server.rb51
1 files changed, 16 insertions, 35 deletions
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index 63b45e9..3bdb2c0 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -24,7 +24,6 @@ class Yahns::Server # :nodoc:
     @user = nil
     @queues = []
     @wthr = []
-    @athr = []
   end
 
   def sqwakeup(sig)
@@ -59,8 +58,8 @@ class Yahns::Server # :nodoc:
     self
   end
 
-  def drop_listeners
-    @listeners.each(&:close).clear
+  def drop_acceptors
+    @listeners.delete_if(&:ac_quit)
   end
 
   # replaces current listener set with +listeners+.  This will
@@ -77,16 +76,23 @@ class Yahns::Server # :nodoc:
     end
     set_names = listener_names(listeners)
     dead_names.concat(cur_names - set_names).uniq!
-
+    dying = []
     @listeners.delete_if do |io|
       if dead_names.include?(sock_name(io))
-        (io.close rescue nil).nil? # true
+        if ac_quit
+          true
+        else
+          dying << io
+          false
+        end
       else
         set_server_sockopt(io, sock_opts(io))
         false
       end
     end
 
+    dying.delete_if(&:ac_quit) while dying[0]
+
     (set_names - cur_names).each { |addr| listen(addr) }
   end
 
@@ -308,7 +314,7 @@ class Yahns::Server # :nodoc:
       qegg = ctx.qegg || @config.qeggs[:default]
 
       # acceptors feed the the queues
-      @athr << l.spawn_acceptor(@logger, ctx, queues[qegg])
+      l.spawn_acceptor(@logger, ctx, queues[qegg])
     end
     fdmap
   end
@@ -321,7 +327,7 @@ class Yahns::Server # :nodoc:
 
   def quit_enter(alive)
     @logger.info "#{alive ? "grace" : "force"}fully exiting"
-    drop_listeners # close acceptors, we close epolls in quit_done
+    drop_acceptors # stop acceptors, we close epolls in quit_done
     exit(0) unless alive # drop connections immediately if signaled twice
     @config.config_listeners.each_value do |opts|
       ctx = opts[:yahns_app_ctx] or next
@@ -349,8 +355,8 @@ class Yahns::Server # :nodoc:
   rescue => e
     Yahns::Log.exception(@logger, "quit finish", e)
   ensure
-    if (@wthr.size + @athr.size) > 0
-      abort "BUG: still active wthr=#{@wthr.size} athr=#{@athr.size}"
+    if (@wthr.size + @listeners.size) > 0
+      abort "BUG: still active wthr=#{@wthr.size} listeners=#{@listeners.size}"
     end
   end
 
@@ -373,31 +379,6 @@ class Yahns::Server # :nodoc:
     alive
   end
 
-  # return true if any acceptor is still alive, false otherwise
-  def acceptors_alive
-    case RUBY_ENGINE
-    when "rbx"
-      false # XXX this needs to be diagnosed and fixed for rbx
-    else
-      # We should stop acceptors before stopping queues.  Closing the
-      # acceptor socket is not sufficient to stop the last client of
-      # each thread fom being accepted and thrown into the queue.
-      @athr.delete_if do |t|
-        # blocking accept() does not wake up on, close() only EINTR and
-        # new connections.  So Thread#run will send SIGVTALRM via
-        # pthread_kill, which will break it out of blocking syscalls
-        # like accept()
-        t.run rescue nil # if .run fails, thread is dead and joinable
-        begin
-          t.join(0.1)
-        rescue => e
-          Yahns::Log.exception(@logger, "acceptor shutdown", e)
-          true
-        end
-      end.size > 0
-    end
-  end
-
   # single-threaded only, this is overriden if @worker_processes is non-nil
   def join
     daemon_ready
@@ -407,7 +388,7 @@ class Yahns::Server # :nodoc:
       alive = sp_sig_handle(alive)
     rescue => e
       Yahns::Log.exception(@logger, "main loop", e)
-    end while alive || acceptors_alive || fdmap.size > 0
+    end while alive || drop_acceptors[0] || fdmap.size > 0
     unlink_pid_safe(@pid) if @pid
   ensure
     quit_finish