diff options
author | Eric Wong <e@80x24.org> | 2013-10-19 23:43:59 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2013-10-20 00:09:04 +0000 |
commit | cd84e2ccbdf29b908c7d4711528d61bac05505bb (patch) | |
tree | 03e5cfd47e9a445094ccd559e2dd99095c0b422d | |
parent | d4805769eb3174d25b429fa1faf5392b2669f354 (diff) | |
download | yahns-cd84e2ccbdf29b908c7d4711528d61bac05505bb.tar.gz |
Leaving running threads at exit seems to lead to occasional bugs at finalization on Ruby 2.0.0. This could be a bug with sleepy_penguin or kgio, too, so I'll have to investigate further. For now, we'll just destroy the IOs associated with each queue and let the threads die on their own. This changes the QueueEgg internals a bit and I've removed the unit test for QueueEgg now since the rest of the server already works well (and QueueEgg internals may change even more). Queues/worker threads no longer have their own logger, it seems like excessive configurability/complexity since acceptors do not have their own logger, either. This logger only exists to log bugs in yahns, not the application, so using the server logger is sufficient.
-rw-r--r-- | lib/yahns/acceptor.rb | 7 | ||||
-rw-r--r-- | lib/yahns/config.rb | 1 | ||||
-rw-r--r-- | lib/yahns/queue_egg.rb | 7 | ||||
-rw-r--r-- | lib/yahns/queue_epoll.rb | 54 | ||||
-rw-r--r-- | lib/yahns/server.rb | 27 | ||||
-rw-r--r-- | lib/yahns/server_mp.rb | 2 | ||||
-rw-r--r-- | test/test_queue.rb | 62 |
7 files changed, 55 insertions, 105 deletions
diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb index b5e7b0e..43d5fe8 100644 --- a/lib/yahns/acceptor.rb +++ b/lib/yahns/acceptor.rb @@ -2,8 +2,8 @@ # License: GPLv3 or later (see COPYING for details) module Yahns::Acceptor # :nodoc: def spawn_acceptor(logger, client_class, queue) - accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC Thread.new do + accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC Thread.current.abort_on_exception = true qev_flags = client_class.superclass::QEV_FLAGS begin @@ -21,8 +21,9 @@ module Yahns::Acceptor # :nodoc: queue.fdmap.desperate_expire_for(self, 5) sleep 1 # let other threads do some work rescue => e - Yahns::Log.exception(logger, "accept loop error", e) unless closed? - end until closed? + break if closed? + Yahns::Log.exception(logger, "accept loop", e) + end while true end end end diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb index 61de74e..3f4bb90 100644 --- a/lib/yahns/config.rb +++ b/lib/yahns/config.rb @@ -343,7 +343,6 @@ class Yahns::Config # :nodoc: server.__send__("#{var}=", val) if val != :unset end queue(:default) if @qeggs.empty? - @qeggs.each_value { |qegg| qegg.logger ||= server.logger } @app_ctx.each { |app| app.logger ||= server.logger } end end diff --git a/lib/yahns/queue_egg.rb b/lib/yahns/queue_egg.rb index a2abc2f..dde72bf 100644 --- a/lib/yahns/queue_egg.rb +++ b/lib/yahns/queue_egg.rb @@ -4,20 +4,17 @@ # this represents a Yahns::Queue before its vivified. This only # lives in the parent process and should be clobbered after qc_vivify class Yahns::QueueEgg # :nodoc: - attr_writer :max_events, :worker_threads - attr_accessor :logger + attr_accessor :max_events, :worker_threads def initialize @max_events = 1 # 1 is good if worker_threads > 1 @worker_threads = 7 # any default is wrong for most apps... - @logger = nil end # only call after forking - def qc_vivify(fdmap) + def vivify(fdmap) queue = Yahns::Queue.new queue.fdmap = fdmap - queue.spawn_worker_threads(@logger, @worker_threads, @max_events) queue end end diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb index 5cb455b..1813581 100644 --- a/lib/yahns/queue_epoll.rb +++ b/lib/yahns/queue_epoll.rb @@ -22,35 +22,33 @@ class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc: end # returns an array of infinitely running threads - def spawn_worker_threads(logger, worker_threads, max_events) - worker_threads.times do - Thread.new do - Thread.current[:yahns_rbuf] = "" - begin - epoll_wait(max_events) do |_, io| # don't care for flags for now - case rv = io.yahns_step - when :wait_readable - epoll_ctl(Epoll::CTL_MOD, io, QEV_RD) - when :wait_writable - epoll_ctl(Epoll::CTL_MOD, io, QEV_WR) - when :wait_readwrite - epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR) - when :ignore # only used by rack.hijack - @fdmap.decr - when nil - # this is be the ONLY place where we call IO#close on - # things inside the queue - io.close - @fdmap.decr - else - raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}" - end + def worker_thread(logger, max_events) + Thread.new do + Thread.current[:yahns_rbuf] = "" + begin + epoll_wait(max_events) do |_, io| # don't care for flags for now + case rv = io.yahns_step + when :wait_readable + epoll_ctl(Epoll::CTL_MOD, io, QEV_RD) + when :wait_writable + epoll_ctl(Epoll::CTL_MOD, io, QEV_WR) + when :wait_readwrite + epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR) + when :ignore # only used by rack.hijack + @fdmap.decr + when nil + # this is be the ONLY place where we call IO#close on + # things inside the queue + io.close + @fdmap.decr + else + raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}" end - rescue => e - break if (IOError === e || Errno::EBADF === e) && closed? - Yahns::Log.exception(logger, 'queue loop', e) - end while true - end + end + rescue => e + break if closed? + Yahns::Log.exception(logger, 'queue loop', e) + end while true end end end diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index e664293..0a964b3 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -20,6 +20,8 @@ class Yahns::Server # :nodoc: @pid = nil @worker_processes = nil @user = nil + @queues = [] + @thr = [] end def sqwakeup(sig) @@ -262,21 +264,25 @@ class Yahns::Server # :nodoc: # initialize queues (epoll/kqueue) and associated worker threads queues = {} - @config.qeggs.each do |name, qegg| - queue = qegg.qc_vivify(fdmap) # worker threads run after this - queues[qegg] = queue + @config.qeggs.each do |name, qe| + queue = qe.vivify(fdmap) + qe.worker_threads.times do + @thr << queue.worker_thread(@logger, qe.max_events) + end + @queues << queue + queues[qe] = queue end # spin up applications (which are preload: false) @config.app_ctx.each { |ctx| ctx.after_fork_init } - # spin up acceptors, clients flow into worker queues after this + # spin up acceptor threads, clients flow into worker queues after this @listeners.each do |l| ctx = sock_opts(l)[:yahns_app_ctx] qegg = ctx.qegg || @config.qeggs[:default] # acceptors feed the the queues - l.spawn_acceptor(@logger, ctx, queues[qegg]) + @thr << l.spawn_acceptor(@logger, ctx, queues[qegg]) end fdmap end @@ -288,7 +294,7 @@ class Yahns::Server # :nodoc: end def quit_enter(alive) - self.listeners = [] + self.listeners = [] # close acceptors, we close epolls in quit_done exit(0) unless alive # drop connections immediately if signaled twice @config.config_listeners.each_value do |opts| ctx = opts[:yahns_app_ctx] or next @@ -297,6 +303,13 @@ class Yahns::Server # :nodoc: false end + # drops all the the IO objects we have threads waiting on before exiting + def quit_finish + @queues.each(&:close) + self.listeners = [] # just in case, this is used in ensure + @thr.each(&:join) + end + def sp_sig_handle(alive) @sev.kgio_wait_readable(alive ? nil : 0.01) @sev.yahns_step @@ -327,5 +340,7 @@ class Yahns::Server # :nodoc: Yahns::Log.exception(@logger, "main loop", e) end while alive || fdmap.size > 0 unlink_pid_safe(@pid) if @pid + ensure + quit_finish end end diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb index ab0559d..6e65bd4 100644 --- a/lib/yahns/server_mp.rb +++ b/lib/yahns/server_mp.rb @@ -164,6 +164,8 @@ module Yahns::ServerMP # :nodoc: Yahns::Log.exception(@logger, "main worker loop", e) end while alive || fdmap.size > 0 exit + ensure + quit_finish end def mp_sig_handle(worker, alive) diff --git a/test/test_queue.rb b/test/test_queue.rb deleted file mode 100644 index cdb9ade..0000000 --- a/test/test_queue.rb +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors -# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) -require_relative 'helper' -require 'timeout' -require 'stringio' - -class TestQueue < Testcase - parallelize_me! - - def setup - @q = Yahns::Queue.new - @err = StringIO.new - @logger = Logger.new(@err) - @q.fdmap = @fdmap = Yahns::Fdmap.new(@logger, 0.5) - assert @q.close_on_exec? - end - - def test_queue - r, w = IO.pipe - assert_equal 0, @fdmap.size - @q.queue_add(r, Yahns::Queue::QEV_RD) - assert_equal 1, @fdmap.size - def r.yahns_step - begin - case read_nonblock(11) - when "ignore" - return :ignore - end - rescue Errno::EAGAIN - return :wait_readable - rescue EOFError - return nil - end while true - end - w.write('.') - Timeout.timeout(10) do - Thread.pass until r.nread > 0 - @q.spawn_worker_threads(@logger, 1, 1) - Thread.pass until r.nread == 0 - - assert_equal 1, @fdmap.size - w.write("ignore") - Thread.pass until r.nread == 0 - Thread.pass until @fdmap.size == 0 - - assert_raises(Errno::EEXIST) { - @q.queue_add(r, Yahns::Queue::QEV_RD) - } - assert_equal 1, @fdmap.size - @q.epoll_ctl(SleepyPenguin::Epoll::CTL_MOD, r, Yahns::Queue::QEV_RD) - w.close - Thread.pass until @fdmap.size == 0 - end - assert r.closed? - ensure - [ r, w ].each { |io| io.close unless io.closed? } - end - - def teardown - @q.close - end -end |