From 5c480aee3067006b5da6d45b7de41d8401b70848 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 8 Feb 2011 10:41:51 +0000 Subject: preliminary poll(2) support It's a nice alternative to IO.select for higher-numbered file descriptors, especially sparse ones. Our interface also generates less garbage than IO.select does. --- .document | 1 + ext/kgio/extconf.rb | 1 + ext/kgio/kgio.h | 1 + ext/kgio/kgio_ext.c | 1 + ext/kgio/poll.c | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++++ test/test_poll.rb | 68 +++++++++++++++++++ 6 files changed, 258 insertions(+) create mode 100644 ext/kgio/poll.c create mode 100644 test/test_poll.rb diff --git a/.document b/.document index f3e553b..c6c1568 100644 --- a/.document +++ b/.document @@ -12,3 +12,4 @@ ext/kgio/connect.c ext/kgio/kgio_ext.c ext/kgio/read_write.c ext/kgio/wait.c +ext/kgio/poll.c diff --git a/ext/kgio/extconf.rb b/ext/kgio/extconf.rb index 3758e92..86e660a 100644 --- a/ext/kgio/extconf.rb +++ b/ext/kgio/extconf.rb @@ -2,6 +2,7 @@ require 'mkmf' $CPPFLAGS << ' -D_GNU_SOURCE' $CPPFLAGS << ' -DPOSIX_C_SOURCE=1' +have_func("poll", "poll.h") have_func("getaddrinfo", %w(sys/types.h sys/socket.h netdb.h)) or abort "getaddrinfo required" have_func("getnameinfo", %w(sys/socket.h netdb.h)) or diff --git a/ext/kgio/kgio.h b/ext/kgio/kgio.h index 244bae5..1fdc36d 100644 --- a/ext/kgio/kgio.h +++ b/ext/kgio/kgio.h @@ -35,6 +35,7 @@ void init_kgio_read_write(void); void init_kgio_accept(void); void init_kgio_connect(void); void init_kgio_autopush(void); +void init_kgio_poll(void); void kgio_autopush_accept(VALUE, VALUE); void kgio_autopush_recv(VALUE); diff --git a/ext/kgio/kgio_ext.c b/ext/kgio/kgio_ext.c index d106d05..f50b3c8 100644 --- a/ext/kgio/kgio_ext.c +++ b/ext/kgio/kgio_ext.c @@ -7,4 +7,5 @@ void Init_kgio_ext(void) init_kgio_connect(); init_kgio_accept(); init_kgio_autopush(); + init_kgio_poll(); } diff --git a/ext/kgio/poll.c b/ext/kgio/poll.c new file mode 100644 index 0000000..41caafd --- /dev/null +++ b/ext/kgio/poll.c @@ -0,0 +1,186 @@ +#include "kgio.h" +#if defined(HAVE_RB_THREAD_BLOCKING_REGION) && defined(HAVE_POLL) +#include +#ifdef HAVE_RUBY_ST_H +# include +#else +# include +#endif + +static VALUE sym_wait_readable, sym_wait_writable; +static ID id_clear; + +struct poll_args { + struct pollfd *fds; + nfds_t nfds; + int timeout; + VALUE ios; + st_table *fd_to_io; +}; + +static int num2timeout(VALUE timeout) +{ + switch (TYPE(timeout)) { + case T_NIL: return -1; + case T_FIXNUM: return FIX2INT(timeout); + case T_BIGNUM: return NUM2INT(timeout); + } + rb_raise(rb_eTypeError, "timeout must be integer or nil"); + return 0; +} + +static VALUE poll_free(VALUE args) +{ + struct poll_args *a = (struct poll_args *)args; + + if (a->fds) + xfree(a->fds); + if (a->fd_to_io) + st_free_table(a->fd_to_io); + + return Qnil; +} + +static short value2events(VALUE event) +{ + if (event == sym_wait_readable) return POLLIN; + if (event == sym_wait_writable) return POLLOUT; + if (TYPE(event) == T_FIXNUM) return (short)FIX2INT(event); + rb_raise(rb_eArgError, "unrecognized event"); +} + +static int io_to_pollfd_i(VALUE key, VALUE value, VALUE args) +{ + struct poll_args *a = (struct poll_args *)args; + struct pollfd *pollfd = &a->fds[a->nfds++]; + + pollfd->fd = my_fileno(key); + pollfd->events = value2events(value); + st_insert(a->fd_to_io, (st_data_t)pollfd->fd, (st_data_t)key); + return ST_CONTINUE; +} + +static void hash2pollfds(struct poll_args *a) +{ + a->fds = xmalloc(sizeof(struct poll_args) * RHASH_SIZE(a->ios)); + a->fd_to_io = st_init_numtable(); + rb_hash_foreach(a->ios, io_to_pollfd_i, (VALUE)a); +} + +static VALUE nogvl_poll(void *ptr) +{ + struct poll_args *a = ptr; + return (VALUE)poll(a->fds, a->nfds, a->timeout); +} + +static VALUE poll_result(int nr, struct poll_args *a) +{ + struct pollfd *fds = a->fds; + VALUE io; + int rc; + + if ((nfds_t)nr != a->nfds) + rb_funcall(a->ios, id_clear, 0); + for (; nr > 0; fds++) { + if (fds->revents == 0) + continue; + --nr; + rc = st_lookup(a->fd_to_io, (st_data_t)fds->fd, &io); + assert(rc == 1 && "fd => IO mapping failed"); + rb_hash_aset(a->ios, io, INT2FIX((int)fds->revents)); + } + return a->ios; +} + +static VALUE do_poll(VALUE args) +{ + struct poll_args *a = (struct poll_args *)args; + int nr; + + Check_Type(a->ios, T_HASH); + hash2pollfds(a); + + nr = (int)rb_thread_blocking_region(nogvl_poll, a, RUBY_UBF_IO, NULL); + if (nr < 0) rb_sys_fail("poll"); + if (nr == 0) return Qnil; + + return poll_result(nr, a); +} + +/* + * call-seq: + * + * Kgio.poll({ $stdin => :wait_readable }, 100) -> hash or nil + * Kgio.poll({ $stdin => Kgio::POLLIN }, 100) -> hash or nil + * + * Accepts an input hash with IO objects to wait for as the key and + * the events to wait for as its value. The events may either be + * +:wait_readable+ or +:wait_writable+ symbols or a Fixnum mask of + * Kgio::POLL* constants: + * + * Kgio::POLLIN - there is data to read + * Kgio::POLLPRI - there is urgent data to read + * Kgio::POLLOUT - writing will not block + * Kgio::POLLRDHUP - peer has shutdown writes (Linux 2.6.17+ only) + * + * Timeout is specified in Integer milliseconds just like the underlying + * poll(2), not in seconds like IO.select. A nil timeout means to wait + * forever. It must be an Integer or nil. + * + * Kgio.poll modifies and returns its input hash on success with the + * IO-like object as the key and an Integer mask of events as the hash + * value. It can return any of the events specified in the input + * above, along with the following events: + * + * Kgio::POLLERR - error condition occurred on the descriptor + * Kgio::POLLHUP - hang up + * Kgio::POLLNVAL - invalid request (bad file descriptor) + * + * This method is only available under Ruby 1.9 or any other + * implementations that uses native threads and rb_thread_blocking_region() + */ +static VALUE s_poll(int argc, VALUE *argv, VALUE self) +{ + VALUE timeout; + struct poll_args a; + + rb_scan_args(argc, argv, "11", &a.ios, &timeout); + a.timeout = num2timeout(timeout); + a.nfds = 0; + a.fds = NULL; + a.fd_to_io = NULL; + + return rb_ensure(do_poll, (VALUE)&a, poll_free, (VALUE)&a); +} + +void init_kgio_poll(void) +{ + VALUE mKgio = rb_define_module("Kgio"); + rb_define_singleton_method(mKgio, "poll", s_poll, -1); + + sym_wait_readable = ID2SYM(rb_intern("wait_readable")); + sym_wait_writable = ID2SYM(rb_intern("wait_writable")); + id_clear = rb_intern("clear"); + +#define c(x) rb_define_const(mKgio,#x,INT2NUM((int)x)) + +/* standard types */ + + c(POLLIN); + c(POLLPRI); + c(POLLOUT); + +#ifdef POLLRDHUP + c(POLLRDHUP); +#endif + +/* outputs */ + c(POLLERR); + c(POLLHUP); + c(POLLNVAL); +} +#else /* ! HAVE_RB_THREAD_BLOCKING_REGION */ +void init_kgio_poll(void) +{ +} +#endif /* ! HAVE_RB_THREAD_BLOCKIONG_REGION */ diff --git a/test/test_poll.rb b/test/test_poll.rb new file mode 100644 index 0000000..8cade91 --- /dev/null +++ b/test/test_poll.rb @@ -0,0 +1,68 @@ +require 'test/unit' +$-w = true +require 'kgio' + +class TestPoll < Test::Unit::TestCase + def teardown + [ @rd, @wr ].each { |io| io.close unless io.closed? } + end + + def setup + @rd, @wr = IO.pipe + end + + def test_constants + assert_kind_of Integer, Kgio::POLLIN + assert_kind_of Integer, Kgio::POLLOUT + assert_kind_of Integer, Kgio::POLLPRI + assert_kind_of Integer, Kgio::POLLHUP + assert_kind_of Integer, Kgio::POLLERR + assert_kind_of Integer, Kgio::POLLNVAL + end + + def test_poll_symbol + set = { @rd => :wait_readable, @wr => :wait_writable } + res = Kgio.poll(set) + assert_equal({@wr => Kgio::POLLOUT}, res) + assert_equal set.object_id, res.object_id + end + + def test_poll_integer + set = { @wr => Kgio::POLLOUT|Kgio::POLLHUP } + res = Kgio.poll(set) + assert_equal({@wr => Kgio::POLLOUT}, res) + assert_equal set.object_id, res.object_id + end + + def test_poll_timeout + t0 = Time.now + res = Kgio.poll({}, 10) + diff = Time.now - t0 + assert diff >= 0.010, "diff=#{diff}" + assert_nil res + end + + def test_poll_interrupt + foo = nil + oldquit = trap(:QUIT) { foo = :bar } + thr = Thread.new { sleep 0.100; Process.kill(:QUIT, $$) } + t0 = Time.now + assert_raises(Errno::EINTR) { Kgio.poll({}) } + diff = Time.now - t0 + thr.join + assert diff >= 0.010, "diff=#{diff}" + ensure + trap(:QUIT, "DEFAULT") + end + + def test_poll_close + foo = nil + thr = Thread.new { sleep 0.100; @wr.close } + t0 = Time.now + res = Kgio.poll({@rd => Kgio::POLLIN}) + diff = Time.now - t0 + thr.join + assert_equal([ @rd ], res.keys) + assert diff >= 0.010, "diff=#{diff}" + end +end if Kgio.respond_to?(:poll) -- cgit v1.2.3-24-ge0c7