about summary refs log tree commit homepage
path: root/lib/rainbows/fiber
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-10-21 16:25:39 -0700
committerEric Wong <normalperson@yhbt.net>2010-10-22 18:37:45 +0000
commit15631717fce044fbad2f386a7b1c7daf4bdd83d2 (patch)
treef32c80aafb8b5fe13fefe9a1e3765dd757ccde7d /lib/rainbows/fiber
parentd4a2b5dd2b85f4b2d3bb120ee1e1b0dde31bc25c (diff)
downloadrainbows-15631717fce044fbad2f386a7b1c7daf4bdd83d2.tar.gz
Despite the large number of changes, most of it is code
movement here.
Diffstat (limited to 'lib/rainbows/fiber')
-rw-r--r--lib/rainbows/fiber/base.rb157
-rw-r--r--lib/rainbows/fiber/io.rb232
-rw-r--r--lib/rainbows/fiber/io/compat.rb10
-rw-r--r--lib/rainbows/fiber/io/methods.rb44
-rw-r--r--lib/rainbows/fiber/io/pipe.rb7
-rw-r--r--lib/rainbows/fiber/io/socket.rb7
-rw-r--r--lib/rainbows/fiber/rev.rb165
-rw-r--r--lib/rainbows/fiber/rev/heartbeat.rb8
-rw-r--r--lib/rainbows/fiber/rev/kato.rb22
-rw-r--r--lib/rainbows/fiber/rev/methods.rb48
-rw-r--r--lib/rainbows/fiber/rev/server.rb32
-rw-r--r--lib/rainbows/fiber/rev/sleeper.rb15
12 files changed, 392 insertions, 355 deletions
diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb
index b3a4c89..b7c4ce5 100644
--- a/lib/rainbows/fiber/base.rb
+++ b/lib/rainbows/fiber/base.rb
@@ -2,103 +2,84 @@
 # :enddoc:
 require 'rainbows/fiber/io'
 
-module Rainbows
-  module Fiber
+module Rainbows::Fiber::Base
 
-    # blocked readers (key: fileno, value: Rainbows::Fiber::IO object)
-    RD = []
+  include Rainbows::Base
 
-    # blocked writers (key: fileno, value: Rainbows::Fiber::IO object)
-    WR = []
+  # :stopdoc:
+  RD = Rainbows::Fiber::RD
+  WR = Rainbows::Fiber::WR
+  ZZ = Rainbows::Fiber::ZZ
+  # :startdoc:
 
-    # sleeping fibers go here (key: Fiber object, value: wakeup time)
-    ZZ = {}.compare_by_identity
+  # the scheduler method that powers both FiberSpawn and FiberPool
+  # concurrency models.  It times out idle clients and attempts to
+  # schedules ones that were blocked on I/O.  At most it'll sleep
+  # for one second (returned by the schedule_sleepers method) which
+  # will cause it.
+  def schedule(&block)
+    ret = begin
+      G.tick
+      RD.compact.each { |c| c.f.resume } # attempt to time out idle clients
+      t = schedule_sleepers
+      Kernel.select(RD.compact.concat(LISTENERS), WR.compact, nil, t) or return
+    rescue Errno::EINTR
+      retry
+    rescue Errno::EBADF, TypeError
+      LISTENERS.compact!
+      raise
+    end or return
 
-    # puts the current Fiber into uninterruptible sleep for at least
-    # +seconds+.  Unlike Kernel#sleep, this it is not possible to sleep
-    # indefinitely to be woken up (nobody wants that in a web server,
-    # right?).  Calling this directly is deprecated, use
-    # Rainbows.sleep(seconds) instead.
-    def self.sleep(seconds)
-      ZZ[::Fiber.current] = Time.now + seconds
-      ::Fiber.yield
-    end
-
-    # base module used by FiberSpawn and FiberPool
-    module Base
-      include Rainbows::Base
-
-      # the scheduler method that powers both FiberSpawn and FiberPool
-      # concurrency models.  It times out idle clients and attempts to
-      # schedules ones that were blocked on I/O.  At most it'll sleep
-      # for one second (returned by the schedule_sleepers method) which
-      # will cause it.
-      def schedule(&block)
-        ret = begin
-          G.tick
-          RD.compact.each { |c| c.f.resume } # attempt to time out idle clients
-          t = schedule_sleepers
-          Kernel.select(RD.compact.concat(LISTENERS),
-                        WR.compact, nil, t) or return
-        rescue Errno::EINTR
-          retry
-        rescue Errno::EBADF, TypeError
-          LISTENERS.compact!
-          raise
-        end or return
-
-        # active writers first, then _all_ readers for keepalive timeout
-        ret[1].concat(RD.compact).each { |c| c.f.resume }
+    # active writers first, then _all_ readers for keepalive timeout
+    ret[1].concat(RD.compact).each { |c| c.f.resume }
 
