sleepy_penguin RubyGem user+dev discussion/patches/pulls/bugs/help
 help / color / mirror / code / Atom feed
From: Eric Wong <normalperson@yhbt.net>
To: sleepy.penguin@librelist.org
Subject: Re: [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby
Date: Fri, 12 Apr 2013 20:38:20 +0000	[thread overview]
Message-ID: <20130412203820.GA17862@dcvr.yhbt.net> (raw)
In-Reply-To: 1365653855-1101-4-git-send-email-normalperson@yhbt.net

Eric Wong <normalperson@yhbt.net> wrote:
> --- a/ext/sleepy_penguin/epoll.c
> +++ b/ext/sleepy_penguin/epoll.c
> @@ -358,10 +168,11 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
>  {
>  	uint64_t now;
>  
> -	ep_fd_check(ept->ep);
> -
>  	if (errno != EINTR)
>  		return 0;
> +
> +	ep_fd_check(ept);
> +
>  	if (ept->timeout < 0)
>  		return 1;
>  	now = now_ms();

The above hunk was buggy and inconsistent with the green-thread-friendly
variant.  Below is an updated patch:
--------------------------------8<-------------------------------
Subject: [PATCH] split Epoll and Epoll::IO, rewrite Epoll in Ruby

Epoll::IO is a dangerous, low-level class which is intended
for users aware of the GC and fork behavior of epoll in the
Linux kernel.

Rewriting the higher-level Epoll in Ruby makes it easier to
maintain, especially since Rubinius has no GVL while running
C extensions.
---
 ext/sleepy_penguin/epoll.c       | 603 ++++++---------------------------------
 ext/sleepy_penguin/epoll_green.h |   8 +-
 lib/sleepy_penguin.rb            |   1 +
 lib/sleepy_penguin/epoll.rb      | 228 +++++++++++++++
 test/test_epoll.rb               |  15 +-
 test/test_epoll_io.rb            |  24 ++
 test/test_epoll_optimizations.rb |   2 +-
 7 files changed, 359 insertions(+), 522 deletions(-)
 create mode 100644 lib/sleepy_penguin/epoll.rb
 create mode 100644 test/test_epoll_io.rb

diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 3dcd357..2ddc71d 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -3,20 +3,12 @@
 #include <pthread.h>
 #include <time.h>
 #include "missing_epoll.h"
-#ifdef HAVE_RUBY_ST_H
-#  include <ruby/st.h>
-#else
-#  include <st.h>
-#endif
 #include "missing_rb_thread_fd_close.h"
 #include "missing_rb_update_max_fd.h"
-#define EP_RECREATE (-2)
 
 static pthread_key_t epoll_key;
-static st_table *active;
-static const int step = 64; /* unlikely to grow unless you're huge */
-static VALUE cEpoll_IO;
 static ID id_for_fd;
+static VALUE cEpoll;
 
 static uint64_t now_ms(void)
 {
@@ -47,27 +39,31 @@ static VALUE unpack_event_data(struct epoll_event *event)
 # endif
 #endif
 
-struct rb_epoll {
-	int fd;
-	VALUE io;
-	VALUE marks;
-	VALUE flag_cache;
-	int flags;
-};
-
 struct ep_per_thread {
-	struct rb_epoll *ep;
+	VALUE io;
+	int fd;
 	int timeout;
 	int maxevents;
 	int capa;
 	struct epoll_event events[FLEX_ARRAY];
 };
 
-static struct ep_per_thread *ept_get(int maxevents)
+/* this will raise if the IO is closed */
+static int ep_fd_check(struct ep_per_thread *ept)
+{
+	int save_errno = errno;
+
+	ept->fd = rb_sp_fileno(ept->io);
+	errno = save_errno;
+
+	return 1;
+}
+
+static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 {
 	struct ep_per_thread *ept = pthread_getspecific(epoll_key);
-	int err;
 	size_t size;
+	int err;
 
 	if (ept && ept->capa >= maxevents)
 		goto out;
@@ -87,253 +83,72 @@ static struct ep_per_thread *ept_get(int maxevents)
 	ept->capa = maxevents;
 out:
 	ept->maxevents = maxevents;
+	ept->io = self;
+	ept->fd = rb_sp_fileno(ept->io);
 
 	return ept;
 }
 
-static struct rb_epoll *ep_get(VALUE self)
-{
-	struct rb_epoll *ep;
-
-	Data_Get_Struct(self, struct rb_epoll, ep);
-
-	return ep;
-}
-
-static void gcmark(void *ptr)
-{
-	struct rb_epoll *ep = ptr;
-
-	rb_gc_mark(ep->io);
-	rb_gc_mark(ep->marks);
-	rb_gc_mark(ep->flag_cache);
-}
-
-static void gcfree(void *ptr)
-{
-	struct rb_epoll *ep = ptr;
-
-	if (ep->fd >= 0) {
-		st_data_t key = ep->fd;
-		st_delete(active, &key, NULL);
-	}
-	if (NIL_P(ep->io) && ep->fd >= 0) {
-		/* can't raise during GC, and close() never fails in Linux */
-		(void)close(ep->fd);
-		errno = 0;
-	}
-	/* let GC take care of the underlying IO object if there is one */
-
-	xfree(ep);
-}
-
-static VALUE alloc(VALUE klass)
-{
-	struct rb_epoll *ep;
-	VALUE self;
-
-	self = Data_Make_Struct(klass, struct rb_epoll, gcmark, gcfree, ep);
-	ep->fd = -1;
-	ep->io = Qnil;
-	ep->marks = Qnil;
-	ep->flag_cache = Qnil;
-	ep->flags = 0;
-
-	return self;
-}
-
-static void my_epoll_create(struct rb_epoll *ep)
-{
-	ep->fd = epoll_create1(ep->flags);
-
-	if (ep->fd == -1) {
-		if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
-			rb_gc();
-			ep->fd = epoll_create1(ep->flags);
-		}
-		if (ep->fd == -1)
-			rb_sys_fail("epoll_create1");
-	}
-	rb_update_max_fd(ep->fd);
-	st_insert(active, (st_data_t)ep->fd, (st_data_t)ep);
-	ep->marks = rb_ary_new();
-	ep->flag_cache = rb_ary_new();
-}
-
-static int ep_fd_check(struct rb_epoll *ep)
-{
-	if (ep->fd == -1)
-		rb_raise(rb_eIOError, "closed epoll descriptor");
-	return 1;
-}
-
-static void ep_check(struct rb_epoll *ep)
-{
-	if (ep->fd == EP_RECREATE)
-		my_epoll_create(ep);
-	ep_fd_check(ep);
-	assert(TYPE(ep->marks) == T_ARRAY && "marks not initialized");
-	assert(TYPE(ep->flag_cache) == T_ARRAY && "flag_cache not initialized");
-}
-
 /*
  * call-seq:
- *	SleepyPenguin::Epoll.new([flags])	-> Epoll object
+ *	SleepyPenguin::Epoll::IO.new(flags)	-> Epoll::IO object
  *
- * Creates a new Epoll object with an optional +flags+ argument.
- * +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+ * Creates a new Epoll::IO object with the given +flags+ argument.
+ * +flags+ may currently be +CLOEXEC+ or +0+.
  */
-static VALUE init(int argc, VALUE *argv, VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-	VALUE fl;
-
-	rb_scan_args(argc, argv, "01", &fl);
-	ep->flags = rb_sp_get_flags(self, fl);
-	my_epoll_create(ep);
-
-	return self;
-}
-
-static VALUE ctl(VALUE self, VALUE io, VALUE flags, int op)
+static VALUE s_new(VALUE klass, VALUE _flags)
 {
-	struct epoll_event event;
-	struct rb_epoll *ep = ep_get(self);
-	int fd = rb_sp_fileno(io);
-	int rv;
+	int flags = rb_sp_get_flags(klass, _flags);
+	int fd = epoll_create1(flags);
+	VALUE rv;
 
-	ep_check(ep);
-	event.events = rb_sp_get_uflags(self, flags);
-	pack_event_data(&event, io);
-
-	rv = epoll_ctl(ep->fd, op, fd, &event);
-	if (rv == -1) {
-		if (errno == ENOMEM) {
+	if (fd < 0) {
+		if (errno == EMFILE || errno == ENFILE || errno == ENOMEM) {
 			rb_gc();
-			rv = epoll_ctl(ep->fd, op, fd, &event);
+			fd = epoll_create1(flags);
 		}
-		if (rv == -1)
-			rb_sys_fail("epoll_ctl");
-	}
-	switch (op) {
-	case EPOLL_CTL_ADD:
-		rb_ary_store(ep->marks, fd, io);
-		/* fall-through */
-	case EPOLL_CTL_MOD:
-		flags = UINT2NUM(event.events);
-		rb_ary_store(ep->flag_cache, fd, flags);
-		break;
-	case EPOLL_CTL_DEL:
-		rb_ary_store(ep->marks, fd, Qnil);
-		rb_ary_store(ep->flag_cache, fd, Qnil);
+		if (fd == -1)
+			rb_sys_fail("epoll_create1");
 	}
 
-	return INT2NUM(rv);
+	rv = INT2FIX(fd);
+	return rb_call_super(1, &rv);
 }
 
 /*
  * call-seq:
- *	ep.set(io, flags)	-> 0
+ * 	epoll_io.epoll_ctl(op, io, events)	-> nil
  *
- * Used to avoid exceptions when your app is too lazy to check
- * what state a descriptor is in, this sets the epoll descriptor
- * to watch an +io+ with the given +flags+
+ * Register, modify, or unregister a watch for a given +io+ for events.
  *
- * +flags+ may be an array of symbols or an unsigned Integer bit mask:
+ * +op+ may be one of +EPOLL_CTL_ADD+, +EPOLL_CTL_MOD+, or +EPOLL_CTL_DEL+
+ * +io+ is an IO object or one which proxies via the +to_io+ method.
+ * +events+ is an integer mask of events to watch for.
  *
- * - flags = [ :IN, :ET ]
- * - flags = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
- *
- * See constants in Epoll for more information.
+ * Returns nil on success.
  */
-static VALUE set(VALUE self, VALUE io, VALUE flags)
+static VALUE epctl(VALUE self, VALUE _op, VALUE io, VALUE events)
 {
 	struct epoll_event event;
-	struct rb_epoll *ep = ep_get(self);
+	int epfd = rb_sp_fileno(self);
 	int fd = rb_sp_fileno(io);
+	int op = NUM2INT(_op);
 	int rv;
-	VALUE cur_io = rb_ary_entry(ep->marks, fd);
 
-	ep_check(ep);
-	event.events = rb_sp_get_uflags(self, flags);
+	event.events = NUM2UINT(events);
 	pack_event_data(&event, io);
 
-	if (cur_io == io) {
-		VALUE cur_flags = rb_ary_entry(ep->flag_cache, fd);
-		uint32_t cur_events;
-
-		assert(!NIL_P(cur_flags) && "cur_flags nil but cur_io is not");
-		cur_events = NUM2UINT(cur_flags);
-
-		if (!(cur_events & EPOLLONESHOT) && cur_events == event.events)
-			return Qnil;
-
-fallback_mod:
-		rv = epoll_ctl(ep->fd, EPOLL_CTL_MOD, fd, &event);
-		if (rv == -1) {
-			if (errno != ENOENT)
-				rb_sys_fail("epoll_ctl - mod");
-			errno = 0;
-			rb_warn("epoll flag_cache failed (mod -> add)");
-			goto fallback_add;
-		}
-	} else {
-fallback_add:
-		rv = epoll_ctl(ep->fd, EPOLL_CTL_ADD, fd, &event);
-		if (rv == -1) {
-			if (errno != EEXIST)
-				rb_sys_fail("epoll_ctl - add");
-			errno = 0;
-			rb_warn("epoll flag_cache failed (add -> mod)");
-			goto fallback_mod;
-		}
-		rb_ary_store(ep->marks, fd, io);
-	}
-	flags = UINT2NUM(event.events);
-	rb_ary_store(ep->flag_cache, fd, flags);
-
-	return INT2NUM(rv);
-}
-
-/*
- * call-seq:
- *	epoll.delete(io) -> io or nil
- *
- * Stops an +io+ object from being monitored.  This is like Epoll#del
- * but returns +nil+ on ENOENT instead of raising an error.  This is
- * useful for apps that do not care to track the status of an
- * epoll object itself.
- */
-static VALUE delete(VALUE self, VALUE io)
-{
-	struct rb_epoll *ep = ep_get(self);
-	int fd = rb_sp_fileno(io);
-	int rv;
-	VALUE cur_io;
-
-	ep_check(ep);
-	if (rb_sp_io_closed(io))
-		goto out;
-
-	cur_io = rb_ary_entry(ep->marks, fd);
-	if (NIL_P(cur_io) || rb_sp_io_closed(cur_io))
-		return Qnil;
-
-	rv = epoll_ctl(ep->fd, EPOLL_CTL_DEL, fd, NULL);
-	if (rv == -1) {
-		/* beware of IO.for_fd-created descriptors */
-		if (errno == ENOENT || errno == EBADF) {
-			errno = 0;
-			io = Qnil;
-		} else {
-			rb_sys_fail("epoll_ctl - del");
+	rv = epoll_ctl(epfd, op, fd, &event);
+	if (rv < 0) {
+		if (errno == ENOMEM) {
+			rb_gc();
+			rv = epoll_ctl(epfd, op, fd, &event);
 		}
+		if (rv < 0)
+			rb_sys_fail("epoll_ctl");
 	}
-out:
-	rb_ary_store(ep->marks, fd, Qnil);
-	rb_ary_store(ep->flag_cache, fd, Qnil);
 
-	return io;
+	return Qnil;
 }
 
 static VALUE epwait_result(struct ep_per_thread *ept, int n)
@@ -358,7 +173,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
 {
 	uint64_t now;
 
-	ep_fd_check(ept->ep);
+	ep_fd_check(ept); /* may raise IOError */
 
 	if (errno != EINTR)
 		return 0;
@@ -373,8 +188,7 @@ static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept)
 static VALUE nogvl_wait(void *args)
 {
 	struct ep_per_thread *ept = args;
-	int fd = ept->ep->fd;
-	int n = epoll_wait(fd, ept->events, ept->maxevents, ept->timeout);
+	int n = epoll_wait(ept->fd, ept->events, ept->maxevents, ept->timeout);
 
 	return (VALUE)n;
 }
@@ -385,7 +199,7 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 	uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0;
 
 	do {
-		n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->ep->fd);
+		n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->fd);
 	} while (n == -1 && epoll_resume_p(expire_at, ept));
 
 	return epwait_result(ept, n);
@@ -396,11 +210,11 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 
 /*
  * call-seq:
- *	epoll.wait([maxevents[, timeout]]) { |flags, io| ... }
+ *	ep_io.epoll_wait([maxevents[, timeout]]) { |events, io| ... }
  *
- * Calls epoll_wait(2) and yields Integer +flags+ and IO objects watched
+ * Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
  * for.  +maxevents+ is the maximum number of events to process at once,
- * lower numbers may prevent starvation when used by Epoll#wait in multiple
+ * lower numbers may prevent starvation when used by epoll_wait in multiple
  * threads.  Larger +maxevents+ reduces syscall overhead for
  * single-threaded applications. +maxevents+ defaults to 64 events.
  * +timeout+ is specified in milliseconds, +nil+
@@ -409,259 +223,17 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 static VALUE epwait(int argc, VALUE *argv, VALUE self)
 {
 	VALUE timeout, maxevents;
-	struct rb_epoll *ep = ep_get(self);
 	struct ep_per_thread *ept;
 
-	ep_check(ep);
 	rb_need_block();
 	rb_scan_args(argc, argv, "02", &maxevents, &timeout);
-	ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
+
+	ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
 	ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
-	ept->ep = ep;
 
 	return real_epwait(ept);
 }
 
-/*
- * call-seq:
- *	epoll.add(io, flags)	->  0
- *
- * Starts watching a given +io+ object with +flags+ which may be an Integer
- * bitmask or Array representing arrays to watch for.  Consider Epoll#set
- * instead as it is easier to use.
- */
-static VALUE add(VALUE self, VALUE io, VALUE flags)
-{
-	return ctl(self, io, flags, EPOLL_CTL_ADD);
-}
-
-/*
- * call-seq:
- *	epoll.del(io)	-> 0
- *
- * Disables an IO object from being watched.  Consider Epoll#delete as
- * it is easier to use.
- */
-static VALUE del(VALUE self, VALUE io)
-{
-	return ctl(self, io, INT2FIX(0), EPOLL_CTL_DEL);
-}
-
-/*
- * call-seq:
- *	epoll.mod(io, flags)	-> 0
- *
- * Changes the watch for an existing IO object based on +flags+.
- * Consider Epoll#set instead as it is easier to use.
- */
-static VALUE mod(VALUE self, VALUE io, VALUE flags)
-{
-	return ctl(self, io, flags, EPOLL_CTL_MOD);
-}
-
-/*
- * call-seq:
- *	epoll.to_io	-> Epoll::IO object
- *
- * Used to expose the given Epoll object as an Epoll::IO object for IO.select
- * or IO#stat.  This is unlikely to be useful directly, but is used internally
- * by IO.select.
- */
-static VALUE to_io(VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	ep_check(ep);
-
-	if (NIL_P(ep->io))
-		ep->io = rb_funcall(cEpoll_IO, id_for_fd, 1, INT2NUM(ep->fd));
-
-	return ep->io;
-}
-
-/*
- * call-seq:
- *	epoll.close	-> nil
- *
- * Closes an existing Epoll object and returns memory back to the kernel.
- * Raises IOError if object is already closed.
- */
-static VALUE epclose(VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	if (ep->fd >= 0) {
-		st_data_t key = ep->fd;
-		st_delete(active, &key, NULL);
-	}
-
-	if (NIL_P(ep->io)) {
-		ep_fd_check(ep);
-
-		if (ep->fd == EP_RECREATE) {
-			ep->fd = -1; /* success */
-		} else {
-			int err;
-			int fd = ep->fd;
-
-			ep->fd = -1;
-			rb_thread_fd_close(fd);
-			err = close(fd);
-			if (err == -1)
-				rb_sys_fail("close");
-		}
-	} else {
-		ep->fd = -1;
-		rb_io_close(ep->io);
-	}
-
-	return Qnil;
-}
-
-/*
- * call-seq:
- *	epoll.closed?	-> true or false
- *
- * Returns whether or not an Epoll object is closed.
- */
-static VALUE epclosed(VALUE self)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return ep->fd == -1 ? Qtrue : Qfalse;
-}
-
-static int cloexec_dup(struct rb_epoll *ep)
-{
-#ifdef F_DUPFD_CLOEXEC
-	int flags = ep->flags & EPOLL_CLOEXEC ? F_DUPFD_CLOEXEC : F_DUPFD;
-	int fd = fcntl(ep->fd, flags, 0);
-#else /* potentially racy on GVL-free systems: */
-	int fd = dup(ep->fd);
-	if (fd >= 0)
-		(void)fcntl(fd, F_SETFD, FD_CLOEXEC);
-#endif
-	return fd;
-}
-
-/*
- * call-seq:
- *	epoll.dup	-> another Epoll object
- *
- * Duplicates an Epoll object and userspace buffers related to this library.
- * Since SleepyPenguin 3.1.0, this is no longer needed for multi-threaded
- * Epoll#wait.
- */
-static VALUE init_copy(VALUE copy, VALUE orig)
-{
-	struct rb_epoll *a = ep_get(orig);
-	struct rb_epoll *b = ep_get(copy);
-
-	assert(NIL_P(b->io) && "Ruby broken?");
-
-	ep_check(a);
-	assert(NIL_P(b->marks) && "mark array not nil");
-	assert(NIL_P(b->flag_cache) && "flag_cache not nil");
-	b->marks = a->marks;
-	b->flag_cache = a->flag_cache;
-	assert(TYPE(b->marks) == T_ARRAY && "mark array not initialized");
-	assert(TYPE(b->flag_cache) == T_ARRAY && "flag_cache not initialized");
-	b->flags = a->flags;
-	b->fd = cloexec_dup(a);
-	if (b->fd == -1) {
-		if (errno == ENFILE || errno == EMFILE) {
-			rb_gc();
-			b->fd = cloexec_dup(a);
-		}
-		if (b->fd == -1)
-			rb_sys_fail("dup");
-	}
-	st_insert(active, (st_data_t)b->fd, (st_data_t)b);
-
-	return copy;
-}
-
-/* occasionally it's still useful to lookup aliased IO objects
- * based on for debugging */
-static int my_fileno(VALUE obj)
-{
-	if (T_FIXNUM == TYPE(obj))
-		return FIX2INT(obj);
-	return rb_sp_fileno(obj);
-}
-
-/*
- * call-seq:
- *	epoll.io_for(io)	-> object
- *
- * Returns the given IO object currently being watched for.  Different
- * IO objects may internally refer to the same process file descriptor.
- * Mostly used for debugging.
- */
-static VALUE io_for(VALUE self, VALUE obj)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return rb_ary_entry(ep->marks, my_fileno(obj));
-}
-
-/*
- * call-seq:
- *	epoll.flags_for(io)	-> Integer
- *
- * Returns the flags currently watched for in current Epoll object.
- * Mostly used for debugging.
- */
-static VALUE flags_for(VALUE self, VALUE obj)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return rb_ary_entry(ep->flag_cache, my_fileno(obj));
-}
-
-/*
- * call-seq:
- *	epoll.include?(io) => true or false
- *
- * Returns whether or not a given IO is watched and prevented from being
- * garbage-collected by the current Epoll object.  This may include
- * closed IO objects.
- */
-static VALUE include_p(VALUE self, VALUE obj)
-{
-	struct rb_epoll *ep = ep_get(self);
-
-	return NIL_P(rb_ary_entry(ep->marks, my_fileno(obj))) ? Qfalse : Qtrue;
-}
-
-/*
- * we close (or lose to GC) epoll descriptors at fork to avoid leakage
- * and invalid objects being referenced later in the child
- */
-static int ep_atfork(st_data_t key, st_data_t value, void *ignored)
-{
-	struct rb_epoll *ep = (struct rb_epoll *)value;
-
-	if (NIL_P(ep->io)) {
-		if (ep->fd >= 0)
-			(void)close(ep->fd);
-	} else {
-		ep->io = Qnil; /* must let GC take care of it later :< */
-	}
-	ep->fd = EP_RECREATE;
-
-	return ST_CONTINUE;
-}
-
-static void atfork_child(void)
-{
-	st_table *old = active;
-
-	active = st_init_numtable();
-	st_foreach(old, ep_atfork, (st_data_t)NULL);
-	st_free_table(old);
-}
-
 static void epoll_once(void)
 {
 	int err = pthread_key_create(&epoll_key, free);
@@ -670,19 +242,17 @@ static void epoll_once(void)
 		errno = err;
 		rb_sys_fail("pthread_key_create");
 	}
+}
 
