From: Eric Wong <normalperson@yhbt.net>
To: sleepy.penguin@librelist.org
Subject: Re: [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby
Date: Fri, 12 Apr 2013 20:38:20 +0000 [thread overview]
Message-ID: <20130412203820.GA17862@dcvr.yhbt.net> (raw)
In-Reply-To: 1365653855-1101-4-git-send-email-normalperson@yhbt.net
Eric Wong <normalperson@yhbt.net> wrote:
> --- a/ext/sleepy_penguin/epoll.c
> +++ b/ext/sleepy_penguin/epoll.c
> @@ -358,10 +168,11 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
> {
> uint64_t now;
>
> - ep_fd_check(ept->ep);
> -
> if (errno != EINTR)
> return 0;
> +
> + ep_fd_check(ept);
> +
> if (ept->timeout < 0)
> return 1;
> now = now_ms();
The above hunk was buggy and inconsistent with the green-thread-friendly
variant. Below is an updated patch:
--------------------------------8<-------------------------------
Subject: [PATCH] split Epoll and Epoll::IO, rewrite Epoll in Ruby
Epoll::IO is a dangerous, low-level class which is intended
for users aware of the GC and fork behavior of epoll in the
Linux kernel.
Rewriting the higher-level Epoll in Ruby makes it easier to
maintain, especially since Rubinius has no GVL while running
C extensions.
---
ext/sleepy_penguin/epoll.c | 603 ++++++---------------------------------
ext/sleepy_penguin/epoll_green.h | 8 +-
lib/sleepy_penguin.rb | 1 +
lib/sleepy_penguin/epoll.rb | 228 +++++++++++++++
test/test_epoll.rb | 15 +-
test/test_epoll_io.rb | 24 ++
test/test_epoll_optimizations.rb | 2 +-
7 files changed, 359 insertions(+), 522 deletions(-)
create mode 100644 lib/sleepy_penguin/epoll.rb
create mode 100644 test/test_epoll_io.rb
diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 3dcd357..2ddc71d 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -3,20 +3,12 @@
#include <pthread.h>
#include <time.h>
#include "missing_epoll.h"
-#ifdef HAVE_RUBY_ST_H
-# include <ruby/st.h>
-#else
-# include <st.h>
-#endif
#include "missing_rb_thread_fd_close.h"
#include "missing_rb_update_max_fd.h"
-#define EP_RECREATE (-2)
static pthread_key_t epoll_key;
-static st_table *active;
-static const int step = 64; /* unlikely to grow unless you're huge */
-static VALUE cEpoll_IO;
static ID id_for_fd;
+static VALUE cEpoll;
static uint64_t now_ms(void)
{
@@ -47,27 +39,31 @@ static VALUE unpack_event_data(struct epoll_event *event)
# endif
#endif
-struct rb_epoll {
- int fd;
- VALUE io;
- VALUE marks;
- VALUE flag_cache;
- int flags;
-};
-
struct ep_per_thread {
- struct rb_epoll *ep;
+ VALUE io;
+ int fd;
int timeout;
int maxevents;
int capa;
struct epoll_event events[FLEX_ARRAY];
};
-static struct ep_per_thread *ept_get(int maxevents)
+/* this will raise if the IO is closed */
+static int ep_fd_check(struct ep_per_thread *ept)
+{
+ int save_errno = errno;
+
+ ept->fd = rb_sp_fileno(ept->io);
+ errno = save_errno;
+
+ return 1;
+}
+
+static struct ep_per_thread *ept_get(VALUE self, int maxevents)
{
struct ep_per_thread *ept = pthread_getspecific(epoll_key);
- int err;
size_t size;
+ int err;
if (ept && ept->capa >= maxevents)
goto out;
@@ -87,253 +83,72 @@ static struct ep_per_thread *ept_get(int maxevents)
ept->capa = maxevents;
out:
ept->maxevents = maxevents;
+ ept->io = self;
+ ept->fd = rb_sp_fileno(ept->io);
return ept;
}
-static struct rb_epoll *ep_get(VALUE self)
-{
- struct rb_epoll *ep;
-
- Data_Get_Struct(self, struct rb_epoll, ep);
-
- return ep;
-}
-
-static void gcmark(void *ptr)
-{
- struct rb_epoll *ep = ptr;
-
- rb_gc_mark(ep->io);
- rb_gc_mark(ep->marks);
- rb_gc_mark(ep->flag_cache);
-}
-
-static void gcfree(void *ptr)
-{
- struct rb_epoll *ep = ptr;
-
- if (ep->fd >= 0) {
- st_data_t key = ep->fd;
- st_delete(active, &key, NULL);
- }
- if (NIL_P(ep->io) && ep->fd >= 0) {
- /* can't raise during GC, and close() never fails in Linux */
- (void)close(ep->fd);
- errno = 0;
- }
- /* let GC take care of the underlying IO object if there is one */
-
- xfree(ep);
-}
-
-static VALUE alloc(VALUE klass)
-{
- struct rb_epoll *ep;
- VALUE self;
-
- self = Data_Make_Struct(klass, struct rb_epoll, gcmark, gcfree, ep);
- ep->fd = -1;
- ep->io = Qnil;
- ep->marks = Qnil;
- ep->flag_cache = Qnil;
- ep->flags = 0;
-
- return self;
-}
-
-static void my_epoll_create(struct rb_epoll *ep)
-{
- ep->fd = epoll_create1(ep->flags);
-
- if (ep->fd == -1) {
- if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
- rb_gc();
- ep->fd = epoll_create1(ep->flags);
- }
- if (ep->fd == -1)
- rb_sys_fail("epoll_create1");
- }
- rb_update_max_fd(ep->fd);
- st_insert(active, (st_data_t)ep->fd, (st_data_t)ep);
- ep->marks = rb_ary_new();
- ep->flag_cache = rb_ary_new();
-}
-
-static int ep_fd_check(struct rb_epoll *ep)
-{
- if (ep->fd == -1)
- rb_raise(rb_eIOError, "closed epoll descriptor");
- return 1;
-}
-
-static void ep_check(struct rb_epoll *ep)
-{
- if (ep->fd == EP_RECREATE)
- my_epoll_create(ep);
- ep_fd_check(ep);
- assert(TYPE(ep->marks) == T_ARRAY && "marks not initialized");
- assert(TYPE(ep->flag_cache) == T_ARRAY && "flag_cache not initialized");
-}
-
/*
* call-seq:
- * SleepyPenguin::Epoll.new([flags]) -> Epoll object
+ * SleepyPenguin::Epoll::IO.new(flags) -> Epoll::IO object
*
- * Creates a new Epoll object with an optional +flags+ argument.
- * +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+ * Creates a new Epoll::IO object with the given +flags+ argument.
+ * +flags+ may currently be +:CLOEXEC+, +CLOEXEC+, +0+, or +nil+.
*/
-static VALUE init(int argc, VALUE *argv, VALUE self)
-{
- struct rb_epoll *ep = ep_get(self);
- VALUE fl;
-
- rb_scan_args(argc, argv, "01", &fl);
- ep->flags = rb_sp_get_flags(self, fl);
- my_epoll_create(ep);
-
- return self;
-}
-
-static VALUE ctl(VALUE self, VALUE io, VALUE flags, int op)
+static VALUE s_new(VALUE klass, VALUE _flags)
{
- struct epoll_event event;
- struct rb_epoll *ep = ep_get(self);
- int fd = rb_sp_fileno(io);
- int rv;
+ int flags = rb_sp_get_flags(klass, _flags);
+ int fd = epoll_create1(flags);
+ VALUE rv;
- ep_check(ep);
- event.events = rb_sp_get_uflags(self, flags);
- pack_event_data(&event, io);
-
- rv = epoll_ctl(ep->fd, op, fd, &event);
- if (rv == -1) {
- if (errno == ENOMEM) {
+ if (fd < 0) {
+ if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
rb_gc();
- rv = epoll_ctl(ep->fd, op, fd, &event);
+ fd = epoll_create1(flags);
}
- if (rv == -1)
- rb_sys_fail("epoll_ctl");
- }
- switch (op) {
- case EPOLL_CTL_ADD:
- rb_ary_store(ep->marks, fd, io);
- /* fall-through */
- case EPOLL_CTL_MOD:
- flags = UINT2NUM(event.events);
- rb_ary_store(ep->flag_cache, fd, flags);
- break;
- case EPOLL_CTL_DEL:
- rb_ary_store(ep->marks, fd, Qnil);
- rb_ary_store(ep->flag_cache, fd, Qnil);
+ if (fd == -1)
+ rb_sys_fail("epoll_create1");
}
- return INT2NUM(rv);
+ rv = INT2FIX(fd);
+ return rb_call_super(1, &rv);
}
/*
* call-seq:
- * ep.set(io, flags) -> 0
+ * epoll_io.epoll_ctl(op, io, events) -> nil
*
- * Used to avoid exceptions when your app is too lazy to check
- * what state a descriptor is in, this sets the epoll descriptor
- * to watch an +io+ with the given +flags+
+ * Register, modify, or unregister a watch for a given +io+ for events.
*
- * +flags+ may be an array of symbols or an unsigned Integer bit mask:
+ * +op+ may be one of +Epoll::CTL_ADD+, +Epoll::CTL_MOD+, or +Epoll::CTL_DEL+.
+ * +io+ is an IO object or one which proxies via the +to_io+ method.
+ * +events+ is an integer mask of events to watch for.
*
- * - flags = [ :IN, :ET ]
- * - flags = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
- *
- * See constants in Epoll for more information.
+ * Returns nil on success.
*/
-static VALUE set(VALUE self, VALUE io, VALUE flags)
+static VALUE epctl(VALUE self, VALUE _op, VALUE io, VALUE events)
{
struct epoll_event event;
- struct rb_epoll *ep = ep_get(self);
+ int epfd = rb_sp_fileno(self);
int fd = rb_sp_fileno(io);
+ int op = NUM2INT(_op);
int rv;
- VALUE cur_io = rb_ary_entry(ep->marks, fd);
- ep_check(ep);
- event.events = rb_sp_get_uflags(self, flags);
+ event.events = NUM2UINT(events);
pack_event_data(&event, io);
- if (cur_io == io) {
- VALUE cur_flags = rb_ary_entry(ep->flag_cache, fd);
- uint32_t cur_events;
-
- assert(!NIL_P(cur_flags) && "cur_flags nil but cur_io is not");
- cur_events = NUM2UINT(cur_flags);
-
- if (!(cur_events & EPOLLONESHOT) && cur_events == event.events)
- return Qnil;
-
-fallback_mod:
- rv = epoll_ctl(ep->fd, EPOLL_CTL_MOD, fd, &event);
- if (rv == -1) {
- if (errno != ENOENT)
- rb_sys_fail("epoll_ctl - mod");
- errno = 0;
- rb_warn("epoll flag_cache failed (mod -> add)");
- goto fallback_add;
- }
- } else {
-fallback_add:
- rv = epoll_ctl(ep->fd, EPOLL_CTL_ADD, fd, &event);
- if (rv == -1) {
- if (errno != EEXIST)
- rb_sys_fail("epoll_ctl - add");
- errno = 0;
- rb_warn("epoll flag_cache failed (add -> mod)");
- goto fallback_mod;
- }
- rb_ary_store(ep->marks, fd, io);
- }
- flags = UINT2NUM(event.events);
- rb_ary_store(ep->flag_cache, fd, flags);
-
- return INT2NUM(rv);
-}
-
-/*
- * call-seq:
- * epoll.delete(io) -> io or nil
- *
- * Stops an +io+ object from being monitored. This is like Epoll#del
- * but returns +nil+ on ENOENT instead of raising an error. This is
- * useful for apps that do not care to track the status of an
- * epoll object itself.
- */
-static VALUE delete(VALUE self, VALUE io)
-{
- struct rb_epoll *ep = ep_get(self);
- int fd = rb_sp_fileno(io);
- int rv;
- VALUE cur_io;
-
- ep_check(ep);
- if (rb_sp_io_closed(io))
- goto out;
-
- cur_io = rb_ary_entry(ep->marks, fd);
- if (NIL_P(cur_io) || rb_sp_io_closed(cur_io))
- return Qnil;
-
- rv = epoll_ctl(ep->fd, EPOLL_CTL_DEL, fd, NULL);
- if (rv == -1) {
- /* beware of IO.for_fd-created descriptors */
- if (errno == ENOENT || errno == EBADF) {
- errno = 0;
- io = Qnil;
- } else {
- rb_sys_fail("epoll_ctl - del");
+ rv = epoll_ctl(epfd, op, fd, &event);
+ if (rv < 0) {
+ if (errno == ENOMEM) {
+ rb_gc();
+ rv = epoll_ctl(epfd, op, fd, &event);
}
+ if (rv < 0)
+ rb_sys_fail("epoll_ctl");
}
-out:
- rb_ary_store(ep->marks, fd, Qnil);
- rb_ary_store(ep->flag_cache, fd, Qnil);
- return io;
+ return Qnil;
}
static VALUE epwait_result(struct ep_per_thread *ept, int n)
@@ -358,7 +173,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
{
uint64_t now;
- ep_fd_check(ept->ep);
+ ep_fd_check(ept); /* may raise IOError */
if (errno != EINTR)
return 0;
@@ -373,8 +188,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
static VALUE nogvl_wait(void *args)
{
struct ep_per_thread *ept = args;
- int fd = ept->ep->fd;
- int n = epoll_wait(fd, ept->events, ept->maxevents, ept->timeout);
+ int n = epoll_wait(ept->fd, ept->events, ept->maxevents, ept->timeout);
return (VALUE)n;
}
@@ -385,7 +199,7 @@ static VALUE real_epwait(struct ep_per_thread *ept)
uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0;
do {
- n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->ep->fd);
+ n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->fd);
} while (n == -1 && epoll_resume_p(expire_at, ept));
return epwait_result(ept, n);
@@ -396,11 +210,11 @@ static VALUE real_epwait(struct ep_per_thread *ept)
/*
* call-seq:
- * epoll.wait([maxevents[, timeout]]) { |flags, io| ... }
+ * ep_io.epoll_wait([maxevents[, timeout]]) { |events, io| ... }
*
- * Calls epoll_wait(2) and yields Integer +flags+ and IO objects watched
+ * Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
* for. +maxevents+ is the maximum number of events to process at once,
- * lower numbers may prevent starvation when used by Epoll#wait in multiple
+ * lower numbers may prevent starvation when used by epoll_wait in multiple
* threads. Larger +maxevents+ reduces syscall overhead for
* single-threaded applications. +maxevents+ defaults to 64 events.
* +timeout+ is specified in milliseconds, +nil+
@@ -409,259 +223,17 @@ static VALUE real_epwait(struct ep_per_thread *ept)
static VALUE epwait(int argc, VALUE *argv, VALUE self)
{
VALUE timeout, maxevents;
- struct rb_epoll *ep = ep_get(self);
struct ep_per_thread *ept;
- ep_check(ep);
rb_need_block();
rb_scan_args(argc, argv, "02", &maxevents, &timeout);
- ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
+
+ ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
- ept->ep = ep;
return real_epwait(ept);
}
-/*
- * call-seq:
- * epoll.add(io, flags) -> 0
- *
- * Starts watching a given +io+ object with +flags+ which may be an Integer
- * bitmask or Array representing arrays to watch for. Consider Epoll#set
- * instead as it is easier to use.
- */
-static VALUE add(VALUE self, VALUE io, VALUE flags)
-{
- return ctl(self, io, flags, EPOLL_CTL_ADD);
-}
-
-/*
- * call-seq:
- * epoll.del(io) -> 0
- *
- * Disables an IO object from being watched. Consider Epoll#delete as
- * it is easier to use.
- */
-static VALUE del(VALUE self, VALUE io)
-{
- return ctl(self, io, INT2FIX(0), EPOLL_CTL_DEL);
-}
-
-/*
- * call-seq:
- * epoll.mod(io, flags) -> 0
- *
- * Changes the watch for an existing IO object based on +flags+.
- * Consider Epoll#set instead as it is easier to use.
- */
-static VALUE mod(VALUE self, VALUE io, VALUE flags)
-{
- return ctl(self, io, flags, EPOLL_CTL_MOD);
-}
-
-/*
- * call-seq:
- * epoll.to_io -> Epoll::IO object
- *
- * Used to expose the given Epoll object as an Epoll::IO object for IO.select
- * or IO#stat. This is unlikely to be useful directly, but is used internally
- * by IO.select.
- */
-static VALUE to_io(VALUE self)
-{
- struct rb_epoll *ep = ep_get(self);
-
- ep_check(ep);
-
- if (NIL_P(ep->io))
- ep->io = rb_funcall(cEpoll_IO, id_for_fd, 1, INT2NUM(ep->fd));
-
- return ep->io;
-}
-
-/*
- * call-seq:
- * epoll.close -> nil
- *
- * Closes an existing Epoll object and returns memory back to the kernel.
- * Raises IOError if object is already closed.
- */
-static VALUE epclose(VALUE self)
-{
- struct rb_epoll *ep = ep_get(self);
-
- if (ep->fd >= 0) {
- st_data_t key = ep->fd;
- st_delete(active, &key, NULL);
- }
-
- if (NIL_P(ep->io)) {
- ep_fd_check(ep);
-
- if (ep->fd == EP_RECREATE) {
- ep->fd = -1; /* success */
- } else {
- int err;
- int fd = ep->fd;
-
- ep->fd = -1;
- rb_thread_fd_close(fd);
- err = close(fd);
- if (err == -1)
- rb_sys_fail("close");
- }
- } else {
- ep->fd = -1;
- rb_io_close(ep->io);
- }
-
- return Qnil;
-}
-
-/*
- * call-seq:
- * epoll.closed? -> true or false
- *
- * Returns whether or not an Epoll object is closed.
- */
-static VALUE epclosed(VALUE self)
-{
- struct rb_epoll *ep = ep_get(self);
-
- return ep->fd == -1 ? Qtrue : Qfalse;
-}
-
-static int cloexec_dup(struct rb_epoll *ep)
-{
-#ifdef F_DUPFD_CLOEXEC
- int flags = ep->flags & EPOLL_CLOEXEC ? F_DUPFD_CLOEXEC : F_DUPFD;
- int fd = fcntl(ep->fd, flags, 0);
-#else /* potentially racy on GVL-free systems: */
- int fd = dup(ep->fd);
- if (fd >= 0)
- (void)fcntl(fd, F_SETFD, FD_CLOEXEC);
-#endif
- return fd;
-}
-
-/*
- * call-seq:
- * epoll.dup -> another Epoll object
- *
- * Duplicates an Epoll object and userspace buffers related to this library.
- * Since SleepyPenguin 3.1.0, this is no longer needed for multi-threaded
- * Epoll#wait.
- */
-static VALUE init_copy(VALUE copy, VALUE orig)
-{
- struct rb_epoll *a = ep_get(orig);
- struct rb_epoll *b = ep_get(copy);
-
- assert(NIL_P(b->io) && "Ruby broken?");
-
- ep_check(a);
- assert(NIL_P(b->marks) && "mark array not nil");
- assert(NIL_P(b->flag_cache) && "flag_cache not nil");
- b->marks = a->marks;
- b->flag_cache = a->flag_cache;
- assert(TYPE(b->marks) == T_ARRAY && "mark array not initialized");
- assert(TYPE(b->flag_cache) == T_ARRAY && "flag_cache not initialized");
- b->flags = a->flags;
- b->fd = cloexec_dup(a);
- if (b->fd == -1) {
- if (errno == ENFILE || errno == EMFILE) {
- rb_gc();
- b->fd = cloexec_dup(a);
- }
- if (b->fd == -1)
- rb_sys_fail("dup");
- }
- st_insert(active, (st_data_t)b->fd, (st_data_t)b);
-
- return copy;
-}
-
-/* occasionally it's still useful to lookup aliased IO objects
- * based on for debugging */
-static int my_fileno(VALUE obj)
-{
- if (T_FIXNUM == TYPE(obj))
- return FIX2INT(obj);
- return rb_sp_fileno(obj);
-}
-
-/*
- * call-seq:
- * epoll.io_for(io) -> object
- *
- * Returns the given IO object currently being watched for. Different
- * IO objects may internally refer to the same process file descriptor.
- * Mostly used for debugging.
- */
-static VALUE io_for(VALUE self, VALUE obj)
-{
- struct rb_epoll *ep = ep_get(self);
-
- return rb_ary_entry(ep->marks, my_fileno(obj));
-}
-
-/*
- * call-seq:
- * epoll.flags_for(io) -> Integer
- *
- * Returns the flags currently watched for in current Epoll object.
- * Mostly used for debugging.
- */
-static VALUE flags_for(VALUE self, VALUE obj)
-{
- struct rb_epoll *ep = ep_get(self);
-
- return rb_ary_entry(ep->flag_cache, my_fileno(obj));
-}
-
-/*
- * call-seq:
- * epoll.include?(io) => true or false
- *
- * Returns whether or not a given IO is watched and prevented from being
- * garbage-collected by the current Epoll object. This may include
- * closed IO objects.
- */
-static VALUE include_p(VALUE self, VALUE obj)
-{
- struct rb_epoll *ep = ep_get(self);
-
- return NIL_P(rb_ary_entry(ep->marks, my_fileno(obj))) ? Qfalse : Qtrue;
-}
-
-/*
- * we close (or lose to GC) epoll descriptors at fork to avoid leakage
- * and invalid objects being referenced later in the child
- */
-static int ep_atfork(st_data_t key, st_data_t value, void *ignored)
-{
- struct rb_epoll *ep = (struct rb_epoll *)value;
-
- if (NIL_P(ep->io)) {
- if (ep->fd >= 0)
- (void)close(ep->fd);
- } else {
- ep->io = Qnil; /* must let GC take care of it later :< */
- }
- ep->fd = EP_RECREATE;
-
- return ST_CONTINUE;
-}
-
-static void atfork_child(void)
-{
- st_table *old = active;
-
- active = st_init_numtable();
- st_foreach(old, ep_atfork, (st_data_t)NULL);
- st_free_table(old);
-}
-
static void epoll_once(void)
{
int err = pthread_key_create(&epoll_key, free);
@@ -670,19 +242,17 @@ static void epoll_once(void)
errno = err;
rb_sys_fail("pthread_key_create");
}
+}
- active = st_init_numtable();
-
- if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
- rb_gc();
- if (pthread_atfork(NULL, NULL, atfork_child) != 0)
- rb_memerror();
- }
+/* :nodoc: */
+static VALUE event_flags(VALUE self, VALUE flags)
+{
+ return UINT2NUM(rb_sp_get_uflags(self, flags));
}
void sleepy_penguin_init_epoll(void)
{
- VALUE mSleepyPenguin, cEpoll;
+ VALUE mSleepyPenguin, cEpoll_IO;
static pthread_once_t once = PTHREAD_ONCE_INIT;
int err = pthread_once(&once, epoll_once);
@@ -708,6 +278,7 @@ void sleepy_penguin_init_epoll(void)
* And then access classes via:
*
* - SP::Epoll
+ * - SP::Epoll::IO
* - SP::EventFD
* - SP::Inotify
* - SP::TimerFD
@@ -717,36 +288,36 @@ void sleepy_penguin_init_epoll(void)
/*
* Document-class: SleepyPenguin::Epoll
*
- * The Epoll class provides access to epoll(7) functionality in the
- * Linux 2.6 kernel. It provides fork and GC-safety for Ruby
- * objects stored within the IO object and may be passed as an
- * argument to IO.select.
+ * The Epoll class provides high-level access to epoll(7)
+ * functionality in the Linux 2.6 and later kernels. It provides
+ * fork and GC-safety for Ruby objects stored within the IO object
+ * and may be passed as an argument to IO.select.
*/
cEpoll = rb_define_class_under(mSleepyPenguin, "Epoll", rb_cObject);
/*
* Document-class: SleepyPenguin::Epoll::IO
*
- * Epoll::IO is an internal class. Its only purpose is to be
- * compatible with IO.select and related methods and should
- * never be used directly, use Epoll instead.
+ * Epoll::IO is a low-level class. It provides neither fork- nor
+ * GC-safety, so Ruby IO objects added via epoll_ctl must be retained
+ * by the application until IO#close is called.
*/
cEpoll_IO = rb_define_class_under(cEpoll, "IO", rb_cIO);
- rb_define_method(cEpoll, "initialize", init, -1);
- rb_define_method(cEpoll, "initialize_copy", init_copy, 1);
- rb_define_alloc_func(cEpoll, alloc);
- rb_define_method(cEpoll, "to_io", to_io, 0);
- rb_define_method(cEpoll, "close", epclose, 0);
- rb_define_method(cEpoll, "closed?", epclosed, 0);
- rb_define_method(cEpoll, "add", add, 2);
- rb_define_method(cEpoll, "mod", mod, 2);
- rb_define_method(cEpoll, "del", del, 1);
- rb_define_method(cEpoll, "delete", delete, 1);
- rb_define_method(cEpoll, "io_for", io_for, 1);
- rb_define_method(cEpoll, "flags_for", flags_for, 1);
- rb_define_method(cEpoll, "include?", include_p, 1);
- rb_define_method(cEpoll, "set", set, 2);
- rb_define_method(cEpoll, "wait", epwait, -1);
+ rb_define_singleton_method(cEpoll_IO, "new", s_new, 1);
+
+ rb_define_method(cEpoll_IO, "epoll_ctl", epctl, 3);
+ rb_define_method(cEpoll_IO, "epoll_wait", epwait, -1);
+
+ rb_define_method(cEpoll, "__event_flags", event_flags, 1);
+
+ /* registers an IO object via epoll_ctl */
+ rb_define_const(cEpoll, "CTL_ADD", INT2NUM(EPOLL_CTL_ADD));
+
+ /* unregisters an IO object via epoll_ctl */
+ rb_define_const(cEpoll, "CTL_DEL", INT2NUM(EPOLL_CTL_DEL));
+
+ /* modifies the registration of an IO object via epoll_ctl */
+ rb_define_const(cEpoll, "CTL_MOD", INT2NUM(EPOLL_CTL_MOD));
/* specifies whether close-on-exec flag is set for Epoll.new */
rb_define_const(cEpoll, "CLOEXEC", INT2NUM(EPOLL_CLOEXEC));
diff --git a/ext/sleepy_penguin/epoll_green.h b/ext/sleepy_penguin/epoll_green.h
index 276a545..e3414eb 100644
--- a/ext/sleepy_penguin/epoll_green.h
+++ b/ext/sleepy_penguin/epoll_green.h
@@ -26,9 +26,9 @@ static int safe_epoll_wait(struct ep_per_thread *ept)
do {
TRAP_BEG;
- n = epoll_wait(ept->ep->fd, ept->events, ept->maxevents, 0);
+ n = epoll_wait(ept->fd, ept->events, ept->maxevents, 0);
TRAP_END;
- } while (n == -1 && errno == EINTR && ep_fd_check(ept->ep));
+ } while (n == -1 && ep_fd_check(ept) && errno == EINTR);
return n;
}
@@ -38,7 +38,7 @@ static int epwait_forever(struct ep_per_thread *ept)
int n;
do {
- (void)rb_io_wait_readable(ept->ep->fd);
+ (void)rb_io_wait_readable(ept->fd);
n = safe_epoll_wait(ept);
} while (n == 0);
@@ -55,7 +55,7 @@ static int epwait_timed(struct ep_per_thread *ept)
for (;;) {
struct timeval t0, now, diff;
int n;
- int fd = ept->ep->fd;
+ int fd = ept->fd;
fd_set rfds;
FD_ZERO(&rfds);
diff --git a/lib/sleepy_penguin.rb b/lib/sleepy_penguin.rb
index 3a189b1..c13eb0c 100644
--- a/lib/sleepy_penguin.rb
+++ b/lib/sleepy_penguin.rb
@@ -5,3 +5,4 @@ module SleepyPenguin
SLEEPY_PENGUIN_VERSION = '3.1.0'
end
require 'sleepy_penguin_ext'
+require 'sleepy_penguin/epoll'
diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
new file mode 100644
index 0000000..845dcf0
--- /dev/null
+++ b/lib/sleepy_penguin/epoll.rb
@@ -0,0 +1,228 @@
+class SleepyPenguin::Epoll
+
+ # Epoll objects may be watched by IO.select and similar methods
+ attr_reader :to_io
+
+ # call-seq:
+ # SleepyPenguin::Epoll.new([flags]) -> Epoll object
+ #
+ # Creates a new Epoll object with an optional +flags+ argument.
+ # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+ def initialize(create_flags = nil)
+ @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+ @events = []
+ @marks = []
+ @pid = $$
+ @create_flags = create_flags
+ @copies = { @to_io => self }
+ end
+
+ def __ep_reinit # :nodoc:
+ @events.clear
+ @marks.clear
+ @to_io = SleepyPenguin::Epoll::IO.new(@create_flags)
+ end
+
+ # auto-reinitialize the Epoll object after forking
+ def __ep_check # :nodoc:
+ return if @pid == $$
+ return if @to_io.closed?
+ objects = @copies.values
+ @copies.each_key { |epio| epio.close }
+ @copies.clear
+ __ep_reinit
+ objects.each do |obj|
+ io_dup = @to_io.dup
+ @copies[io_dup] = obj
+ end
+ @pid = $$
+ end
+
+ # Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
+ # for. +maxevents+ is the maximum number of events to process at once,
+ # lower numbers may prevent starvation when used by epoll_wait in multiple
+ # threads. Larger +maxevents+ reduces syscall overhead for
+ # single-threaded applications. +maxevents+ defaults to 64 events.
+ # +timeout+ is specified in milliseconds, +nil+
+ # (the default) meaning it will block and wait indefinitely.
+ def wait(maxevents = 64, timeout = nil)
+ __ep_check
+ @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+ end
+
+ # Starts watching a given +io+ object with +events+ which may be an Integer
+ # bitmask or Array representing events to watch for.
+ def add(io, events)
+ __ep_check
+ fd = io.to_io.fileno
+ events = __event_flags(events)
+ @to_io.epoll_ctl(CTL_ADD, io, events)
+ @marks[fd] = io
+ @events[fd] = events
+ 0
+ end
+
+ # call-seq:
+ # ep.del(io) -> 0
+ #
+ # Disables an IO object from being watched.
+ def del(io)
+ __ep_check
+ fd = io.to_io.fileno
+ rv = @to_io.epoll_ctl(CTL_DEL, io, 0)
+ @marks[fd] = @events[fd] = nil
+ rv
+ end
+
+ # call-seq:
+ # ep.delete(io) -> io or nil
+ #
+ # This method is deprecated and will be removed in sleepy_penguin 4.x
+ #
+ # Stops an +io+ object from being monitored. This is like Epoll#del
+ # but returns +nil+ on ENOENT instead of raising an error. This is
+ # useful for apps that do not care to track the status of an
+ # epoll object itself.
+ #
+ # This method is deprecated and will be removed in sleepy_penguin 4.x
+ def delete(io)
+ __ep_check
+ fd = io.to_io.fileno
+ cur_io = @marks[fd]
+ return if nil == cur_io || cur_io.to_io.closed?
+ @marks[fd] = @events[fd] = nil
+ @to_io.epoll_ctl(CTL_DEL, io, 0)
+ io
+ rescue Errno::ENOENT, Errno::EBADF
+ end
+
+ # call-seq:
+ # epoll.mod(io, flags) -> 0
+ #
+ # Changes the watch for an existing IO object based on +events+.
+ # Returns zero on success, will raise SystemCallError on failure.
+ def mod(io, events)
+ __ep_check
+ fd = io.to_io.fileno
+ events = __event_flags(events)
+ rv = @to_io.epoll_ctl(CTL_MOD, io, events)
+ @marks[fd] = io
+ @events[fd] = events
+ rv
+ end
+
+ # call-seq:
+ # ep.set(io, flags) -> 0
+ #
+ # This method is deprecated and will be removed in sleepy_penguin 4.x
+ #
+ # Used to avoid exceptions when your app is too lazy to check
+ # what state a descriptor is in, this sets the epoll descriptor
+ # to watch an +io+ with the given +events+
+ #
+ # +events+ may be an array of symbols or an unsigned Integer bit mask:
+ #
+ # - events = [ :IN, :ET ]
+ # - events = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
+ #
+ # See constants in Epoll for more information.
+ #
+ # This method is deprecated and will be removed in sleepy_penguin 4.x
+ def set(io, events)
+ __ep_check
+ fd = io.to_io.fileno
+ cur_io = @marks[fd]
+ if cur_io == io
+ cur_events = @events[fd]
+ return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
+ begin
+ @to_io.epoll_ctl(CTL_MOD, io, events)
+ rescue Errno::ENOENT
+ warn "epoll flag cache failed (mod -> add)"
+ @to_io.epoll_ctl(CTL_ADD, io, events)
+ @marks[fd] = io
+ end
+ else
+ begin
+ @to_io.epoll_ctl(CTL_ADD, io, events)
+ rescue Errno::EEXIST
+ warn "epoll flag cache failed (add -> mod)"
+ @to_io.epoll_ctl(CTL_MOD, io, events)
+ end
+ @marks[fd] = io
+ end
+ @events[fd] = events
+ 0
+ end
+
+ # call-seq:
+ # ep.close -> nil
+ #
+ # Closes an existing Epoll object and returns memory back to the kernel.
+ # Raises IOError if object is already closed.
+ def close
+ __ep_check
+ @copies.delete(@to_io)
+ @to_io.close
+ end
+
+ # call-seq:
+ # ep.closed? -> true or false
+ #
+ # Returns whether or not an Epoll object is closed.
+ def closed?
+ __ep_check
+ @to_io.closed?
+ end
+
+ # we still support integer FDs for some debug functions
+ def __fileno(io) # :nodoc:
+ Integer === io ? io : io.to_io.fileno
+ end
+
+ # call-seq:
+ # ep.io_for(io) -> object
+ #
+ # Returns the given IO object currently being watched for. Different
+ # IO objects may internally refer to the same process file descriptor.
+ # Mostly used for debugging.
+ def io_for(io)
+ __ep_check
+ @marks[__fileno(io)]
+ end
+
+ # call-seq:
+ # epoll.events_for(io) -> Integer
+ #
+ # Returns the events currently watched for in current Epoll object.
+ # Mostly used for debugging.
+ def events_for(io)
+ __ep_check
+ @events[__fileno(io)]
+ end
+
+ # backwards compatibility, to be removed in 4.x
+ alias flags_for events_for
+
+ # call-seq:
+ # epoll.include?(io) -> true or false
+ #
+ # Returns whether or not a given IO is watched and prevented from being
+ # garbage-collected by the current Epoll object. This may include
+ # closed IO objects.
+ def include?(io)
+ __ep_check
+ @marks[__fileno(io)] ? true : nil
+ end
+
+ def initialize_copy(src) # :nodoc:
+ __ep_check
+ rv = super
+ unless @to_io.closed?
+ @to_io = @to_io.dup
+ @copies[@to_io] = self
+ end
+
+ rv
+ end
+end
diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index cd50cff..1a99dfd 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -48,6 +48,19 @@ def test_fork_safe
assert_equal [[Epoll::IN, @rd]], tmp
end
+ def test_dup_and_fork
+ epdup = @ep.dup
+ @ep.close
+ assert ! epdup.closed?
+ pid = fork do
+ exit(!epdup.closed? && @ep.closed?)
+ end
+ _, status = Process.waitpid2(pid)
+ assert status.success?, status.inspect
+ ensure
+ epdup.close
+ end
+
def test_after_fork_usability
fork { @ep.add(@rd, Epoll::IN); exit!(0) }
fork { @ep.set(@rd, Epoll::IN); exit!(0) }
@@ -399,7 +412,7 @@ def test_flags_for_sym_ary
def test_include?
assert ! @ep.include?(@rd)
@ep.add @rd, Epoll::IN
- assert @ep.include?(@rd)
+ assert @ep.include?(@rd), @ep.instance_variable_get(:@marks).inspect
assert @ep.include?(@rd.fileno)
assert ! @ep.include?(@wr)
assert ! @ep.include?(@wr.fileno)
diff --git a/test/test_epoll_io.rb b/test/test_epoll_io.rb
new file mode 100644
index 0000000..8aca155
--- /dev/null
+++ b/test/test_epoll_io.rb
@@ -0,0 +1,24 @@
+require 'test/unit'
+require 'fcntl'
+require 'socket'
+require 'thread'
+$-w = true
+Thread.abort_on_exception = true
+require 'sleepy_penguin'
+
+class TestEpollIO < Test::Unit::TestCase
+ include SleepyPenguin
+ RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
+
+ def setup
+ @rd, @wr = IO.pipe
+ @epio = Epoll::IO.new(nil)
+ end
+
+ def test_add_wait
+ @epio.epoll_ctl(Epoll::CTL_ADD, @wr, Epoll::OUT)
+ ev = []
+ @epio.epoll_wait { |events, obj| ev << [ events, obj ] }
+ assert_equal([[Epoll::OUT, @wr]], ev)
+ end
+end
diff --git a/test/test_epoll_optimizations.rb b/test/test_epoll_optimizations.rb
index bd77397..f5970fd 100644
--- a/test/test_epoll_optimizations.rb
+++ b/test/test_epoll_optimizations.rb
@@ -28,7 +28,7 @@ def test_set
end
assert_nil err
lines = io.readlines; io.close
- assert_equal 1, lines.grep(/^epoll_ctl/).size
+ assert_equal 1, lines.grep(/^epoll_ctl/).size, lines.inspect
assert_match(/EPOLL_CTL_ADD/, lines.grep(/^epoll_ctl/)[0])
io, err = Strace.me { @ep.set(@wr, Epoll::OUT | Epoll::ONESHOT) }
--
Eric Wong
next prev parent reply other threads:[~2013-04-12 20:38 UTC|newest]
Thread overview: 9+ messages / expand[flat|nested] mbox.gz Atom feed top
2013-04-11 4:17 [sleepy.penguin] [PATCH 0/6] epoll wrapper cleanups Eric Wong
2013-04-11 4:17 ` [sleepy.penguin] [PATCH 1/6] test_epoll: fix timing error in test Eric Wong
2013-04-11 4:17 ` [sleepy.penguin] [PATCH 2/6] test_epoll: synchronize writes to the pipe array Eric Wong
2013-04-11 4:17 ` [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby Eric Wong
2013-04-12 20:38 ` Eric Wong [this message]
2013-04-11 4:17 ` [sleepy.penguin] [PATCH 4/6] epoll: implement thread-safety for mark/flag arrays Eric Wong
2013-04-12 21:18 ` Eric Wong
2013-04-11 4:17 ` [sleepy.penguin] [PATCH 5/6] epoll: cache alignment for per-thread structure Eric Wong
2013-04-11 4:17 ` [sleepy.penguin] [PATCH 6/6] avoid ENOMEM checking in common code paths Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://yhbt.net/sleepy_penguin/
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20130412203820.GA17862@dcvr.yhbt.net \
--to=normalperson@yhbt.net \
--cc=sleepy.penguin@librelist.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://yhbt.net/sleepy_penguin.git/
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).