about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/yahns/acceptor.rb7
-rw-r--r--lib/yahns/config.rb1
-rw-r--r--lib/yahns/queue_egg.rb7
-rw-r--r--lib/yahns/queue_epoll.rb54
-rw-r--r--lib/yahns/server.rb27
-rw-r--r--lib/yahns/server_mp.rb2
-rw-r--r--test/test_queue.rb62
7 files changed, 55 insertions, 105 deletions
diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb
index b5e7b0e..43d5fe8 100644
--- a/lib/yahns/acceptor.rb
+++ b/lib/yahns/acceptor.rb
@@ -2,8 +2,8 @@
 # License: GPLv3 or later (see COPYING for details)
 module Yahns::Acceptor # :nodoc:
   def spawn_acceptor(logger, client_class, queue)
-    accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC
     Thread.new do
+      accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC
       Thread.current.abort_on_exception = true
       qev_flags = client_class.superclass::QEV_FLAGS
       begin
@@ -21,8 +21,9 @@ module Yahns::Acceptor # :nodoc:
         queue.fdmap.desperate_expire_for(self, 5)
         sleep 1 # let other threads do some work
       rescue => e
-        Yahns::Log.exception(logger, "accept loop error", e) unless closed?
-      end until closed?
+        break if closed?
+        Yahns::Log.exception(logger, "accept loop", e)
+      end while true
     end
   end
 end
diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb
index 61de74e..3f4bb90 100644
--- a/lib/yahns/config.rb
+++ b/lib/yahns/config.rb
@@ -343,7 +343,6 @@ class Yahns::Config # :nodoc:
       server.__send__("#{var}=", val) if val != :unset
     end
     queue(:default) if @qeggs.empty?
-    @qeggs.each_value { |qegg| qegg.logger ||= server.logger }
     @app_ctx.each { |app| app.logger ||= server.logger }
   end
 end
diff --git a/lib/yahns/queue_egg.rb b/lib/yahns/queue_egg.rb
index a2abc2f..dde72bf 100644
--- a/lib/yahns/queue_egg.rb
+++ b/lib/yahns/queue_egg.rb
@@ -4,20 +4,17 @@
 # this represents a Yahns::Queue before its vivified.  This only
 # lives in the parent process and should be clobbered after qc_vivify
 class Yahns::QueueEgg # :nodoc:
-  attr_writer :max_events, :worker_threads
-  attr_accessor :logger
+  attr_accessor :max_events, :worker_threads
 
   def initialize
     @max_events = 1 # 1 is good if worker_threads > 1
     @worker_threads = 7 # any default is wrong for most apps...
-    @logger = nil
   end
 
   # only call after forking
-  def qc_vivify(fdmap)
+  def vivify(fdmap)
     queue = Yahns::Queue.new
     queue.fdmap = fdmap
-    queue.spawn_worker_threads(@logger, @worker_threads, @max_events)
     queue
   end
 end
diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb
index 5cb455b..1813581 100644
--- a/lib/yahns/queue_epoll.rb
+++ b/lib/yahns/queue_epoll.rb
@@ -22,35 +22,33 @@ class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc:
   end
 
   # returns an array of infinitely running threads
-  def spawn_worker_threads(logger, worker_threads, max_events)
-    worker_threads.times do
-      Thread.new do
-        Thread.current[:yahns_rbuf] = ""
-        begin
-          epoll_wait(max_events) do |_, io| # don't care for flags for now
-            case rv = io.yahns_step
-            when :wait_readable
-              epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
-            when :wait_writable
-              epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
-            when :wait_readwrite
-              epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
-            when :ignore # only used by rack.hijack
-              @fdmap.decr
-            when nil
-              # this is be the ONLY place where we call IO#close on
-              # things inside the queue
-              io.close
-              @fdmap.decr
-            else
-              raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
-            end
+  def worker_thread(logger, max_events)
+    Thread.new do
+      Thread.current[:yahns_rbuf] = ""
+      begin
+        epoll_wait(max_events) do |_, io| # don't care for flags for now
+          case rv = io.yahns_step
+          when :wait_readable
+            epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
+          when :wait_writable
+            epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
+          when :wait_readwrite
+            epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
+          when :ignore # only used by rack.hijack
+            @fdmap.decr
+          when nil
+            # this is be the ONLY place where we call IO#close on
+            # things inside the queue
+            io.close
+            @fdmap.decr
+          else
+            raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
           end
-        rescue => e
-          break if (IOError === e || Errno::EBADF === e) && closed?
-          Yahns::Log.exception(logger, 'queue loop', e)
-        end while true
-      end
+        end
+      rescue => e
+        break if closed?
+        Yahns::Log.exception(logger, 'queue loop', e)
+      end while true
     end
   end
 end
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index e664293..0a964b3 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -20,6 +20,8 @@ class Yahns::Server # :nodoc:
     @pid = nil
     @worker_processes = nil
     @user = nil