-        # accept is an expensive syscall, filter out listeners we don't want
-        (ret[0] & LISTENERS).each(&block)
-      end
+    # accept is an expensive syscall, filter out listeners we don't want
+    (ret[0] & LISTENERS).each(&block)
+  end
 
-      # wakes up any sleepers that need to be woken and
-      # returns an interval to IO.select on
-      def schedule_sleepers
-        max = nil
-        now = Time.now
-        fibs = []
-        ZZ.delete_if { |fib, time|
-          if now >= time
-            fibs << fib
-          else
-            max = time
-            false
-          end
-        }
-        fibs.each { |fib| fib.resume }
-        now = Time.now
-        max.nil? || max > (now + 1) ? 1 : max - now
+  # wakes up any sleepers that need to be woken and
+  # returns an interval to IO.select on
+  def schedule_sleepers
+    max = nil
+    now = Time.now
+    fibs = []
+    ZZ.delete_if { |fib, time|
+      if now >= time
+        fibs << fib
+      else
+        max = time
+        false
       end
+    }
+    fibs.each { |fib| fib.resume }
+    now = Time.now
+    max.nil? || max > (now + 1) ? 1 : max - now
+  end
 
-      def wait_headers_readable(client)
-        io = client.to_io
-        expire = nil
-        begin
-          return io.recv_nonblock(1, Socket::MSG_PEEK)
-        rescue Errno::EAGAIN
-          return if expire && expire < Time.now
-          expire ||= Time.now + G.kato
-          client.wait_readable
-          retry
-        end
-      end
+  def wait_headers_readable(client)
+    io = client.to_io
+    expire = nil
+    begin
+      return io.recv_nonblock(1, Socket::MSG_PEEK)
+    rescue Errno::EAGAIN
+      return if expire && expire < Time.now
+      expire ||= Time.now + G.kato
+      client.wait_readable
+      retry
+    end
+  end
 
-      def process_client(client)
-        G.cur += 1
-        super(client) # see Rainbows::Base
-      ensure
-        G.cur -= 1
-        ZZ.delete(client.f)
-      end
+  def process(client)
+    G.cur += 1
+    process_client(client)
+  ensure
+    G.cur -= 1
+    ZZ.delete(client.f)
+  end
 
-      def self.setup(klass, app)
-        require 'rainbows/fiber/body'
-        klass.__send__(:include, Rainbows::Fiber::Body)
-        self.const_set(:APP, app)
-      end
-    end
+  def self.setup(klass, app)
+    require 'rainbows/fiber/body'
+    klass.__send__(:include, Rainbows::Fiber::Body)
+    self.const_set(:APP, app)
   end
 end
diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb
index 571f070..711d95e 100644
--- a/lib/rainbows/fiber/io.rb
+++ b/lib/rainbows/fiber/io.rb
@@ -1,117 +1,133 @@
 # -*- encoding: binary -*-
