about summary refs log tree commit homepage
path: root/lib/yahns/server.rb
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2013-10-20 09:25:54 +0000
committerEric Wong <e@80x24.org>2013-10-21 04:09:20 +0000
commitd3335cbaee6c47880b3bb5a372b966842edfeade (patch)
tree8255bc8670535f34ca57e33ac05272bae51c9ace /lib/yahns/server.rb
parent92172ac2c4e094cb2d8f8669a1f7666501425263 (diff)
downloadyahns-d3335cbaee6c47880b3bb5a372b966842edfeade.tar.gz
Acceptors require closing the descriptor, first, and then doing a
(nasty) cross-thread exception to kick the thread out of the blocking
accept via Thread#run (pthread_kill SIGVTALRM).  We cannot do what we're
doing with epoll with acceptors because the accept socket is shared
across processes.  We will also NEVER be using non-blocking accept, as
it's more important we fairly distribute connections between tasks when
we're not shutting the server down.

The queue worker threads are much easier to kill off :)  We can simply
inject a new QueueQuitter object as a Level-Triggering epoll watch,
activate it, and let it wreak havok on all the worker threads from a
single event activation.

rb_thread_fd_close is convenient, but expensive with many threads, so be
prepared for more systems without it.  This is for Rubinius compatibility.

Yes, we are actually using Level-Triggered epoll here (despite the
non-shutdown pieces of our code being based around EPOLLONESHOT).
Diffstat (limited to 'lib/yahns/server.rb')
-rw-r--r--lib/yahns/server.rb49
1 files changed, 40 insertions, 9 deletions
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index 0a964b3..9bb8067 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -1,6 +1,8 @@
 # -*- encoding: binary -*-
 # Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
 # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'queue_quitter'
+
 class Yahns::Server # :nodoc:
   QUEUE_SIGS = [ :WINCH, :QUIT, :INT, :TERM, :USR1, :USR2, :HUP, :TTIN, :TTOU ]
   attr_accessor :daemon_pipe
@@ -21,7 +23,8 @@ class Yahns::Server # :nodoc:
     @worker_processes = nil
     @user = nil
     @queues = []
-    @thr = []
+    @wthr = []
+    @athr = []
   end
 
   def sqwakeup(sig)
@@ -56,6 +59,10 @@ class Yahns::Server # :nodoc:
     self
   end
 
+  def drop_listeners
+    @listeners.each(&:close).clear
+  end
+
   # replaces current listener set with +listeners+.  This will
   # close the socket if it will not exist in the new listener set
   def listeners=(listeners)
@@ -267,7 +274,7 @@ class Yahns::Server # :nodoc:
     @config.qeggs.each do |name, qe|
       queue = qe.vivify(fdmap)
       qe.worker_threads.times do
-        @thr << queue.worker_thread(@logger, qe.max_events)
+        @wthr << queue.worker_thread(@logger, qe.max_events)
       end
       @queues << queue
       queues[qe] = queue
@@ -282,7 +289,7 @@ class Yahns::Server # :nodoc:
       qegg = ctx.qegg || @config.qeggs[:default]
 
       # acceptors feed the the queues
-      @thr << l.spawn_acceptor(@logger, ctx, queues[qegg])
+      @athr << l.spawn_acceptor(@logger, ctx, queues[qegg])
     end
     fdmap
   end
@@ -294,7 +301,8 @@ class Yahns::Server # :nodoc:
   end
 
   def quit_enter(alive)
-    self.listeners = [] # close acceptors, we close epolls in quit_done
+    @logger.info "#{alive ? :grace : :force}fully exiting"
+    drop_listeners # close acceptors, we close epolls in quit_done
     exit(0) unless alive # drop connections immediately if signaled twice
     @config.config_listeners.each_value do |opts|
       ctx = opts[:yahns_app_ctx] or next
@@ -304,10 +312,21 @@ class Yahns::Server # :nodoc:
   end
 
   # drops all the the IO objects we have threads waiting on before exiting
+  # This just injects the QueueQuitter object which acts like a
+  # monkey wrench thrown into a perfectly good engine :)
   def quit_finish
-    @queues.each(&:close)
-    self.listeners = [] # just in case, this is used in ensure
-    @thr.each(&:join)
+    quitter = Yahns::QueueQuitter.new
+    quitter.sev_signal # this will be level trigger for sustained damage!
+
+    # throw the monkey wrench into the worker threads
+    @queues.each { |q| q.queue_add(quitter, Yahns::Queue::QEV_QUIT) }
+
+    # watch the monkey wrench destroy all the threads!
+    @wthr.delete_if { |t| t.join(0.01) } while @wthr[0]
+
+    # cleanup, our job is done
+    @queues.each(&:close).clear
+    quitter.close
   end
 
   def sp_sig_handle(alive)
@@ -322,13 +341,25 @@ class Yahns::Server # :nodoc:
       reexec
     when :HUP
       reexec
-      return false
+      return quit_enter(alive)
     when :TTIN, :TTOU, :WINCH
       @logger.info("SIG#{sig} ignored in single-process mode")
     end
     alive
   end
 
+  def acceptors_alive
+    @athr.delete_if do |t|
+      # blocking accept() does not wake up on, close() only EINTR and
+      # new connections.  So Thread#run will send SIGVTALRM via
+      # pthread_kill, which will break it out of blocking syscalls
+      # like accept()
+      # if t.run fails, thread is dead
+      t.run rescue nil
+      t.join(1)
+    end.size > 0
+  end
+
   # single-threaded only, this is overriden if @worker_processes is non-nil
   def join
     daemon_ready
@@ -338,7 +369,7 @@ class Yahns::Server # :nodoc:
       alive = sp_sig_handle(alive)
     rescue => e
       Yahns::Log.exception(@logger, "main loop", e)
-    end while alive || fdmap.size > 0
+    end while alive || acceptors_alive || fdmap.size > 0
     unlink_pid_safe(@pid) if @pid
   ensure
     quit_finish