From 4d8304bf0aa5665e8f8474dfb96019297fa0c2b9 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 25 Nov 2009 01:44:34 -0800 Subject: add FiberSpawn concurrency model This one seems a easy to get working and supports everything we need to support from the server perspective. Apps will need modified drivers, but it doesn't seem too hard to add more/better support for wrapping IO objects with Fiber::IO. --- lib/rainbows/fiber_spawn.rb | 56 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 56 insertions(+) create mode 100644 lib/rainbows/fiber_spawn.rb (limited to 'lib/rainbows/fiber_spawn.rb') diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb new file mode 100644 index 0000000..d1c1ec0 --- /dev/null +++ b/lib/rainbows/fiber_spawn.rb @@ -0,0 +1,56 @@ +# -*- encoding: binary -*- +require 'rainbows/fiber' + +module Rainbows + + # Simple Fiber-based concurrency model for 1.9. This spawns a new + # Fiber for every client connection. This exports a streaming + # "rack.input" with lightweight concurrency. Applications are + # strongly advised to wrap slow all IO objects (sockets, pipes) using + # the Rainbows::Fiber::IO class whenever possible. + + module FiberSpawn + include Fiber::Base + + def worker_loop(worker) + init_worker_process(worker) + Fiber::Base.const_set(:APP, app) + limit = worker_connections + rd = Rainbows::Fiber::RD + wr = Rainbows::Fiber::WR + fio = Rainbows::Fiber::IO + + begin + ret = begin + IO.select(rd.keys.concat(LISTENERS), wr.keys, nil, 1) or next + rescue Errno::EINTR + G.tick + retry + rescue Errno::EBADF, TypeError + LISTENERS.compact! + G.cur > 0 ? retry : break + end + G.tick + + # active writers first, then _all_ readers for keepalive timeout + ret[1].concat(rd.keys).each { |c| c.f.resume } + G.tick + + # accept() is an expensive syscall + (ret.first & LISTENERS).each do |l| + break if G.cur >= limit + io = begin + l.accept_nonblock + rescue Errno::EAGAIN, Errno::ECONNABORTED + next + end + ::Fiber.new { process_client(fio.new(io, ::Fiber.current)) }.resume + end + G.tick + rescue => e + listen_loop_error(e) + end while G.tick || G.cur > 0 + end + + end +end -- cgit v1.2.3-24-ge0c7