diff options
Diffstat (limited to 'ext/raindrops')
-rw-r--r-- | ext/raindrops/extconf.rb | 11 | ||||
-rw-r--r-- | ext/raindrops/linux_inet_diag.c | 342 | ||||
-rw-r--r-- | ext/raindrops/raindrops.c | 192 |
3 files changed, 545 insertions, 0 deletions
diff --git a/ext/raindrops/extconf.rb b/ext/raindrops/extconf.rb new file mode 100644 index 0000000..d637287 --- /dev/null +++ b/ext/raindrops/extconf.rb @@ -0,0 +1,11 @@ +require 'mkmf' + +# FIXME: test for GCC __sync_XXX builtins here, somehow... +have_func('mmap', 'sys/mman.h') or abort 'mmap() not found' +have_func('munmap', 'sys/mman.h') or abort 'munmap() not found' + +have_func("rb_struct_alloc_noinit") +have_func('rb_thread_blocking_region') + +dir_config('raindrops') +create_makefile('raindrops_ext') diff --git a/ext/raindrops/linux_inet_diag.c b/ext/raindrops/linux_inet_diag.c new file mode 100644 index 0000000..315844e --- /dev/null +++ b/ext/raindrops/linux_inet_diag.c @@ -0,0 +1,342 @@ +#include <ruby.h> + +/* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */ +#ifndef RSTRING_PTR +# define RSTRING_PTR(s) (RSTRING(s)->ptr) +#endif +#ifndef RSTRING_LEN +# define RSTRING_LEN(s) (RSTRING(s)->len) +#endif +#ifndef RSTRUCT_PTR +# define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr) +#endif +#ifndef RSTRUCT_LEN +# define RSTRUCT_LEN(s) (RSTRUCT(s)->len) +#endif + +#ifndef HAVE_RB_STRUCT_ALLOC_NOINIT +static ID id_new; +static VALUE rb_struct_alloc_noinit(VALUE class) +{ + return rb_funcall(class, id_new, 0, 0); +} +#endif /* !defined(HAVE_RB_STRUCT_ALLOC_NOINIT) */ + +/* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */ +#ifndef HAVE_RB_THREAD_BLOCKING_REGION +# include <rubysig.h> +# define RUBY_UBF_IO ((rb_unblock_function_t *)-1) +typedef void rb_unblock_function_t(void *); +typedef VALUE rb_blocking_function_t(void *); +static VALUE +rb_thread_blocking_region( + rb_blocking_function_t *func, void *data1, + rb_unblock_function_t *ubf, void *data2) +{ + VALUE rv; + + TRAP_BEG; + rv = func(data1); + TRAP_END; + + return rv; +} +#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */ + +#include <assert.h> +#include <errno.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <unistd.h> +#include <string.h> +#include <asm/types.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netinet/tcp.h> +#include <linux/netlink.h> +#include <linux/rtnetlink.h> +#include <linux/inet_diag.h> + +static size_t page_size; +static unsigned g_seq; +static VALUE cListenStats; + +struct my_addr { + in_addr_t addr; + uint16_t port; +}; + +struct listen_stats { + long active; + long queued; +}; + +#define OPLEN (sizeof(struct inet_diag_bc_op) + \ + sizeof(struct inet_diag_hostcond) + \ + sizeof(in_addr_t)) + +struct nogvl_args { + struct iovec iov[3]; /* last iov holds inet_diag bytecode */ + struct my_addr addrs; + struct listen_stats stats; +}; + +/* creates a Ruby ListenStats Struct based on our internal listen_stats */ +static VALUE rb_listen_stats(struct listen_stats *stats) +{ + VALUE rv = rb_struct_alloc_noinit(cListenStats); + VALUE *ptr = RSTRUCT_PTR(rv); + + ptr[0] = LONG2NUM(stats->active); + ptr[1] = LONG2NUM(stats->queued); + + return rv; +} + +/* + * converts a base 10 string representing a port number into + * an unsigned 16 bit integer. Raises ArgumentError on failure + */ +static uint16_t my_inet_port(const char *port) +{ + char *err; + unsigned long tmp = strtoul(port, &err, 10); + + if (*err != 0 || tmp > 0xffff) + rb_raise(rb_eArgError, "port not parsable: `%s'\n", port); + + return (uint16_t)tmp; +} + +/* inner loop of inet_diag, called for every socket returned by netlink */ +static inline void r_acc(struct nogvl_args *args, struct inet_diag_msg *r) +{ + /* + * inode == 0 means the connection is still in the listen queue + * and has not yet been accept()-ed by the server. The + * inet_diag bytecode cannot filter this for us. + */ + if (r->idiag_inode == 0) + return; + if (r->idiag_state == TCP_ESTABLISHED) + args->stats.active++; + else /* if (r->idiag_state == TCP_LISTEN) */ + args->stats.queued = r->idiag_rqueue; + /* + * we wont get anything else because of the idiag_states filter + */ +} + +static const char err_socket[] = "socket"; +static const char err_sendmsg[] = "sendmsg"; +static const char err_recvmsg[] = "recvmsg"; +static const char err_nlmsg[] = "nlmsg"; + +/* does the inet_diag stuff with netlink(), this is called w/o GVL */ +static VALUE diag(void *ptr) +{ + struct nogvl_args *args = ptr; + struct sockaddr_nl nladdr; + struct rtattr rta; + struct { + struct nlmsghdr nlh; + struct inet_diag_req r; + } req; + struct msghdr msg; + const char *err = NULL; + unsigned seq = __sync_add_and_fetch(&g_seq, 1); + int fd = socket(AF_NETLINK, SOCK_RAW, NETLINK_INET_DIAG); + + if (fd < 0) + return (VALUE)err_socket; + + memset(&args->stats, 0, sizeof(struct listen_stats)); + + memset(&nladdr, 0, sizeof(nladdr)); + nladdr.nl_family = AF_NETLINK; + + memset(&req, 0, sizeof(req)); + req.nlh.nlmsg_len = sizeof(req) + RTA_LENGTH(args->iov[2].iov_len); + req.nlh.nlmsg_type = TCPDIAG_GETSOCK; + req.nlh.nlmsg_flags = NLM_F_ROOT | NLM_F_MATCH | NLM_F_REQUEST; + req.nlh.nlmsg_pid = getpid(); + req.nlh.nlmsg_seq = seq; + req.r.idiag_family = AF_INET; + req.r.idiag_states = (1<<TCP_ESTABLISHED) | (1<<TCP_LISTEN); + rta.rta_type = INET_DIAG_REQ_BYTECODE; + rta.rta_len = RTA_LENGTH(args->iov[2].iov_len); + + args->iov[0].iov_base = &req; + args->iov[0].iov_len = sizeof(req); + args->iov[1].iov_base = &rta; + args->iov[1].iov_len = sizeof(rta); + + memset(&msg, 0, sizeof(msg)); + msg.msg_name = (void *)&nladdr; + msg.msg_namelen = sizeof(nladdr); + msg.msg_iov = args->iov; + msg.msg_iovlen = 3; + + if (sendmsg(fd, &msg, 0) < 0) { + err = err_sendmsg; + goto out; + } + + /* reuse buffer that was allocated for bytecode */ + args->iov[0].iov_len = page_size; + args->iov[0].iov_base = args->iov[2].iov_base; + + while (1) { + ssize_t readed; + struct nlmsghdr *h = (struct nlmsghdr *)args->iov[0].iov_base; + + memset(&msg, 0, sizeof(msg)); + msg.msg_name = (void *)&nladdr; + msg.msg_namelen = sizeof(nladdr); + msg.msg_iov = args->iov; + msg.msg_iovlen = 1; + + readed = recvmsg(fd, &msg, 0); + if (readed < 0) { + if (errno == EINTR) + continue; + err = err_recvmsg; + goto out; + } + if (readed == 0) + goto out; + + for ( ; NLMSG_OK(h, readed); h = NLMSG_NEXT(h, readed)) { + if (h->nlmsg_seq != seq) + continue; + if (h->nlmsg_type == NLMSG_DONE) + goto out; + if (h->nlmsg_type == NLMSG_ERROR) { + err = err_nlmsg; + goto out; + } + r_acc(args, NLMSG_DATA(h)); + } + } +out: + { + int save_errno = errno; + close(fd); + errno = save_errno; + } + return (VALUE)err; +} + +/* populates inet my_addr struct by parsing +addr+ */ +static void parse_addr(struct my_addr *inet, VALUE addr) +{ + char *host_port, *colon; + + if (TYPE(addr) != T_STRING) + rb_raise(rb_eArgError, "addrs must be an Array of Strings"); + + host_port = RSTRING_PTR(addr); + colon = memchr(host_port, ':', RSTRING_LEN(addr)); + if (!colon) + rb_raise(rb_eArgError, "port not found in: `%s'", host_port); + + *colon = 0; + inet->addr = inet_addr(host_port); + *colon = ':'; + inet->port = htons(my_inet_port(colon + 1)); +} + +/* generates inet_diag bytecode to match a single addr */ +static void gen_bytecode(struct iovec *iov, struct my_addr *inet) +{ + struct inet_diag_bc_op *op; + struct inet_diag_hostcond *cond; + + /* iov_len was already set and base allocated in a parent function */ + assert(iov->iov_len == OPLEN && iov->iov_base && "iov invalid"); + op = iov->iov_base; + op->code = INET_DIAG_BC_S_COND; + op->yes = OPLEN; + op->no = sizeof(struct inet_diag_bc_op) + OPLEN; + + cond = (struct inet_diag_hostcond *)(op + 1); + cond->family = AF_INET; + cond->port = ntohs(inet->port); + cond->prefix_len = inet->addr == 0 ? 0 : sizeof(in_addr_t) * CHAR_BIT; + *cond->addr = inet->addr; +} + +static VALUE tcp_stats(struct nogvl_args *args, VALUE addr) +{ + const char *err; + VALUE verr; + + parse_addr(&args->addrs, addr); + gen_bytecode(&args->iov[2], &args->addrs); + + verr = rb_thread_blocking_region(diag, args, RUBY_UBF_IO, 0); + err = (const char *)verr; + if (err) { + if (err == err_nlmsg) + rb_raise(rb_eRuntimeError, "NLMSG_ERROR"); + else + rb_sys_fail(err); + } + + return rb_listen_stats(&args->stats); +} + +/* + * call-seq: + * addrs = %w(0.0.0.0:80 127.0.0.1:8080) + * Raindrops::Linux.tcp_listener_stats(addrs) => hash + * + * Takes an array of strings representing listen addresses to filter for. + * Returns a hash with given addresses as keys and ListenStats + * objects as the values. + */ +static VALUE tcp_listener_stats(VALUE obj, VALUE addrs) +{ + VALUE *ary; + long i; + VALUE rv; + struct nogvl_args args; + + /* + * allocating page_size instead of OP_LEN since we'll reuse the + * buffer for recvmsg() later, we already checked for + * OPLEN <= page_size at initialization + */ + args.iov[2].iov_len = OPLEN; + args.iov[2].iov_base = alloca(page_size); + + if (TYPE(addrs) != T_ARRAY) + rb_raise(rb_eArgError, "addrs must be an Array or String"); + + rv = rb_hash_new(); + ary = RARRAY_PTR(addrs); + for (i = RARRAY_LEN(addrs); --i >= 0; ary++) + rb_hash_aset(rv, *ary, tcp_stats(&args, *ary)); + + return rv; +} + +void Init_raindrops_linux_inet_diag(void) +{ + VALUE cRaindrops = rb_const_get(rb_cObject, rb_intern("Raindrops")); + VALUE mLinux = rb_define_module_under(cRaindrops, "Linux"); + + cListenStats = rb_const_get(cRaindrops, rb_intern("ListenStats")); + + rb_define_module_function(mLinux, "tcp_listener_stats", + tcp_listener_stats, 1); + +#ifndef HAVE_RB_STRUCT_ALLOC_NOINIT + id_new = rb_intern("new"); +#endif + rb_require("raindrops/linux"); + + page_size = getpagesize(); + + assert(OPLEN <= page_size && "bytecode OPLEN is no <= PAGE_SIZE"); +} diff --git a/ext/raindrops/raindrops.c b/ext/raindrops/raindrops.c new file mode 100644 index 0000000..65e3947 --- /dev/null +++ b/ext/raindrops/raindrops.c @@ -0,0 +1,192 @@ +#include <ruby.h> +#include <sys/mman.h> +#include <assert.h> +#include <errno.h> +#include <stddef.h> + +/* + * most modern CPUs have a cache-line size of 64 or 128. + * We choose a bigger one by default since our structure is not + * heavily used + */ +#ifndef CACHE_LINE_SIZE +# define CACHE_LINE_SIZE 128 +#endif + +/* each raindrop is a counter */ +struct raindrop { + union { + unsigned long counter; + unsigned char padding[CACHE_LINE_SIZE]; + } as; +} __attribute__((packed)); + +/* allow mmap-ed regions can store more than one raindrop */ +struct raindrops { + long size; + struct raindrop *drops; +}; + +/* called by GC */ +static void evaporate(void *ptr) +{ + struct raindrops *r = ptr; + + if (r->drops) { + int rv = munmap(r->drops, sizeof(struct raindrop) * r->size); + if (rv != 0) + rb_bug("munmap failed in gc: %s", strerror(errno)); + } + + xfree(ptr); +} + +/* automatically called at creation (before initialize) */ +static VALUE alloc(VALUE klass) +{ + struct raindrops *r; + + return Data_Make_Struct(klass, struct raindrops, NULL, evaporate, r); +} + +static struct raindrops *get(VALUE self) +{ + struct raindrops *r; + + Data_Get_Struct(self, struct raindrops, r); + + return r; +} + +/* initializes a Raindrops object to hold +size+ elements */ +static VALUE init(VALUE self, VALUE size) +{ + struct raindrops *r = get(self); + int tries = 1; + + if (r->drops) + rb_raise(rb_eRuntimeError, "already initialized"); + + r->size = NUM2LONG(size); + if (r->size < 1) + rb_raise(rb_eArgError, "size must be >= 1"); + +retry: + r->drops = mmap(NULL, sizeof(struct raindrop) * r->size, + PROT_READ|PROT_WRITE, MAP_ANON|MAP_SHARED, -1, 0); + if (r->drops == MAP_FAILED) { + if ((errno == EAGAIN || errno == ENOMEM) && tries-- > 0) { + rb_gc(); + goto retry; + } + rb_sys_fail("mmap"); + } + + return self; +} + +/* :nodoc */ +static VALUE init_copy(VALUE dest, VALUE source) +{ + struct raindrops *dst = get(dest); + struct raindrops *src = get(source); + + init(dest, LONG2NUM(src->size)); + memcpy(dst->drops, src->drops, sizeof(struct raindrop) * src->size); + + return dest; +} + +static unsigned long *addr_of(VALUE self, VALUE index) +{ + struct raindrops *r = get(self); + unsigned long off = FIX2ULONG(index) * sizeof(struct raindrop); + + if (off >= sizeof(struct raindrop) * r->size) + rb_raise(rb_eArgError, "offset overrun"); + + return (unsigned long *)((unsigned long)r->drops + off); +} + +static unsigned long incr_decr_arg(int argc, const VALUE *argv) +{ + if (argc > 2 || argc < 1) + rb_raise(rb_eArgError, + "wrong number of arguments (%d for 1+)", argc); + + return argc == 2 ? NUM2ULONG(argv[1]) : 1; +} + +/* increments the value referred to by the +index+ constant by 1 */ +static VALUE incr(int argc, VALUE *argv, VALUE self) +{ + unsigned long nr = incr_decr_arg(argc, argv); + + return ULONG2NUM(__sync_add_and_fetch(addr_of(self, argv[0]), nr)); +} + +/* decrements the value referred to by the +index+ constant by 1 */ +static VALUE decr(int argc, VALUE *argv, VALUE self) +{ + unsigned long nr = incr_decr_arg(argc, argv); + + return ULONG2NUM(__sync_sub_and_fetch(addr_of(self, argv[0]), nr)); +} + +/* converts the raindrops structure to an Array */ +static VALUE to_ary(VALUE self) +{ + struct raindrops *r = get(self); + VALUE rv = rb_ary_new2(r->size); + long i; + unsigned long base = (unsigned long)r->drops; + + for (i = 0; i < r->size; i++) { + rb_ary_push(rv, ULONG2NUM(*((unsigned long *)base))); + base += sizeof(struct raindrop); + } + + return rv; +} + +static VALUE size(VALUE self) +{ + return LONG2NUM(get(self)->size); +} + +static VALUE aset(VALUE self, VALUE index, VALUE value) +{ + unsigned long *addr = addr_of(self, index); + + *addr = NUM2ULONG(value); + + return value; +} + +static VALUE aref(VALUE self, VALUE index) +{ + return ULONG2NUM(*addr_of(self, index)); +} + +#ifdef __linux__ +void Init_raindrops_linux_inet_diag(void); +#endif + +void Init_raindrops_ext(void) +{ + VALUE cRaindrops = rb_define_class("Raindrops", rb_cObject); + rb_define_alloc_func(cRaindrops, alloc); + + rb_define_method(cRaindrops, "initialize", init, 1); + rb_define_method(cRaindrops, "incr", incr, -1); + rb_define_method(cRaindrops, "decr", decr, -1); + rb_define_method(cRaindrops, "to_ary", to_ary, 0); + rb_define_method(cRaindrops, "[]", aref, 1); + rb_define_method(cRaindrops, "[]=", aset, 2); + rb_define_method(cRaindrops, "size", size, 0); + rb_define_method(cRaindrops, "initialize_copy", init_copy, 1); + +#ifdef __linux__ + Init_raindrops_linux_inet_diag(); +#endif +} |