about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@yhbt.net>2010-09-29 18:25:58 -0700
committerEric Wong <e@yhbt.net>2010-09-29 18:31:27 -0700
commit2a6115a89d5c95428bd6c3e0bc10e5a3a4c3c3be (patch)
tree77851c1db59ebf828dd1f88b231b1b4f0b1734db
parent8fe89997453d6c530c3f5e08bc9c1da40a621248 (diff)
downloadkgio-2a6115a89d5c95428bd6c3e0bc10e5a3a4c3c3be.tar.gz
Making the code easier to read and navigate.  This also
frees us from having to use the stupid A4_ prefix for
accept4(2) flags since it conflicts with the socket(2)
ones.
-rw-r--r--ext/kgio/accept.c371
-rw-r--r--ext/kgio/connect.c256
-rw-r--r--ext/kgio/kgio.h40
-rw-r--r--ext/kgio/kgio_ext.c1096
-rw-r--r--ext/kgio/missing/accept4.h20
-rw-r--r--ext/kgio/read_write.c325
-rw-r--r--ext/kgio/sock_for_fd.h3
-rw-r--r--ext/kgio/wait.c115
-rw-r--r--lib/kgio.rb18
9 files changed, 1141 insertions, 1103 deletions
diff --git a/ext/kgio/accept.c b/ext/kgio/accept.c
new file mode 100644
index 0000000..e1d1541
--- /dev/null
+++ b/ext/kgio/accept.c
@@ -0,0 +1,371 @@
+#include "kgio.h"
+#include "missing/accept4.h"
+#include "sock_for_fd.h"
+
+static VALUE localhost;
+static VALUE cClientSocket;
+static VALUE cKgio_Socket;
+static VALUE mSocketMethods;
+static VALUE iv_kgio_addr;
+
+#if defined(__linux__)
+static int accept4_flags = SOCK_CLOEXEC;
+#else /* ! linux */
+static int accept4_flags = SOCK_CLOEXEC | SOCK_NONBLOCK;
+#endif /* ! linux */
+
+struct accept_args {
+        int fd;
+        struct sockaddr *addr;
+        socklen_t *addrlen;
+};
+
+static VALUE set_accepted(VALUE klass, VALUE aclass)
+{
+        VALUE tmp;
+
+        if (NIL_P(aclass))
+                aclass = cKgio_Socket;
+
+        tmp = rb_funcall(aclass, rb_intern("included_modules"), 0, 0);
+        tmp = rb_funcall(tmp, rb_intern("include?"), 1, mSocketMethods);
+
+        if (tmp != Qtrue)
+                rb_raise(rb_eTypeError,
+                         "class must include Kgio::SocketMethods");
+
+        cClientSocket = aclass;
+
+        return aclass;
+}
+
+static VALUE get_accepted(VALUE klass)
+{
+        return cClientSocket;
+}
+
+static VALUE xaccept(void *ptr)
+{
+        struct accept_args *a = ptr;
+
+        return (VALUE)accept4(a->fd, a->addr, a->addrlen, accept4_flags);
+}
+
+#ifdef HAVE_RB_THREAD_BLOCKING_REGION
+#  include <time.h>
+/*
+ * Try to use a (real) blocking accept() since that can prevent
+ * thundering herds under Linux:
+ * http://www.citi.umich.edu/projects/linux-scalability/reports/accept.html
+ *
+ * So we periodically disable non-blocking, but not too frequently
+ * because other processes may set non-blocking (especially during
+ * a process upgrade) with Rainbows! concurrency model changes.
+ */
+static int thread_accept(struct accept_args *a, int force_nonblock)
+{
+        if (force_nonblock)
+                set_nonblocking(a->fd);
+        return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0);
+}
+
+static void set_blocking_or_block(int fd)
+{
+        static time_t last_set_blocking;
+        time_t now = time(NULL);
+
+        if (last_set_blocking == 0) {
+                last_set_blocking = now;
+                (void)rb_io_wait_readable(fd);
+        } else if ((now - last_set_blocking) <= 5) {
+                (void)rb_io_wait_readable(fd);
+        } else {
+                int flags = fcntl(fd, F_GETFL);
+                if (flags == -1)
+                        rb_sys_fail("fcntl(F_GETFL)");
+                if (flags & O_NONBLOCK) {
+                        flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+                        if (flags == -1)
+                                rb_sys_fail("fcntl(F_SETFL)");
+                }
+                last_set_blocking = now;
+        }
+}
+#else /* ! HAVE_RB_THREAD_BLOCKING_REGION */
+#  include <rubysig.h>
+static int thread_accept(struct accept_args *a, int force_nonblock)
+{
+        int rv;
+
+        /* always use non-blocking accept() under 1.8 for green threads */
+        set_nonblocking(a->fd);
+        TRAP_BEG;
+        rv = (int)xaccept(a);
+        TRAP_END;
+        return rv;
+}
+#define set_blocking_or_block(fd) (void)rb_io_wait_readable(fd)
+#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
+
+static VALUE
+my_accept(VALUE io, struct sockaddr *addr, socklen_t *addrlen, int nonblock)
+{
+        int client;
+        struct accept_args a;
+
+        a.fd = my_fileno(io);
+        a.addr = addr;
+        a.addrlen = addrlen;
+retry:
+        client = thread_accept(&a, nonblock);
+        if (client == -1) {
+                switch (errno) {
+                case EAGAIN:
+                        if (nonblock)
+                                return Qnil;
+                        set_blocking_or_block(a.fd);
+#ifdef ECONNABORTED
+                case ECONNABORTED:
+#endif /* ECONNABORTED */
+#ifdef EPROTO
+                case EPROTO:
+#endif /* EPROTO */
+                case EINTR:
+                        goto retry;
+                case ENOMEM:
+                case EMFILE:
+                case ENFILE:
+#ifdef ENOBUFS
+                case ENOBUFS:
+#endif /* ENOBUFS */
+                        errno = 0;
+                        rb_gc();
+                        client = thread_accept(&a, nonblock);
+                }
+                if (client == -1) {
+                        if (errno == EINTR)
+                                goto retry;
+                        rb_sys_fail("accept");
+                }
+        }
+        return sock_for_fd(cClientSocket, client);
+}
+
+static void in_addr_set(VALUE io, struct sockaddr_in *addr)
+{
+        VALUE host = rb_str_new(0, INET_ADDRSTRLEN);
+        socklen_t addrlen = (socklen_t)INET_ADDRSTRLEN;
+        const char *name;
+
+        name = inet_ntop(AF_INET, &addr->sin_addr, RSTRING_PTR(host), addrlen);
+        if (name == NULL)
+                rb_sys_fail("inet_ntop");
+        rb_str_set_len(host, strlen(name));
+        rb_ivar_set(io, iv_kgio_addr, host);
+}
+
+/*
+ * call-seq:
+ *
+ *        server = Kgio::TCPServer.new('0.0.0.0', 80)
+ *        server.kgio_tryaccept -> Kgio::Socket or nil
+ *
+ * Initiates a non-blocking accept and returns a generic Kgio::Socket
+ * object with the kgio_addr attribute set to the IP address of the
+ * connected client on success.
+ *
+ * Returns nil on EAGAIN, and raises on other errors.
+ */
+static VALUE tcp_tryaccept(VALUE io)
+{
+        struct sockaddr_in addr;
+        socklen_t addrlen = sizeof(struct sockaddr_in);
+        VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 1);
+
+        if (!NIL_P(rv))
+                in_addr_set(rv, &addr);
+        return rv;
+}
+
+/*
+ * call-seq:
+ *
+ *        server = Kgio::TCPServer.new('0.0.0.0', 80)
+ *        server.kgio_accept -> Kgio::Socket or nil
+ *
+ * Initiates a blocking accept and returns a generic Kgio::Socket
+ * object with the kgio_addr attribute set to the IP address of
+ * the client on success.
+ *
+ * On Ruby implementations using native threads, this can use a blocking
+ * accept(2) (or accept4(2)) system call to avoid thundering herds.
+ */
+static VALUE tcp_accept(VALUE io)
+{
+        struct sockaddr_in addr;
+        socklen_t addrlen = sizeof(struct sockaddr_in);
+        VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 0);
+
+        in_addr_set(rv, &addr);
+        return rv;
+}
+
+/*
+ * call-seq:
+ *
+ *        server = Kgio::UNIXServer.new("/path/to/unix/socket")
+ *        server.kgio_tryaccept -> Kgio::Socket or nil
+ *
+ * Initiates a non-blocking accept and returns a generic Kgio::Socket
+ * object with the kgio_addr attribute set (to the value of
+ * Kgio::LOCALHOST) on success.
+ *
+ * Returns nil on EAGAIN, and raises on other errors.
+ */
+static VALUE unix_tryaccept(VALUE io)
+{
+        VALUE rv = my_accept(io, NULL, NULL, 1);
+
+        if (!NIL_P(rv))
+                rb_ivar_set(rv, iv_kgio_addr, localhost);
+        return rv;
+}
+
+/*
+ * call-seq:
+ *
+ *        server = Kgio::UNIXServer.new("/path/to/unix/socket")
+ *        server.kgio_accept -> Kgio::Socket or nil
+ *
+ * Initiates a blocking accept and returns a generic Kgio::Socket
+ * object with the kgio_addr attribute set (to the value of
+ * Kgio::LOCALHOST) on success.
+ *
+ * On Ruby implementations using native threads, this can use a blocking
+ * accept(2) (or accept4(2)) system call to avoid thundering herds.
+ */
+static VALUE unix_accept(VALUE io)
+{
+        VALUE rv = my_accept(io, NULL, NULL, 0);
+
+        rb_ivar_set(rv, iv_kgio_addr, localhost);
+        return rv;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.accept_cloexec? -> true or false
+ *
+ * Returns true if newly accepted Kgio::Sockets are created with the
+ * FD_CLOEXEC file descriptor flag, false if not.
+ */
+static VALUE get_cloexec(VALUE mod)
+{
+        return (accept4_flags & SOCK_CLOEXEC) == SOCK_CLOEXEC ? Qtrue : Qfalse;
+}
+
+/*
+ *
+ * call-seq:
+ *
+ *        Kgio.accept_nonblock? -> true or false
+ *
+ * Returns true if newly accepted Kgio::Sockets are created with the
+ * O_NONBLOCK file status flag, false if not.
+ */
+static VALUE get_nonblock(VALUE mod)
+{
+        return (accept4_flags & SOCK_NONBLOCK)==SOCK_NONBLOCK ? Qtrue : Qfalse;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.accept_cloexec = true
+ *        Kgio.accept_clocexec = false
+ *
+ * Sets whether or not Kgio::Socket objects created by
+ * TCPServer#kgio_accept,
+ * TCPServer#kgio_tryaccept,
+ * UNIXServer#kgio_accept,
+ * and UNIXServer#kgio_tryaccept
+ * are created with the FD_CLOEXEC file descriptor flag.
+ *
+ * This is on by default, as there is little reason to deal to enable
+ * it for client sockets on a socket server.
+ */
+static VALUE set_cloexec(VALUE mod, VALUE boolean)
+{
+        switch (TYPE(boolean)) {
+        case T_TRUE:
+                accept4_flags |= SOCK_CLOEXEC;
+                return boolean;
+        case T_FALSE:
+                accept4_flags &= ~SOCK_CLOEXEC;
+                return boolean;
+        }
+        rb_raise(rb_eTypeError, "not true or false");
+        return Qnil;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.accept_nonblock = true
+ *        Kgio.accept_nonblock = false
+ *
+ * Sets whether or not Kgio::Socket objects created by
+ * TCPServer#kgio_accept,
+ * TCPServer#kgio_tryaccept,
+ * UNIXServer#kgio_accept,
+ * and UNIXServer#kgio_tryaccept
+ * are created with the O_NONBLOCK file status flag.
+ *
+ * This defaults to +false+ for GNU/Linux where MSG_DONTWAIT is
+ * available (and on newer GNU/Linux, accept4() may also set
+ * the non-blocking flag.  This defaults to +true+ on non-GNU/Linux
+ * systems.
+ */
+static VALUE set_nonblock(VALUE mod, VALUE boolean)
+{
+        switch (TYPE(boolean)) {
+        case T_TRUE:
+                accept4_flags |= SOCK_NONBLOCK;
+                return boolean;
+        case T_FALSE:
+                accept4_flags &= ~SOCK_NONBLOCK;
+                return boolean;
+        }
+        rb_raise(rb_eTypeError, "not true or false");
+        return Qnil;
+}
+
+void init_kgio_accept(VALUE mKgio)
+{
+        VALUE cUNIXServer, cTCPServer;
+
+        localhost = rb_const_get(mKgio, rb_intern("LOCALHOST"));
+        cKgio_Socket = rb_const_get(mKgio, rb_intern("Socket"));
+        cClientSocket = cKgio_Socket;
+        mSocketMethods = rb_const_get(mKgio, rb_intern("SocketMethods"));
+
+        rb_define_singleton_method(mKgio, "accept_cloexec?", get_cloexec, 0);
+        rb_define_singleton_method(mKgio, "accept_cloexec=", set_cloexec, 1);
+        rb_define_singleton_method(mKgio, "accept_nonblock?", get_nonblock, 0);
+        rb_define_singleton_method(mKgio, "accept_nonblock=", set_nonblock, 1);
+        rb_define_singleton_method(mKgio, "accept_class=", set_accepted, 1);
+        rb_define_singleton_method(mKgio, "accept_class", get_accepted, 0);
+
+        cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer"));
+        cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer);
+        rb_define_method(cUNIXServer, "kgio_tryaccept", unix_tryaccept, 0);
+        rb_define_method(cUNIXServer, "kgio_accept", unix_accept, 0);
+
+        cTCPServer = rb_const_get(rb_cObject, rb_intern("TCPServer"));
+        cTCPServer = rb_define_class_under(mKgio, "TCPServer", cTCPServer);
+        rb_define_method(cTCPServer, "kgio_tryaccept", tcp_tryaccept, 0);
+        rb_define_method(cTCPServer, "kgio_accept", tcp_accept, 0);
+        init_sock_for_fd();
+        iv_kgio_addr = rb_intern("@kgio_addr");
+}
diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c
new file mode 100644
index 0000000..17c77bb
--- /dev/null
+++ b/ext/kgio/connect.c
@@ -0,0 +1,256 @@
+#include "kgio.h"
+#include "sock_for_fd.h"
+
+static void close_fail(int fd, const char *msg)
+{
+        int saved_errno = errno;
+        (void)close(fd);
+        errno = saved_errno;
+        rb_sys_fail(msg);
+}
+
+#ifdef SOCK_NONBLOCK
+#  define MY_SOCK_STREAM (SOCK_STREAM|SOCK_NONBLOCK)
+#else
+#  define MY_SOCK_STREAM SOCK_STREAM
+#endif /* ! SOCK_NONBLOCK */
+
+static VALUE
+my_connect(VALUE klass, int io_wait, int domain, void *addr, socklen_t addrlen)
+{
+        int fd = socket(domain, MY_SOCK_STREAM, 0);
+
+        if (fd == -1) {
+                switch (errno) {
+                case EMFILE:
+                case ENFILE:
+#ifdef ENOBUFS
+                case ENOBUFS:
+#endif /* ENOBUFS */
+                        errno = 0;
+                        rb_gc();
+                        fd = socket(domain, MY_SOCK_STREAM, 0);
+                }
+                if (fd == -1)
+                        rb_sys_fail("socket");
+        }
+
+#ifndef SOCK_NONBLOCK
+        if (fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK) == -1)
+                close_fail(fd, "fcntl(F_SETFL, O_RDWR | O_NONBLOCK)");
+#endif /* SOCK_NONBLOCK */
+
+        if (connect(fd, addr, addrlen) == -1) {
+                if (errno == EINPROGRESS) {
+                        VALUE io = sock_for_fd(klass, fd);
+
+                        if (io_wait) {
+                                errno = EAGAIN;
+                                kgio_wait_writable(io, fd);
+                        }
+                        return io;
+                }
+                close_fail(fd, "connect");
+        }
+        return sock_for_fd(klass, fd);
+}
+
+static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait)
+{
+        struct sockaddr_in addr = { 0 };
+
+        addr.sin_family = AF_INET;
+        addr.sin_port = htons((unsigned short)NUM2INT(port));
+
+        switch (inet_pton(AF_INET, StringValuePtr(ip), &addr.sin_addr)) {
+        case 1:
+                return my_connect(klass, io_wait, PF_INET, &addr, sizeof(addr));
+        case -1:
+                rb_sys_fail("inet_pton");
+        }
+        rb_raise(rb_eArgError, "invalid address: %s", StringValuePtr(ip));
+
+        return Qnil;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio::TCPSocket.new('127.0.0.1', 80) -> socket
+ *
+ * Creates a new Kgio::TCPSocket object and initiates a
+ * non-blocking connection.
+ *
+ * This may block and call any method assigned to Kgio.wait_writable.
+ *
+ * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
+ * lookups (which is subject to a different set of timeouts and
+ * best handled elsewhere).
+ */
+static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port)
+{
+        return tcp_connect(klass, ip, port, 1);
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio::TCPSocket.start('127.0.0.1', 80) -> socket
+ *
+ * Creates a new Kgio::TCPSocket object and initiates a
+ * non-blocking connection.  The caller should select/poll
+ * on the socket for writability before attempting to write
+ * or optimistically attempt a write and handle Kgio::WaitWritable
+ * or Errno::EAGAIN.
+ *
+ * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
+ * lookups (which is subject to a different set of timeouts and
+ * best handled elsewhere).
+ */
+static VALUE kgio_tcp_start(VALUE klass, VALUE ip, VALUE port)
+{
+        return tcp_connect(klass, ip, port, 0);
+}
+
+static VALUE unix_connect(VALUE klass, VALUE path, int io_wait)
+{
+        struct sockaddr_un addr = { 0 };
+        long len;
+
+        StringValue(path);
+        len = RSTRING_LEN(path);
+        if (sizeof(addr.sun_path) <= len)
+                rb_raise(rb_eArgError,
+                         "too long unix socket path (max: %dbytes)",
+                         (int)sizeof(addr.sun_path)-1);
+
+        memcpy(addr.sun_path, RSTRING_PTR(path), len);
+        addr.sun_family = AF_UNIX;
+
+        return my_connect(klass, io_wait, PF_UNIX, &addr, sizeof(addr));
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket
+ *
+ * Creates a new Kgio::UNIXSocket object and initiates a
+ * non-blocking connection.
+ *
+ * This may block and call any method assigned to Kgio.wait_writable.
+ */
+static VALUE kgio_unix_connect(VALUE klass, VALUE path)
+{
+        return unix_connect(klass, path, 1);
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio::UNIXSocket.start("/path/to/unix/socket") -> socket
+ *
+ * Creates a new Kgio::UNIXSocket object and initiates a
+ * non-blocking connection.  The caller should select/poll
+ * on the socket for writability before attempting to write
+ * or optimistically attempt a write and handle Kgio::WaitWritable
+ * or Errno::EAGAIN.
+ */
+static VALUE kgio_unix_start(VALUE klass, VALUE path)
+{
+        return unix_connect(klass, path, 0);
+}
+
+static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait)
+{
+        int domain;
+        socklen_t addrlen;
+        struct sockaddr *sockaddr;
+
+        if (TYPE(addr) == T_STRING) {
+                sockaddr = (struct sockaddr *)(RSTRING_PTR(addr));
+                addrlen = (socklen_t)RSTRING_LEN(addr);
+        } else {
+                rb_raise(rb_eTypeError, "invalid address");
+        }
+        switch (((struct sockaddr_in *)(sockaddr))->sin_family) {
+        case AF_UNIX: domain = PF_UNIX; break;
+        case AF_INET: domain = PF_INET; break;
+#ifdef AF_INET6 /* IPv6 support incomplete */
+        case AF_INET6: domain = PF_INET6; break;
+#endif /* AF_INET6 */
+        default:
+                rb_raise(rb_eArgError, "invalid address family");
+        }
+
+        return my_connect(klass, io_wait, domain, sockaddr, addrlen);
+}
+
+/* call-seq:
+ *
+ *      addr = Socket.pack_sockaddr_in(80, 'example.com')
+ *        Kgio::Socket.connect(addr) -> socket
+ *
+ *      addr = Socket.pack_sockaddr_un("/path/to/unix/socket")
+ *        Kgio::Socket.connect(addr) -> socket
+ *
+ * Creates a generic Kgio::Socket object and initiates a
+ * non-blocking connection.
+ *
+ * This may block and call any method assigned to Kgio.wait_writable.
+ */
+static VALUE kgio_connect(VALUE klass, VALUE addr)
+{
+        return stream_connect(klass, addr, 1);
+}
+
+/* call-seq:
+ *
+ *      addr = Socket.pack_sockaddr_in(80, 'example.com')
+ *        Kgio::Socket.start(addr) -> socket
+ *
+ *      addr = Socket.pack_sockaddr_un("/path/to/unix/socket")
+ *        Kgio::Socket.start(addr) -> socket
+ *
+ * Creates a generic Kgio::Socket object and initiates a
+ * non-blocking connection.  The caller should select/poll
+ * on the socket for writability before attempting to write
+ * or optimistically attempt a write and handle Kgio::WaitWritable
+ * or Errno::EAGAIN.
+ */
+static VALUE kgio_start(VALUE klass, VALUE addr)
+{
+        return stream_connect(klass, addr, 0);
+}
+
+void init_kgio_connect(VALUE mKgio)
+{
+        VALUE cSocket = rb_const_get(rb_cObject, rb_intern("Socket"));
+        VALUE mSocketMethods = rb_const_get(mKgio, rb_intern("SocketMethods"));
+        VALUE cKgio_Socket, cTCPSocket, cUNIXSocket;
+
+        /*
+         * Document-class: Kgio::Socket
+         *
+         * A generic socket class with Kgio::SocketMethods included.
+         * This is returned by all Kgio methods that accept(2) a connected
+         * stream socket.
+         */
+        cKgio_Socket = rb_define_class_under(mKgio, "Socket", cSocket);
+        rb_include_module(cKgio_Socket, mSocketMethods);
+        rb_define_singleton_method(cKgio_Socket, "new", kgio_connect, 1);
+        rb_define_singleton_method(cKgio_Socket, "start", kgio_start, 1);
+
+        cTCPSocket = rb_const_get(rb_cObject, rb_intern("TCPSocket"));
+        cTCPSocket = rb_define_class_under(mKgio, "TCPSocket", cTCPSocket);
+        rb_include_module(cTCPSocket, mSocketMethods);
+        rb_define_singleton_method(cTCPSocket, "new", kgio_tcp_connect, 2);
+        rb_define_singleton_method(cTCPSocket, "start", kgio_tcp_start, 2);
+
+        cUNIXSocket = rb_const_get(rb_cObject, rb_intern("UNIXSocket"));
+        cUNIXSocket = rb_define_class_under(mKgio, "UNIXSocket", cUNIXSocket);
+        rb_include_module(cUNIXSocket, mSocketMethods);
+        rb_define_singleton_method(cUNIXSocket, "new", kgio_unix_connect, 1);
+        rb_define_singleton_method(cUNIXSocket, "start", kgio_unix_start, 1);
+        init_sock_for_fd();
+}
diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h
new file mode 100644
index 0000000..83a247c
--- /dev/null
+++ b/ext/kgio/kgio.h
@@ -0,0 +1,40 @@
+#ifndef KGIO_H
+#define KGIO_H
+
+#include <ruby.h>
+#ifdef HAVE_RUBY_IO_H
+#  include <ruby/io.h>
+#else
+#  include <rubyio.h>
+#endif
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <arpa/inet.h>
+#include <assert.h>
+
+#include "missing/ancient_ruby.h"
+#include "nonblock.h"
+#include "my_fileno.h"
+
+struct io_args {
+        VALUE io;
+        VALUE buf;
+        char *ptr;
+        long len;
+        int fd;
+};
+
+void init_kgio_wait(VALUE mKgio);
+void init_kgio_read_write(VALUE mKgio);
+void init_kgio_accept(VALUE mKgio);
+void init_kgio_connect(VALUE mKgio);
+
+void kgio_wait_writable(VALUE io, int fd);
+void kgio_wait_readable(VALUE io, int fd);
+
+#endif /* KGIO_H */
diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c
index c8a9611..12f1c6a 100644
--- a/ext/kgio/kgio_ext.c
+++ b/ext/kgio/kgio_ext.c
@@ -1,1095 +1,11 @@
-#include <ruby.h>
-#ifdef HAVE_RUBY_IO_H
-#  include <ruby/io.h>
-#else
-#  include <rubyio.h>
-#endif
-#include <errno.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <netinet/in.h>
-#include <fcntl.h>
-#include <unistd.h>
-#include <arpa/inet.h>
-#include <assert.h>
-
-#include "missing/accept4.h"
-#include "missing/ancient_ruby.h"
-#include "nonblock.h"
-#include "my_fileno.h"
-#include "sock_for_fd.h"
-
-#if defined(__linux__)
-/*
- * we know MSG_DONTWAIT works properly on all stream sockets under Linux
- * we can define this macro for other platforms as people care and
- * notice.
- */
-#  define USE_MSG_DONTWAIT
-static int accept4_flags = A4_SOCK_CLOEXEC;
-#else /* ! linux */
-static int accept4_flags = A4_SOCK_CLOEXEC | A4_SOCK_NONBLOCK;
-#endif /* ! linux */
-
-static VALUE cClientSocket;
-static VALUE mSocketMethods;
-static VALUE cSocket;
-static VALUE localhost;
-static VALUE mKgio_WaitReadable, mKgio_WaitWritable;
-static ID io_wait_rd, io_wait_wr;
-static ID iv_kgio_addr;
-
-struct io_args {
-        VALUE io;
-        VALUE buf;
-        char *ptr;
-        long len;
-        int fd;
-};
-
-struct accept_args {
-        int fd;
-        struct sockaddr *addr;
-        socklen_t *addrlen;
-};
-
-static void wait_readable(VALUE io, int fd)
-{
-        if (io_wait_rd) {
-                (void)rb_funcall(io, io_wait_rd, 0, 0);
-        } else {
-                if (!rb_io_wait_readable(fd))
-                        rb_sys_fail("wait readable");
-        }
-}
-
-static void wait_writable(VALUE io, int fd)
-{
-        if (io_wait_wr) {
-                (void)rb_funcall(io, io_wait_wr, 0, 0);
-        } else {
-                if (!rb_io_wait_writable(fd))
-                        rb_sys_fail("wait writable");
-        }
-}
-
-static void prepare_read(struct io_args *a, int argc, VALUE *argv, VALUE io)
-{
-        VALUE length;
-
-        a->io = io;
-        a->fd = my_fileno(io);
-        rb_scan_args(argc, argv, "11", &length, &a->buf);
-        a->len = NUM2LONG(length);
-        if (NIL_P(a->buf)) {
-                a->buf = rb_str_new(NULL, a->len);
-        } else {
-                StringValue(a->buf);
-                rb_str_resize(a->buf, a->len);
-        }
-        a->ptr = RSTRING_PTR(a->buf);
-}
-
-static int read_check(struct io_args *a, long n, const char *msg, int io_wait)
-{
-        if (n == -1) {
-                if (errno == EINTR)
-                        return -1;
-                rb_str_set_len(a->buf, 0);
-                if (errno == EAGAIN) {
-                        if (io_wait) {
-                                wait_readable(a->io, a->fd);
-
-                                /* buf may be modified in other thread/fiber */
-                                rb_str_resize(a->buf, a->len);
-                                a->ptr = RSTRING_PTR(a->buf);
-                                return -1;
-                        } else {
-                                a->buf = mKgio_WaitReadable;
-                                return 0;
-                        }
-                }
-                rb_sys_fail(msg);
-        }
-        rb_str_set_len(a->buf, n);
-        if (n == 0)
-                a->buf = Qnil;
-        return 0;
-}
-
-static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io)
-{
-        struct io_args a;
-        long n;
-
-        prepare_read(&a, argc, argv, io);
-        set_nonblocking(a.fd);
-retry:
-        n = (long)read(a.fd, a.ptr, a.len);
-        if (read_check(&a, n, "read", io_wait) != 0)
-                goto retry;
-        return a.buf;
-}
-
-/*
- * call-seq:
- *
- *        io.kgio_read(maxlen)           ->  buffer
- *        io.kgio_read(maxlen, buffer)   ->  buffer
- *
- * Reads at most maxlen bytes from the stream socket.  Returns with a
- * newly allocated buffer, or may reuse an existing buffer if supplied.
- *
- * Calls the method assigned to Kgio.wait_readable, or blocks in a
- * thread-safe manner for writability.
- *
- * Returns nil on EOF.
- *
- * This behaves like read(2) and IO#readpartial, NOT fread(3) or
- * IO#read which possess read-in-full behavior.
- */
-static VALUE kgio_read(int argc, VALUE *argv, VALUE io)
-{
-        return my_read(1, argc, argv, io);
-}
-
-/*
- * call-seq:
- *
- *        io.kgio_tryread(maxlen)           ->  buffer
- *        io.kgio_tryread(maxlen, buffer)   ->  buffer
- *
- * Reads at most maxlen bytes from the stream socket.  Returns with a
- * newly allocated buffer, or may reuse an existing buffer if supplied.
- *
- * Returns nil on EOF.
- *
- * Returns Kgio::WaitReadable if EAGAIN is encountered.
- */
-static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io)
-{
-        return my_read(0, argc, argv, io);
-}
-
-#ifdef USE_MSG_DONTWAIT
-static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io)
-{
-        struct io_args a;
-        long n;
-
-        prepare_read(&a, argc, argv, io);
-retry:
-        n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT);
-        if (read_check(&a, n, "recv", io_wait) != 0)
-                goto retry;
-        return a.buf;
-}
-
-/*
- * This method may be optimized on some systems (e.g. GNU/Linux) to use
- * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
- * Otherwise this is the same as Kgio::PipeMethods#kgio_read
- */
-static VALUE kgio_recv(int argc, VALUE *argv, VALUE io)
-{
-        return my_recv(1, argc, argv, io);
-}
-
-/*
- * This method may be optimized on some systems (e.g. GNU/Linux) to use
- * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
- * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread
- */
-static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io)
-{
-        return my_recv(0, argc, argv, io);
-}
-#else /* ! USE_MSG_DONTWAIT */
-#  define kgio_recv kgio_read
-#  define kgio_tryrecv kgio_tryread
-#endif /* USE_MSG_DONTWAIT */
-
-static void prepare_write(struct io_args *a, VALUE io, VALUE str)
-{
-        a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str);
-        a->ptr = RSTRING_PTR(a->buf);
-        a->len = RSTRING_LEN(a->buf);
-        a->io = io;
-        a->fd = my_fileno(io);
-}
-
-static int write_check(struct io_args *a, long n, const char *msg, int io_wait)
-{
-        if (a->len == n) {
-done:
-                a->buf = Qnil;
-        } else if (n == -1) {
-                if (errno == EINTR)
-                        return -1;
-                if (errno == EAGAIN) {
-                        long written = RSTRING_LEN(a->buf) - a->len;
-
-                        if (io_wait) {
-                                wait_writable(a->io, a->fd);
-
-                                /* buf may be modified in other thread/fiber */
-                                a->len = RSTRING_LEN(a->buf) - written;
-                                if (a->len <= 0)
-                                        goto done;
-                                a->ptr = RSTRING_PTR(a->buf) + written;
-                                return -1;
-                        } else if (written > 0) {
-                                a->buf = rb_str_new(a->ptr + n, a->len - n);
-                        } else {
-                                a->buf = mKgio_WaitWritable;
-                        }
-                        return 0;
-                }
-                rb_sys_fail(msg);
-        } else {
-                assert(n >= 0 && n < a->len && "write/send syscall broken?");
-                a->ptr += n;
-                a->len -= n;
-                return -1;
-        }
-        return 0;
-}
-
-static VALUE my_write(VALUE io, VALUE str, int io_wait)
-{
-        struct io_args a;
-        long n;
-
-        prepare_write(&a, io, str);
-        set_nonblocking(a.fd);
-retry:
-        n = (long)write(a.fd, a.ptr, a.len);
-        if (write_check(&a, n, "write", io_wait) != 0)
-                goto retry;
-        return a.buf;
-}
-
-/*
- * call-seq:
- *
- *        io.kgio_write(str)        -> nil
- *
- * Returns nil when the write completes.
- *
- * Calls the method Kgio.wait_writable if it is set.  Otherwise this
- * blocks in a thread-safe manner until all data is written or a
- * fatal error occurs.
- */
-static VALUE kgio_write(VALUE io, VALUE str)
-{
-        return my_write(io, str, 1);
-}
-
-/*
- * call-seq:
- *
- *        io.kgio_trywrite(str)        -> nil or Kgio::WaitWritable
- *
- * Returns nil if the write was completed in full.
- *
- * Returns a String containing the unwritten portion if EAGAIN
- * was encountered, but some portion was successfully written.
- *
- * Returns Kgio::WaitWritable if EAGAIN is encountered and nothing
- * was written.
- */
-static VALUE kgio_trywrite(VALUE io, VALUE str)
-{
-        return my_write(io, str, 0);
-}
-
-#ifdef USE_MSG_DONTWAIT
-/*
- * This method behaves like Kgio::PipeMethods#kgio_write, except
- * it will use send(2) with the MSG_DONTWAIT flag on sockets to
- * avoid unnecessary calls to fcntl(2).
- */
-static VALUE my_send(VALUE io, VALUE str, int io_wait)
-{
-        struct io_args a;
-        long n;
-
-        prepare_write(&a, io, str);
-retry:
-        n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
-        if (write_check(&a, n, "send", io_wait) != 0)
-                goto retry;
-        return a.buf;
-}
-
-/*
- * This method may be optimized on some systems (e.g. GNU/Linux) to use
- * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
- * Otherwise this is the same as Kgio::PipeMethods#kgio_write
- */
-static VALUE kgio_send(VALUE io, VALUE str)
-{
-        return my_send(io, str, 1);
-}
-
-/*
- * This method may be optimized on some systems (e.g. GNU/Linux) to use
- * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
- * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite
- */
-static VALUE kgio_trysend(VALUE io, VALUE str)
-{
-        return my_send(io, str, 0);
-}
-#else /* ! USE_MSG_DONTWAIT */
-#  define kgio_send kgio_write
-#  define kgio_trysend kgio_trywrite
-#endif /* ! USE_MSG_DONTWAIT */
-
-/*
- * call-seq:
- *
- *        Kgio.wait_readable = :method_name
- *        Kgio.wait_readable = nil
- *
- * Sets a method for kgio_read to call when a read would block.
- * This is useful for non-blocking frameworks that use Fibers,
- * as the method referred to this may cause the current Fiber
- * to yield execution.
- *
- * A special value of nil will cause Ruby to wait using the
- * rb_io_wait_readable() function.
- */
-static VALUE set_wait_rd(VALUE mod, VALUE sym)
-{
-        switch (TYPE(sym)) {
-        case T_SYMBOL:
-                io_wait_rd = SYM2ID(sym);
-                return sym;
-        case T_NIL:
-                io_wait_rd = 0;
-                return sym;
-        }
-        rb_raise(rb_eTypeError, "must be a symbol or nil");
-        return sym;
-}
-
-/*
- * call-seq:
- *
- *        Kgio.wait_writable = :method_name
- *        Kgio.wait_writable = nil
- *
- * Sets a method for kgio_write to call when a read would block.
- * This is useful for non-blocking frameworks that use Fibers,
- * as the method referred to this may cause the current Fiber
- * to yield execution.
- *
- * A special value of nil will cause Ruby to wait using the
- * rb_io_wait_writable() function.
- */
-static VALUE set_wait_wr(VALUE mod, VALUE sym)
-{
-        switch (TYPE(sym)) {
-        case T_SYMBOL:
-                io_wait_wr = SYM2ID(sym);
-                return sym;
-        case T_NIL:
-                io_wait_wr = 0;
-                return sym;
-        }
-        rb_raise(rb_eTypeError, "must be a symbol or nil");
-        return sym;
-}
-
-/*
- * call-seq:
- *
- *        Kgio.wait_writable        -> Symbol or nil
- *
- * Returns the symbolic method name of the method assigned to
- * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_write
- * or Kgio::SocketMethods#kgio_write call
- */
-static VALUE wait_wr(VALUE mod)
-{
-        return io_wait_wr ? ID2SYM(io_wait_wr) : Qnil;
-}
-
-/*
- * call-seq:
- *
- *        Kgio.wait_readable        -> Symbol or nil
- *
- * Returns the symbolic method name of the method assigned to
- * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_read
- * or Kgio::SocketMethods#kgio_read call.
- */
-static VALUE wait_rd(VALUE mod)
-{
-        return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil;
-}
-
-static VALUE xaccept(void *ptr)
-{
-        struct accept_args *a = ptr;
-
-        return (VALUE)accept4(a->fd, a->addr, a->addrlen, accept4_flags);
-}
-
-#ifdef HAVE_RB_THREAD_BLOCKING_REGION
-#  include <time.h>
-/*
- * Try to use a (real) blocking accept() since that can prevent
- * thundering herds under Linux:
- * http://www.citi.umich.edu/projects/linux-scalability/reports/accept.html
- *
- * So we periodically disable non-blocking, but not too frequently
- * because other processes may set non-blocking (especially during
- * a process upgrade) with Rainbows! concurrency model changes.
- */
-static int thread_accept(struct accept_args *a, int force_nonblock)
-{
-        if (force_nonblock)
-                set_nonblocking(a->fd);
-        return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0);
-}
-
-static void set_blocking_or_block(int fd)
-{
-        static time_t last_set_blocking;
-        time_t now = time(NULL);
-
-        if (last_set_blocking == 0) {
-                last_set_blocking = now;
-                (void)rb_io_wait_readable(fd);
-        } else if ((now - last_set_blocking) <= 5) {
-                (void)rb_io_wait_readable(fd);
-        } else {
-                int flags = fcntl(fd, F_GETFL);
-                if (flags == -1)
-                        rb_sys_fail("fcntl(F_GETFL)");
-                if (flags & O_NONBLOCK) {
-                        flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
-                        if (flags == -1)
-                                rb_sys_fail("fcntl(F_SETFL)");
-                }
-                last_set_blocking = now;
-        }
-}
-#else /* ! HAVE_RB_THREAD_BLOCKING_REGION */
-#  include <rubysig.h>
-static int thread_accept(struct accept_args *a, int force_nonblock)
-{
-        int rv;
-
-        /* always use non-blocking accept() under 1.8 for green threads */
-        set_nonblocking(a->fd);
-        TRAP_BEG;
-        rv = (int)xaccept(a);
-        TRAP_END;
-        return rv;
-}
-#define set_blocking_or_block(fd) (void)rb_io_wait_readable(fd)
-#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
-
-static VALUE
-my_accept(VALUE io, struct sockaddr *addr, socklen_t *addrlen, int nonblock)
-{
-        int client;
-        struct accept_args a;
-
-        a.fd = my_fileno(io);
-        a.addr = addr;
-        a.addrlen = addrlen;
-retry:
-        client = thread_accept(&a, nonblock);
-        if (client == -1) {
-                switch (errno) {
-                case EAGAIN:
-                        if (nonblock)
-                                return Qnil;
-                        set_blocking_or_block(a.fd);
-#ifdef ECONNABORTED
-                case ECONNABORTED:
-#endif /* ECONNABORTED */
-#ifdef EPROTO
-                case EPROTO:
-#endif /* EPROTO */
-                case EINTR:
-                        goto retry;
-                case ENOMEM:
-                case EMFILE:
-                case ENFILE:
-#ifdef ENOBUFS
-                case ENOBUFS:
-#endif /* ENOBUFS */
-                        errno = 0;
-                        rb_gc();
-                        client = thread_accept(&a, nonblock);
-                }
-                if (client == -1) {
-                        if (errno == EINTR)
-                                goto retry;
-                        rb_sys_fail("accept");
-                }
-        }
-        return sock_for_fd(cClientSocket, client);
-}
-
-static void in_addr_set(VALUE io, struct sockaddr_in *addr)
-{
-        VALUE host = rb_str_new(0, INET_ADDRSTRLEN);
-        socklen_t addrlen = (socklen_t)INET_ADDRSTRLEN;
-        const char *name;
-
-        name = inet_ntop(AF_INET, &addr->sin_addr, RSTRING_PTR(host), addrlen);
-        if (name == NULL)
-                rb_sys_fail("inet_ntop");
-        rb_str_set_len(host, strlen(name));
-        rb_ivar_set(io, iv_kgio_addr, host);
-}
-
-/*
- * call-seq:
- *
- *        server = Kgio::TCPServer.new('0.0.0.0', 80)
- *        server.kgio_tryaccept -> Kgio::Socket or nil
- *
- * Initiates a non-blocking accept and returns a generic Kgio::Socket
- * object with the kgio_addr attribute set to the IP address of the
- * connected client on success.
- *
- * Returns nil on EAGAIN, and raises on other errors.
- */
-static VALUE tcp_tryaccept(VALUE io)
-{
-        struct sockaddr_in addr;
-        socklen_t addrlen = sizeof(struct sockaddr_in);
-        VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 1);
-
-        if (!NIL_P(rv))
-                in_addr_set(rv, &addr);
-        return rv;
-}
-
-/*
- * call-seq:
- *
- *        server = Kgio::TCPServer.new('0.0.0.0', 80)
- *        server.kgio_accept -> Kgio::Socket or nil
- *
- * Initiates a blocking accept and returns a generic Kgio::Socket
- * object with the kgio_addr attribute set to the IP address of
- * the client on success.
- *
- * On Ruby implementations using native threads, this can use a blocking
- * accept(2) (or accept4(2)) system call to avoid thundering herds.
- */
-static VALUE tcp_accept(VALUE io)
-{
-        struct sockaddr_in addr;
-        socklen_t addrlen = sizeof(struct sockaddr_in);
-        VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 0);
-
-        in_addr_set(rv, &addr);
-        return rv;
-}
-
-/*
- * call-seq:
- *
- *        server = Kgio::UNIXServer.new("/path/to/unix/socket")
- *        server.kgio_tryaccept -> Kgio::Socket or nil
- *
- * Initiates a non-blocking accept and returns a generic Kgio::Socket
- * object with the kgio_addr attribute set (to the value of
- * Kgio::LOCALHOST) on success.
- *
- * Returns nil on EAGAIN, and raises on other errors.
- */
-static VALUE unix_tryaccept(VALUE io)
-{
-        VALUE rv = my_accept(io, NULL, NULL, 1);
-
-        if (!NIL_P(rv))
-                rb_ivar_set(rv, iv_kgio_addr, localhost);
-        return rv;
-}
-
-/*
- * call-seq:
- *
- *        server = Kgio::UNIXServer.new("/path/to/unix/socket")
- *        server.kgio_accept -> Kgio::Socket or nil
- *
- * Initiates a blocking accept and returns a generic Kgio::Socket
- * object with the kgio_addr attribute set (to the value of
- * Kgio::LOCALHOST) on success.
- *
- * On Ruby implementations using native threads, this can use a blocking
- * accept(2) (or accept4(2)) system call to avoid thundering herds.
- */
-static VALUE unix_accept(VALUE io)
-{
-        VALUE rv = my_accept(io, NULL, NULL, 0);
-
-        rb_ivar_set(rv, iv_kgio_addr, localhost);
-        return rv;
-}
-
-/*
- * call-seq:
- *
- *        Kgio.accept_cloexec? -> true or false
- *
- * Returns true if newly accepted Kgio::Sockets are created with the
- * FD_CLOEXEC file descriptor flag, false if not.
- */
-static VALUE get_cloexec(VALUE mod)
-{
-        return (accept4_flags & A4_SOCK_CLOEXEC) ==
-            A4_SOCK_CLOEXEC ? Qtrue : Qfalse;
-}
-
-/*
- *
- * call-seq:
- *
- *        Kgio.accept_nonblock? -> true or false
- *
- * Returns true if newly accepted Kgio::Sockets are created with the
- * O_NONBLOCK file status flag, false if not.
- */
-static VALUE get_nonblock(VALUE mod)
-{
-        return (accept4_flags & A4_SOCK_NONBLOCK) ==
-            A4_SOCK_NONBLOCK ? Qtrue : Qfalse;
-}
-
-/*
- * call-seq:
- *
- *        Kgio.accept_cloexec = true
- *        Kgio.accept_clocexec = false
- *
- * Sets whether or not Kgio::Socket objects created by
- * TCPServer#kgio_accept,
- * TCPServer#kgio_tryaccept,
- * UNIXServer#kgio_accept,
- * and UNIXServer#kgio_tryaccept
- * are created with the FD_CLOEXEC file descriptor flag.
- *
- * This is on by default, as there is little reason to deal to enable
- * it for client sockets on a socket server.
- */
-static VALUE set_cloexec(VALUE mod, VALUE boolean)
-{
-        switch (TYPE(boolean)) {
-        case T_TRUE:
-                accept4_flags |= A4_SOCK_CLOEXEC;
-                return boolean;
-        case T_FALSE:
-                accept4_flags &= ~A4_SOCK_CLOEXEC;
-                return boolean;
-        }
-        rb_raise(rb_eTypeError, "not true or false");
-        return Qnil;
-}
-
-/*
- * call-seq:
- *
- *        Kgio.accept_nonblock = true
- *        Kgio.accept_nonblock = false
- *
- * Sets whether or not Kgio::Socket objects created by
- * TCPServer#kgio_accept,
- * TCPServer#kgio_tryaccept,
- * UNIXServer#kgio_accept,
- * and UNIXServer#kgio_tryaccept
- * are created with the O_NONBLOCK file status flag.
- *
- * This defaults to +false+ for GNU/Linux where MSG_DONTWAIT is
- * available (and on newer GNU/Linux, accept4() may also set
- * the non-blocking flag.  This defaults to +true+ on non-GNU/Linux
- * systems.
- */
-static VALUE set_nonblock(VALUE mod, VALUE boolean)
-{
-        switch (TYPE(boolean)) {
-        case T_TRUE:
-                accept4_flags |= A4_SOCK_NONBLOCK;
-                return boolean;
-        case T_FALSE:
-                accept4_flags &= ~A4_SOCK_NONBLOCK;
-                return boolean;
-        }
-        rb_raise(rb_eTypeError, "not true or false");
-        return Qnil;
-}
-
-static void close_fail(int fd, const char *msg)
-{
-        int saved_errno = errno;
-        (void)close(fd);
-        errno = saved_errno;
-        rb_sys_fail(msg);
-}
-
-#ifdef SOCK_NONBLOCK
-#  define MY_SOCK_STREAM (SOCK_STREAM|SOCK_NONBLOCK)
-#else
-#  define MY_SOCK_STREAM SOCK_STREAM
-#endif /* ! SOCK_NONBLOCK */
-
-static VALUE
-my_connect(VALUE klass, int io_wait, int domain, void *addr, socklen_t addrlen)
-{
-        int fd = socket(domain, MY_SOCK_STREAM, 0);
-
-        if (fd == -1) {
-                switch (errno) {
-                case EMFILE:
-                case ENFILE:
-#ifdef ENOBUFS
-                case ENOBUFS:
-#endif /* ENOBUFS */
-                        errno = 0;
-                        rb_gc();
-                        fd = socket(domain, MY_SOCK_STREAM, 0);
-                }
-                if (fd == -1)
-                        rb_sys_fail("socket");
-        }
-
-#ifndef SOCK_NONBLOCK
-        if (fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK) == -1)
-                close_fail(fd, "fcntl(F_SETFL, O_RDWR | O_NONBLOCK)");
-#endif /* SOCK_NONBLOCK */
-
-        if (connect(fd, addr, addrlen) == -1) {
-                if (errno == EINPROGRESS) {
-                        VALUE io = sock_for_fd(klass, fd);
-
-                        if (io_wait) {
-                                errno = EAGAIN;
-                                wait_writable(io, fd);
-                        }
-                        return io;
-                }
-                close_fail(fd, "connect");
-        }
-        return sock_for_fd(klass, fd);
-}
-
-static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait)
-{
-        struct sockaddr_in addr = { 0 };
-
-        addr.sin_family = AF_INET;
-        addr.sin_port = htons((unsigned short)NUM2INT(port));
-
-        switch (inet_pton(AF_INET, StringValuePtr(ip), &addr.sin_addr)) {
-        case 1:
-                return my_connect(klass, io_wait, PF_INET, &addr, sizeof(addr));
-        case -1:
-                rb_sys_fail("inet_pton");
-        }
-        rb_raise(rb_eArgError, "invalid address: %s", StringValuePtr(ip));
-
-        return Qnil;
-}
-
-/*
- * call-seq:
- *
- *        Kgio::TCPSocket.new('127.0.0.1', 80) -> socket
- *
- * Creates a new Kgio::TCPSocket object and initiates a
- * non-blocking connection.
- *
- * This may block and call any method assigned to Kgio.wait_writable.
- *
- * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
- * lookups (which is subject to a different set of timeouts and
- * best handled elsewhere).
- */
-static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port)
-{
-        return tcp_connect(klass, ip, port, 1);
-}
-
-/*
- * call-seq:
- *
- *        Kgio::TCPSocket.start('127.0.0.1', 80) -> socket
- *
- * Creates a new Kgio::TCPSocket object and initiates a
- * non-blocking connection.  The caller should select/poll
- * on the socket for writability before attempting to write
- * or optimistically attempt a write and handle Kgio::WaitWritable
- * or Errno::EAGAIN.
- *
- * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS
- * lookups (which is subject to a different set of timeouts and
- * best handled elsewhere).
- */
-static VALUE kgio_tcp_start(VALUE klass, VALUE ip, VALUE port)
-{
-        return tcp_connect(klass, ip, port, 0);
-}
-
-static VALUE unix_connect(VALUE klass, VALUE path, int io_wait)
-{
-        struct sockaddr_un addr = { 0 };
-        long len;
-
-        StringValue(path);
-        len = RSTRING_LEN(path);
-        if (sizeof(addr.sun_path) <= len)
-                rb_raise(rb_eArgError,
-                         "too long unix socket path (max: %dbytes)",
-                         (int)sizeof(addr.sun_path)-1);
-
-        memcpy(addr.sun_path, RSTRING_PTR(path), len);
-        addr.sun_family = AF_UNIX;
-
-        return my_connect(klass, io_wait, PF_UNIX, &addr, sizeof(addr));
-}
-
-/*
- * call-seq:
- *
- *        Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket
- *
- * Creates a new Kgio::UNIXSocket object and initiates a
- * non-blocking connection.
- *
- * This may block and call any method assigned to Kgio.wait_writable.
- */
-static VALUE kgio_unix_connect(VALUE klass, VALUE path)
-{
-        return unix_connect(klass, path, 1);
-}
-
-/*
- * call-seq:
- *
- *        Kgio::UNIXSocket.start("/path/to/unix/socket") -> socket
- *
- * Creates a new Kgio::UNIXSocket object and initiates a
- * non-blocking connection.  The caller should select/poll
- * on the socket for writability before attempting to write
- * or optimistically attempt a write and handle Kgio::WaitWritable
- * or Errno::EAGAIN.
- */
-static VALUE kgio_unix_start(VALUE klass, VALUE path)
-{
-        return unix_connect(klass, path, 0);
-}
-
-static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait)
-{
-        int domain;
-        socklen_t addrlen;
-        struct sockaddr *sockaddr;
-
-        if (TYPE(addr) == T_STRING) {
-                sockaddr = (struct sockaddr *)(RSTRING_PTR(addr));
-                addrlen = (socklen_t)RSTRING_LEN(addr);
-        } else {
-                rb_raise(rb_eTypeError, "invalid address");
-        }
-        switch (((struct sockaddr_in *)(sockaddr))->sin_family) {
-        case AF_UNIX: domain = PF_UNIX; break;
-        case AF_INET: domain = PF_INET; break;
-#ifdef AF_INET6 /* IPv6 support incomplete */
-        case AF_INET6: domain = PF_INET6; break;
-#endif /* AF_INET6 */
-        default:
-                rb_raise(rb_eArgError, "invalid address family");
-        }
-
-        return my_connect(klass, io_wait, domain, sockaddr, addrlen);
-}
-
-/* call-seq:
- *
- *      addr = Socket.pack_sockaddr_in(80, 'example.com')
- *        Kgio::Socket.connect(addr) -> socket
- *
- *      addr = Socket.pack_sockaddr_un("/path/to/unix/socket")
- *        Kgio::Socket.connect(addr) -> socket
- *
- * Creates a generic Kgio::Socket object and initiates a
- * non-blocking connection.
- *
- * This may block and call any method assigned to Kgio.wait_writable.
- */
-static VALUE kgio_connect(VALUE klass, VALUE addr)
-{
-        return stream_connect(klass, addr, 1);
-}
-
-/* call-seq:
- *
- *      addr = Socket.pack_sockaddr_in(80, 'example.com')
- *        Kgio::Socket.start(addr) -> socket
- *
- *      addr = Socket.pack_sockaddr_un("/path/to/unix/socket")
- *        Kgio::Socket.start(addr) -> socket
- *
- * Creates a generic Kgio::Socket object and initiates a
- * non-blocking connection.  The caller should select/poll
- * on the socket for writability before attempting to write
- * or optimistically attempt a write and handle Kgio::WaitWritable
- * or Errno::EAGAIN.
- */
-static VALUE kgio_start(VALUE klass, VALUE addr)
-{
-        return stream_connect(klass, addr, 0);
-}
-
-static VALUE set_accepted(VALUE klass, VALUE aclass)
-{
-        VALUE tmp;
-
-        if (NIL_P(aclass))
-                aclass = cSocket;
-
-        tmp = rb_funcall(aclass, rb_intern("included_modules"), 0, 0);
-        tmp = rb_funcall(tmp, rb_intern("include?"), 1, mSocketMethods);
-
-        if (tmp != Qtrue)
-                rb_raise(rb_eTypeError,
-                         "class must include Kgio::SocketMethods");
-
-        cClientSocket = aclass;
-
-        return aclass;
-}
-
-static VALUE get_accepted(VALUE klass)
-{
-        return cClientSocket;
-}
+#include "kgio.h"
 
 void Init_kgio_ext(void)
 {
-        VALUE mKgio = rb_define_module("Kgio");
-        VALUE mPipeMethods;
-        VALUE cUNIXServer, cTCPServer, cUNIXSocket, cTCPSocket;
-
-        rb_require("socket");
-
-        /*
-         * Document-module: Kgio::Socket
-         *
-         * A generic socket class with Kgio::SocketMethods included.
-         * This is returned by all Kgio methods that accept(2) a connected
-         * stream socket.
-         */
-        cSocket = rb_const_get(rb_cObject, rb_intern("Socket"));
-        cSocket = rb_define_class_under(mKgio, "Socket", cSocket);
-        cClientSocket = cSocket;
-
-        localhost = rb_str_new2("127.0.0.1");
-
-        /*
-         * The IPv4 address of UNIX domain sockets, useful for creating
-         * Rack (and CGI) servers that also serve HTTP traffic over
-         * UNIX domain sockets.
-         */
-        rb_const_set(mKgio, rb_intern("LOCALHOST"), localhost);
-
-        /*
-         * Document-module: Kgio::WaitReadable
-         *
-         * PipeMethods#kgio_tryread and SocketMethods#kgio_tryread will
-         * return this constant when waiting for a read is required.
-         */
-        mKgio_WaitReadable = rb_define_module_under(mKgio, "WaitReadable");
-
-        /*
-         * Document-module: Kgio::WaitWritable
-         *
-         * PipeMethods#kgio_trywrite and SocketMethods#kgio_trywrite will
-         * return this constant when waiting for a read is required.
-         */
-        mKgio_WaitWritable = rb_define_module_under(mKgio, "WaitWritable");
-
-        rb_define_singleton_method(mKgio, "wait_readable=", set_wait_rd, 1);
-        rb_define_singleton_method(mKgio, "wait_writable=", set_wait_wr, 1);
-        rb_define_singleton_method(mKgio, "wait_readable", wait_rd, 0);
-        rb_define_singleton_method(mKgio, "wait_writable", wait_wr, 0);
-        rb_define_singleton_method(mKgio, "accept_cloexec?", get_cloexec, 0);
-        rb_define_singleton_method(mKgio, "accept_cloexec=", set_cloexec, 1);
-        rb_define_singleton_method(mKgio, "accept_nonblock?", get_nonblock, 0);
-        rb_define_singleton_method(mKgio, "accept_nonblock=", set_nonblock, 1);
-        rb_define_singleton_method(mKgio, "accept_class=", set_accepted, 1);
-        rb_define_singleton_method(mKgio, "accept_class", get_accepted, 0);
-
-        /*
-         * Document-module: Kgio::PipeMethods
-         *
-         * This module may be used used to create classes that respond to
-         * various Kgio methods for reading and writing.  This is included
-         * in Kgio::Pipe by default.
-         */
-        mPipeMethods = rb_define_module_under(mKgio, "PipeMethods");
-        rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
-        rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
-        rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
-        rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
-
-        /*
-         * Document-module: Kgio::SocketMethods
-         *
-         * This method behaves like Kgio::PipeMethods, but contains
-         * optimizations for sockets on certain operating systems
-         * (e.g. GNU/Linux).
-         */
-        mSocketMethods = rb_define_module_under(mKgio, "SocketMethods");
-        rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
-        rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
-        rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
-        rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
-
-        /*
-         * Returns the client IPv4 address of the socket in dotted quad
-         * form as a string.  This is always the value of the
-         * Kgio::LOCALHOST constant for UNIX domain sockets.
-         */
-        rb_define_attr(mSocketMethods, "kgio_addr", 1, 1);
-
-        rb_include_module(cSocket, mSocketMethods);
-        rb_define_singleton_method(cSocket, "new", kgio_connect, 1);
-        rb_define_singleton_method(cSocket, "start", kgio_start, 1);
-
-        cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer"));
-        cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer);
-        rb_define_method(cUNIXServer, "kgio_tryaccept", unix_tryaccept, 0);
-        rb_define_method(cUNIXServer, "kgio_accept", unix_accept, 0);
-
-        cTCPServer = rb_const_get(rb_cObject, rb_intern("TCPServer"));
-        cTCPServer = rb_define_class_under(mKgio, "TCPServer", cTCPServer);
-        rb_define_method(cTCPServer, "kgio_tryaccept", tcp_tryaccept, 0);
-        rb_define_method(cTCPServer, "kgio_accept", tcp_accept, 0);
-
-        cTCPSocket = rb_const_get(rb_cObject, rb_intern("TCPSocket"));
-        cTCPSocket = rb_define_class_under(mKgio, "TCPSocket", cTCPSocket);
-        rb_include_module(cTCPSocket, mSocketMethods);
-        rb_define_singleton_method(cTCPSocket, "new", kgio_tcp_connect, 2);
-        rb_define_singleton_method(cTCPSocket, "start", kgio_tcp_start, 2);
-
-        cUNIXSocket = rb_const_get(rb_cObject, rb_intern("UNIXSocket"));
-        cUNIXSocket = rb_define_class_under(mKgio, "UNIXSocket", cUNIXSocket);
-        rb_include_module(cUNIXSocket, mSocketMethods);
-        rb_define_singleton_method(cUNIXSocket, "new", kgio_unix_connect, 1);
-        rb_define_singleton_method(cUNIXSocket, "start", kgio_unix_start, 1);
+        VALUE mKgio = rb_const_get(rb_cObject, rb_intern("Kgio"));
 
