about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows/thread_pool.rb136
-rw-r--r--lib/rainbows/thread_spawn.rb82
2 files changed, 106 insertions, 112 deletions
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
index 321d3e4..f6420ae 100644
--- a/lib/rainbows/thread_pool.rb
+++ b/lib/rainbows/thread_pool.rb
@@ -1,82 +1,78 @@
 # -*- encoding: binary -*-
 
-module Rainbows
+# Implements a worker thread pool model.  This is suited for platforms
+# like Ruby 1.9, where the cost of dynamically spawning a new thread
+# for every new client connection is higher than with the ThreadSpawn
+# model.
+#
+# This model should provide a high level of compatibility with all
+# Ruby implementations, and most libraries and applications.
+# Applications running under this model should be thread-safe
+# but not necessarily reentrant.
+#
+# Applications using this model are required to be thread-safe.
+# Threads are never spawned dynamically under this model.  If you're
+# connecting to external services and need to perform DNS lookups,
+# consider using the "resolv-replace" library which replaces parts of
+# the core Socket package with concurrent DNS lookup capabilities.
+#
+# This model probably less suited for many slow clients than the
+# others and thus a lower +worker_connections+ setting is recommended.
 
-  # Implements a worker thread pool model.  This is suited for platforms
-  # like Ruby 1.9, where the cost of dynamically spawning a new thread
-  # for every new client connection is higher than with the ThreadSpawn
-  # model.
-  #
-  # This model should provide a high level of compatibility with all
-  # Ruby implementations, and most libraries and applications.
-  # Applications running under this model should be thread-safe
-  # but not necessarily reentrant.
-  #
-  # Applications using this model are required to be thread-safe.
-  # Threads are never spawned dynamically under this model.  If you're
-  # connecting to external services and need to perform DNS lookups,
-  # consider using the "resolv-replace" library which replaces parts of
-  # the core Socket package with concurrent DNS lookup capabilities.
-  #
-  # This model probably less suited for many slow clients than the
-  # others and thus a lower +worker_connections+ setting is recommended.
+module Rainbows::ThreadPool
+  include Rainbows::Base
 
-  module ThreadPool
-    include Base
-
-    def worker_loop(worker) # :nodoc:
-      init_worker_process(worker)
-      pool = (1..worker_connections).map do
-        Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker }
-      end
-
-      while G.alive
-        # if any worker dies, something is serious wrong, bail
-        pool.each do |thr|
-          G.tick or break
-          thr.join(1) and G.quit!
-        end
-      end
-      join_threads(pool)
+  def worker_loop(worker) # :nodoc:
+    init_worker_process(worker)
+    pool = (1..worker_connections).map do
+      Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker }
     end
 
-    def sync_worker # :nodoc:
-      s = LISTENERS[0]
-      begin
-        c = s.kgio_accept and process_client(c)
-      rescue => e
-        Error.listen_loop(e)
-      end while G.alive
+    while G.alive
+      # if any worker dies, something is serious wrong, bail
+      pool.each do |thr|
+        G.tick or break
+        thr.join(1) and G.quit!
+      end
     end
+    join_threads(pool)
+  end
 
-    def async_worker # :nodoc:
-      begin
-        # TODO: check if select() or accept() is a problem on large
-        # SMP systems under Ruby 1.9.  Hundreds of native threads
-        # all working off the same socket could be a thundering herd
-        # problem.  On the other hand, a thundering herd may not
-        # even incur as much overhead as an extra Mutex#synchronize
-        ret = IO.select(LISTENERS, nil, nil, 1) and ret[0].each do |s|
-          s = s.kgio_tryaccept and process_client(s)
-        end
-      rescue Errno::EINTR
-      rescue => e
-        Error.listen_loop(e)
-      end while G.alive
-    end
+  def sync_worker # :nodoc:
+    s = LISTENERS[0]
+    begin
+      c = s.kgio_accept and process_client(c)
+    rescue => e
+      Rainbows::Error.listen_loop(e)
+    end while G.alive
+  end
 