-module Rainbows
-  module Fiber
-
-    # A partially complete IO wrapper, this exports an IO.select()-able
-    # #to_io method and gives users the illusion of a synchronous
-    # interface that yields away from the current Fiber whenever
-    # the underlying IO object cannot read or write
-    #
-    # TODO: subclass off IO and include Kgio::SocketMethods instead
-    class IO < Struct.new(:to_io, :f)
-      # :stopdoc:
-      LOCALHOST = Kgio::LOCALHOST
-
-      # needed to write errors with
-      def write_nonblock(buf)
-        to_io.write_nonblock(buf)
-      end
-
-      def kgio_addr
-        to_io.kgio_addr
-      end
-
-      # for wrapping output response bodies
-      def each(&block)
-        if buf = readpartial(16384)
-          yield buf
-          yield buf while readpartial(16384, buf)
+
+# A Fiber-aware IO class, gives users the illusion of a synchronous
+# interface that yields away from the current Fiber whenever
+# the underlying descriptor is blocked on reads or write
+#
+# This is a stable, legacy interface and should be preserved for all
+# future versions of Rainbows!  However, new apps should use
+# Rainbows::Fiber::IO::Socket or Rainbows::Fiber::IO::Pipe instead.
+
+class Rainbows::Fiber::IO
+  attr_accessor :to_io
+
+  # :stopdoc:
+  class << self
+    alias :[] :new
+  end
+  # :startdoc:
+
+  # needed to write errors with
+  def write_nonblock(buf)
+    @to_io.write_nonblock(buf)
+  end
+
+  def kgio_addr
+    @to_io.kgio_addr
+  end
+
+  # for wrapping output response bodies
+  def each(&block)
+    buf = readpartial(16384)
+    yield buf
+    yield buf while readpartial(16384, buf)
+    rescue EOFError
+      self
+  end
+
+  def closed?
+    @to_io.closed?
+  end
+
+  def fileno
+    @to_io.fileno
+  end
+
+  def write(buf)
+    if @to_io.respond_to?(:kgio_trywrite)
+      begin
+        case rv = @to_io.kgio_trywrite(buf)
+        when nil
+          return
+        when String
+          buf = rv
+        when Kgio::WaitWritable
+          wait_writable
         end
-        rescue EOFError
-        self
-      end
-
-      def close
-        fileno = to_io.fileno
-        RD[fileno] = WR[fileno] = nil
-        to_io.close unless to_io.closed?
-      end
-
-      def closed?
-        to_io.closed?
-      end
-
-      def wait_readable
-        fileno = to_io.fileno
-        RD[fileno] = self
-        ::Fiber.yield
-        RD[fileno] = nil
-      end
-
-      def wait_writable
-        fileno = to_io.fileno
-        WR[fileno] = self
-        ::Fiber.yield
-        WR[fileno] = nil
-      end
-
-      def write(buf)
-        begin
-          case rv = to_io.kgio_trywrite(buf)
-          when nil
-            return
-          when String
-            buf = rv
-          when Kgio::WaitWritable
-            wait_writable
-          end
-        end while true
-      end
-
-      # used for reading headers (respecting keepalive_timeout)
-      def read_timeout
-        expire = nil
-        begin
-          to_io.read_nonblock(16384)
-        rescue Errno::EAGAIN
-          return if expire && expire < Time.now
-          expire ||= Time.now + G.kato
+      end while true
+    else
+      begin
+        (rv = @to_io.write_nonblock(buf)) == buf.bytesize and return
+        buf = byte_slice(buf, rv..-1)
+      rescue Errno::EAGAIN
+        wait_writable
+      end while true
+    end
+  end
+
+  def byte_slice(buf, range) # :nodoc:
+    if buf.encoding != Encoding::BINARY
+      buf.dup.force_encoding(Encoding::BINARY)[range]
+    else
+      buf[range]
+    end
+  end
+
+  # used for reading headers (respecting keepalive_timeout)
+  def read_timeout
+    expire = nil
+    begin
+      return @to_io.read_nonblock(16384)
+    rescue Errno::EAGAIN
+      return if expire && expire < Time.now
+      expire ||= Time.now + G.kato
+      wait_readable
+    end while true
+  end
+
+  def readpartial(length, buf = "")
+    if @to_io.respond_to?(:kgio_tryread)
+      begin
+        rv = @to_io.kgio_tryread(length, buf)
+        case rv
+        when nil
+          raise EOFError, "end of file reached", []
+        when Kgio::WaitReadable
           wait_readable
-          retry
-        end
-      end
-
-      def readpartial(length, buf = "")
-        if to_io.respond_to?(:kgio_tryread)
-          # TODO: use kgio_read!
-          begin
-            rv = to_io.kgio_tryread(length, buf)
-            case rv
-            when nil
-              raise EOFError, "end of file reached", []
-            when Kgio::WaitReadable
-              wait_readable
-            else
-              return rv
-            end
-          end while true
         else
-          begin
-            to_io.read_nonblock(length, buf)
-          rescue Errno::EAGAIN
-            wait_readable
-            retry
-          end
+          return rv
         end
-      end
+      end while true
+    else
+      begin
+        return @to_io.read_nonblock(length, buf)
+      rescue Errno::EAGAIN
+        wait_readable
+      end while true
+    end
+  end
 
