diff options
-rw-r--r-- | lib/yahns/acceptor.rb | 7 | ||||
-rw-r--r-- | lib/yahns/config.rb | 1 | ||||
-rw-r--r-- | lib/yahns/queue_egg.rb | 7 | ||||
-rw-r--r-- | lib/yahns/queue_epoll.rb | 54 | ||||
-rw-r--r-- | lib/yahns/server.rb | 27 | ||||
-rw-r--r-- | lib/yahns/server_mp.rb | 2 | ||||
-rw-r--r-- | test/test_queue.rb | 62 |
7 files changed, 55 insertions, 105 deletions
diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb index b5e7b0e..43d5fe8 100644 --- a/lib/yahns/acceptor.rb +++ b/lib/yahns/acceptor.rb @@ -2,8 +2,8 @@ # License: GPLv3 or later (see COPYING for details) module Yahns::Acceptor # :nodoc: def spawn_acceptor(logger, client_class, queue) - accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC Thread.new do + accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC Thread.current.abort_on_exception = true qev_flags = client_class.superclass::QEV_FLAGS begin @@ -21,8 +21,9 @@ module Yahns::Acceptor # :nodoc: queue.fdmap.desperate_expire_for(self, 5) sleep 1 # let other threads do some work rescue => e - Yahns::Log.exception(logger, "accept loop error", e) unless closed? - end until closed? + break if closed? + Yahns::Log.exception(logger, "accept loop", e) + end while true end end end diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb index 61de74e..3f4bb90 100644 --- a/lib/yahns/config.rb +++ b/lib/yahns/config.rb @@ -343,7 +343,6 @@ class Yahns::Config # :nodoc: server.__send__("#{var}=", val) if val != :unset end queue(:default) if @qeggs.empty? - @qeggs.each_value { |qegg| qegg.logger ||= server.logger } @app_ctx.each { |app| app.logger ||= server.logger } end end diff --git a/lib/yahns/queue_egg.rb b/lib/yahns/queue_egg.rb index a2abc2f..dde72bf 100644 --- a/lib/yahns/queue_egg.rb +++ b/lib/yahns/queue_egg.rb @@ -4,20 +4,17 @@ # this represents a Yahns::Queue before its vivified. This only # lives in the parent process and should be clobbered after qc_vivify class Yahns::QueueEgg # :nodoc: - attr_writer :max_events, :worker_threads - attr_accessor :logger + attr_accessor :max_events, :worker_threads def initialize @max_events = 1 # 1 is good if worker_threads > 1 @worker_threads = 7 # any default is wrong for most apps... - @logger = nil end # only call after forking - def qc_vivify(fdmap) + def vivify(fdmap) queue = Yahns::Queue.new queue.fdmap = fdmap - queue.spawn_worker_threads(@logger, @worker_threads, @max_events) queue end end diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb index 5cb455b..1813581 100644 --- a/lib/yahns/queue_epoll.rb +++ b/lib/yahns/queue_epoll.rb @@ -22,35 +22,33 @@ class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc: end # returns an array of infinitely running threads - def spawn_worker_threads(logger, worker_threads, max_events) - worker_threads.times do - Thread.new do - Thread.current[:yahns_rbuf] = "" - begin - epoll_wait(max_events) do |_, io| # don't care for flags for now - case rv = io.yahns_step - when :wait_readable - epoll_ctl(Epoll::CTL_MOD, io, QEV_RD) - when :wait_writable - epoll_ctl(Epoll::CTL_MOD, io, QEV_WR) - when :wait_readwrite - epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR) - when :ignore # only used by rack.hijack - @fdmap.decr - when nil - # this is be the ONLY place where we call IO#close on - # things inside the queue - io.close - @fdmap.decr - else - raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}" - end + def worker_thread(logger, max_events) + Thread.new do + Thread.current[:yahns_rbuf] = "" + begin + epoll_wait(max_events) do |_, io| # don't care for flags for now + case rv = io.yahns_step + when :wait_readable + epoll_ctl(Epoll::CTL_MOD, io, QEV_RD) + when :wait_writable + epoll_ctl(Epoll::CTL_MOD, io, QEV_WR) + when :wait_readwrite + epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR) + when :ignore # only used by rack.hijack + @fdmap.decr + when nil + # this is be the ONLY place where we call IO#close on + # things inside the queue + io.close + @fdmap.decr + else + raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}" end - rescue => e - break if (IOError === e || Errno::EBADF === e) && closed? - Yahns::Log.exception(logger, 'queue loop', e) - end while true - end + end + rescue => e + break if closed? + Yahns::Log.exception(logger, 'queue loop', e) + end while true end end end diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index e664293..0a964b3 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -20,6 +20,8 @@ class Yahns::Server # :nodoc: @pid = nil @worker_processes = nil @user = nil + @queues = [] + @thr = [] end def sqwakeup(sig) @@ -262,21 +264,25 @@ class Yahns::Server # :nodoc: # initialize queues (epoll/kqueue) and associated worker threads queues = {} - @config.qeggs.each do |name, qegg| - queue = qegg.qc_vivify(fdmap) # worker threads run after this - queues[qegg] = queue + @config.qeggs.each do |name, qe| + queue = qe.vivify(fdmap) + qe.worker_threads.times do + @thr << queue.worker_thread(@logger, qe.max_events) + end + @queues << queue + queues[qe] = queue end # spin up applications (which are preload: false) @config.app_ctx.each { |ctx| ctx.after_fork_init } - # spin up acceptors, clients flow into worker queues after this + # spin up acceptor threads, clients flow into worker queues after this @listeners.each do |l| ctx = sock_opts(l)[:yahns_app_ctx] qegg = ctx.qegg || @config.qeggs[:default] # acceptors feed the the queues - l.spawn_acceptor(@logger, ctx, queues[qegg]) + @thr << l.spawn_acceptor(@logger, ctx, queues[qegg]) end fdmap end @@ -288,7 +294,7 @@ class Yahns::Server # :nodoc: end def quit_enter(alive) - self.listeners = [] + self.listeners = [] # close acceptors, we close epolls in quit_done exit(0) unless alive # drop connections immediately if signaled twice @config.config_listeners.each_value do |opts| ctx = opts[:yahns_app_ctx] or next @@ -297,6 +303,13 @@ class Yahns::Server # :nodoc: false end + # drops all the the IO objects we have threads waiting on before exiting + def quit_finish + @queues.each(&:close) + self.listeners = [] # just in case, this is used in ensure + @thr.each(&:join) + end + def sp_sig_handle(alive) @sev.kgio_wait_readable(alive ? nil : 0.01) @sev.yahns_step @@ -327,5 +340,7 @@ class Yahns::Server # :nodoc: Yahns::Log.exception(@logger, "main loop", e) end while alive || fdmap.size > 0 unlink_pid_safe(@pid) if @pid + ensure + quit_finish end end diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb index ab0559d..6e65bd4 100644 --- a/lib/yahns/server_mp.rb +++ b/lib/yahns/server_mp.rb @@ -164,6 +164,8 @@ module Yahns::ServerMP # :nodoc: Yahns::Log.exception(@logger, "main worker loop", e) end while alive || fdmap.size > 0 exit + ensure + quit_finish end def mp_sig_handle(worker, alive) diff --git a/test/test_queue.rb b/test/test_queue.rb deleted file mode 100644 index cdb9ade..0000000 --- a/test/test_queue.rb +++ /dev/null @@ -1,62 +0,0 @@ -# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors -# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) -require_relative 'helper' -require 'timeout' -require 'stringio' - -class TestQueue < Testcase - parallelize_me! - - def setup - @q = Yahns::Queue.new - @err = StringIO.new - @logger = Logger.new(@err) - @q.fdmap = @fdmap = Yahns::Fdmap.new(@logger, 0.5) - assert @q.close_on_exec? - end - - def test_queue - r, w = IO.pipe - assert_equal 0, @fdmap.size - @q.queue_add(r, Yahns::Queue::QEV_RD) - assert_equal 1, @fdmap.size - def r.yahns_step - begin - case read_nonblock(11) - when "ignore" - return :ignore - end - rescue Errno::EAGAIN - return :wait_readable - rescue EOFError - return nil - end while true - end - w.write('.') - Timeout.timeout(10) do - Thread.pass until r.nread > 0 - @q.spawn_worker_threads(@logger, 1, 1) - Thread.pass until r.nread == 0 - - assert_equal 1, @fdmap.size - w.write("ignore") - Thread.pass until r.nread == 0 - Thread.pass until @fdmap.size == 0 - - assert_raises(Errno::EEXIST) { - @q.queue_add(r, Yahns::Queue::QEV_RD) - } - assert_equal 1, @fdmap.size - @q.epoll_ctl(SleepyPenguin::Epoll::CTL_MOD, r, Yahns::Queue::QEV_RD) - w.close - Thread.pass until @fdmap.size == 0 - end - assert r.closed? - ensure - [ r, w ].each { |io| io.close unless io.closed? } - end - - def teardown - @q.close - end -end |