about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-01-27 19:43:39 -0800
committerEric Wong <normalperson@yhbt.net>2011-01-27 19:45:52 -0800
commit910f6f3df099c04fcd55bd6b20785cce69cb36ae (patch)
tree4cc051bc7155b0b655a44fa19666a8870c5ece49
parentec91ac3d8c8d9236ba0cd01794c9c4a3ee3f7eeb (diff)
downloadkgio-910f6f3df099c04fcd55bd6b20785cce69cb36ae.tar.gz
It only supports TCP_CORK under Linux right now.

We use a very basic strategy to use TCP_CORK semantics optimally
in most TCP servers:  On corked sockets, we will uncork on recv()
if there was a previous send().  Otherwise we do not fiddle
with TCP_CORK at all.

Under Linux, we can rely on TCP_CORK being inherited in an
accept()-ed client socket so we can avoid syscalls for each
accept()-ed client if we already know the accept() socket corks.

This module does NOTHING for client TCP sockets, we only deal
with accept()-ed sockets right now.
-rw-r--r--ext/kgio/accept.c15
-rw-r--r--ext/kgio/kgio.h5
-rw-r--r--ext/kgio/kgio_ext.c1
-rw-r--r--ext/kgio/nopush.c167
-rw-r--r--ext/kgio/read_write.c3
-rw-r--r--kgio.gemspec1
-rw-r--r--test/test_nopush_smart.rb110
7 files changed, 299 insertions, 3 deletions
diff --git a/ext/kgio/accept.c b/ext/kgio/accept.c
index 66c2712..a147fec 100644
--- a/ext/kgio/accept.c
+++ b/ext/kgio/accept.c
@@ -133,14 +133,21 @@ static VALUE acceptor(int argc, const VALUE *argv)
         rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
 }
 
+#if defined(__linux__)
+#  define post_accept kgio_nopush_accept
+#else
+#  define post_accept(a,b,c,d) for(;0;)
+#endif
+
 static VALUE
-my_accept(VALUE io, VALUE klass,
+my_accept(VALUE accept_io, VALUE klass,
           struct sockaddr *addr, socklen_t *addrlen, int nonblock)
 {
         int client;
+        VALUE client_io;
         struct accept_args a;
 
-        a.fd = my_fileno(io);
+        a.fd = my_fileno(accept_io);
         a.addr = addr;
         a.addrlen = addrlen;
 retry:
@@ -175,7 +182,9 @@ retry:
                         rb_sys_fail("accept");
                 }
         }
-        return sock_for_fd(klass, client);
+        client_io = sock_for_fd(klass, client);
+        post_accept(accept_io, client_io, a.fd, client);
+        return client_io;
 }
 
 static void in_addr_set(VALUE io, struct sockaddr_in *addr)
diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h
index dc270e6..cf117b6 100644
--- a/ext/kgio/kgio.h
+++ b/ext/kgio/kgio.h
@@ -33,6 +33,11 @@ void init_kgio_wait(void);
 void init_kgio_read_write(void);
 void init_kgio_accept(void);
 void init_kgio_connect(void);
+void init_kgio_nopush(void);
+
+void kgio_nopush_accept(VALUE, VALUE, int, int);
+void kgio_nopush_recv(VALUE, int);
+void kgio_nopush_send(VALUE, int);
 
 VALUE kgio_call_wait_writable(VALUE io);
 VALUE kgio_call_wait_readable(VALUE io);
diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c
index 0a457ff..1ebdaae 100644
--- a/ext/kgio/kgio_ext.c
+++ b/ext/kgio/kgio_ext.c
@@ -6,4 +6,5 @@ void Init_kgio_ext(void)
         init_kgio_read_write();
         init_kgio_connect();
         init_kgio_accept();
+        init_kgio_nopush();
 }
