about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@yhbt.net>2010-09-27 01:13:30 +0000
committerEric Wong <e@yhbt.net>2010-09-27 09:08:15 +0000
commit6c818b0b6f76ef733679bcea1024142b4ef3ce00 (patch)
tree964eaf8bb7796007bb3f4ecd850feba5acd6a0b5
parentf81cb3c05a0eb46ec61ceb295b51ead16e6a0da4 (diff)
downloadkgio-6c818b0b6f76ef733679bcea1024142b4ef3ce00.tar.gz
We'll stick with the "try" prefix if we're going to be
non-blocking.  kgio_accept will favor a blocking accept() call
where it's possible to release the GVL, allowing it to avoid
thundering herd problems.  Otherwise it'll use thread-safe
blocking under Ruby 1.8.
-rw-r--r--ext/kgio/extconf.rb1
-rw-r--r--ext/kgio/kgio_ext.c150
-rw-r--r--test/lib_server_accept.rb70
-rw-r--r--test/test_tcp_client_read_server_write.rb2
-rw-r--r--test/test_tcp_server.rb23
-rw-r--r--test/test_tcp_server_read_client_write.rb2
-rw-r--r--test/test_unix_client_read_server_write.rb2
-rw-r--r--test/test_unix_server.rb24
-rw-r--r--test/test_unix_server_read_client_write.rb2
9 files changed, 206 insertions, 70 deletions
diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb
index 2eb35f7..09d710b 100644
--- a/ext/kgio/extconf.rb
+++ b/ext/kgio/extconf.rb
@@ -14,6 +14,7 @@ else
   have_func('rb_fdopen')
 end
 have_func('rb_io_ascii8bit_binmode')
+have_func('rb_thread_blocking_region')
 
 dir_config('kgio')
 create_makefile('kgio_ext')
diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c
index 7818425..faa25ff 100644
--- a/ext/kgio/kgio_ext.c
+++ b/ext/kgio/kgio_ext.c
@@ -45,6 +45,12 @@ struct io_args {
         int fd;
 };
 
+struct accept_args {
+        int fd;
+        struct sockaddr *addr;
+        socklen_t *addrlen;
+};
+
 static void wait_readable(VALUE io)
 {
         if (io_wait_rd) {
@@ -337,23 +343,94 @@ static VALUE wait_rd(VALUE mod)
         return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil;
 }
 
+static VALUE xaccept(void *ptr)
+{
+        struct accept_args *a = ptr;
+
+        return (VALUE)accept4(a->fd, a->addr, a->addrlen, accept4_flags);
+}
+
+#ifdef HAVE_RB_THREAD_BLOCKING_REGION
+#  include <time.h>
+static int thread_accept(struct accept_args *a, int force_nonblock)
+{
+        if (force_nonblock)
+                set_nonblocking(a->fd);
+        return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0);
+}
+
+/*
+ * Try to use a (real) blocking accept() since that can prevent
+ * thundering herds under Linux:
+ * http://www.citi.umich.edu/projects/linux-scalability/reports/accept.html
+ *
+ * So we periodically disable non-blocking, but not too frequently
+ * because other processes may set non-blocking (especially during
+ * a process upgrade) with Rainbows! concurrency model changes.
+ */
+static void set_blocking_or_block(int fd)
+{
+        static time_t last_set_blocking;
+        time_t now = time(NULL);
+
+        if (last_set_blocking == 0) {
+                last_set_blocking = now;
+                (void)rb_io_wait_readable(fd);
+        } else if ((now - last_set_blocking) <= 5) {
+                (void)rb_io_wait_readable(fd);
+        } else {
+                int flags = fcntl(fd, F_GETFL);
+                if (flags == -1)
+                        rb_sys_fail("fcntl(F_GETFL)");
+                if (flags & O_NONBLOCK) {
+                        flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+                        if (flags == -1)
+                                rb_sys_fail("fcntl(F_SETFL)");
+                }
+                last_set_blocking = now;
+        }
+}
+#else /* ! HAVE_RB_THREAD_BLOCKING_REGION */
+#  include <rubysig.h>
+static int thread_accept(struct accept_args *a, int force_nonblock)
+{
+        int rv;
+
+        /* always use non-blocking accept() under 1.8 for green threads */
+        set_nonblocking(a->fd);
+        TRAP_BEG;
+        rv = (int)xaccept(a);
+        TRAP_END;
+        return rv;
+}
+#define set_blocking_or_block(fd) (void)rb_io_wait_readable(fd)
+#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
+
 static VALUE
