about summary refs log tree commit homepage
diff options
context:
space:
mode:
author Eric Wong <normalperson@yhbt.net> 2011-02-16 01:31:40 +0000
committer Eric Wong <normalperson@yhbt.net> 2011-02-15 17:34:31 -0800
commit e5faa4da78af196ee5abbccf197671fd8e77adad (patch)
tree 574c569dd80160df69afd2c6b5ff7c2a3450ea57
parent 5183832d1e70cd89aab1cb8bb2e2795f0ad247c7 (diff)
download raindrops-e5faa4da78af196ee5abbccf197671fd8e77adad.tar.gz
inet_diag already supports AF_INET6.
-rw-r--r-- ext/raindrops/linux_inet_diag.c 109
-rw-r--r-- test/test_linux_ipv6.rb 161
2 files changed, 233 insertions, 37 deletions
diff --git a/ext/raindrops/linux_inet_diag.c b/ext/raindrops/linux_inet_diag.c
index 9006428..a6bf302 100644
--- a/ext/raindrops/linux_inet_diag.c
+++ b/ext/raindrops/linux_inet_diag.c
@@ -50,6 +50,7 @@ rb_thread_blocking_region(
 #include <errno.h>
 #include <sys/socket.h>
 #include <sys/types.h>
+#include <netdb.h>
 #include <unistd.h>
 #include <string.h>
 #include <asm/types.h>
@@ -64,23 +65,18 @@ static size_t page_size;
 static unsigned g_seq;
 static VALUE cListenStats;
 
-struct my_addr {
-        in_addr_t addr;
-        uint16_t port;
-};
-
 struct listen_stats {
         long active;
         long queued;
 };
 
 #define OPLEN (sizeof(struct inet_diag_bc_op) + \
-               sizeof(struct inet_diag_hostcond) + \
-               sizeof(in_addr_t))
+               sizeof(struct inet_diag_hostcond) + \
+               sizeof(struct sockaddr_storage))
 
 struct nogvl_args {
         struct iovec iov[3]; /* last iov holds inet_diag bytecode */
-        struct my_addr query_addr;
+        struct sockaddr_storage query_addr;
         struct listen_stats stats;
 };
 
@@ -102,21 +98,6 @@ static VALUE rb_listen_stats(struct listen_stats *stats)
         return rv;
 }
 
-/*
- * converts a base 10 string representing a port number into
- * an unsigned 16 bit integer.  Raises ArgumentError on failure
- */
-static uint16_t my_inet_port(const char *port)
-{
-        char *err;
-        unsigned long tmp = strtoul(port, &err, 10);
-
-        if (*err != 0 || tmp > 0xffff)
-                rb_raise(rb_eArgError, "port not parsable: `%s'\n", port);
-
-        return (uint16_t)tmp;
-}
-
 /* inner loop of inet_diag, called for every socket returned by netlink */
 static inline void r_acc(struct nogvl_args *args, struct inet_diag_msg *r)
 {
@@ -170,7 +151,7 @@ static VALUE diag(void *ptr)
         req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST;
         req.nlh.nlmsg_pid = getpid();
         req.nlh.nlmsg_seq = seq;
-        req.r.idiag_family = AF_INET;
+        req.r.idiag_family = AF_INET | AF_INET6;
         req.r.idiag_states = (1<<TCP_ESTABLISHED) | (1<<TCP_LISTEN);
         rta.rta_type = INET_DIAG_REQ_BYTECODE;
         rta.rta_len = RTA_LENGTH(args->iov[2].iov_len);
@@ -236,27 +217,61 @@ out:
         return (VALUE)err;
 }
 
