From e5faa4da78af196ee5abbccf197671fd8e77adad Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 16 Feb 2011 01:31:40 +0000 Subject: linux: add ipv6 support for inet_diag inet_diag already supports AF_INET6. --- ext/raindrops/linux_inet_diag.c | 109 ++++++++++++++++++--------- test/test_linux_ipv6.rb | 161 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 233 insertions(+), 37 deletions(-) create mode 100644 test/test_linux_ipv6.rb diff --git a/ext/raindrops/linux_inet_diag.c b/ext/raindrops/linux_inet_diag.c index 9006428..a6bf302 100644 --- a/ext/raindrops/linux_inet_diag.c +++ b/ext/raindrops/linux_inet_diag.c @@ -50,6 +50,7 @@ rb_thread_blocking_region( #include #include #include +#include #include #include #include @@ -64,23 +65,18 @@ static size_t page_size; static unsigned g_seq; static VALUE cListenStats; -struct my_addr { - in_addr_t addr; - uint16_t port; -}; - struct listen_stats { long active; long queued; }; #define OPLEN (sizeof(struct inet_diag_bc_op) + \ - sizeof(struct inet_diag_hostcond) + \ - sizeof(in_addr_t)) + sizeof(struct inet_diag_hostcond) + \ + sizeof(struct sockaddr_storage)) struct nogvl_args { struct iovec iov[3]; /* last iov holds inet_diag bytecode */ - struct my_addr query_addr; + struct sockaddr_storage query_addr; struct listen_stats stats; }; @@ -102,21 +98,6 @@ static VALUE rb_listen_stats(struct listen_stats *stats) return rv; } -/* - * converts a base 10 string representing a port number into - * an unsigned 16 bit integer. Raises ArgumentError on failure - */ -static uint16_t my_inet_port(const char *port) -{ - char *err; - unsigned long tmp = strtoul(port, &err, 10); - - if (*err != 0 || tmp > 0xffff) - rb_raise(rb_eArgError, "port not parsable: `%s'\n", port); - - return (uint16_t)tmp; -} - /* inner loop of inet_diag, called for every socket returned by netlink */ static inline void r_acc(struct nogvl_args *args, struct inet_diag_msg *r) { @@ -170,7 +151,7 @@ static VALUE diag(void *ptr) req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST; req.nlh.nlmsg_pid = getpid(); req.nlh.nlmsg_seq = seq; - req.r.idiag_family = AF_INET; + req.r.idiag_family = AF_INET | AF_INET6; req.r.idiag_states = (1<iov[2].iov_len); @@ -236,27 +217,61 @@ out: return (VALUE)err; } -/* populates inet my_addr struct by parsing +addr+ */ -static void parse_addr(struct my_addr *inet, VALUE addr) +/* populates sockaddr_storage struct by parsing +addr+ */ +static void parse_addr(struct sockaddr_storage *inet, VALUE addr) { - char *host_port, *colon; + char *host_ptr; + char *colon = NULL; + char *rbracket = NULL; + long host_len; + struct addrinfo hints; + struct addrinfo *res; + int rc; if (TYPE(addr) != T_STRING) rb_raise(rb_eArgError, "addrs must be an Array of Strings"); - host_port = RSTRING_PTR(addr); - colon = memchr(host_port, ':', RSTRING_LEN(addr)); + host_ptr = StringValueCStr(addr); + host_len = RSTRING_LEN(addr); + if (*host_ptr == '[') { /* ipv6 address format (rfc2732) */ + rbracket = memchr(host_ptr + 1, ']', host_len - 1); + + if (rbracket) { + if (rbracket[1] == ':') { + colon = rbracket + 1; + host_ptr++; + *rbracket = 0; + } else { + rbracket = NULL; + } + } + } else { /* ipv4 */ + colon = memchr(host_ptr, ':', host_len); + } + if (!colon) - rb_raise(rb_eArgError, "port not found in: `%s'", host_port); + rb_raise(rb_eArgError, "port not found in: `%s'", host_ptr); + + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_NUMERICHOST | AI_NUMERICSERV; *colon = 0; - inet->addr = inet_addr(host_port); + if (rbracket) *rbracket = 0; + rc = getaddrinfo(host_ptr, colon + 1, &hints, &res); *colon = ':'; - inet->port = htons(my_inet_port(colon + 1)); + if (rbracket) *rbracket = ']'; + if (rc != 0) + rb_raise(rb_eArgError, "getaddrinfo(%s): %s", + host_ptr, gai_strerror(rc)); + + memcpy(inet, res->ai_addr, res->ai_addrlen); + freeaddrinfo(res); } /* generates inet_diag bytecode to match a single addr */ -static void gen_bytecode(struct iovec *iov, struct my_addr *inet) +static void gen_bytecode(struct iovec *iov, struct sockaddr_storage *inet) { struct inet_diag_bc_op *op; struct inet_diag_hostcond *cond; @@ -269,10 +284,30 @@ static void gen_bytecode(struct iovec *iov, struct my_addr *inet) op->no = sizeof(struct inet_diag_bc_op) + OPLEN; cond = (struct inet_diag_hostcond *)(op + 1); - cond->family = AF_INET; - cond->port = ntohs(inet->port); - cond->prefix_len = inet->addr == 0 ? 0 : sizeof(in_addr_t) * CHAR_BIT; - *cond->addr = inet->addr; + cond->family = inet->ss_family; + switch (inet->ss_family) { + case AF_INET: { + struct sockaddr_in *in = (struct sockaddr_in *)inet; + + cond->port = ntohs(in->sin_port); + cond->prefix_len = in->sin_addr.s_addr == 0 ? 0 : + sizeof(in->sin_addr.s_addr) * CHAR_BIT; + *cond->addr = in->sin_addr.s_addr; + } + break; + case AF_INET6: { + struct sockaddr_in6 *in6 = (struct sockaddr_in6 *)inet; + + cond->port = ntohs(in6->sin6_port); + cond->prefix_len = memcmp(&in6addr_any, &in6->sin6_addr, + sizeof(struct in6_addr)) == 0 ? + 0 : sizeof(in6->sin6_addr) * CHAR_BIT; + memcpy(&cond->addr, &in6->sin6_addr, sizeof(struct in6_addr)); + } + break; + default: + assert("unsupported address family, could that be IPv7?!"); + } } static VALUE tcp_stats(struct nogvl_args *args, VALUE addr) diff --git a/test/test_linux_ipv6.rb b/test/test_linux_ipv6.rb new file mode 100644 index 0000000..e2886ac --- /dev/null +++ b/test/test_linux_ipv6.rb @@ -0,0 +1,161 @@ +# -*- encoding: binary -*- +require 'test/unit' +require 'tempfile' +require 'raindrops' +require 'socket' +require 'pp' +$stderr.sync = $stdout.sync = true + +begin + tmp = TCPServer.new(ENV["TEST_HOST6"] || '::1', 0) + ipv6_enabled = true +rescue => e + warn "skipping IPv6 tests, host does not seem to be IPv6 enabled:" + warn " #{e.class}: #{e}" + ipv6_enabled = false +end + +class TestLinuxIPv6 < Test::Unit::TestCase + include Raindrops::Linux + + TEST_ADDR = ENV["TEST_HOST6"] || "::1" + + def test_tcp + s = TCPServer.new(TEST_ADDR, 0) + port = s.addr[1] + addr = "[#{TEST_ADDR}]:#{port}" + addrs = [ addr ] + stats = tcp_listener_stats(addrs) + assert_equal 1, stats.size + assert_equal 0, stats[addr].queued + assert_equal 0, stats[addr].active + + c = TCPSocket.new(TEST_ADDR, port) + stats = tcp_listener_stats(addrs) + assert_equal 1, stats.size + assert_equal 1, stats[addr].queued + assert_equal 0, stats[addr].active + + sc = s.accept + stats = tcp_listener_stats(addrs) + assert_equal 1, stats.size + assert_equal 0, stats[addr].queued + assert_equal 1, stats[addr].active + end + + def test_tcp_multi + s1 = TCPServer.new(TEST_ADDR, 0) + s2 = TCPServer.new(TEST_ADDR, 0) + port1, port2 = s1.addr[1], s2.addr[1] + addr1, addr2 = "[#{TEST_ADDR}]:#{port1}", "[#{TEST_ADDR}]:#{port2}" + addrs = [ addr1, addr2 ] + stats = tcp_listener_stats(addrs) + assert_equal 2, stats.size + assert_equal 0, stats[addr1].queued + assert_equal 0, stats[addr1].active + assert_equal 0, stats[addr2].queued + assert_equal 0, stats[addr2].active + + c1 = TCPSocket.new(TEST_ADDR, port1) + stats = tcp_listener_stats(addrs) + assert_equal 2, stats.size + assert_equal 1, stats[addr1].queued + assert_equal 0, stats[addr1].active + assert_equal 0, stats[addr2].queued + assert_equal 0, stats[addr2].active + + sc1 = s1.accept + stats = tcp_listener_stats(addrs) + assert_equal 2, stats.size + assert_equal 0, stats[addr1].queued + assert_equal 1, stats[addr1].active + assert_equal 0, stats[addr2].queued + assert_equal 0, stats[addr2].active + + c2 = TCPSocket.new(TEST_ADDR, port2) + stats = tcp_listener_stats(addrs) + assert_equal 2, stats.size + assert_equal 0, stats[addr1].queued + assert_equal 1, stats[addr1].active + assert_equal 1, stats[addr2].queued + assert_equal 0, stats[addr2].active + + c3 = TCPSocket.new(TEST_ADDR, port2) + stats = tcp_listener_stats(addrs) + assert_equal 2, stats.size + assert_equal 0, stats[addr1].queued + assert_equal 1, stats[addr1].active + assert_equal 2, stats[addr2].queued + assert_equal 0, stats[addr2].active + + sc2 = s2.accept + stats = tcp_listener_stats(addrs) + assert_equal 2, stats.size + assert_equal 0, stats[addr1].queued + assert_equal 1, stats[addr1].active + assert_equal 1, stats[addr2].queued + assert_equal 1, stats[addr2].active + + sc1.close + stats = tcp_listener_stats(addrs) + assert_equal 0, stats[addr1].queued + assert_equal 0, stats[addr1].active + assert_equal 1, stats[addr2].queued + assert_equal 1, stats[addr2].active + end + + # tries to overflow buffers + def test_tcp_stress_test + nr_proc = 32 + nr_sock = 500 + s = TCPServer.new(TEST_ADDR, 0) + port = s.addr[1] + addr = "[#{TEST_ADDR}]:#{port}" + addrs = [ addr ] + rda, wra = IO.pipe + rdb, wrb = IO.pipe + + nr_proc.times do + fork do + rda.close + wrb.close + socks = (1..nr_sock).map { s.accept } + wra.syswrite('.') + wra.close + rdb.sysread(1) # wait for parent to nuke us + end + end + + nr_proc.times do + fork do + rda.close + wrb.close + socks = (1..nr_sock).map { TCPSocket.new(TEST_ADDR, port) } + wra.syswrite('.') + wra.close + rdb.sysread(1) # wait for parent to nuke us + end + end + + assert_equal('.' * (nr_proc * 2), rda.read(nr_proc * 2)) + + rda.close + stats = tcp_listener_stats(addrs) + expect = { addr => Raindrops::ListenStats[nr_sock * nr_proc, 0] } + assert_equal expect, stats + + uno_mas = TCPSocket.new(TEST_ADDR, port) + stats = tcp_listener_stats(addrs) + expect = { addr => Raindrops::ListenStats[nr_sock * nr_proc, 1] } + assert_equal expect, stats + + if ENV["BENCHMARK"].to_i != 0 + require 'benchmark' + puts(Benchmark.measure{1000.times { tcp_listener_stats(addrs) }}) + end + + wrb.syswrite('.' * (nr_proc * 2)) # broadcast a wakeup + statuses = Process.waitall + statuses.each { |(pid,status)| assert status.success?, status.inspect } + end if ENV["STRESS"].to_i != 0 +end if RUBY_PLATFORM =~ /linux/ && ipv6_enabled -- cgit v1.2.3-24-ge0c7