sleepy_penguin RubyGem user+dev discussion/patches/pulls/bugs/help
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: sleepy-penguin@bogomips.org
Subject: [PATCH] allow nestable TLS buffers within the same thread
Date: Thu, 16 Mar 2017 19:45:57 +0000	[thread overview]
Message-ID: <20170316194557.11415-1-e@80x24.org> (raw)

Users may wish to use our epoll or kqueue interfaces within
their own app running on a web server or some such.

This prevents users from missing events, at the cost of
increased allocation.
---
 ext/sleepy_penguin/epoll.c          | 14 ++++---
 ext/sleepy_penguin/init.c           | 75 +++++++++++++++++++++++++++------
 ext/sleepy_penguin/inotify.c        | 82 ++++++++++++++++++++++---------------
 ext/sleepy_penguin/kqueue.c         | 24 +++++++----
 ext/sleepy_penguin/sleepy_penguin.h |  1 +
 test/test_epoll.rb                  | 26 ++++++++++++
 test/test_kqueue.rb                 | 32 +++++++++++++++
 7 files changed, 194 insertions(+), 60 deletions(-)

diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index e655bf9..512e11c 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -49,7 +49,7 @@ static int ep_fd_check(struct ep_per_thread *ept)
 	return 1;
 }
 
-static struct ep_per_thread *ept_get(VALUE self, int maxevents)
+static struct ep_per_thread *ept_get(int maxevents)
 {
 	struct ep_per_thread *ept;
 	size_t size;
@@ -66,8 +66,6 @@ static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 	ept = rb_sp_gettlsbuf(&size);
 	ept->capa = maxevents;
 	ept->maxevents = maxevents;
-	ept->io = self;
-	ept->fd = rb_sp_fileno(ept->io);
 
 	return ept;
 }
@@ -177,6 +175,7 @@ static VALUE real_epwait(struct ep_per_thread *ept)
 	long n;
 	uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0;
 
+	ept->fd = rb_sp_fileno(ept->io);
 	do {
 		n = (long)rb_sp_fd_region(nogvl_wait, ept, ept->fd);
 	} while (n < 0 && epoll_resume_p(expire_at, ept));
@@ -200,14 +199,17 @@ static VALUE epwait(int argc, VALUE *argv, VALUE self)
 {
 	VALUE timeout, maxevents;
 	struct ep_per_thread *ept;
+	int t;
 
 	rb_need_block();
 	rb_scan_args(argc, argv, "02", &maxevents, &timeout);
+	t = NIL_P(timeout) ? -1 : NUM2INT(timeout);
 
-	ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
-	ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
+	ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents));
+	ept->timeout = t;
+	ept->io = self;
 
-	return real_epwait(ept);
+	return rb_ensure(real_epwait, (VALUE)ept, rb_sp_puttlsbuf, (VALUE)ept);
 }
 
 /* :nodoc: */
diff --git a/ext/sleepy_penguin/init.c b/ext/sleepy_penguin/init.c
index f9671eb..27aada4 100644
--- a/ext/sleepy_penguin/init.c
+++ b/ext/sleepy_penguin/init.c
@@ -11,8 +11,15 @@
 #define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */
 size_t rb_sp_l1_cache_line_size;
 static pthread_key_t rb_sp_key;
+enum rb_sp_tls_buf_type {
+	RB_SP_TLS_INUSE = -1,
+	RB_SP_TLS_READY = 0,
+	RB_SP_TLS_MALLOCED = 1
+};
+
 struct rb_sp_tlsbuf {
-	size_t capa;
+	uint32_t capa;
+	enum rb_sp_tls_buf_type buf_type;
 	unsigned char ptr[FLEX_ARRAY];
 };
 
@@ -89,12 +96,36 @@ static void sp_once(void)
 	}
 }
 
+static struct rb_sp_tlsbuf *alloc_tlsbuf(size_t size)
+{
+	size_t bytes = size + sizeof(struct rb_sp_tlsbuf);
+	struct rb_sp_tlsbuf *buf;
+	void *ptr;
+	int err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, bytes);
+
+	if (err) {
+		errno = err;
+		rb_memerror(); /* fatal */
+	}
+
+	buf = ptr;
+	buf->capa = size;
+
+	return buf;
+}
+
 void *rb_sp_gettlsbuf(size_t *size)
 {
 	struct rb_sp_tlsbuf *buf = pthread_getspecific(rb_sp_key);
-	void *ptr;
 	int err;
-	size_t bytes;
+
+	assert(buf ? buf->buf_type != RB_SP_TLS_MALLOCED : 1);
+
+	if (buf && buf->buf_type != RB_SP_TLS_READY) {
+		buf = alloc_tlsbuf(*size);
+		buf->buf_type = RB_SP_TLS_MALLOCED;
+		return buf->ptr;
+	}
 
 	if (buf && buf->capa >= *size) {
 		*size = buf->capa;
@@ -102,15 +133,7 @@ void *rb_sp_gettlsbuf(size_t *size)
 	}
 
 	free(buf);
-	bytes = *size + sizeof(struct rb_sp_tlsbuf);
-	err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, bytes);
-	if (err) {
-		errno = err;
-		rb_memerror(); /* fatal */
-	}
-
-	buf = ptr;
-	buf->capa = *size;
+	buf = alloc_tlsbuf(*size);
 	err = pthread_setspecific(rb_sp_key, buf);
 	if (err != 0) {
 		free(buf);
@@ -118,9 +141,37 @@ void *rb_sp_gettlsbuf(size_t *size)
 		rb_sys_fail("BUG: pthread_setspecific");
 	}
 out:
