about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-02-05 00:50:52 -0800
committerEric Wong <normalperson@yhbt.net>2009-02-09 19:50:37 -0800
commit2dddf957462f2cdbd6f141f35e0292a70b62c5a6 (patch)
tree2159a3233048cd84e7147572380f090ad22491a3 /lib
parent28d571b7cca709641d964e00e6004facb6bfcc7e (diff)
downloadunicorn-2dddf957462f2cdbd6f141f35e0292a70b62c5a6.tar.gz
All tests for threading and semaphores have been removed.  One
test was changed because it depended on a shared variable.

Tests will be replaced with tests to do process management
instead.
Diffstat (limited to 'lib')
-rw-r--r--lib/unicorn.rb176
-rw-r--r--lib/unicorn/http_request.rb4
-rw-r--r--lib/unicorn/semaphore.rb46
3 files changed, 77 insertions, 149 deletions
diff --git a/lib/unicorn.rb b/lib/unicorn.rb
index e43b676..b4721be 100644
--- a/lib/unicorn.rb
+++ b/lib/unicorn.rb
@@ -13,8 +13,6 @@ require 'logger'
 # Compiled extension
 require 'http11'
 
-# Gem conditional loader
-require 'thread'
 require 'rack'
 
 require 'unicorn/tcphack'
@@ -22,7 +20,6 @@ require 'unicorn/const'
 require 'unicorn/http_request'
 require 'unicorn/header_out'
 require 'unicorn/http_response'
-require 'unicorn/semaphore'
 
 # Unicorn module containing all of the classes (include C extensions) for running
 # a Unicorn web server.  It contains a minimalist HTTP server with just enough
@@ -37,46 +34,41 @@ module Unicorn
     end
   end
 
-  # Used to stop the HttpServer via Thread.raise.
-  class StopServer < Exception; end
-
-  # Thrown at a thread when it is timed out.
-  class TimeoutError < Exception; end
-
-  # Thrown by HttpServer#stop if the server is not started.
-  class AcceptorError < StandardError; end
+  # We do this to be compatible with the existing API
+  class WorkerTable < Hash
+    def join
+      begin
+        pid = Process.wait
+        self.delete(pid)
+      rescue Errno::ECHLD
+        return
+      end
+    end
+  end
 
-  #
-  # This is the main driver of Unicorn, while the Unicorn::HttpParser and Unicorn::URIClassifier
-  # make up the majority of how the server functions.  It's a very simple class that just
-  # has a thread accepting connections and a simple HttpServer.process_client function
-  # to do the heavy lifting with the IO and Ruby.  
-  #
+  # This is the main driver of Unicorn, while the Unicorn::HttpParser
+  # and make up the majority of how the server functions.  It forks off
+  # :nr_workers and has the workers accepting connections on a shared
+  # socket and a simple HttpServer.process_client function to
+  # do the heavy lifting with the IO and Ruby.
   class HttpServer
-    attr_reader :acceptor, :workers, :logger, :host, :port, :timeout, :max_queued_threads, :max_concurrent_threads
+    attr_reader :workers, :logger, :host, :port, :timeout, :nr_workers
     
     DEFAULTS = {
       :timeout => 60,
       :host => '0.0.0.0',
       :port => 8080,
       :logger => Logger.new(STDERR),
-      :max_queued_threads => 12,
-      :max_concurrent_threads => 4
+      :nr_workers => 1
     }
 
-    # Creates a working server on host:port (strange things happen if port isn't a Number).
-    # Use HttpServer::run to start the server and HttpServer.acceptor.join to
-    # join the thread that's processing incoming requests on the socket.
-    #
-    # The max_queued_threads optional argument is the maximum number of concurrent
-    # processors to accept, anything over this is closed immediately to maintain
-    # server processing performance.  This may seem mean but it is the most efficient
-    # way to deal with overload.  Other schemes involve still parsing the client's request
-    # which defeats the point of an overload handling system.
-    #
+    # Creates a working server on host:port (strange things happen if
+    # port isn't a Number).  Use HttpServer::run to start the server and
+    # HttpServer.workers.join to join the thread that's processing
+    # incoming requests on the socket.
     def initialize(app, options = {})
       @app = app      
