about summary refs log tree commit homepage
path: root/lib/yahns/server.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/yahns/server.rb')
-rw-r--r--lib/yahns/server.rb27
1 files changed, 21 insertions, 6 deletions
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index e664293..0a964b3 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -20,6 +20,8 @@ class Yahns::Server # :nodoc:
     @pid = nil
     @worker_processes = nil
     @user = nil
+    @queues = []
+    @thr = []
   end
 
   def sqwakeup(sig)
@@ -262,21 +264,25 @@ class Yahns::Server # :nodoc:
 
     # initialize queues (epoll/kqueue) and associated worker threads
     queues = {}
-    @config.qeggs.each do |name, qegg|
-      queue = qegg.qc_vivify(fdmap) # worker threads run after this
-      queues[qegg] = queue
+    @config.qeggs.each do |name, qe|
+      queue = qe.vivify(fdmap)
+      qe.worker_threads.times do
+        @thr << queue.worker_thread(@logger, qe.max_events)
+      end
+      @queues << queue
+      queues[qe] = queue
     end
 
     # spin up applications (which are preload: false)
     @config.app_ctx.each { |ctx| ctx.after_fork_init }
 
-    # spin up acceptors, clients flow into worker queues after this
+    # spin up acceptor threads, clients flow into worker queues after this
     @listeners.each do |l|
       ctx = sock_opts(l)[:yahns_app_ctx]
       qegg = ctx.qegg || @config.qeggs[:default]
 
       # acceptors feed the the queues
-      l.spawn_acceptor(@logger, ctx, queues[qegg])
+      @thr << l.spawn_acceptor(@logger, ctx, queues[qegg])
     end
     fdmap
   end
@@ -288,7 +294,7 @@ class Yahns::Server # :nodoc:
   end
 
   def quit_enter(alive)
-    self.listeners = []
+    self.listeners = [] # close acceptors, we close epolls in quit_done
     exit(0) unless alive # drop connections immediately if signaled twice
     @config.config_listeners.each_value do |opts|
       ctx = opts[:yahns_app_ctx] or next
@@ -297,6 +303,13 @@ class Yahns::Server # :nodoc:
     false
   end
 
+  # drops all the the IO objects we have threads waiting on before exiting
+  def quit_finish
+    @queues.each(&:close)
+    self.listeners = [] # just in case, this is used in ensure
+    @thr.each(&:join)
+  end
+
   def sp_sig_handle(alive)
     @sev.kgio_wait_readable(alive ? nil : 0.01)
     @sev.yahns_step
@@ -327,5 +340,7 @@ class Yahns::Server # :nodoc:
       Yahns::Log.exception(@logger, "main loop", e)
     end while alive || fdmap.size > 0
     unlink_pid_safe(@pid) if @pid
+  ensure
+    quit_finish
   end
 end