From 15631717fce044fbad2f386a7b1c7daf4bdd83d2 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 21 Oct 2010 16:25:39 -0700 Subject: code shuffling for kgio Despite the large number of changes, most of it is code movement here. --- lib/rainbows/fiber/base.rb | 157 +++++++++++------------- lib/rainbows/fiber/io.rb | 232 +++++++++++++++++++----------------- lib/rainbows/fiber/io/compat.rb | 10 ++ lib/rainbows/fiber/io/methods.rb | 44 +++++++ lib/rainbows/fiber/io/pipe.rb | 7 ++ lib/rainbows/fiber/io/socket.rb | 7 ++ lib/rainbows/fiber/rev.rb | 165 +------------------------ lib/rainbows/fiber/rev/heartbeat.rb | 8 ++ lib/rainbows/fiber/rev/kato.rb | 22 ++++ lib/rainbows/fiber/rev/methods.rb | 48 ++++++++ lib/rainbows/fiber/rev/server.rb | 32 +++++ lib/rainbows/fiber/rev/sleeper.rb | 15 +++ 12 files changed, 392 insertions(+), 355 deletions(-) create mode 100644 lib/rainbows/fiber/io/compat.rb create mode 100644 lib/rainbows/fiber/io/methods.rb create mode 100644 lib/rainbows/fiber/io/pipe.rb create mode 100644 lib/rainbows/fiber/io/socket.rb create mode 100644 lib/rainbows/fiber/rev/heartbeat.rb create mode 100644 lib/rainbows/fiber/rev/kato.rb create mode 100644 lib/rainbows/fiber/rev/methods.rb create mode 100644 lib/rainbows/fiber/rev/server.rb create mode 100644 lib/rainbows/fiber/rev/sleeper.rb (limited to 'lib/rainbows/fiber') diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index b3a4c89..b7c4ce5 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -2,103 +2,84 @@ # :enddoc: require 'rainbows/fiber/io' -module Rainbows - module Fiber +module Rainbows::Fiber::Base - # blocked readers (key: fileno, value: Rainbows::Fiber::IO object) - RD = [] + include Rainbows::Base - # blocked writers (key: fileno, value: Rainbows::Fiber::IO object) - WR = [] + # :stopdoc: + RD = Rainbows::Fiber::RD + WR = Rainbows::Fiber::WR + ZZ = Rainbows::Fiber::ZZ + # :startdoc: - # sleeping fibers go here (key: Fiber object, value: wakeup time) - ZZ = {}.compare_by_identity + # the scheduler method that powers both FiberSpawn and FiberPool + # concurrency models. It times out idle clients and attempts to + # schedules ones that were blocked on I/O. At most it'll sleep + # for one second (returned by the schedule_sleepers method) which + # will cause it. + def schedule(&block) + ret = begin + G.tick + RD.compact.each { |c| c.f.resume } # attempt to time out idle clients + t = schedule_sleepers + Kernel.select(RD.compact.concat(LISTENERS), WR.compact, nil, t) or return + rescue Errno::EINTR + retry + rescue Errno::EBADF, TypeError + LISTENERS.compact! + raise + end or return - # puts the current Fiber into uninterruptible sleep for at least - # +seconds+. Unlike Kernel#sleep, this it is not possible to sleep - # indefinitely to be woken up (nobody wants that in a web server, - # right?). Calling this directly is deprecated, use - # Rainbows.sleep(seconds) instead. - def self.sleep(seconds) - ZZ[::Fiber.current] = Time.now + seconds - ::Fiber.yield - end - - # base module used by FiberSpawn and FiberPool - module Base - include Rainbows::Base - - # the scheduler method that powers both FiberSpawn and FiberPool - # concurrency models. It times out idle clients and attempts to - # schedules ones that were blocked on I/O. At most it'll sleep - # for one second (returned by the schedule_sleepers method) which - # will cause it. - def schedule(&block) - ret = begin - G.tick - RD.compact.each { |c| c.f.resume } # attempt to time out idle clients - t = schedule_sleepers - Kernel.select(RD.compact.concat(LISTENERS), - WR.compact, nil, t) or return - rescue Errno::EINTR - retry - rescue Errno::EBADF, TypeError - LISTENERS.compact! - raise - end or return - - # active writers first, then _all_ readers for keepalive timeout - ret[1].concat(RD.compact).each { |c| c.f.resume } + # active writers first, then _all_ readers for keepalive timeout + ret[1].concat(RD.compact).each { |c| c.f.resume } - # accept is an expensive syscall, filter out listeners we don't want - (ret[0] & LISTENERS).each(&block) - end + # accept is an expensive syscall, filter out listeners we don't want + (ret[0] & LISTENERS).each(&block) + end - # wakes up any sleepers that need to be woken and - # returns an interval to IO.select on - def schedule_sleepers - max = nil - now = Time.now - fibs = [] - ZZ.delete_if { |fib, time| - if now >= time - fibs << fib - else - max = time - false - end - } - fibs.each { |fib| fib.resume } - now = Time.now - max.nil? || max > (now + 1) ? 1 : max - now + # wakes up any sleepers that need to be woken and + # returns an interval to IO.select on + def schedule_sleepers + max = nil + now = Time.now + fibs = [] + ZZ.delete_if { |fib, time| + if now >= time + fibs << fib + else + max = time + false end + } + fibs.each { |fib| fib.resume } + now = Time.now + max.nil? || max > (now + 1) ? 1 : max - now + end - def wait_headers_readable(client) - io = client.to_io - expire = nil - begin - return io.recv_nonblock(1, Socket::MSG_PEEK) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato - client.wait_readable - retry - end - end + def wait_headers_readable(client) + io = client.to_io + expire = nil + begin + return io.recv_nonblock(1, Socket::MSG_PEEK) + rescue Errno::EAGAIN + return if expire && expire < Time.now + expire ||= Time.now + G.kato + client.wait_readable + retry + end + end - def process_client(client) - G.cur += 1 - super(client) # see Rainbows::Base - ensure - G.cur -= 1 - ZZ.delete(client.f) - end + def process(client) + G.cur += 1 + process_client(client) + ensure + G.cur -= 1 + ZZ.delete(client.f) + end - def self.setup(klass, app) - require 'rainbows/fiber/body' - klass.__send__(:include, Rainbows::Fiber::Body) - self.const_set(:APP, app) - end - end + def self.setup(klass, app) + require 'rainbows/fiber/body' + klass.__send__(:include, Rainbows::Fiber::Body) + self.const_set(:APP, app) end end diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb index 571f070..711d95e 100644 --- a/lib/rainbows/fiber/io.rb +++ b/lib/rainbows/fiber/io.rb @@ -1,117 +1,133 @@ # -*- encoding: binary -*- -module Rainbows - module Fiber - - # A partially complete IO wrapper, this exports an IO.select()-able - # #to_io method and gives users the illusion of a synchronous - # interface that yields away from the current Fiber whenever - # the underlying IO object cannot read or write - # - # TODO: subclass off IO and include Kgio::SocketMethods instead - class IO < Struct.new(:to_io, :f) - # :stopdoc: - LOCALHOST = Kgio::LOCALHOST - - # needed to write errors with - def write_nonblock(buf) - to_io.write_nonblock(buf) - end - - def kgio_addr - to_io.kgio_addr - end - - # for wrapping output response bodies - def each(&block) - if buf = readpartial(16384) - yield buf - yield buf while readpartial(16384, buf) + +# A Fiber-aware IO class, gives users the illusion of a synchronous +# interface that yields away from the current Fiber whenever +# the underlying descriptor is blocked on reads or write +# +# This is a stable, legacy interface and should be preserved for all +# future versions of Rainbows! However, new apps should use +# Rainbows::Fiber::IO::Socket or Rainbows::Fiber::IO::Pipe instead. + +class Rainbows::Fiber::IO + attr_accessor :to_io + + # :stopdoc: + class << self + alias :[] :new + end + # :startdoc: + + # needed to write errors with + def write_nonblock(buf) + @to_io.write_nonblock(buf) + end + + def kgio_addr + @to_io.kgio_addr + end + + # for wrapping output response bodies + def each(&block) + buf = readpartial(16384) + yield buf + yield buf while readpartial(16384, buf) + rescue EOFError + self + end + + def closed? + @to_io.closed? + end + + def fileno + @to_io.fileno + end + + def write(buf) + if @to_io.respond_to?(:kgio_trywrite) + begin + case rv = @to_io.kgio_trywrite(buf) + when nil + return + when String + buf = rv + when Kgio::WaitWritable + wait_writable end - rescue EOFError - self - end - - def close - fileno = to_io.fileno - RD[fileno] = WR[fileno] = nil - to_io.close unless to_io.closed? - end - - def closed? - to_io.closed? - end - - def wait_readable - fileno = to_io.fileno - RD[fileno] = self - ::Fiber.yield - RD[fileno] = nil - end - - def wait_writable - fileno = to_io.fileno - WR[fileno] = self - ::Fiber.yield - WR[fileno] = nil - end - - def write(buf) - begin - case rv = to_io.kgio_trywrite(buf) - when nil - return - when String - buf = rv - when Kgio::WaitWritable - wait_writable - end - end while true - end - - # used for reading headers (respecting keepalive_timeout) - def read_timeout - expire = nil - begin - to_io.read_nonblock(16384) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato + end while true + else + begin + (rv = @to_io.write_nonblock(buf)) == buf.bytesize and return + buf = byte_slice(buf, rv..-1) + rescue Errno::EAGAIN + wait_writable + end while true + end + end + + def byte_slice(buf, range) # :nodoc: + if buf.encoding != Encoding::BINARY + buf.dup.force_encoding(Encoding::BINARY)[range] + else + buf[range] + end + end + + # used for reading headers (respecting keepalive_timeout) + def read_timeout + expire = nil + begin + return @to_io.read_nonblock(16384) + rescue Errno::EAGAIN + return if expire && expire < Time.now + expire ||= Time.now + G.kato + wait_readable + end while true + end + + def readpartial(length, buf = "") + if @to_io.respond_to?(:kgio_tryread) + begin + rv = @to_io.kgio_tryread(length, buf) + case rv + when nil + raise EOFError, "end of file reached", [] + when Kgio::WaitReadable wait_readable - retry - end - end - - def readpartial(length, buf = "") - if to_io.respond_to?(:kgio_tryread) - # TODO: use kgio_read! - begin - rv = to_io.kgio_tryread(length, buf) - case rv - when nil - raise EOFError, "end of file reached", [] - when Kgio::WaitReadable - wait_readable - else - return rv - end - end while true else - begin - to_io.read_nonblock(length, buf) - rescue Errno::EAGAIN - wait_readable - retry - end + return rv end - end + end while true + else + begin + return @to_io.read_nonblock(length, buf) + rescue Errno::EAGAIN + wait_readable + end while true + end + end - def kgio_read(*args) - to_io.kgio_read(*args) - end + def kgio_read(*args) + @to_io.kgio_read(*args) + end - def kgio_read!(*args) - to_io.kgio_read!(*args) - end - end + def kgio_read!(*args) + @to_io.kgio_read!(*args) end + + def kgio_trywrite(*args) + @to_io.kgio_trywrite(*args) + end + + autoload :Socket, 'rainbows/fiber/io/socket' + autoload :Pipe, 'rainbows/fiber/io/pipe' end + +# :stopdoc: +require 'rainbows/fiber/io/methods' +require 'rainbows/fiber/io/compat' +Rainbows::Client.__send__(:include, Rainbows::Fiber::IO::Methods) +Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::IO::Compat) +Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::IO::Methods) +Kgio.wait_readable = :wait_readable +Kgio.wait_writable = :wait_writable diff --git a/lib/rainbows/fiber/io/compat.rb b/lib/rainbows/fiber/io/compat.rb new file mode 100644 index 0000000..2aaf416 --- /dev/null +++ b/lib/rainbows/fiber/io/compat.rb @@ -0,0 +1,10 @@ +# -*- encoding: binary -*- +module Rainbows::Fiber::IO::Compat + def initialize(io, fiber = Fiber.current) + @to_io, @f = io, fiber + end + + def close + @to_io.close + end +end diff --git a/lib/rainbows/fiber/io/methods.rb b/lib/rainbows/fiber/io/methods.rb new file mode 100644 index 0000000..663fdb4 --- /dev/null +++ b/lib/rainbows/fiber/io/methods.rb @@ -0,0 +1,44 @@ +# -*- encoding: binary -*- + +module Rainbows::Fiber::IO::Methods + RD = Rainbows::Fiber::RD + WR = Rainbows::Fiber::WR + attr_accessor :f + + # for wrapping output response bodies + def each(&block) + if buf = kgio_read(16384) + yield buf + yield buf while kgio_read(16384, buf) + end + self + end + + def close + fd = fileno + RD[fd] = WR[fd] = nil + super + end + + def wait_readable + fd = fileno + @f = Fiber.current + RD[fd] = self + Fiber.yield + RD[fd] = nil + end + + def wait_writable + fd = fileno + @f = Fiber.current + WR[fd] = self + Fiber.yield + WR[fd] = nil + end + + def self.included(klass) + if klass.method_defined?(:kgio_write) + klass.__send__(:alias_method, :write, :kgio_write) + end + end +end diff --git a/lib/rainbows/fiber/io/pipe.rb b/lib/rainbows/fiber/io/pipe.rb new file mode 100644 index 0000000..c7ae508 --- /dev/null +++ b/lib/rainbows/fiber/io/pipe.rb @@ -0,0 +1,7 @@ +# -*- encoding: binary -*- +# A Fiber-aware Pipe class, gives users the illusion of a synchronous +# interface that yields away from the current Fiber whenever +# the underlying descriptor is blocked on reads or write +class Rainbows::Fiber::IO::Pipe < Kgio::Pipe + include Rainbows::Fiber::IO::Methods +end diff --git a/lib/rainbows/fiber/io/socket.rb b/lib/rainbows/fiber/io/socket.rb new file mode 100644 index 0000000..61c451d --- /dev/null +++ b/lib/rainbows/fiber/io/socket.rb @@ -0,0 +1,7 @@ +# -*- encoding: binary -*- +# A Fiber-aware Socket class, gives users the illusion of a synchronous +# interface that yields away from the current Fiber whenever +# the underlying descriptor is blocked on reads or write +class Rainbows::Fiber::IO::Socket < Kgio::Socket + include Rainbows::Fiber::IO::Methods +end diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index 6969f5b..85d1c5f 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -4,163 +4,10 @@ require 'rev' require 'rainbows/fiber' require 'rainbows/fiber/io' -module Rainbows::Fiber - module Rev - G = Rainbows::G - - # keep-alive timeout class - class Kato < ::Rev::TimerWatcher - def initialize - @watch = [] - super(1, true) - end - - def <<(fiber) - @watch << fiber - enable unless enabled? - end - - def on_timer - @watch.uniq! - while f = @watch.shift - f.resume if f.alive? - end - disable - end - end - - class Heartbeat < ::Rev::TimerWatcher - def on_timer - exit if (! G.tick && G.cur <= 0) - end - end - - class Sleeper < ::Rev::TimerWatcher - - def initialize(seconds) - @f = ::Fiber.current - super(seconds, false) - attach(::Rev::Loop.default) - ::Fiber.yield - end - - def on_timer - @f.resume - end - end - - class Server < ::Rev::IOWatcher - include Unicorn - include Rainbows - include Rainbows::Const - include Rainbows::Response - FIO = Rainbows::Fiber::IO - - def to_io - @io - end - - def initialize(io) - @io = io - super(self, :r) - end - - def close - detach if attached? - @io.close - end - - def on_readable - return if G.cur >= MAX - c = @io.kgio_tryaccept and ::Fiber.new { process(c) }.resume - end - - def process(io) - G.cur += 1 - client = FIO.new(io, ::Fiber.current) - hp = HttpParser.new - client.readpartial(16384, buf = hp.buf) - - begin # loop - until env = hp.parse - buf << (client.read_timeout or return) - end - - env[CLIENT_IO] = client - env[RACK_INPUT] = 0 == hp.content_length ? - HttpRequest::NULL_IO : TeeInput.new(client, hp) - env[REMOTE_ADDR] = io.kgio_addr - status, headers, body = APP.call(env.update(RACK_DEFAULTS)) - - if 100 == status.to_i - client.write(EXPECT_100_RESPONSE) - env.delete(HTTP_EXPECT) - status, headers, body = APP.call(env) - end - - if hp.headers? - headers = HH.new(headers) - range = make_range!(env, status, headers) and status = range.shift - env = hp.keepalive? && G.alive - headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE - client.write(response_header(status, headers)) - end - write_body(client, body, range) - end while env && hp.reset.nil? - rescue => e - Error.write(io, e) - ensure - G.cur -= 1 - client.close - end - end - end - - class IO # see rainbows/fiber/io for original definition - - class Watcher < ::Rev::IOWatcher - def initialize(fio, flag) - @fiber = fio.f - super(fio, flag) - attach(::Rev::Loop.default) - end - - def on_readable - @fiber.resume - end - - alias on_writable on_readable - end - - undef_method :wait_readable - undef_method :wait_writable - undef_method :close - - def initialize(*args) - super - @r = @w = false - end - - def close - @w.detach if @w - @r.detach if @r - @r = @w = false - to_io.close unless to_io.closed? - end - - def wait_writable - @w ||= Watcher.new(self, :w) - @w.enable unless @w.enabled? - ::Fiber.yield - @w.disable - end - - def wait_readable - @r ||= Watcher.new(self, :r) - @r.enable unless @r.enabled? - KATO << f - ::Fiber.yield - @r.disable - end - end +module Rainbows::Fiber::Rev + autoload :Heartbeat, 'rainbows/fiber/rev/heartbeat' + autoload :Kato, 'rainbows/fiber/rev/kato' + autoload :Server, 'rainbows/fiber/rev/server' + autoload :Sleeper, 'rainbows/fiber/rev/sleeper' end +require 'rainbows/fiber/rev/methods' diff --git a/lib/rainbows/fiber/rev/heartbeat.rb b/lib/rainbows/fiber/rev/heartbeat.rb new file mode 100644 index 0000000..9411b4a --- /dev/null +++ b/lib/rainbows/fiber/rev/heartbeat.rb @@ -0,0 +1,8 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Fiber::Rev::Heartbeat < Rev::TimerWatcher + G = Rainbows::G + def on_timer + exit if (! G.tick && G.cur <= 0) + end +end diff --git a/lib/rainbows/fiber/rev/kato.rb b/lib/rainbows/fiber/rev/kato.rb new file mode 100644 index 0000000..056b6ef --- /dev/null +++ b/lib/rainbows/fiber/rev/kato.rb @@ -0,0 +1,22 @@ +# -*- encoding: binary -*- +# :enddoc: +# keep-alive timeout class +class Rainbows::Fiber::Rev::Kato < Rev::TimerWatcher + def initialize + @watch = [] + super(1, true) + end + + def <<(fiber) + @watch << fiber + enable unless enabled? + end + + def on_timer + @watch.uniq! + while f = @watch.shift + f.resume if f.alive? + end + disable + end +end diff --git a/lib/rainbows/fiber/rev/methods.rb b/lib/rainbows/fiber/rev/methods.rb new file mode 100644 index 0000000..5f4367e --- /dev/null +++ b/lib/rainbows/fiber/rev/methods.rb @@ -0,0 +1,48 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Fiber::Rev::Methods + class Watcher < Rev::IOWatcher + def initialize(fio, flag) + @f = fio.f || Fiber.current + super(fio, flag) + attach(Rev::Loop.default) + end + + def on_readable + @f.resume + end + + alias on_writable on_readable + end + + def initialize(*args) + @f = Fiber.current + super(*args) + @r = @w = false + end + + def close + @w.detach if @w + @r.detach if @r + @r = @w = false + super + end + + def wait_writable + @w ||= Watcher.new(self, :w) + @w.enable unless @w.enabled? + Fiber.yield + @w.disable + end + + def wait_readable + @r ||= Watcher.new(self, :r) + @r.enable unless @r.enabled? + KATO << @f + Fiber.yield + @r.disable + end +end + +Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::Rev::Methods) +Rainbows::Client.__send__(:include, Rainbows::Fiber::Rev::Methods) diff --git a/lib/rainbows/fiber/rev/server.rb b/lib/rainbows/fiber/rev/server.rb new file mode 100644 index 0000000..9998cde --- /dev/null +++ b/lib/rainbows/fiber/rev/server.rb @@ -0,0 +1,32 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Fiber::Rev::Server < Rev::IOWatcher + G = Rainbows::G + include Rainbows::ProcessClient + + def to_io + @io + end + + def initialize(io) + @io = io + super(self, :r) + end + + def close + detach if attached? + @io.close + end + + def on_readable + return if G.cur >= MAX + c = @io.kgio_tryaccept and Fiber.new { process(c) }.resume + end + + def process(io) + G.cur += 1 + process_client(io) + ensure + G.cur -= 1 + end +end diff --git a/lib/rainbows/fiber/rev/sleeper.rb b/lib/rainbows/fiber/rev/sleeper.rb new file mode 100644 index 0000000..51f4527 --- /dev/null +++ b/lib/rainbows/fiber/rev/sleeper.rb @@ -0,0 +1,15 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Fiber::Rev::Sleeper < Rev::TimerWatcher + + def initialize(seconds) + @f = Fiber.current + super(seconds, false) + attach(Rev::Loop.default) + Fiber.yield + end + + def on_timer + @f.resume + end +end -- cgit v1.2.3-24-ge0c7