From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS6939 64.71.128.0/18 X-Spam-Status: No, score=-1.9 required=3.0 tests=AWL,BAYES_00 shortcircuit=no autolearn=unavailable version=3.3.2 X-Original-To: normalperson@yhbt.net Received: from zedshaw.xen.prgmr.com (zedshaw.xen.prgmr.com [64.71.167.205]) by dcvr.yhbt.net (Postfix) with ESMTP id 3CD461F79F for ; Thu, 22 Mar 2012 08:57:40 +0000 (UTC) Received: from zedshaw.xen.prgmr.com (localhost [IPv6:::1]) by zedshaw.xen.prgmr.com (Postfix) with ESMTP id CD9D021D35B for ; Thu, 22 Mar 2012 09:03:55 +0000 (UTC) MIME-Version: 1.0 Date: Thu, 22 Mar 2012 08:57:36 +0000 From: Eric Wong In-Reply-To: <20120322085736.GA14770@dcvr.yhbt.net> List-Archive: List-Help: List-Id: List-Post: List-Subscribe: List-Unsubscribe: Message-Id: <20120322085736.GA14770@dcvr.yhbt.net> Precedence: list References: <20120322085736.GA14770@dcvr.yhbt.net> Sender: sleepy.penguin@librelist.org Subject: [sleepy.penguin] [PATCH] epoll: use per-thread data structure for concurrent Epoll#wait To: sleepy.penguin@librelist.org Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit This allows multiple threads to park on Epoll#wait (without holding onto the GVL). This allows a single, one-shot notification to wake up a single thread (another notification to a different IO object would wake up another thread). This allows using the same multi-threaded, EPOLLONESHOT-based design as cmogstored: http://bogomips.org/cmogstored/queues.txt --- Pushed to git://bogomips.org/sleepy_penguin.git ext/sleepy_penguin/epoll.c | 139 +++++++++++++++++++++++++------------- ext/sleepy_penguin/epoll_green.h | 39 +++++------ test/test_epoll.rb | 48 ++++++++++++- 3 files changed, 160 insertions(+), 66 deletions(-) diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c index 31c72e6..fa8edf0 100644 --- a/ext/sleepy_penguin/epoll.c +++ b/ext/sleepy_penguin/epoll.c @@ -12,6 +12,7 @@ #include "missing_rb_update_max_fd.h" #define EP_RECREATE (-2) +static pthread_key_t epoll_key; static st_table *active; static const int step = 64; /* unlikely to grow unless you're huge */ static VALUE cEpoll_IO; @@ -36,18 +37,60 @@ static VALUE unpack_event_data(struct epoll_event *event) return (VALUE)event->data.ptr; } +#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) +# define FLEX_ARRAY +#elif defined(__GNUC__) +# if (__GNUC__ >= 3) +# define FLEX_ARRAY +# else +# define FLEX_ARRAY 0 +# endif +#endif + struct rb_epoll { int fd; - int timeout; - int maxevents; - int capa; - struct epoll_event *events; VALUE io; VALUE marks; VALUE flag_cache; int flags; }; +struct ep_per_thread { + struct rb_epoll *ep; + int timeout; + int maxevents; + int capa; + struct epoll_event events[FLEX_ARRAY]; +}; + +static struct ep_per_thread *ept_get(int maxevents) +{ + struct ep_per_thread *ept = pthread_getspecific(epoll_key); + int err; + size_t size; + + if (ept && ept->capa >= maxevents) + goto out; + + size = sizeof(struct ep_per_thread) + + sizeof(struct epoll_event) * maxevents; + + free(ept); /* free(NULL) works on glibc */ + ept = malloc(size); + if (ept == NULL) + rb_memerror(); + err = pthread_setspecific(epoll_key, ept); + if (err != 0) { + errno = err; + rb_sys_fail("pthread_setspecific"); + } + ept->capa = maxevents; +out: + ept->maxevents = maxevents; + + return ept; +} + static struct rb_epoll *ep_get(VALUE self) { struct rb_epoll *ep; @@ -70,7 +113,6 @@ static void gcfree(void *ptr) { struct rb_epoll *ep = ptr; - xfree(ep->events); if (ep->fd >= 0) { st_data_t key = ep->fd; st_delete(active, &key, NULL); @@ -95,9 +137,7 @@ static VALUE alloc(VALUE klass) ep->io = Qnil; ep->marks = Qnil; ep->flag_cache = Qnil; - ep->capa = step; ep->flags = 0; - ep->events = xmalloc(sizeof(struct epoll_event) * ep->capa); return self; } @@ -296,10 +336,10 @@ out: return io; } -static VALUE epwait_result(struct rb_epoll *ep, int n) +static VALUE epwait_result(struct ep_per_thread *ept, int n) { int i; - struct epoll_event *epoll_event = ep->events; + struct epoll_event *epoll_event = ept->events; VALUE obj_events, obj; if (n == -1) @@ -311,50 +351,44 @@ static VALUE epwait_result(struct rb_epoll *ep, int n) rb_yield_values(2, obj_events, obj); } - /* grow our event buffer for the next epoll_wait call */ - if (n == ep->capa) { - xfree(ep->events); - ep->capa += step; - ep->events = xmalloc(sizeof(struct epoll_event) * ep->capa); - } - return INT2NUM(n); } -static int epoll_resume_p(uint64_t expire_at, struct rb_epoll *ep) +static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept) { uint64_t now; - ep_fd_check(ep); + ep_fd_check(ept->ep); if (errno != EINTR) return 0; - if (ep->timeout < 0) + if (ept->timeout < 0) return 1; now = now_ms(); - ep->timeout = now > expire_at ? 0 : (int)(expire_at - now); + ept->timeout = now > expire_at ? 0 : (int)(expire_at - now); return 1; } #if defined(HAVE_RB_THREAD_BLOCKING_REGION) static VALUE nogvl_wait(void *args) { - struct rb_epoll *ep = args; - int n = epoll_wait(ep->fd, ep->events, ep->maxevents, ep->timeout); + struct ep_per_thread *ept = args; + int fd = ept->ep->fd; + int n = epoll_wait(fd, ept->events, ept->maxevents, ept->timeout); return (VALUE)n; } -static VALUE real_epwait(struct rb_epoll *ep) +static VALUE real_epwait(struct ep_per_thread *ept) { int n; - uint64_t expire_at = ep->timeout > 0 ? now_ms() + ep->timeout : 0; + uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0; - do - n = (int)rb_sp_fd_region(nogvl_wait, ep, ep->fd); - while (n == -1 && epoll_resume_p(expire_at, ep)); + do { + n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->ep->fd); + } while (n == -1 && epoll_resume_p(expire_at, ept)); - return epwait_result(ep, n); + return epwait_result(ept, n); } #else /* 1.8 Green thread compatible code */ # include "epoll_green.h" @@ -374,20 +408,16 @@ static VALUE epwait(int argc, VALUE *argv, VALUE self) { VALUE timeout, maxevents; struct rb_epoll *ep = ep_get(self); + struct ep_per_thread *ept; ep_check(ep); rb_need_block(); rb_scan_args(argc, argv, "02", &maxevents, &timeout); - ep->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout); - ep->maxevents = NIL_P(maxevents) ? ep->capa : NUM2INT(maxevents); + ept = ept_get(NIL_P(maxevents) ? 64 : NUM2INT(maxevents)); + ept->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout); + ept->ep = ep; - if (ep->maxevents > ep->capa) { - xfree(ep->events); - ep->capa = ep->maxevents; - ep->events = xmalloc(sizeof(struct epoll_event) * ep->capa); - } - - return real_epwait(ep); + return real_epwait(ept); } /* @@ -526,8 +556,7 @@ static VALUE init_copy(VALUE copy, VALUE orig) struct rb_epoll *a = ep_get(orig); struct rb_epoll *b = ep_get(copy); - assert(a->events && b->events && a->events != b->events && - NIL_P(b->io) && "Ruby broken?"); + assert(NIL_P(b->io) && "Ruby broken?"); ep_check(a); assert(NIL_P(b->marks) && "mark array not nil"); @@ -632,9 +661,34 @@ static void atfork_child(void) st_free_table(old); } +static void epoll_once(void) +{ + int err = pthread_key_create(&epoll_key, free); + + if (err) { + errno = err; + rb_sys_fail("pthread_key_create"); + } + + active = st_init_numtable(); + + if (pthread_atfork(NULL, NULL, atfork_child) != 0) { + rb_gc(); + if (pthread_atfork(NULL, NULL, atfork_child) != 0) + rb_memerror(); + } +} + void sleepy_penguin_init_epoll(void) { VALUE mSleepyPenguin, cEpoll; + pthread_once_t once = PTHREAD_ONCE_INIT; + int err = pthread_once(&once, epoll_once); + + if (err) { + errno = err; + rb_sys_fail("pthread_once(.., epoll_once)"); + } /* * Document-module: SleepyPenguin @@ -732,11 +786,4 @@ void sleepy_penguin_init_epoll(void) rb_define_const(cEpoll, "ONESHOT", UINT2NUM(EPOLLONESHOT)); id_for_fd = rb_intern("for_fd"); - active = st_init_numtable(); - - if (pthread_atfork(NULL, NULL, atfork_child) != 0) { - rb_gc(); - if (pthread_atfork(NULL, NULL, atfork_child) != 0) - rb_memerror(); - } } diff --git a/ext/sleepy_penguin/epoll_green.h b/ext/sleepy_penguin/epoll_green.h index ef36490..276a545 100644 --- a/ext/sleepy_penguin/epoll_green.h +++ b/ext/sleepy_penguin/epoll_green.h @@ -20,49 +20,50 @@ do { \ } while (0) #endif -static int safe_epoll_wait(struct rb_epoll *ep) +static int safe_epoll_wait(struct ep_per_thread *ept) { int n; do { TRAP_BEG; - n = epoll_wait(ep->fd, ep->events, ep->maxevents, 0); + n = epoll_wait(ept->ep->fd, ept->events, ept->maxevents, 0); TRAP_END; - } while (n == -1 && errno == EINTR && ep_fd_check(ep)); + } while (n == -1 && errno == EINTR && ep_fd_check(ept->ep)); return n; } -static int epwait_forever(struct rb_epoll *ep) +static int epwait_forever(struct ep_per_thread *ept) { int n; do { - (void)rb_io_wait_readable(ep->fd); - n = safe_epoll_wait(ep); + (void)rb_io_wait_readable(ept->ep->fd); + n = safe_epoll_wait(ept); } while (n == 0); return n; } -static int epwait_timed(struct rb_epoll *ep) +static int epwait_timed(struct ep_per_thread *ept) { struct timeval tv; - tv.tv_sec = ep->timeout / 1000; - tv.tv_usec = (ep->timeout % 1000) * 1000; + tv.tv_sec = ept->timeout / 1000; + tv.tv_usec = (ept->timeout % 1000) * 1000; for (;;) { struct timeval t0, now, diff; int n; + int fd = ept->ep->fd; fd_set rfds; FD_ZERO(&rfds); - FD_SET(ep->fd, &rfds); + FD_SET(fd, &rfds); gettimeofday(&t0, NULL); - (void)rb_thread_select(ep->fd + 1, &rfds, NULL, NULL, &tv); - n = safe_epoll_wait(ep); + (void)rb_thread_select(fd + 1, &rfds, NULL, NULL, &tv); + n = safe_epoll_wait(ept); if (n != 0) return n; @@ -79,16 +80,16 @@ static int epwait_timed(struct rb_epoll *ep) return -1; } -static VALUE real_epwait(struct rb_epoll *ep) +static VALUE real_epwait(struct ep_per_thread *ept) { int n; - if (ep->timeout == -1) - n = epwait_forever(ep); - else if (ep->timeout == 0) - n = safe_epoll_wait(ep); + if (ept->timeout == -1) + n = epwait_forever(ept); + else if (ept->timeout == 0) + n = safe_epoll_wait(ept); else - n = epwait_timed(ep); + n = epwait_timed(ept); - return epwait_result(ep, n); + return epwait_result(ept, n); } diff --git a/test/test_epoll.rb b/test/test_epoll.rb index c96a733..7633d94 100644 --- a/test/test_epoll.rb +++ b/test/test_epoll.rb @@ -1,7 +1,9 @@ require 'test/unit' require 'fcntl' require 'socket' +require 'thread' $-w = true +Thread.abort_on_exception = true require 'sleepy_penguin' @@ -439,7 +441,7 @@ class TestEpoll < Test::Unit::TestCase def test_epoll_wait_signal_torture usr1 = 0 empty = 0 - nr = 1000 + nr = 100 @ep.add(@rd, Epoll::IN) tmp = [] trap(:USR1) { usr1 += 1 } @@ -461,4 +463,48 @@ class TestEpoll < Test::Unit::TestCase ensure trap(:USR1, "DEFAULT") end if ENV["STRESS"].to_i != 0 + + def test_wait_one_event_per_thread + thr = [] + pipes = {} + lock = Mutex.new + maxevents = 1 + ok = [] + nr = 10 + nr.times do + r, w = IO.pipe + pipes[r] = w + @ep.add(r, Epoll::IN | Epoll::ET | Epoll::ONESHOT) + + t = Thread.new do + sleep 2 + events = 0 + @ep.wait(maxevents) do |_,obj| + assert pipes.include?(obj), "#{obj.inspect} is unknown" + lock.synchronize { ok << obj } + events += 1 + end + events + end + thr << t + end + pipes.each_value { |w| w.syswrite '.' } + thr.each do |t| + begin + t.run + rescue ThreadError + end + end + + thr.each { |t| assert_equal 1, t.value } + assert_equal nr, ok.size, ok.inspect + assert_equal ok.size, ok.uniq.size, ok.inspect + assert_equal ok.map { |io| io.fileno }.sort, + pipes.keys.map { |io| io.fileno }.sort + ensure + pipes.each do |r,w| + r.close + w.close + end + end end -- Eric Wong