From 37a560c5d14c15a3da7f2c10c9ea3d6002b34fe1 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 28 Nov 2009 19:42:53 -0800 Subject: refactor threaded models to use blocking accept() if possible It's a tad faster for non-keepalive connections and should do better on large SMP machines with many workers AND threads. That means the ActorSpawn model in Rubinius is nothing more than ThreadSpawn underneath (for now). --- lib/rainbows/actor_spawn.rb | 33 ++++++++-------------------- lib/rainbows/base.rb | 10 +++------ lib/rainbows/thread_pool.rb | 51 ++++++++++++++++++++++++-------------------- lib/rainbows/thread_spawn.rb | 49 ++++++++++++++++++++++++------------------ 4 files changed, 68 insertions(+), 75 deletions(-) (limited to 'lib') diff --git a/lib/rainbows/actor_spawn.rb b/lib/rainbows/actor_spawn.rb index 2662f9f..98e85bc 100644 --- a/lib/rainbows/actor_spawn.rb +++ b/lib/rainbows/actor_spawn.rb @@ -5,12 +5,17 @@ module Rainbows # Actor concurrency model for Rubinius. We can't seem to get message # passing working right, so we're throwing a Mutex into the mix for - # now. Hopefully somebody can fix things for us. + # now. Hopefully somebody can fix things for us. Currently, this is + # exactly the same as the ThreadSpawn model since we don't use the + # message passing capabilities of the Actor model (and even then + # it wouldn't really make sense since Actors in Rubinius are just + # Threads underneath and our ThreadSpawn model is one layer of + # complexity less. # # This is different from the Revactor one which is not prone to race - # conditions at all (since it uses Fibers). + # conditions within the same process at all (since it uses Fibers). module ActorSpawn - include Base + include ThreadSpawn # runs inside each forked worker, this sits around and waits # for connections and doesn't die until the parent dies (or is @@ -18,27 +23,7 @@ module Rainbows def worker_loop(worker) Const::RACK_DEFAULTS["rack.multithread"] = true # :( init_worker_process(worker) - limit = worker_connections - nr = 0 - - # can't seem to get the message passing to work right at the moment :< - lock = Mutex.new - - begin - ret = IO.select(LISTENERS, nil, nil, 1) and ret.first.each do |l| - lock.synchronize { nr >= limit } and break sleep(0.01) - c = Rainbows.accept(l) and Actor.spawn do - lock.synchronize { nr += 1 } - begin - process_client(c) - ensure - lock.synchronize { nr -= 1 } - end - end - end - rescue => e - Error.listen_loop(e) - end while G.tick || lock.synchronize { nr > 0 } + accept_loop(Actor) end end end diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 4be37f4..7ee5c03 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -70,15 +70,11 @@ module Rainbows end def join_threads(threads) - G.quit! expire = Time.now + (timeout * 2.0) - until (threads.delete_if { |thr| ! thr.alive? }).empty? - threads.each { |thr| - G.tick - thr.join(1) - break if Time.now >= expire - } + until threads.empty? || Time.now >= expire + threads.delete_if { |thr| thr.alive? ? thr.join(0.01) : true } end + exit!(0) unless threads.empty? end def self.included(klass) diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index f398828..917b835 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -27,39 +27,44 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) - pool = (1..worker_connections).map { new_worker_thread } + pool = (1..worker_connections).map do + Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker } + end while G.alive # if any worker dies, something is serious wrong, bail pool.each do |thr| - G.tick + G.tick or break thr.join(1) and G.quit! end end join_threads(pool) end - def new_worker_thread - Thread.new { - begin - begin - # TODO: check if select() or accept() is a problem on large - # SMP systems under Ruby 1.9. Hundreds of native threads - # all working off the same socket could be a thundering herd - # problem. On the other hand, a thundering herd may not - # even incur as much overhead as an extra Mutex#synchronize - ret = IO.select(LISTENERS, nil, nil, 1) and - ret.first.each do |s| - s = Rainbows.accept(s) and process_client(s) - end - rescue Errno::EINTR - rescue Errno::EBADF, TypeError - break - end - rescue => e - Error.listen_loop(e) - end while G.alive - } + def sync_worker + s = LISTENERS.first + begin + process_client(s.accept) + rescue Errno::EINTR, Errno::ECONNABORTED + rescue => e + Error.listen_loop(e) + end while G.alive + end + + def async_worker + begin + # TODO: check if select() or accept() is a problem on large + # SMP systems under Ruby 1.9. Hundreds of native threads + # all working off the same socket could be a thundering herd + # problem. On the other hand, a thundering herd may not + # even incur as much overhead as an extra Mutex#synchronize + ret = IO.select(LISTENERS, nil, nil, 1) and ret.first.each do |s| + s = Rainbows.accept(s) and process_client(s) + end + rescue Errno::EINTR + rescue => e + Error.listen_loop(e) + end while G.alive end end diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 5afb91e..eb3ca75 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -1,4 +1,5 @@ # -*- encoding: binary -*- +require 'thread' module Rainbows # Spawns a new thread for every client connection we accept(). This @@ -19,36 +20,42 @@ module Rainbows include Base - def worker_loop(worker) - init_worker_process(worker) - threads = ThreadGroup.new + def accept_loop(klass) + lock = Mutex.new limit = worker_connections - - begin - ret = IO.select(LISTENERS, nil, nil, 1) and - ret.first.each do |l| - if threads.list.size > limit # unlikely + LISTENERS.each do |l| + klass.new(l) do |l| + begin + if lock.synchronize { G.cur >= limit } # Sleep if we're busy, another less busy worker process may # take it for us if we sleep. This is gross but other options # still suck because they require expensive/complicated # synchronization primitives for _every_ case, not just this # unlikely one. Since this case is (or should be) uncommon, # just busy wait when we have to. - sleep(0.1) # hope another process took it - break # back to IO.select + sleep(0.01) + else + klass.new(l.accept) do |c| + begin + lock.synchronize { G.cur += 1 } + process_client(c) + ensure + lock.synchronize { G.cur -= 1 } + end + end end - c = Rainbows.accept(l) and - threads.add(Thread.new { process_client(c) }) - end - rescue Errno::EINTR - retry - rescue Errno::EBADF, TypeError - break - rescue => e - Error.listen_loop(e) - end while G.tick - join_threads(threads.list) + rescue Errno::EINTR, Errno::ECONNABORTED + rescue => e + Error.listen_loop(e) + end while G.alive + end + end + sleep 1 while G.tick || lock.synchronize { G.cur > 0 } end + def worker_loop(worker) + init_worker_process(worker) + accept_loop(Thread) + end end end -- cgit v1.2.3-24-ge0c7