From 7da8f7696fafc22a50dbcded6ca44cad7ae32ab6 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 25 Nov 2009 10:18:02 -0800 Subject: add FiberPool concurrency model This is another Fiber-based concurrency model that can exploit a streaming "rack.input" for clients. Spawning Fibers seems pretty fast, but maybe there are apps that will benefit from this. --- lib/rainbows.rb | 1 + lib/rainbows/dev_fd_response.rb | 4 ++- lib/rainbows/fiber_pool.rb | 61 +++++++++++++++++++++++++++++++++++++++++ t/GNUmakefile | 1 + t/simple-http_FiberPool.ru | 9 ++++++ 5 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 lib/rainbows/fiber_pool.rb create mode 100644 t/simple-http_FiberPool.ru diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 20f6ea8..27f2df2 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -77,6 +77,7 @@ module Rainbows :RevThreadSpawn => 50, :EventMachine => 50, :FiberSpawn => 50, + :FiberPool => 50, }.each do |model, _| u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" } autoload model, "rainbows/#{u.downcase!}" diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb index 5b90d24..9ad326c 100644 --- a/lib/rainbows/dev_fd_response.rb +++ b/lib/rainbows/dev_fd_response.rb @@ -38,8 +38,10 @@ module Rainbows end # we need to make sure our pipe output is Fiber-compatible - env["rainbows.model"] == :FiberSpawn and + case env["rainbows.model"] + when :FiberSpawn, :FiberPool return [ status, headers.to_hash, Fiber::IO.new(io,::Fiber.current) ] + end else # unlikely, char/block device file, directory, ... return response end diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb new file mode 100644 index 0000000..8c408b9 --- /dev/null +++ b/lib/rainbows/fiber_pool.rb @@ -0,0 +1,61 @@ +# -*- encoding: binary -*- +require 'rainbows/fiber' +require 'pp' + +module Rainbows + + # A Fiber-based concurrency model for Ruby 1.9. This uses a pool of + # Fibers to handle client IO to run the application and the root Fiber + # for scheduling and connection acceptance. The pool size is equal to + # the number of +worker_connections+. This model supports a streaming + # "rack.input" with lightweight concurrency. Applications are + # strongly advised to wrap slow all IO objects (sockets, pipes) using + # the Rainbows::Fiber::IO class whenever possible. + + module FiberPool + include Fiber::Base + + def worker_loop(worker) + init_worker_process(worker) + pool = [] + worker_connections.times { + ::Fiber.new { + process_client(::Fiber.yield) while pool << ::Fiber.current + }.resume # resume to hit ::Fiber.yield so it waits on a client + } + Fiber::Base.const_set(:APP, app) + rd = Fiber::RD + wr = Fiber::WR + + begin + ret = begin + G.tick + IO.select(rd.keys.concat(LISTENERS), wr.keys, nil, 1) or next + rescue Errno::EINTR + retry + rescue Errno::EBADF, TypeError + LISTENERS.compact! + G.cur > 0 ? retry : break + end + + # active writers first, then _all_ readers for keepalive timeout + ret[1].concat(rd.keys).each { |c| c.f.resume } + + # accept() is an expensive syscall + (ret.first & LISTENERS).each do |l| + fib = pool.shift or break + io = begin + l.accept_nonblock + rescue Errno::EAGAIN, Errno::ECONNABORTED + pool << fib + next + end + fib.resume(Fiber::IO.new(io, fib)) + end + rescue => e + listen_loop_error(e) + end while G.alive || G.cur > 0 + end + + end +end diff --git a/t/GNUmakefile b/t/GNUmakefile index 359f300..934ce47 100644 --- a/t/GNUmakefile +++ b/t/GNUmakefile @@ -22,6 +22,7 @@ models := ThreadPool ThreadSpawn Rev EventMachine ifeq ($(RUBY_VERSION),1.9.1) # 1.9.2-preview1 was broken models += Revactor models += FiberSpawn + models += FiberPool # technically this works under 1.8, too, it's just slow models += RevThreadSpawn diff --git a/t/simple-http_FiberPool.ru b/t/simple-http_FiberPool.ru new file mode 100644 index 0000000..ae5b649 --- /dev/null +++ b/t/simple-http_FiberPool.ru @@ -0,0 +1,9 @@ +use Rack::ContentLength +use Rack::ContentType +run lambda { |env| + if env['rack.multithread'] == false && env['rainbows.model'] == :FiberPool + [ 200, {}, [ Thread.current.inspect << "\n" ] ] + else + raise "rack.multithread is not true" + end +} -- cgit v1.2.3-24-ge0c7