From 2dddf957462f2cdbd6f141f35e0292a70b62c5a6 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 5 Feb 2009 00:50:52 -0800 Subject: Remove threading and use worker processes instead All tests for threading and semaphores have been removed. One test was changed because it depended on a shared variable. Tests will be replaced with tests to do process management instead. --- Manifest | 3 - lib/unicorn.rb | 176 +++++++++++++++++++------------------------- lib/unicorn/http_request.rb | 4 +- lib/unicorn/semaphore.rb | 46 ------------ test/unit/test_semaphore.rb | 118 ----------------------------- test/unit/test_server.rb | 22 +----- test/unit/test_threading.rb | 74 ------------------- 7 files changed, 80 insertions(+), 363 deletions(-) delete mode 100644 lib/unicorn/semaphore.rb delete mode 100644 test/unit/test_semaphore.rb delete mode 100644 test/unit/test_threading.rb diff --git a/Manifest b/Manifest index 381fe72..e415b15 100644 --- a/Manifest +++ b/Manifest @@ -20,7 +20,6 @@ lib/unicorn/const.rb lib/unicorn/header_out.rb lib/unicorn/http_request.rb lib/unicorn/http_response.rb -lib/unicorn/semaphore.rb lib/unicorn/tcphack.rb setup.rb test/benchmark/previous.rb @@ -33,6 +32,4 @@ test/test_suite.rb test/tools/trickletest.rb test/unit/test_http_parser.rb test/unit/test_response.rb -test/unit/test_semaphore.rb test/unit/test_server.rb -test/unit/test_threading.rb diff --git a/lib/unicorn.rb b/lib/unicorn.rb index e43b676..b4721be 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -13,8 +13,6 @@ require 'logger' # Compiled extension require 'http11' -# Gem conditional loader -require 'thread' require 'rack' require 'unicorn/tcphack' @@ -22,7 +20,6 @@ require 'unicorn/const' require 'unicorn/http_request' require 'unicorn/header_out' require 'unicorn/http_response' -require 'unicorn/semaphore' # Unicorn module containing all of the classes (include C extensions) for running # a Unicorn web server. It contains a minimalist HTTP server with just enough @@ -37,46 +34,41 @@ module Unicorn end end - # Used to stop the HttpServer via Thread.raise. - class StopServer < Exception; end - - # Thrown at a thread when it is timed out. - class TimeoutError < Exception; end - - # Thrown by HttpServer#stop if the server is not started. - class AcceptorError < StandardError; end + # We do this to be compatible with the existing API + class WorkerTable < Hash + def join + begin + pid = Process.wait + self.delete(pid) + rescue Errno::ECHLD + return + end + end + end - # - # This is the main driver of Unicorn, while the Unicorn::HttpParser and Unicorn::URIClassifier - # make up the majority of how the server functions. It's a very simple class that just - # has a thread accepting connections and a simple HttpServer.process_client function - # to do the heavy lifting with the IO and Ruby. - # + # This is the main driver of Unicorn, while the Unicorn::HttpParser + # and make up the majority of how the server functions. It forks off + # :nr_workers and has the workers accepting connections on a shared + # socket and a simple HttpServer.process_client function to + # do the heavy lifting with the IO and Ruby. class HttpServer - attr_reader :acceptor, :workers, :logger, :host, :port, :timeout, :max_queued_threads, :max_concurrent_threads + attr_reader :workers, :logger, :host, :port, :timeout, :nr_workers DEFAULTS = { :timeout => 60, :host => '0.0.0.0', :port => 8080, :logger => Logger.new(STDERR), - :max_queued_threads => 12, - :max_concurrent_threads => 4 + :nr_workers => 1 } - # Creates a working server on host:port (strange things happen if port isn't a Number). - # Use HttpServer::run to start the server and HttpServer.acceptor.join to - # join the thread that's processing incoming requests on the socket. - # - # The max_queued_threads optional argument is the maximum number of concurrent - # processors to accept, anything over this is closed immediately to maintain - # server processing performance. This may seem mean but it is the most efficient - # way to deal with overload. Other schemes involve still parsing the client's request - # which defeats the point of an overload handling system. - # + # Creates a working server on host:port (strange things happen if + # port isn't a Number). Use HttpServer::run to start the server and + # HttpServer.workers.join to join the thread that's processing + # incoming requests on the socket. def initialize(app, options = {}) @app = app - @workers = ThreadGroup.new + @workers = WorkerTable.new (DEFAULTS.to_a + options.to_a).each do |key, value| instance_variable_set("@#{key.to_s.downcase}", value) @@ -84,6 +76,7 @@ module Unicorn @socket = TCPServer.new(@host, @port) @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) if defined?(Fcntl::FD_CLOEXEC) + end # Does the majority of the IO processing. It has been written in Ruby using @@ -151,7 +144,7 @@ module Unicorn logger.error "HTTP parse error, malformed request (#{params[Const::HTTP_X_FORWARDED_FOR] || client.peeraddr.last}): #{e.inspect}" logger.error "REQUEST DATA: #{data.inspect}\n---\nPARAMS: #{params.inspect}\n---\n" rescue Errno::EMFILE - reap_dead_workers('too many files') + logger.error "too many files" rescue Object => e logger.error "Read error: #{e.inspect}" logger.error e.backtrace.join("\n") @@ -168,38 +161,6 @@ module Unicorn end end - # Used internally to kill off any worker threads that have taken too long - # to complete processing. Only called if there are too many processors - # currently servicing. It returns the count of workers still active - # after the reap is done. It only runs if there are workers to reap. - def reap_dead_workers(reason='unknown') - if @workers.list.length > 0 - logger.info "Reaping #{@workers.list.length} threads for slow workers because of '#{reason}'" - error_msg = "Unicorn timed out this thread: #{reason}" - mark = Time.now - @workers.list.each do |worker| - worker[:started_on] = Time.now if not worker[:started_on] - - if mark - worker[:started_on] > @timeout - logger.info "Thread #{worker.inspect} is too old, killing." - worker.raise(TimeoutError.new(error_msg)) - end - end - end - - return @workers.list.length - end - - # Performs a wait on all the currently running threads and kills any that take - # too long. It waits by @timeout seconds, which can be set in .initialize or - # via mongrel_rails. - def graceful_shutdown - while reap_dead_workers("shutdown") > 0 - logger.info "Waiting for #{@workers.list.length} requests to finish, could take #{@timeout} seconds." - sleep @timeout / 10 - end - end - def configure_socket_options case RUBY_PLATFORM when /linux/ @@ -217,68 +178,81 @@ module Unicorn end end - # Runs the thing. It returns the thread used so you can "join" it. You can also - # access the HttpServer::acceptor attribute to get the thread later. + # Runs the thing. Returns a hash keyed by pid with worker number values + # for which to wait on. Access the HttpServer.workers attribute + # to get this hash later. def start - semaphore = Semaphore.new(@max_concurrent_threads) BasicSocket.do_not_reverse_lookup = true - configure_socket_options - if defined?($tcp_defer_accept_opts) and $tcp_defer_accept_opts @socket.setsockopt(*$tcp_defer_accept_opts) rescue nil end - @acceptor = Thread.new do - begin - while true + (1..@nr_workers).each do |worker_nr| + pid = fork do + alive = true + trap('TERM') { exit 0 } + trap('QUIT') { alive = false; @socket.close rescue nil } + while alive begin client = @socket.accept if defined?($tcp_cork_opts) and $tcp_cork_opts client.setsockopt(*$tcp_cork_opts) rescue nil end - - worker_list = @workers.list - if worker_list.length >= @max_queued_threads - logger.error "Server overloaded with #{worker_list.length} processors (#@max_queued_threads max). Dropping connection." - client.close rescue nil - reap_dead_workers("max processors") - else - thread = Thread.new(client) {|c| semaphore.synchronize { process_client(c) } } - thread[:started_on] = Time.now - @workers.add(thread) - end - rescue StopServer - break + process_client(client) rescue Errno::EMFILE - reap_dead_workers("too many open files") + logger.error "too many open files" sleep 0.5 rescue Errno::ECONNABORTED # client closed the socket even before accept client.close rescue nil rescue Object => e - logger.error "Unhandled listen loop exception #{e.inspect}." - logger.error e.backtrace.join("\n") + if alive + logger.error "Unhandled listen loop exception #{e.inspect}." + logger.error e.backtrace.join("\n") + end end end - graceful_shutdown - ensure - @socket.close - logger.info "Closed socket." - end + exit 0 + end # fork + + @workers[pid] = worker_nr end - @acceptor + @workers + end + + # delivers a signal to each worker + def kill_each_worker(signal) + @workers.keys.each do |pid| + begin + Process.kill(signal, pid) + rescue Errno::ESRCH + @workers.delete(pid) + end + end end - # Stops the acceptor thread and then causes the worker threads to finish - # off the request queue before finally exiting. - def stop(synchronous = false) - raise AcceptorError, "Server was not started." unless @acceptor - @acceptor.raise(StopServer.new) - (sleep(0.5) while @acceptor.alive?) if synchronous - @acceptor = nil + # Terminates all workers + def stop(graceful = true) + old_chld_handler = trap('CHLD') do + pid = Process.waitpid(-1, Process::WNOHANG) and @workers.delete(pid) + end + + kill_each_worker(graceful ? 'QUIT' : 'TERM') + + timeleft = @timeout + until @workers.empty? + sleep(1) + (timeleft -= 1) > 0 and next + kill_each_worker('KILL') + end + + ensure + trap('CHLD', old_chld_handler) + @socket.close rescue nil end + end end diff --git a/lib/unicorn/http_request.rb b/lib/unicorn/http_request.rb index a76d4e0..f70f0de 100644 --- a/lib/unicorn/http_request.rb +++ b/lib/unicorn/http_request.rb @@ -54,8 +54,8 @@ module Unicorn "rack.input" => @body, "rack.errors" => STDERR, - "rack.multithread" => true, - "rack.multiprocess" => false, # ??? + "rack.multithread" => false, + "rack.multiprocess" => true, "rack.run_once" => false, "rack.url_scheme" => "http", diff --git a/lib/unicorn/semaphore.rb b/lib/unicorn/semaphore.rb deleted file mode 100644 index 1c0b87c..0000000 --- a/lib/unicorn/semaphore.rb +++ /dev/null @@ -1,46 +0,0 @@ -class Semaphore - def initialize(resource_count = 0) - @available_resource_count = resource_count - @mutex = Mutex.new - @waiting_threads = [] - end - - def wait - make_thread_wait unless resource_is_available - end - - def signal - schedule_waiting_thread if thread_is_waiting - end - - def synchronize - self.wait - yield - ensure - self.signal - end - - private - - def resource_is_available - @mutex.synchronize do - return (@available_resource_count -= 1) >= 0 - end - end - - def make_thread_wait - @waiting_threads << Thread.current - Thread.stop - end - - def thread_is_waiting - @mutex.synchronize do - return (@available_resource_count += 1) <= 0 - end - end - - def schedule_waiting_thread - thread = @waiting_threads.shift - thread.wakeup if thread - end -end diff --git a/test/unit/test_semaphore.rb b/test/unit/test_semaphore.rb deleted file mode 100644 index 115f159..0000000 --- a/test/unit/test_semaphore.rb +++ /dev/null @@ -1,118 +0,0 @@ -root_dir = File.join(File.dirname(__FILE__), "../..") -require File.join(root_dir, "test/test_helper") -require File.join(root_dir, "lib/unicorn/semaphore") - -class TestSemaphore < Test::Unit::TestCase - def setup - super - - @semaphore = Semaphore.new - end - - def test_wait_prevents_thread_from_running - thread = Thread.new { @semaphore.wait } - give_up_my_time_slice - - assert thread.stop? - end - - def test_signal_allows_waiting_thread_to_run - ran = false - thread = Thread.new { @semaphore.wait; ran = true } - give_up_my_time_slice - - @semaphore.signal - give_up_my_time_slice - - assert ran - end - - def test_wait_allows_only_specified_number_of_resources - @semaphore = Semaphore.new(1) - - run_count = 0 - thread1 = Thread.new { @semaphore.wait; run_count += 1 } - thread2 = Thread.new { @semaphore.wait; run_count += 1 } - give_up_my_time_slice - - assert_equal 1, run_count - end - - def test_semaphore_serializes_threads - @semaphore = Semaphore.new(1) - - result = "" - thread1 = Thread.new do - @semaphore.wait - 4.times do |i| - give_up_my_time_slice - result << i.to_s - end - @semaphore.signal - end - - thread2 = Thread.new do - @semaphore.wait - ("a".."d").each do |char| - give_up_my_time_slice - result << char - end - @semaphore.signal - end - - give_up_my_time_slice - @semaphore.wait - - assert_equal "0123abcd", result - end - - def test_synchronize_many_threads - @semaphore = Semaphore.new(1) - - result = [] - 5.times do |i| - Thread.new do - @semaphore.wait - 2.times { |j| result << [i, j] } - @semaphore.signal - end - end - - give_up_my_time_slice - @semaphore.wait - - 5.times do |i| - 2.times do |j| - assert_equal i, result[2 * i + j][0] - assert_equal j, result[2 * i + j][1] - end - end - end - - def test_synchronize_ensures_signal - @semaphore = Semaphore.new(1) - threads = [] - run_count = 0 - threads << Thread.new do - @semaphore.synchronize { run_count += 1 } - end - threads << Thread.new do - @semaphore.synchronize { run_count += 1; raise "I'm throwing an error." } - end - threads << Thread.new do - @semaphore.synchronize { run_count += 1 } - end - - give_up_my_time_slice - @semaphore.wait - - assert !threads.any? { |thread| thread.alive? } - assert_equal 3, run_count - end - - private - - def give_up_my_time_slice - sleep(1) - end -end diff --git a/test/unit/test_server.rb b/test/unit/test_server.rb index f5043b8..df57989 100644 --- a/test/unit/test_server.rb +++ b/test/unit/test_server.rb @@ -9,10 +9,8 @@ require 'test/test_helper' include Unicorn class TestHandler - attr_reader :ran_test def call(env) - @ran_test = true # response.socket.write("HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\nhello!\n") [200, { 'Content-Type' => 'text/plain' }, ['hello!\n']] end @@ -27,8 +25,7 @@ class WebServerTest < Test::Unit::TestCase @tester = TestHandler.new @app = Rack::URLMap.new('/test' => @tester) redirect_test_io do - # We set max_queued_threads=1 so that we can test the reaping code - @server = HttpServer.new(@app, :Host => "127.0.0.1", :Port => @port, :Max_queued_threads => 1) + @server = HttpServer.new(@app, :Host => "127.0.0.1", :Port => @port) end @server.start end @@ -40,8 +37,8 @@ class WebServerTest < Test::Unit::TestCase end def test_simple_server - hit(["http://localhost:#{@port}/test"]) - assert @tester.ran_test, "Handler didn't really run" + results = hit(["http://localhost:#{@port}/test"]) + assert_equal 'hello!\n', results[0], "Handler didn't really run" end @@ -90,19 +87,6 @@ class WebServerTest < Test::Unit::TestCase end end - def test_max_queued_threads_overload - redirect_test_io do - assert_raises Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED, Errno::EINVAL, IOError do - tests = [ - Thread.new { do_test(@valid_request, 1) }, - Thread.new { do_test(@valid_request, 10) }, - ] - - tests.each {|t| t.join} - end - end - end - def test_file_streamed_request body = "a" * (Unicorn::Const::MAX_BODY * 2) long = "GET /test HTTP/1.1\r\nContent-length: #{body.length}\r\n\r\n" + body diff --git a/test/unit/test_threading.rb b/test/unit/test_threading.rb deleted file mode 100644 index 5551b53..0000000 --- a/test/unit/test_threading.rb +++ /dev/null @@ -1,74 +0,0 @@ -root_dir = File.join(File.dirname(__FILE__), "../..") -require File.join(root_dir, "test/test_helper") - -include Unicorn - -class FakeHandler - @@concurrent_threads = 0 - @@threads = 0 - - def self.max_concurrent_threads - @@threads ||= 0 - end - - def initialize - super - @@mutex = Mutex.new - end - - def call(env) - @@mutex.synchronize do - @@concurrent_threads += 1 # !!! same for += and -= - @@threads = [@@concurrent_threads, @@threads].max - end - - sleep(0.1) - [200, {'Content-Type' => 'text/plain'}, ['hello!']] - ensure - @@mutex.synchronize { @@concurrent_threads -= 1 } - end -end - -class ThreadingTest < Test::Unit::TestCase - def setup - @valid_request = "GET / HTTP/1.1\r\nHost: www.google.com\r\nContent-Type: text/plain\r\n\r\n" - @port = process_based_port - @app = Rack::URLMap.new('/test' => FakeHandler.new) - @threads = 4 - redirect_test_io { @server = HttpServer.new(@app, :Host => "127.0.0.1", :Port => @port, :Max_concurrent_threads => @threads) } - redirect_test_io { @server.start } - end - - def teardown - redirect_test_io { @server.stop(true) } - end - - def test_server_respects_max_concurrent_threads_option - threads = [] - (@threads * 3).times do - threads << Thread.new do - send_data_over_socket("GET /test HTTP/1.1\r\nHost: localhost\r\nContent-Type: text/plain\r\n\r\n") - end - end - while threads.any? { |thread| thread.alive? } - sleep(0) - end - assert_equal @threads, FakeHandler.max_concurrent_threads - end - - private - - def send_data_over_socket(string) - socket = TCPSocket.new("127.0.0.1", @port) - request = StringIO.new(string) - - while data = request.read(8) - socket.write(data) - socket.flush - sleep(0) - end - sleep(0) - socket.write(" ") # Some platforms only raise the exception on attempted write - socket.flush - end -end -- cgit v1.2.3-24-ge0c7