From 15631717fce044fbad2f386a7b1c7daf4bdd83d2 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 21 Oct 2010 16:25:39 -0700 Subject: code shuffling for kgio Despite the large number of changes, most of it is code movement here. --- lib/rainbows/fiber/rev.rb | 165 ++-------------------------------------------- 1 file changed, 6 insertions(+), 159 deletions(-) (limited to 'lib/rainbows/fiber/rev.rb') diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index 6969f5b..85d1c5f 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -4,163 +4,10 @@ require 'rev' require 'rainbows/fiber' require 'rainbows/fiber/io' -module Rainbows::Fiber - module Rev - G = Rainbows::G - - # keep-alive timeout class - class Kato < ::Rev::TimerWatcher - def initialize - @watch = [] - super(1, true) - end - - def <<(fiber) - @watch << fiber - enable unless enabled? - end - - def on_timer - @watch.uniq! - while f = @watch.shift - f.resume if f.alive? - end - disable - end - end - - class Heartbeat < ::Rev::TimerWatcher - def on_timer - exit if (! G.tick && G.cur <= 0) - end - end - - class Sleeper < ::Rev::TimerWatcher - - def initialize(seconds) - @f = ::Fiber.current - super(seconds, false) - attach(::Rev::Loop.default) - ::Fiber.yield - end - - def on_timer - @f.resume - end - end - - class Server < ::Rev::IOWatcher - include Unicorn - include Rainbows - include Rainbows::Const - include Rainbows::Response - FIO = Rainbows::Fiber::IO - - def to_io - @io - end - - def initialize(io) - @io = io - super(self, :r) - end - - def close - detach if attached? - @io.close - end - - def on_readable - return if G.cur >= MAX - c = @io.kgio_tryaccept and ::Fiber.new { process(c) }.resume - end - - def process(io) - G.cur += 1 - client = FIO.new(io, ::Fiber.current) - hp = HttpParser.new - client.readpartial(16384, buf = hp.buf) - - begin # loop - until env = hp.parse - buf << (client.read_timeout or return) - end - - env[CLIENT_IO] = client - env[RACK_INPUT] = 0 == hp.content_length ? - HttpRequest::NULL_IO : TeeInput.new(client, hp) - env[REMOTE_ADDR] = io.kgio_addr - status, headers, body = APP.call(env.update(RACK_DEFAULTS)) - - if 100 == status.to_i - client.write(EXPECT_100_RESPONSE) - env.delete(HTTP_EXPECT) - status, headers, body = APP.call(env) - end - - if hp.headers? - headers = HH.new(headers) - range = make_range!(env, status, headers) and status = range.shift - env = hp.keepalive? && G.alive - headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE - client.write(response_header(status, headers)) - end - write_body(client, body, range) - end while env && hp.reset.nil? - rescue => e - Error.write(io, e) - ensure - G.cur -= 1 - client.close - end - end - end - - class IO # see rainbows/fiber/io for original definition - - class Watcher < ::Rev::IOWatcher - def initialize(fio, flag) - @fiber = fio.f - super(fio, flag) - attach(::Rev::Loop.default) - end - - def on_readable - @fiber.resume - end - - alias on_writable on_readable - end - - undef_method :wait_readable - undef_method :wait_writable - undef_method :close - - def initialize(*args) - super - @r = @w = false - end - - def close - @w.detach if @w - @r.detach if @r - @r = @w = false - to_io.close unless to_io.closed? - end - - def wait_writable - @w ||= Watcher.new(self, :w) - @w.enable unless @w.enabled? - ::Fiber.yield - @w.disable - end - - def wait_readable - @r ||= Watcher.new(self, :r) - @r.enable unless @r.enabled? - KATO << f - ::Fiber.yield - @r.disable - end - end +module Rainbows::Fiber::Rev + autoload :Heartbeat, 'rainbows/fiber/rev/heartbeat' + autoload :Kato, 'rainbows/fiber/rev/kato' + autoload :Server, 'rainbows/fiber/rev/server' + autoload :Sleeper, 'rainbows/fiber/rev/sleeper' end +require 'rainbows/fiber/rev/methods' -- cgit v1.2.3-24-ge0c7