From 2dddf957462f2cdbd6f141f35e0292a70b62c5a6 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 5 Feb 2009 00:50:52 -0800 Subject: Remove threading and use worker processes instead All tests for threading and semaphores have been removed. One test was changed because it depended on a shared variable. Tests will be replaced with tests to do process management instead. --- lib/unicorn.rb | 176 +++++++++++++++++++------------------------- lib/unicorn/http_request.rb | 4 +- lib/unicorn/semaphore.rb | 46 ------------ 3 files changed, 77 insertions(+), 149 deletions(-) delete mode 100644 lib/unicorn/semaphore.rb (limited to 'lib') diff --git a/lib/unicorn.rb b/lib/unicorn.rb index e43b676..b4721be 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -13,8 +13,6 @@ require 'logger' # Compiled extension require 'http11' -# Gem conditional loader -require 'thread' require 'rack' require 'unicorn/tcphack' @@ -22,7 +20,6 @@ require 'unicorn/const' require 'unicorn/http_request' require 'unicorn/header_out' require 'unicorn/http_response' -require 'unicorn/semaphore' # Unicorn module containing all of the classes (include C extensions) for running # a Unicorn web server. It contains a minimalist HTTP server with just enough @@ -37,46 +34,41 @@ module Unicorn end end - # Used to stop the HttpServer via Thread.raise. - class StopServer < Exception; end - - # Thrown at a thread when it is timed out. - class TimeoutError < Exception; end - - # Thrown by HttpServer#stop if the server is not started. - class AcceptorError < StandardError; end + # We do this to be compatible with the existing API + class WorkerTable < Hash + def join + begin + pid = Process.wait + self.delete(pid) + rescue Errno::ECHLD + return + end + end + end - # - # This is the main driver of Unicorn, while the Unicorn::HttpParser and Unicorn::URIClassifier - # make up the majority of how the server functions. It's a very simple class that just - # has a thread accepting connections and a simple HttpServer.process_client function - # to do the heavy lifting with the IO and Ruby. - # + # This is the main driver of Unicorn, while the Unicorn::HttpParser + # and make up the majority of how the server functions. It forks off + # :nr_workers and has the workers accepting connections on a shared + # socket and a simple HttpServer.process_client function to + # do the heavy lifting with the IO and Ruby. class HttpServer - attr_reader :acceptor, :workers, :logger, :host, :port, :timeout, :max_queued_threads, :max_concurrent_threads + attr_reader :workers, :logger, :host, :port, :timeout, :nr_workers DEFAULTS = { :timeout => 60, :host => '0.0.0.0', :port => 8080, :logger => Logger.new(STDERR), - :max_queued_threads => 12, - :max_concurrent_threads => 4 + :nr_workers => 1 } - # Creates a working server on host:port (strange things happen if port isn't a Number). - # Use HttpServer::run to start the server and HttpServer.acceptor.join to - # join the thread that's processing incoming requests on the socket. - # - # The max_queued_threads optional argument is the maximum number of concurrent - # processors to accept, anything over this is closed immediately to maintain - # server processing performance. This may seem mean but it is the most efficient - # way to deal with overload. Other schemes involve still parsing the client's request - # which defeats the point of an overload handling system. - # + # Creates a working server on host:port (strange things happen if + # port isn't a Number). Use HttpServer::run to start the server and + # HttpServer.workers.join to join the thread that's processing + # incoming requests on the socket. def initialize(app, options = {}) @app = app - @workers = ThreadGroup.new + @workers = WorkerTable.new (DEFAULTS.to_a + options.to_a).each do |key, value| instance_variable_set("@#{key.to_s.downcase}", value) @@ -84,6 +76,7 @@ module Unicorn @socket = TCPServer.new(@host, @port) @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) if defined?(Fcntl::FD_CLOEXEC) + end # Does the majority of the IO processing. It has been written in Ruby using @@ -151,7 +144,7 @@ module Unicorn logger.error "HTTP parse error, malformed request (#{params[Const::HTTP_X_FORWARDED_FOR] || client.peeraddr.last}): #{e.inspect}" logger.error "REQUEST DATA: #{data.inspect}\n---\nPARAMS: #{params.inspect}\n---\n" rescue Errno::EMFILE - reap_dead_workers('too many files') + logger.error "too many files" rescue Object => e logger.error "Read error: #{e.inspect}" logger.error e.backtrace.join("\n") @@ -168,38 +161,6 @@ module Unicorn end end - # Used internally to kill off any worker threads that have taken too long - # to complete processing. Only called if there are too many processors - # currently servicing. It returns the count of workers still active - # after the reap is done. It only runs if there are workers to reap. - def reap_dead_workers(reason='unknown') - if @workers.list.length > 0 - logger.info "Reaping #{@workers.list.length} threads for slow workers because of '#{reason}'" - error_msg = "Unicorn timed out this thread: #{reason}" - mark = Time.now - @workers.list.each do |worker| - worker[:started_on] = Time.now if not worker[:started_on] - - if mark - worker[:started_on] > @timeout - logger.info "Thread #{worker.inspect} is too old, killing." - worker.raise(TimeoutError.new(error_msg)) - end - end - end - - return @workers.list.length - end - - # Performs a wait on all the currently running threads and kills any that take - # too long. It waits by @timeout seconds, which can be set in .initialize or - # via mongrel_rails. - def graceful_shutdown - while reap_dead_workers("shutdown") > 0 - logger.info "Waiting for #{@workers.list.length} requests to finish, could take #{@timeout} seconds." - sleep @timeout / 10 - end - end - def configure_socket_options case RUBY_PLATFORM when /linux/ @@ -217,68 +178,81 @@ module Unicorn end end - # Runs the thing. It returns the thread used so you can "join" it. You can also - # access the HttpServer::acceptor attribute to get the thread later. + # Runs the thing. Returns a hash keyed by pid with worker number values + # for which to wait on. Access the HttpServer.workers attribute + # to get this hash later. def start - semaphore = Semaphore.new(@max_concurrent_threads) BasicSocket.do_not_reverse_lookup = true - configure_socket_options - if defined?($tcp_defer_accept_opts) and $tcp_defer_accept_opts @socket.setsockopt(*$tcp_defer_accept_opts) rescue nil end - @acceptor = Thread.new do - begin - while true + (1..@nr_workers).each do |worker_nr| + pid = fork do + alive = true + trap('TERM') { exit 0 } + trap('QUIT') { alive = false; @socket.close rescue nil } + while alive begin client = @socket.accept if defined?($tcp_cork_opts) and $tcp_cork_opts client.setsockopt(*$tcp_cork_opts) rescue nil end - - worker_list = @workers.list - if worker_list.length >= @max_queued_threads - logger.error "Server overloaded with #{worker_list.length} processors (#@max_queued_threads max). Dropping connection." - client.close rescue nil - reap_dead_workers("max processors") - else - thread = Thread.new(client) {|c| semaphore.synchronize { process_client(c) } } - thread[:started_on] = Time.now - @workers.add(thread) - end - rescue StopServer - break + process_client(client) rescue Errno::EMFILE - reap_dead_workers("too many open files") + logger.error "too many open files" sleep 0.5 rescue Errno::ECONNABORTED # client closed the socket even before accept client.close rescue nil rescue Object => e - logger.error "Unhandled listen loop exception #{e.inspect}." - logger.error e.backtrace.join("\n") + if alive + logger.error "Unhandled listen loop exception #{e.inspect}." + logger.error e.backtrace.join("\n") + end end end - graceful_shutdown - ensure - @socket.close - logger.info "Closed socket." - end + exit 0 + end # fork + + @workers[pid] = worker_nr end - @acceptor + @workers + end + + # delivers a signal to each worker + def kill_each_worker(signal) + @workers.keys.each do |pid| + begin + Process.kill(signal, pid) + rescue Errno::ESRCH + @workers.delete(pid) + end + end end - # Stops the acceptor thread and then causes the worker threads to finish - # off the request queue before finally exiting. - def stop(synchronous = false) - raise AcceptorError, "Server was not started." unless @acceptor - @acceptor.raise(StopServer.new) - (sleep(0.5) while @acceptor.alive?) if synchronous - @acceptor = nil + # Terminates all workers + def stop(graceful = true) + old_chld_handler = trap('CHLD') do + pid = Process.waitpid(-1, Process::WNOHANG) and @workers.delete(pid) + end + + kill_each_worker(graceful ? 'QUIT' : 'TERM') + + timeleft = @timeout + until @workers.empty? + sleep(1) + (timeleft -= 1) > 0 and next + kill_each_worker('KILL') + end + + ensure + trap('CHLD', old_chld_handler) + @socket.close rescue nil end + end end diff --git a/lib/unicorn/http_request.rb b/lib/unicorn/http_request.rb index a76d4e0..f70f0de 100644 --- a/lib/unicorn/http_request.rb +++ b/lib/unicorn/http_request.rb @@ -54,8 +54,8 @@ module Unicorn "rack.input" => @body, "rack.errors" => STDERR, - "rack.multithread" => true, - "rack.multiprocess" => false, # ??? + "rack.multithread" => false, + "rack.multiprocess" => true, "rack.run_once" => false, "rack.url_scheme" => "http", diff --git a/lib/unicorn/semaphore.rb b/lib/unicorn/semaphore.rb deleted file mode 100644 index 1c0b87c..0000000 --- a/lib/unicorn/semaphore.rb +++ /dev/null @@ -1,46 +0,0 @@ -class Semaphore - def initialize(resource_count = 0) - @available_resource_count = resource_count - @mutex = Mutex.new - @waiting_threads = [] - end - - def wait - make_thread_wait unless resource_is_available - end - - def signal - schedule_waiting_thread if thread_is_waiting - end - - def synchronize - self.wait - yield - ensure - self.signal - end - - private - - def resource_is_available - @mutex.synchronize do - return (@available_resource_count -= 1) >= 0 - end - end - - def make_thread_wait - @waiting_threads << Thread.current - Thread.stop - end - - def thread_is_waiting - @mutex.synchronize do - return (@available_resource_count += 1) <= 0 - end - end - - def schedule_waiting_thread - thread = @waiting_threads.shift - thread.wakeup if thread - end -end -- cgit v1.2.3-24-ge0c7