about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2013-10-19 23:43:59 +0000
committerEric Wong <normalperson@yhbt.net>2013-10-20 00:09:04 +0000
commitcd84e2ccbdf29b908c7d4711528d61bac05505bb (patch)
tree03e5cfd47e9a445094ccd559e2dd99095c0b422d
parentd4805769eb3174d25b429fa1faf5392b2669f354 (diff)
downloadyahns-cd84e2ccbdf29b908c7d4711528d61bac05505bb.tar.gz
Leaving running threads at exit seems to lead to occasional bugs at
finalization on Ruby 2.0.0.  This could be a bug with sleepy_penguin
or kgio, too, so I'll have to investigate further.  For now, we'll
just destroy the IOs associated with each queue and let the threads
die on their own.

This changes the QueueEgg internals a bit and I've removed the unit
test for QueueEgg now since the rest of the server already works
well (and QueueEgg internals may change even more).

Queues/worker threads no longer have their own logger, it seems like
excessive configurability/complexity since acceptors do not have
their own logger, either.  This logger only exists to log bugs in
yahns, not the application, so using the server logger is sufficient.
-rw-r--r--lib/yahns/acceptor.rb7
-rw-r--r--lib/yahns/config.rb1
-rw-r--r--lib/yahns/queue_egg.rb7
-rw-r--r--lib/yahns/queue_epoll.rb54
-rw-r--r--lib/yahns/server.rb27
-rw-r--r--lib/yahns/server_mp.rb2
-rw-r--r--test/test_queue.rb62
7 files changed, 55 insertions, 105 deletions
diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb
index b5e7b0e..43d5fe8 100644
--- a/lib/yahns/acceptor.rb
+++ b/lib/yahns/acceptor.rb
@@ -2,8 +2,8 @@
 # License: GPLv3 or later (see COPYING for details)
 module Yahns::Acceptor # :nodoc:
   def spawn_acceptor(logger, client_class, queue)
-    accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC
     Thread.new do
+      accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC
       Thread.current.abort_on_exception = true
       qev_flags = client_class.superclass::QEV_FLAGS
       begin
@@ -21,8 +21,9 @@ module Yahns::Acceptor # :nodoc:
         queue.fdmap.desperate_expire_for(self, 5)
         sleep 1 # let other threads do some work
       rescue => e
-        Yahns::Log.exception(logger, "accept loop error", e) unless closed?
-      end until closed?
+        break if closed?
+        Yahns::Log.exception(logger, "accept loop", e)
+      end while true
     end
   end
 end
diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb
index 61de74e..3f4bb90 100644
--- a/lib/yahns/config.rb
+++ b/lib/yahns/config.rb
@@ -343,7 +343,6 @@ class Yahns::Config # :nodoc:
       server.__send__("#{var}=", val) if val != :unset
     end
     queue(:default) if @qeggs.empty?
-    @qeggs.each_value { |qegg| qegg.logger ||= server.logger }
     @app_ctx.each { |app| app.logger ||= server.logger }
   end
 end
diff --git a/lib/yahns/queue_egg.rb b/lib/yahns/queue_egg.rb
index a2abc2f..dde72bf 100644
--- a/lib/yahns/queue_egg.rb
+++ b/lib/yahns/queue_egg.rb
@@ -4,20 +4,17 @@
 # this represents a Yahns::Queue before its vivified.  This only
 # lives in the parent process and should be clobbered after qc_vivify
 class Yahns::QueueEgg # :nodoc:
-  attr_writer :max_events, :worker_threads
-  attr_accessor :logger
+  attr_accessor :max_events, :worker_threads
 
   def initialize
     @max_events = 1 # 1 is good if worker_threads > 1
     @worker_threads = 7 # any default is wrong for most apps...
-    @logger = nil
   end
 
   # only call after forking
-  def qc_vivify(fdmap)
+  def vivify(fdmap)
     queue = Yahns::Queue.new
     queue.fdmap = fdmap
-    queue.spawn_worker_threads(@logger, @worker_threads, @max_events)
     queue
   end
 end
diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb
index 5cb455b..1813581 100644
--- a/lib/yahns/queue_epoll.rb
+++ b/lib/yahns/queue_epoll.rb
@@ -22,35 +22,33 @@ class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc:
   end
 
   # returns an array of infinitely running threads
-  def spawn_worker_threads(logger, worker_threads, max_events)
-    worker_threads.times do
-      Thread.new do
-        Thread.current[:yahns_rbuf] = ""
-        begin
-          epoll_wait(max_events) do |_, io| # don't care for flags for now
-            case rv = io.yahns_step
-            when :wait_readable
-              epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
-            when :wait_writable
-              epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
-            when :wait_readwrite
-              epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
-            when :ignore # only used by rack.hijack
-              @fdmap.decr
-            when nil
-              # this is be the ONLY place where we call IO#close on
-              # things inside the queue
-              io.close
-              @fdmap.decr
-            else
-              raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
-            end
+  def worker_thread(logger, max_events)
+    Thread.new do
+      Thread.current[:yahns_rbuf] = ""
+      begin
+        epoll_wait(max_events) do |_, io| # don't care for flags for now
+          case rv = io.yahns_step
+          when :wait_readable
+            epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
+          when :wait_writable
+            epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
+          when :wait_readwrite
+            epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
+          when :ignore # only used by rack.hijack
+            @fdmap.decr
+          when nil
+            # this is be the ONLY place where we call IO#close on
+            # things inside the queue
+            io.close
+            @fdmap.decr
+          else
+            raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
           end