-      @workers = ThreadGroup.new
+      @workers = WorkerTable.new
       
       (DEFAULTS.to_a + options.to_a).each do |key, value|
         instance_variable_set("@#{key.to_s.downcase}", value)
@@ -84,6 +76,7 @@ module Unicorn
 
       @socket = TCPServer.new(@host, @port)      
       @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) if defined?(Fcntl::FD_CLOEXEC)
+
     end
 
     # Does the majority of the IO processing.  It has been written in Ruby using
@@ -151,7 +144,7 @@ module Unicorn
         logger.error "HTTP parse error, malformed request (#{params[Const::HTTP_X_FORWARDED_FOR] || client.peeraddr.last}): #{e.inspect}"
         logger.error "REQUEST DATA: #{data.inspect}\n---\nPARAMS: #{params.inspect}\n---\n"
       rescue Errno::EMFILE
-        reap_dead_workers('too many files')
+        logger.error "too many files"
       rescue Object => e
         logger.error "Read error: #{e.inspect}"
         logger.error e.backtrace.join("\n")
@@ -168,38 +161,6 @@ module Unicorn
       end
     end
 
-    # Used internally to kill off any worker threads that have taken too long
-    # to complete processing.  Only called if there are too many processors
-    # currently servicing.  It returns the count of workers still active
-    # after the reap is done.  It only runs if there are workers to reap.
-    def reap_dead_workers(reason='unknown')
-      if @workers.list.length > 0
-        logger.info "Reaping #{@workers.list.length} threads for slow workers because of '#{reason}'"
-        error_msg = "Unicorn timed out this thread: #{reason}"
-        mark = Time.now
-        @workers.list.each do |worker|
-          worker[:started_on] = Time.now if not worker[:started_on]
-
-          if mark - worker[:started_on] > @timeout
-            logger.info "Thread #{worker.inspect} is too old, killing."
-            worker.raise(TimeoutError.new(error_msg))
-          end
-        end
-      end
-
-      return @workers.list.length
-    end
-
-    # Performs a wait on all the currently running threads and kills any that take
-    # too long.  It waits by @timeout seconds, which can be set in .initialize or
-    # via mongrel_rails.
-    def graceful_shutdown
-      while reap_dead_workers("shutdown") > 0
-        logger.info "Waiting for #{@workers.list.length} requests to finish, could take #{@timeout} seconds."
-        sleep @timeout / 10
-      end
-    end
-
     def configure_socket_options
       case RUBY_PLATFORM
       when /linux/
@@ -217,68 +178,81 @@ module Unicorn
       end
     end
     
-    # Runs the thing.  It returns the thread used so you can "join" it.  You can also
-    # access the HttpServer::acceptor attribute to get the thread later.
+    # Runs the thing.  Returns a hash keyed by pid with worker number values
+    # for which to wait on.  Access the HttpServer.workers attribute
+    # to get this hash later.
     def start
-      semaphore = Semaphore.new(@max_concurrent_threads)
       BasicSocket.do_not_reverse_lookup = true
-
       configure_socket_options
-
       if defined?($tcp_defer_accept_opts) and $tcp_defer_accept_opts
         @socket.setsockopt(*$tcp_defer_accept_opts) rescue nil
       end
 
-      @acceptor = Thread.new do
-        begin
-          while true
+      (1..@nr_workers).each do |worker_nr|
+        pid = fork do
+          alive = true
+          trap('TERM') { exit 0 }
+          trap('QUIT') { alive = false; @socket.close rescue nil }
+          while alive
             begin
               client = @socket.accept
   
               if defined?($tcp_cork_opts) and $tcp_cork_opts
                 client.setsockopt(*$tcp_cork_opts) rescue nil
               end
-  
-              worker_list = @workers.list
-              if worker_list.length >= @max_queued_threads
-                logger.error "Server overloaded with #{worker_list.length} processors (#@max_queued_threads max). Dropping connection."
-                client.close rescue nil
-                reap_dead_workers("max processors")
-              else
-                thread = Thread.new(client) {|c| semaphore.synchronize { process_client(c) } }
-                thread[:started_on] = Time.now
-                @workers.add(thread)
-              end
-            rescue StopServer
-              break
+              process_client(client)
             rescue Errno::EMFILE