diff --git a/ext/kgio/nopush.c b/ext/kgio/nopush.c
new file mode 100644
index 0000000..c8a7619
--- /dev/null
+++ b/ext/kgio/nopush.c
@@ -0,0 +1,167 @@
+/*
+ * We use a very basic strategy to use TCP_CORK semantics optimally
+ * in most TCP servers:  On corked sockets, we will uncork on recv()
+ * if there was a previous send().  Otherwise we do not fiddle
+ * with TCP_CORK at all.
+ *
+ * Under Linux, we can rely on TCP_CORK being inherited in an
+ * accept()-ed client socket so we can avoid syscalls for each
+ * accept()-ed client if we know the accept() socket corks.
+ *
+ * This module does NOTHING for client TCP sockets, we only deal
+ * with accept()-ed sockets right now.
+ */
+
+#include "kgio.h"
+
+enum nopush_state {
+        NOPUSH_STATE_IGNORE = -1,
+        NOPUSH_STATE_WRITER = 0,
+        NOPUSH_STATE_WRITTEN = 1,
+        NOPUSH_STATE_ACCEPTOR = 2
+};
+
+struct nopush_socket {
+        VALUE io;
+        enum nopush_state state;
+};
+
+static int enabled;
+static long capa;
+static struct nopush_socket *active;
+
+static void set_acceptor_state(struct nopush_socket *nps, int fd);
+static void flush_pending_data(int fd);
+
+static void grow(int fd)
+{
+        long new_capa = fd + 64;
+        size_t size;
+
+        assert(new_capa > capa && "grow()-ing for low fd");
+        size = new_capa * sizeof(struct nopush_socket);
+        active = xrealloc(active, size);
+
+        while (capa < new_capa) {
+                struct nopush_socket *nps = &active[capa++];
+
+                nps->io = Qnil;
+                nps->state = NOPUSH_STATE_IGNORE;
+        }
+}
+
+static VALUE s_get_nopush_smart(VALUE self)
+{
+        return enabled ? Qtrue : Qfalse;
+}
+
+static VALUE s_set_nopush_smart(VALUE self, VALUE val)
+{
+        enabled = RTEST(val);
+
+        return val;
+}
+
+void init_kgio_nopush(void)
+{
+        VALUE m = rb_define_module("Kgio");
+
+        rb_define_singleton_method(m, "nopush_smart?", s_get_nopush_smart, 0);
+        rb_define_singleton_method(m, "nopush_smart=", s_set_nopush_smart, 1);
+}
+
+/*
+ * called after a successful write, just mark that we've put something
+ * in the skb and will need to uncork on the next write.
+ */
+void kgio_nopush_send(VALUE io, int fd)
+{
+        struct nopush_socket *nps;
+
+        if (fd >= capa) return;
+        nps = &active[fd];
+        if (nps->io == io && nps->state == NOPUSH_STATE_WRITER)
+                nps->state = NOPUSH_STATE_WRITTEN;
+}
+
+/* called on successful accept() */
+void kgio_nopush_accept(VALUE accept_io, VALUE io, int accept_fd, int fd)
+{
+        struct nopush_socket *accept_nps, *client_nps;
+
+        if (!enabled)
+                return;
+        assert(fd >= 0 && "client_fd negative");
+        assert(accept_fd >= 0 && "accept_fd negative");
+        if (fd >= capa || accept_fd >= capa)
+                grow(fd > accept_fd ? fd : accept_fd);
+
+        accept_nps = &active[accept_fd];
+
+        if (accept_nps->io != accept_io) {
+                accept_nps->io = accept_io;
+                set_acceptor_state(accept_nps, fd);
+        }
+        client_nps = &active[fd];
+        client_nps->io = io;
+        if (accept_nps->state == NOPUSH_STATE_ACCEPTOR)
+                client_nps->state = NOPUSH_STATE_WRITER;
+        else
+                client_nps->state = NOPUSH_STATE_IGNORE;
+}
+
+void kgio_nopush_recv(VALUE io, int fd)
+{
+        struct nopush_socket *nps;
+
+        if (fd >= capa)
+                return;
+
+        nps = &active[fd];
+        if (nps->io != io || nps->state != NOPUSH_STATE_WRITTEN)
+                return;
+
+        /* reset internal state and flush corked buffers */
+        nps->state = NOPUSH_STATE_WRITER;
+        if (enabled)
+                flush_pending_data(fd);
+}
+
+#ifdef __linux__
+#include <netinet/tcp.h>
+static void set_acceptor_state(struct nopush_socket *nps, int fd)
+{
+        int corked = 0;
+        socklen_t optlen = sizeof(int);
+
+        if (getsockopt(fd, SOL_TCP, TCP_CORK, &corked, &optlen) != 0) {
+                if (errno != EOPNOTSUPP)
+                        rb_sys_fail("getsockopt(SOL_TCP, TCP_CORK)");
+                errno = 0;
+                nps->state = NOPUSH_STATE_IGNORE;
+        } else if (corked) {
+                nps->state = NOPUSH_STATE_ACCEPTOR;
+        } else {
+                nps->state = NOPUSH_STATE_IGNORE;
+        }
+}
+
+/*
+ * checks to see if we've written anything since the last recv()
+ * If we have, uncork the socket and immediately recork it.
+ */
+static void flush_pending_data(int fd)
+{
+        int optval = 0;
+        socklen_t optlen = sizeof(int);
+
+        if (setsockopt(fd, SOL_TCP, TCP_CORK, &optval, optlen) != 0)
+                rb_sys_fail("setsockopt(SOL_TCP, TCP_CORK, 0)");
+        /* immediately recork */
+        optval = 1;
+        if (setsockopt(fd, SOL_TCP, TCP_CORK, &optval, optlen) != 0)
+                rb_sys_fail("setsockopt(SOL_TCP, TCP_CORK, 1)");
+}
+/* TODO: add FreeBSD support */
+
+#endif /* linux */
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
index 7ba2925..a954865 100644
--- a/ext/kgio/read_write.c
+++ b/ext/kgio/read_write.c
@@ -164,6 +164,7 @@ static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io)
         long n;
 
         prepare_read(&a, argc, argv, io);
+        kgio_nopush_recv(io, a.fd);
 
         if (a.len > 0) {
 retry:
@@ -320,6 +321,8 @@ retry:
         n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
         if (write_check(&a, n, "send", io_wait) != 0)
                 goto retry;
+        if (TYPE(a.buf) != T_SYMBOL)
+                kgio_nopush_send(io, a.fd);
         return a.buf;
 }
 
