about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2013-10-30 01:50:12 +0000
committerEric Wong <normalperson@yhbt.net>2013-10-30 07:00:55 +0000
commit68fc0c10468f0fefa6777bdabf4712d33de1aa42 (patch)
tree46ed4bd684bed91c3440a1d57c9790db7b91896a
parent72e20c9d7aac1837f1565cff5856e50c692304be (diff)
downloadyahns-68fc0c10468f0fefa6777bdabf4712d33de1aa42.tar.gz
This is probably not needed and just adds contention, but it makes
experimenting easier.

While we're at it, validate minimum values of for sndbuf/rcvbuf
along with this new threads value, too.
-rw-r--r--Documentation/yahns_config.txt12
-rw-r--r--lib/yahns/acceptor.rb75
-rw-r--r--lib/yahns/config.rb7
-rw-r--r--lib/yahns/server.rb6
-rw-r--r--test/test_mt_accept.rb48
5 files changed, 114 insertions, 34 deletions
diff --git a/Documentation/yahns_config.txt b/Documentation/yahns_config.txt
index 83a330e..db2634c 100644
--- a/Documentation/yahns_config.txt
+++ b/Documentation/yahns_config.txt
@@ -335,6 +335,18 @@ Ruby it is running under.
 
         Default: false (unset)
 
+      + threads: INTEGER
+
+        Used to control the number of threads blocking on the accept(2)
+        or accept4(2) system call (per listen socket).
+
+        Usually, only one thread is necessary, especially when multiple
+        worker_processes are configured (as there'll be one thread
+        per-process).  Having extra threads may increase contention with
+        epoll and FD allocation within one process.
+
+        Default: 1
+
       + umask: MODE
 
         Sets the file mode creation mask for UNIX sockets.  If specified,
diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb
index 76fcc26..268393c 100644
--- a/lib/yahns/acceptor.rb
+++ b/lib/yahns/acceptor.rb
@@ -3,48 +3,61 @@
 # License: GPLv3 or later (see COPYING for details)
 module Yahns::Acceptor # :nodoc:
   def __ac_quit_done?
-    @thr.join(0.01) ? close.nil? : false
-  rescue
-    @thr.alive? ? false : close.nil?
+    @thrs.delete_if do |t|
+      begin
+        t.join(0.01)
+      rescue
+        ! t.alive?
+      end
+    end
+    return false if @thrs[0]
+    close
+    true
   end
 
   # just keep looping this on every acceptor until the associated thread dies
   def ac_quit
-    @thr[:yahns_quit] = true
+    @thrs.each { |t| t[:yahns_quit] = true }
     return true if __ac_quit_done?
 
-    # try to connect to kick it out of the blocking accept() syscall
-    killer = Kgio::Socket.start(getsockname)
-    killer.kgio_write("G") # first byte of "GET / HTTP/1.0\r\n\r\n"
+    @thrs.each do
+      begin
+        # try to connect to kick it out of the blocking accept() syscall
+        killer = Kgio::Socket.start(getsockname)
+        killer.kgio_write("G") # first byte of "GET / HTTP/1.0\r\n\r\n"
+      ensure
+        killer.close if killer
+      end
+    end
     false # now hope __ac_quit_done? is true next time around
   rescue SystemCallError
-    __ac_quit_done?
-  ensure
-    killer.close if killer
+    return __ac_quit_done?
   end
 
-  def spawn_acceptor(logger, client_class, queue)
-    @thr = Thread.new do
-      t = Thread.current
-      accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC
-      qev_flags = client_class.superclass::QEV_FLAGS
-      begin
-        # We want the accept/accept4 syscall to be _blocking_
-        # so it can distribute work evenly between processes
-        if client = kgio_accept(client_class, accept_flags)
-          client.yahns_init
+  def spawn_acceptor(nr, logger, client_class, queue)
+    @thrs = nr.times.map do
+      Thread.new do
+        t = Thread.current
+        accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC
+        qev_flags = client_class.superclass::QEV_FLAGS
+        begin
+          # We want the accept/accept4 syscall to be _blocking_
+          # so it can distribute work evenly between processes
+          if client = kgio_accept(client_class, accept_flags)
+            client.yahns_init
 
