From 68fc0c10468f0fefa6777bdabf4712d33de1aa42 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 30 Oct 2013 01:50:12 +0000 Subject: allow multiple blocking threads per listen socket This is probably not needed and just adds contention, but it makes experimenting easier. While we're at it, validate minimum values of for sndbuf/rcvbuf along with this new threads value, too. --- Documentation/yahns_config.txt | 12 +++++++ lib/yahns/acceptor.rb | 75 +++++++++++++++++++++++++----------------- lib/yahns/config.rb | 7 +++- lib/yahns/server.rb | 6 ++-- test/test_mt_accept.rb | 48 +++++++++++++++++++++++++++ 5 files changed, 114 insertions(+), 34 deletions(-) create mode 100644 test/test_mt_accept.rb diff --git a/Documentation/yahns_config.txt b/Documentation/yahns_config.txt index 83a330e..db2634c 100644 --- a/Documentation/yahns_config.txt +++ b/Documentation/yahns_config.txt @@ -335,6 +335,18 @@ Ruby it is running under. Default: false (unset) + + threads: INTEGER + + Used to control the number of threads blocking on the accept(2) + or accept4(2) system call (per listen socket). + + Usually, only one thread is necessary, especially when multiple + worker_processes are configured (as there'll be one thread + per-process). Having extra threads may increase contention with + epoll and FD allocation within one process. + + Default: 1 + + umask: MODE Sets the file mode creation mask for UNIX sockets. If specified, diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb index 76fcc26..268393c 100644 --- a/lib/yahns/acceptor.rb +++ b/lib/yahns/acceptor.rb @@ -3,48 +3,61 @@ # License: GPLv3 or later (see COPYING for details) module Yahns::Acceptor # :nodoc: def __ac_quit_done? - @thr.join(0.01) ? close.nil? : false - rescue - @thr.alive? ? false : close.nil? + @thrs.delete_if do |t| + begin + t.join(0.01) + rescue + ! t.alive? + end + end + return false if @thrs[0] + close + true end # just keep looping this on every acceptor until the associated thread dies def ac_quit - @thr[:yahns_quit] = true + @thrs.each { |t| t[:yahns_quit] = true } return true if __ac_quit_done? - # try to connect to kick it out of the blocking accept() syscall - killer = Kgio::Socket.start(getsockname) - killer.kgio_write("G") # first byte of "GET / HTTP/1.0\r\n\r\n" + @thrs.each do + begin + # try to connect to kick it out of the blocking accept() syscall + killer = Kgio::Socket.start(getsockname) + killer.kgio_write("G") # first byte of "GET / HTTP/1.0\r\n\r\n" + ensure + killer.close if killer + end + end false # now hope __ac_quit_done? is true next time around rescue SystemCallError - __ac_quit_done? - ensure - killer.close if killer + return __ac_quit_done? end - def spawn_acceptor(logger, client_class, queue) - @thr = Thread.new do - t = Thread.current - accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC - qev_flags = client_class.superclass::QEV_FLAGS - begin - # We want the accept/accept4 syscall to be _blocking_ - # so it can distribute work evenly between processes - if client = kgio_accept(client_class, accept_flags) - client.yahns_init + def spawn_acceptor(nr, logger, client_class, queue) + @thrs = nr.times.map do + Thread.new do + t = Thread.current + accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC + qev_flags = client_class.superclass::QEV_FLAGS + begin + # We want the accept/accept4 syscall to be _blocking_ + # so it can distribute work evenly between processes + if client = kgio_accept(client_class, accept_flags) + client.yahns_init - # it is not safe to touch client in this thread after this, - # a worker thread may grab client right away - queue.queue_add(client, qev_flags) - end - rescue Errno::EMFILE, Errno::ENFILE => e - logger.error("#{e.message}, consider raising open file limits") - queue.fdmap.desperate_expire_for(nil, 5) - sleep 1 # let other threads do some work - rescue => e - Yahns::Log.exception(logger, "accept loop", e) - end until t[:yahns_quit] + # it is not safe to touch client in this thread after this, + # a worker thread may grab client right away + queue.queue_add(client, qev_flags) + end + rescue Errno::EMFILE, Errno::ENFILE => e + logger.error("#{e.message}, consider raising open file limits") + queue.fdmap.desperate_expire_for(nil, 5) + sleep 1 # let other threads do some work + rescue => e + Yahns::Log.exception(logger, "accept loop", e) + end until t[:yahns_quit] + end end end end diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb index c923434..27c085f 100644 --- a/lib/yahns/config.rb +++ b/lib/yahns/config.rb @@ -182,11 +182,16 @@ class Yahns::Config # :nodoc: address = expand_addr(address) String === address or raise ArgumentError, "address=#{address.inspect} must be a string" - [ :umask, :backlog, :sndbuf, :rcvbuf ].each do |key| + [ :umask, :backlog ].each do |key| + # :backlog may be negative on some OSes value = options[key] or next Integer === value or raise ArgumentError, "#{var}: not an integer: #{key}=#{value.inspect}" end + [ :sndbuf, :rcvbuf, :threads ].each do |key| + value = options[key] and _check_int(key, value, 1) + end + [ :ipv6only, :reuseport ].each do |key| (value = options[key]).nil? and next [ true, false ].include?(value) or diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index 396b653..d7f1b54 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -341,11 +341,13 @@ class Yahns::Server # :nodoc: # spin up acceptor threads, clients flow into worker queues after this @listeners.each do |l| - ctx = sock_opts(l)[:yahns_app_ctx] + opts = sock_opts(l) + ctx = opts[:yahns_app_ctx] qegg = ctx.qegg || @config.qeggs[:default] + q = queues[qegg] ||= qegg_vivify(qegg, fdmap) # acceptors feed the the queues - l.spawn_acceptor(@logger, ctx, queues[qegg] ||= qegg_vivify(qegg, fdmap)) + l.spawn_acceptor(opts[:threads] || 1, @logger, ctx, q) end fdmap end diff --git a/test/test_mt_accept.rb b/test/test_mt_accept.rb new file mode 100644 index 0000000..e006af8 --- /dev/null +++ b/test/test_mt_accept.rb @@ -0,0 +1,48 @@ +# Copyright (C) 2013, Eric Wong and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative 'server_helper' +require 'rack/lobster' + +class TestMtAccept < Testcase + ENV["N"].to_i > 1 and parallelize_me! + include ServerHelper + alias setup server_helper_setup + alias teardown server_helper_teardown + + def test_mt_accept + err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1] + opts = { threads: 1 } + cfg.instance_eval do + GTL.synchronize do + app(:rack, Rack::Lobster.new) { listen "#{host}:#{port}", threads: 1 } + end + stderr_path err.path + end + pid = mkserver(cfg) + Net::HTTP.start(host, port) do |http| + assert_equal 200, http.request(Net::HTTP::Get.new("/")).code.to_i + end + orig_count = Dir["/proc/#{pid}/task/*"].size + quit_wait(pid) + + cfg = Yahns::Config.new + opts = { threads: 1 } + cfg.instance_eval do + GTL.synchronize do + app(:rack, Rack::Lobster.new) { listen "#{host}:#{port}", threads: 2 } + end + stderr_path err.path + end + pid = mkserver(cfg) + Net::HTTP.start(host, port) do |http| + assert_equal 200, http.request(Net::HTTP::Get.new("/")).code.to_i + end + Timeout.timeout(30) do + begin + new_count = Dir["/proc/#{pid}/task/*"].size + end until new_count == (orig_count + 1) && sleep(0.01) + end + ensure + quit_wait(pid) + end +end if RUBY_PLATFORM =~ /linux/ && File.directory?("/proc") -- cgit v1.2.3-24-ge0c7