diff options
author | Eric Wong <e@yhbt.net> | 2010-09-27 22:55:52 +0000 |
---|---|---|
committer | Eric Wong <e@yhbt.net> | 2010-09-27 22:56:20 +0000 |
commit | 50b86bf23063f3e6c3777b39c9464f73ccfd6ef5 (patch) | |
tree | 60e7e0ee84ccd31f19c6007445a6ff24f443a35d | |
parent | 5123d66fe0b2dad67539a20fe5b91f5b9afd814a (diff) | |
download | kgio-50b86bf23063f3e6c3777b39c9464f73ccfd6ef5.tar.gz |
Somebody's gotta do it...
-rw-r--r-- | ext/kgio/kgio_ext.c | 374 | ||||
-rw-r--r-- | lib/kgio.rb | 10 |
2 files changed, 317 insertions, 67 deletions
diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c index f717db5..f6f5d3e 100644 --- a/ext/kgio/kgio_ext.c +++ b/ext/kgio/kgio_ext.c @@ -111,23 +111,6 @@ static int read_check(struct io_args *a, long n, const char *msg, int io_wait) return 0; } -/* - * Document-method: Kgio::PipeMethods#kgio_read - * - * call-seq: - * - * socket.kgio_read(maxlen) -> buffer - * socket.kgio_read(maxlen, buffer) -> buffer - * - * Reads at most maxlen bytes from the stream socket. Returns with a - * newly allocated buffer, or may reuse an existing buffer. This - * calls the method identified by Kgio.wait_readable, or uses - * the normal, thread-safe Ruby function to wait for readability. - * This returns nil on EOF. - * - * This behaves like read(2) and IO#readpartial, NOT fread(3) or - * IO#read which possess read-in-full behavior. - */ static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io) { struct io_args a; @@ -142,11 +125,41 @@ retry: return a.buf; } +/* + * call-seq: + * + * io.kgio_read(maxlen) -> buffer + * io.kgio_read(maxlen, buffer) -> buffer + * + * Reads at most maxlen bytes from the stream socket. Returns with a + * newly allocated buffer, or may reuse an existing buffer if supplied. + * + * Calls the method assigned to Kgio.wait_readable, or blocks in a + * thread-safe manner for writability. + * + * Returns nil on EOF. + * + * This behaves like read(2) and IO#readpartial, NOT fread(3) or + * IO#read which possess read-in-full behavior. + */ static VALUE kgio_read(int argc, VALUE *argv, VALUE io) { return my_read(1, argc, argv, io); } +/* + * call-seq: + * + * io.kgio_tryread(maxlen) -> buffer + * io.kgio_tryread(maxlen, buffer) -> buffer + * + * Reads at most maxlen bytes from the stream socket. Returns with a + * newly allocated buffer, or may reuse an existing buffer if supplied. + * + * Returns nil on EOF. + * + * Returns Kgio::WaitReadable if EAGAIN is encountered. + */ static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io) { return my_read(0, argc, argv, io); @@ -166,11 +179,21 @@ retry: return a.buf; } +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_read + */ static VALUE kgio_recv(int argc, VALUE *argv, VALUE io) { return my_recv(1, argc, argv, io); } +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread + */ static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io) { return my_recv(0, argc, argv, io); @@ -233,9 +256,15 @@ retry: } /* - * Returns true if the write was completed. + * call-seq: * - * Calls the method Kgio.wait_writable is not set + * io.kgio_write(str) -> nil + * + * Returns nil when the write completes. + * + * Calls the method Kgio.wait_writable if it is set. Otherwise this + * blocks in a thread-safe manner until all data is written or a + * fatal error occurs. */ static VALUE kgio_write(VALUE io, VALUE str) { @@ -243,11 +272,16 @@ static VALUE kgio_write(VALUE io, VALUE str) } /* + * call-seq: + * + * io.kgio_trywrite(str) -> nil or Kgio::WaitWritable + * + * Returns nil if the write was completed in full. + * * Returns a String containing the unwritten portion if there was a - * partial write. Will return Kgio::WaitReadable if EAGAIN is - * encountered. + * partial write. * - * Returns true if the write completed in full. + * Returns Kgio::WaitWritable if EAGAIN is encountered. */ static VALUE kgio_trywrite(VALUE io, VALUE str) { @@ -273,11 +307,21 @@ retry: return a.buf; } +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_write + */ static VALUE kgio_send(VALUE io, VALUE str) { return my_send(io, str, 1); } +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite + */ static VALUE kgio_trysend(VALUE io, VALUE str) { return my_send(io, str, 0); @@ -291,6 +335,7 @@ static VALUE kgio_trysend(VALUE io, VALUE str) * call-seq: * * Kgio.wait_readable = :method_name + * Kgio.wait_readable = nil * * Sets a method for kgio_read to call when a read would block. * This is useful for non-blocking frameworks that use Fibers, @@ -298,8 +343,7 @@ static VALUE kgio_trysend(VALUE io, VALUE str) * to yield execution. * * A special value of nil will cause Ruby to wait using the - * rb_io_wait_readable() function, giving kgio_read similar semantics to - * IO#readpartial. + * rb_io_wait_readable() function. */ static VALUE set_wait_rd(VALUE mod, VALUE sym) { @@ -315,6 +359,20 @@ static VALUE set_wait_rd(VALUE mod, VALUE sym) return sym; } +/* + * call-seq: + * + * Kgio.wait_writable = :method_name + * Kgio.wait_writable = nil + * + * Sets a method for kgio_write to call when a read would block. + * This is useful for non-blocking frameworks that use Fibers, + * as the method referred to this may cause the current Fiber + * to yield execution. + * + * A special value of nil will cause Ruby to wait using the + * rb_io_wait_writable() function. + */ static VALUE set_wait_wr(VALUE mod, VALUE sym) { switch (TYPE(sym)) { @@ -329,11 +387,29 @@ static VALUE set_wait_wr(VALUE mod, VALUE sym) return sym; } +/* + * call-seq: + * + * Kgio.wait_writable -> Symbol or nil + * + * Returns the symbolic method name of the method assigned to + * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_write + * or Kgio::SocketMethods#kgio_write call + */ static VALUE wait_wr(VALUE mod) { return io_wait_wr ? ID2SYM(io_wait_wr) : Qnil; } +/* + * call-seq: + * + * Kgio.wait_readable -> Symbol or nil + * + * Returns the symbolic method name of the method assigned to + * call when EAGAIN is occurs on a Kgio::PipeMethods#kgio_read + * or Kgio::SocketMethods#kgio_read call. + */ static VALUE wait_rd(VALUE mod) { return io_wait_rd ? ID2SYM(io_wait_rd) : Qnil; @@ -348,13 +424,6 @@ static VALUE xaccept(void *ptr) #ifdef HAVE_RB_THREAD_BLOCKING_REGION # include <time.h> -static int thread_accept(struct accept_args *a, int force_nonblock) -{ - if (force_nonblock) - set_nonblocking(a->fd); - return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0); -} - /* * Try to use a (real) blocking accept() since that can prevent * thundering herds under Linux: @@ -364,6 +433,13 @@ static int thread_accept(struct accept_args *a, int force_nonblock) * because other processes may set non-blocking (especially during * a process upgrade) with Rainbows! concurrency model changes. */ +static int thread_accept(struct accept_args *a, int force_nonblock) +{ + if (force_nonblock) + set_nonblocking(a->fd); + return (int)rb_thread_blocking_region(xaccept, a, RUBY_UBF_IO, 0); +} + static void set_blocking_or_block(int fd) { static time_t last_set_blocking; @@ -459,6 +535,18 @@ static void in_addr_set(VALUE io, struct sockaddr_in *addr) rb_ivar_set(io, iv_kgio_addr, host); } +/* + * call-seq: + * + * server = Kgio::TCPServer.new('0.0.0.0', 80) + * server.kgio_tryaccept -> Kgio::Socket or nil + * + * Initiates a non-blocking accept and returns a generic Kgio::Socket + * object with the kgio_addr attribute set to the IP address of the + * connected client on success. + * + * Returns nil on EAGAIN, and raises on other errors. + */ static VALUE tcp_tryaccept(VALUE io) { struct sockaddr_in addr; @@ -470,6 +558,19 @@ static VALUE tcp_tryaccept(VALUE io) return rv; } +/* + * call-seq: + * + * server = Kgio::TCPServer.new('0.0.0.0', 80) + * server.kgio_accept -> Kgio::Socket or nil + * + * Initiates a blocking accept and returns a generic Kgio::Socket + * object with the kgio_addr attribute set to the IP address of + * the client on success. + * + * On Ruby implementations using native threads, this can use a blocking + * accept(2) (or accept4(2)) system call to avoid thundering herds. + */ static VALUE tcp_accept(VALUE io) { struct sockaddr_in addr; @@ -480,6 +581,18 @@ static VALUE tcp_accept(VALUE io) return rv; } +/* + * call-seq: + * + * server = Kgio::UNIXServer.new("/path/to/unix/socket") + * server.kgio_tryaccept -> Kgio::Socket or nil + * + * Initiates a non-blocking accept and returns a generic Kgio::Socket + * object with the kgio_addr attribute set (to the value of + * Kgio::LOCALHOST) on success. + * + * Returns nil on EAGAIN, and raises on other errors. + */ static VALUE unix_tryaccept(VALUE io) { VALUE rv = my_accept(io, NULL, NULL, 1); @@ -489,6 +602,19 @@ static VALUE unix_tryaccept(VALUE io) return rv; } +/* + * call-seq: + * + * server = Kgio::UNIXServer.new("/path/to/unix/socket") + * server.kgio_accept -> Kgio::Socket or nil + * + * Initiates a blocking accept and returns a generic Kgio::Socket + * object with the kgio_addr attribute set (to the value of + * Kgio::LOCALHOST) on success. + * + * On Ruby implementations using native threads, this can use a blocking + * accept(2) (or accept4(2)) system call to avoid thundering herds. + */ static VALUE unix_accept(VALUE io) { VALUE rv = my_accept(io, NULL, NULL, 0); @@ -497,18 +623,51 @@ static VALUE unix_accept(VALUE io) return rv; } +/* + * call-seq: + * + * Kgio.accept_cloexec? -> true or false + * + * Returns true if newly accepted Kgio::Sockets are created with the + * FD_CLOEXEC file descriptor flag, false if not. + */ static VALUE get_cloexec(VALUE mod) { return (accept4_flags & A4_SOCK_CLOEXEC) == A4_SOCK_CLOEXEC ? Qtrue : Qfalse; } +/* + * + * call-seq: + * + * Kgio.accept_nonblock? -> true or false + * + * Returns true if newly accepted Kgio::Sockets are created with the + * O_NONBLOCK file status flag, false if not. + */ static VALUE get_nonblock(VALUE mod) { return (accept4_flags & A4_SOCK_NONBLOCK) == A4_SOCK_NONBLOCK ? Qtrue : Qfalse; } +/* + * call-seq: + * + * Kgio.accept_cloexec = true + * Kgio.accept_clocexec = false + * + * Sets whether or not Kgio::Socket objects created by + * TCPServer#kgio_accept, + * TCPServer#kgio_tryaccept, + * UNIXServer#kgio_accept, + * and UNIXServer#kgio_tryaccept + * are created with the FD_CLOEXEC file descriptor flag. + * + * This is on by default, as there is little reason to deal to enable + * it for client sockets on a socket server. + */ static VALUE set_cloexec(VALUE mod, VALUE boolean) { switch (TYPE(boolean)) { @@ -523,6 +682,24 @@ static VALUE set_cloexec(VALUE mod, VALUE boolean) return Qnil; } +/* + * call-seq: + * + * Kgio.accept_nonblock = true + * Kgio.accept_nonblock = false + * + * Sets whether or not Kgio::Socket objects created by + * TCPServer#kgio_accept, + * TCPServer#kgio_tryaccept, + * UNIXServer#kgio_accept, + * and UNIXServer#kgio_tryaccept + * are created with the O_NONBLOCK file status flag. + * + * This defaults to +false+ for GNU/Linux where MSG_DONTWAIT is + * available (and on newer GNU/Linux, accept4() may also set + * the non-blocking flag. This defaults to +true+ on non-GNU/Linux + * systems. + */ static VALUE set_nonblock(VALUE mod, VALUE boolean) { switch (TYPE(boolean)) { @@ -615,25 +792,34 @@ static VALUE tcp_connect(VALUE klass, VALUE ip, VALUE port, int io_wait) * Kgio::TCPSocket.new('127.0.0.1', 80) -> socket * * Creates a new Kgio::TCPSocket object and initiates a - * non-blocking connection. The caller should select/poll - * on the socket for writability before attempting to write - * or optimistically attempt a write and handle Kgio::WaitWritable - * or Errno::EAGAIN. + * non-blocking connection. + * + * This may block and call any method assigned to Kgio.wait_writable. * * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS * lookups (which is subject to a different set of timeouts and * best handled elsewhere). - * - * This is only intended as a convenience for testing, - * Kgio::Socket.new (along with a cached/memoized addr argument) - * is recommended for applications that repeatedly connect to - * the same backend servers. */ static VALUE kgio_tcp_connect(VALUE klass, VALUE ip, VALUE port) { return tcp_connect(klass, ip, port, 1); } +/* + * call-seq: + * + * Kgio::TCPSocket.start('127.0.0.1', 80) -> socket + * + * Creates a new Kgio::TCPSocket object and initiates a + * non-blocking connection. The caller should select/poll + * on the socket for writability before attempting to write + * or optimistically attempt a write and handle Kgio::WaitWritable + * or Errno::EAGAIN. + * + * Unlike the TCPSocket.new in Ruby, this does NOT perform DNS + * lookups (which is subject to a different set of timeouts and + * best handled elsewhere). + */ static VALUE kgio_tcp_start(VALUE klass, VALUE ip, VALUE port) { return tcp_connect(klass, ip, port, 0); @@ -663,21 +849,26 @@ static VALUE unix_connect(VALUE klass, VALUE path, int io_wait) * Kgio::UNIXSocket.new("/path/to/unix/socket") -> socket * * Creates a new Kgio::UNIXSocket object and initiates a - * non-blocking connection. The caller should select/poll - * on the socket for writability before attempting to write - * or optimistically attempt a write and handle Kgio::WaitWritable - * or Errno::EAGAIN. + * non-blocking connection. * - * This is only intended as a convenience for testing, - * Kgio::Socket.new (along with a cached/memoized addr argument) - * is recommended for applications that repeatedly connect to - * the same backend servers. + * This may block and call any method assigned to Kgio.wait_writable. */ static VALUE kgio_unix_connect(VALUE klass, VALUE path) { return unix_connect(klass, path, 1); } +/* + * call-seq: + * + * Kgio::UNIXSocket.start("/path/to/unix/socket") -> socket + * + * Creates a new Kgio::UNIXSocket object and initiates a + * non-blocking connection. The caller should select/poll + * on the socket for writability before attempting to write + * or optimistically attempt a write and handle Kgio::WaitWritable + * or Errno::EAGAIN. + */ static VALUE kgio_unix_start(VALUE klass, VALUE path) { return unix_connect(klass, path, 0); @@ -708,29 +899,42 @@ static VALUE stream_connect(VALUE klass, VALUE addr, int io_wait) return my_connect(klass, io_wait, domain, sockaddr, addrlen); } +/* call-seq: + * + * addr = Socket.pack_sockaddr_in(80, 'example.com') + * Kgio::Socket.connect(addr) -> socket + * + * addr = Socket.pack_sockaddr_un("/path/to/unix/socket") + * Kgio::Socket.connect(addr) -> socket + * + * Creates a generic Kgio::Socket object and initiates a + * non-blocking connection. + * + * This may block and call any method assigned to Kgio.wait_writable. + */ static VALUE kgio_connect(VALUE klass, VALUE addr) { return stream_connect(klass, addr, 1); } -static VALUE kgio_start(VALUE klass, VALUE addr) -{ - return stream_connect(klass, addr, 0); -} - -/* - * call-seq: +/* call-seq: * - * addr = Socket.pack_sockaddr_in(80, 'example.com') - * Kgio::Socket.new(addr) -> socket + * addr = Socket.pack_sockaddr_in(80, 'example.com') + * Kgio::Socket.start(addr) -> socket * - * addr = Socket.pack_sockaddr_un("/tmp/unix.sock") - * Kgio::Socket.new(addr) -> socket + * addr = Socket.pack_sockaddr_un("/path/to/unix/socket") + * Kgio::Socket.start(addr) -> socket * - * Generic connect method for addr generated by Socket.pack_sockaddr_in - * or Socket.pack_sockaddr_un + * Creates a generic Kgio::Socket object and initiates a + * non-blocking connection. The caller should select/poll + * on the socket for writability before attempting to write + * or optimistically attempt a write and handle Kgio::WaitWritable + * or Errno::EAGAIN. */ - +static VALUE kgio_start(VALUE klass, VALUE addr) +{ + return stream_connect(klass, addr, 0); +} void Init_kgio_ext(void) { @@ -739,21 +943,39 @@ void Init_kgio_ext(void) VALUE cUNIXServer, cTCPServer, cUNIXSocket, cTCPSocket; rb_require("socket"); + + /* + * Document-module: Kgio::Socket + * + * A generic socket class with Kgio::SocketMethods included. + * This is returned by all Kgio methods that accept(2) a connected + * stream socket. + */ cSocket = rb_const_get(rb_cObject, rb_intern("Socket")); cSocket = rb_define_class_under(mKgio, "Socket", cSocket); localhost = rb_str_new2("127.0.0.1"); + + /* + * The IPv4 address of UNIX domain sockets, useful for creating + * Rack (and CGI) servers that also serve HTTP traffic over + * UNIX domain sockets. + */ rb_const_set(mKgio, rb_intern("LOCALHOST"), localhost); /* - * The kgio_read method will return this when waiting for - * a read is required. + * Document-module: Kgio::WaitReadable + * + * PipeMethods#kgio_tryread and SocketMethods#kgio_tryread will + * return this constant when waiting for a read is required. */ mKgio_WaitReadable = rb_define_module_under(mKgio, "WaitReadable"); /* - * The kgio_write method will return this when waiting for - * a write is required. + * Document-module: Kgio::WaitWritable + * + * PipeMethods#kgio_trywrite and SocketMethods#kgio_trywrite will + * return this constant when waiting for a read is required. */ mKgio_WaitWritable = rb_define_module_under(mKgio, "WaitWritable"); @@ -766,19 +988,39 @@ void Init_kgio_ext(void) rb_define_singleton_method(mKgio, "accept_nonblock?", get_nonblock, 0); rb_define_singleton_method(mKgio, "accept_nonblock=", set_nonblock, 1); + /* + * Document-module: Kgio::PipeMethods + * + * This module may be used used to create classes that respond to + * various Kgio methods for reading and writing. This is included + * in Kgio::Pipe by default. + */ mPipeMethods = rb_define_module_under(mKgio, "PipeMethods"); rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1); rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1); rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1); rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1); + /* + * Document-module: Kgio::SocketMethods + * + * This method behaves like Kgio::PipeMethods, but contains + * optimizations for sockets on certain operating systems + * (e.g. GNU/Linux). + */ mSocketMethods = rb_define_module_under(mKgio, "SocketMethods"); rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1); rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1); rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1); rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1); + /* + * Returns the client IPv4 address of the socket in dotted quad + * form as a string. This is always the value of the + * Kgio::LOCALHOST constant for UNIX domain sockets. + */ rb_define_attr(mSocketMethods, "kgio_addr", 1, 1); + rb_include_module(cSocket, mSocketMethods); rb_define_singleton_method(cSocket, "new", kgio_connect, 1); rb_define_singleton_method(cSocket, "start", kgio_start, 1); diff --git a/lib/kgio.rb b/lib/kgio.rb index d8b8a35..e104f46 100644 --- a/lib/kgio.rb +++ b/lib/kgio.rb @@ -1,10 +1,18 @@ require 'kgio_ext' # use Kgio::Pipe.popen and Kgio::Pipe.new instead of IO.popen -# and IO.pipe to get kgio_read and kgio_write methods. +# and IO.pipe to get PipeMethods#kgio_read and PipeMethod#kgio_write +# methods. class Kgio::Pipe < IO include Kgio::PipeMethods class << self + + # call-seq: + # + # rd, wr = Kgio::Pipe.new + # + # This creates a new pipe(7) with Kgio::Pipe objects that respond + # to PipeMethods#kgio_read and PipeMethod#kgio_write alias new pipe end end |