-        rescue => e
-          break if (IOError === e || Errno::EBADF === e) && closed?
-          Yahns::Log.exception(logger, 'queue loop', e)
-        end while true
-      end
+        end
+      rescue => e
+        break if closed?
+        Yahns::Log.exception(logger, 'queue loop', e)
+      end while true
     end
   end
 end
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index e664293..0a964b3 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -20,6 +20,8 @@ class Yahns::Server # :nodoc:
     @pid = nil
     @worker_processes = nil
     @user = nil
+    @queues = []
+    @thr = []
   end
 
   def sqwakeup(sig)
@@ -262,21 +264,25 @@ class Yahns::Server # :nodoc:
 
     # initialize queues (epoll/kqueue) and associated worker threads
     queues = {}
-    @config.qeggs.each do |name, qegg|
-      queue = qegg.qc_vivify(fdmap) # worker threads run after this
-      queues[qegg] = queue
+    @config.qeggs.each do |name, qe|
+      queue = qe.vivify(fdmap)
+      qe.worker_threads.times do
+        @thr << queue.worker_thread(@logger, qe.max_events)
+      end
+      @queues << queue
+      queues[qe] = queue
     end
 
     # spin up applications (which are preload: false)
     @config.app_ctx.each { |ctx| ctx.after_fork_init }
 
-    # spin up acceptors, clients flow into worker queues after this
+    # spin up acceptor threads, clients flow into worker queues after this
     @listeners.each do |l|
       ctx = sock_opts(l)[:yahns_app_ctx]
       qegg = ctx.qegg || @config.qeggs[:default]
 
       # acceptors feed the the queues
-      l.spawn_acceptor(@logger, ctx, queues[qegg])
+      @thr << l.spawn_acceptor(@logger, ctx, queues[qegg])
     end
     fdmap
   end
@@ -288,7 +294,7 @@ class Yahns::Server # :nodoc:
   end
 
   def quit_enter(alive)
-    self.listeners = []
+    self.listeners = [] # close acceptors, we close epolls in quit_done
     exit(0) unless alive # drop connections immediately if signaled twice
     @config.config_listeners.each_value do |opts|
       ctx = opts[:yahns_app_ctx] or next
@@ -297,6 +303,13 @@ class Yahns::Server # :nodoc:
     false
   end
 
+  # drops all the the IO objects we have threads waiting on before exiting
+  def quit_finish
+    @queues.each(&:close)
+    self.listeners = [] # just in case, this is used in ensure
+    @thr.each(&:join)
+  end
+
   def sp_sig_handle(alive)
     @sev.kgio_wait_readable(alive ? nil : 0.01)
     @sev.yahns_step
@@ -327,5 +340,7 @@ class Yahns::Server # :nodoc:
       Yahns::Log.exception(@logger, "main loop", e)
     end while alive || fdmap.size > 0
     unlink_pid_safe(@pid) if @pid
+  ensure
+    quit_finish
   end
 end
diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb
index ab0559d..6e65bd4 100644
--- a/lib/yahns/server_mp.rb
+++ b/lib/yahns/server_mp.rb
@@ -164,6 +164,8 @@ module Yahns::ServerMP # :nodoc:
       Yahns::Log.exception(@logger, "main worker loop", e)
     end while alive || fdmap.size > 0
     exit
+  ensure
+    quit_finish
   end
 
   def mp_sig_handle(worker, alive)
diff --git a/test/test_queue.rb b/test/test_queue.rb
deleted file mode 100644
index cdb9ade..0000000
--- a/test/test_queue.rb
+++ /dev/null
@@ -1,62 +0,0 @@
-# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
-# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
-require_relative 'helper'
-require 'timeout'
-require 'stringio'
-
-class TestQueue < Testcase
-  parallelize_me!
-
-  def setup
-    @q = Yahns::Queue.new
-    @err = StringIO.new
-    @logger = Logger.new(@err)
-    @q.fdmap = @fdmap = Yahns::Fdmap.new(@logger, 0.5)
-    assert @q.close_on_exec?
-  end
-
-  def test_queue
-    r, w = IO.pipe
-    assert_equal 0, @fdmap.size
-    @q.queue_add(r, Yahns::Queue::QEV_RD)
-    assert_equal 1, @fdmap.size
-    def r.yahns_step
-      begin
-        case read_nonblock(11)
-        when "ignore"
-          return :ignore
-        end
-      rescue Errno::EAGAIN
-        return :wait_readable
-      rescue EOFError
-        return nil
-      end while true
-    end
-    w.write('.')
-    Timeout.timeout(10) do
-      Thread.pass until r.nread > 0
-      @q.spawn_worker_threads(@logger, 1, 1)
-      Thread.pass until r.nread == 0
-
-      assert_equal 1, @fdmap.size
-      w.write("ignore")
-      Thread.pass until r.nread == 0
-      Thread.pass until @fdmap.size == 0
-
-      assert_raises(Errno::EEXIST) {
-        @q.queue_add(r, Yahns::Queue::QEV_RD)
-      }
-      assert_equal 1, @fdmap.size
-      @q.epoll_ctl(SleepyPenguin::Epoll::CTL_MOD, r, Yahns::Queue::QEV_RD)
-      w.close
-      Thread.pass until @fdmap.size == 0
-    end
-    assert r.closed?
-  ensure
-    [ r, w ].each { |io| io.close unless io.closed? }
-  end
-
-  def teardown
-    @q.close
-  end
-end