-      def kgio_read(*args)
-        to_io.kgio_read(*args)
-      end
+  def kgio_read(*args)
+    @to_io.kgio_read(*args)
+  end
 
-      def kgio_read!(*args)
-        to_io.kgio_read!(*args)
-      end
-    end
+  def kgio_read!(*args)
+    @to_io.kgio_read!(*args)
   end
+
+  def kgio_trywrite(*args)
+    @to_io.kgio_trywrite(*args)
+  end
+
+  autoload :Socket, 'rainbows/fiber/io/socket'
+  autoload :Pipe, 'rainbows/fiber/io/pipe'
 end
+
+# :stopdoc:
+require 'rainbows/fiber/io/methods'
+require 'rainbows/fiber/io/compat'
+Rainbows::Client.__send__(:include, Rainbows::Fiber::IO::Methods)
+Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::IO::Compat)
+Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::IO::Methods)
+Kgio.wait_readable = :wait_readable
+Kgio.wait_writable = :wait_writable
diff --git a/lib/rainbows/fiber/io/compat.rb b/lib/rainbows/fiber/io/compat.rb
new file mode 100644
index 0000000..2aaf416
--- /dev/null
+++ b/lib/rainbows/fiber/io/compat.rb
@@ -0,0 +1,10 @@
+# -*- encoding: binary -*-
+module Rainbows::Fiber::IO::Compat
+  def initialize(io, fiber = Fiber.current)
+    @to_io, @f = io, fiber
+  end
+
+  def close
+    @to_io.close
+  end
+end
diff --git a/lib/rainbows/fiber/io/methods.rb b/lib/rainbows/fiber/io/methods.rb
new file mode 100644
index 0000000..663fdb4
--- /dev/null
+++ b/lib/rainbows/fiber/io/methods.rb
@@ -0,0 +1,44 @@
+# -*- encoding: binary -*-
+
+module Rainbows::Fiber::IO::Methods
+  RD = Rainbows::Fiber::RD
+  WR = Rainbows::Fiber::WR
+  attr_accessor :f
+
+  # for wrapping output response bodies
+  def each(&block)
+    if buf = kgio_read(16384)
+      yield buf
+      yield buf while kgio_read(16384, buf)
+    end
+    self
+  end
+
+  def close
+    fd = fileno
+    RD[fd] = WR[fd] = nil
+    super
+  end
+
+  def wait_readable
+    fd = fileno
+    @f = Fiber.current
+    RD[fd] = self
+    Fiber.yield
+    RD[fd] = nil
+  end
+
+  def wait_writable
+    fd = fileno
+    @f = Fiber.current
+    WR[fd] = self
+    Fiber.yield
+    WR[fd] = nil
+  end
+
+  def self.included(klass)
+    if klass.method_defined?(:kgio_write)
+      klass.__send__(:alias_method, :write, :kgio_write)
+    end
+  end
+end
diff --git a/lib/rainbows/fiber/io/pipe.rb b/lib/rainbows/fiber/io/pipe.rb
new file mode 100644
index 0000000..c7ae508
--- /dev/null
+++ b/lib/rainbows/fiber/io/pipe.rb
@@ -0,0 +1,7 @@
+# -*- encoding: binary -*-
+# A Fiber-aware Pipe class, gives users the illusion of a synchronous
+# interface that yields away from the current Fiber whenever
+# the underlying descriptor is blocked on reads or write
+class Rainbows::Fiber::IO::Pipe < Kgio::Pipe
+  include Rainbows::Fiber::IO::Methods
+end
diff --git a/lib/rainbows/fiber/io/socket.rb b/lib/rainbows/fiber/io/socket.rb
new file mode 100644
index 0000000..61c451d
--- /dev/null
+++ b/lib/rainbows/fiber/io/socket.rb
@@ -0,0 +1,7 @@
+# -*- encoding: binary -*-
+# A Fiber-aware Socket class, gives users the illusion of a synchronous
+# interface that yields away from the current Fiber whenever
+# the underlying descriptor is blocked on reads or write
+class Rainbows::Fiber::IO::Socket < Kgio::Socket
+  include Rainbows::Fiber::IO::Methods
+end
diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb
index 6969f5b..85d1c5f 100644
--- a/lib/rainbows/fiber/rev.rb
+++ b/lib/rainbows/fiber/rev.rb
@@ -4,163 +4,10 @@ require 'rev'
 require 'rainbows/fiber'
 require 'rainbows/fiber/io'
 