-my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
+my_accept(VALUE io, struct sockaddr *addr, socklen_t *addrlen, int nonblock)
 {
         int client;
+        struct accept_args a;
 
+        a.fd = my_fileno(io);
+        a.addr = addr;
+        a.addrlen = addrlen;
 retry:
-        client = accept4(sockfd, addr, addrlen, accept4_flags);
+        client = thread_accept(&a, nonblock);
         if (client == -1) {
                 switch (errno) {
                 case EAGAIN:
+                        if (nonblock)
+                                return Qnil;
+                        set_blocking_or_block(a.fd);
 #ifdef ECONNABORTED
                 case ECONNABORTED:
 #endif /* ECONNABORTED */
 #ifdef EPROTO
                 case EPROTO:
 #endif /* EPROTO */
-                        return Qnil;
+                case EINTR:
+                        goto retry;
                 case ENOMEM:
                 case EMFILE:
                 case ENFILE:
@@ -362,50 +439,65 @@ retry:
 #endif /* ENOBUFS */
                         errno = 0;
                         rb_gc();
-                        client = accept4(sockfd, addr, addrlen, accept4_flags);
-                        break;
-                case EINTR:
-                        goto retry;
+                        client = thread_accept(&a, nonblock);
                 }
-                if (client == -1)
+                if (client == -1) {
+                        if (errno == EINTR)
+                                goto retry;
                         rb_sys_fail("accept");
+                }
         }
         return sock_for_fd(cSocket, client);
 }
 
-/* non-blocking flag should be set on this socket before accept() is called */
-static VALUE unix_accept(VALUE io)
+static void in_addr_set(VALUE io, struct sockaddr_in *addr)
 {
-        int fd = my_fileno(io);
-        VALUE rv = my_accept(fd, NULL, NULL);
+        VALUE host = rb_str_new(0, INET_ADDRSTRLEN);
+        socklen_t addrlen = (socklen_t)INET_ADDRSTRLEN;
+        const char *name;
 
-        if (! NIL_P(rv))
-                rb_ivar_set(rv, iv_kgio_addr, localhost);
+        name = inet_ntop(AF_INET, &addr->sin_addr, RSTRING_PTR(host), addrlen);
+        if (name == NULL)
+                rb_sys_fail("inet_ntop");
+        rb_str_set_len(host, strlen(name));
+        rb_ivar_set(io, iv_kgio_addr, host);
+}
 
+static VALUE tcp_tryaccept(VALUE io)
+{
+        struct sockaddr_in addr;
+        socklen_t addrlen = sizeof(struct sockaddr_in);
+        VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 1);
+
+        if (!NIL_P(rv))
+                in_addr_set(rv, &addr);
         return rv;
 }
 
-/* non-blocking flag should be set on this socket before accept() is called */
 static VALUE tcp_accept(VALUE io)
 {
-        int fd = my_fileno(io);
         struct sockaddr_in addr;
         socklen_t addrlen = sizeof(struct sockaddr_in);
-        VALUE host;
-        const char *name;
-        VALUE rv = my_accept(fd, (struct sockaddr *)&addr, &addrlen);
+        VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 0);
 
-        if (NIL_P(rv))
-                return rv;
+        in_addr_set(rv, &addr);
+        return rv;
+}
 
-        host = rb_str_new(0, INET_ADDRSTRLEN);
-        addrlen = (socklen_t)INET_ADDRSTRLEN;
-        name = inet_ntop(AF_INET, &addr.sin_addr, RSTRING_PTR(host), addrlen);
-        if (name == NULL)
-                rb_sys_fail("inet_ntop");
-        rb_str_set_len(host, strlen(name));
-        rb_ivar_set(rv, iv_kgio_addr, host);
+static VALUE unix_tryaccept(VALUE io)
+{
+        VALUE rv = my_accept(io, NULL, NULL, 1);
+
+        if (!NIL_P(rv))
+                rb_ivar_set(rv, iv_kgio_addr, localhost);
+        return rv;
+}
+
+static VALUE unix_accept(VALUE io)
+{
+        VALUE rv = my_accept(io, NULL, NULL, 0);
 
+        rb_ivar_set(rv, iv_kgio_addr, localhost);
         return rv;
 }
 
