From 4d8304bf0aa5665e8f8474dfb96019297fa0c2b9 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 25 Nov 2009 01:44:34 -0800 Subject: add FiberSpawn concurrency model This one seems a easy to get working and supports everything we need to support from the server perspective. Apps will need modified drivers, but it doesn't seem too hard to add more/better support for wrapping IO objects with Fiber::IO. --- lib/rainbows.rb | 1 + lib/rainbows/dev_fd_response.rb | 4 +++ lib/rainbows/fiber.rb | 53 +++++++++++++++++++++++++++++++++ lib/rainbows/fiber/io.rb | 65 +++++++++++++++++++++++++++++++++++++++++ lib/rainbows/fiber_spawn.rb | 56 +++++++++++++++++++++++++++++++++++ 5 files changed, 179 insertions(+) create mode 100644 lib/rainbows/fiber.rb create mode 100644 lib/rainbows/fiber/io.rb create mode 100644 lib/rainbows/fiber_spawn.rb (limited to 'lib') diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 8923bf6..20f6ea8 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -76,6 +76,7 @@ module Rainbows :Rev => 50, :RevThreadSpawn => 50, :EventMachine => 50, + :FiberSpawn => 50, }.each do |model, _| u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" } autoload model, "rainbows/#{u.downcase!}" diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb index e4e5f0c..5b90d24 100644 --- a/lib/rainbows/dev_fd_response.rb +++ b/lib/rainbows/dev_fd_response.rb @@ -36,6 +36,10 @@ module Rainbows else headers['X-Rainbows-Autochunk'] = 'no' end + + # we need to make sure our pipe output is Fiber-compatible + env["rainbows.model"] == :FiberSpawn and + return [ status, headers.to_hash, Fiber::IO.new(io,::Fiber.current) ] else # unlikely, char/block device file, directory, ... return response end diff --git a/lib/rainbows/fiber.rb b/lib/rainbows/fiber.rb new file mode 100644 index 0000000..e7d64ca --- /dev/null +++ b/lib/rainbows/fiber.rb @@ -0,0 +1,53 @@ +# -*- encoding: binary -*- +require 'fiber' +require 'rainbows/fiber/io' + +module Rainbows + module Fiber + RD = {} + WR = {} + + module Base + include Rainbows::Base + + def process_client(client) + G.cur += 1 + io = client.to_io + buf = client.read_timeout or return + hp = HttpParser.new + env = {} + alive = true + remote_addr = TCPSocket === io ? io.peeraddr.last : LOCALHOST + + begin # loop + while ! hp.headers(env, buf) + buf << client.read_timeout or return + end + + env[RACK_INPUT] = 0 == hp.content_length ? + HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf) + env[REMOTE_ADDR] = remote_addr + response = APP.call(env.update(RACK_DEFAULTS)) + + if 100 == response.first.to_i + client.write(EXPECT_100_RESPONSE) + env.delete(HTTP_EXPECT) + response = APP.call(env) + end + + alive = hp.keepalive? && G.alive + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? + HttpResponse.write(client, response, out) + end while alive and hp.reset.nil? and env.clear + io.close + rescue => e + handle_error(io, e) + ensure + G.cur -= 1 + RD.delete(client) + WR.delete(client) + end + + end + end +end diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb new file mode 100644 index 0000000..bc6c0fe --- /dev/null +++ b/lib/rainbows/fiber/io.rb @@ -0,0 +1,65 @@ +# -*- encoding: binary -*- +module Rainbows + module Fiber + + # A partially complete IO wrapper, this exports an IO.select()-able + # #to_io method and gives users the illusion of a synchronous + # interface that yields away from the current Fiber whenever + # the underlying IO object cannot read or write + class IO < Struct.new(:to_io, :f) + + # for wrapping output response bodies + def each(&block) + begin + yield readpartial(16384) + rescue EOFError + break + end while true + self + end + + def close + to_io.close + end + + def write(buf) + begin + (w = to_io.write_nonblock(buf)) == buf.size and return + buf = buf[w..-1] + rescue Errno::EAGAIN + WR[self] = false + ::Fiber.yield + WR.delete(self) + retry + end while true + end + + # used for reading headers (respecting keepalive_timeout) + def read_timeout + expire = false + begin + to_io.read_nonblock(16384) + rescue Errno::EAGAIN + return if expire && expire < Time.now + RD[self] = false + expire = Time.now + G.kato + ::Fiber.yield + RD.delete(self) + retry + end + end + + def readpartial(length, buf = "") + begin + to_io.read_nonblock(length, buf) + rescue Errno::EAGAIN + RD[self] = false + ::Fiber.yield + RD.delete(self) + retry + end + end + + end + end +end diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb new file mode 100644 index 0000000..d1c1ec0 --- /dev/null +++ b/lib/rainbows/fiber_spawn.rb @@ -0,0 +1,56 @@ +# -*- encoding: binary -*- +require 'rainbows/fiber' + +module Rainbows + + # Simple Fiber-based concurrency model for 1.9. This spawns a new + # Fiber for every client connection. This exports a streaming + # "rack.input" with lightweight concurrency. Applications are + # strongly advised to wrap slow all IO objects (sockets, pipes) using + # the Rainbows::Fiber::IO class whenever possible. + + module FiberSpawn + include Fiber::Base + + def worker_loop(worker) + init_worker_process(worker) + Fiber::Base.const_set(:APP, app) + limit = worker_connections + rd = Rainbows::Fiber::RD + wr = Rainbows::Fiber::WR + fio = Rainbows::Fiber::IO + + begin + ret = begin + IO.select(rd.keys.concat(LISTENERS), wr.keys, nil, 1) or next + rescue Errno::EINTR + G.tick + retry + rescue Errno::EBADF, TypeError + LISTENERS.compact! + G.cur > 0 ? retry : break + end + G.tick + + # active writers first, then _all_ readers for keepalive timeout + ret[1].concat(rd.keys).each { |c| c.f.resume } + G.tick + + # accept() is an expensive syscall + (ret.first & LISTENERS).each do |l| + break if G.cur >= limit + io = begin + l.accept_nonblock + rescue Errno::EAGAIN, Errno::ECONNABORTED + next + end + ::Fiber.new { process_client(fio.new(io, ::Fiber.current)) }.resume + end + G.tick + rescue => e + listen_loop_error(e) + end while G.tick || G.cur > 0 + end + + end +end -- cgit v1.2.3-24-ge0c7