-module Rainbows::Fiber
-  module Rev
-    G = Rainbows::G
-
-    # keep-alive timeout class
-    class Kato < ::Rev::TimerWatcher
-      def initialize
-        @watch = []
-        super(1, true)
-      end
-
-      def <<(fiber)
-        @watch << fiber
-        enable unless enabled?
-      end
-
-      def on_timer
-        @watch.uniq!
-        while f = @watch.shift
-          f.resume if f.alive?
-        end
-        disable
-      end
-    end
-
-    class Heartbeat < ::Rev::TimerWatcher
-      def on_timer
-        exit if (! G.tick && G.cur <= 0)
-      end
-    end
-
-    class Sleeper < ::Rev::TimerWatcher
-
-      def initialize(seconds)
-        @f = ::Fiber.current
-        super(seconds, false)
-        attach(::Rev::Loop.default)
-        ::Fiber.yield
-      end
-
-      def on_timer
-        @f.resume
-      end
-    end
-
-    class Server < ::Rev::IOWatcher
-      include Unicorn
-      include Rainbows
-      include Rainbows::Const
-      include Rainbows::Response
-      FIO = Rainbows::Fiber::IO
-
-      def to_io
-        @io
-      end
-
-      def initialize(io)
-        @io = io
-        super(self, :r)
-      end
-
-      def close
-        detach if attached?
-        @io.close
-      end
-
-      def on_readable
-        return if G.cur >= MAX
-        c = @io.kgio_tryaccept and ::Fiber.new { process(c) }.resume
-      end
-
-      def process(io)
-        G.cur += 1
-        client = FIO.new(io, ::Fiber.current)
-        hp = HttpParser.new
-        client.readpartial(16384, buf = hp.buf)
-
-        begin # loop
-          until env = hp.parse
-            buf << (client.read_timeout or return)
-          end
-
-          env[CLIENT_IO] = client
-          env[RACK_INPUT] = 0 == hp.content_length ?
-                    HttpRequest::NULL_IO : TeeInput.new(client, hp)
-          env[REMOTE_ADDR] = io.kgio_addr
-          status, headers, body = APP.call(env.update(RACK_DEFAULTS))
-
-          if 100 == status.to_i
-            client.write(EXPECT_100_RESPONSE)
-            env.delete(HTTP_EXPECT)
-            status, headers, body = APP.call(env)
-          end
-
-          if hp.headers?
-            headers = HH.new(headers)
-            range = make_range!(env, status, headers) and status = range.shift
-            env = hp.keepalive? && G.alive
-            headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE
-            client.write(response_header(status, headers))
-          end
-          write_body(client, body, range)
-        end while env && hp.reset.nil?
-      rescue => e
-        Error.write(io, e)
-      ensure
-        G.cur -= 1
-        client.close
-      end
-    end
-  end
-
-  class IO # see rainbows/fiber/io for original definition
-
-    class Watcher < ::Rev::IOWatcher
-      def initialize(fio, flag)
-        @fiber = fio.f
-        super(fio, flag)
-        attach(::Rev::Loop.default)
-      end
-
-      def on_readable
-        @fiber.resume
-      end
-
-      alias on_writable on_readable
-    end
-
-    undef_method :wait_readable
-    undef_method :wait_writable
-    undef_method :close
-
-    def initialize(*args)
-      super
-      @r = @w = false
-    end
-
-    def close
-      @w.detach if @w
-      @r.detach if @r
-      @r = @w = false
-      to_io.close unless to_io.closed?
-    end
-
-    def wait_writable
-      @w ||= Watcher.new(self, :w)
-      @w.enable unless @w.enabled?
-      ::Fiber.yield
-      @w.disable
-    end
-
-    def wait_readable
-      @r ||= Watcher.new(self, :r)
-      @r.enable unless @r.enabled?
-      KATO << f
-      ::Fiber.yield
-      @r.disable
-    end
-  end
+module Rainbows::Fiber::Rev
+  autoload :Heartbeat, 'rainbows/fiber/rev/heartbeat'
+  autoload :Kato, 'rainbows/fiber/rev/kato'
+  autoload :Server, 'rainbows/fiber/rev/server'
+  autoload :Sleeper, 'rainbows/fiber/rev/sleeper'
 end
