about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-11 12:15:47 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-11 16:55:58 -0700
commitdf204a05d3a5bda8f716fa9f51be464fa59a3af1 (patch)
tree1391300d9dfe0a01a5f38958e3cf8c4c30c2fa0a
parent4f8ae9abbb985a4091acbb7f57fb7f88fa2d43ba (diff)
downloadrainbows-df204a05d3a5bda8f716fa9f51be464fa59a3af1.tar.gz
The process-based heartbeat continues, but we no longer time
threads out just because a client is idle for any reason (for
now).
-rw-r--r--lib/rainbows/base.rb26
-rw-r--r--lib/rainbows/thread_pool.rb55
-rw-r--r--lib/rainbows/thread_spawn.rb49
3 files changed, 53 insertions, 77 deletions
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index 2da6d41..8a38117 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -2,7 +2,8 @@
 
 module Rainbows
 
-  # base class for Rainbows concurrency models
+  # base class for Rainbows concurrency models, this is currently
+  # used by ThreadSpawn and ThreadPool models
   module Base
 
     include Unicorn
@@ -41,10 +42,10 @@ module Rainbows
       buf = client.readpartial(CHUNK_SIZE)
       hp = HttpParser.new
       env = {}
+      alive = true
       remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST
 
       begin # loop
-        Thread.current[:t] = Time.now
         while ! hp.headers(env, buf)
           buf << client.readpartial(CHUNK_SIZE)
         end
@@ -61,9 +62,10 @@ module Rainbows
           response = app.call(env)
         end
 
-        out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
+        alive = hp.keepalive? && ! Thread.current[:quit]
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
         HttpResponse.write(client, response, out)
-      end while hp.keepalive? and hp.reset.nil? and env.clear
+      end while alive and hp.reset.nil? and env.clear
       client.close
     # if we get any error, try to write something back to the client
     # assuming we haven't closed the socket, but don't get hung up
@@ -79,6 +81,22 @@ module Rainbows
       logger.error e.backtrace.join("\n")
     end
 
+    def join_threads(threads)
+      logger.info "Joining threads..."
+      threads.each { |thr| thr[:quit] = true }
+      t0 = Time.now
+      timeleft = timeout * 2.0
+      m = 0
+      while (nr = threads.count { |thr| thr.alive? }) > 0 && timeleft > 0
+        threads.each { |thr|
+          worker.tmp.chmod(m = 0 == m ? 1 : 0)
+          thr.join(1)
+          break if (timeleft -= (Time.now - t0)) < 0
+        }
+      end
+      logger.info "Done joining threads. #{nr} left running"
+    end
+
     def self.included(klass)
       klass.const_set :LISTENERS, HttpServer::LISTENERS
     end
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
index c26f47b..647436b 100644
--- a/lib/rainbows/thread_pool.rb
+++ b/lib/rainbows/thread_pool.rb
@@ -20,63 +20,38 @@ module Rainbows
 
     def worker_loop(worker)
       init_worker_process(worker)
-      threads = ThreadGroup.new
-      alive = worker.tmp
+      pool = (1..worker_connections).map { new_worker_thread }
       m = 0
 
       while LISTENERS.first && master_pid == Process.ppid
-        maintain_thread_count(threads)
-        threads.list.each do |thr|
-          alive.chmod(m = 0 == m ? 1 : 0)
+        pool.each do |thr|
+          worker.tmp.chmod(m = 0 == m ? 1 : 0)
+          # if any worker dies, something is serious wrong, bail
           thr.join(timeout) and break
         end
       end
-      join_worker_threads(threads)
-    end
-
-    def join_worker_threads(threads)
-      logger.info "Joining worker threads..."
-      t0 = Time.now
-      timeleft = timeout
-      threads.list.each { |thr|
-        thr.join(timeleft)
-        timeleft -= (Time.now - t0)
-      }
-      logger.info "Done joining worker threads."
-    end
-
-    def maintain_thread_count(threads)
-      threads.list.each do |thr|
-        next if (Time.now - (thr[:t] || next)) < timeout
-        thr.kill
-        logger.error "killed #{thr.inspect} for being too old"
-      end
-
-      while threads.list.size < worker_connections
-        threads.add(new_worker_thread)
-      end
+      join_threads(threads)
     end
 
     def new_worker_thread
       Thread.new {
         begin
-          ret = begin
-            Thread.current[:t] = Time.now
-            IO.select(LISTENERS, nil, nil, timeout) or next
+          begin
+            ret = IO.select(LISTENERS, nil, nil, timeout) or next
+            ret.first.each do |sock|
+              begin
+                process_client(sock.accept_nonblock)
+              rescue Errno::EAGAIN, Errno::ECONNABORTED
+              end
+            end
           rescue Errno::EINTR
-            retry
+            next
           rescue Errno::EBADF, TypeError
             return
           end
-          ret.first.each do |sock|
-            begin
-              process_client(sock.accept_nonblock)
-            rescue Errno::EAGAIN, Errno::ECONNABORTED
-            end
-          end
         rescue Object => e
           listen_loop_error(e) if LISTENERS.first
-        end while LISTENERS.first
+        end while ! Thread.current[:quit] && LISTENERS.first
       }
     end
 
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index c9fd23c..77cc3f2 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -29,45 +29,28 @@ module Rainbows
         rescue Errno::EBADF, TypeError
           break
         end
+        alive.chmod(m = 0 == m ? 1 : 0)
 
         ret.first.each do |l|
-          nuke_old_thread(threads, limit)
-          c = begin
-            l.accept_nonblock
+          # Sleep if we're busy, another less busy worker process may
+          # take it for us if we sleep. This is gross but other options
+          # still suck because they require expensive/complicated
+          # synchronization primitives for _every_ case, not just this
+          # unlikely one.  Since this case is (or should be) uncommon,
+          # just busy wait when we have to.
+          while threads.list.size > limit # unlikely
+            sleep(0.1) # hope another process took it
+            break # back to IO.select
+          end
+          begin
+            threads.add(Thread.new(l.accept_nonblock) {|c| process_client(c) })
           rescue Errno::EAGAIN, Errno::ECONNABORTED
-            next
           end
-          threads.add(Thread.new(c) { |c| process_client(c) })
         end
       rescue Object => e
-        listen_loop_error(e) if alive
-      end while alive && master_pid == Process.ppid
-      join_spawned_threads(threads)
-    end
-
-    def nuke_old_thread(threads, limit)
-      while (list = threads.list).size > limit
-        list.each do |thr|
-          thr.alive? or return # it _just_ died, we don't need it
-          next if (age = (Time.now - (thr[:t] || next))) < timeout
-          thr.kill # no-op if already dead
-          logger.error "killed #{thr.inspect} for being too old: #{age}"
-          return
-        end
-        # nothing to kill, yield to another thread
-        Thread.pass
-      end
-    end
-
-    def join_spawned_threads(threads)
-      logger.info "Joining spawned threads..."
-      t0 = Time.now
-      timeleft = timeout
-      threads.list.each { |thr|
-        thr.join(timeleft)
-        timeleft -= (Time.now - t0)
-      }
-      logger.info "Done joining spawned threads."
+        listen_loop_error(e) if LISTENERS.first
+      end while LISTENERS.first && master_pid == Process.ppid
+      join_threads(threads.list)
     end
 
   end