about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-11-28 19:42:53 -0800
committerEric Wong <normalperson@yhbt.net>2009-11-29 12:35:44 -0800
commit37a560c5d14c15a3da7f2c10c9ea3d6002b34fe1 (patch)
tree8d163646e4ba3586cafde788804c580e3315431c /lib
parent50fb5151bd44137adace51a0652f4d01d851790c (diff)
downloadrainbows-37a560c5d14c15a3da7f2c10c9ea3d6002b34fe1.tar.gz
It's a tad faster for non-keepalive connections and should do
better on large SMP machines with many workers AND threads.
That means the ActorSpawn model in Rubinius is nothing more than
ThreadSpawn underneath (for now).
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows/actor_spawn.rb33
-rw-r--r--lib/rainbows/base.rb10
-rw-r--r--lib/rainbows/thread_pool.rb51
-rw-r--r--lib/rainbows/thread_spawn.rb49
4 files changed, 68 insertions, 75 deletions
diff --git a/lib/rainbows/actor_spawn.rb b/lib/rainbows/actor_spawn.rb
index 2662f9f..98e85bc 100644
--- a/lib/rainbows/actor_spawn.rb
+++ b/lib/rainbows/actor_spawn.rb
@@ -5,12 +5,17 @@ module Rainbows
 
   # Actor concurrency model for Rubinius.  We can't seem to get message
   # passing working right, so we're throwing a Mutex into the mix for
-  # now.  Hopefully somebody can fix things for us.
+  # now.  Hopefully somebody can fix things for us.  Currently, this is
+  # exactly the same as the ThreadSpawn model since we don't use the
+  # message passing capabilities of the Actor model (and even then
+  # it wouldn't really make sense since Actors in Rubinius are just
+  # Threads underneath and our ThreadSpawn model is one layer of
+  # complexity less.
   #
   # This is different from the Revactor one which is not prone to race
-  # conditions at all (since it uses Fibers).
+  # conditions within the same process at all (since it uses Fibers).
   module ActorSpawn
-    include Base
+    include ThreadSpawn
 
     # runs inside each forked worker, this sits around and waits
     # for connections and doesn't die until the parent dies (or is
@@ -18,27 +23,7 @@ module Rainbows
     def worker_loop(worker)
       Const::RACK_DEFAULTS["rack.multithread"] = true # :(
       init_worker_process(worker)
-      limit = worker_connections
-      nr = 0
-
-      # can't seem to get the message passing to work right at the moment :<
-      lock = Mutex.new
-
-      begin
-        ret = IO.select(LISTENERS, nil, nil, 1) and ret.first.each do |l|
-          lock.synchronize { nr >= limit } and break sleep(0.01)
-          c = Rainbows.accept(l) and Actor.spawn do
-            lock.synchronize { nr += 1 }
-            begin
-              process_client(c)
-            ensure
-              lock.synchronize { nr -= 1 }
-            end
-          end
-        end
-      rescue => e
-        Error.listen_loop(e)
-      end while G.tick || lock.synchronize { nr > 0 }
+      accept_loop(Actor)
     end
   end
 end
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index 4be37f4..7ee5c03 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -70,15 +70,11 @@ module Rainbows
     end
 
     def join_threads(threads)
-      G.quit!
       expire = Time.now + (timeout * 2.0)
-      until (threads.delete_if { |thr| ! thr.alive? }).empty?
-        threads.each { |thr|
-          G.tick
-          thr.join(1)
-          break if Time.now >= expire
-        }
+      until threads.empty? || Time.now >= expire
+        threads.delete_if { |thr| thr.alive? ? thr.join(0.01) : true }
       end
+      exit!(0) unless threads.empty?
     end
 
     def self.included(klass)
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
index f398828..917b835 100644
--- a/lib/rainbows/thread_pool.rb
+++ b/lib/rainbows/thread_pool.rb
@@ -27,39 +27,44 @@ module Rainbows
 
     def worker_loop(worker)
       init_worker_process(worker)
