From 2a6115a89d5c95428bd6c3e0bc10e5a3a4c3c3be Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 29 Sep 2010 18:25:58 -0700 Subject: refactor and split into separate files Making the code easier to read and navigate. This also frees us from having to use the stupid A4_ prefix for accept4(2) flags since it conflicts with the socket(2) ones. --- ext/kgio/accept.c | 371 +++++++++++++++ ext/kgio/connect.c | 256 +++++++++++ ext/kgio/kgio.h | 40 ++ ext/kgio/kgio_ext.c | 1096 +------------------------------------------- ext/kgio/missing/accept4.h | 20 +- ext/kgio/read_write.c | 325 +++++++++++++ ext/kgio/sock_for_fd.h | 3 + ext/kgio/wait.c | 115 +++++ lib/kgio.rb | 18 + 9 files changed, 1141 insertions(+), 1103 deletions(-) create mode 100644 ext/kgio/accept.c create mode 100644 ext/kgio/connect.c create mode 100644 ext/kgio/kgio.h create mode 100644 ext/kgio/read_write.c create mode 100644 ext/kgio/wait.c diff --git a/ext/kgio/accept.c b/ext/kgio/accept.c new file mode 100644 index 0000000..e1d1541 --- /dev/null +++ b/ext/kgio/accept.c @@ -0,0 +1,371 @@ +#include "kgio.h" +#include "missing/accept4.h" +#include "sock_for_fd.h" + +static VALUE localhost; +static VALUE cClientSocket; +static VALUE cKgio_Socket; +static VALUE mSocketMethods; +static VALUE iv_kgio_addr; + +#if defined(__linux__) +static int accept4_flags = SOCK_CLOEXEC; +#else /* ! linux */ +static int accept4_flags = SOCK_CLOEXEC | SOCK_NONBLOCK; +#endif /* ! linux */ + +struct accept_args { + int fd; + struct sockaddr *addr; + socklen_t *addrlen; +}; + +static VALUE set_accepted(VALUE klass, VALUE aclass) +{ + VALUE tmp; + + if (NIL_P(aclass)) + aclass = cKgio_Socket; + + tmp = rb_funcall(aclass, rb_intern("included_modules"), 0, 0); + tmp = rb_funcall(tmp, rb_intern("include?"), 1, mSocketMethods); + + if (tmp != Qtrue) + rb_raise(rb_eTypeError, + "class must include Kgio::SocketMethods"); + + cClientSocket = aclass; + + return aclass; +} + +static VALUE get_accepted(VALUE klass) +{ + return cClientSocket; +} + +static VALUE xaccept(void *ptr) +{ + struct accept_args *a = ptr; + + return (VALUE)accept4(a->fd, a->addr, a->addrlen, accept4_flags); +} + +#ifdef HAVE_RB_THREAD_BLOCKING_REGION +# include +/* + * Try to use a (real) blocking accept() since that can prevent + * thundering herds under Linux: + * http://www.citi.umich.edu/projects/linux-scalability/reports/accept.html + * + * So we periodically disable non-blocking, but not too frequently + * because other processes may set non-blocking (especially during + * a process upgrade) with Rainbows! concurrency model changes. + */ +static int thread_accept(struct accept_args *a, int force_nonblock) +{ + if (force_nonblock) + set_nonblocking(a->fd); + return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0); +} + +static void set_blocking_or_block(int fd) +{ + static time_t last_set_blocking; + time_t now = time(NULL); + + if (last_set_blocking == 0) { + last_set_blocking = now; + (void)rb_io_wait_readable(fd); + } else if ((now - last_set_blocking) <= 5) { + (void)rb_io_wait_readable(fd); + } else { + int flags = fcntl(fd, F_GETFL); + if (flags == -1) + rb_sys_fail("fcntl(F_GETFL)"); + if (flags & O_NONBLOCK) { + flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); + if (flags == -1) + rb_sys_fail("fcntl(F_SETFL)"); + } + last_set_blocking = now; + } +} +#else /* ! HAVE_RB_THREAD_BLOCKING_REGION */ +# include +static int thread_accept(struct accept_args *a, int force_nonblock) +{ + int rv; + + /* always use non-blocking accept() under 1.8 for green threads */ + set_nonblocking(a->fd); + TRAP_BEG; + rv = (int)xaccept(a); + TRAP_END; + return rv; +} +#define set_blocking_or_block(fd) (void)rb_io_wait_readable(fd) +#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */ + +static VALUE +my_accept(VALUE io, struct sockaddr *addr, socklen_t *addrlen, int nonblock) +{ + int client; + struct accept_args a; + + a.fd = my_fileno(io); + a.addr = addr; + a.addrlen = addrlen; +retry: + client = thread_accept(&a, nonblock); + if (client == -1) { + switch (errno) { + case EAGAIN: + if (nonblock) + return Qnil; + set_blocking_or_block(a.fd); +#ifdef ECONNABORTED + case ECONNABORTED: +#endif /* ECONNABORTED */ +#ifdef EPROTO + case EPROTO: +#endif /* EPROTO */ + case EINTR: + goto retry; + case ENOMEM: + case EMFILE: + case ENFILE: +#ifdef ENOBUFS + case ENOBUFS: +#endif /* ENOBUFS */ + errno = 0; + rb_gc(); + client = thread_accept(&a, nonblock); + } + if (client == -1) { + if (errno == EINTR) + goto retry; + rb_sys_fail("accept"); + } + } + return sock_for_fd(cClientSocket, client); +} + +static void in_addr_set(VALUE io, struct sockaddr_in *addr) +{ + VALUE host = rb_str_new(0, INET_ADDRSTRLEN); + socklen_t addrlen = (socklen_t)INET_ADDRSTRLEN; + const char *name; + + name = inet_ntop(AF_INET, &addr->sin_addr, RSTRING_PTR(host), addrlen); + if (name == NULL) + rb_sys_fail("inet_ntop"); + rb_str_set_len(host, strlen(name)); + rb_ivar_set(io, iv_kgio_addr, host); +} + +/* + * call-seq: + * + * server = Kgio::TCPServer.new('0.0.0.0', 80) + * server.kgio_tryaccept -> Kgio::Socket or nil + * + * Initiates a non-blocking accept and returns a generic Kgio::Socket + * object with the kgio_addr attribute set to the IP address of the + * connected client on success. + * + * Returns nil on EAGAIN, and raises on other errors. + */ +static VALUE tcp_tryaccept(VALUE io) +{ + struct sockaddr_in addr; + socklen_t addrlen = sizeof(struct sockaddr_in); + VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 1); + + if (!NIL_P(rv)) + in_addr_set(rv, &addr); + return rv; +} + +/* + * call-seq: + * + * server = Kgio::TCPServer.new('0.0.0.0', 80) + * server.kgio_accept -> Kgio::Socket or nil + * + * Initiates a blocking accept and returns a generic Kgio::Socket + * object with the kgio_addr attribute set to the IP address of + * the client on success. + * + * On Ruby implementations using native threads, this can use a blocking + * accept(2) (or accept4(2)) system call to avoid thundering herds. + */ +static VALUE tcp_accept(VALUE io) +{ + struct sockaddr_in addr; + socklen_t addrlen = sizeof(struct sockaddr_in); + VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 0); + + in_addr_set(rv, &addr); + return rv; +} + +/* + * call-seq: + * + * server = Kgio::UNIXServer.new("/path/to/unix/socket") + * server.kgio_tryaccept -> Kgio::Socket or nil + * + * Initiates a non-blocking accept and returns a generic Kgio::Socket + * object with the kgio_addr attribute set (to the value of + * Kgio::LOCALHOST) on success. + * + * Returns nil on EAGAIN, and raises on other errors. + */ +static VALUE unix_tryaccept(VALUE io) +{ + VALUE rv = my_accept(io, NULL, NULL, 1); + + if (!NIL_P(rv)) + rb_ivar_set(rv, iv_kgio_addr, localhost); + return rv; +} + +/* + * call-seq: + * + * server = Kgio::UNIXServer.new("/path/to/unix/socket") + * server.kgio_accept -> Kgio::Socket or nil + * + * Initiates a blocking accept and returns a generic Kgio::Socket + * object with the kgio_addr attribute set (to the value of + * Kgio::LOCALHOST) on success. + * + * On Ruby implementations using native threads, this can use a blocking + * accept(2) (or accept4(2)) system call to avoid thundering herds. + */ +static VALUE unix_accept(VALUE io) +{ + VALUE rv = my_accept(io, NULL, NULL, 0); + + rb_ivar_set(rv, iv_kgio_addr, localhost); + return rv; +} + +/* + * call-seq: + * + * Kgio.accept_cloexec? -> true or false + * + * Returns true if newly accepted Kgio::Sockets are created with the + * FD_CLOEXEC file descriptor flag, false if not. + */ +static VALUE get_cloexec(VALUE mod) +{ + return (accept4_flags & SOCK_CLOEXEC) == SOCK_CLOEXEC ? Qtrue : Qfalse; +} + +/* + * + * call-seq: + * + * Kgio.accept_nonblock? -> true or false + * + * Returns true if newly accepted Kgio::Sockets are created with the + * O_NONBLOCK file status flag, false if not. + */ +static VALUE get_nonblock(VALUE mod) +{ + return (accept4_flags & SOCK_NONBLOCK)==SOCK_NONBLOCK ? Qtrue : Qfalse; +} + +/* + * call-seq: + * + * Kgio.accept_cloexec = true + * Kgio.accept_clocexec = false + * + * Sets whether or not Kgio::Socket objects created by + * TCPServer#kgio_accept, + * TCPServer#kgio_tryaccept, + * UNIXServer#kgio_accept, + * and UNIXServer#kgio_tryaccept + * are created with the FD_CLOEXEC file descriptor flag. + * + * This is on by default, as there is little reason to deal to enable + * it for client sockets on a socket server. + */ +static VALUE set_cloexec(VALUE mod, VALUE boolean) +{ + switch (TYPE(boolean)) { + case T_TRUE: + accept4_flags |= SOCK_CLOEXEC; + return boolean; + case T_FALSE: + accept4_flags &= ~SOCK_CLOEXEC; + return boolean; + } + rb_raise(rb_eTypeError, "not true or false"); + return Qnil; +} + +/* + * call-seq: + * + * Kgio.accept_nonblock = true + * Kgio.accept_nonblock = false + * + * Sets whether or not Kgio::Socket objects created by + * TCPServer#kgio_accept, + * TCPServer#kgio_tryaccept, + * UNIXServer#kgio_accept, + * and UNIXServer#kgio_tryaccept + * are created with the O_NONBLOCK file status flag. + * + * This defaults to +false+ for GNU/Linux where MSG_DONTWAIT is + * available (and on newer GNU/Linux, accept4() may also set + * the non-blocking flag. This defaults to +true+ on non-GNU/Linux + * systems. + */ +static VALUE set_nonblock(VALUE mod, VALUE boolean) +{ + switch (TYPE(boolean)) { + case T_TRUE: + accept4_flags |= SOCK_NONBLOCK; + return boolean; + case T_FALSE: + accept4_flags &= ~SOCK_NONBLOCK; + return boolean; + } + rb_raise(rb_eTypeError, "not true or false"); + return Qnil; +} + +void init_kgio_accept(VALUE mKgio) +{ + VALUE cUNIXServer, cTCPServer; + + localhost = rb_const_get(mKgio, rb_intern("LOCALHOST")); + cKgio_Socket = rb_const_get(mKgio, rb_intern("Socket")); + cClientSocket = cKgio_Socket; + mSocketMethods = rb_const_get(mKgio, rb_intern("SocketMethods")); + + rb_define_singleton_method(mKgio, "accept_cloexec?", get_cloexec, 0); + rb_define_singleton_method(mKgio, "accept_cloexec=", set_cloexec, 1); + rb_define_singleton_method(mKgio, "accept_nonblock?", get_nonblock, 0); + rb_define_singleton_method(mKgio, "accept_nonblock=", set_nonblock, 1); + rb_define_singleton_method(mKgio, "accept_class=", set_accepted, 1); + rb_define_singleton_method(mKgio, "accept_class", get_accepted, 0); + + cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer")); + cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer); + rb_define_method(cUNIXServer, "kgio_tryaccept", unix_tryaccept, 0); + rb_define_method(cUNIXServer, "kgio_accept", unix_accept, 0); + + cTCPServer = rb_const_get(rb_cObject, rb_intern("TCPServer")); + cTCPServer = rb_define_class_under(mKgio, "TCPServer", cTCPServer); + rb_define_method(cTCPServer, "kgio_tryaccept", tcp_tryaccept, 0); + rb_define_method(cTCPServer, "kgio_accept", tcp_accept, 0); + init_sock_for_fd(); + iv_kgio_addr = rb_intern("@kgio_addr"); +} diff --git a/ext/kgio/connect.c b/ext/kgio/connect.c new file mode 100644 index 0000000..17c77bb --- /dev/null +++ b/ext/kgio/connect.c @@ -0,0 +1,256 @@ +#include "kgio.h" +#include "sock_for_fd.h" + +static void close_fail(int fd, const char *msg) +{ + int saved_errno = errno; + (void)close(fd); + errno = saved_errno; + rb_sys_fail(msg); +} + +#ifdef SOCK_NONBLOCK +# define MY_SOCK_STREAM (SOCK_STREAM|SOCK_NONBLOCK) +#else +# define MY_SOCK_STREAM SOCK_STREAM +#endif /* ! SOCK_NONBLOCK */ + +static VALUE +my_connect(VALUE klass, int io_wait, int domain, void *addr, socklen_t addrlen) +{ + int fd = socket(domain, MY_SOCK_STREAM, 0); + + if (fd == -1) { + switch (errno) { + case EMFILE: + case ENFILE: +#ifdef ENOBUFS + case ENOBUFS: +#endif /* ENOBUFS */ + errno = 0; + rb_gc(); + fd = socket(domain, MY_SOCK_STREAM, 0); + } + if (fd == -1) + rb_sys_fail("socket"); + } + +#ifndef SOCK_NONBLOCK + if (fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK) == -1) + close_fail(fd, "fcntl(F_SETFL, O_RDWR | O_NONBLOCK)"); +#endif /* SOCK_NONBLOCK */ + + if (connect(fd, addr, addrlen) == -1) { + if (errno == EINPROGRESS) { + VALUE io = sock_for_fd(klass, fd); + + if (io_wait) { + errno = EAGAIN; + kgio_wait_writable(io, fd); + } + return io; + } + close_fail(fd, "connect"); + } + return sock_for_fd(klass, fd); +} + +static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait) +{ + struct sockaddr_in addr = { 0 }; + + addr.sin_family = AF_INET; + addr.sin_port = htons((unsigned short)NUM2INT(port)); + + switch (inet_pton(AF_INET, StringValuePtr(ip), &addr.sin_addr)) { + case 1: + return my_connect(klass, io_wait, PF_INET, &addr, sizeof(addr)); + case -1: + rb_sys_fail("inet_pton"); + } + rb_raise(rb_eArgError, "invalid address: %s", StringValuePtr(ip)); + + return Qnil; +} + +/* + * call-seq: + * + * Kgio::TCPSocket.new('127.0.0.1', 80) -> socket + * + * Creates a new Kgio::TCPSocket object and initiates a + * non-blocking connection. + * + * This may block and call any method assigned to Kgio.wait_writable. + * + * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS + * lookups (which is subject to a different set of timeouts and + * best handled elsewhere). + */ +static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port) +{ + return tcp_connect(klass, ip, port, 1); +} + +/* + * call-seq: + * + * Kgio::TCPSocket.start('127.0.0.1', 80) -> socket + * + * Creates a new Kgio::TCPSocket object and initiates a + * non-blocking connection. The caller should select/poll + * on the socket for writability before attempting to write + * or optimistically attempt a write and handle Kgio::WaitWritable + * or Errno::EAGAIN. + * + * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS + * lookups (which is subject to a different set of timeouts and + * best handled elsewhere). + */ +static VALUE kgio_tcp_start(VALUE klass, VALUE ip, VALUE port) +{ + return tcp_connect(klass, ip, port, 0); +} + +static VALUE unix_connect(VALUE klass, VALUE path, int io_wait) +{ + struct sockaddr_un addr = { 0 }; + long len; + + StringValue(path); + len = RSTRING_LEN(path); + if (sizeof(addr.sun_path) <= len) + rb_raise(rb_eArgError, + "too long unix socket path (max: %dbytes)", + (int)sizeof(addr.sun_path)-1); + + memcpy(addr.sun_path, RSTRING_PTR(path), len); + addr.sun_family = AF_UNIX; + + return my_connect(klass, io_wait, PF_UNIX, &addr, sizeof(addr)); +} + +/* + * call-seq: + * + * Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket + * + * Creates a new Kgio::UNIXSocket object and initiates a + * non-blocking connection. + * + * This may block and call any method assigned to Kgio.wait_writable. + */ +static VALUE kgio_unix_connect(VALUE klass, VALUE path) +{ + return unix_connect(klass, path, 1); +} + +/* + * call-seq: + * + * Kgio::UNIXSocket.start("/path/to/unix/socket") -> socket + * + * Creates a new Kgio::UNIXSocket object and initiates a + * non-blocking connection. The caller should select/poll + * on the socket for writability before attempting to write + * or optimistically attempt a write and handle Kgio::WaitWritable + * or Errno::EAGAIN. + */ +static VALUE kgio_unix_start(VALUE klass, VALUE path) +{ + return unix_connect(klass, path, 0); +} + +static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait) +{ + int domain; + socklen_t addrlen; + struct sockaddr *sockaddr; + + if (TYPE(addr) == T_STRING) { + sockaddr = (struct sockaddr *)(RSTRING_PTR(addr)); + addrlen = (socklen_t)RSTRING_LEN(addr); + } else { + rb_raise(rb_eTypeError, "invalid address"); + } + switch (((struct sockaddr_in *)(sockaddr))->sin_family) { + case AF_UNIX: domain = PF_UNIX; break; + case AF_INET: domain = PF_INET; break; +#ifdef AF_INET6 /* IPv6 support incomplete */ + case AF_INET6: domain = PF_INET6; break; +#endif /* AF_INET6 */ + default: + rb_raise(rb_eArgError, "invalid address family"); + } + + return my_connect(klass, io_wait, domain, sockaddr, addrlen); +} + +/* call-seq: + * + * addr = Socket.pack_sockaddr_in(80, 'example.com') + * Kgio::Socket.connect(addr) -> socket + * + * addr = Socket.pack_sockaddr_un("/path/to/unix/socket") + * Kgio::Socket.connect(addr) -> socket + * + * Creates a generic Kgio::Socket object and initiates a + * non-blocking connection. + * + * This may block and call any method assigned to Kgio.wait_writable. + */ +static VALUE kgio_connect(VALUE klass, VALUE addr) +{ + return stream_connect(klass, addr, 1); +} + +/* call-seq: + * + * addr = Socket.pack_sockaddr_in(80, 'example.com') + * Kgio::Socket.start(addr) -> socket + * + * addr = Socket.pack_sockaddr_un("/path/to/unix/socket") + * Kgio::Socket.start(addr) -> socket + * + * Creates a generic Kgio::Socket object and initiates a + * non-blocking connection. The caller should select/poll + * on the socket for writability before attempting to write + * or optimistically attempt a write and handle Kgio::WaitWritable + * or Errno::EAGAIN. + */ +static VALUE kgio_start(VALUE klass, VALUE addr) +{ + return stream_connect(klass, addr, 0); +} + +void init_kgio_connect(VALUE mKgio) +{ + VALUE cSocket = rb_const_get(rb_cObject, rb_intern("Socket")); + VALUE mSocketMethods = rb_const_get(mKgio, rb_intern("SocketMethods")); + VALUE cKgio_Socket, cTCPSocket, cUNIXSocket; + + /* + * Document-class: Kgio::Socket + * + * A generic socket class with Kgio::SocketMethods included. + * This is returned by all Kgio methods that accept(2) a connected + * stream socket. + */ + cKgio_Socket = rb_define_class_under(mKgio, "Socket", cSocket); + rb_include_module(cKgio_Socket, mSocketMethods); + rb_define_singleton_method(cKgio_Socket, "new", kgio_connect, 1); + rb_define_singleton_method(cKgio_Socket, "start", kgio_start, 1); + + cTCPSocket = rb_const_get(rb_cObject, rb_intern("TCPSocket")); + cTCPSocket = rb_define_class_under(mKgio, "TCPSocket", cTCPSocket); + rb_include_module(cTCPSocket, mSocketMethods); + rb_define_singleton_method(cTCPSocket, "new", kgio_tcp_connect, 2); + rb_define_singleton_method(cTCPSocket, "start", kgio_tcp_start, 2); + + cUNIXSocket = rb_const_get(rb_cObject, rb_intern("UNIXSocket")); + cUNIXSocket = rb_define_class_under(mKgio, "UNIXSocket", cUNIXSocket); + rb_include_module(cUNIXSocket, mSocketMethods); + rb_define_singleton_method(cUNIXSocket, "new", kgio_unix_connect, 1); + rb_define_singleton_method(cUNIXSocket, "start", kgio_unix_start, 1); + init_sock_for_fd(); +} diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h new file mode 100644 index 0000000..83a247c --- /dev/null +++ b/ext/kgio/kgio.h @@ -0,0 +1,40 @@ +#ifndef KGIO_H +#define KGIO_H + +#include +#ifdef HAVE_RUBY_IO_H +# include +#else +# include +#endif +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "missing/ancient_ruby.h" +#include "nonblock.h" +#include "my_fileno.h" + +struct io_args { + VALUE io; + VALUE buf; + char *ptr; + long len; + int fd; +}; + +void init_kgio_wait(VALUE mKgio); +void init_kgio_read_write(VALUE mKgio); +void init_kgio_accept(VALUE mKgio); +void init_kgio_connect(VALUE mKgio); + +void kgio_wait_writable(VALUE io, int fd); +void kgio_wait_readable(VALUE io, int fd); + +#endif /* KGIO_H */ diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c index c8a9611..12f1c6a 100644 --- a/ext/kgio/kgio_ext.c +++ b/ext/kgio/kgio_ext.c @@ -1,1095 +1,11 @@ -#include -#ifdef HAVE_RUBY_IO_H -# include -#else -# include -#endif -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "missing/accept4.h" -#include "missing/ancient_ruby.h" -#include "nonblock.h" -#include "my_fileno.h" -#include "sock_for_fd.h" - -#if defined(__linux__) -/* - * we know MSG_DONTWAIT works properly on all stream sockets under Linux - * we can define this macro for other platforms as people care and - * notice. - */ -# define USE_MSG_DONTWAIT -static int accept4_flags = A4_SOCK_CLOEXEC; -#else /* ! linux */ -static int accept4_flags = A4_SOCK_CLOEXEC | A4_SOCK_NONBLOCK; -#endif /* ! linux */ - -static VALUE cClientSocket; -static VALUE mSocketMethods; -static VALUE cSocket; -static VALUE localhost; -static VALUE mKgio_WaitReadable, mKgio_WaitWritable; -static ID io_wait_rd, io_wait_wr; -static ID iv_kgio_addr; - -struct io_args { - VALUE io; - VALUE buf; - char *ptr; - long len; - int fd; -}; - -struct accept_args { - int fd; - struct sockaddr *addr; - socklen_t *addrlen; -}; - -static void wait_readable(VALUE io, int fd) -{ - if (io_wait_rd) { - (void)rb_funcall(io, io_wait_rd, 0, 0); - } else { - if (!rb_io_wait_readable(fd)) - rb_sys_fail("wait readable"); - } -} - -static void wait_writable(VALUE io, int fd) -{ - if (io_wait_wr) { - (void)rb_funcall(io, io_wait_wr, 0, 0); - } else { - if (!rb_io_wait_writable(fd)) - rb_sys_fail("wait writable"); - } -} - -static void prepare_read(struct io_args *a, int argc, VALUE *argv, VALUE io) -{ - VALUE length; - - a->io = io; - a->fd = my_fileno(io); - rb_scan_args(argc, argv, "11", &length, &a->buf); - a->len = NUM2LONG(length); - if (NIL_P(a->buf)) { - a->buf = rb_str_new(NULL, a->len); - } else { - StringValue(a->buf); - rb_str_resize(a->buf, a->len); - } - a->ptr = RSTRING_PTR(a->buf); -} - -static int read_check(struct io_args *a, long n, const char *msg, int io_wait) -{ - if (n == -1) { - if (errno == EINTR) - return -1; - rb_str_set_len(a->buf, 0); - if (errno == EAGAIN) { - if (io_wait) { - wait_readable(a->io, a->fd); - - /* buf may be modified in other thread/fiber */ - rb_str_resize(a->buf, a->len); - a->ptr = RSTRING_PTR(a->buf); - return -1; - } else { - a->buf = mKgio_WaitReadable; - return 0; - } - } - rb_sys_fail(msg); - } - rb_str_set_len(a->buf, n); - if (n == 0) - a->buf = Qnil; - return 0; -} - -static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io) -{ - struct io_args a; - long n; - - prepare_read(&a, argc, argv, io); - set_nonblocking(a.fd); -retry: - n = (long)read(a.fd, a.ptr, a.len); - if (read_check(&a, n, "read", io_wait) != 0) - goto retry; - return a.buf; -} - -/* - * call-seq: - * - * io.kgio_read(maxlen) -> buffer - * io.kgio_read(maxlen, buffer) -> buffer - * - * Reads at most maxlen bytes from the stream socket. Returns with a - * newly allocated buffer, or may reuse an existing buffer if supplied. - * - * Calls the method assigned to Kgio.wait_readable, or blocks in a - * thread-safe manner for writability. - * - * Returns nil on EOF. - * - * This behaves like read(2) and IO#readpartial, NOT fread(3) or - * IO#read which possess read-in-full behavior. - */ -static VALUE kgio_read(int argc, VALUE *argv, VALUE io) -{ - return my_read(1, argc, argv, io); -} - -/* - * call-seq: - * - * io.kgio_tryread(maxlen) -> buffer - * io.kgio_tryread(maxlen, buffer) -> buffer - * - * Reads at most maxlen bytes from the stream socket. Returns with a - * newly allocated buffer, or may reuse an existing buffer if supplied. - * - * Returns nil on EOF. - * - * Returns Kgio::WaitReadable if EAGAIN is encountered. - */ -static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io) -{ - return my_read(0, argc, argv, io); -} - -#ifdef USE_MSG_DONTWAIT -static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io) -{ - struct io_args a; - long n; - - prepare_read(&a, argc, argv, io); -retry: - n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT); - if (read_check(&a, n, "recv", io_wait) != 0) - goto retry; - return a.buf; -} - -/* - * This method may be optimized on some systems (e.g. GNU/Linux) to use - * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. - * Otherwise this is the same as Kgio::PipeMethods#kgio_read - */ -static VALUE kgio_recv(int argc, VALUE *argv, VALUE io) -{ - return my_recv(1, argc, argv, io); -} - -/* - * This method may be optimized on some systems (e.g. GNU/Linux) to use - * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. - * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread - */ -static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io) -{ - return my_recv(0, argc, argv, io); -} -#else /* ! USE_MSG_DONTWAIT */ -# define kgio_recv kgio_read -# define kgio_tryrecv kgio_tryread -#endif /* USE_MSG_DONTWAIT */ - -static void prepare_write(struct io_args *a, VALUE io, VALUE str) -{ - a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str); - a->ptr = RSTRING_PTR(a->buf); - a->len = RSTRING_LEN(a->buf); - a->io = io; - a->fd = my_fileno(io); -} - -static int write_check(struct io_args *a, long n, const char *msg, int io_wait) -{ - if (a->len == n) { -done: - a->buf = Qnil; - } else if (n == -1) { - if (errno == EINTR) - return -1; - if (errno == EAGAIN) { - long written = RSTRING_LEN(a->buf) - a->len; - - if (io_wait) { - wait_writable(a->io, a->fd); - - /* buf may be modified in other thread/fiber */ - a->len = RSTRING_LEN(a->buf) - written; - if (a->len <= 0) - goto done; - a->ptr = RSTRING_PTR(a->buf) + written; - return -1; - } else if (written > 0) { - a->buf = rb_str_new(a->ptr + n, a->len - n); - } else { - a->buf = mKgio_WaitWritable; - } - return 0; - } - rb_sys_fail(msg); - } else { - assert(n >= 0 && n < a->len && "write/send syscall broken?"); - a->ptr += n; - a->len -= n; - return -1; - } - return 0; -} - -static VALUE my_write(VALUE io, VALUE str, int io_wait) -{ - struct io_args a; - long n; - - prepare_write(&a, io, str); - set_nonblocking(a.fd); -retry: - n = (long)write(a.fd, a.ptr, a.len); - if (write_check(&a, n, "write", io_wait) != 0) - goto retry; - return a.buf; -} - -/* - * call-seq: - * - * io.kgio_write(str) -> nil - * - * Returns nil when the write completes. - * - * Calls the method Kgio.wait_writable if it is set. Otherwise this - * blocks in a thread-safe manner until all data is written or a - * fatal error occurs. - */ -static VALUE kgio_write(VALUE io, VALUE str) -{ - return my_write(io, str, 1); -} - -/* - * call-seq: - * - * io.kgio_trywrite(str) -> nil or Kgio::WaitWritable - * - * Returns nil if the write was completed in full. - * - * Returns a String containing the unwritten portion if EAGAIN - * was encountered, but some portion was successfully written. - * - * Returns Kgio::WaitWritable if EAGAIN is encountered and nothing - * was written. - */ -static VALUE kgio_trywrite(VALUE io, VALUE str) -{ - return my_write(io, str, 0); -} - -#ifdef USE_MSG_DONTWAIT -/* - * This method behaves like Kgio::PipeMethods#kgio_write, except - * it will use send(2) with the MSG_DONTWAIT flag on sockets to - * avoid unnecessary calls to fcntl(2). - */ -static VALUE my_send(VALUE io, VALUE str, int io_wait) -{ - struct io_args a; - long n; - - prepare_write(&a, io, str); -retry: - n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT); - if (write_check(&a, n, "send", io_wait) != 0) - goto retry; - return a.buf; -} - -/* - * This method may be optimized on some systems (e.g. GNU/Linux) to use - * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. - * Otherwise this is the same as Kgio::PipeMethods#kgio_write - */ -static VALUE kgio_send(VALUE io, VALUE str) -{ - return my_send(io, str, 1); -} - -/* - * This method may be optimized on some systems (e.g. GNU/Linux) to use - * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. - * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite - */ -static VALUE kgio_trysend(VALUE io, VALUE str) -{ - return my_send(io, str, 0); -} -#else /* ! USE_MSG_DONTWAIT */ -# define kgio_send kgio_write -# define kgio_trysend kgio_trywrite -#endif /* ! USE_MSG_DONTWAIT */ - -/* - * call-seq: - * - * Kgio.wait_readable = :method_name - * Kgio.wait_readable = nil - * - * Sets a method for kgio_read to call when a read would block. - * This is useful for non-blocking frameworks that use Fibers, - * as the method referred to this may cause the current Fiber - * to yield execution. - * - * A special value of nil will cause Ruby to wait using the - * rb_io_wait_readable() function. - */ -static VALUE set_wait_rd(VALUE mod, VALUE sym) -{ - switch (TYPE(sym)) { - case T_SYMBOL: - io_wait_rd = SYM2ID(sym); - return sym; - case T_NIL: - io_wait_rd = 0; - return sym; - } - rb_raise(rb_eTypeError, "must be a symbol or nil"); - return sym; -} - -/* - * call-seq: - * - * Kgio.wait_writable = :method_name - * Kgio.wait_writable = nil - * - * Sets a method for kgio_write to call when a read would block. - * This is useful for non-blocking frameworks that use Fibers, - * as the method referred to this may cause the current Fiber - * to yield execution. - * - * A special value of nil will cause Ruby to wait using the - * rb_io_wait_writable() function. - */ -static VALUE set_wait_wr(VALUE mod, VALUE sym) -{ - switch (TYPE(sym)) { - case T_SYMBOL: - io_wait_wr = SYM2ID(sym); - return sym; - case T_NIL: - io_wait_wr = 0; - return sym; - } - rb_raise(rb_eTypeError, "must be a symbol or nil"); - return sym; -} - -/* - * call-seq: - * - * Kgio.wait_writable -> Symbol or nil - * - * Returns the symbolic method name of the method assigned to - * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_write - * or Kgio::SocketMethods#kgio_write call - */ -static VALUE wait_wr(VALUE mod) -{ - return io_wait_wr ? ID2SYM(io_wait_wr) : Qnil; -} - -/* - * call-seq: - * - * Kgio.wait_readable -> Symbol or nil - * - * Returns the symbolic method name of the method assigned to - * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_read - * or Kgio::SocketMethods#kgio_read call. - */ -static VALUE wait_rd(VALUE mod) -{ - return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil; -} - -static VALUE xaccept(void *ptr) -{ - struct accept_args *a = ptr; - - return (VALUE)accept4(a->fd, a->addr, a->addrlen, accept4_flags); -} - -#ifdef HAVE_RB_THREAD_BLOCKING_REGION -# include -/* - * Try to use a (real) blocking accept() since that can prevent - * thundering herds under Linux: - * http://www.citi.umich.edu/projects/linux-scalability/reports/accept.html - * - * So we periodically disable non-blocking, but not too frequently - * because other processes may set non-blocking (especially during - * a process upgrade) with Rainbows! concurrency model changes. - */ -static int thread_accept(struct accept_args *a, int force_nonblock) -{ - if (force_nonblock) - set_nonblocking(a->fd); - return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0); -} - -static void set_blocking_or_block(int fd) -{ - static time_t last_set_blocking; - time_t now = time(NULL); - - if (last_set_blocking == 0) { - last_set_blocking = now; - (void)rb_io_wait_readable(fd); - } else if ((now - last_set_blocking) <= 5) { - (void)rb_io_wait_readable(fd); - } else { - int flags = fcntl(fd, F_GETFL); - if (flags == -1) - rb_sys_fail("fcntl(F_GETFL)"); - if (flags & O_NONBLOCK) { - flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); - if (flags == -1) - rb_sys_fail("fcntl(F_SETFL)"); - } - last_set_blocking = now; - } -} -#else /* ! HAVE_RB_THREAD_BLOCKING_REGION */ -# include -static int thread_accept(struct accept_args *a, int force_nonblock) -{ - int rv; - - /* always use non-blocking accept() under 1.8 for green threads */ - set_nonblocking(a->fd); - TRAP_BEG; - rv = (int)xaccept(a); - TRAP_END; - return rv; -} -#define set_blocking_or_block(fd) (void)rb_io_wait_readable(fd) -#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */ - -static VALUE -my_accept(VALUE io, struct sockaddr *addr, socklen_t *addrlen, int nonblock) -{ - int client; - struct accept_args a; - - a.fd = my_fileno(io); - a.addr = addr; - a.addrlen = addrlen; -retry: - client = thread_accept(&a, nonblock); - if (client == -1) { - switch (errno) { - case EAGAIN: - if (nonblock) - return Qnil; - set_blocking_or_block(a.fd); -#ifdef ECONNABORTED - case ECONNABORTED: -#endif /* ECONNABORTED */ -#ifdef EPROTO - case EPROTO: -#endif /* EPROTO */ - case EINTR: - goto retry; - case ENOMEM: - case EMFILE: - case ENFILE: -#ifdef ENOBUFS - case ENOBUFS: -#endif /* ENOBUFS */ - errno = 0; - rb_gc(); - client = thread_accept(&a, nonblock); - } - if (client == -1) { - if (errno == EINTR) - goto retry; - rb_sys_fail("accept"); - } - } - return sock_for_fd(cClientSocket, client); -} - -static void in_addr_set(VALUE io, struct sockaddr_in *addr) -{ - VALUE host = rb_str_new(0, INET_ADDRSTRLEN); - socklen_t addrlen = (socklen_t)INET_ADDRSTRLEN; - const char *name; - - name = inet_ntop(AF_INET, &addr->sin_addr, RSTRING_PTR(host), addrlen); - if (name == NULL) - rb_sys_fail("inet_ntop"); - rb_str_set_len(host, strlen(name)); - rb_ivar_set(io, iv_kgio_addr, host); -} - -/* - * call-seq: - * - * server = Kgio::TCPServer.new('0.0.0.0', 80) - * server.kgio_tryaccept -> Kgio::Socket or nil - * - * Initiates a non-blocking accept and returns a generic Kgio::Socket - * object with the kgio_addr attribute set to the IP address of the - * connected client on success. - * - * Returns nil on EAGAIN, and raises on other errors. - */ -static VALUE tcp_tryaccept(VALUE io) -{ - struct sockaddr_in addr; - socklen_t addrlen = sizeof(struct sockaddr_in); - VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 1); - - if (!NIL_P(rv)) - in_addr_set(rv, &addr); - return rv; -} - -/* - * call-seq: - * - * server = Kgio::TCPServer.new('0.0.0.0', 80) - * server.kgio_accept -> Kgio::Socket or nil - * - * Initiates a blocking accept and returns a generic Kgio::Socket - * object with the kgio_addr attribute set to the IP address of - * the client on success. - * - * On Ruby implementations using native threads, this can use a blocking - * accept(2) (or accept4(2)) system call to avoid thundering herds. - */ -static VALUE tcp_accept(VALUE io) -{ - struct sockaddr_in addr; - socklen_t addrlen = sizeof(struct sockaddr_in); - VALUE rv = my_accept(io, (struct sockaddr *)&addr, &addrlen, 0); - - in_addr_set(rv, &addr); - return rv; -} - -/* - * call-seq: - * - * server = Kgio::UNIXServer.new("/path/to/unix/socket") - * server.kgio_tryaccept -> Kgio::Socket or nil - * - * Initiates a non-blocking accept and returns a generic Kgio::Socket - * object with the kgio_addr attribute set (to the value of - * Kgio::LOCALHOST) on success. - * - * Returns nil on EAGAIN, and raises on other errors. - */ -static VALUE unix_tryaccept(VALUE io) -{ - VALUE rv = my_accept(io, NULL, NULL, 1); - - if (!NIL_P(rv)) - rb_ivar_set(rv, iv_kgio_addr, localhost); - return rv; -} - -/* - * call-seq: - * - * server = Kgio::UNIXServer.new("/path/to/unix/socket") - * server.kgio_accept -> Kgio::Socket or nil - * - * Initiates a blocking accept and returns a generic Kgio::Socket - * object with the kgio_addr attribute set (to the value of - * Kgio::LOCALHOST) on success. - * - * On Ruby implementations using native threads, this can use a blocking - * accept(2) (or accept4(2)) system call to avoid thundering herds. - */ -static VALUE unix_accept(VALUE io) -{ - VALUE rv = my_accept(io, NULL, NULL, 0); - - rb_ivar_set(rv, iv_kgio_addr, localhost); - return rv; -} - -/* - * call-seq: - * - * Kgio.accept_cloexec? -> true or false - * - * Returns true if newly accepted Kgio::Sockets are created with the - * FD_CLOEXEC file descriptor flag, false if not. - */ -static VALUE get_cloexec(VALUE mod) -{ - return (accept4_flags & A4_SOCK_CLOEXEC) == - A4_SOCK_CLOEXEC ? Qtrue : Qfalse; -} - -/* - * - * call-seq: - * - * Kgio.accept_nonblock? -> true or false - * - * Returns true if newly accepted Kgio::Sockets are created with the - * O_NONBLOCK file status flag, false if not. - */ -static VALUE get_nonblock(VALUE mod) -{ - return (accept4_flags & A4_SOCK_NONBLOCK) == - A4_SOCK_NONBLOCK ? Qtrue : Qfalse; -} - -/* - * call-seq: - * - * Kgio.accept_cloexec = true - * Kgio.accept_clocexec = false - * - * Sets whether or not Kgio::Socket objects created by - * TCPServer#kgio_accept, - * TCPServer#kgio_tryaccept, - * UNIXServer#kgio_accept, - * and UNIXServer#kgio_tryaccept - * are created with the FD_CLOEXEC file descriptor flag. - * - * This is on by default, as there is little reason to deal to enable - * it for client sockets on a socket server. - */ -static VALUE set_cloexec(VALUE mod, VALUE boolean) -{ - switch (TYPE(boolean)) { - case T_TRUE: - accept4_flags |= A4_SOCK_CLOEXEC; - return boolean; - case T_FALSE: - accept4_flags &= ~A4_SOCK_CLOEXEC; - return boolean; - } - rb_raise(rb_eTypeError, "not true or false"); - return Qnil; -} - -/* - * call-seq: - * - * Kgio.accept_nonblock = true - * Kgio.accept_nonblock = false - * - * Sets whether or not Kgio::Socket objects created by - * TCPServer#kgio_accept, - * TCPServer#kgio_tryaccept, - * UNIXServer#kgio_accept, - * and UNIXServer#kgio_tryaccept - * are created with the O_NONBLOCK file status flag. - * - * This defaults to +false+ for GNU/Linux where MSG_DONTWAIT is - * available (and on newer GNU/Linux, accept4() may also set - * the non-blocking flag. This defaults to +true+ on non-GNU/Linux - * systems. - */ -static VALUE set_nonblock(VALUE mod, VALUE boolean) -{ - switch (TYPE(boolean)) { - case T_TRUE: - accept4_flags |= A4_SOCK_NONBLOCK; - return boolean; - case T_FALSE: - accept4_flags &= ~A4_SOCK_NONBLOCK; - return boolean; - } - rb_raise(rb_eTypeError, "not true or false"); - return Qnil; -} - -static void close_fail(int fd, const char *msg) -{ - int saved_errno = errno; - (void)close(fd); - errno = saved_errno; - rb_sys_fail(msg); -} - -#ifdef SOCK_NONBLOCK -# define MY_SOCK_STREAM (SOCK_STREAM|SOCK_NONBLOCK) -#else -# define MY_SOCK_STREAM SOCK_STREAM -#endif /* ! SOCK_NONBLOCK */ - -static VALUE -my_connect(VALUE klass, int io_wait, int domain, void *addr, socklen_t addrlen) -{ - int fd = socket(domain, MY_SOCK_STREAM, 0); - - if (fd == -1) { - switch (errno) { - case EMFILE: - case ENFILE: -#ifdef ENOBUFS - case ENOBUFS: -#endif /* ENOBUFS */ - errno = 0; - rb_gc(); - fd = socket(domain, MY_SOCK_STREAM, 0); - } - if (fd == -1) - rb_sys_fail("socket"); - } - -#ifndef SOCK_NONBLOCK - if (fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK) == -1) - close_fail(fd, "fcntl(F_SETFL, O_RDWR | O_NONBLOCK)"); -#endif /* SOCK_NONBLOCK */ - - if (connect(fd, addr, addrlen) == -1) { - if (errno == EINPROGRESS) { - VALUE io = sock_for_fd(klass, fd); - - if (io_wait) { - errno = EAGAIN; - wait_writable(io, fd); - } - return io; - } - close_fail(fd, "connect"); - } - return sock_for_fd(klass, fd); -} - -static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait) -{ - struct sockaddr_in addr = { 0 }; - - addr.sin_family = AF_INET; - addr.sin_port = htons((unsigned short)NUM2INT(port)); - - switch (inet_pton(AF_INET, StringValuePtr(ip), &addr.sin_addr)) { - case 1: - return my_connect(klass, io_wait, PF_INET, &addr, sizeof(addr)); - case -1: - rb_sys_fail("inet_pton"); - } - rb_raise(rb_eArgError, "invalid address: %s", StringValuePtr(ip)); - - return Qnil; -} - -/* - * call-seq: - * - * Kgio::TCPSocket.new('127.0.0.1', 80) -> socket - * - * Creates a new Kgio::TCPSocket object and initiates a - * non-blocking connection. - * - * This may block and call any method assigned to Kgio.wait_writable. - * - * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS - * lookups (which is subject to a different set of timeouts and - * best handled elsewhere). - */ -static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port) -{ - return tcp_connect(klass, ip, port, 1); -} - -/* - * call-seq: - * - * Kgio::TCPSocket.start('127.0.0.1', 80) -> socket - * - * Creates a new Kgio::TCPSocket object and initiates a - * non-blocking connection. The caller should select/poll - * on the socket for writability before attempting to write - * or optimistically attempt a write and handle Kgio::WaitWritable - * or Errno::EAGAIN. - * - * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS - * lookups (which is subject to a different set of timeouts and - * best handled elsewhere). - */ -static VALUE kgio_tcp_start(VALUE klass, VALUE ip, VALUE port) -{ - return tcp_connect(klass, ip, port, 0); -} - -static VALUE unix_connect(VALUE klass, VALUE path, int io_wait) -{ - struct sockaddr_un addr = { 0 }; - long len; - - StringValue(path); - len = RSTRING_LEN(path); - if (sizeof(addr.sun_path) <= len) - rb_raise(rb_eArgError, - "too long unix socket path (max: %dbytes)", - (int)sizeof(addr.sun_path)-1); - - memcpy(addr.sun_path, RSTRING_PTR(path), len); - addr.sun_family = AF_UNIX; - - return my_connect(klass, io_wait, PF_UNIX, &addr, sizeof(addr)); -} - -/* - * call-seq: - * - * Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket - * - * Creates a new Kgio::UNIXSocket object and initiates a - * non-blocking connection. - * - * This may block and call any method assigned to Kgio.wait_writable. - */ -static VALUE kgio_unix_connect(VALUE klass, VALUE path) -{ - return unix_connect(klass, path, 1); -} - -/* - * call-seq: - * - * Kgio::UNIXSocket.start("/path/to/unix/socket") -> socket - * - * Creates a new Kgio::UNIXSocket object and initiates a - * non-blocking connection. The caller should select/poll - * on the socket for writability before attempting to write - * or optimistically attempt a write and handle Kgio::WaitWritable - * or Errno::EAGAIN. - */ -static VALUE kgio_unix_start(VALUE klass, VALUE path) -{ - return unix_connect(klass, path, 0); -} - -static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait) -{ - int domain; - socklen_t addrlen; - struct sockaddr *sockaddr; - - if (TYPE(addr) == T_STRING) { - sockaddr = (struct sockaddr *)(RSTRING_PTR(addr)); - addrlen = (socklen_t)RSTRING_LEN(addr); - } else { - rb_raise(rb_eTypeError, "invalid address"); - } - switch (((struct sockaddr_in *)(sockaddr))->sin_family) { - case AF_UNIX: domain = PF_UNIX; break; - case AF_INET: domain = PF_INET; break; -#ifdef AF_INET6 /* IPv6 support incomplete */ - case AF_INET6: domain = PF_INET6; break; -#endif /* AF_INET6 */ - default: - rb_raise(rb_eArgError, "invalid address family"); - } - - return my_connect(klass, io_wait, domain, sockaddr, addrlen); -} - -/* call-seq: - * - * addr = Socket.pack_sockaddr_in(80, 'example.com') - * Kgio::Socket.connect(addr) -> socket - * - * addr = Socket.pack_sockaddr_un("/path/to/unix/socket") - * Kgio::Socket.connect(addr) -> socket - * - * Creates a generic Kgio::Socket object and initiates a - * non-blocking connection. - * - * This may block and call any method assigned to Kgio.wait_writable. - */ -static VALUE kgio_connect(VALUE klass, VALUE addr) -{ - return stream_connect(klass, addr, 1); -} - -/* call-seq: - * - * addr = Socket.pack_sockaddr_in(80, 'example.com') - * Kgio::Socket.start(addr) -> socket - * - * addr = Socket.pack_sockaddr_un("/path/to/unix/socket") - * Kgio::Socket.start(addr) -> socket - * - * Creates a generic Kgio::Socket object and initiates a - * non-blocking connection. The caller should select/poll - * on the socket for writability before attempting to write - * or optimistically attempt a write and handle Kgio::WaitWritable - * or Errno::EAGAIN. - */ -static VALUE kgio_start(VALUE klass, VALUE addr) -{ - return stream_connect(klass, addr, 0); -} - -static VALUE set_accepted(VALUE klass, VALUE aclass) -{ - VALUE tmp; - - if (NIL_P(aclass)) - aclass = cSocket; - - tmp = rb_funcall(aclass, rb_intern("included_modules"), 0, 0); - tmp = rb_funcall(tmp, rb_intern("include?"), 1, mSocketMethods); - - if (tmp != Qtrue) - rb_raise(rb_eTypeError, - "class must include Kgio::SocketMethods"); - - cClientSocket = aclass; - - return aclass; -} - -static VALUE get_accepted(VALUE klass) -{ - return cClientSocket; -} +#include "kgio.h" void Init_kgio_ext(void) { - VALUE mKgio = rb_define_module("Kgio"); - VALUE mPipeMethods; - VALUE cUNIXServer, cTCPServer, cUNIXSocket, cTCPSocket; - - rb_require("socket"); - - /* - * Document-module: Kgio::Socket - * - * A generic socket class with Kgio::SocketMethods included. - * This is returned by all Kgio methods that accept(2) a connected - * stream socket. - */ - cSocket = rb_const_get(rb_cObject, rb_intern("Socket")); - cSocket = rb_define_class_under(mKgio, "Socket", cSocket); - cClientSocket = cSocket; - - localhost = rb_str_new2("127.0.0.1"); - - /* - * The IPv4 address of UNIX domain sockets, useful for creating - * Rack (and CGI) servers that also serve HTTP traffic over - * UNIX domain sockets. - */ - rb_const_set(mKgio, rb_intern("LOCALHOST"), localhost); - - /* - * Document-module: Kgio::WaitReadable - * - * PipeMethods#kgio_tryread and SocketMethods#kgio_tryread will - * return this constant when waiting for a read is required. - */ - mKgio_WaitReadable = rb_define_module_under(mKgio, "WaitReadable"); - - /* - * Document-module: Kgio::WaitWritable - * - * PipeMethods#kgio_trywrite and SocketMethods#kgio_trywrite will - * return this constant when waiting for a read is required. - */ - mKgio_WaitWritable = rb_define_module_under(mKgio, "WaitWritable"); - - rb_define_singleton_method(mKgio, "wait_readable=", set_wait_rd, 1); - rb_define_singleton_method(mKgio, "wait_writable=", set_wait_wr, 1); - rb_define_singleton_method(mKgio, "wait_readable", wait_rd, 0); - rb_define_singleton_method(mKgio, "wait_writable", wait_wr, 0); - rb_define_singleton_method(mKgio, "accept_cloexec?", get_cloexec, 0); - rb_define_singleton_method(mKgio, "accept_cloexec=", set_cloexec, 1); - rb_define_singleton_method(mKgio, "accept_nonblock?", get_nonblock, 0); - rb_define_singleton_method(mKgio, "accept_nonblock=", set_nonblock, 1); - rb_define_singleton_method(mKgio, "accept_class=", set_accepted, 1); - rb_define_singleton_method(mKgio, "accept_class", get_accepted, 0); - - /* - * Document-module: Kgio::PipeMethods - * - * This module may be used used to create classes that respond to - * various Kgio methods for reading and writing. This is included - * in Kgio::Pipe by default. - */ - mPipeMethods = rb_define_module_under(mKgio, "PipeMethods"); - rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1); - rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1); - rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1); - rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1); - - /* - * Document-module: Kgio::SocketMethods - * - * This method behaves like Kgio::PipeMethods, but contains - * optimizations for sockets on certain operating systems - * (e.g. GNU/Linux). - */ - mSocketMethods = rb_define_module_under(mKgio, "SocketMethods"); - rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1); - rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1); - rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1); - rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1); - - /* - * Returns the client IPv4 address of the socket in dotted quad - * form as a string. This is always the value of the - * Kgio::LOCALHOST constant for UNIX domain sockets. - */ - rb_define_attr(mSocketMethods, "kgio_addr", 1, 1); - - rb_include_module(cSocket, mSocketMethods); - rb_define_singleton_method(cSocket, "new", kgio_connect, 1); - rb_define_singleton_method(cSocket, "start", kgio_start, 1); - - cUNIXServer = rb_const_get(rb_cObject, rb_intern("UNIXServer")); - cUNIXServer = rb_define_class_under(mKgio, "UNIXServer", cUNIXServer); - rb_define_method(cUNIXServer, "kgio_tryaccept", unix_tryaccept, 0); - rb_define_method(cUNIXServer, "kgio_accept", unix_accept, 0); - - cTCPServer = rb_const_get(rb_cObject, rb_intern("TCPServer")); - cTCPServer = rb_define_class_under(mKgio, "TCPServer", cTCPServer); - rb_define_method(cTCPServer, "kgio_tryaccept", tcp_tryaccept, 0); - rb_define_method(cTCPServer, "kgio_accept", tcp_accept, 0); - - cTCPSocket = rb_const_get(rb_cObject, rb_intern("TCPSocket")); - cTCPSocket = rb_define_class_under(mKgio, "TCPSocket", cTCPSocket); - rb_include_module(cTCPSocket, mSocketMethods); - rb_define_singleton_method(cTCPSocket, "new", kgio_tcp_connect, 2); - rb_define_singleton_method(cTCPSocket, "start", kgio_tcp_start, 2); - - cUNIXSocket = rb_const_get(rb_cObject, rb_intern("UNIXSocket")); - cUNIXSocket = rb_define_class_under(mKgio, "UNIXSocket", cUNIXSocket); - rb_include_module(cUNIXSocket, mSocketMethods); - rb_define_singleton_method(cUNIXSocket, "new", kgio_unix_connect, 1); - rb_define_singleton_method(cUNIXSocket, "start", kgio_unix_start, 1); + VALUE mKgio = rb_const_get(rb_cObject, rb_intern("Kgio")); - iv_kgio_addr = rb_intern("@kgio_addr"); - init_sock_for_fd(); + init_kgio_wait(mKgio); + init_kgio_read_write(mKgio); + init_kgio_connect(mKgio); + init_kgio_accept(mKgio); } diff --git a/ext/kgio/missing/accept4.h b/ext/kgio/missing/accept4.h index 8fc37c4..cd8be79 100644 --- a/ext/kgio/missing/accept4.h +++ b/ext/kgio/missing/accept4.h @@ -1,7 +1,4 @@ -#ifdef HAVE_ACCEPT4 -# define A4_SOCK_CLOEXEC SOCK_CLOEXEC -# define A4_SOCK_NONBLOCK SOCK_NONBLOCK -#else +#ifndef HAVE_ACCEPT4 # ifndef _GNU_SOURCE # define _GNU_SOURCE # endif @@ -9,15 +6,12 @@ # include # ifndef SOCK_CLOEXEC # if (FD_CLOEXEC == O_NONBLOCK) -# define A4_SOCK_CLOEXEC 1 -# define A4_SOCK_NONBLOCK 2 +# define SOCK_CLOEXEC 1 +# define SOCK_NONBLOCK 2 # else -# define A4_SOCK_CLOEXEC FD_CLOEXEC -# define A4_SOCK_NONBLOCK O_NONBLOCK +# define SOCK_CLOEXEC FD_CLOEXEC +# define SOCK_NONBLOCK O_NONBLOCK # endif -# else -# define A4_SOCK_CLOEXEC SOCK_CLOEXEC -# define A4_SOCK_NONBLOCK SOCK_NONBLOCK # endif /* accept4() is currently a Linux-only goodie */ @@ -27,7 +21,7 @@ accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) int fd = accept(sockfd, addr, addrlen); if (fd >= 0) { - if ((flags & A4_SOCK_CLOEXEC) == A4_SOCK_CLOEXEC) + if ((flags & SOCK_CLOEXEC) == SOCK_CLOEXEC) (void)fcntl(fd, F_SETFD, FD_CLOEXEC); /* @@ -36,7 +30,7 @@ accept4(int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) * Linux, so fcntl() is completely unnecessary * in most cases... */ - if ((flags & A4_SOCK_NONBLOCK) == A4_SOCK_NONBLOCK) { + if ((flags & SOCK_NONBLOCK) == SOCK_NONBLOCK) { int fl = fcntl(fd, F_GETFL); if ((fl & O_NONBLOCK) == 0) diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c new file mode 100644 index 0000000..7f1748a --- /dev/null +++ b/ext/kgio/read_write.c @@ -0,0 +1,325 @@ +#include "kgio.h" +static VALUE mKgio_WaitReadable, mKgio_WaitWritable; + +/* + * we know MSG_DONTWAIT works properly on all stream sockets under Linux + * we can define this macro for other platforms as people care and + * notice. + */ +#if defined(__linux__) && ! defined(USE_MSG_DONTWAIT) +# define USE_MSG_DONTWAIT +#endif + +static void prepare_read(struct io_args *a, int argc, VALUE *argv, VALUE io) +{ + VALUE length; + + a->io = io; + a->fd = my_fileno(io); + rb_scan_args(argc, argv, "11", &length, &a->buf); + a->len = NUM2LONG(length); + if (NIL_P(a->buf)) { + a->buf = rb_str_new(NULL, a->len); + } else { + StringValue(a->buf); + rb_str_resize(a->buf, a->len); + } + a->ptr = RSTRING_PTR(a->buf); +} + +static int read_check(struct io_args *a, long n, const char *msg, int io_wait) +{ + if (n == -1) { + if (errno == EINTR) + return -1; + rb_str_set_len(a->buf, 0); + if (errno == EAGAIN) { + if (io_wait) { + kgio_wait_readable(a->io, a->fd); + + /* buf may be modified in other thread/fiber */ + rb_str_resize(a->buf, a->len); + a->ptr = RSTRING_PTR(a->buf); + return -1; + } else { + a->buf = mKgio_WaitReadable; + return 0; + } + } + rb_sys_fail(msg); + } + rb_str_set_len(a->buf, n); + if (n == 0) + a->buf = Qnil; + return 0; +} + +static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io) +{ + struct io_args a; + long n; + + prepare_read(&a, argc, argv, io); + set_nonblocking(a.fd); +retry: + n = (long)read(a.fd, a.ptr, a.len); + if (read_check(&a, n, "read", io_wait) != 0) + goto retry; + return a.buf; +} + +/* + * call-seq: + * + * io.kgio_read(maxlen) -> buffer + * io.kgio_read(maxlen, buffer) -> buffer + * + * Reads at most maxlen bytes from the stream socket. Returns with a + * newly allocated buffer, or may reuse an existing buffer if supplied. + * + * Calls the method assigned to Kgio.wait_readable, or blocks in a + * thread-safe manner for writability. + * + * Returns nil on EOF. + * + * This behaves like read(2) and IO#readpartial, NOT fread(3) or + * IO#read which possess read-in-full behavior. + */ +static VALUE kgio_read(int argc, VALUE *argv, VALUE io) +{ + return my_read(1, argc, argv, io); +} + +/* + * call-seq: + * + * io.kgio_tryread(maxlen) -> buffer + * io.kgio_tryread(maxlen, buffer) -> buffer + * + * Reads at most maxlen bytes from the stream socket. Returns with a + * newly allocated buffer, or may reuse an existing buffer if supplied. + * + * Returns nil on EOF. + * + * Returns Kgio::WaitReadable if EAGAIN is encountered. + */ +static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io) +{ + return my_read(0, argc, argv, io); +} + +#ifdef USE_MSG_DONTWAIT +static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io) +{ + struct io_args a; + long n; + + prepare_read(&a, argc, argv, io); +retry: + n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT); + if (read_check(&a, n, "recv", io_wait) != 0) + goto retry; + return a.buf; +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_read + */ +static VALUE kgio_recv(int argc, VALUE *argv, VALUE io) +{ + return my_recv(1, argc, argv, io); +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread + */ +static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io) +{ + return my_recv(0, argc, argv, io); +} +#else /* ! USE_MSG_DONTWAIT */ +# define kgio_recv kgio_read +# define kgio_tryrecv kgio_tryread +#endif /* USE_MSG_DONTWAIT */ + +static void prepare_write(struct io_args *a, VALUE io, VALUE str) +{ + a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str); + a->ptr = RSTRING_PTR(a->buf); + a->len = RSTRING_LEN(a->buf); + a->io = io; + a->fd = my_fileno(io); +} + +static int write_check(struct io_args *a, long n, const char *msg, int io_wait) +{ + if (a->len == n) { +done: + a->buf = Qnil; + } else if (n == -1) { + if (errno == EINTR) + return -1; + if (errno == EAGAIN) { + long written = RSTRING_LEN(a->buf) - a->len; + + if (io_wait) { + kgio_wait_writable(a->io, a->fd); + + /* buf may be modified in other thread/fiber */ + a->len = RSTRING_LEN(a->buf) - written; + if (a->len <= 0) + goto done; + a->ptr = RSTRING_PTR(a->buf) + written; + return -1; + } else if (written > 0) { + a->buf = rb_str_new(a->ptr + n, a->len - n); + } else { + a->buf = mKgio_WaitWritable; + } + return 0; + } + rb_sys_fail(msg); + } else { + assert(n >= 0 && n < a->len && "write/send syscall broken?"); + a->ptr += n; + a->len -= n; + return -1; + } + return 0; +} + +static VALUE my_write(VALUE io, VALUE str, int io_wait) +{ + struct io_args a; + long n; + + prepare_write(&a, io, str); + set_nonblocking(a.fd); +retry: + n = (long)write(a.fd, a.ptr, a.len); + if (write_check(&a, n, "write", io_wait) != 0) + goto retry; + return a.buf; +} + +/* + * call-seq: + * + * io.kgio_write(str) -> nil + * + * Returns nil when the write completes. + * + * Calls the method Kgio.wait_writable if it is set. Otherwise this + * blocks in a thread-safe manner until all data is written or a + * fatal error occurs. + */ +static VALUE kgio_write(VALUE io, VALUE str) +{ + return my_write(io, str, 1); +} + +/* + * call-seq: + * + * io.kgio_trywrite(str) -> nil or Kgio::WaitWritable + * + * Returns nil if the write was completed in full. + * + * Returns a String containing the unwritten portion if EAGAIN + * was encountered, but some portion was successfully written. + * + * Returns Kgio::WaitWritable if EAGAIN is encountered and nothing + * was written. + */ +static VALUE kgio_trywrite(VALUE io, VALUE str) +{ + return my_write(io, str, 0); +} + +#ifdef USE_MSG_DONTWAIT +/* + * This method behaves like Kgio::PipeMethods#kgio_write, except + * it will use send(2) with the MSG_DONTWAIT flag on sockets to + * avoid unnecessary calls to fcntl(2). + */ +static VALUE my_send(VALUE io, VALUE str, int io_wait) +{ + struct io_args a; + long n; + + prepare_write(&a, io, str); +retry: + n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT); + if (write_check(&a, n, "send", io_wait) != 0) + goto retry; + return a.buf; +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_write + */ +static VALUE kgio_send(VALUE io, VALUE str) +{ + return my_send(io, str, 1); +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite + */ +static VALUE kgio_trysend(VALUE io, VALUE str) +{ + return my_send(io, str, 0); +} +#else /* ! USE_MSG_DONTWAIT */ +# define kgio_send kgio_write +# define kgio_trysend kgio_trywrite +#endif /* ! USE_MSG_DONTWAIT */ + +void init_kgio_read_write(VALUE mKgio) +{ + VALUE mPipeMethods, mSocketMethods; + + mKgio_WaitReadable = rb_const_get(mKgio, rb_intern("WaitReadable")); + mKgio_WaitWritable = rb_const_get(mKgio, rb_intern("WaitWritable")); + + /* + * Document-module: Kgio::PipeMethods + * + * This module may be used used to create classes that respond to + * various Kgio methods for reading and writing. This is included + * in Kgio::Pipe by default. + */ + mPipeMethods = rb_define_module_under(mKgio, "PipeMethods"); + rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1); + rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1); + rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1); + rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1); + + /* + * Document-module: Kgio::SocketMethods + * + * This method behaves like Kgio::PipeMethods, but contains + * optimizations for sockets on certain operating systems + * (e.g. GNU/Linux). + */ + mSocketMethods = rb_define_module_under(mKgio, "SocketMethods"); + rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1); + rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1); + rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1); + rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1); + + /* + * Returns the client IPv4 address of the socket in dotted quad + * form as a string. This is always the value of the + * Kgio::LOCALHOST constant for UNIX domain sockets. + */ + rb_define_attr(mSocketMethods, "kgio_addr", 1, 1); +} diff --git a/ext/kgio/sock_for_fd.h b/ext/kgio/sock_for_fd.h index db5cba5..ab704b1 100644 --- a/ext/kgio/sock_for_fd.h +++ b/ext/kgio/sock_for_fd.h @@ -1,3 +1,5 @@ +#ifndef SOCK_FOR_FD_H +#define SOCK_FOR_FD_H #include #ifdef HAVE_RUBY_IO_H # include @@ -64,3 +66,4 @@ static void init_sock_for_fd(void) #if SOCK_FOR_FD > 0 # define init_sock_for_fd() if (0) #endif +#endif /* SOCK_FOR_FD_H */ diff --git a/ext/kgio/wait.c b/ext/kgio/wait.c new file mode 100644 index 0000000..4fe6d25 --- /dev/null +++ b/ext/kgio/wait.c @@ -0,0 +1,115 @@ +#include "kgio.h" + +static ID io_wait_rd, io_wait_wr; + +void kgio_wait_readable(VALUE io, int fd) +{ + if (io_wait_rd) { + (void)rb_funcall(io, io_wait_rd, 0, 0); + } else { + if (!rb_io_wait_readable(fd)) + rb_sys_fail("wait readable"); + } +} + +void kgio_wait_writable(VALUE io, int fd) +{ + if (io_wait_wr) { + (void)rb_funcall(io, io_wait_wr, 0, 0); + } else { + if (!rb_io_wait_writable(fd)) + rb_sys_fail("wait writable"); + } +} + +/* + * call-seq: + * + * Kgio.wait_readable = :method_name + * Kgio.wait_readable = nil + * + * Sets a method for kgio_read to call when a read would block. + * This is useful for non-blocking frameworks that use Fibers, + * as the method referred to this may cause the current Fiber + * to yield execution. + * + * A special value of nil will cause Ruby to wait using the + * rb_io_wait_readable() function. + */ +static VALUE set_wait_rd(VALUE mod, VALUE sym) +{ + switch (TYPE(sym)) { + case T_SYMBOL: + io_wait_rd = SYM2ID(sym); + return sym; + case T_NIL: + io_wait_rd = 0; + return sym; + } + rb_raise(rb_eTypeError, "must be a symbol or nil"); + return sym; +} + +/* + * call-seq: + * + * Kgio.wait_writable = :method_name + * Kgio.wait_writable = nil + * + * Sets a method for kgio_write to call when a read would block. + * This is useful for non-blocking frameworks that use Fibers, + * as the method referred to this may cause the current Fiber + * to yield execution. + * + * A special value of nil will cause Ruby to wait using the + * rb_io_wait_writable() function. + */ +static VALUE set_wait_wr(VALUE mod, VALUE sym) +{ + switch (TYPE(sym)) { + case T_SYMBOL: + io_wait_wr = SYM2ID(sym); + return sym; + case T_NIL: + io_wait_wr = 0; + return sym; + } + rb_raise(rb_eTypeError, "must be a symbol or nil"); + return sym; +} + +/* + * call-seq: + * + * Kgio.wait_writable -> Symbol or nil + * + * Returns the symbolic method name of the method assigned to + * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_write + * or Kgio::SocketMethods#kgio_write call + */ +static VALUE wait_wr(VALUE mod) +{ + return io_wait_wr ? ID2SYM(io_wait_wr) : Qnil; +} + +/* + * call-seq: + * + * Kgio.wait_readable -> Symbol or nil + * + * Returns the symbolic method name of the method assigned to + * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_read + * or Kgio::SocketMethods#kgio_read call. + */ +static VALUE wait_rd(VALUE mod) +{ + return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil; +} + +void init_kgio_wait(VALUE mKgio) +{ + rb_define_singleton_method(mKgio, "wait_readable=", set_wait_rd, 1); + rb_define_singleton_method(mKgio, "wait_writable=", set_wait_wr, 1); + rb_define_singleton_method(mKgio, "wait_readable", wait_rd, 0); + rb_define_singleton_method(mKgio, "wait_writable", wait_wr, 0); +} diff --git a/lib/kgio.rb b/lib/kgio.rb index e104f46..ae7eac8 100644 --- a/lib/kgio.rb +++ b/lib/kgio.rb @@ -1,3 +1,21 @@ +# -*- encoding: binary -*- +require 'socket' +module Kgio + + # The IPv4 address of UNIX domain sockets, useful for creating + # Rack (and CGI) servers that also serve HTTP traffic over + # UNIX domain sockets. + LOCALHOST = '127.0.0.1' +end + +# Kgio::PipeMethods#kgio_tryread and Kgio::SocketMethods#kgio_tryread will +# return this constant when waiting for a read is required. +module Kgio::WaitReadable; end + +# PipeMethods#kgio_trywrite and SocketMethods#kgio_trywrite will +# return this constant when waiting for a read is required. +module Kgio::WaitWritable; end + require 'kgio_ext' # use Kgio::Pipe.popen and Kgio::Pipe.new instead of IO.popen -- cgit v1.2.3-24-ge0c7