@@ -697,10 +789,12 @@ void Init_kgio_ext(void)
 
         cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer"));
         cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer);
+        rb_define_method(cUNIXServer, "kgio_tryaccept", unix_tryaccept, 0);
         rb_define_method(cUNIXServer, "kgio_accept", unix_accept, 0);
 
         cTCPServer = rb_const_get(rb_cObject, rb_intern("TCPServer"));
         cTCPServer = rb_define_class_under(mKgio, "TCPServer", cTCPServer);
+        rb_define_method(cTCPServer, "kgio_tryaccept", tcp_tryaccept, 0);
         rb_define_method(cTCPServer, "kgio_accept", tcp_accept, 0);
 
         cTCPSocket = rb_const_get(rb_cObject, rb_intern("TCPSocket"));
diff --git a/test/lib_server_accept.rb b/test/lib_server_accept.rb
new file mode 100644
index 0000000..1e6bf24
--- /dev/null
+++ b/test/lib_server_accept.rb
@@ -0,0 +1,70 @@
+require 'test/unit'
+require 'io/nonblock'
+$-w = true
+require 'kgio'
+
+module LibServerAccept
+
+  def teardown
+    @srv.close unless @srv.closed?
+    Kgio.accept_cloexec = true
+    Kgio.accept_nonblock = false
+  end
+
+  def test_tryaccept_success
+    a = client_connect
+    IO.select([@srv])
+    b = @srv.kgio_tryaccept
+    assert_kind_of Kgio::Socket, b
+    assert_equal @host, b.kgio_addr
+  end
+
+  def test_tryaccept_fail
+    assert_equal nil, @srv.kgio_tryaccept
+  end
+
+  def test_blocking_accept
+    t0 = Time.now
+    pid = fork { sleep 1; a = client_connect; sleep }
+    b = @srv.kgio_accept
+    elapsed = Time.now - t0
+    assert_kind_of Kgio::Socket, b
+    assert_equal @host, b.kgio_addr
+    Process.kill(:TERM, pid)
+    Process.waitpid(pid)
+    assert elapsed >= 1, "elapsed: #{elapsed}"
+  end
+
+  def test_blocking_accept_with_nonblock_socket
+    @srv.nonblock = true
+    t0 = Time.now
+    pid = fork { sleep 1; a = client_connect; sleep }
+    b = @srv.kgio_accept
+    elapsed = Time.now - t0
+    assert_kind_of Kgio::Socket, b
+    assert_equal @host, b.kgio_addr
+    Process.kill(:TERM, pid)
+    Process.waitpid(pid)
+    assert elapsed >= 1, "elapsed: #{elapsed}"
+
+    t0 = Time.now
+    pid = fork { sleep 6; a = client_connect; sleep }
+    b = @srv.kgio_accept
+    elapsed = Time.now - t0
+    assert_kind_of Kgio::Socket, b
+    assert_equal @host, b.kgio_addr
+    Process.kill(:TERM, pid)
+    Process.waitpid(pid)
+    assert elapsed >= 6, "elapsed: #{elapsed}"
+
+    t0 = Time.now
+    pid = fork { sleep 1; a = client_connect; sleep }
+    b = @srv.kgio_accept
+    elapsed = Time.now - t0
+    assert_kind_of Kgio::Socket, b
+    assert_equal @host, b.kgio_addr
+    Process.kill(:TERM, pid)
+    Process.waitpid(pid)
+    assert elapsed >= 1, "elapsed: #{elapsed}"
+  end
+end
diff --git a/test/test_tcp_client_read_server_write.rb b/test/test_tcp_client_read_server_write.rb
index 13714e9..6e97321 100644
--- a/test/test_tcp_client_read_server_write.rb
+++ b/test/test_tcp_client_read_server_write.rb
@@ -6,7 +6,7 @@ class TesTcpClientReadServerWrite < Test::Unit::TestCase
     @srv = Kgio::TCPServer.new(@host, 0)
     @port = @srv.addr[1]
     @wr = Kgio::TCPSocket.new(@host, @port)
-    @rd = @srv.kgio_accept
+    @rd = @srv.kgio_tryaccept
   end
 
   include LibReadWriteTest