-      pool = (1..worker_connections).map { new_worker_thread }
+      pool = (1..worker_connections).map do
+        Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker }
+      end
 
       while G.alive
         # if any worker dies, something is serious wrong, bail
         pool.each do |thr|
-          G.tick
+          G.tick or break
           thr.join(1) and G.quit!
         end
       end
       join_threads(pool)
     end
 
-    def new_worker_thread
-      Thread.new {
-        begin
-          begin
-            # TODO: check if select() or accept() is a problem on large
-            # SMP systems under Ruby 1.9.  Hundreds of native threads
-            # all working off the same socket could be a thundering herd
-            # problem.  On the other hand, a thundering herd may not
-            # even incur as much overhead as an extra Mutex#synchronize
-            ret = IO.select(LISTENERS, nil, nil, 1) and
-                  ret.first.each do |s|
-                    s = Rainbows.accept(s) and process_client(s)
-                  end
-          rescue Errno::EINTR
-          rescue Errno::EBADF, TypeError
-            break
-          end
-        rescue => e
-          Error.listen_loop(e)
-        end while G.alive
-      }
+    def sync_worker
+      s = LISTENERS.first
+      begin
+        process_client(s.accept)
+      rescue Errno::EINTR, Errno::ECONNABORTED
+      rescue => e
+        Error.listen_loop(e)
+      end while G.alive
+    end
+
+    def async_worker
+      begin
+        # TODO: check if select() or accept() is a problem on large
+        # SMP systems under Ruby 1.9.  Hundreds of native threads
+        # all working off the same socket could be a thundering herd
+        # problem.  On the other hand, a thundering herd may not
+        # even incur as much overhead as an extra Mutex#synchronize
+        ret = IO.select(LISTENERS, nil, nil, 1) and ret.first.each do |s|
+          s = Rainbows.accept(s) and process_client(s)
+        end
+      rescue Errno::EINTR
+      rescue => e
+        Error.listen_loop(e)
+      end while G.alive
     end
 
   end
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index 5afb91e..eb3ca75 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -1,4 +1,5 @@
 # -*- encoding: binary -*-
+require 'thread'
 module Rainbows
 
   # Spawns a new thread for every client connection we accept().  This
@@ -19,36 +20,42 @@ module Rainbows
 
     include Base
 
-    def worker_loop(worker)
-      init_worker_process(worker)
-      threads = ThreadGroup.new
+    def accept_loop(klass)
+      lock = Mutex.new
       limit = worker_connections
-
-      begin
-        ret = IO.select(LISTENERS, nil, nil, 1) and
-          ret.first.each do |l|
-            if threads.list.size > limit # unlikely
+      LISTENERS.each do |l|
+        klass.new(l) do |l|
+          begin
+            if lock.synchronize { G.cur >= limit }
               # Sleep if we're busy, another less busy worker process may
               # take it for us if we sleep. This is gross but other options
               # still suck because they require expensive/complicated
               # synchronization primitives for _every_ case, not just this
               # unlikely one.  Since this case is (or should be) uncommon,
               # just busy wait when we have to.
-              sleep(0.1) # hope another process took it
-              break # back to IO.select
+              sleep(0.01)
+            else
+              klass.new(l.accept) do |c|
+                begin
+                  lock.synchronize { G.cur += 1 }
+                  process_client(c)
+                ensure
+                  lock.synchronize { G.cur -= 1 }
+                end
+              end
             end
-            c = Rainbows.accept(l) and
-              threads.add(Thread.new { process_client(c) })
-          end
-      rescue Errno::EINTR
-        retry
-      rescue Errno::EBADF, TypeError
-        break
-      rescue => e
-        Error.listen_loop(e)
-      end while G.tick
-      join_threads(threads.list)
+          rescue Errno::EINTR, Errno::ECONNABORTED
+          rescue => e
+            Error.listen_loop(e)
+          end while G.alive
+        end
+      end
+      sleep 1 while G.tick || lock.synchronize { G.cur > 0 }
     end
 
+    def worker_loop(worker)
+      init_worker_process(worker)
+      accept_loop(Thread)
+    end
   end
 end