diff options
author | Eric Wong <e@80x24.org> | 2013-10-30 01:50:12 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2013-10-30 07:00:55 +0000 |
commit | 68fc0c10468f0fefa6777bdabf4712d33de1aa42 (patch) | |
tree | 46ed4bd684bed91c3440a1d57c9790db7b91896a | |
parent | 72e20c9d7aac1837f1565cff5856e50c692304be (diff) | |
download | yahns-68fc0c10468f0fefa6777bdabf4712d33de1aa42.tar.gz |
This is probably not needed and just adds contention, but it makes experimenting easier. While we're at it, validate minimum values of for sndbuf/rcvbuf along with this new threads value, too.
-rw-r--r-- | Documentation/yahns_config.txt | 12 | ||||
-rw-r--r-- | lib/yahns/acceptor.rb | 75 | ||||
-rw-r--r-- | lib/yahns/config.rb | 7 | ||||
-rw-r--r-- | lib/yahns/server.rb | 6 | ||||
-rw-r--r-- | test/test_mt_accept.rb | 48 |
5 files changed, 114 insertions, 34 deletions
diff --git a/Documentation/yahns_config.txt b/Documentation/yahns_config.txt index 83a330e..db2634c 100644 --- a/Documentation/yahns_config.txt +++ b/Documentation/yahns_config.txt @@ -335,6 +335,18 @@ Ruby it is running under. Default: false (unset) + + threads: INTEGER + + Used to control the number of threads blocking on the accept(2) + or accept4(2) system call (per listen socket). + + Usually, only one thread is necessary, especially when multiple + worker_processes are configured (as there'll be one thread + per-process). Having extra threads may increase contention with + epoll and FD allocation within one process. + + Default: 1 + + umask: MODE Sets the file mode creation mask for UNIX sockets. If specified, diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb index 76fcc26..268393c 100644 --- a/lib/yahns/acceptor.rb +++ b/lib/yahns/acceptor.rb @@ -3,48 +3,61 @@ # License: GPLv3 or later (see COPYING for details) module Yahns::Acceptor # :nodoc: def __ac_quit_done? - @thr.join(0.01) ? close.nil? : false - rescue - @thr.alive? ? false : close.nil? + @thrs.delete_if do |t| + begin + t.join(0.01) + rescue + ! t.alive? + end + end + return false if @thrs[0] + close + true end # just keep looping this on every acceptor until the associated thread dies def ac_quit - @thr[:yahns_quit] = true + @thrs.each { |t| t[:yahns_quit] = true } return true if __ac_quit_done? - # try to connect to kick it out of the blocking accept() syscall - killer = Kgio::Socket.start(getsockname) - killer.kgio_write("G") # first byte of "GET / HTTP/1.0\r\n\r\n" + @thrs.each do + begin + # try to connect to kick it out of the blocking accept() syscall + killer = Kgio::Socket.start(getsockname) + killer.kgio_write("G") # first byte of "GET / HTTP/1.0\r\n\r\n" + ensure + killer.close if killer + end + end false # now hope __ac_quit_done? is true next time around rescue SystemCallError - __ac_quit_done? - ensure - killer.close if killer + return __ac_quit_done? end - def spawn_acceptor(logger, client_class, queue) - @thr = Thread.new do - t = Thread.current - accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC - qev_flags = client_class.superclass::QEV_FLAGS - begin - # We want the accept/accept4 syscall to be _blocking_ - # so it can distribute work evenly between processes - if client = kgio_accept(client_class, accept_flags) - client.yahns_init + def spawn_acceptor(nr, logger, client_class, queue) + @thrs = nr.times.map do + Thread.new do + t = Thread.current + accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC + qev_flags = client_class.superclass::QEV_FLAGS + begin + # We want the accept/accept4 syscall to be _blocking_ + # so it can distribute work evenly between processes + if client = kgio_accept(client_class, accept_flags) + client.yahns_init - # it is not safe to touch client in this thread after this, - # a worker thread may grab client right away - queue.queue_add(client, qev_flags) - end - rescue Errno::EMFILE, Errno::ENFILE => e - logger.error("#{e.message}, consider raising open file limits") - queue.fdmap.desperate_expire_for(nil, 5) - sleep 1 # let other threads do some work - rescue => e - Yahns::Log.exception(logger, "accept loop", e) - end until t[:yahns_quit] + # it is not safe to touch client in this thread after this, + # a worker thread may grab client right away + queue.queue_add(client, qev_flags) + end + rescue Errno::EMFILE, Errno::ENFILE => e + logger.error("#{e.message}, consider raising open file limits") + queue.fdmap.desperate_expire_for(nil, 5) + sleep 1 # let other threads do some work + rescue => e + Yahns::Log.exception(logger, "accept loop", e) + end until t[:yahns_quit] + end end end end diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb index c923434..27c085f 100644 --- a/lib/yahns/config.rb +++ b/lib/yahns/config.rb @@ -182,11 +182,16 @@ class Yahns::Config # :nodoc: address = expand_addr(address) String === address or raise ArgumentError, "address=#{address.inspect} must be a string" - [ :umask, :backlog, :sndbuf, :rcvbuf ].each do |key| + [ :umask, :backlog ].each do |key| + # :backlog may be negative on some OSes value = options[key] or next Integer === value or raise ArgumentError, "#{var}: not an integer: #{key}=#{value.inspect}" end + [ :sndbuf, :rcvbuf, :threads ].each do |key| + value = options[key] and _check_int(key, value, 1) + end + [ :ipv6only, :reuseport ].each do |key| (value = options[key]).nil? and next [ true, false ].include?(value) or diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index 396b653..d7f1b54 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -341,11 +341,13 @@ class Yahns::Server # :nodoc: # spin up acceptor threads, clients flow into worker queues after this @listeners.each do |l| - ctx = sock_opts(l)[:yahns_app_ctx] + opts = sock_opts(l) + ctx = opts[:yahns_app_ctx] qegg = ctx.qegg || @config.qeggs[:default] + q = queues[qegg] ||= qegg_vivify(qegg, fdmap) # acceptors feed the the queues - l.spawn_acceptor(@logger, ctx, queues[qegg] ||= qegg_vivify(qegg, fdmap)) + l.spawn_acceptor(opts[:threads] || 1, @logger, ctx, q) end fdmap end diff --git a/test/test_mt_accept.rb b/test/test_mt_accept.rb new file mode 100644 index 0000000..e006af8 --- /dev/null +++ b/test/test_mt_accept.rb @@ -0,0 +1,48 @@ +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative 'server_helper' +require 'rack/lobster' + +class TestMtAccept < Testcase + ENV["N"].to_i > 1 and parallelize_me! + include ServerHelper + alias setup server_helper_setup + alias teardown server_helper_teardown + + def test_mt_accept + err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1] + opts = { threads: 1 } + cfg.instance_eval do + GTL.synchronize do + app(:rack, Rack::Lobster.new) { listen "#{host}:#{port}", threads: 1 } + end + stderr_path err.path + end + pid = mkserver(cfg) + Net::HTTP.start(host, port) do |http| + assert_equal 200, http.request(Net::HTTP::Get.new("/")).code.to_i + end + orig_count = Dir["/proc/#{pid}/task/*"].size + quit_wait(pid) + + cfg = Yahns::Config.new + opts = { threads: 1 } + cfg.instance_eval do + GTL.synchronize do + app(:rack, Rack::Lobster.new) { listen "#{host}:#{port}", threads: 2 } + end + stderr_path err.path + end + pid = mkserver(cfg) + Net::HTTP.start(host, port) do |http| + assert_equal 200, http.request(Net::HTTP::Get.new("/")).code.to_i + end + Timeout.timeout(30) do + begin + new_count = Dir["/proc/#{pid}/task/*"].size + end until new_count == (orig_count + 1) && sleep(0.01) + end + ensure + quit_wait(pid) + end +end if RUBY_PLATFORM =~ /linux/ && File.directory?("/proc") |