+require 'rainbows/fiber/rev/methods'
diff --git a/lib/rainbows/fiber/rev/heartbeat.rb b/lib/rainbows/fiber/rev/heartbeat.rb
new file mode 100644
index 0000000..9411b4a
--- /dev/null
+++ b/lib/rainbows/fiber/rev/heartbeat.rb
@@ -0,0 +1,8 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::Fiber::Rev::Heartbeat < Rev::TimerWatcher
+  G = Rainbows::G
+  def on_timer
+    exit if (! G.tick && G.cur <= 0)
+  end
+end
diff --git a/lib/rainbows/fiber/rev/kato.rb b/lib/rainbows/fiber/rev/kato.rb
new file mode 100644
index 0000000..056b6ef
--- /dev/null
+++ b/lib/rainbows/fiber/rev/kato.rb
@@ -0,0 +1,22 @@
+# -*- encoding: binary -*-
+# :enddoc:
+# keep-alive timeout class
+class Rainbows::Fiber::Rev::Kato < Rev::TimerWatcher
+  def initialize
+    @watch = []
+    super(1, true)
+  end
+
+  def <<(fiber)
+    @watch << fiber
+    enable unless enabled?
+  end
+
+  def on_timer
+    @watch.uniq!
+    while f = @watch.shift
+      f.resume if f.alive?
+    end
+    disable
+  end
+end
diff --git a/lib/rainbows/fiber/rev/methods.rb b/lib/rainbows/fiber/rev/methods.rb
new file mode 100644
index 0000000..5f4367e
--- /dev/null
+++ b/lib/rainbows/fiber/rev/methods.rb
@@ -0,0 +1,48 @@
+# -*- encoding: binary -*-
+# :enddoc:
+module Rainbows::Fiber::Rev::Methods
+  class Watcher < Rev::IOWatcher
+    def initialize(fio, flag)
+      @f = fio.f || Fiber.current
+      super(fio, flag)
+      attach(Rev::Loop.default)
+    end
+
+    def on_readable
+      @f.resume
+    end
+
+    alias on_writable on_readable
+  end
+
+  def initialize(*args)
+    @f = Fiber.current
+    super(*args)
+    @r = @w = false
+  end
+
+  def close
+    @w.detach if @w
+    @r.detach if @r
+    @r = @w = false
+    super
+  end
+
+  def wait_writable
+    @w ||= Watcher.new(self, :w)
+    @w.enable unless @w.enabled?
+    Fiber.yield
+    @w.disable
+  end
+
+  def wait_readable
+    @r ||= Watcher.new(self, :r)
+    @r.enable unless @r.enabled?
+    KATO << @f
+    Fiber.yield
+    @r.disable
+  end
+end
+
+Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::Rev::Methods)
+Rainbows::Client.__send__(:include, Rainbows::Fiber::Rev::Methods)
diff --git a/lib/rainbows/fiber/rev/server.rb b/lib/rainbows/fiber/rev/server.rb
new file mode 100644
index 0000000..9998cde
--- /dev/null
+++ b/lib/rainbows/fiber/rev/server.rb
@@ -0,0 +1,32 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::Fiber::Rev::Server < Rev::IOWatcher
+  G = Rainbows::G
+  include Rainbows::ProcessClient
+
+  def to_io
+    @io
+  end
+
+  def initialize(io)
+    @io = io
+    super(self, :r)
+  end
+
+  def close
+    detach if attached?
+    @io.close
+  end
+
+  def on_readable
+    return if G.cur >= MAX
+    c = @io.kgio_tryaccept and Fiber.new { process(c) }.resume
+  end
+
+  def process(io)
+    G.cur += 1
+    process_client(io)
+  ensure
+    G.cur -= 1
+  end
+end
diff --git a/lib/rainbows/fiber/rev/sleeper.rb b/lib/rainbows/fiber/rev/sleeper.rb
new file mode 100644
index 0000000..51f4527
--- /dev/null
+++ b/lib/rainbows/fiber/rev/sleeper.rb
@@ -0,0 +1,15 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::Fiber::Rev::Sleeper < Rev::TimerWatcher
+
+  def initialize(seconds)
+    @f = Fiber.current
+    super(seconds, false)
+    attach(Rev::Loop.default)
+    Fiber.yield
+  end
+
+  def on_timer
+    @f.resume
+  end
+end