-          # it is not safe to touch client in this thread after this,
-          # a worker thread may grab client right away
-          queue.queue_add(client, qev_flags)
-        end
-      rescue Errno::EMFILE, Errno::ENFILE => e
-        logger.error("#{e.message}, consider raising open file limits")
-        queue.fdmap.desperate_expire_for(nil, 5)
-        sleep 1 # let other threads do some work
-      rescue => e
-        Yahns::Log.exception(logger, "accept loop", e)
-      end until t[:yahns_quit]
+            # it is not safe to touch client in this thread after this,
+            # a worker thread may grab client right away
+            queue.queue_add(client, qev_flags)
+          end
+        rescue Errno::EMFILE, Errno::ENFILE => e
+          logger.error("#{e.message}, consider raising open file limits")
+          queue.fdmap.desperate_expire_for(nil, 5)
+          sleep 1 # let other threads do some work
+        rescue => e
+          Yahns::Log.exception(logger, "accept loop", e)
+        end until t[:yahns_quit]
+      end
     end
   end
 end
diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb
index c923434..27c085f 100644
--- a/lib/yahns/config.rb
+++ b/lib/yahns/config.rb
@@ -182,11 +182,16 @@ class Yahns::Config # :nodoc:
     address = expand_addr(address)
     String === address or
       raise ArgumentError, "address=#{address.inspect} must be a string"
-    [ :umask, :backlog, :sndbuf, :rcvbuf ].each do |key|
+    [ :umask, :backlog ].each do |key|
+      # :backlog may be negative on some OSes
       value = options[key] or next
       Integer === value or
         raise ArgumentError, "#{var}: not an integer: #{key}=#{value.inspect}"
     end
+    [ :sndbuf, :rcvbuf, :threads ].each do |key|
+       value = options[key] and _check_int(key, value, 1)
+    end
+
     [ :ipv6only, :reuseport ].each do |key|
       (value = options[key]).nil? and next
       [ true, false ].include?(value) or
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index 396b653..d7f1b54 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -341,11 +341,13 @@ class Yahns::Server # :nodoc:
 
     # spin up acceptor threads, clients flow into worker queues after this
     @listeners.each do |l|
-      ctx = sock_opts(l)[:yahns_app_ctx]
+      opts = sock_opts(l)
+      ctx = opts[:yahns_app_ctx]
       qegg = ctx.qegg || @config.qeggs[:default]
+      q = queues[qegg] ||= qegg_vivify(qegg, fdmap)
 
       # acceptors feed the the queues
-      l.spawn_acceptor(@logger, ctx, queues[qegg] ||= qegg_vivify(qegg, fdmap))
+      l.spawn_acceptor(opts[:threads] || 1, @logger, ctx, q)
     end
     fdmap
   end
diff --git a/test/test_mt_accept.rb b/test/test_mt_accept.rb
new file mode 100644
index 0000000..e006af8
--- /dev/null
+++ b/test/test_mt_accept.rb
@@ -0,0 +1,48 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'server_helper'
+require 'rack/lobster'
+
+class TestMtAccept < Testcase
+  ENV["N"].to_i > 1 and parallelize_me!
+  include ServerHelper
+  alias setup server_helper_setup
+  alias teardown server_helper_teardown
+
+  def test_mt_accept
+    err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+    opts = { threads: 1 }
+    cfg.instance_eval do
+      GTL.synchronize do
+        app(:rack, Rack::Lobster.new) { listen "#{host}:#{port}", threads: 1 }
+      end
+      stderr_path err.path
+    end
+    pid = mkserver(cfg)
+    Net::HTTP.start(host, port) do |http|
+      assert_equal 200, http.request(Net::HTTP::Get.new("/")).code.to_i
+    end
+    orig_count = Dir["/proc/#{pid}/task/*"].size
+    quit_wait(pid)
+
+    cfg = Yahns::Config.new
+    opts = { threads: 1 }
+    cfg.instance_eval do
+      GTL.synchronize do
+        app(:rack, Rack::Lobster.new) { listen "#{host}:#{port}", threads: 2 }
+      end
+      stderr_path err.path
+    end
+    pid = mkserver(cfg)
+    Net::HTTP.start(host, port) do |http|
+      assert_equal 200, http.request(Net::HTTP::Get.new("/")).code.to_i
+    end
+    Timeout.timeout(30) do
+      begin
+        new_count = Dir["/proc/#{pid}/task/*"].size
+      end until new_count == (orig_count + 1) && sleep(0.01)
+    end
+  ensure
+    quit_wait(pid)
+  end
+end if RUBY_PLATFORM =~ /linux/ && File.directory?("/proc")