diff options
Diffstat (limited to 'ext/kgio/autopush.c')
-rw-r--r-- | ext/kgio/autopush.c | 121 |
1 files changed, 49 insertions, 72 deletions
diff --git a/ext/kgio/autopush.c b/ext/kgio/autopush.c index 8264c29..ac9e217 100644 --- a/ext/kgio/autopush.c +++ b/ext/kgio/autopush.c @@ -13,43 +13,36 @@ */ #include "kgio.h" +static ID id_autopush_state; +static int enabled; enum autopush_state { - AUTOPUSH_STATE_IGNORE = -1, - AUTOPUSH_STATE_WRITER = 0, - AUTOPUSH_STATE_WRITTEN = 1, - AUTOPUSH_STATE_ACCEPTOR = 2 -}; - -struct autopush_socket { - VALUE io; - enum autopush_state state; + AUTOPUSH_STATE_ACCEPTOR_IGNORE = -1, + AUTOPUSH_STATE_IGNORE = 0, + AUTOPUSH_STATE_WRITER = 1, + AUTOPUSH_STATE_WRITTEN = 2, + AUTOPUSH_STATE_ACCEPTOR = 3 }; -static int enabled; -static long capa; -static struct autopush_socket *active; - -static void set_acceptor_state(struct autopush_socket *aps, int fd); -static void flush_pending_data(int fd); - -static void grow(int fd) +static enum autopush_state state_get(VALUE io) { - long new_capa = fd + 64; - size_t size; + VALUE val; - assert(new_capa > capa && "grow()-ing for low fd"); - size = new_capa * sizeof(struct autopush_socket); - active = xrealloc(active, size); + if (rb_ivar_defined(io, id_autopush_state) == Qfalse) + return AUTOPUSH_STATE_IGNORE; + val = rb_ivar_get(io, id_autopush_state); - while (capa < new_capa) { - struct autopush_socket *aps = &active[capa++]; + return (enum autopush_state)NUM2INT(val); +} - aps->io = Qnil; - aps->state = AUTOPUSH_STATE_IGNORE; - } +static void state_set(VALUE io, enum autopush_state state) +{ + rb_ivar_set(io, id_autopush_state, INT2NUM(state)); } +static enum autopush_state detect_acceptor_state(VALUE io); +static void push_pending_data(VALUE io); + static VALUE s_get_autopush(VALUE self) { return enabled ? Qtrue : Qfalse; @@ -68,92 +61,76 @@ void init_kgio_autopush(void) rb_define_singleton_method(m, "autopush?", s_get_autopush, 0); rb_define_singleton_method(m, "autopush=", s_set_autopush, 1); + id_autopush_state = rb_intern("@kgio_autopush_state"); } /* * called after a successful write, just mark that we've put something * in the skb and will need to uncork on the next write. */ -void kgio_autopush_send(VALUE io, int fd) +void kgio_autopush_send(VALUE io) { - struct autopush_socket *aps; - - if (fd >= capa) return; - aps = &active[fd]; - if (aps->io == io && aps->state == AUTOPUSH_STATE_WRITER) - aps->state = AUTOPUSH_STATE_WRITTEN; + if (state_get(io) == AUTOPUSH_STATE_WRITER) + state_set(io, AUTOPUSH_STATE_WRITTEN); } /* called on successful accept() */ -void kgio_autopush_accept(VALUE accept_io, VALUE io, int accept_fd, int fd) +void kgio_autopush_accept(VALUE accept_io, VALUE client_io) { - struct autopush_socket *accept_aps, *client_aps; + enum autopush_state acceptor_state; if (!enabled) return; - assert(fd >= 0 && "client_fd negative"); - assert(accept_fd >= 0 && "accept_fd negative"); - if (fd >= capa || accept_fd >= capa) - grow(fd > accept_fd ? fd : accept_fd); - - accept_aps = &active[accept_fd]; - - if (accept_aps->io != accept_io) { - accept_aps->io = accept_io; - set_acceptor_state(accept_aps, fd); - } - client_aps = &active[fd]; - client_aps->io = io; - if (accept_aps->state == AUTOPUSH_STATE_ACCEPTOR) - client_aps->state = AUTOPUSH_STATE_WRITER; + acceptor_state = state_get(accept_io); + if (acceptor_state == AUTOPUSH_STATE_IGNORE) + acceptor_state = detect_acceptor_state(accept_io); + if (acceptor_state == AUTOPUSH_STATE_ACCEPTOR) + state_set(client_io, AUTOPUSH_STATE_WRITER); else - client_aps->state = AUTOPUSH_STATE_IGNORE; + state_set(client_io, AUTOPUSH_STATE_IGNORE); } -void kgio_autopush_recv(VALUE io, int fd) +void kgio_autopush_recv(VALUE io) { - struct autopush_socket *aps; - - if (fd >= capa) - return; - - aps = &active[fd]; - if (aps->io != io || aps->state != AUTOPUSH_STATE_WRITTEN) - return; - - /* reset internal state and flush corked buffers */ - aps->state = AUTOPUSH_STATE_WRITER; - if (enabled) - flush_pending_data(fd); + if (enabled && (state_get(io) == AUTOPUSH_STATE_WRITTEN)) { + push_pending_data(io); + state_set(io, AUTOPUSH_STATE_WRITER); + } } #ifdef __linux__ #include <netinet/tcp.h> -static void set_acceptor_state(struct autopush_socket *aps, int fd) +static enum autopush_state detect_acceptor_state(VALUE io) { int corked = 0; + int fd = my_fileno(io); socklen_t optlen = sizeof(int); + enum autopush_state state; if (getsockopt(fd, SOL_TCP, TCP_CORK, &corked, &optlen) != 0) { if (errno != EOPNOTSUPP) rb_sys_fail("getsockopt(SOL_TCP, TCP_CORK)"); errno = 0; - aps->state = AUTOPUSH_STATE_IGNORE; + state = AUTOPUSH_STATE_ACCEPTOR_IGNORE; } else if (corked) { - aps->state = AUTOPUSH_STATE_ACCEPTOR; + state = AUTOPUSH_STATE_ACCEPTOR; } else { - aps->state = AUTOPUSH_STATE_IGNORE; + state = AUTOPUSH_STATE_ACCEPTOR_IGNORE; } + state_set(io, state); + + return state; } /* * checks to see if we've written anything since the last recv() * If we have, uncork the socket and immediately recork it. */ -static void flush_pending_data(int fd) +static void push_pending_data(VALUE io) { int optval = 0; - socklen_t optlen = sizeof(int); + const socklen_t optlen = sizeof(int); + const int fd = my_fileno(io); if (setsockopt(fd, SOL_TCP, TCP_CORK, &optval, optlen) != 0) rb_sys_fail("setsockopt(SOL_TCP, TCP_CORK, 0)"); |