From 37a560c5d14c15a3da7f2c10c9ea3d6002b34fe1 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 28 Nov 2009 19:42:53 -0800 Subject: refactor threaded models to use blocking accept() if possible It's a tad faster for non-keepalive connections and should do better on large SMP machines with many workers AND threads. That means the ActorSpawn model in Rubinius is nothing more than ThreadSpawn underneath (for now). --- lib/rainbows/thread_spawn.rb | 49 +++++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 21 deletions(-) (limited to 'lib/rainbows/thread_spawn.rb') diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 5afb91e..eb3ca75 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -1,4 +1,5 @@ # -*- encoding: binary -*- +require 'thread' module Rainbows # Spawns a new thread for every client connection we accept(). This @@ -19,36 +20,42 @@ module Rainbows include Base - def worker_loop(worker) - init_worker_process(worker) - threads = ThreadGroup.new + def accept_loop(klass) + lock = Mutex.new limit = worker_connections - - begin - ret = IO.select(LISTENERS, nil, nil, 1) and - ret.first.each do |l| - if threads.list.size > limit # unlikely + LISTENERS.each do |l| + klass.new(l) do |l| + begin + if lock.synchronize { G.cur >= limit } # Sleep if we're busy, another less busy worker process may # take it for us if we sleep. This is gross but other options # still suck because they require expensive/complicated # synchronization primitives for _every_ case, not just this # unlikely one. Since this case is (or should be) uncommon, # just busy wait when we have to. - sleep(0.1) # hope another process took it - break # back to IO.select + sleep(0.01) + else + klass.new(l.accept) do |c| + begin + lock.synchronize { G.cur += 1 } + process_client(c) + ensure + lock.synchronize { G.cur -= 1 } + end + end end - c = Rainbows.accept(l) and - threads.add(Thread.new { process_client(c) }) - end - rescue Errno::EINTR - retry - rescue Errno::EBADF, TypeError - break - rescue => e - Error.listen_loop(e) - end while G.tick - join_threads(threads.list) + rescue Errno::EINTR, Errno::ECONNABORTED + rescue => e + Error.listen_loop(e) + end while G.alive + end + end + sleep 1 while G.tick || lock.synchronize { G.cur > 0 } end + def worker_loop(worker) + init_worker_process(worker) + accept_loop(Thread) + end end end -- cgit v1.2.3-24-ge0c7