about summary refs log tree commit homepage
path: root/ext/kgio/accept.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/kgio/accept.c')
-rw-r--r--ext/kgio/accept.c371
1 files changed, 371 insertions, 0 deletions
diff --git a/ext/kgio/accept.c b/ext/kgio/accept.c
new file mode 100644
index 0000000..e1d1541
--- /dev/null
+++ b/ext/kgio/accept.c
@@ -0,0 +1,371 @@
+#include "kgio.h"
+#include "missing/accept4.h"
+#include "sock_for_fd.h"
+
+static VALUE localhost;
+static VALUE cClientSocket;
+static VALUE cKgio_Socket;
+static VALUE mSocketMethods;
+static VALUE iv_kgio_addr;
+
+#if defined(__linux__)
+static int accept4_flags = SOCK_CLOEXEC;
+#else /* ! linux */
+static int accept4_flags = SOCK_CLOEXEC | SOCK_NONBLOCK;
+#endif /* ! linux */
+
+struct accept_args {
+        int fd;
+        struct sockaddr *addr;
+        socklen_t *addrlen;
+};
+
+static VALUE set_accepted(VALUE klass, VALUE aclass)
+{
+        VALUE tmp;
+
+        if (NIL_P(aclass))
+                aclass = cKgio_Socket;
+
+        tmp = rb_funcall(aclass, rb_intern("included_modules"), 0, 0);
+        tmp = rb_funcall(tmp, rb_intern("include?"), 1, mSocketMethods);
+
+        if (tmp != Qtrue)
+                rb_raise(rb_eTypeError,
+                         "class must include Kgio::SocketMethods");
+
+        cClientSocket = aclass;
+
+        return aclass;
+}
+
+static VALUE get_accepted(VALUE klass)
+{
+        return cClientSocket;
+}
+
+static VALUE xaccept(void *ptr)
+{
+        struct accept_args *a = ptr;
+
+        return (VALUE)accept4(a->fd, a->addr, a->addrlen, accept4_flags);
+}
+
+#ifdef HAVE_RB_THREAD_BLOCKING_REGION
+#  include <time.h>
+/*
+ * Try to use a (real) blocking accept() since that can prevent
+ * thundering herds under Linux:
+ * http://www.citi.umich.edu/projects/linux-scalability/reports/accept.html
+ *
+ * So we periodically disable non-blocking, but not too frequently
+ * because other processes may set non-blocking (especially during
+ * a process upgrade) with Rainbows! concurrency model changes.
+ */
+static int thread_accept(struct accept_args *a, int force_nonblock)
+{
+        if (force_nonblock)
+                set_nonblocking(a->fd);
+        return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0);
+}
+
+static void set_blocking_or_block(int fd)
+{
+        static time_t last_set_blocking;
+        time_t now = time(NULL);
+
+        if (last_set_blocking == 0) {
+                last_set_blocking = now;
+                (void)rb_io_wait_readable(fd);
+        } else if ((now - last_set_blocking) <= 5) {
+                (void)rb_io_wait_readable(fd);
+        } else {
+                int flags = fcntl(fd, F_GETFL);
+                if (flags == -1)
+                        rb_sys_fail("fcntl(F_GETFL)");
+                if (flags & O_NONBLOCK) {
+                        flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+                        if (flags == -1)
+                                rb_sys_fail("fcntl(F_SETFL)");
+                }
+                last_set_blocking = now;
+        }
+}
+#else /* ! HAVE_RB_THREAD_BLOCKING_REGION */
+#  include <rubysig.h>
+static int thread_accept(struct accept_args *a, int force_nonblock)
+{
+        int rv;
+
+        /* always use non-blocking accept() under 1.8 for green threads */
+        set_nonblocking(a->fd);
+        TRAP_BEG;
+        rv = (int)xaccept(a);
+        TRAP_END;
+        return rv;
+}
+#define set_blocking_or_block(fd) (void)rb_io_wait_readable(fd)
+#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
+
+static VALUE
+my_accept(VALUE io, struct sockaddr *addr, socklen_t *addrlen, int nonblock)
+{
+        int client;
+        struct accept_args a;
+
+        a.fd = my_fileno(io);
+        a.addr = addr;
+        a.addrlen = addrlen;
+retry:
+        client = thread_accept(&a, nonblock);
+        if (client == -1) {
+                switch (errno) {
+                case EAGAIN:
+                        if (nonblock)
+                                return Qnil;
+                        set_blocking_or_block(a.fd);
+#ifdef ECONNABORTED
+                case ECONNABORTED:
+#endif /* ECONNABORTED */
+#ifdef EPROTO
+                case EPROTO:
+#endif /* EPROTO */
+                case EINTR:
+                        goto retry;
+                case ENOMEM:
+                case EMFILE:
+                case ENFILE:
+#ifdef ENOBUFS
+                case ENOBUFS:
+#endif /* ENOBUFS */
+                        errno = 0;
+                        rb_gc();
+                        client = thread_accept(&a, nonblock);
+                }
+                if (client == -1) {
+                        if (errno == EINTR)
+                                goto retry;
+                        rb_sys_fail("accept");
+                }
+        }
+        return sock_for_fd(cClientSocket, client);
+}
+
+static void in_addr_set(VALUE io, struct sockaddr_in *addr)
+{
+        VALUE host = rb_str_new(0, INET_ADDRSTRLEN);
+        socklen_t addrlen = (socklen_t)INET_ADDRSTRLEN;
+        const char *name;
+
+        name = inet_ntop(AF_INET, &addr->sin_addr, RSTRING_PTR(host), addrlen);
+        if (name == NULL)
+                rb_sys_fail("inet_ntop");
+        rb_str_set_len(host, strlen(name));
+        rb_ivar_set(io, iv_kgio_addr, host);
+}
+
+/*
+ * call-seq:
+ *
+ *        server = Kgio::TCPServer.new('0.0.0.0', 80)
+ *        server.kgio_tryaccept -> Kgio::Socket or nil
+ *
+ * Initiates a non-blocking accept and returns a generic Kgio::Socket
+ * object with the kgio_addr attribute set to the IP address of the
+ * connected client on success.
+ *
+ * Returns nil on EAGAIN, and raises on other errors.
+ */
+static VALUE tcp_tryaccept(VALUE io)
+{
+        struct sockaddr_in addr;
+        socklen_t addrlen = sizeof(struct sockaddr_in);
+        VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 1);
+
+        if (!NIL_P(rv))
+                in_addr_set(rv, &addr);
+        return rv;
+}
+
+/*
+ * call-seq:
+ *
+ *        server = Kgio::TCPServer.new('0.0.0.0', 80)
+ *        server.kgio_accept -> Kgio::Socket or nil
+ *
+ * Initiates a blocking accept and returns a generic Kgio::Socket
+ * object with the kgio_addr attribute set to the IP address of
+ * the client on success.
+ *
+ * On Ruby implementations using native threads, this can use a blocking
+ * accept(2) (or accept4(2)) system call to avoid thundering herds.
+ */
+static VALUE tcp_accept(VALUE io)
+{
+        struct sockaddr_in addr;
+        socklen_t addrlen = sizeof(struct sockaddr_in);
+        VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 0);
+
+        in_addr_set(rv, &addr);
+        return rv;
+}
+
+/*
+ * call-seq:
+ *
+ *        server = Kgio::UNIXServer.new("/path/to/unix/socket")
+ *        server.kgio_tryaccept -> Kgio::Socket or nil
+ *
+ * Initiates a non-blocking accept and returns a generic Kgio::Socket
+ * object with the kgio_addr attribute set (to the value of
+ * Kgio::LOCALHOST) on success.
+ *
+ * Returns nil on EAGAIN, and raises on other errors.
+ */
+static VALUE unix_tryaccept(VALUE io)
+{
+        VALUE rv = my_accept(io, NULL, NULL, 1);
+
+        if (!NIL_P(rv))
+                rb_ivar_set(rv, iv_kgio_addr, localhost);
+        return rv;
+}
+
+/*
+ * call-seq:
+ *
+ *        server = Kgio::UNIXServer.new("/path/to/unix/socket")
+ *        server.kgio_accept -> Kgio::Socket or nil
+ *
+ * Initiates a blocking accept and returns a generic Kgio::Socket
+ * object with the kgio_addr attribute set (to the value of
+ * Kgio::LOCALHOST) on success.
+ *
+ * On Ruby implementations using native threads, this can use a blocking
+ * accept(2) (or accept4(2)) system call to avoid thundering herds.
+ */
+static VALUE unix_accept(VALUE io)
+{
+        VALUE rv = my_accept(io, NULL, NULL, 0);
+
+        rb_ivar_set(rv, iv_kgio_addr, localhost);
+        return rv;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.accept_cloexec? -> true or false
+ *
+ * Returns true if newly accepted Kgio::Sockets are created with the
+ * FD_CLOEXEC file descriptor flag, false if not.
+ */
+static VALUE get_cloexec(VALUE mod)
+{
+        return (accept4_flags & SOCK_CLOEXEC) == SOCK_CLOEXEC ? Qtrue : Qfalse;
+}
+
+/*
+ *
+ * call-seq:
+ *
+ *        Kgio.accept_nonblock? -> true or false
+ *
+ * Returns true if newly accepted Kgio::Sockets are created with the
+ * O_NONBLOCK file status flag, false if not.
+ */
+static VALUE get_nonblock(VALUE mod)
+{
+        return (accept4_flags & SOCK_NONBLOCK)==SOCK_NONBLOCK ? Qtrue : Qfalse;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.accept_cloexec = true
+ *        Kgio.accept_clocexec = false
+ *
+ * Sets whether or not Kgio::Socket objects created by
+ * TCPServer#kgio_accept,
+ * TCPServer#kgio_tryaccept,
+ * UNIXServer#kgio_accept,
+ * and UNIXServer#kgio_tryaccept
+ * are created with the FD_CLOEXEC file descriptor flag.
+ *
+ * This is on by default, as there is little reason to deal to enable
+ * it for client sockets on a socket server.
+ */
+static VALUE set_cloexec(VALUE mod, VALUE boolean)
+{
+        switch (TYPE(boolean)) {
+        case T_TRUE:
+                accept4_flags |= SOCK_CLOEXEC;
+                return boolean;
+        case T_FALSE:
+                accept4_flags &= ~SOCK_CLOEXEC;
+                return boolean;
+        }
+        rb_raise(rb_eTypeError, "not true or false");
+        return Qnil;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.accept_nonblock = true
+ *        Kgio.accept_nonblock = false
+ *
+ * Sets whether or not Kgio::Socket objects created by
+ * TCPServer#kgio_accept,
+ * TCPServer#kgio_tryaccept,
+ * UNIXServer#kgio_accept,
+ * and UNIXServer#kgio_tryaccept
+ * are created with the O_NONBLOCK file status flag.
+ *
+ * This defaults to +false+ for GNU/Linux where MSG_DONTWAIT is
+ * available (and on newer GNU/Linux, accept4() may also set
+ * the non-blocking flag.  This defaults to +true+ on non-GNU/Linux
+ * systems.
+ */
+static VALUE set_nonblock(VALUE mod, VALUE boolean)
+{
+        switch (TYPE(boolean)) {
+        case T_TRUE:
+                accept4_flags |= SOCK_NONBLOCK;
+                return boolean;
+        case T_FALSE:
+                accept4_flags &= ~SOCK_NONBLOCK;
+                return boolean;
+        }
+        rb_raise(rb_eTypeError, "not true or false");
+        return Qnil;
+}
+
+void init_kgio_accept(VALUE mKgio)
+{
+        VALUE cUNIXServer, cTCPServer;
+
+        localhost = rb_const_get(mKgio, rb_intern("LOCALHOST"));
+        cKgio_Socket = rb_const_get(mKgio, rb_intern("Socket"));
+        cClientSocket = cKgio_Socket;
+        mSocketMethods = rb_const_get(mKgio, rb_intern("SocketMethods"));
+
+        rb_define_singleton_method(mKgio, "accept_cloexec?", get_cloexec, 0);
+        rb_define_singleton_method(mKgio, "accept_cloexec=", set_cloexec, 1);
+        rb_define_singleton_method(mKgio, "accept_nonblock?", get_nonblock, 0);
+        rb_define_singleton_method(mKgio, "accept_nonblock=", set_nonblock, 1);
+        rb_define_singleton_method(mKgio, "accept_class=", set_accepted, 1);
+        rb_define_singleton_method(mKgio, "accept_class", get_accepted, 0);
+
+        cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer"));
+        cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer);
+        rb_define_method(cUNIXServer, "kgio_tryaccept", unix_tryaccept, 0);
+        rb_define_method(cUNIXServer, "kgio_accept", unix_accept, 0);
+
+        cTCPServer = rb_const_get(rb_cObject, rb_intern("TCPServer"));
+        cTCPServer = rb_define_class_under(mKgio, "TCPServer", cTCPServer);
+        rb_define_method(cTCPServer, "kgio_tryaccept", tcp_tryaccept, 0);
+        rb_define_method(cTCPServer, "kgio_accept", tcp_accept, 0);
+        init_sock_for_fd();
+        iv_kgio_addr = rb_intern("@kgio_addr");
+}