From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS47066 71.19.144.0/20 X-Spam-Status: No, score=-1.9 required=3.0 tests=AWL,BAYES_00 shortcircuit=no autolearn=unavailable version=3.3.2 X-Original-To: normalperson@yhbt.net Received: from zedshaw2.xen.prgmr.com (zedshaw2.xen.prgmr.com [71.19.156.177]) by dcvr.yhbt.net (Postfix) with ESMTP id C0A581F6C9 for ; Tue, 30 Apr 2013 02:40:10 +0000 (UTC) Received: from zedshaw2.xen.prgmr.com (unknown [IPv6:::1]) by zedshaw2.xen.prgmr.com (Postfix) with ESMTP id 402EC73DFC for ; Tue, 30 Apr 2013 02:42:08 +0000 (UTC) MIME-Version: 1.0 Date: Tue, 30 Apr 2013 02:39:29 +0000 From: Eric Wong List-Archive: List-Help: List-Id: List-Post: List-Subscribe: List-Unsubscribe: Message-Id: <1367289582-31293-5-git-send-email-normalperson@yhbt.net> Precedence: list References: <1367289582-31293-1-git-send-email-normalperson@yhbt.net> Sender: sleepy.penguin@librelist.org Subject: [sleepy.penguin] [PATCH 04/17] fork-safe "to_io" in high-level epoll/kqueue To: sleepy.penguin@librelist.org Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit We need to validate the underlying IO object before using it in a forked child. --- lib/sleepy_penguin/epoll.rb | 51 ++++++++++++++++++++++++-------------------- lib/sleepy_penguin/kqueue.rb | 39 ++++++++++++++++++--------------- 2 files changed, 50 insertions(+), 40 deletions(-) diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb index 8d78e46..f29189a 100644 --- a/lib/sleepy_penguin/epoll.rb +++ b/lib/sleepy_penguin/epoll.rb @@ -1,45 +1,50 @@ require 'thread' class SleepyPenguin::Epoll - # Epoll objects may be watched by IO.select and similar methods - attr_reader :to_io - # call-seq: # SleepyPenguin::Epoll.new([flags]) -> Epoll object # # Creates a new Epoll object with an optional +flags+ argument. # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+). def initialize(create_flags = nil) - @to_io = SleepyPenguin::Epoll::IO.new(create_flags) + @io = SleepyPenguin::Epoll::IO.new(create_flags) @mtx = Mutex.new @events = [] @marks = [] @pid = $$ @create_flags = create_flags - @copies = { @to_io => self } + @copies = { @io => self } end def __ep_reinit # :nodoc: @events.clear @marks.clear - @to_io = SleepyPenguin::Epoll::IO.new(@create_flags) + @io = SleepyPenguin::Epoll::IO.new(@create_flags) end # auto-reinitialize the Epoll object after forking def __ep_check # :nodoc: return if @pid == $$ - return if @to_io.closed? + return if @io.closed? objects = @copies.values @copies.each_key { |epio| epio.close } @copies.clear __ep_reinit objects.each do |obj| - io_dup = @to_io.dup + io_dup = @io.dup @copies[io_dup] = obj end @pid = $$ end + # Epoll objects may be watched by IO.select and similar methods + def to_io + @mtx.synchronize do + __ep_check + @io + end + end + # Calls epoll_wait(2) and yields Integer +events+ and IO objects watched # for. +maxevents+ is the maximum number of events to process at once, # lower numbers may prevent starvation when used by epoll_wait in multiple @@ -59,7 +64,7 @@ def wait(maxevents = 64, timeout = nil) # we keep a snapshot of @marks around in case another thread closes # the IO while it is being transferred to userspace. We release mtx # so another thread may add events to us while we're sleeping. - @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) } + @io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) } ensure # hopefully Ruby does not optimize this array away... snapshot[0] @@ -72,7 +77,7 @@ def add(io, events) events = __event_flags(events) @mtx.synchronize do __ep_check - @to_io.epoll_ctl(CTL_ADD, io, events) + @io.epoll_ctl(CTL_ADD, io, events) @events[fd] = events @marks[fd] = io end @@ -87,7 +92,7 @@ def del(io) fd = io.to_io.fileno @mtx.synchronize do __ep_check - @to_io.epoll_ctl(CTL_DEL, io, 0) + @io.epoll_ctl(CTL_DEL, io, 0) @events[fd] = @marks[fd] = nil end 0 @@ -110,7 +115,7 @@ def delete(io) __ep_check cur_io = @marks[fd] return if nil == cur_io || cur_io.to_io.closed? - @to_io.epoll_ctl(CTL_DEL, io, 0) + @io.epoll_ctl(CTL_DEL, io, 0) @events[fd] = @marks[fd] = nil end io @@ -127,7 +132,7 @@ def mod(io, events) fd = io.to_io.fileno @mtx.synchronize do __ep_check - @to_io.epoll_ctl(CTL_MOD, io, events) + @io.epoll_ctl(CTL_MOD, io, events) @marks[fd] = io # may be a different object with same fd/file @events[fd] = events end @@ -159,18 +164,18 @@ def set(io, events) cur_events = @events[fd] return 0 if (cur_events & ONESHOT) == 0 && cur_events == events begin - @to_io.epoll_ctl(CTL_MOD, io, events) + @io.epoll_ctl(CTL_MOD, io, events) rescue Errno::ENOENT warn "epoll event cache failed (mod -> add)" - @to_io.epoll_ctl(CTL_ADD, io, events) + @io.epoll_ctl(CTL_ADD, io, events) @marks[fd] = io end else begin - @to_io.epoll_ctl(CTL_ADD, io, events) + @io.epoll_ctl(CTL_ADD, io, events) rescue Errno::EEXIST warn "epoll event cache failed (add -> mod)" - @to_io.epoll_ctl(CTL_MOD, io, events) + @io.epoll_ctl(CTL_MOD, io, events) end @marks[fd] = io end @@ -186,8 +191,8 @@ def set(io, events) # Raises IOError if object is already closed. def close @mtx.synchronize do - @copies.delete(@to_io) - @to_io.close + @copies.delete(@io) + @io.close end end @@ -197,7 +202,7 @@ def close # Returns whether or not an Epoll object is closed. def closed? @mtx.synchronize do - @to_io.closed? + @io.closed? end end @@ -254,9 +259,9 @@ def initialize_copy(src) # :nodoc: @mtx.synchronize do __ep_check rv = super - unless @to_io.closed? - @to_io = @to_io.dup - @copies[@to_io] = self + unless @io.closed? + @io = @io.dup + @copies[@io] = self end rv end diff --git a/lib/sleepy_penguin/kqueue.rb b/lib/sleepy_penguin/kqueue.rb index fbbde8a..1eeb641 100644 --- a/lib/sleepy_penguin/kqueue.rb +++ b/lib/sleepy_penguin/kqueue.rb @@ -8,23 +8,28 @@ # Events registered to a Kqueue object cannot be shared across fork # due to the underlying implementation of kqueue in *BSDs. class SleepyPenguin::Kqueue - # Kqueue objects may be watched by IO.select and similar methods - attr_reader :to_io - def initialize - @to_io = SleepyPenguin::Kqueue::IO.new + @io = SleepyPenguin::Kqueue::IO.new @mtx = Mutex.new @pid = $$ - @copies = { @to_io => self } + @copies = { @io => self } + end + + # Kqueue objects may be watched by IO.select and similar methods + def to_io + @mtx.synchronize do + __kq_check + @io + end end def __kq_reinit # :nodoc: - @to_io = SleepyPenguin::Kqueue::IO.new + @io = SleepyPenguin::Kqueue::IO.new end def __kq_check # :nodoc: - return if @pid == $$ || @to_io.closed? - unless @to_io.respond_to?(:autoclose=) + return if @pid == $$ || @io.closed? + unless @io.respond_to?(:autoclose=) raise RuntimeError, "Kqueue is not safe to use without IO#autoclose=, upgrade to Ruby 1.9+" end @@ -35,7 +40,7 @@ def __kq_check # :nodoc: @copies.clear __kq_reinit objects.each do |obj| - io_dup = @to_io.dup + io_dup = @io.dup @copies[io_dup] = obj end @pid = $$ @@ -61,7 +66,7 @@ def kevent(changelist = nil, *args) end if block_given? - n = @to_io.kevent(changelist, *args) do |ident,filter,flags, + n = @io.kevent(changelist, *args) do |ident,filter,flags, fflags,data,udata| # This may raise and cause events to be lost, # that's the users' fault/problem @@ -70,7 +75,7 @@ def kevent(changelist = nil, *args) fflags, data, udata) end else - n = @to_io.kevent(changelist, *args) + n = @io.kevent(changelist, *args) end end @@ -78,9 +83,9 @@ def initialize_copy(src) # :nodoc: @mtx.synchronize do __kq_check rv = super - unless @to_io.closed? - @to_io = @to_io.dup - @copies[@to_io] = self + unless @io.closed? + @io = @io.dup + @copies[@io] = self end rv end @@ -93,8 +98,8 @@ def initialize_copy(src) # :nodoc: # Raises IOError if object is already closed. def close @mtx.synchronize do - @copies.delete(@to_io) - @to_io.close + @copies.delete(@io) + @io.close end end @@ -104,7 +109,7 @@ def close # Returns whether or not an Kqueue object is closed. def closed? @mtx.synchronize do - @to_io.closed? + @io.closed? end end end -- 1.8.2.1.367.gc875ca7