From 15631717fce044fbad2f386a7b1c7daf4bdd83d2 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 21 Oct 2010 16:25:39 -0700 Subject: code shuffling for kgio Despite the large number of changes, most of it is code movement here. --- lib/rainbows/base.rb | 56 +-------- lib/rainbows/client.rb | 9 ++ lib/rainbows/fiber.rb | 27 ++++- lib/rainbows/fiber/base.rb | 157 +++++++++++------------- lib/rainbows/fiber/io.rb | 232 +++++++++++++++++++----------------- lib/rainbows/fiber/io/compat.rb | 10 ++ lib/rainbows/fiber/io/methods.rb | 44 +++++++ lib/rainbows/fiber/io/pipe.rb | 7 ++ lib/rainbows/fiber/io/socket.rb | 7 ++ lib/rainbows/fiber/rev.rb | 165 +------------------------ lib/rainbows/fiber/rev/heartbeat.rb | 8 ++ lib/rainbows/fiber/rev/kato.rb | 22 ++++ lib/rainbows/fiber/rev/methods.rb | 48 ++++++++ lib/rainbows/fiber/rev/server.rb | 32 +++++ lib/rainbows/fiber/rev/sleeper.rb | 15 +++ lib/rainbows/fiber_pool.rb | 2 +- lib/rainbows/fiber_spawn.rb | 3 +- lib/rainbows/http_request.rb | 39 ++++++ lib/rainbows/process_client.rb | 61 ++++++++++ lib/rainbows/read_timeout.rb | 28 +++++ lib/rainbows/rev_fiber_spawn.rb | 2 +- lib/rainbows/writer_thread_pool.rb | 4 + lib/rainbows/writer_thread_spawn.rb | 4 + 23 files changed, 564 insertions(+), 418 deletions(-) create mode 100644 lib/rainbows/client.rb create mode 100644 lib/rainbows/fiber/io/compat.rb create mode 100644 lib/rainbows/fiber/io/methods.rb create mode 100644 lib/rainbows/fiber/io/pipe.rb create mode 100644 lib/rainbows/fiber/io/socket.rb create mode 100644 lib/rainbows/fiber/rev/heartbeat.rb create mode 100644 lib/rainbows/fiber/rev/kato.rb create mode 100644 lib/rainbows/fiber/rev/methods.rb create mode 100644 lib/rainbows/fiber/rev/server.rb create mode 100644 lib/rainbows/fiber/rev/sleeper.rb create mode 100644 lib/rainbows/http_request.rb create mode 100644 lib/rainbows/process_client.rb create mode 100644 lib/rainbows/read_timeout.rb (limited to 'lib/rainbows') diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index fe2cf31..3027aba 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -8,14 +8,10 @@ module Rainbows::Base # :stopdoc: - include Rainbows::Const - include Rainbows::Response + include Rainbows::ProcessClient # shortcuts... G = Rainbows::G - NULL_IO = Unicorn::HttpRequest::NULL_IO - TeeInput = Rainbows::TeeInput - HttpParser = Unicorn::HttpParser # this method is called by all current concurrency models def init_worker_process(worker) # :nodoc: @@ -33,58 +29,10 @@ module Rainbows::Base trap(:USR1) { reopen_worker_logs(worker.nr) } trap(:QUIT) { G.quit! } [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown + Rainbows::ProcessClient.const_set(:APP, G.server.app) logger.info "Rainbows! #@use worker_connections=#@worker_connections" end - def wait_headers_readable(client) # :nodoc: - IO.select([client], nil, nil, G.kato) - end - - # once a client is accepted, it is processed in its entirety here - # in 3 easy steps: read request, call app, write app response - # this is used by synchronous concurrency models - # Base, ThreadSpawn, ThreadPool - def process_client(client) # :nodoc: - hp = HttpParser.new - client.readpartial(CHUNK_SIZE, buf = hp.buf) - remote_addr = Rainbows.addr(client) - - begin # loop - until env = hp.parse - wait_headers_readable(client) or return - buf << client.readpartial(CHUNK_SIZE) - end - - env[CLIENT_IO] = client - env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : TeeInput.new(client, hp) - env[REMOTE_ADDR] = remote_addr - status, headers, body = app.call(env.update(RACK_DEFAULTS)) - - if 100 == status.to_i - client.write(EXPECT_100_RESPONSE) - env.delete(HTTP_EXPECT) - status, headers, body = app.call(env) - end - - if hp.headers? - headers = HH.new(headers) - range = make_range!(env, status, headers) and status = range.shift - env = hp.keepalive? && G.alive - headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE - client.write(response_header(status, headers)) - end - write_body(client, body, range) - end while env && hp.reset.nil? - # if we get any error, try to write something back to the client - # assuming we haven't closed the socket, but don't get hung up - # if the socket is already closed or broken. We'll always ensure - # the socket is closed at the end of this function - rescue => e - Rainbows::Error.write(client, e) - ensure - client.close unless client.closed? - end - def self.included(klass) # :nodoc: klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS klass.const_set :G, Rainbows::G diff --git a/lib/rainbows/client.rb b/lib/rainbows/client.rb new file mode 100644 index 0000000..8956509 --- /dev/null +++ b/lib/rainbows/client.rb @@ -0,0 +1,9 @@ +# -*- encoding: binary -*- +# :enddoc: + +require 'rainbows/read_timeout' + +class Rainbows::Client < Kgio::Socket + include Rainbows::ReadTimeout +end +Kgio.accept_class = Rainbows::Client diff --git a/lib/rainbows/fiber.rb b/lib/rainbows/fiber.rb index e65ef1b..94f7d94 100644 --- a/lib/rainbows/fiber.rb +++ b/lib/rainbows/fiber.rb @@ -6,11 +6,28 @@ rescue LoadError defined?(NeverBlock) or raise end -module Rainbows +# core module for all things that use Fibers in Rainbows! +module Rainbows::Fiber - # core module for all things that use Fibers in Rainbows! - module Fiber - autoload :Base, 'rainbows/fiber/base' - autoload :Queue, 'rainbows/fiber/queue' + # blocked readers (key: fileno, value: Rainbows::Fiber::IO object) + RD = [] + + # blocked writers (key: fileno, value: Rainbows::Fiber::IO object) + WR = [] + + # sleeping fibers go here (key: Fiber object, value: wakeup time) + ZZ = {} + + # puts the current Fiber into uninterruptible sleep for at least + # +seconds+. Unlike Kernel#sleep, this it is not possible to sleep + # indefinitely to be woken up (nobody wants that in a web server, + # right?). Calling this directly is deprecated, use + # Rainbows.sleep(seconds) instead. + def self.sleep(seconds) + ZZ[::Fiber.current] = Time.now + seconds + ::Fiber.yield end + + autoload :Base, 'rainbows/fiber/base' + autoload :Queue, 'rainbows/fiber/queue' end diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index b3a4c89..b7c4ce5 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -2,103 +2,84 @@ # :enddoc: require 'rainbows/fiber/io' -module Rainbows - module Fiber +module Rainbows::Fiber::Base - # blocked readers (key: fileno, value: Rainbows::Fiber::IO object) - RD = [] + include Rainbows::Base - # blocked writers (key: fileno, value: Rainbows::Fiber::IO object) - WR = [] + # :stopdoc: + RD = Rainbows::Fiber::RD + WR = Rainbows::Fiber::WR + ZZ = Rainbows::Fiber::ZZ + # :startdoc: - # sleeping fibers go here (key: Fiber object, value: wakeup time) - ZZ = {}.compare_by_identity + # the scheduler method that powers both FiberSpawn and FiberPool + # concurrency models. It times out idle clients and attempts to + # schedules ones that were blocked on I/O. At most it'll sleep + # for one second (returned by the schedule_sleepers method) which + # will cause it. + def schedule(&block) + ret = begin + G.tick + RD.compact.each { |c| c.f.resume } # attempt to time out idle clients + t = schedule_sleepers + Kernel.select(RD.compact.concat(LISTENERS), WR.compact, nil, t) or return + rescue Errno::EINTR + retry + rescue Errno::EBADF, TypeError + LISTENERS.compact! + raise + end or return - # puts the current Fiber into uninterruptible sleep for at least - # +seconds+. Unlike Kernel#sleep, this it is not possible to sleep - # indefinitely to be woken up (nobody wants that in a web server, - # right?). Calling this directly is deprecated, use - # Rainbows.sleep(seconds) instead. - def self.sleep(seconds) - ZZ[::Fiber.current] = Time.now + seconds - ::Fiber.yield - end - - # base module used by FiberSpawn and FiberPool - module Base - include Rainbows::Base - - # the scheduler method that powers both FiberSpawn and FiberPool - # concurrency models. It times out idle clients and attempts to - # schedules ones that were blocked on I/O. At most it'll sleep - # for one second (returned by the schedule_sleepers method) which - # will cause it. - def schedule(&block) - ret = begin - G.tick - RD.compact.each { |c| c.f.resume } # attempt to time out idle clients - t = schedule_sleepers - Kernel.select(RD.compact.concat(LISTENERS), - WR.compact, nil, t) or return - rescue Errno::EINTR - retry - rescue Errno::EBADF, TypeError - LISTENERS.compact! - raise - end or return - - # active writers first, then _all_ readers for keepalive timeout - ret[1].concat(RD.compact).each { |c| c.f.resume } + # active writers first, then _all_ readers for keepalive timeout + ret[1].concat(RD.compact).each { |c| c.f.resume } - # accept is an expensive syscall, filter out listeners we don't want - (ret[0] & LISTENERS).each(&block) - end + # accept is an expensive syscall, filter out listeners we don't want + (ret[0] & LISTENERS).each(&block) + end - # wakes up any sleepers that need to be woken and - # returns an interval to IO.select on - def schedule_sleepers - max = nil - now = Time.now - fibs = [] - ZZ.delete_if { |fib, time| - if now >= time - fibs << fib - else - max = time - false - end - } - fibs.each { |fib| fib.resume } - now = Time.now - max.nil? || max > (now + 1) ? 1 : max - now + # wakes up any sleepers that need to be woken and + # returns an interval to IO.select on + def schedule_sleepers + max = nil + now = Time.now + fibs = [] + ZZ.delete_if { |fib, time| + if now >= time + fibs << fib + else + max = time + false end + } + fibs.each { |fib| fib.resume } + now = Time.now + max.nil? || max > (now + 1) ? 1 : max - now + end - def wait_headers_readable(client) - io = client.to_io - expire = nil - begin - return io.recv_nonblock(1, Socket::MSG_PEEK) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato - client.wait_readable - retry - end - end + def wait_headers_readable(client) + io = client.to_io + expire = nil + begin + return io.recv_nonblock(1, Socket::MSG_PEEK) + rescue Errno::EAGAIN + return if expire && expire < Time.now + expire ||= Time.now + G.kato + client.wait_readable + retry + end + end - def process_client(client) - G.cur += 1 - super(client) # see Rainbows::Base - ensure - G.cur -= 1 - ZZ.delete(client.f) - end + def process(client) + G.cur += 1 + process_client(client) + ensure + G.cur -= 1 + ZZ.delete(client.f) + end - def self.setup(klass, app) - require 'rainbows/fiber/body' - klass.__send__(:include, Rainbows::Fiber::Body) - self.const_set(:APP, app) - end - end + def self.setup(klass, app) + require 'rainbows/fiber/body' + klass.__send__(:include, Rainbows::Fiber::Body) + self.const_set(:APP, app) end end diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb index 571f070..711d95e 100644 --- a/lib/rainbows/fiber/io.rb +++ b/lib/rainbows/fiber/io.rb @@ -1,117 +1,133 @@ # -*- encoding: binary -*- -module Rainbows - module Fiber - - # A partially complete IO wrapper, this exports an IO.select()-able - # #to_io method and gives users the illusion of a synchronous - # interface that yields away from the current Fiber whenever - # the underlying IO object cannot read or write - # - # TODO: subclass off IO and include Kgio::SocketMethods instead - class IO < Struct.new(:to_io, :f) - # :stopdoc: - LOCALHOST = Kgio::LOCALHOST - - # needed to write errors with - def write_nonblock(buf) - to_io.write_nonblock(buf) - end - - def kgio_addr - to_io.kgio_addr - end - - # for wrapping output response bodies - def each(&block) - if buf = readpartial(16384) - yield buf - yield buf while readpartial(16384, buf) + +# A Fiber-aware IO class, gives users the illusion of a synchronous +# interface that yields away from the current Fiber whenever +# the underlying descriptor is blocked on reads or write +# +# This is a stable, legacy interface and should be preserved for all +# future versions of Rainbows! However, new apps should use +# Rainbows::Fiber::IO::Socket or Rainbows::Fiber::IO::Pipe instead. + +class Rainbows::Fiber::IO + attr_accessor :to_io + + # :stopdoc: + class << self + alias :[] :new + end + # :startdoc: + + # needed to write errors with + def write_nonblock(buf) + @to_io.write_nonblock(buf) + end + + def kgio_addr + @to_io.kgio_addr + end + + # for wrapping output response bodies + def each(&block) + buf = readpartial(16384) + yield buf + yield buf while readpartial(16384, buf) + rescue EOFError + self + end + + def closed? + @to_io.closed? + end + + def fileno + @to_io.fileno + end + + def write(buf) + if @to_io.respond_to?(:kgio_trywrite) + begin + case rv = @to_io.kgio_trywrite(buf) + when nil + return + when String + buf = rv + when Kgio::WaitWritable + wait_writable end - rescue EOFError - self - end - - def close - fileno = to_io.fileno - RD[fileno] = WR[fileno] = nil - to_io.close unless to_io.closed? - end - - def closed? - to_io.closed? - end - - def wait_readable - fileno = to_io.fileno - RD[fileno] = self - ::Fiber.yield - RD[fileno] = nil - end - - def wait_writable - fileno = to_io.fileno - WR[fileno] = self - ::Fiber.yield - WR[fileno] = nil - end - - def write(buf) - begin - case rv = to_io.kgio_trywrite(buf) - when nil - return - when String - buf = rv - when Kgio::WaitWritable - wait_writable - end - end while true - end - - # used for reading headers (respecting keepalive_timeout) - def read_timeout - expire = nil - begin - to_io.read_nonblock(16384) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato + end while true + else + begin + (rv = @to_io.write_nonblock(buf)) == buf.bytesize and return + buf = byte_slice(buf, rv..-1) + rescue Errno::EAGAIN + wait_writable + end while true + end + end + + def byte_slice(buf, range) # :nodoc: + if buf.encoding != Encoding::BINARY + buf.dup.force_encoding(Encoding::BINARY)[range] + else + buf[range] + end + end + + # used for reading headers (respecting keepalive_timeout) + def read_timeout + expire = nil + begin + return @to_io.read_nonblock(16384) + rescue Errno::EAGAIN + return if expire && expire < Time.now + expire ||= Time.now + G.kato + wait_readable + end while true + end + + def readpartial(length, buf = "") + if @to_io.respond_to?(:kgio_tryread) + begin + rv = @to_io.kgio_tryread(length, buf) + case rv + when nil + raise EOFError, "end of file reached", [] + when Kgio::WaitReadable wait_readable - retry - end - end - - def readpartial(length, buf = "") - if to_io.respond_to?(:kgio_tryread) - # TODO: use kgio_read! - begin - rv = to_io.kgio_tryread(length, buf) - case rv - when nil - raise EOFError, "end of file reached", [] - when Kgio::WaitReadable - wait_readable - else - return rv - end - end while true else - begin - to_io.read_nonblock(length, buf) - rescue Errno::EAGAIN - wait_readable - retry - end + return rv end - end + end while true + else + begin + return @to_io.read_nonblock(length, buf) + rescue Errno::EAGAIN + wait_readable + end while true + end + end - def kgio_read(*args) - to_io.kgio_read(*args) - end + def kgio_read(*args) + @to_io.kgio_read(*args) + end - def kgio_read!(*args) - to_io.kgio_read!(*args) - end - end + def kgio_read!(*args) + @to_io.kgio_read!(*args) end + + def kgio_trywrite(*args) + @to_io.kgio_trywrite(*args) + end + + autoload :Socket, 'rainbows/fiber/io/socket' + autoload :Pipe, 'rainbows/fiber/io/pipe' end + +# :stopdoc: +require 'rainbows/fiber/io/methods' +require 'rainbows/fiber/io/compat' +Rainbows::Client.__send__(:include, Rainbows::Fiber::IO::Methods) +Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::IO::Compat) +Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::IO::Methods) +Kgio.wait_readable = :wait_readable +Kgio.wait_writable = :wait_writable diff --git a/lib/rainbows/fiber/io/compat.rb b/lib/rainbows/fiber/io/compat.rb new file mode 100644 index 0000000..2aaf416 --- /dev/null +++ b/lib/rainbows/fiber/io/compat.rb @@ -0,0 +1,10 @@ +# -*- encoding: binary -*- +module Rainbows::Fiber::IO::Compat + def initialize(io, fiber = Fiber.current) + @to_io, @f = io, fiber + end + + def close + @to_io.close + end +end diff --git a/lib/rainbows/fiber/io/methods.rb b/lib/rainbows/fiber/io/methods.rb new file mode 100644 index 0000000..663fdb4 --- /dev/null +++ b/lib/rainbows/fiber/io/methods.rb @@ -0,0 +1,44 @@ +# -*- encoding: binary -*- + +module Rainbows::Fiber::IO::Methods + RD = Rainbows::Fiber::RD + WR = Rainbows::Fiber::WR + attr_accessor :f + + # for wrapping output response bodies + def each(&block) + if buf = kgio_read(16384) + yield buf + yield buf while kgio_read(16384, buf) + end + self + end + + def close + fd = fileno + RD[fd] = WR[fd] = nil + super + end + + def wait_readable + fd = fileno + @f = Fiber.current + RD[fd] = self + Fiber.yield + RD[fd] = nil + end + + def wait_writable + fd = fileno + @f = Fiber.current + WR[fd] = self + Fiber.yield + WR[fd] = nil + end + + def self.included(klass) + if klass.method_defined?(:kgio_write) + klass.__send__(:alias_method, :write, :kgio_write) + end + end +end diff --git a/lib/rainbows/fiber/io/pipe.rb b/lib/rainbows/fiber/io/pipe.rb new file mode 100644 index 0000000..c7ae508 --- /dev/null +++ b/lib/rainbows/fiber/io/pipe.rb @@ -0,0 +1,7 @@ +# -*- encoding: binary -*- +# A Fiber-aware Pipe class, gives users the illusion of a synchronous +# interface that yields away from the current Fiber whenever +# the underlying descriptor is blocked on reads or write +class Rainbows::Fiber::IO::Pipe < Kgio::Pipe + include Rainbows::Fiber::IO::Methods +end diff --git a/lib/rainbows/fiber/io/socket.rb b/lib/rainbows/fiber/io/socket.rb new file mode 100644 index 0000000..61c451d --- /dev/null +++ b/lib/rainbows/fiber/io/socket.rb @@ -0,0 +1,7 @@ +# -*- encoding: binary -*- +# A Fiber-aware Socket class, gives users the illusion of a synchronous +# interface that yields away from the current Fiber whenever +# the underlying descriptor is blocked on reads or write +class Rainbows::Fiber::IO::Socket < Kgio::Socket + include Rainbows::Fiber::IO::Methods +end diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index 6969f5b..85d1c5f 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -4,163 +4,10 @@ require 'rev' require 'rainbows/fiber' require 'rainbows/fiber/io' -module Rainbows::Fiber - module Rev - G = Rainbows::G - - # keep-alive timeout class - class Kato < ::Rev::TimerWatcher - def initialize - @watch = [] - super(1, true) - end - - def <<(fiber) - @watch << fiber - enable unless enabled? - end - - def on_timer - @watch.uniq! - while f = @watch.shift - f.resume if f.alive? - end - disable - end - end - - class Heartbeat < ::Rev::TimerWatcher - def on_timer - exit if (! G.tick && G.cur <= 0) - end - end - - class Sleeper < ::Rev::TimerWatcher - - def initialize(seconds) - @f = ::Fiber.current - super(seconds, false) - attach(::Rev::Loop.default) - ::Fiber.yield - end - - def on_timer - @f.resume - end - end - - class Server < ::Rev::IOWatcher - include Unicorn - include Rainbows - include Rainbows::Const - include Rainbows::Response - FIO = Rainbows::Fiber::IO - - def to_io - @io - end - - def initialize(io) - @io = io - super(self, :r) - end - - def close - detach if attached? - @io.close - end - - def on_readable - return if G.cur >= MAX - c = @io.kgio_tryaccept and ::Fiber.new { process(c) }.resume - end - - def process(io) - G.cur += 1 - client = FIO.new(io, ::Fiber.current) - hp = HttpParser.new - client.readpartial(16384, buf = hp.buf) - - begin # loop - until env = hp.parse - buf << (client.read_timeout or return) - end - - env[CLIENT_IO] = client - env[RACK_INPUT] = 0 == hp.content_length ? - HttpRequest::NULL_IO : TeeInput.new(client, hp) - env[REMOTE_ADDR] = io.kgio_addr - status, headers, body = APP.call(env.update(RACK_DEFAULTS)) - - if 100 == status.to_i - client.write(EXPECT_100_RESPONSE) - env.delete(HTTP_EXPECT) - status, headers, body = APP.call(env) - end - - if hp.headers? - headers = HH.new(headers) - range = make_range!(env, status, headers) and status = range.shift - env = hp.keepalive? && G.alive - headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE - client.write(response_header(status, headers)) - end - write_body(client, body, range) - end while env && hp.reset.nil? - rescue => e - Error.write(io, e) - ensure - G.cur -= 1 - client.close - end - end - end - - class IO # see rainbows/fiber/io for original definition - - class Watcher < ::Rev::IOWatcher - def initialize(fio, flag) - @fiber = fio.f - super(fio, flag) - attach(::Rev::Loop.default) - end - - def on_readable - @fiber.resume - end - - alias on_writable on_readable - end - - undef_method :wait_readable - undef_method :wait_writable - undef_method :close - - def initialize(*args) - super - @r = @w = false - end - - def close - @w.detach if @w - @r.detach if @r - @r = @w = false - to_io.close unless to_io.closed? - end - - def wait_writable - @w ||= Watcher.new(self, :w) - @w.enable unless @w.enabled? - ::Fiber.yield - @w.disable - end - - def wait_readable - @r ||= Watcher.new(self, :r) - @r.enable unless @r.enabled? - KATO << f - ::Fiber.yield - @r.disable - end - end +module Rainbows::Fiber::Rev + autoload :Heartbeat, 'rainbows/fiber/rev/heartbeat' + autoload :Kato, 'rainbows/fiber/rev/kato' + autoload :Server, 'rainbows/fiber/rev/server' + autoload :Sleeper, 'rainbows/fiber/rev/sleeper' end +require 'rainbows/fiber/rev/methods' diff --git a/lib/rainbows/fiber/rev/heartbeat.rb b/lib/rainbows/fiber/rev/heartbeat.rb new file mode 100644 index 0000000..9411b4a --- /dev/null +++ b/lib/rainbows/fiber/rev/heartbeat.rb @@ -0,0 +1,8 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Fiber::Rev::Heartbeat < Rev::TimerWatcher + G = Rainbows::G + def on_timer + exit if (! G.tick && G.cur <= 0) + end +end diff --git a/lib/rainbows/fiber/rev/kato.rb b/lib/rainbows/fiber/rev/kato.rb new file mode 100644 index 0000000..056b6ef --- /dev/null +++ b/lib/rainbows/fiber/rev/kato.rb @@ -0,0 +1,22 @@ +# -*- encoding: binary -*- +# :enddoc: +# keep-alive timeout class +class Rainbows::Fiber::Rev::Kato < Rev::TimerWatcher + def initialize + @watch = [] + super(1, true) + end + + def <<(fiber) + @watch << fiber + enable unless enabled? + end + + def on_timer + @watch.uniq! + while f = @watch.shift + f.resume if f.alive? + end + disable + end +end diff --git a/lib/rainbows/fiber/rev/methods.rb b/lib/rainbows/fiber/rev/methods.rb new file mode 100644 index 0000000..5f4367e --- /dev/null +++ b/lib/rainbows/fiber/rev/methods.rb @@ -0,0 +1,48 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Fiber::Rev::Methods + class Watcher < Rev::IOWatcher + def initialize(fio, flag) + @f = fio.f || Fiber.current + super(fio, flag) + attach(Rev::Loop.default) + end + + def on_readable + @f.resume + end + + alias on_writable on_readable + end + + def initialize(*args) + @f = Fiber.current + super(*args) + @r = @w = false + end + + def close + @w.detach if @w + @r.detach if @r + @r = @w = false + super + end + + def wait_writable + @w ||= Watcher.new(self, :w) + @w.enable unless @w.enabled? + Fiber.yield + @w.disable + end + + def wait_readable + @r ||= Watcher.new(self, :r) + @r.enable unless @r.enabled? + KATO << @f + Fiber.yield + @r.disable + end +end + +Rainbows::Fiber::IO.__send__(:include, Rainbows::Fiber::Rev::Methods) +Rainbows::Client.__send__(:include, Rainbows::Fiber::Rev::Methods) diff --git a/lib/rainbows/fiber/rev/server.rb b/lib/rainbows/fiber/rev/server.rb new file mode 100644 index 0000000..9998cde --- /dev/null +++ b/lib/rainbows/fiber/rev/server.rb @@ -0,0 +1,32 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Fiber::Rev::Server < Rev::IOWatcher + G = Rainbows::G + include Rainbows::ProcessClient + + def to_io + @io + end + + def initialize(io) + @io = io + super(self, :r) + end + + def close + detach if attached? + @io.close + end + + def on_readable + return if G.cur >= MAX + c = @io.kgio_tryaccept and Fiber.new { process(c) }.resume + end + + def process(io) + G.cur += 1 + process_client(io) + ensure + G.cur -= 1 + end +end diff --git a/lib/rainbows/fiber/rev/sleeper.rb b/lib/rainbows/fiber/rev/sleeper.rb new file mode 100644 index 0000000..51f4527 --- /dev/null +++ b/lib/rainbows/fiber/rev/sleeper.rb @@ -0,0 +1,15 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::Fiber::Rev::Sleeper < Rev::TimerWatcher + + def initialize(seconds) + @f = Fiber.current + super(seconds, false) + attach(Rev::Loop.default) + Fiber.yield + end + + def on_timer + @f.resume + end +end diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb index 4f3ffd8..c0d2ba7 100644 --- a/lib/rainbows/fiber_pool.rb +++ b/lib/rainbows/fiber_pool.rb @@ -21,7 +21,7 @@ module Rainbows pool = [] worker_connections.times { ::Fiber.new { - process_client(::Fiber.yield) while pool << ::Fiber.current + process(::Fiber.yield) while pool << ::Fiber.current }.resume # resume to hit ::Fiber.yield so it waits on a client } Fiber::Base.setup(self.class, app) diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb index ec259ad..1a0da04 100644 --- a/lib/rainbows/fiber_spawn.rb +++ b/lib/rainbows/fiber_spawn.rb @@ -17,13 +17,12 @@ module Rainbows init_worker_process(worker) Fiber::Base.setup(self.class, app) limit = worker_connections - fio = Rainbows::Fiber::IO begin schedule do |l| break if G.cur >= limit io = l.kgio_tryaccept or next - ::Fiber.new { process_client(fio.new(io, ::Fiber.current)) }.resume + ::Fiber.new { process(io) }.resume end rescue => e Error.listen_loop(e) diff --git a/lib/rainbows/http_request.rb b/lib/rainbows/http_request.rb new file mode 100644 index 0000000..56c24ca --- /dev/null +++ b/lib/rainbows/http_request.rb @@ -0,0 +1,39 @@ +# -*- encoding: binary -*- +class Rainbows::HttpRequest < Unicorn::HttpRequest + attr_accessor :remote_addr + + def keepalive? + if rv = keepalive? + env.clear + parser.reset + end + rv + end + + def initialize(socket) + @remote_addr = if socket.respond_to?(:kgio_addr) + socket.kgio_addr + elsif socket.respond_to?(:peeraddr) + socket.peeraddr[-1] + else + Kgio::LOCALHOST + end + super() + end + + def wait_headers_readable(socket) + IO.select([socket], nil, nil, Rainbows::G.kato) + end + + def tryread(socket) + socket.kgio_read!(16384, b = buf) + until e = parse + wait_headers_readable(socket) + b << socket.kgio_read!(16384) + end + e[Rainbows::Const::CLIENT_IO] = socket + e[RACK_INPUT] = 0 == content_length ? NULL_IO : TeeInput.new(socket, self) + e[REMOTE_ADDR] = @remote_addr + e.merge!(DEFAULTS) + end +end diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb new file mode 100644 index 0000000..5e200e5 --- /dev/null +++ b/lib/rainbows/process_client.rb @@ -0,0 +1,61 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::ProcessClient + G = Rainbows::G + include Rainbows::Response + HttpParser = Unicorn::HttpParser + NULL_IO = Unicorn::HttpRequest::NULL_IO + RACK_INPUT = Unicorn::HttpRequest::RACK_INPUT + TeeInput = Rainbows::TeeInput + include Rainbows::Const + + def wait_headers_readable(client) + IO.select([client], nil, nil, G.kato) + end + + # once a client is accepted, it is processed in its entirety here + # in 3 easy steps: read request, call app, write app response + # this is used by synchronous concurrency models + # Base, ThreadSpawn, ThreadPool + def process_client(client) # :nodoc: + hp = HttpParser.new + client.readpartial(16384, buf = hp.buf) + remote_addr = client.kgio_addr + + begin # loop + until env = hp.parse + wait_headers_readable(client) or return + buf << client.readpartial(16384) + end + + env[CLIENT_IO] = client + env[RACK_INPUT] = 0 == hp.content_length ? + NULL_IO : TeeInput.new(client, hp) + env[REMOTE_ADDR] = remote_addr + status, headers, body = APP.call(env.update(RACK_DEFAULTS)) + + if 100 == status.to_i + client.write(EXPECT_100_RESPONSE) + env.delete(HTTP_EXPECT) + status, headers, body = APP.call(env) + end + + if hp.headers? + headers = HH.new(headers) + range = make_range!(env, status, headers) and status = range.shift + env = hp.keepalive? && G.alive + headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE + client.write(response_header(status, headers)) + end + write_body(client, body, range) + end while env && hp.reset.nil? + # if we get any error, try to write something back to the client + # assuming we haven't closed the socket, but don't get hung up + # if the socket is already closed or broken. We'll always ensure + # the socket is closed at the end of this function + rescue => e + Rainbows::Error.write(client, e) + ensure + client.close unless client.closed? + end +end diff --git a/lib/rainbows/read_timeout.rb b/lib/rainbows/read_timeout.rb new file mode 100644 index 0000000..d8245bd --- /dev/null +++ b/lib/rainbows/read_timeout.rb @@ -0,0 +1,28 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::ReadTimeout + G = Rainbows::G # :nodoc: + + def wait_readable + IO.select([self], nil, nil, G.kato) + end + + # used for reading headers (respecting keepalive_timeout) + def read_timeout(buf = "") + expire = nil + begin + case rv = kgio_tryread(16384, buf) + when :wait_readable + now = Time.now.to_f + if expire + now > expire and return + else + expire = now + G.kato + end + wait_readable + else + return rv + end + end while true + end +end diff --git a/lib/rainbows/rev_fiber_spawn.rb b/lib/rainbows/rev_fiber_spawn.rb index c8e2bd1..522ae71 100644 --- a/lib/rainbows/rev_fiber_spawn.rb +++ b/lib/rainbows/rev_fiber_spawn.rb @@ -23,7 +23,7 @@ module Rainbows Server.const_set(:APP, G.server.app) Heartbeat.new(1, true).attach(::Rev::Loop.default) kato = Kato.new.attach(::Rev::Loop.default) - Rainbows::Fiber::IO.const_set(:KATO, kato) + Rainbows::Fiber::Rev::Methods.const_set(:KATO, kato) LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) } ::Rev::Loop.default.run end diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index 5c8e2a3..335a901 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -25,6 +25,10 @@ module Rainbows # used to wrap a BasicSocket to use with +q+ for all writes # this is compatible with IO.select class QueueSocket < Struct.new(:to_io, :q) # :nodoc: + def kgio_addr + to_io.kgio_addr + end + def readpartial(size, buf = "") to_io.readpartial(size, buf) end diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index dfd6c39..6468151 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -30,6 +30,10 @@ module Rainbows class MySocket < Struct.new(:to_io, :q, :thr) # :nodoc: all include Rainbows::Response + def kgio_addr + to_io.kgio_addr + end + def readpartial(size, buf = "") to_io.readpartial(size, buf) end -- cgit v1.2.3-24-ge0c7