-              reap_dead_workers("too many open files")
+              logger.error "too many open files"
               sleep 0.5
             rescue Errno::ECONNABORTED
               # client closed the socket even before accept
               client.close rescue nil
             rescue Object => e
-              logger.error "Unhandled listen loop exception #{e.inspect}."
-              logger.error e.backtrace.join("\n")
+              if alive
+                logger.error "Unhandled listen loop exception #{e.inspect}."
+                logger.error e.backtrace.join("\n")
+              end
             end
           end
-          graceful_shutdown
-        ensure
-          @socket.close
-          logger.info "Closed socket."
-        end
+          exit 0
+        end # fork
+
+        @workers[pid] = worker_nr
       end
 
-      @acceptor
+      @workers
+    end
+
+    # delivers a signal to each worker
+    def kill_each_worker(signal)
+      @workers.keys.each do |pid|
+        begin
+          Process.kill(signal, pid)
+        rescue Errno::ESRCH
+          @workers.delete(pid)
+        end
+      end
     end
 
-    # Stops the acceptor thread and then causes the worker threads to finish
-    # off the request queue before finally exiting.
-    def stop(synchronous = false)
-      raise AcceptorError, "Server was not started." unless @acceptor
-      @acceptor.raise(StopServer.new)
-      (sleep(0.5) while @acceptor.alive?) if synchronous
-      @acceptor = nil
+    # Terminates all workers
+    def stop(graceful = true)
+      old_chld_handler = trap('CHLD') do
+        pid = Process.waitpid(-1, Process::WNOHANG) and @workers.delete(pid)
+      end
+
+      kill_each_worker(graceful ? 'QUIT' : 'TERM')
+
+      timeleft = @timeout
+      until @workers.empty?
+        sleep(1)
+        (timeleft -= 1) > 0 and next
+        kill_each_worker('KILL')
+      end
+
+    ensure
+      trap('CHLD', old_chld_handler)
+      @socket.close rescue nil
     end
+
   end
 end
diff --git a/lib/unicorn/http_request.rb b/lib/unicorn/http_request.rb
index a76d4e0..f70f0de 100644
--- a/lib/unicorn/http_request.rb
+++ b/lib/unicorn/http_request.rb
@@ -54,8 +54,8 @@ module Unicorn
               "rack.input" => @body,
               "rack.errors" => STDERR,
 
-              "rack.multithread" => true,
-              "rack.multiprocess" => false, # ???
+              "rack.multithread" => false,
+              "rack.multiprocess" => true,
               "rack.run_once" => false,
 
               "rack.url_scheme" => "http",
diff --git a/lib/unicorn/semaphore.rb b/lib/unicorn/semaphore.rb
deleted file mode 100644
index 1c0b87c..0000000
--- a/lib/unicorn/semaphore.rb
+++ /dev/null
@@ -1,46 +0,0 @@
-class Semaphore
-  def initialize(resource_count = 0)
-    @available_resource_count = resource_count
-    @mutex = Mutex.new
-    @waiting_threads = []
-  end
-  
-  def wait
-    make_thread_wait unless resource_is_available
-  end
-  
-  def signal
-    schedule_waiting_thread if thread_is_waiting
-  end
-  
-  def synchronize
-    self.wait
-    yield
-  ensure
-    self.signal
-  end
-  
-  private
-  
-  def resource_is_available
-    @mutex.synchronize do
-      return (@available_resource_count -= 1) >= 0
-    end
-  end
-  
-  def make_thread_wait
-    @waiting_threads << Thread.current
-    Thread.stop  
-  end
-  
-  def thread_is_waiting
-    @mutex.synchronize do
-      return (@available_resource_count += 1) <= 0
-    end
-  end
-  
-  def schedule_waiting_thread
-    thread = @waiting_threads.shift
-    thread.wakeup if thread
-  end
-end