-        iv_kgio_addr = rb_intern("@kgio_addr");
-        init_sock_for_fd();
+        init_kgio_wait(mKgio);
+        init_kgio_read_write(mKgio);
+        init_kgio_connect(mKgio);
+        init_kgio_accept(mKgio);
 }
diff --git a/ext/kgio/missing/accept4.h b/ext/kgio/missing/accept4.h
index 8fc37c4..cd8be79 100644
--- a/ext/kgio/missing/accept4.h
+++ b/ext/kgio/missing/accept4.h
@@ -1,7 +1,4 @@
-#ifdef HAVE_ACCEPT4
-#  define A4_SOCK_CLOEXEC SOCK_CLOEXEC
-#  define A4_SOCK_NONBLOCK SOCK_NONBLOCK
-#else
+#ifndef HAVE_ACCEPT4
 #  ifndef _GNU_SOURCE
 #    define _GNU_SOURCE
 #  endif
@@ -9,15 +6,12 @@
 #  include <sys/socket.h>
 #  ifndef SOCK_CLOEXEC
 #    if (FD_CLOEXEC == O_NONBLOCK)
-#      define A4_SOCK_CLOEXEC 1
-#      define A4_SOCK_NONBLOCK 2
+#      define SOCK_CLOEXEC 1
+#      define SOCK_NONBLOCK 2
 #    else