-/* populates inet my_addr struct by parsing +addr+ */
-static void parse_addr(struct my_addr *inet, VALUE addr)
+/* fills +inet+ from "host:port" or "[ipv6]:port" in +addr+; ArgumentError if unparsable */
+static void parse_addr(struct sockaddr_storage *inet, VALUE addr)
 {
-        char *host_port, *colon;
+        char *host_ptr;
+        char *colon = NULL;
+        char *rbracket = NULL;
+        long host_len;
+        struct addrinfo hints = { 0 }; /* fields not set below must be zero/NULL for getaddrinfo */
+        struct addrinfo *res;
+        int rc;
 
         if (TYPE(addr) != T_STRING)
                 rb_raise(rb_eArgError, "addrs must be an Array of Strings");
 
-        host_port = RSTRING_PTR(addr);
-        colon = memchr(host_port, ':', RSTRING_LEN(addr));
+        host_ptr = StringValueCStr(addr);
+        host_len = RSTRING_LEN(addr);
+        if (*host_ptr == '[') { /* ipv6 address format (rfc2732) */
+                rbracket = memchr(host_ptr + 1, ']', host_len - 1);
+
+                if (rbracket) {
+                        if (rbracket[1] == ':') {
+                                colon = rbracket + 1;
+                                host_ptr++; /* skip the opening '[' */
+                                *rbracket = 0;
+                        } else {
+                                rbracket = NULL; /* no ":port" after ']': rejected below */
+                        }
+                }
+        } else { /* ipv4 */
+                colon = memchr(host_ptr, ':', host_len);
+        }
+
         if (!colon)
-                rb_raise(rb_eArgError, "port not found in: `%s'", host_port);
+                rb_raise(rb_eArgError, "port not found in: `%s'", host_ptr);
+
+        hints.ai_family = AF_UNSPEC;
+        hints.ai_socktype = SOCK_STREAM;
+        hints.ai_protocol = IPPROTO_TCP;
+        hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; /* numeric literals only, no lookups */
 
         *colon = 0;
-        inet->addr = inet_addr(host_port);
+        if (rbracket) *rbracket = 0;
+        rc = getaddrinfo(host_ptr, colon + 1, &hints, &res);
         *colon = ':';
-        inet->port = htons(my_inet_port(colon + 1));
+        if (rbracket) *rbracket = ']'; /* undo the in-place edits to the caller's string */
+        if (rc != 0)
+                rb_raise(rb_eArgError, "getaddrinfo(%s): %s",
+                         host_ptr, gai_strerror(rc));
+
+        memcpy(inet, res->ai_addr, res->ai_addrlen); /* only the first result is used */
+        freeaddrinfo(res);
 }
 
 /* generates inet_diag bytecode to match a single addr */
-static void gen_bytecode(struct iovec *iov, struct my_addr *inet)
+static void gen_bytecode(struct iovec *iov, struct sockaddr_storage *inet)
 {
         struct inet_diag_bc_op *op;
         struct inet_diag_hostcond *cond;
@@ -269,10 +284,30 @@ static void gen_bytecode(struct iovec *iov, struct my_addr *inet)
         op->no = sizeof(struct inet_diag_bc_op) + OPLEN;
 
         cond = (struct inet_diag_hostcond *)(op + 1);
-        cond->family = AF_INET;
-        cond->port = ntohs(inet->port);
-        cond->prefix_len = inet->addr == 0 ? 0 : sizeof(in_addr_t) * CHAR_BIT;
-        *cond->addr = inet->addr;
+        cond->family = inet->ss_family;
+        switch (inet->ss_family) {
+        case AF_INET: {
+                struct sockaddr_in *in = (struct sockaddr_in *)inet;
+
+                cond->port = ntohs(in->sin_port);
+                cond->prefix_len = in->sin_addr.s_addr == 0 ? 0 :
+                                   sizeof(in->sin_addr.s_addr) * CHAR_BIT;
+                *cond->addr = in->sin_addr.s_addr;
+                }
+                break;
+        case AF_INET6: {
+                struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)inet;
+
+                cond->port = ntohs(in6->sin6_port);
+                cond->prefix_len = memcmp(&in6addr_any, &in6->sin6_addr,
+                                          sizeof(struct in6_addr)) == 0 ?
+                                  0 : sizeof(in6->sin6_addr) * CHAR_BIT;
+                memcpy(&cond->addr, &in6->sin6_addr, sizeof(struct in6_addr));
+                }
+                break;
+        default:
+                assert("unsupported address family, could that be IPv7?!");
+        }
 }
 
 static VALUE tcp_stats(struct nogvl_args *args, VALUE addr)
