From mboxrd@z Thu Jan 1 00:00:00 1970
Return-Path:
X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net
X-Spam-Level:
X-Spam-ASN: AS47066 71.19.144.0/20
X-Spam-Status: No, score=-1.9 required=3.0 tests=AWL,BAYES_00
    shortcircuit=no autolearn=unavailable version=3.3.2
X-Original-To: normalperson@yhbt.net
Received: from zedshaw2.xen.prgmr.com (zedshaw2.xen.prgmr.com [71.19.156.177])
    by dcvr.yhbt.net (Postfix) with ESMTP id 4FF061F430
    for ; Fri, 12 Apr 2013 21:18:55 +0000 (UTC)
Received: from zedshaw2.xen.prgmr.com (unknown [IPv6:::1])
    by zedshaw2.xen.prgmr.com (Postfix) with ESMTP id 0665673DD6
    for ; Fri, 12 Apr 2013 21:19:33 +0000 (UTC)
MIME-Version: 1.0
Date: Fri, 12 Apr 2013 21:18:38 +0000
From: Eric Wong
List-Archive:
List-Help:
List-Id:
List-Post:
List-Subscribe:
List-Unsubscribe:
Message-Id: <20130412211838.GB17862@dcvr.yhbt.net>
Precedence: list
References: <1365653855-1101-1-git-send-email-normalperson@yhbt.net>
    <1365653855-1101-5-git-send-email-normalperson@yhbt.net>
Sender: sleepy.penguin@librelist.org
Subject: Re: [sleepy.penguin] [PATCH 4/6] epoll: implement thread-safety for mark/flag arrays
To: sleepy.penguin@librelist.org
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit

Eric Wong wrote:
>    def add(io, events)
> -    __ep_check
>      fd = io.to_io.fileno
>      events = __event_flags(events)
> +    @mtx.synchronize do
> +      __ep_check
> +      @events[fd] = events
> +      @marks[fd] = io
> +    end
>      @to_io.epoll_ctl(CTL_ADD, io, events)
> -    @marks[fd] = io
> -    @events[fd] = events
>      0
>    end

The above ordering is incorrect and the marks must be set after the
epoll_ctl syscall in the presence of concurrent epoll_wait callers.

Since epoll_ctl is uninterruptible, there is no real problem in holding
@mtx across epoll_ctl.
So the correct ordering should be something like this:

    @mtx.synchronize do
      __ep_check
      @to_io.epoll_ctl(CTL_ADD, io, events)
      @events[fd] = events
      @marks[fd] = io
    end

Yes, lock nesting happens with our epoll_ctl call; but even inside the
current 3.9 Linux kernel there is internal lock nesting (both epmutex
and ep->mtx are held) for EPOLL_CTL_ADD/DEL operations.

Ditto for the other epoll_ctl wrapper methods.

--------------------------------8<---------------------------------------
Subject: [PATCH] epoll: implement thread-safety for mark/flag arrays

Concurrent modification of Arrays is thread-unsafe and must be
protected by a Mutex. eventpoll objects inside the Linux kernel are
similarly protected by a (kernel) mutex, and do not need additional
locking.
---
 lib/sleepy_penguin/epoll.rb | 150 +++++++++++++++++++++++++++-----------------
 1 file changed, 93 insertions(+), 57 deletions(-)

diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
index 845dcf0..8d78e46 100644
--- a/lib/sleepy_penguin/epoll.rb
+++ b/lib/sleepy_penguin/epoll.rb
@@ -1,3 +1,4 @@
+require 'thread'
 class SleepyPenguin::Epoll
 
   # Epoll objects may be watched by IO.select and similar methods
@@ -10,6 +11,7 @@ class SleepyPenguin::Epoll
   # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
   def initialize(create_flags = nil)
     @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+    @mtx = Mutex.new
     @events = []
     @marks = []
     @pid = $$
@@ -46,19 +48,34 @@ def __ep_check # :nodoc:
   # +timeout+ is specified in milliseconds, +nil+
   # (the default) meaning it will block and wait indefinitely.
   def wait(maxevents = 64, timeout = nil)
-    __ep_check
+    # snapshot the marks so we can sit this thread on epoll_wait while other
+    # threads may call epoll_ctl. People say RCU is a poor man's GC, but our
+    # (ab)use of GC here is inspired by RCU...
+    snapshot = @mtx.synchronize do
+      __ep_check
+      @marks.dup
+    end
+
+    # we keep a snapshot of @marks around in case another thread closes
+    # the IO while it is being transferred to userspace. We release mtx
+    # so another thread may add events to us while we're sleeping.
     @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+  ensure
+    # hopefully Ruby does not optimize this array away...
+    snapshot[0]
   end
 
   # Starts watching a given +io+ object with +events+ which may be an Integer
   # bitmask or Array representing arrays to watch for.
   def add(io, events)
-    __ep_check
     fd = io.to_io.fileno
     events = __event_flags(events)
-    @to_io.epoll_ctl(CTL_ADD, io, events)
-    @marks[fd] = io
-    @events[fd] = events
+    @mtx.synchronize do
+      __ep_check
+      @to_io.epoll_ctl(CTL_ADD, io, events)
+      @events[fd] = events
+      @marks[fd] = io
+    end
     0
   end
 
@@ -67,11 +84,13 @@ def add(io, events)
   #
   # Disables an IO object from being watched.
   def del(io)
-    __ep_check
     fd = io.to_io.fileno
-    rv = @to_io.epoll_ctl(CTL_DEL, io, 0)
-    @marks[fd] = @events[fd] = nil
-    rv
+    @mtx.synchronize do
+      __ep_check
+      @to_io.epoll_ctl(CTL_DEL, io, 0)
+      @events[fd] = @marks[fd] = nil
+    end
+    0
   end
 
   # call-seq:
@@ -86,12 +105,14 @@ def del(io)
   #
   # This method is deprecated and will be removed in sleepy_penguin 4.x
   def delete(io)
-    __ep_check
     fd = io.to_io.fileno
-    cur_io = @marks[fd]
-    return if nil == cur_io || cur_io.to_io.closed?
-    @marks[fd] = @events[fd] = nil
-    @to_io.epoll_ctl(CTL_DEL, io, 0)
+    @mtx.synchronize do
+      __ep_check
+      cur_io = @marks[fd]
+      return if nil == cur_io || cur_io.to_io.closed?
+      @to_io.epoll_ctl(CTL_DEL, io, 0)
+      @events[fd] = @marks[fd] = nil
+    end
     io
   rescue Errno::ENOENT, Errno::EBADF
   end
@@ -102,13 +123,14 @@ def delete(io)
   # Changes the watch for an existing IO object based on +events+.
   # Returns zero on success, will raise SystemError on failure.
   def mod(io, events)
-    __ep_check
-    fd = io.to_io.fileno
     events = __event_flags(events)
-    rv = @to_io.epoll_ctl(CTL_MOD, io, events)
-    @marks[fd] = io
-    @events[fd] = events
-    rv
+    fd = io.to_io.fileno
+    @mtx.synchronize do
+      __ep_check
+      @to_io.epoll_ctl(CTL_MOD, io, events)
+      @marks[fd] = io # may be a different object with same fd/file
+      @events[fd] = events
+    end
   end
 
   # call-seq:
@@ -129,29 +151,31 @@ def mod(io, events)
   #
   # This method is deprecated and will be removed in sleepy_penguin 4.x
   def set(io, events)
-    __ep_check
     fd = io.to_io.fileno
-    cur_io = @marks[fd]
-    if cur_io == io
-      cur_events = @events[fd]
-      return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
-      begin
-        @to_io.epoll_ctl(CTL_MOD, io, events)
-      rescue Errno::ENOENT
-        warn "epoll flag cache failed (mod -> add)"
-        @to_io.epoll_ctl(CTL_ADD, io, events)
+    @mtx.synchronize do
+      __ep_check
+      cur_io = @marks[fd]
+      if cur_io == io
+        cur_events = @events[fd]
+        return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
+        begin
+          @to_io.epoll_ctl(CTL_MOD, io, events)
+        rescue Errno::ENOENT
+          warn "epoll event cache failed (mod -> add)"
+          @to_io.epoll_ctl(CTL_ADD, io, events)
+          @marks[fd] = io
+        end
+      else
+        begin
+          @to_io.epoll_ctl(CTL_ADD, io, events)
+        rescue Errno::EEXIST
+          warn "epoll event cache failed (add -> mod)"
+          @to_io.epoll_ctl(CTL_MOD, io, events)
+        end
         @marks[fd] = io
       end
-    else
-      begin
-        @to_io.epoll_ctl(CTL_ADD, io, events)
-      rescue Errno::EEXIST
-        warn "epoll flag cache failed (add -> mod)"
-        @to_io.epoll_ctl(CTL_MOD, io, events)
-      end
-      @marks[fd] = io
+      @events[fd] = events
     end
-    @events[fd] = events
     0
   end
 
@@ -161,9 +185,10 @@ def set(io, events)
   # Closes an existing Epoll object and returns memory back to the kernel.
   # Raises IOError if object is already closed.
   def close
-    __ep_check
-    @copies.delete(@to_io)
-    @to_io.close
+    @mtx.synchronize do
+      @copies.delete(@to_io)
+      @to_io.close
+    end
   end
 
   # call-seq:
@@ -171,8 +196,9 @@ def close
   #
   # Returns whether or not an Epoll object is closed.
   def closed?
-    __ep_check
-    @to_io.closed?
+    @mtx.synchronize do
+      @to_io.closed?
+    end
   end
 
   # we still support integer FDs for some debug functions
@@ -187,8 +213,11 @@ def __fileno(io) # :nodoc:
   # IO objects may internally refer to the same process file descriptor.
   # Mostly used for debugging.
   def io_for(io)
-    __ep_check
-    @marks[__fileno(io)]
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @marks[fd]
+    end
   end
 
   # call-seq:
@@ -197,8 +226,11 @@ def io_for(io)
   # Returns the events currently watched for in current Epoll object.
   # Mostly used for debugging.
   def events_for(io)
-    __ep_check
-    @events[__fileno(io)]
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @events[fd]
+    end
   end
 
   # backwards compatibility, to be removed in 4.x
@@ -211,18 +243,22 @@ def events_for(io)
   # garbage-collected by the current Epoll object. This may include
   # closed IO objects.
   def include?(io)
-    __ep_check
-    @marks[__fileno(io)] ? true : nil
+    fd = __fileno(io)
+    @mtx.synchronize do
+      __ep_check
+      @marks[fd] ? true : false
+    end
   end
 
   def initialize_copy(src) # :nodoc:
-    __ep_check
-    rv = super
-    unless @to_io.closed?
-      @to_io = @to_io.dup
-      @copies[@to_io] = self
+    @mtx.synchronize do
+      __ep_check
+      rv = super
+      unless @to_io.closed?
+        @to_io = @to_io.dup
+        @copies[@to_io] = self
+      end
+      rv
     end
-
-    rv
   end
 end
-- 
Eric Wong
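
A minimal, self-contained sketch of the ordering discussed in this
message: do the registration call first, and only then update the
bookkeeping arrays, all under one Mutex. The MarkTable class and its
fake_ctl_add method are hypothetical stand-ins for the Epoll object
and the real epoll_ctl call; they are not part of sleepy_penguin and
only illustrate the pattern with the standard library.

```ruby
require 'thread'
require 'set'

# Hypothetical stand-in for an epoll set; not sleepy_penguin code.
class MarkTable
  def initialize
    @mtx = Mutex.new
    @marks = []            # fd -> object, like @marks in the patch
    @registered = Set.new  # pretend kernel-side registration state
  end

  # Register first, record the mark only after the call succeeded.
  def add(fd, obj)
    @mtx.synchronize do
      fake_ctl_add(fd)
      @marks[fd] = obj
    end
    0
  end

  # Copy the marks under the lock so callers can use them unlocked.
  def snapshot
    @mtx.synchronize { @marks.dup }
  end

  private

  # Simulated epoll_ctl(CTL_ADD): fails if fd is already registered.
  def fake_ctl_add(fd)
    raise Errno::EEXIST, "fd #{fd}" unless @registered.add?(fd)
  end
end

table = MarkTable.new
threads = 8.times.map { |fd| Thread.new { table.add(fd, "io#{fd}") } }
threads.each(&:join)
snap = table.snapshot
```

In this sketch a failed registration raises before the array is
touched, so an existing mark is never overwritten by a call that the
(simulated) kernel rejected.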