diff options
Diffstat (limited to 'ext/kgio/read_write.c')
-rw-r--r-- | ext/kgio/read_write.c | 325 |
1 files changed, 325 insertions, 0 deletions
diff --git a/ext/kgio/read_write.c b/ext/kgio/read_write.c new file mode 100644 index 0000000..7f1748a --- /dev/null +++ b/ext/kgio/read_write.c @@ -0,0 +1,325 @@ +#include "kgio.h" +static VALUE mKgio_WaitReadable, mKgio_WaitWritable; + +/* + * we know MSG_DONTWAIT works properly on all stream sockets under Linux + * we can define this macro for other platforms as people care and + * notice. + */ +#if defined(__linux__) && ! defined(USE_MSG_DONTWAIT) +# define USE_MSG_DONTWAIT +#endif + +static void prepare_read(struct io_args *a, int argc, VALUE *argv, VALUE io) +{ + VALUE length; + + a->io = io; + a->fd = my_fileno(io); + rb_scan_args(argc, argv, "11", &length, &a->buf); + a->len = NUM2LONG(length); + if (NIL_P(a->buf)) { + a->buf = rb_str_new(NULL, a->len); + } else { + StringValue(a->buf); + rb_str_resize(a->buf, a->len); + } + a->ptr = RSTRING_PTR(a->buf); +} + +static int read_check(struct io_args *a, long n, const char *msg, int io_wait) +{ + if (n == -1) { + if (errno == EINTR) + return -1; + rb_str_set_len(a->buf, 0); + if (errno == EAGAIN) { + if (io_wait) { + kgio_wait_readable(a->io, a->fd); + + /* buf may be modified in other thread/fiber */ + rb_str_resize(a->buf, a->len); + a->ptr = RSTRING_PTR(a->buf); + return -1; + } else { + a->buf = mKgio_WaitReadable; + return 0; + } + } + rb_sys_fail(msg); + } + rb_str_set_len(a->buf, n); + if (n == 0) + a->buf = Qnil; + return 0; +} + +static VALUE my_read(int io_wait, int argc, VALUE *argv, VALUE io) +{ + struct io_args a; + long n; + + prepare_read(&a, argc, argv, io); + set_nonblocking(a.fd); +retry: + n = (long)read(a.fd, a.ptr, a.len); + if (read_check(&a, n, "read", io_wait) != 0) + goto retry; + return a.buf; +} + +/* + * call-seq: + * + * io.kgio_read(maxlen) -> buffer + * io.kgio_read(maxlen, buffer) -> buffer + * + * Reads at most maxlen bytes from the stream socket. Returns with a + * newly allocated buffer, or may reuse an existing buffer if supplied. + * + * Calls the method assigned to Kgio.wait_readable, or blocks in a + * thread-safe manner for writability. + * + * Returns nil on EOF. + * + * This behaves like read(2) and IO#readpartial, NOT fread(3) or + * IO#read which possess read-in-full behavior. + */ +static VALUE kgio_read(int argc, VALUE *argv, VALUE io) +{ + return my_read(1, argc, argv, io); +} + +/* + * call-seq: + * + * io.kgio_tryread(maxlen) -> buffer + * io.kgio_tryread(maxlen, buffer) -> buffer + * + * Reads at most maxlen bytes from the stream socket. Returns with a + * newly allocated buffer, or may reuse an existing buffer if supplied. + * + * Returns nil on EOF. + * + * Returns Kgio::WaitReadable if EAGAIN is encountered. + */ +static VALUE kgio_tryread(int argc, VALUE *argv, VALUE io) +{ + return my_read(0, argc, argv, io); +} + +#ifdef USE_MSG_DONTWAIT +static VALUE my_recv(int io_wait, int argc, VALUE *argv, VALUE io) +{ + struct io_args a; + long n; + + prepare_read(&a, argc, argv, io); +retry: + n = (long)recv(a.fd, a.ptr, a.len, MSG_DONTWAIT); + if (read_check(&a, n, "recv", io_wait) != 0) + goto retry; + return a.buf; +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_read + */ +static VALUE kgio_recv(int argc, VALUE *argv, VALUE io) +{ + return my_recv(1, argc, argv, io); +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_tryread + */ +static VALUE kgio_tryrecv(int argc, VALUE *argv, VALUE io) +{ + return my_recv(0, argc, argv, io); +} +#else /* ! USE_MSG_DONTWAIT */ +# define kgio_recv kgio_read +# define kgio_tryrecv kgio_tryread +#endif /* USE_MSG_DONTWAIT */ + +static void prepare_write(struct io_args *a, VALUE io, VALUE str) +{ + a->buf = (TYPE(str) == T_STRING) ? str : rb_obj_as_string(str); + a->ptr = RSTRING_PTR(a->buf); + a->len = RSTRING_LEN(a->buf); + a->io = io; + a->fd = my_fileno(io); +} + +static int write_check(struct io_args *a, long n, const char *msg, int io_wait) +{ + if (a->len == n) { +done: + a->buf = Qnil; + } else if (n == -1) { + if (errno == EINTR) + return -1; + if (errno == EAGAIN) { + long written = RSTRING_LEN(a->buf) - a->len; + + if (io_wait) { + kgio_wait_writable(a->io, a->fd); + + /* buf may be modified in other thread/fiber */ + a->len = RSTRING_LEN(a->buf) - written; + if (a->len <= 0) + goto done; + a->ptr = RSTRING_PTR(a->buf) + written; + return -1; + } else if (written > 0) { + a->buf = rb_str_new(a->ptr + n, a->len - n); + } else { + a->buf = mKgio_WaitWritable; + } + return 0; + } + rb_sys_fail(msg); + } else { + assert(n >= 0 && n < a->len && "write/send syscall broken?"); + a->ptr += n; + a->len -= n; + return -1; + } + return 0; +} + +static VALUE my_write(VALUE io, VALUE str, int io_wait) +{ + struct io_args a; + long n; + + prepare_write(&a, io, str); + set_nonblocking(a.fd); +retry: + n = (long)write(a.fd, a.ptr, a.len); + if (write_check(&a, n, "write", io_wait) != 0) + goto retry; + return a.buf; +} + +/* + * call-seq: + * + * io.kgio_write(str) -> nil + * + * Returns nil when the write completes. + * + * Calls the method Kgio.wait_writable if it is set. Otherwise this + * blocks in a thread-safe manner until all data is written or a + * fatal error occurs. + */ +static VALUE kgio_write(VALUE io, VALUE str) +{ + return my_write(io, str, 1); +} + +/* + * call-seq: + * + * io.kgio_trywrite(str) -> nil or Kgio::WaitWritable + * + * Returns nil if the write was completed in full. + * + * Returns a String containing the unwritten portion if EAGAIN + * was encountered, but some portion was successfully written. + * + * Returns Kgio::WaitWritable if EAGAIN is encountered and nothing + * was written. + */ +static VALUE kgio_trywrite(VALUE io, VALUE str) +{ + return my_write(io, str, 0); +} + +#ifdef USE_MSG_DONTWAIT +/* + * This method behaves like Kgio::PipeMethods#kgio_write, except + * it will use send(2) with the MSG_DONTWAIT flag on sockets to + * avoid unnecessary calls to fcntl(2). + */ +static VALUE my_send(VALUE io, VALUE str, int io_wait) +{ + struct io_args a; + long n; + + prepare_write(&a, io, str); +retry: + n = (long)send(a.fd, a.ptr, a.len, MSG_DONTWAIT); + if (write_check(&a, n, "send", io_wait) != 0) + goto retry; + return a.buf; +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_write + */ +static VALUE kgio_send(VALUE io, VALUE str) +{ + return my_send(io, str, 1); +} + +/* + * This method may be optimized on some systems (e.g. GNU/Linux) to use + * MSG_DONTWAIT to avoid explicitly setting the O_NONBLOCK flag via fcntl. + * Otherwise this is the same as Kgio::PipeMethods#kgio_trywrite + */ +static VALUE kgio_trysend(VALUE io, VALUE str) +{ + return my_send(io, str, 0); +} +#else /* ! USE_MSG_DONTWAIT */ +# define kgio_send kgio_write +# define kgio_trysend kgio_trywrite +#endif /* ! USE_MSG_DONTWAIT */ + +void init_kgio_read_write(VALUE mKgio) +{ + VALUE mPipeMethods, mSocketMethods; + + mKgio_WaitReadable = rb_const_get(mKgio, rb_intern("WaitReadable")); + mKgio_WaitWritable = rb_const_get(mKgio, rb_intern("WaitWritable")); + + /* + * Document-module: Kgio::PipeMethods + * + * This module may be used used to create classes that respond to + * various Kgio methods for reading and writing. This is included + * in Kgio::Pipe by default. + */ + mPipeMethods = rb_define_module_under(mKgio, "PipeMethods"); + rb_define_method(mPipeMethods, "kgio_read", kgio_read, -1); + rb_define_method(mPipeMethods, "kgio_write", kgio_write, 1); + rb_define_method(mPipeMethods, "kgio_tryread", kgio_tryread, -1); + rb_define_method(mPipeMethods, "kgio_trywrite", kgio_trywrite, 1); + + /* + * Document-module: Kgio::SocketMethods + * + * This method behaves like Kgio::PipeMethods, but contains + * optimizations for sockets on certain operating systems + * (e.g. GNU/Linux). + */ + mSocketMethods = rb_define_module_under(mKgio, "SocketMethods"); + rb_define_method(mSocketMethods, "kgio_read", kgio_recv, -1); + rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1); + rb_define_method(mSocketMethods, "kgio_tryread", kgio_tryrecv, -1); + rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1); + + /* + * Returns the client IPv4 address of the socket in dotted quad + * form as a string. This is always the value of the + * Kgio::LOCALHOST constant for UNIX domain sockets. + */ + rb_define_attr(mSocketMethods, "kgio_addr", 1, 1); +} |