diff --git a/test/test_tcp_server.rb b/test/test_tcp_server.rb
index c2bb518..eb6933e 100644
--- a/test/test_tcp_server.rb
+++ b/test/test_tcp_server.rb
@@ -1,7 +1,4 @@
-require 'test/unit'
-require 'io/nonblock'
-$-w = true
-require 'kgio'
+require './test/lib_server_accept'
 
 class TestKgioTCPServer < Test::Unit::TestCase
 
@@ -11,21 +8,9 @@ class TestKgioTCPServer < Test::Unit::TestCase
     @port = @srv.addr[1]
   end
 
-  def teardown
-    @srv.close unless @srv.closed?
-    Kgio.accept_cloexec = true
-    Kgio.accept_nonblock = false
+  def client_connect
+    TCPSocket.new(@host, @port)
   end
 
-  def test_accept
-    a = TCPSocket.new(@host, @port)
-    b = @srv.kgio_accept
-    assert_kind_of Kgio::Socket, b
-    assert_equal @host, b.kgio_addr
-  end
-
-  def test_accept_nonblock
-    @srv.nonblock = true
-    assert_equal nil, @srv.kgio_accept
-  end
+  include LibServerAccept
 end
diff --git a/test/test_tcp_server_read_client_write.rb b/test/test_tcp_server_read_client_write.rb
index 68cada3..8a67917 100644
--- a/test/test_tcp_server_read_client_write.rb
+++ b/test/test_tcp_server_read_client_write.rb
@@ -6,7 +6,7 @@ class TesTcpServerReadClientWrite < Test::Unit::TestCase
     @srv = Kgio::TCPServer.new(@host, 0)
     @port = @srv.addr[1]
     @wr = Kgio::TCPSocket.new(@host, @port)
-    @rd = @srv.kgio_accept
+    @rd = @srv.kgio_tryaccept
   end
 
   include LibReadWriteTest
diff --git a/test/test_unix_client_read_server_write.rb b/test/test_unix_client_read_server_write.rb
index cf2c5f1..0f8e55b 100644
--- a/test/test_unix_client_read_server_write.rb
+++ b/test/test_unix_client_read_server_write.rb
@@ -9,7 +9,7 @@ class TestUnixServerReadClientWrite < Test::Unit::TestCase
     tmp.close rescue nil
     @srv = Kgio::UNIXServer.new(@path)
     @rd = Kgio::UNIXSocket.new(@path)
-    @wr = @srv.kgio_accept
+    @wr = @srv.kgio_tryaccept
   end
 
   include LibReadWriteTest
diff --git a/test/test_unix_server.rb b/test/test_unix_server.rb
index 91b91b8..faa8209 100644
--- a/test/test_unix_server.rb
+++ b/test/test_unix_server.rb
@@ -1,8 +1,5 @@
-require 'test/unit'
-require 'io/nonblock'
-$-w = true
-require 'kgio'
 require 'tempfile'
+require './test/lib_server_accept'
 
 class TestKgioUNIXServer < Test::Unit::TestCase
 
@@ -12,23 +9,12 @@ class TestKgioUNIXServer < Test::Unit::TestCase
     File.unlink(@path)
     tmp.close rescue nil
     @srv = Kgio::UNIXServer.new(@path)
+    @host = '127.0.0.1'
   end
 
-  def teardown
-    @srv.close unless @srv.closed?
-    File.unlink(@path)
-    Kgio.accept_cloexec = true
-  end
-
-  def test_accept
-    a = UNIXSocket.new(@path)
-    b = @srv.kgio_accept
-    assert_kind_of Kgio::Socket, b
-    assert_equal "127.0.0.1", b.kgio_addr
+  def client_connect
+    UNIXSocket.new(@path)
   end
 
-  def test_accept_nonblock
-    @srv.nonblock = true
-    assert_equal nil, @srv.kgio_accept
-  end
+  include LibServerAccept
 end
diff --git a/test/test_unix_server_read_client_write.rb b/test/test_unix_server_read_client_write.rb
index 532989e..db304a2 100644
--- a/test/test_unix_server_read_client_write.rb
+++ b/test/test_unix_server_read_client_write.rb
@@ -9,7 +9,7 @@ class TestUnixServerReadClientWrite < Test::Unit::TestCase
     tmp.close rescue nil
     @srv = Kgio::UNIXServer.new(@path)
     @wr = Kgio::UNIXSocket.new(@path)
-    @rd = @srv.kgio_accept
+    @rd = @srv.kgio_tryaccept
   end
 
   include LibReadWriteTest