about summary refs log tree commit homepage
path: root/ext/kgio/autopush.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/kgio/autopush.c')
-rw-r--r--ext/kgio/autopush.c167
1 files changed, 167 insertions, 0 deletions
diff --git a/ext/kgio/autopush.c b/ext/kgio/autopush.c
new file mode 100644
index 0000000..8264c29
--- /dev/null
+++ b/ext/kgio/autopush.c
@@ -0,0 +1,167 @@
+/*
+ * We use a very basic strategy to use TCP_CORK semantics optimally
+ * in most TCP servers:  On corked sockets, we will uncork on recv()
+ * if there was a previous send().  Otherwise we do not fiddle
+ * with TCP_CORK at all.
+ *
+ * Under Linux, we can rely on TCP_CORK being inherited in an
+ * accept()-ed client socket so we can avoid syscalls for each
+ * accept()-ed client if we know the accept() socket corks.
+ *
+ * This module does NOTHING for client TCP sockets, we only deal
+ * with accept()-ed sockets right now.
+ */
+
+#include "kgio.h"
+
+enum autopush_state {
+        AUTOPUSH_STATE_IGNORE = -1,
+        AUTOPUSH_STATE_WRITER = 0,
+        AUTOPUSH_STATE_WRITTEN = 1,
+        AUTOPUSH_STATE_ACCEPTOR = 2
+};
+
+struct autopush_socket {
+        VALUE io;
+        enum autopush_state state;
+};
+
+static int enabled;
+static long capa;
+static struct autopush_socket *active;
+
+static void set_acceptor_state(struct autopush_socket *aps, int fd);
+static void flush_pending_data(int fd);
+
+static void grow(int fd)
+{
+        long new_capa = fd + 64;
+        size_t size;
+
+        assert(new_capa > capa && "grow()-ing for low fd");
+        size = new_capa * sizeof(struct autopush_socket);
+        active = xrealloc(active, size);
+
+        while (capa < new_capa) {
+                struct autopush_socket *aps = &active[capa++];
+
+                aps->io = Qnil;
+                aps->state = AUTOPUSH_STATE_IGNORE;
+        }
+}
+
+static VALUE s_get_autopush(VALUE self)
+{
+        return enabled ? Qtrue : Qfalse;
+}
+
+static VALUE s_set_autopush(VALUE self, VALUE val)
+{
+        enabled = RTEST(val);
+
+        return val;
+}
+
+void init_kgio_autopush(void)
+{
+        VALUE m = rb_define_module("Kgio");
+
+        rb_define_singleton_method(m, "autopush?", s_get_autopush, 0);
+        rb_define_singleton_method(m, "autopush=", s_set_autopush, 1);
+}
+
+/*
+ * called after a successful write, just mark that we've put something
+ * in the skb and will need to uncork on the next write.
+ */
+void kgio_autopush_send(VALUE io, int fd)
+{
+        struct autopush_socket *aps;
+
+        if (fd >= capa) return;
+        aps = &active[fd];
+        if (aps->io == io && aps->state == AUTOPUSH_STATE_WRITER)
+                aps->state = AUTOPUSH_STATE_WRITTEN;
+}
+
+/* called on successful accept() */
+void kgio_autopush_accept(VALUE accept_io, VALUE io, int accept_fd, int fd)
+{
+        struct autopush_socket *accept_aps, *client_aps;
+
+        if (!enabled)
+                return;
+        assert(fd >= 0 && "client_fd negative");
+        assert(accept_fd >= 0 && "accept_fd negative");
+        if (fd >= capa || accept_fd >= capa)
+                grow(fd > accept_fd ? fd : accept_fd);
+
+        accept_aps = &active[accept_fd];
+
+        if (accept_aps->io != accept_io) {
+                accept_aps->io = accept_io;
+                set_acceptor_state(accept_aps, fd);
+        }
+        client_aps = &active[fd];
+        client_aps->io = io;
+        if (accept_aps->state == AUTOPUSH_STATE_ACCEPTOR)
+                client_aps->state = AUTOPUSH_STATE_WRITER;
+        else
+                client_aps->state = AUTOPUSH_STATE_IGNORE;
+}
+
+void kgio_autopush_recv(VALUE io, int fd)
+{
+        struct autopush_socket *aps;
+
+        if (fd >= capa)
+                return;
+
+        aps = &active[fd];
+        if (aps->io != io || aps->state != AUTOPUSH_STATE_WRITTEN)
+                return;
+
+        /* reset internal state and flush corked buffers */
+        aps->state = AUTOPUSH_STATE_WRITER;
+        if (enabled)
+                flush_pending_data(fd);
+}
+
+#ifdef __linux__
+#include <netinet/tcp.h>
+static void set_acceptor_state(struct autopush_socket *aps, int fd)
+{
+        int corked = 0;
+        socklen_t optlen = sizeof(int);
+
+        if (getsockopt(fd, SOL_TCP, TCP_CORK, &corked, &optlen) != 0) {
+                if (errno != EOPNOTSUPP)
+                        rb_sys_fail("getsockopt(SOL_TCP, TCP_CORK)");
+                errno = 0;
+                aps->state = AUTOPUSH_STATE_IGNORE;
+        } else if (corked) {
+                aps->state = AUTOPUSH_STATE_ACCEPTOR;
+        } else {
+                aps->state = AUTOPUSH_STATE_IGNORE;
+        }
+}
+
+/*
+ * checks to see if we've written anything since the last recv()
+ * If we have, uncork the socket and immediately recork it.
+ */
+static void flush_pending_data(int fd)
+{
+        int optval = 0;
+        socklen_t optlen = sizeof(int);
+
+        if (setsockopt(fd, SOL_TCP, TCP_CORK, &optval, optlen) != 0)
+                rb_sys_fail("setsockopt(SOL_TCP, TCP_CORK, 0)");
+        /* immediately recork */
+        optval = 1;
+        if (setsockopt(fd, SOL_TCP, TCP_CORK, &optval, optlen) != 0)
+                rb_sys_fail("setsockopt(SOL_TCP, TCP_CORK, 1)");
+}
+/* TODO: add FreeBSD support */
+
+#endif /* linux */