diff options
author | Eric Wong <normalperson@yhbt.net> | 2009-11-25 01:44:34 -0800 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2009-11-25 03:13:12 -0800 |
commit | 4d8304bf0aa5665e8f8474dfb96019297fa0c2b9 (patch) | |
tree | 676418a49b5575209c00fb3bcb83db10df8834bc /lib/rainbows/fiber_spawn.rb | |
parent | 2bc6e7a3c4e972ee3227d931e79bc4057ba278ca (diff) | |
download | rainbows-4d8304bf0aa5665e8f8474dfb96019297fa0c2b9.tar.gz |
This one seems a easy to get working and supports everything we need to support from the server perspective. Apps will need modified drivers, but it doesn't seem too hard to add more/better support for wrapping IO objects with Fiber::IO.
Diffstat (limited to 'lib/rainbows/fiber_spawn.rb')
-rw-r--r-- | lib/rainbows/fiber_spawn.rb | 56 |
1 files changed, 56 insertions, 0 deletions
diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb new file mode 100644 index 0000000..d1c1ec0 --- /dev/null +++ b/lib/rainbows/fiber_spawn.rb @@ -0,0 +1,56 @@ +# -*- encoding: binary -*- +require 'rainbows/fiber' + +module Rainbows + + # Simple Fiber-based concurrency model for 1.9. This spawns a new + # Fiber for every client connection. This exports a streaming + # "rack.input" with lightweight concurrency. Applications are + # strongly advised to wrap slow all IO objects (sockets, pipes) using + # the Rainbows::Fiber::IO class whenever possible. + + module FiberSpawn + include Fiber::Base + + def worker_loop(worker) + init_worker_process(worker) + Fiber::Base.const_set(:APP, app) + limit = worker_connections + rd = Rainbows::Fiber::RD + wr = Rainbows::Fiber::WR + fio = Rainbows::Fiber::IO + + begin + ret = begin + IO.select(rd.keys.concat(LISTENERS), wr.keys, nil, 1) or next + rescue Errno::EINTR + G.tick + retry + rescue Errno::EBADF, TypeError + LISTENERS.compact! + G.cur > 0 ? retry : break + end + G.tick + + # active writers first, then _all_ readers for keepalive timeout + ret[1].concat(rd.keys).each { |c| c.f.resume } + G.tick + + # accept() is an expensive syscall + (ret.first & LISTENERS).each do |l| + break if G.cur >= limit + io = begin + l.accept_nonblock + rescue Errno::EAGAIN, Errno::ECONNABORTED + next + end + ::Fiber.new { process_client(fio.new(io, ::Fiber.current)) }.resume + end + G.tick + rescue => e + listen_loop_error(e) + end while G.tick || G.cur > 0 + end + + end +end |