diff --git a/kgio.gemspec b/kgio.gemspec
index ef523b5..96b9e02 100644
--- a/kgio.gemspec
+++ b/kgio.gemspec
@@ -22,6 +22,7 @@ Gem::Specification.new do |s|
   s.extensions = %w(ext/kgio/extconf.rb)
 
   s.add_development_dependency('wrongdoc', '~> 1.4')
+  s.add_development_dependency('strace_me', '~> 1.0')
 
   # s.license = %w(LGPL) # disabled for compatibility with older RubyGems
 end
diff --git a/test/test_nopush_smart.rb b/test/test_nopush_smart.rb
new file mode 100644
index 0000000..6d4a698
--- /dev/null
+++ b/test/test_nopush_smart.rb
@@ -0,0 +1,110 @@
+require 'tempfile'
+require 'test/unit'
+RUBY_PLATFORM =~ /linux/ and require 'strace'
+$-w = true
+require 'kgio'
+
+class TestNoPushSmart < Test::Unit::TestCase
+  TCP_CORK = 3
+
+  def setup
+    Kgio.nopush_smart = false
+    assert_equal false, Kgio.nopush_smart?
+
+    @host = ENV["TEST_HOST"] || '127.0.0.1'
+    @srv = Kgio::TCPServer.new(@host, 0)
+    assert_nothing_raised {
+      @srv.setsockopt(Socket::SOL_TCP, TCP_CORK, 1)
+    } if RUBY_PLATFORM =~ /linux/
+    @port = @srv.addr[1]
+  end
+
+  def test_nopush_smart_true_unix
+    Kgio.nopush_smart = true
+    tmp = Tempfile.new('kgio_unix')
+    @path = tmp.path
+    File.unlink(@path)
+    tmp.close rescue nil
+    @srv = Kgio::UNIXServer.new(@path)
+    @rd = Kgio::UNIXSocket.new(@path)
+    io, err = Strace.me { @wr = @srv.kgio_accept }
+    assert_nil err
+    rc = nil
+    io, err = Strace.me {
+      @wr.kgio_write "HI\n"
+      rc = @wr.kgio_tryread 666
+    }
+    assert_nil err
+    lines = io.readlines
+    assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+    assert_equal :wait_readable, rc
+  ensure
+    File.unlink(@path) rescue nil
+  end
+
+  def test_nopush_smart_false
+    Kgio.nopush_smart = nil
+    assert_equal false, Kgio.nopush_smart?
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    io, err = Strace.me { @rd = @srv.kgio_accept }
+    assert_nil err
+    lines = io.readlines
+    assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+    assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+
+    rbuf = "..."
+    t0 = Time.now
+    @rd.kgio_write "HI\n"
+    @wr.kgio_read(3, rbuf)
+    diff = Time.now - t0
+    assert(diff >= 0.200, "TCP_CORK broken? diff=#{diff} > 200ms")
+    assert_equal "HI\n", rbuf
+  end if RUBY_PLATFORM =~ /linux/
+
+  def test_nopush_smart_true
+    Kgio.nopush_smart = true
+    assert_equal true, Kgio.nopush_smart?
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    io, err = Strace.me { @rd = @srv.kgio_accept }
+    assert_nil err
+    lines = io.readlines
+    assert_equal 1, lines.grep(/TCP_CORK/).size, lines.inspect
+    assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+
+    @wr.write "HI\n"
+    rbuf = ""
+    io, err = Strace.me { @rd.kgio_read(3, rbuf) }
+    assert_nil err
+    lines = io.readlines
+    assert lines.grep(/TCP_CORK/).empty?, lines.inspect
+    assert_equal "HI\n", rbuf
+
+    t0 = Time.now
+    @rd.kgio_write "HI2U2\n"
+    @rd.kgio_write "HOW\n"
+    rc = false
+    io, err = Strace.me { rc = @rd.kgio_tryread(666) }
+    @wr.readpartial(666, rbuf)
+    rbuf == "HI2U2\nHOW\n" or warn "rbuf=#{rbuf.inspect} looking bad?"
+    diff = Time.now - t0
+    assert(diff < 0.200, "time diff=#{diff} >= 200ms")
+    assert_equal :wait_readable, rc
+    assert_nil err
+    lines = io.readlines
+    assert_equal 2, lines.grep(/TCP_CORK/).size, lines.inspect
+    assert_nothing_raised { @wr.close }
+    assert_nothing_raised { @rd.close }
+
+    @wr = Kgio::TCPSocket.new(@host, @port)
+    io, err = Strace.me { @rd = @srv.kgio_accept }
+    assert_nil err
+    lines = io.readlines
+    assert lines.grep(/TCP_CORK/).empty?, "optimization fail: #{lines.inspect}"
+    assert_equal 1, @rd.getsockopt(Socket::SOL_TCP, TCP_CORK).unpack("i")[0]
+  end if RUBY_PLATFORM =~ /linux/
+
+  def teardown
+    Kgio.nopush_smart = false
+  end
+end