-#      define A4_SOCK_CLOEXEC FD_CLOEXEC
-#      define A4_SOCK_NONBLOCK O_NONBLOCK
+#      define SOCK_CLOEXEC FD_CLOEXEC
+#      define SOCK_NONBLOCK O_NONBLOCK
 #    endif
-#  else
-#    define A4_SOCK_CLOEXEC SOCK_CLOEXEC
-#    define A4_SOCK_NONBLOCK SOCK_NONBLOCK
 #  endif
 
 /* accept4() is currently a Linux-only goodie */
@@ -27,7 +21,7 @@ accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)
         int fd = accept(sockfd, addr, addrlen);
 
         if (fd >= 0) {
-                if ((flags & A4_SOCK_CLOEXEC) == A4_SOCK_CLOEXEC)
+                if ((flags & SOCK_CLOEXEC) == SOCK_CLOEXEC)
                         (void)fcntl(fd, F_SETFD, FD_CLOEXEC);
 
                 /*
@@ -36,7 +30,7 @@ accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags)
                  * Linux, so fcntl() is completely unnecessary
                  * in most cases...
                  */
-                if ((flags & A4_SOCK_NONBLOCK) == A4_SOCK_NONBLOCK) {
+                if ((flags & SOCK_NONBLOCK) == SOCK_NONBLOCK) {
                         int fl = fcntl(fd, F_GETFL);
 
                         if ((fl & O_NONBLOCK) == 0)
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c
new file mode 100644
index 0000000..7f1748a
--- /dev/null
+++ b/ext/kgio/read_write.c
@@ -0,0 +1,325 @@
+#include "kgio.h"
+static VALUE mKgio_WaitReadable, mKgio_WaitWritable;
+
+/*
+ * we know MSG_DONTWAIT works properly on all stream sockets under Linux
+ * we can define this macro for other platforms as people care and
+ * notice.
+ */
+#if defined(__linux__) && ! defined(USE_MSG_DONTWAIT)
+#  define USE_MSG_DONTWAIT
+#endif
+
+static void prepare_read(struct io_args *a, int argc, VALUE *argv, VALUE io)
+{
+        VALUE length;
+
+        a->io = io;
+        a->fd = my_fileno(io);
+        rb_scan_args(argc, argv, "11", &length, &a->buf);
+        a->len = NUM2LONG(length);
+        if (NIL_P(a->buf)) {
+                a->buf = rb_str_new(NULL, a->len);
+        } else {
+                StringValue(a->buf);
+                rb_str_resize(a->buf, a->len);
+        }
+        a->ptr = RSTRING_PTR(a->buf);
+}
+
+static int read_check(struct io_args *a, long n, const char *msg, int io_wait)
+{
+        if (n == -1) {
+                if (errno == EINTR)
+                        return -1;
+                rb_str_set_len(a->buf, 0);
+                if (errno == EAGAIN) {
+                        if (io_wait) {
+                                kgio_wait_readable(a->io, a->fd);
+
+                                /* buf may be modified in other thread/fiber */
+                                rb_str_resize(a->buf, a->len);
+                                a->ptr = RSTRING_PTR(a->buf);
+                                return -1;
+                        } else {
+                                a->buf = mKgio_WaitReadable;
+                                return 0;
+                        }
+                }
+                rb_sys_fail(msg);
+        }
+        rb_str_set_len(a->buf, n);
+        if (n == 0)
+                a->buf = Qnil;
+        return 0;
+}
+
+static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io)
+{
+        struct io_args a;
+        long n;
+
+        prepare_read(&a, argc, argv, io);
+        set_nonblocking(a.fd);
+retry:
+        n = (long)read(a.fd, a.ptr, a.len);
+        if (read_check(&a, n, "read", io_wait) != 0)
+                goto retry;
+        return a.buf;
+}
+
+/*
+ * call-seq:
+ *
+ *        io.kgio_read(maxlen)           ->  buffer
+ *        io.kgio_read(maxlen, buffer)   ->  buffer
+ *
+ * Reads at most maxlen bytes from the stream socket.  Returns with a
+ * newly allocated buffer, or may reuse an existing buffer if supplied.
+ *
+ * Calls the method assigned to Kgio.wait_readable, or blocks in a
+ * thread-safe manner for writability.
+ *
+ * Returns nil on EOF.
+ *
+ * This behaves like read(2) and IO#readpartial, NOT fread(3) or
+ * IO#read which possess read-in-full behavior.
+ */
+static VALUE kgio_read(int argc, VALUE *argv, VALUE io)
+{
+        return my_read(1, argc, argv, io);
+}
+
+/*
+ * call-seq:
+ *
+ *        io.kgio_tryread(maxlen)           ->  buffer
+ *        io.kgio_tryread(maxlen, buffer)   ->  buffer
+ *
+ * Reads at most maxlen bytes from the stream socket.  Returns with a
+ * newly allocated buffer, or may reuse an existing buffer if supplied.
+ *
+ * Returns nil on EOF.
+ *
+ * Returns Kgio::WaitReadable if EAGAIN is encountered.
+ */
+static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io)
+{
+        return my_read(0, argc, argv, io);
+}
+
+#ifdef USE_MSG_DONTWAIT
+static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io)
+{
+        struct io_args a;
+        long n;
+
+        prepare_read(&a, argc, argv, io);
+retry:
+        n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT);
+        if (read_check(&a, n, "recv", io_wait) != 0)
+                goto retry;
+        return a.buf;
+}
+
+/*
+ * This method may be optimized on some systems (e.g. GNU/Linux) to use
+ * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
+ * Otherwise this is the same as Kgio::PipeMethods#kgio_read
+ */
+static VALUE kgio_recv(int argc, VALUE *argv, VALUE io)
+{
+        return my_recv(1, argc, argv, io);
+}
+
+/*
+ * This method may be optimized on some systems (e.g. GNU/Linux) to use
+ * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
+ * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread
+ */
+static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io)
+{
+        return my_recv(0, argc, argv, io);
+}
+#else /* ! USE_MSG_DONTWAIT */
+#  define kgio_recv kgio_read
+#  define kgio_tryrecv kgio_tryread
+#endif /* USE_MSG_DONTWAIT */
+
+static void prepare_write(struct io_args *a, VALUE io, VALUE str)
+{
+        a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str);
+        a->ptr = RSTRING_PTR(a->buf);
+        a->len = RSTRING_LEN(a->buf);
+        a->io = io;
+        a->fd = my_fileno(io);
+}
+
+static int write_check(struct io_args *a, long n, const char *msg, int io_wait)
+{
+        if (a->len == n) {
+done:
+                a->buf = Qnil;
+        } else if (n == -1) {
+                if (errno == EINTR)
+                        return -1;
+                if (errno == EAGAIN) {
+                        long written = RSTRING_LEN(a->buf) - a->len;
+
+                        if (io_wait) {
+                                kgio_wait_writable(a->io, a->fd);
+
+                                /* buf may be modified in other thread/fiber */
+                                a->len = RSTRING_LEN(a->buf) - written;
+                                if (a->len <= 0)
+                                        goto done;
+                                a->ptr = RSTRING_PTR(a->buf) + written;
+                                return -1;
+                        } else if (written > 0) {
+                                a->buf = rb_str_new(a->ptr + n, a->len - n);
+                        } else {
+                                a->buf = mKgio_WaitWritable;
+                        }
+                        return 0;
+                }
+                rb_sys_fail(msg);
+        } else {
+                assert(n >= 0 && n < a->len && "write/send syscall broken?");
+                a->ptr += n;
+                a->len -= n;
+                return -1;
+        }
+        return 0;
+}
+
+static VALUE my_write(VALUE io, VALUE str, int io_wait)
+{
+        struct io_args a;
+        long n;
+
+        prepare_write(&a, io, str);
+        set_nonblocking(a.fd);
+retry:
+        n = (long)write(a.fd, a.ptr, a.len);
+        if (write_check(&a, n, "write", io_wait) != 0)
+                goto retry;
+        return a.buf;
+}
+
+/*
+ * call-seq:
+ *
+ *        io.kgio_write(str)        -> nil
+ *
+ * Returns nil when the write completes.
+ *
+ * Calls the method Kgio.wait_writable if it is set.  Otherwise this
+ * blocks in a thread-safe manner until all data is written or a
+ * fatal error occurs.
+ */
+static VALUE kgio_write(VALUE io, VALUE str)
+{
+        return my_write(io, str, 1);
+}
+
+/*
+ * call-seq:
+ *
+ *        io.kgio_trywrite(str)        -> nil or Kgio::WaitWritable
+ *
+ * Returns nil if the write was completed in full.
+ *
+ * Returns a String containing the unwritten portion if EAGAIN
+ * was encountered, but some portion was successfully written.
+ *
+ * Returns Kgio::WaitWritable if EAGAIN is encountered and nothing
+ * was written.
+ */
+static VALUE kgio_trywrite(VALUE io, VALUE str)
+{
+        return my_write(io, str, 0);
+}
+
+#ifdef USE_MSG_DONTWAIT
+/*
+ * This method behaves like Kgio::PipeMethods#kgio_write, except
+ * it will use send(2) with the MSG_DONTWAIT flag on sockets to
+ * avoid unnecessary calls to fcntl(2).
+ */
+static VALUE my_send(VALUE io, VALUE str, int io_wait)
+{
+        struct io_args a;
+        long n;
+
+        prepare_write(&a, io, str);
+retry:
+        n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT);
+        if (write_check(&a, n, "send", io_wait) != 0)
+                goto retry;
+        return a.buf;
+}
+
+/*
+ * This method may be optimized on some systems (e.g. GNU/Linux) to use
+ * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
+ * Otherwise this is the same as Kgio::PipeMethods#kgio_write
+ */
+static VALUE kgio_send(VALUE io, VALUE str)
+{
+        return my_send(io, str, 1);
+}
+
+/*
+ * This method may be optimized on some systems (e.g. GNU/Linux) to use
+ * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl.
+ * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite
+ */
+static VALUE kgio_trysend(VALUE io, VALUE str)
+{
+        return my_send(io, str, 0);
+}
+#else /* ! USE_MSG_DONTWAIT */
+#  define kgio_send kgio_write
+#  define kgio_trysend kgio_trywrite
+#endif /* ! USE_MSG_DONTWAIT */
+
+void init_kgio_read_write(VALUE mKgio)
+{
+        VALUE mPipeMethods, mSocketMethods;
+
+        mKgio_WaitReadable = rb_const_get(mKgio, rb_intern("WaitReadable"));
+        mKgio_WaitWritable = rb_const_get(mKgio, rb_intern("WaitWritable"));
+
+        /*
+         * Document-module: Kgio::PipeMethods
+         *
+         * This module may be used used to create classes that respond to
+         * various Kgio methods for reading and writing.  This is included
+         * in Kgio::Pipe by default.
+         */
+        mPipeMethods = rb_define_module_under(mKgio, "PipeMethods");
+        rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1);
+        rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1);
+        rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1);
+        rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1);
+
+        /*
+         * Document-module: Kgio::SocketMethods
+         *
+         * This method behaves like Kgio::PipeMethods, but contains
+         * optimizations for sockets on certain operating systems
+         * (e.g. GNU/Linux).
+         */
+        mSocketMethods = rb_define_module_under(mKgio, "SocketMethods");
+        rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1);
+        rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
+        rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1);
+        rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
+
+        /*
+         * Returns the client IPv4 address of the socket in dotted quad
+         * form as a string.  This is always the value of the
+         * Kgio::LOCALHOST constant for UNIX domain sockets.
+         */
+        rb_define_attr(mSocketMethods, "kgio_addr", 1, 1);
+}
diff --git a/ext/kgio/sock_for_fd.h b/ext/kgio/sock_for_fd.h
index db5cba5..ab704b1 100644
--- a/ext/kgio/sock_for_fd.h
+++ b/ext/kgio/sock_for_fd.h
@@ -1,3 +1,5 @@
+#ifndef SOCK_FOR_FD_H
+#define SOCK_FOR_FD_H
 #include <ruby.h>
 #ifdef HAVE_RUBY_IO_H
 #  include <ruby/io.h>