-	active = st_init_numtable();
-
-	if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
-		rb_gc();
-		if (pthread_atfork(NULL, NULL, atfork_child) != 0)
-			rb_memerror();
-	}
+/* :nodoc: */
+static VALUE event_flags(VALUE self, VALUE flags)
+{
+	return UINT2NUM(rb_sp_get_uflags(self, flags));
 }
 
 void sleepy_penguin_init_epoll(void)
 {
-	VALUE mSleepyPenguin, cEpoll;
+	VALUE mSleepyPenguin, cEpoll_IO;
 	static pthread_once_t once = PTHREAD_ONCE_INIT;
 	int err = pthread_once(&once, epoll_once);
 
@@ -708,6 +278,7 @@ void sleepy_penguin_init_epoll(void)
 	 * And then access classes via:
 	 *
 	 * - SP::Epoll
+	 * - SP::Epoll::IO
 	 * - SP::EventFD
 	 * - SP::Inotify
 	 * - SP::TimerFD
@@ -717,36 +288,36 @@ void sleepy_penguin_init_epoll(void)
 	/*
 	 * Document-class: SleepyPenguin::Epoll
 	 *
-	 * The Epoll class provides access to epoll(7) functionality in the
-	 * Linux 2.6 kernel.  It provides fork and GC-safety for Ruby
-	 * objects stored within the IO object and may be passed as an
-	 * argument to IO.select.
+	 * The Epoll class provides high-level access to epoll(7)
+	 * functionality in the Linux 2.6 and later kernels.  It provides
+	 * fork and GC-safety for Ruby objects stored within the IO object
+	 * and may be passed as an argument to IO.select.
 	 */
 	cEpoll = rb_define_class_under(mSleepyPenguin, "Epoll", rb_cObject);
 
 	/*
 	 * Document-class: SleepyPenguin::Epoll::IO
 	 *
-	 * Epoll::IO is an internal class.  Its only purpose is to be
-	 * compatible with IO.select and related methods and should
-	 * never be used directly, use Epoll instead.
+	 * Epoll::IO is a low-level class.  It does not provide fork nor
+	 * GC-safety, so Ruby IO objects added via epoll_ctl must be retained
+	 * by the application until IO#close is called.
 	 */
 	cEpoll_IO = rb_define_class_under(cEpoll, "IO", rb_cIO);
-	rb_define_method(cEpoll, "initialize", init, -1);
-	rb_define_method(cEpoll, "initialize_copy", init_copy, 1);
-	rb_define_alloc_func(cEpoll, alloc);
-	rb_define_method(cEpoll, "to_io", to_io, 0);
-	rb_define_method(cEpoll, "close", epclose, 0);
-	rb_define_method(cEpoll, "closed?", epclosed, 0);
-	rb_define_method(cEpoll, "add", add, 2);
-	rb_define_method(cEpoll, "mod", mod, 2);
-	rb_define_method(cEpoll, "del", del, 1);
-	rb_define_method(cEpoll, "delete", delete, 1);
-	rb_define_method(cEpoll, "io_for", io_for, 1);
-	rb_define_method(cEpoll, "flags_for", flags_for, 1);
-	rb_define_method(cEpoll, "include?", include_p, 1);
-	rb_define_method(cEpoll, "set", set, 2);
-	rb_define_method(cEpoll, "wait", epwait, -1);
+	rb_define_singleton_method(cEpoll_IO, "new", s_new, 1);
+
+	rb_define_method(cEpoll_IO, "epoll_ctl", epctl, 3);
+	rb_define_method(cEpoll_IO, "epoll_wait", epwait, -1);
+
+	rb_define_method(cEpoll, "__event_flags", event_flags, 1);
+
+	/* registers an IO object via epoll_ctl */
+	rb_define_const(cEpoll, "CTL_ADD", INT2NUM(EPOLL_CTL_ADD));
+
+	/* unregisters an IO object via epoll_ctl */
+	rb_define_const(cEpoll, "CTL_DEL", INT2NUM(EPOLL_CTL_DEL));
+
+	/* modifies the registration of an IO object via epoll_ctl */
+	rb_define_const(cEpoll, "CTL_MOD", INT2NUM(EPOLL_CTL_MOD));
 
 	/* specifies whether close-on-exec flag is set for Epoll.new */
 	rb_define_const(cEpoll, "CLOEXEC", INT2NUM(EPOLL_CLOEXEC));
diff --git a/ext/sleepy_penguin/epoll_green.h b/ext/sleepy_penguin/epoll_green.h
index 276a545..e3414eb 100644
--- a/ext/sleepy_penguin/epoll_green.h
+++ b/ext/sleepy_penguin/epoll_green.h
@@ -26,9 +26,9 @@ static int safe_epoll_wait(struct ep_per_thread *ept)
 
 	do {
 		TRAP_BEG;
-		n = epoll_wait(ept->ep->fd, ept->events, ept->maxevents, 0);
+		n = epoll_wait(ept->fd, ept->events, ept->maxevents, 0);
 		TRAP_END;
-	} while (n == -1 && errno == EINTR && ep_fd_check(ept->ep));
+	} while (n == -1 && ep_fd_check(ept) && errno == EINTR);
 
 	return n;
 }