-    def join_threads(threads) # :nodoc:
-      G.quit!
-      threads.delete_if do |thr|
-        G.tick
-        begin
-          thr.run
-          thr.join(0.01)
-        rescue
-          true
-        end
-      end until threads.empty?
-    end
+  def async_worker # :nodoc:
+    begin
+      # TODO: check if select() or accept() is a problem on large
+      # SMP systems under Ruby 1.9.  Hundreds of native threads
+      # all working off the same socket could be a thundering herd
+      # problem.  On the other hand, a thundering herd may not
+      # even incur as much overhead as an extra Mutex#synchronize
+      ret = IO.select(LISTENERS, nil, nil, 1) and ret[0].each do |s|
+        s = s.kgio_tryaccept and process_client(s)
+      end
+    rescue Errno::EINTR
+    rescue => e
+      Rainbows::Error.listen_loop(e)
+    end while G.alive
+  end
 
+  def join_threads(threads) # :nodoc:
+    G.quit!
+    threads.delete_if do |thr|
+      G.tick
+      begin
+        thr.run
+        thr.join(0.01)
+      rescue
+        true
+      end
+    end until threads.empty?
   end
 end
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index 9da75f1..acdaa69 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -1,54 +1,52 @@
 # -*- encoding: binary -*-
 require 'thread'
-module Rainbows
 
-  # Spawns a new thread for every client connection we accept().  This
-  # model is recommended for platforms like Ruby 1.8 where spawning new
-  # threads is inexpensive.
-  #
-  # This model should provide a high level of compatibility with all
-  # Ruby implementations, and most libraries and applications.
-  # Applications running under this model should be thread-safe
-  # but not necessarily reentrant.
-  #
-  # If you're connecting to external services and need to perform DNS
-  # lookups, consider using the "resolv-replace" library which replaces
-  # parts of the core Socket package with concurrent DNS lookup
-  # capabilities
+# Spawns a new thread for every client connection we accept().  This
+# model is recommended for platforms like Ruby 1.8 where spawning new
+# threads is inexpensive.
+#
+# This model should provide a high level of compatibility with all
+# Ruby implementations, and most libraries and applications.
+# Applications running under this model should be thread-safe
+# but not necessarily reentrant.
+#
+# If you're connecting to external services and need to perform DNS
+# lookups, consider using the "resolv-replace" library which replaces
+# parts of the core Socket package with concurrent DNS lookup
+# capabilities
 
-  module ThreadSpawn
-    include Base
-    include Rainbows::WorkerYield
+module Rainbows::ThreadSpawn
+  include Rainbows::Base
+  include Rainbows::WorkerYield
 
-    def accept_loop(klass) #:nodoc:
-      lock = Mutex.new
-      limit = worker_connections
-      LISTENERS.each do |l|
-        klass.new(l) do |l|
-          begin
-            if lock.synchronize { G.cur >= limit }
-              worker_yield
-            elsif c = l.kgio_accept
-              klass.new(c) do |c|
-                begin
-                  lock.synchronize { G.cur += 1 }
-                  process_client(c)
-                ensure
-                  lock.synchronize { G.cur -= 1 }
-                end
+  def accept_loop(klass) #:nodoc:
+    lock = Mutex.new
+    limit = worker_connections
+    LISTENERS.each do |l|
+      klass.new(l) do |l|
+        begin
+          if lock.synchronize { G.cur >= limit }
+            worker_yield
+          elsif c = l.kgio_accept
+            klass.new(c) do |c|
+              begin
+                lock.synchronize { G.cur += 1 }
+                process_client(c)
+              ensure
+                lock.synchronize { G.cur -= 1 }
               end
             end
-          rescue => e
-            Error.listen_loop(e)
-          end while G.alive
-        end
+          end
+        rescue => e
+          Rainbows::Error.listen_loop(e)
+        end while G.alive
       end
-      sleep 1 while G.tick || lock.synchronize { G.cur > 0 }
     end
+    sleep 1 while G.tick || lock.synchronize { G.cur > 0 }
+  end
 
-    def worker_loop(worker) #:nodoc:
-      init_worker_process(worker)
-      accept_loop(Thread)
-    end
+  def worker_loop(worker) #:nodoc:
+    init_worker_process(worker)
+    accept_loop(Thread)
   end
 end