@@ -64,3 +66,4 @@ static void init_sock_for_fd(void)
 #if SOCK_FOR_FD > 0
 #  define init_sock_for_fd() if (0)
 #endif
+#endif /* SOCK_FOR_FD_H */
diff --git a/ext/kgio/wait.c b/ext/kgio/wait.c
new file mode 100644
index 0000000..4fe6d25
--- /dev/null
+++ b/ext/kgio/wait.c
@@ -0,0 +1,115 @@
+#include "kgio.h"
+
+static ID io_wait_rd, io_wait_wr;
+
+void kgio_wait_readable(VALUE io, int fd)
+{
+        if (io_wait_rd) {
+                (void)rb_funcall(io, io_wait_rd, 0, 0);
+        } else {
+                if (!rb_io_wait_readable(fd))
+                        rb_sys_fail("wait readable");
+        }
+}
+
+void kgio_wait_writable(VALUE io, int fd)
+{
+        if (io_wait_wr) {
+                (void)rb_funcall(io, io_wait_wr, 0, 0);
+        } else {
+                if (!rb_io_wait_writable(fd))
+                        rb_sys_fail("wait writable");
+        }
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.wait_readable = :method_name
+ *        Kgio.wait_readable = nil
+ *
+ * Sets a method for kgio_read to call when a read would block.
+ * This is useful for non-blocking frameworks that use Fibers,
+ * as the method referred to this may cause the current Fiber
+ * to yield execution.
+ *
+ * A special value of nil will cause Ruby to wait using the
+ * rb_io_wait_readable() function.
+ */
+static VALUE set_wait_rd(VALUE mod, VALUE sym)
+{
+        switch (TYPE(sym)) {
+        case T_SYMBOL:
+                io_wait_rd = SYM2ID(sym);
+                return sym;
+        case T_NIL:
+                io_wait_rd = 0;
+                return sym;
+        }
+        rb_raise(rb_eTypeError, "must be a symbol or nil");
+        return sym;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.wait_writable = :method_name
+ *        Kgio.wait_writable = nil
+ *
+ * Sets a method for kgio_write to call when a read would block.
+ * This is useful for non-blocking frameworks that use Fibers,
+ * as the method referred to this may cause the current Fiber
+ * to yield execution.
+ *
+ * A special value of nil will cause Ruby to wait using the
+ * rb_io_wait_writable() function.
+ */
+static VALUE set_wait_wr(VALUE mod, VALUE sym)
+{
+        switch (TYPE(sym)) {
+        case T_SYMBOL:
+                io_wait_wr = SYM2ID(sym);
+                return sym;
+        case T_NIL:
+                io_wait_wr = 0;
+                return sym;
+        }
+        rb_raise(rb_eTypeError, "must be a symbol or nil");
+        return sym;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.wait_writable        -> Symbol or nil
+ *
+ * Returns the symbolic method name of the method assigned to
+ * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_write
+ * or Kgio::SocketMethods#kgio_write call
+ */
+static VALUE wait_wr(VALUE mod)
+{
+        return io_wait_wr ? ID2SYM(io_wait_wr) : Qnil;
+}
+
+/*
+ * call-seq:
+ *
+ *        Kgio.wait_readable        -> Symbol or nil
+ *
+ * Returns the symbolic method name of the method assigned to
+ * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_read
+ * or Kgio::SocketMethods#kgio_read call.
+ */
+static VALUE wait_rd(VALUE mod)
+{
+        return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil;
+}
+
+void init_kgio_wait(VALUE mKgio)
+{
+        rb_define_singleton_method(mKgio, "wait_readable=", set_wait_rd, 1);
+        rb_define_singleton_method(mKgio, "wait_writable=", set_wait_wr, 1);
+        rb_define_singleton_method(mKgio, "wait_readable", wait_rd, 0);
+        rb_define_singleton_method(mKgio, "wait_writable", wait_wr, 0);
+}
diff --git a/lib/kgio.rb b/lib/kgio.rb
index e104f46..ae7eac8 100644
--- a/lib/kgio.rb
+++ b/lib/kgio.rb
@@ -1,3 +1,21 @@
+# -*- encoding: binary -*-
+require 'socket'
+module Kgio
+
+  # The IPv4 address of UNIX domain sockets, useful for creating
+  # Rack (and CGI) servers that also serve HTTP traffic over
+  # UNIX domain sockets.
+  LOCALHOST = '127.0.0.1'
+end
+
+# Kgio::PipeMethods#kgio_tryread and Kgio::SocketMethods#kgio_tryread will
+# return this constant when waiting for a read is required.
+module Kgio::WaitReadable; end
+
+# PipeMethods#kgio_trywrite and SocketMethods#kgio_trywrite will
+# return this constant when waiting for a read is required.
+module Kgio::WaitWritable; end
+
 require 'kgio_ext'
 
 # use Kgio::Pipe.popen and Kgio::Pipe.new instead of IO.popen