+	buf->buf_type = RB_SP_TLS_INUSE;
 	return buf->ptr;
 }
 
+#define container_of(ptr, type, member) \
+	(type *)((uintptr_t)(ptr) - offsetof(type, member))
+
+VALUE rb_sp_puttlsbuf(VALUE p)
+{
+	struct rb_sp_tlsbuf *tls = pthread_getspecific(rb_sp_key);
+	void *ptr = (void *)p;
+	struct rb_sp_tlsbuf *buf;
+
+	if (!ptr)
+		return Qfalse;
+
+	buf = container_of(ptr, struct rb_sp_tlsbuf, ptr);
+
+	switch (buf->buf_type) {
+	case RB_SP_TLS_INUSE:
+		assert(tls == buf && "rb_sp_puttlsbuf mismatch");
+		buf->buf_type = RB_SP_TLS_READY;
+		break;
+	case RB_SP_TLS_READY:
+		assert(0 && "rb_sp_gettlsbuf not called");
+	case RB_SP_TLS_MALLOCED:
+		free(buf);
+	}
+	return Qfalse;
+}
+
 void Init_sleepy_penguin_ext(void)
 {
 	VALUE mSleepyPenguin;
diff --git a/ext/sleepy_penguin/inotify.c b/ext/sleepy_penguin/inotify.c
index b5cd67b..56fcff2 100644
--- a/ext/sleepy_penguin/inotify.c
+++ b/ext/sleepy_penguin/inotify.c
@@ -134,8 +134,11 @@ static VALUE event_new(struct inotify_event *e)
 }
 
 struct inread_args {
+	VALUE self;
 	int fd;
+	int nonblock_p;
 	size_t size;
+	VALUE tmp;
 	void *buf;
 };
 
@@ -158,6 +161,7 @@ static void resize_internal_buffer(struct inread_args *args)
 
 	if (newlen > 0) {
 		args->size = (size_t)newlen;
+		rb_sp_puttlsbuf((VALUE)args->buf);
 		args->buf = rb_sp_gettlsbuf(&args->size);
 	}
 
@@ -169,56 +173,35 @@ static void resize_internal_buffer(struct inread_args *args)
 		newlen);
 }
 