@@ -38,7 +38,7 @@ static int epwait_forever(struct ep_per_thread *ept)
 	int n;
 
 	do {
-		(void)rb_io_wait_readable(ept->ep->fd);
+		(void)rb_io_wait_readable(ept->fd);
 		n = safe_epoll_wait(ept);
 	} while (n == 0);
 
@@ -55,7 +55,7 @@ static int epwait_timed(struct ep_per_thread *ept)
 	for (;;) {
 		struct timeval t0, now, diff;
 		int n;
-		int fd = ept->ep->fd;
+		int fd = ept->fd;
 		fd_set rfds;
 
 		FD_ZERO(&rfds);
diff --git a/lib/sleepy_penguin.rb b/lib/sleepy_penguin.rb
index 3a189b1..c13eb0c 100644
--- a/lib/sleepy_penguin.rb
+++ b/lib/sleepy_penguin.rb
@@ -5,3 +5,4 @@ module SleepyPenguin
   SLEEPY_PENGUIN_VERSION = '3.1.0'
 end
 require 'sleepy_penguin_ext'
+require 'sleepy_penguin/epoll'
diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
new file mode 100644
index 0000000..845dcf0
--- /dev/null
+++ b/lib/sleepy_penguin/epoll.rb
@@ -0,0 +1,228 @@
+class SleepyPenguin::Epoll
+
+  # Epoll objects may be watched by IO.select and similar methods
+  attr_reader :to_io
+
+  # call-seq:
+  #     SleepyPenguin::Epoll.new([flags]) -> Epoll object
+  #
+  # Creates a new Epoll object with an optional +flags+ argument.
+  # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
+  def initialize(create_flags = nil)
+    @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+    @events = []
+    @marks = []
+    @pid = $$
+    @create_flags = create_flags
+    @copies = { @to_io => self }
+  end
+
+  def __ep_reinit # :nodoc:
+    @events.clear
+    @marks.clear
+    @to_io = SleepyPenguin::Epoll::IO.new(@create_flags)
+  end
+
+  # auto-reinitialize the Epoll object after forking
+  def __ep_check # :nodoc:
+    return if @pid == $$
+    return if @to_io.closed?
+    objects = @copies.values
+    @copies.each_key { |epio| epio.close }
+    @copies.clear
+    __ep_reinit
+    objects.each do |obj|
+      io_dup = @to_io.dup
+      @copies[io_dup] = obj
+    end
+    @pid = $$
+  end
+
+  # Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
+  # for.  +maxevents+ is the maximum number of events to process at once,
+  # lower numbers may prevent starvation when used by epoll_wait in multiple
+  # threads.  Larger +maxevents+ reduces syscall overhead for
+  # single-threaded applications. +maxevents+ defaults to 64 events.
+  # +timeout+ is specified in milliseconds, +nil+
+  # (the default) meaning it will block and wait indefinitely.
+  def wait(maxevents = 64, timeout = nil)
+    __ep_check
+    @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+  end
+
+  # Starts watching a given +io+ object with +events+ which may be an Integer
+  # bitmask or Array representing events to watch for.
+  def add(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    events = __event_flags(events)
+    @to_io.epoll_ctl(CTL_ADD, io, events)
+    @marks[fd] = io
+    @events[fd] = events
+    0
+  end
+
+  # call-seq:
+  #     ep.del(io) -> 0
+  #
+  # Disables an IO object from being watched.
+  def del(io)
+    __ep_check
+    fd = io.to_io.fileno
+    rv = @to_io.epoll_ctl(CTL_DEL, io, 0)
+    @marks[fd] = @events[fd] = nil
+    rv
+  end
+
+  # call-seq:
+  #     ep.delete(io) -> io or nil
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  #
+  # Stops an +io+ object from being monitored.  This is like Epoll#del
+  # but returns +nil+ on ENOENT instead of raising an error.  This is
+  # useful for apps that do not care to track the status of an
+  # epoll object itself.
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  def delete(io)
+    __ep_check
+    fd = io.to_io.fileno
+    cur_io = @marks[fd]
+    return if nil == cur_io || cur_io.to_io.closed?
+    @marks[fd] = @events[fd] = nil
+    @to_io.epoll_ctl(CTL_DEL, io, 0)
+    io
+  rescue Errno::ENOENT, Errno::EBADF
+  end
+
+  # call-seq:
+  #     epoll.mod(io, flags) -> 0
+  #
+  # Changes the watch for an existing IO object based on +events+.
+  # Returns zero on success, will raise SystemCallError on failure.
+  def mod(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    events = __event_flags(events)
+    rv = @to_io.epoll_ctl(CTL_MOD, io, events)
+    @marks[fd] = io
+    @events[fd] = events
+    rv
+  end
+
+  # call-seq:
+  #     ep.set(io, flags) -> 0
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  #
+  # Used to avoid exceptions when your app is too lazy to check
+  # what state a descriptor is in, this sets the epoll descriptor
+  # to watch an +io+ with the given +events+
+  #
+  # +events+ may be an array of symbols or an unsigned Integer bit mask:
+  #
+  # - events = [ :IN, :ET ]
+  # - events = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
+  #
+  # See constants in Epoll for more information.
+  #
+  # This method is deprecated and will be removed in sleepy_penguin 4.x
+  def set(io, events)
+    __ep_check
+    fd = io.to_io.fileno
+    cur_io = @marks[fd]
+    if cur_io == io
+      cur_events = @events[fd]
+      return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
+      begin
+        @to_io.epoll_ctl(CTL_MOD, io, events)
+      rescue Errno::ENOENT
+        warn "epoll flag cache failed (mod -> add)"
+        @to_io.epoll_ctl(CTL_ADD, io, events)
+        @marks[fd] = io
+      end
+    else
+      begin
+        @to_io.epoll_ctl(CTL_ADD, io, events)
+      rescue Errno::EEXIST
+        warn "epoll flag cache failed (add -> mod)"
+        @to_io.epoll_ctl(CTL_MOD, io, events)
+      end
+      @marks[fd] = io
+    end
+    @events[fd] = events
+    0
+  end
+
+  # call-seq:
+  #     ep.close -> nil
+  #
+  # Closes an existing Epoll object and returns memory back to the kernel.
+  # Raises IOError if object is already closed.
+  def close
+    __ep_check
+    @copies.delete(@to_io)
+    @to_io.close
+  end
+
+  # call-seq:
+  #     ep.closed? -> true or false
+  #
+  # Returns whether or not an Epoll object is closed.
+  def closed?
+    __ep_check
+    @to_io.closed?
+  end
+
+  # we still support integer FDs for some debug functions
+  def __fileno(io) # :nodoc:
+    Integer === io ? io : io.to_io.fileno
+  end
+
+  # call-seq:
+  #     ep.io_for(io) -> object
+  #
+  # Returns the given IO object currently being watched for.  Different
+  # IO objects may internally refer to the same process file descriptor.
+  # Mostly used for debugging.
+  def io_for(io)
+    __ep_check
+    @marks[__fileno(io)]
+  end
+
+  # call-seq:
+  #     epoll.events_for(io) -> Integer
+  #
+  # Returns the events currently watched for in current Epoll object.
+  # Mostly used for debugging.
+  def events_for(io)
+    __ep_check
+    @events[__fileno(io)]
+  end
+
+  # backwards compatibility, to be removed in 4.x
+  alias flags_for events_for
+
+  # call-seq:
+  #     epoll.include?(io) -> true or false
+  #
+  # Returns whether or not a given IO is watched and prevented from being
+  # garbage-collected by the current Epoll object.  This may include
+  # closed IO objects.
+  def include?(io)
+    __ep_check
+    @marks[__fileno(io)] ? true : nil
+  end
+
+  def initialize_copy(src) # :nodoc:
+    __ep_check
+    rv = super
+    unless @to_io.closed?
+      @to_io = @to_io.dup
+      @copies[@to_io] = self
+    end
+
+    rv
+  end
+end
diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index cd50cff..1a99dfd 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -48,6 +48,19 @@ def test_fork_safe
     assert_equal [[Epoll::IN, @rd]], tmp
   end
 
+  def test_dup_and_fork
+    epdup = @ep.dup
+    @ep.close
+    assert ! epdup.closed?
+    pid = fork do
+      exit(!epdup.closed? && @ep.closed?)
+    end
+    _, status = Process.waitpid2(pid)
+    assert status.success?, status.inspect
+  ensure
+    epdup.close
+  end
+
   def test_after_fork_usability
     fork { @ep.add(@rd, Epoll::IN); exit!(0) }
     fork { @ep.set(@rd, Epoll::IN); exit!(0) }
@@ -399,7 +412,7 @@ def test_flags_for_sym_ary
   def test_include?
     assert ! @ep.include?(@rd)
     @ep.add @rd, Epoll::IN
-    assert @ep.include?(@rd)
+    assert @ep.include?(@rd), @ep.instance_variable_get(:@marks).inspect
     assert @ep.include?(@rd.fileno)
     assert ! @ep.include?(@wr)
     assert ! @ep.include?(@wr.fileno)
diff --git a/test/test_epoll_io.rb b/test/test_epoll_io.rb
new file mode 100644
index 0000000..8aca155
--- /dev/null
+++ b/test/test_epoll_io.rb
@@ -0,0 +1,24 @@
+require 'test/unit'
+require 'fcntl'
+require 'socket'
+require 'thread'
+$-w = true
+Thread.abort_on_exception = true
+require 'sleepy_penguin'
+
+class TestEpollIO < Test::Unit::TestCase
+  include SleepyPenguin
+  RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
+
+  def setup
+    @rd, @wr = IO.pipe
+    @epio = Epoll::IO.new(nil)
+  end
+
+  def test_add_wait
+    @epio.epoll_ctl(Epoll::CTL_ADD, @wr, Epoll::OUT)
+    ev = []
+    @epio.epoll_wait { |events, obj| ev << [ events, obj ] }
+    assert_equal([[Epoll::OUT, @wr]], ev)
+  end
+end
diff --git a/test/test_epoll_optimizations.rb b/test/test_epoll_optimizations.rb
index bd77397..f5970fd 100644
--- a/test/test_epoll_optimizations.rb
+++ b/test/test_epoll_optimizations.rb
@@ -28,7 +28,7 @@ def test_set
     end
     assert_nil err
     lines = io.readlines; io.close
-    assert_equal 1, lines.grep(/^epoll_ctl/).size
+    assert_equal 1, lines.grep(/^epoll_ctl/).size, lines.inspect
     assert_match(/EPOLL_CTL_ADD/, lines.grep(/^epoll_ctl/)[0])
 
     io, err = Strace.me { @ep.set(@wr, Epoll::OUT | Epoll::ONESHOT) }
-- 
Eric Wong


  reply	other threads:[~2013-04-12 20:38 UTC|newest]

Thread overview: 9+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2013-04-11  4:17 [sleepy.penguin] [PATCH 0/6] epoll wrapper cleanups Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 1/6] test_epoll: fix timing error in test Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 2/6] test_epoll: synchronize writes to the pipe array Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 3/6] split Epoll and Epoll::IO, rewrite Epoll in Ruby Eric Wong
2013-04-12 20:38   ` Eric Wong [this message]
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 4/6] epoll: implement thread-safety for mark/flag arrays Eric Wong
2013-04-12 21:18   ` Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 5/6] epoll: cache alignment for per-thread structure Eric Wong
2013-04-11  4:17 ` [sleepy.penguin] [PATCH 6/6] avoid ENOMEM checking in common code paths Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://yhbt.net/sleepy_penguin/

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20130412203820.GA17862@dcvr.yhbt.net \
    --to=normalperson@yhbt.net \
    --cc=sleepy.penguin@librelist.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://yhbt.net/sleepy_penguin.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).