+    @queues = []
+    @thr = []
   end
 
   def sqwakeup(sig)
@@ -262,21 +264,25 @@ class Yahns::Server # :nodoc:
 
     # initialize queues (epoll/kqueue) and associated worker threads
     queues = {}
-    @config.qeggs.each do |name, qegg|
-      queue = qegg.qc_vivify(fdmap) # worker threads run after this
-      queues[qegg] = queue
+    @config.qeggs.each do |name, qe|
+      queue = qe.vivify(fdmap)
+      qe.worker_threads.times do
+        @thr << queue.worker_thread(@logger, qe.max_events)
+      end
+      @queues << queue
+      queues[qe] = queue
     end
 
     # spin up applications (which are preload: false)
     @config.app_ctx.each { |ctx| ctx.after_fork_init }
 
-    # spin up acceptors, clients flow into worker queues after this
+    # spin up acceptor threads, clients flow into worker queues after this
     @listeners.each do |l|
       ctx = sock_opts(l)[:yahns_app_ctx]
       qegg = ctx.qegg || @config.qeggs[:default]
 
       # acceptors feed the the queues
-      l.spawn_acceptor(@logger, ctx, queues[qegg])
+      @thr << l.spawn_acceptor(@logger, ctx, queues[qegg])
     end
     fdmap
   end
@@ -288,7 +294,7 @@ class Yahns::Server # :nodoc:
   end
 
   def quit_enter(alive)
-    self.listeners = []
+    self.listeners = [] # close acceptors, we close epolls in quit_done
     exit(0) unless alive # drop connections immediately if signaled twice
     @config.config_listeners.each_value do |opts|
       ctx = opts[:yahns_app_ctx] or next
@@ -297,6 +303,13 @@ class Yahns::Server # :nodoc:
     false
   end
 
+  # drops all the the IO objects we have threads waiting on before exiting
+  def quit_finish
+    @queues.each(&:close)
+    self.listeners = [] # just in case, this is used in ensure
+    @thr.each(&:join)
+  end
+
   def sp_sig_handle(alive)
     @sev.kgio_wait_readable(alive ? nil : 0.01)
     @sev.yahns_step
@@ -327,5 +340,7 @@ class Yahns::Server # :nodoc:
       Yahns::Log.exception(@logger, "main loop", e)
     end while alive || fdmap.size > 0
     unlink_pid_safe(@pid) if @pid
+  ensure
+    quit_finish
   end
 end
diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb
index ab0559d..6e65bd4 100644
--- a/lib/yahns/server_mp.rb
+++ b/lib/yahns/server_mp.rb
@@ -164,6 +164,8 @@ module Yahns::ServerMP # :nodoc:
       Yahns::Log.exception(@logger, "main worker loop", e)
     end while alive || fdmap.size > 0
     exit
+  ensure
+    quit_finish
   end
 
   def mp_sig_handle(worker, alive)
diff --git a/test/test_queue.rb b/test/test_queue.rb
deleted file mode 100644
index cdb9ade..0000000
--- a/test/test_queue.rb
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
-# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
-require_relative 'helper'
-require 'timeout'
-require 'stringio'
-
-class TestQueue < Testcase
-  parallelize_me!
-
-  def setup
-    @q = Yahns::Queue.new
-    @err = StringIO.new
-    @logger = Logger.new(@err)
-    @q.fdmap = @fdmap = Yahns::Fdmap.new(@logger, 0.5)
-    assert @q.close_on_exec?
-  end
-
-  def test_queue
-    r, w = IO.pipe
-    assert_equal 0, @fdmap.size
-    @q.queue_add(r, Yahns::Queue::QEV_RD)
-    assert_equal 1, @fdmap.size
-    def r.yahns_step
-      begin
-        case read_nonblock(11)
-        when "ignore"
-          return :ignore
-        end
-      rescue Errno::EAGAIN
-        return :wait_readable
-      rescue EOFError
-        return nil
-      end while true
-    end
-    w.write('.')
-    Timeout.timeout(10) do
-      Thread.pass until r.nread > 0
-      @q.spawn_worker_threads(@logger, 1, 1)
-      Thread.pass until r.nread == 0
-
-      assert_equal 1, @fdmap.size
-      w.write("ignore")
-      Thread.pass until r.nread == 0
-      Thread.pass until @fdmap.size == 0
-
-      assert_raises(Errno::EEXIST) {
-        @q.queue_add(r, Yahns::Queue::QEV_RD)
-      }
-      assert_equal 1, @fdmap.size
-      @q.epoll_ctl(SleepyPenguin::Epoll::CTL_MOD, r, Yahns::Queue::QEV_RD)
-      w.close
-      Thread.pass until @fdmap.size == 0
-    end
-    assert r.closed?
-  ensure
-    [ r, w ].each { |io| io.close unless io.closed? }
-  end
-
-  def teardown
-    @q.close
-  end
-end