From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.0 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id A4EDE20323 for ; Thu, 16 Mar 2017 19:45:57 +0000 (UTC) From: Eric Wong To: sleepy-penguin@bogomips.org Subject: [PATCH] allow nestable TLS buffers within the same thread Date: Thu, 16 Mar 2017 19:45:57 +0000 Message-Id: <20170316194557.11415-1-e@80x24.org> List-Id: Users may wish to use our epoll or kqueue interfaces within their own app running on a web server or some such. This prevents users from missing events at an increased allocation cost. --- ext/sleepy_penguin/epoll.c | 14 ++++--- ext/sleepy_penguin/init.c | 75 +++++++++++++++++++++++++++------ ext/sleepy_penguin/inotify.c | 82 ++++++++++++++++++++++--------------- ext/sleepy_penguin/kqueue.c | 24 +++++++---- ext/sleepy_penguin/sleepy_penguin.h | 1 + test/test_epoll.rb | 26 ++++++++++++ test/test_kqueue.rb | 32 +++++++++++++++ 7 files changed, 194 insertions(+), 60 deletions(-) diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c index e655bf9..512e11c 100644 --- a/ext/sleepy_penguin/epoll.c +++ b/ext/sleepy_penguin/epoll.c @@ -49,7 +49,7 @@ static int ep_fd_check(struct ep_per_thread *ept) return 1; } -static struct ep_per_thread *ept_get(VALUE self, int maxevents) +static struct ep_per_thread *ept_get(int maxevents) { struct ep_per_thread *ept; size_t size; @@ -66,8 +66,6 @@ static struct ep_per_thread *ept_get(VALUE self, int maxevents) ept = rb_sp_gettlsbuf(&size); ept->capa = maxevents; ept->maxevents = maxevents; - ept->io = self; - ept->fd = rb_sp_fileno(ept->io); return ept; } @@ -177,6 +175,7 @@ static VALUE real_epwait(struct ep_per_thread *ept) long n; uint64_t expire_at = ept->timeout > 0 ? 
now_ms() + ept->timeout : 0; + ept->fd = rb_sp_fileno(ept->io); do { n = (long)rb_sp_fd_region(nogvl_wait, ept, ept->fd); } while (n < 0 && epoll_resume_p(expire_at, ept)); @@ -200,14 +199,17 @@ static VALUE epwait(int argc, VALUE *argv, VALUE self) { VALUE timeout, maxevents; struct ep_per_thread *ept; + int t; rb_need_block(); rb_scan_args(argc, argv, "02", &maxevents, &timeout); + t = NIL_P(timeout) ? -1 : NUM2INT(timeout); - ept = ept_get(self, NIL_P(maxevents) ? 64 : NUM2INT(maxevents)); - ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout); + ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents)); + ept->timeout = t; + ept->io = self; - return real_epwait(ept); + return rb_ensure(real_epwait, (VALUE)ept, rb_sp_puttlsbuf, (VALUE)ept); } /* :nodoc: */ diff --git a/ext/sleepy_penguin/init.c b/ext/sleepy_penguin/init.c index f9671eb..27aada4 100644 --- a/ext/sleepy_penguin/init.c +++ b/ext/sleepy_penguin/init.c @@ -11,8 +11,15 @@ #define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */ size_t rb_sp_l1_cache_line_size; static pthread_key_t rb_sp_key; +enum rb_sp_tls_buf_type { + RB_SP_TLS_INUSE = -1, + RB_SP_TLS_READY = 0, + RB_SP_TLS_MALLOCED = 1 +}; + struct rb_sp_tlsbuf { - size_t capa; + uint32_t capa; + enum rb_sp_tls_buf_type buf_type; unsigned char ptr[FLEX_ARRAY]; }; @@ -89,12 +96,36 @@ static void sp_once(void) } } +static struct rb_sp_tlsbuf *alloc_tlsbuf(size_t size) +{ + size_t bytes = size + sizeof(struct rb_sp_tlsbuf); + struct rb_sp_tlsbuf *buf; + void *ptr; + int err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, bytes); + + if (err) { + errno = err; + rb_memerror(); /* fatal */ + } + + buf = ptr; + buf->capa = size; + + return buf; +} + void *rb_sp_gettlsbuf(size_t *size) { struct rb_sp_tlsbuf *buf = pthread_getspecific(rb_sp_key); - void *ptr; int err; - size_t bytes; + + assert(buf ? 
buf->buf_type != RB_SP_TLS_MALLOCED : 1); + + if (buf && buf->buf_type != RB_SP_TLS_READY) { + buf = alloc_tlsbuf(*size); + buf->buf_type = RB_SP_TLS_MALLOCED; + return buf->ptr; + } if (buf && buf->capa >= *size) { *size = buf->capa; @@ -102,15 +133,7 @@ void *rb_sp_gettlsbuf(size_t *size) } free(buf); - bytes = *size + sizeof(struct rb_sp_tlsbuf); - err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, bytes); - if (err) { - errno = err; - rb_memerror(); /* fatal */ - } - - buf = ptr; - buf->capa = *size; + buf = alloc_tlsbuf(*size); err = pthread_setspecific(rb_sp_key, buf); if (err != 0) { free(buf); @@ -118,9 +141,38 @@ void *rb_sp_gettlsbuf(size_t *size) rb_sys_fail("BUG: pthread_setspecific"); } out: + buf->buf_type = RB_SP_TLS_INUSE; return buf->ptr; } +#define container_of(ptr, type, member) \ + (type *)((uintptr_t)(ptr) - offsetof(type, member)) + +VALUE rb_sp_puttlsbuf(VALUE p) +{ + struct rb_sp_tlsbuf *tls = pthread_getspecific(rb_sp_key); + void *ptr = (void *)p; + struct rb_sp_tlsbuf *buf; + + if (!ptr) + return Qfalse; + + buf = container_of(ptr, struct rb_sp_tlsbuf, ptr); + + switch (buf->buf_type) { + case RB_SP_TLS_INUSE: + assert(tls == buf && "rb_sp_puttlsbuf mismatch"); + buf->buf_type = RB_SP_TLS_READY; + break; + case RB_SP_TLS_READY: + assert(0 && "rb_sp_gettlsbuf not called"); + break; /* never free the live thread-local buffer */ + case RB_SP_TLS_MALLOCED: + free(buf); + } + return Qfalse; +} + void Init_sleepy_penguin_ext(void) { VALUE mSleepyPenguin; diff --git a/ext/sleepy_penguin/inotify.c b/ext/sleepy_penguin/inotify.c index b5cd67b..56fcff2 100644 --- a/ext/sleepy_penguin/inotify.c +++ b/ext/sleepy_penguin/inotify.c @@ -134,8 +134,11 @@ static VALUE event_new(struct inotify_event *e) } struct inread_args { + VALUE self; int fd; + int nonblock_p; size_t size; + VALUE tmp; void *buf; }; @@ -158,6 +161,8 @@ static void resize_internal_buffer(struct inread_args *args) if (newlen > 0) { args->size = (size_t)newlen; + rb_sp_puttlsbuf((VALUE)args->buf); + args->buf = 0; /* released; the ensure must not release it again */ args->buf = rb_sp_gettlsbuf(&args->size);
} @@ -169,56 +174,43 @@ static void resize_internal_buffer(struct inread_args *args) newlen); } -/* - * call-seq: - * ino.take([nonblock]) -> Inotify::Event or nil - * - * Returns the next Inotify::Event processed. May return +nil+ if +nonblock+ - * is +true+. - */ -static VALUE take(int argc, VALUE *argv, VALUE self) +/* rb_ensure callback for take(): releases whatever buffer do_take holds */ +static VALUE put_inread_buf(VALUE p) +{ + struct inread_args *args = (struct inread_args *)p; + + return rb_sp_puttlsbuf((VALUE)args->buf); +} + +static VALUE do_take(VALUE p) { - struct inread_args args; - VALUE tmp = rb_ivar_get(self, id_inotify_tmp); - struct inotify_event *e, *end; - ssize_t r; + struct inread_args *args = (struct inread_args *)p; VALUE rv = Qnil; - VALUE nonblock; - - if (RARRAY_LEN(tmp) > 0) - return rb_ary_shift(tmp); - - rb_scan_args(argc, argv, "01", &nonblock); - - args.fd = rb_sp_fileno(self); - args.size = 128; - args.buf = rb_sp_gettlsbuf(&args.size); + struct inotify_event *e, *end; - if (RTEST(nonblock)) - rb_sp_set_nonblock(args.fd); - else - blocking_io_prepare(args.fd); + args->buf = rb_sp_gettlsbuf(&args->size); do { - r = (ssize_t)rb_sp_fd_region(inread, &args, args.fd); + ssize_t r = (ssize_t)rb_sp_fd_region(inread, args, args->fd); if (r == 0 /* Linux < 2.6.21 */ || (r < 0 && errno == EINVAL) /* Linux >= 2.6.21 */ ) { - resize_internal_buffer(&args); + resize_internal_buffer(args); } else if (r < 0) { - if (errno == EAGAIN && RTEST(nonblock)) + if (errno == EAGAIN && args->nonblock_p) return Qnil; - if (!rb_sp_wait(rb_io_wait_readable, self, &args.fd)) + if (!rb_sp_wait(rb_io_wait_readable, args->self, + &args->fd)) rb_sys_fail("read(inotify)"); } else { /* buffer in userspace to minimize read() calls */ - end = (struct inotify_event *)((char *)args.buf + r); - for (e = args.buf; e < end; ) { + end = (struct inotify_event *)((char *)args->buf + r); + for (e = args->buf; e < end; ) { VALUE event = event_new(e); if (NIL_P(rv)) rv = event; else - rb_ary_push(tmp, event); + rb_ary_push(args->tmp, event); e = (struct inotify_event *) ((char *)e + event_len(e)); } @@ -230,6 +222,39 @@ static VALUE take(int argc, VALUE *argv, VALUE self) /* * call-seq: + *
ino.take([nonblock]) -> Inotify::Event or nil + * + * Returns the next Inotify::Event processed. May return +nil+ if +nonblock+ + * is +true+. + */ +static VALUE take(int argc, VALUE *argv, VALUE self) +{ + struct inread_args args; + VALUE nonblock; + + args.tmp = rb_ivar_get(self, id_inotify_tmp); + if (RARRAY_LEN(args.tmp) > 0) + return rb_ary_shift(args.tmp); + + rb_scan_args(argc, argv, "01", &nonblock); + + args.self = self; + args.fd = rb_sp_fileno(self); + args.size = 128; + args.nonblock_p = RTEST(nonblock); + + if (args.nonblock_p) + rb_sp_set_nonblock(args.fd); + else + blocking_io_prepare(args.fd); + + args.buf = 0; /* acquired by do_take, released by put_inread_buf */ + return rb_ensure(do_take, (VALUE)&args, + put_inread_buf, (VALUE)&args); +} + +/* + * call-seq: * inotify_event.events => [ :MOVED_TO, ... ] * * Returns an array of symbolic event names based on the contents of diff --git a/ext/sleepy_penguin/kqueue.c b/ext/sleepy_penguin/kqueue.c index 22e20f1..22a2c5d 100644 --- a/ext/sleepy_penguin/kqueue.c +++ b/ext/sleepy_penguin/kqueue.c @@ -43,6 +43,7 @@ static VALUE mEv, mEvFilt, mNote, mVQ; struct kq_per_thread { VALUE io; + VALUE changelist; int fd; int nchanges; int nevents; @@ -72,7 +73,7 @@ static int kq_fd_check(struct kq_per_thread *kpt) return 1; } -static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents) +static struct kq_per_thread *kpt_get(int nchanges, int nevents) { struct kq_per_thread *kpt; size_t size; @@ -89,8 +90,6 @@ static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents) kpt->capa = max; kpt->nchanges = nchanges; kpt->nevents = nevents; - kpt->io = self; - kpt->fd = rb_sp_fileno(kpt->io); return kpt; } @@ -203,11 +202,16 @@ static VALUE nogvl_kevent(void *args) return (VALUE)nevents; } +static void changelist_prepare(struct kevent *, VALUE); + static VALUE do_kevent(struct kq_per_thread *kpt) { long nevents; struct timespec expire_at; + if (kpt->nchanges) + changelist_prepare(kpt->events, kpt->changelist); + if (kpt->ts) {
clock_gettime(CLOCK_MONOTONIC, &expire_at); @@ -333,7 +337,8 @@ static void changelist_prepare(struct kevent *events, VALUE changelist) */ static VALUE sp_kevent(int argc, VALUE *argv, VALUE self) { - struct timespec ts; + struct timespec ts, *t; + int fd; VALUE changelist, events, timeout; struct kq_per_thread *kpt; int nchanges, nevents; @@ -362,12 +367,15 @@ static VALUE sp_kevent(int argc, VALUE *argv, VALUE self) nevents = 0; } - kpt = kpt_get(self, nchanges, nevents); - kpt->ts = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout); - if (nchanges) - changelist_prepare(kpt->events, changelist); + t = NIL_P(timeout) ? NULL : value2timespec(&ts, timeout); + fd = rb_sp_fileno(self); /* may raise: call before taking the TLS buffer */ + kpt = kpt_get(nchanges, nevents); + kpt->ts = t; + kpt->changelist = changelist; + kpt->io = self; + kpt->fd = fd; - return do_kevent(kpt); + return rb_ensure(do_kevent, (VALUE)kpt, rb_sp_puttlsbuf, (VALUE)kpt); } /* initialize constants in the SleepyPenguin::Ev namespace */ diff --git a/ext/sleepy_penguin/sleepy_penguin.h b/ext/sleepy_penguin/sleepy_penguin.h index 99ad0b7..bd44e18 100644 --- a/ext/sleepy_penguin/sleepy_penguin.h +++ b/ext/sleepy_penguin/sleepy_penguin.h @@ -78,6 +78,7 @@ static inline VALUE fake_blocking_region(VALUE (*fn)(void *), void *data) typedef int rb_sp_waitfn(int fd); int rb_sp_wait(rb_sp_waitfn waiter, VALUE obj, int *fd); void *rb_sp_gettlsbuf(size_t *size); +VALUE rb_sp_puttlsbuf(VALUE); /* Flexible array elements are standard in C99 */ #if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) diff --git a/test/test_epoll.rb b/test/test_epoll.rb index d2b560c..786c5be 100644 --- a/test/test_epoll.rb +++ b/test/test_epoll.rb @@ -534,4 +534,30 @@ def test_epoll_as_queue end @ep.wait(1) { |flags, io| assert_equal(first[0], io) } end + + def test_epoll_nest + ep2 = Epoll.new + r, w = IO.pipe + @ep.add(@rd, :IN) + @ep.add(@wr, :OUT) + ep2.add(r, :IN) + ep2.add(w, :OUT) + w.write('.') + @wr.write('.') + outer = [] + inner = [] + nr = 0 + @ep.wait(2) do |_, io| + outer
<< io + ep2.wait(2) do |_, io2| + (inner[nr] ||= []) << io2 + end + nr += 1 + end + assert_equal [ @rd, @wr ].sort_by(&:fileno), outer.sort_by(&:fileno) + exp = [ r, w ].sort_by(&:fileno) + assert_equal [ exp, exp ], inner.map { |x| x.sort_by(&:fileno) } + ensure + [ r, w, ep2 ].compact.each(&:close) + end end if defined?(SleepyPenguin::Epoll) diff --git a/test/test_kqueue.rb b/test/test_kqueue.rb index fc59d60..6de75f3 100644 --- a/test/test_kqueue.rb +++ b/test/test_kqueue.rb @@ -68,5 +68,37 @@ def test_usable_after_fork ensure kq.close end + + def test_kqueue_nest + kq1 = Kqueue.new + kq2 = Kqueue.new + r1, w1 = IO.pipe + r2, w2 = IO.pipe + w1.write '.' + w2.write '.' + kq1.kevent([ + Kevent[r1.fileno, EvFilt::READ, Ev::ADD, 0, 0, r1], + Kevent[w1.fileno, EvFilt::WRITE, Ev::ADD, 0, 0, w1] + ]) + kq2.kevent([ + Kevent[r2.fileno, EvFilt::READ, Ev::ADD, 0, 0, r2], + Kevent[w2.fileno, EvFilt::WRITE, Ev::ADD, 0, 0, w2] + ]) + outer = [] + inner = [] + nr = 0 + kq1.kevent(nil, 2) do |kev1| + outer << kev1.udata + kq2.kevent(nil, 2) do |kev2| + (inner[nr] ||= []) << kev2.udata + end + nr += 1 + end + assert_equal [ r1, w1 ].sort_by(&:fileno), outer.sort_by(&:fileno) + exp = [ r2, w2 ].sort_by(&:fileno) + assert_equal [ exp, exp ], inner.map { |x| x.sort_by(&:fileno) } + ensure + [ r1, w1, r2, w2, kq1, kq2 ].compact.each(&:close) + end end if defined?(SleepyPenguin::Kqueue) && IO.instance_methods.include?(:autoclose=) -- EW