-/*
- * call-seq:
- *	ino.take([nonblock]) -> Inotify::Event or nil
- *
- * Returns the next Inotify::Event processed.  May return +nil+ if +nonblock+
- * is +true+.
- */
-static VALUE take(int argc, VALUE *argv, VALUE self)
+static VALUE do_take(VALUE p)
 {
-	struct inread_args args;
-	VALUE tmp = rb_ivar_get(self, id_inotify_tmp);
-	struct inotify_event *e, *end;
-	ssize_t r;
+	struct inread_args *args = (struct inread_args *)p;
 	VALUE rv = Qnil;
-	VALUE nonblock;
-
-	if (RARRAY_LEN(tmp) > 0)
-		return rb_ary_shift(tmp);
-
-	rb_scan_args(argc, argv, "01", &nonblock);
-
-	args.fd = rb_sp_fileno(self);
-	args.size = 128;
-	args.buf = rb_sp_gettlsbuf(&args.size);
+	struct inotify_event *e, *end;
 
-	if (RTEST(nonblock))
-		rb_sp_set_nonblock(args.fd);
-	else
-		blocking_io_prepare(args.fd);
+	args->buf = rb_sp_gettlsbuf(&args->size);
 	do {
-		r = (ssize_t)rb_sp_fd_region(inread, &args, args.fd);
+		ssize_t r = (ssize_t)rb_sp_fd_region(inread, args, args->fd);
 		if (r == 0 /* Linux < 2.6.21 */
 		    ||
 		    (r < 0 && errno == EINVAL) /* Linux >= 2.6.21 */
 		   ) {
-			resize_internal_buffer(&args);
+			resize_internal_buffer(args);
 		} else if (r < 0) {
-			if (errno == EAGAIN && RTEST(nonblock))
+			if (errno == EAGAIN && args->nonblock_p)
 				return Qnil;
-			if (!rb_sp_wait(rb_io_wait_readable, self, &args.fd))
+			if (!rb_sp_wait(rb_io_wait_readable, args->self,
+					&args->fd))
 				rb_sys_fail("read(inotify)");
 		} else {
 			/* buffer in userspace to minimize read() calls */
-			end = (struct inotify_event *)((char *)args.buf + r);
-			for (e = args.buf; e < end; ) {
+			end = (struct inotify_event *)((char *)args->buf + r);
+			for (e = args->buf; e < end; ) {
 				VALUE event = event_new(e);
 				if (NIL_P(rv))
 					rv = event;
 				else
-					rb_ary_push(tmp, event);
+					rb_ary_push(args->tmp, event);
 				e = (struct inotify_event *)
 				    ((char *)e + event_len(e));
 			}
@@ -230,6 +213,39 @@ static VALUE take(int argc, VALUE *argv, VALUE self)
 
 /*
  * call-seq:
+ *	ino.take([nonblock]) -> Inotify::Event or nil
+ *
+ * Returns the next Inotify::Event processed.  May return +nil+ if +nonblock+
+ * is +true+.
+ */
+static VALUE take(int argc, VALUE *argv, VALUE self)
+{
+	struct inread_args args;
+	VALUE nonblock;
+
+	args.tmp = rb_ivar_get(self, id_inotify_tmp);
+	if (RARRAY_LEN(args.tmp) > 0)
+		return rb_ary_shift(args.tmp);
+
+	rb_scan_args(argc, argv, "01", &nonblock);
+
+	args.self = self;
+	args.fd = rb_sp_fileno(self);
+	args.size = 128;
+	args.nonblock_p = RTEST(nonblock);
+
+	if (args.nonblock_p)
+		rb_sp_set_nonblock(args.fd);
+	else
+		blocking_io_prepare(args.fd);
+
+	args.buf = 0;
+	return rb_ensure(do_take, (VALUE)&args,
+			 rb_sp_puttlsbuf, (VALUE)args.buf);
+}
+
+/*
+ * call-seq:
  *	inotify_event.events => [ :MOVED_TO, ... ]
  *
  * Returns an array of symbolic event names based on the contents of
diff --git a/ext/sleepy_penguin/kqueue.c b/ext/sleepy_penguin/kqueue.c
index 22e20f1..22a2c5d 100644
--- a/ext/sleepy_penguin/kqueue.c
+++ b/ext/sleepy_penguin/kqueue.c
@@ -43,6 +43,7 @@ static VALUE mEv, mEvFilt, mNote, mVQ;
 
 struct kq_per_thread {
 	VALUE io;
+	VALUE changelist;
 	int fd;
 	int nchanges;
 	int nevents;
@@ -72,7 +73,7 @@ static int kq_fd_check(struct kq_per_thread *kpt)
 	return 1;
 }
 
-static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents)
+static struct kq_per_thread *kpt_get(int nchanges, int nevents)
 {
 	struct kq_per_thread *kpt;
 	size_t size;
@@ -89,8 +90,6 @@ static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents)
 	kpt->capa = max;
 	kpt->nchanges = nchanges;
 	kpt->nevents = nevents;
-	kpt->io = self;
-	kpt->fd = rb_sp_fileno(kpt->io);
 
 	return kpt;
 }
@@ -203,11 +202,16 @@ static VALUE nogvl_kevent(void *args)
 	return (VALUE)nevents;
 }
 
+static void changelist_prepare(struct kevent *, VALUE);
+
 static VALUE do_kevent(struct kq_per_thread *kpt)
 {
 	long nevents;
 	struct timespec expire_at;
 
+	if (kpt->nchanges)
+		changelist_prepare(kpt->events, kpt->changelist);
+
 	if (kpt->ts) {
 		clock_gettime(CLOCK_MONOTONIC, &expire_at);
 
@@ -333,7 +337,7 @@ static void changelist_prepare(struct kevent *events, VALUE changelist)
  */
 static VALUE sp_kevent(int argc, VALUE *argv, VALUE self)
 {
-	struct timespec ts;
+	struct timespec ts, *t;
 	VALUE changelist, events, timeout;
 	struct kq_per_thread *kpt;
 	int nchanges, nevents;
@@ -362,12 +366,14 @@ static VALUE sp_kevent(int argc, VALUE *argv, VALUE self)
 		nevents = 0;
 	}
 
-	kpt = kpt_get(self, nchanges, nevents);
-	kpt->ts = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout);
-	if (nchanges)
-		changelist_prepare(kpt->events, changelist);
+	t = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout);
+	kpt = kpt_get(nchanges, nevents);
+	kpt->ts = t;
+	kpt->changelist = changelist;
+	kpt->io = self;
+	kpt->fd = rb_sp_fileno(kpt->io);
 
-	return do_kevent(kpt);
+	return rb_ensure(do_kevent, (VALUE)kpt, rb_sp_puttlsbuf, (VALUE)kpt);
 }
 
 /* initialize constants in the SleepyPenguin::Ev namespace */
diff --git a/ext/sleepy_penguin/sleepy_penguin.h b/ext/sleepy_penguin/sleepy_penguin.h
index 99ad0b7..bd44e18 100644
--- a/ext/sleepy_penguin/sleepy_penguin.h
+++ b/ext/sleepy_penguin/sleepy_penguin.h
@@ -78,6 +78,7 @@ static inline VALUE fake_blocking_region(VALUE (*fn)(void *), void *data)
 typedef int rb_sp_waitfn(int fd);
 int rb_sp_wait(rb_sp_waitfn waiter, VALUE obj, int *fd);
 void *rb_sp_gettlsbuf(size_t *size);
+VALUE rb_sp_puttlsbuf(VALUE);
 
 /* Flexible array elements are standard in C99 */
 #if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L)
diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index d2b560c..786c5be 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -534,4 +534,30 @@ def test_epoll_as_queue
     end
     @ep.wait(1) { |flags, io| assert_equal(first[0], io) }
   end
+
+  def test_epoll_nest
+    ep2 = Epoll.new
+    r, w = IO.pipe
+    @ep.add(@rd, :IN)
+    @ep.add(@wr, :OUT)
+    ep2.add(r, :IN)
+    ep2.add(w, :OUT)
+    w.write('.')
+    @wr.write('.')
+    outer = []
+    inner = []
+    nr = 0
+    @ep.wait(2) do |_, io|
+      outer << io
+      ep2.wait(2) do |_, io2|
+        (inner[nr] ||= []) << io2
+      end
+      nr += 1
+    end
+    assert_equal [ @rd, @wr ].sort_by(&:fileno), outer.sort_by(&:fileno)
+    exp = [ r, w ].sort_by(&:fileno)
+    assert_equal [ exp, exp ], inner.map { |x| x.sort_by(&:fileno) }
+  ensure
+    [ r, w, ep2 ].compact.each(&:close)
+  end
 end if defined?(SleepyPenguin::Epoll)
diff --git a/test/test_kqueue.rb b/test/test_kqueue.rb
index fc59d60..6de75f3 100644
--- a/test/test_kqueue.rb
+++ b/test/test_kqueue.rb
@@ -68,5 +68,37 @@ def test_usable_after_fork
   ensure
     kq.close
   end
+
+  def test_epoll_nest
+    kq1 = Kqueue.new
+    kq2 = Kqueue.new
+    r1, w1 = IO.pipe
+    r2, w2 = IO.pipe
+    w1.write '.'
+    w2.write '.'
+    kq1.kevent([
+       Kevent[r1.fileno, EvFilt::READ, Ev::ADD, 0, 0, r1],
+       Kevent[w1.fileno, EvFilt::WRITE, Ev::ADD, 0, 0, w1]
+    ])
+    kq2.kevent([
+       Kevent[r2.fileno, EvFilt::READ, Ev::ADD, 0, 0, r2],
+       Kevent[w2.fileno, EvFilt::WRITE, Ev::ADD, 0, 0, w2]
+    ])
+    outer = []
+    inner = []
+    nr = 0
+    kq1.kevent(nil, 2) do |kev1|
+      outer << kev1.udata
+      kq2.kevent(nil, 2) do |kev2|
+        (inner[nr] ||= []) << kev2.udata
+      end
+      nr += 1
+    end
+    assert_equal [ r1, w1 ].sort_by(&:fileno), outer.sort_by(&:fileno)
+    exp = [ r2, w2 ].sort_by(&:fileno)
+    assert_equal [ exp, exp ], inner.map { |x| x.sort_by(&:fileno) }
+  ensure
+    [ r1, w1, r2, w2, kq1, kq2 ].compact.each(&:close)
+  end
 end if defined?(SleepyPenguin::Kqueue) &&
        IO.instance_methods.include?(:autoclose=)
-- 
EW


                 reply	other threads:[~2017-03-16 19:45 UTC|newest]

Thread overview: [no followups] expand[flat|nested]  mbox.gz  Atom feed

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://yhbt.net/sleepy_penguin/

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20170316194557.11415-1-e@80x24.org \
    --to=e@80x24.org \
    --cc=sleepy-penguin@bogomips.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link.

  Be sure your reply has a Subject: header at the top and a blank
  line before the message body.
Code repositories for project(s) associated with this public inbox

	https://yhbt.net/sleepy_penguin.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).