diff --git a/test/test_linux_ipv6.rb b/test/test_linux_ipv6.rb
new file mode 100644
index 0000000..e2886ac
--- /dev/null
+++ b/test/test_linux_ipv6.rb
@@ -0,0 +1,161 @@
+# -*- encoding: binary -*-
+require 'test/unit'
+require 'tempfile'
+require 'raindrops'
+require 'socket'
+require 'pp'
+$stderr.sync = $stdout.sync = true
+
+begin # probe: can we listen on the IPv6 test address at all?
+  TCPServer.new(ENV["TEST_HOST6"] || '::1', 0).close # close at once; don't leave a stray listener open
+  ipv6_enabled = true
+rescue => e
+  warn "skipping IPv6 tests, host does not seem to be IPv6 enabled:"
+  warn "  #{e.class}: #{e}"
+  ipv6_enabled = false # gates the test class definition at the bottom of the file
+end
+
+class TestLinuxIPv6 < Test::Unit::TestCase # exercises tcp_listener_stats with "[addr]:port" IPv6 listeners
+  include Raindrops::Linux # presumably where tcp_listener_stats comes from
+
+  TEST_ADDR = ENV["TEST_HOST6"] || "::1" # loopback unless TEST_HOST6 overrides it
+
+  def test_tcp # one listener: a pending connect counts as queued, an accepted one as active
+    s = TCPServer.new(TEST_ADDR, 0) # port 0, the actual port is read back below
+    port = s.addr[1]
+    addr = "[#{TEST_ADDR}]:#{port}" # bracketed form, also used as the key in the result
+    addrs = [ addr ]
+    stats = tcp_listener_stats(addrs)
+    assert_equal 1, stats.size
+    assert_equal 0, stats[addr].queued
+    assert_equal 0, stats[addr].active
+
+    c = TCPSocket.new(TEST_ADDR, port) # connected but not yet accepted
+    stats = tcp_listener_stats(addrs)
+    assert_equal 1, stats.size
+    assert_equal 1, stats[addr].queued
+    assert_equal 0, stats[addr].active
+
+    sc = s.accept # accepting moves the connection from queued to active
+    stats = tcp_listener_stats(addrs)
+    assert_equal 1, stats.size
+    assert_equal 0, stats[addr].queued
+    assert_equal 1, stats[addr].active
+  end
+
+  def test_tcp_multi # two listeners queried in one call keep independent counters
+    s1 = TCPServer.new(TEST_ADDR, 0)
+    s2 = TCPServer.new(TEST_ADDR, 0)
+    port1, port2 = s1.addr[1], s2.addr[1]
+    addr1, addr2 = "[#{TEST_ADDR}]:#{port1}", "[#{TEST_ADDR}]:#{port2}"
+    addrs = [ addr1, addr2 ]
+    stats = tcp_listener_stats(addrs)
+    assert_equal 2, stats.size
+    assert_equal 0, stats[addr1].queued
+    assert_equal 0, stats[addr1].active
+    assert_equal 0, stats[addr2].queued
+    assert_equal 0, stats[addr2].active
+
+    c1 = TCPSocket.new(TEST_ADDR, port1) # first pending connection on listener 1
+    stats = tcp_listener_stats(addrs)
+    assert_equal 2, stats.size
+    assert_equal 1, stats[addr1].queued
+    assert_equal 0, stats[addr1].active
+    assert_equal 0, stats[addr2].queued
+    assert_equal 0, stats[addr2].active
+
+    sc1 = s1.accept
+    stats = tcp_listener_stats(addrs)
+    assert_equal 2, stats.size
+    assert_equal 0, stats[addr1].queued
+    assert_equal 1, stats[addr1].active
+    assert_equal 0, stats[addr2].queued
+    assert_equal 0, stats[addr2].active
+
+    c2 = TCPSocket.new(TEST_ADDR, port2) # first pending connection on listener 2
+    stats = tcp_listener_stats(addrs)
+    assert_equal 2, stats.size
+    assert_equal 0, stats[addr1].queued
+    assert_equal 1, stats[addr1].active
+    assert_equal 1, stats[addr2].queued
+    assert_equal 0, stats[addr2].active
+
+    c3 = TCPSocket.new(TEST_ADDR, port2) # second pending connection on listener 2
+    stats = tcp_listener_stats(addrs)
+    assert_equal 2, stats.size
+    assert_equal 0, stats[addr1].queued
+    assert_equal 1, stats[addr1].active
+    assert_equal 2, stats[addr2].queued
+    assert_equal 0, stats[addr2].active
+
+    sc2 = s2.accept # takes one of the two queued connections
+    stats = tcp_listener_stats(addrs)
+    assert_equal 2, stats.size
+    assert_equal 0, stats[addr1].queued
+    assert_equal 1, stats[addr1].active
+    assert_equal 1, stats[addr2].queued
+    assert_equal 1, stats[addr2].active
+
+    sc1.close # closing the accepted socket drops listener 1's active count
+    stats = tcp_listener_stats(addrs)
+    assert_equal 0, stats[addr1].queued
+    assert_equal 0, stats[addr1].active
+    assert_equal 1, stats[addr2].queued
+    assert_equal 1, stats[addr2].active
+  end
+
+  # tries to overflow buffers
+  def test_tcp_stress_test
+    nr_proc = 32 # forked children per role (acceptors and connectors)
+    nr_sock = 500 # sockets opened by each child
+    s = TCPServer.new(TEST_ADDR, 0)
+    port = s.addr[1]
+    addr = "[#{TEST_ADDR}]:#{port}"
+    addrs = [ addr ]
+    rda, wra = IO.pipe # children -> parent: one '.' per child when it is done
+    rdb, wrb = IO.pipe # parent -> children: release signal
+
+    nr_proc.times do # acceptor children
+      fork do
+        rda.close
+        wrb.close
+        socks = (1..nr_sock).map { s.accept }
+        wra.syswrite('.')
+        wra.close
+        rdb.sysread(1) # wait for parent to nuke us
+      end
+    end
+
+    nr_proc.times do # connector children
+      fork do
+        rda.close
+        wrb.close
+        socks = (1..nr_sock).map { TCPSocket.new(TEST_ADDR, port) }
+        wra.syswrite('.')
+        wra.close
+        rdb.sysread(1) # wait for parent to nuke us
+      end
+    end
+
+    assert_equal('.' * (nr_proc * 2), rda.read(nr_proc * 2)) # blocks until every child has reported in
+
+    rda.close
+    stats = tcp_listener_stats(addrs)
+    expect = { addr => Raindrops::ListenStats[nr_sock * nr_proc, 0] } # [active, queued]
+    assert_equal expect, stats
+
+    uno_mas = TCPSocket.new(TEST_ADDR, port) # one more connect that nobody accepts
+    stats = tcp_listener_stats(addrs)
+    expect = { addr => Raindrops::ListenStats[nr_sock * nr_proc, 1] }
+    assert_equal expect, stats
+
+    if ENV["BENCHMARK"].to_i != 0 # optional timing run, off by default
+      require 'benchmark'
+      puts(Benchmark.measure{1000.times { tcp_listener_stats(addrs) }})
+    end
+
+    wrb.syswrite('.' * (nr_proc * 2)) # broadcast a wakeup
+    statuses = Process.waitall
+    statuses.each { |(pid,status)| assert status.success?, status.inspect }
+  end if ENV["STRESS"].to_i != 0 # only defined when STRESS is a non-zero integer
+end if RUBY_PLATFORM =~ /linux/ && ipv6_enabled # Linux only, and only if the IPv6 probe succeeded