about summary refs log tree commit homepage
path: root/lib/unicorn.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/unicorn.rb')
-rw-r--r--lib/unicorn.rb284
1 files changed, 284 insertions, 0 deletions
diff --git a/lib/unicorn.rb b/lib/unicorn.rb
new file mode 100644
index 0000000..e43b676
--- /dev/null
+++ b/lib/unicorn.rb
@@ -0,0 +1,284 @@
+
+# Standard libraries
+require 'socket'
+require 'tempfile'
+require 'yaml'
+require 'time'
+require 'etc'
+require 'uri'
+require 'stringio'
+require 'fcntl'
+require 'logger'
+
+# Compiled extension
+require 'http11'
+
+# Gem conditional loader
+require 'thread'
+require 'rack'
+
+require 'unicorn/tcphack'
+require 'unicorn/const'
+require 'unicorn/http_request'
+require 'unicorn/header_out'
+require 'unicorn/http_response'
+require 'unicorn/semaphore'
+
+# Unicorn module containing all of the classes (including C extensions) for running
+# a Unicorn web server.  It contains a minimalist HTTP server with just enough
+# functionality to service web application requests as fast as possible.
+module Unicorn
+  class << self
+    # A logger instance that conforms to the API of stdlib's Logger.
+    attr_accessor :logger
+    
+    def run(app, options = {})
+      HttpServer.new(app, options).start.join
+    end
+  end
+
+  # Raised inside the acceptor thread (via Thread#raise in HttpServer#stop) to end the accept loop.
+  class StopServer < Exception; end
+
+  # Raised inside a worker thread by HttpServer#reap_dead_workers once it is older than the timeout.
+  class TimeoutError < Exception; end
+
+  # Raised by HttpServer#stop when there is no acceptor thread, i.e. the server is not started.
+  class AcceptorError < StandardError; end
+
+  #
+  # This is the main driver of Unicorn, while the Unicorn::HttpParser and Unicorn::URIClassifier
+  # make up the majority of how the server functions.  It's a very simple class that just
+  # has a thread accepting connections and a simple HttpServer.process_client function
+  # to do the heavy lifting with the IO and Ruby.  
+  #
+  class HttpServer
+    attr_reader :acceptor, :workers, :logger, :host, :port, :timeout, :max_queued_threads, :max_concurrent_threads
+    
+    DEFAULTS = {
+      :timeout => 60, # worker age in seconds past which reap_dead_workers raises TimeoutError in it
+      :host => '0.0.0.0', # address handed to TCPServer.new
+      :port => 8080, # port handed to TCPServer.new
+      :logger => Logger.new(STDERR), # built once, when this constant is evaluated
+      :max_queued_threads => 12, # accept loop drops new connections at this many live workers
+      :max_concurrent_threads => 4 # size of the Semaphore wrapped around process_client
+    }
+
+    # Creates a working server on host:port (strange things happen if port isn't a Number).
+    # Use HttpServer::run to start the server and HttpServer.acceptor.join to
+    # join the thread that's processing incoming requests on the socket.
+    #
+    # The max_queued_threads optional argument is the maximum number of concurrent
+    # processors to accept, anything over this is closed immediately to maintain
+    # server processing performance.  This may seem mean but it is the most efficient
+    # way to deal with overload.  Other schemes involve still parsing the client's request
+    # which defeats the point of an overload handling system.
+    #
+    def initialize(app, options = {})
+      @app = app      
+      @workers = ThreadGroup.new
+      
+      (DEFAULTS.to_a + options.to_a).each do |key, value|
+        instance_variable_set("@#{key.to_s.downcase}", value)
+      end
+
+      @socket = TCPServer.new(@host, @port)      
+      @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) if defined?(Fcntl::FD_CLOEXEC)
+    end
+
+    # Handles one accepted connection: reads and parses the request head,
+    # hands request.env to @app.call, passes the result to HttpResponse, and
+    # always closes the client socket on the way out.  Errors raised while
+    # handling are rescued below (logged, or dropped quietly for disconnects).
+    # The IO path was tuned across several strategies; change it with care.
+    def process_client(client)
+      begin
+        parser = HttpParser.new
+        params = Hash.new
+        request = nil # assigned up front so the ensure clause can test it
+        data = client.readpartial(Const::CHUNK_SIZE)
+        nparsed = 0
+
+        # Loop while there is unparsed input.  Each pass either completes the
+        # request (and breaks out) or appends another chunk to data, so data
+        # normally stays longer than nparsed; if a read adds nothing the loop
+        # ends without the application being called.
+        while nparsed < data.length
+          nparsed = parser.execute(params, data, nparsed)
+
+          if parser.finished?
+            if !params[Const::REQUEST_PATH]
+              # REQUEST_PATH is missing: take the path component of REQUEST_URI instead
+              uri = URI.parse(params[Const::REQUEST_URI])
+              params[Const::REQUEST_PATH] = uri.path
+            end
+
+            raise "No REQUEST PATH" if !params[Const::REQUEST_PATH]
+
+            params[Const::PATH_INFO] = params[Const::REQUEST_PATH]
+            params[Const::SCRIPT_NAME] = Const::SLASH
+
+            # From http://www.ietf.org/rfc/rfc3875 :
+            # "Script authors should be aware that the REMOTE_ADDR and REMOTE_HOST
+            #  meta-variables (see sections 4.1.8 and 4.1.9) may not identify the
+            #  ultimate source of the request.  They identify the client for the
+            #  immediate request to the server; that client may be a proxy, gateway,
+            #  or other intermediary acting on behalf of the actual source client."
+            params[Const::REMOTE_ADDR] = client.peeraddr.last
+
+            # Wrap the parsed params and the client socket in an HttpRequest
+            request = HttpRequest.new(params, client, logger)
+
+            # In the case of large file uploads the user could close the socket, so skip those requests
+            break if request.body == nil  # nil signals from HttpRequest::initialize that the request was aborted
+            app_response = @app.call(request.env)
+            response = HttpResponse.new(client, app_response).start # NOTE(review): local 'response' is never read
+          break # done (NOTE(review): mis-indented; this break belongs to the finished? branch)
+          else
+            # Parser is not done, queue up more data to read and continue parsing
+            chunk = client.readpartial(Const::CHUNK_SIZE)
+            break if !chunk or chunk.length == 0  # read failed, stop processing
+
+            data << chunk
+            if data.length >= Const::MAX_HEADER
+              raise HttpParserError.new("HEADER is longer than allowed, aborting client early.")
+            end
+          end
+        end
+      rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+        client.close rescue nil # disconnect-style errors: close quietly, nothing is logged
+      rescue HttpParserError => e
+        logger.error "HTTP parse error, malformed request (#{params[Const::HTTP_X_FORWARDED_FOR] || client.peeraddr.last}): #{e.inspect}"
+        logger.error "REQUEST DATA: #{data.inspect}\n---\nPARAMS: #{params.inspect}\n---\n"
+      rescue Errno::EMFILE
+        reap_dead_workers('too many files') # out of descriptors: time out stale workers
+      rescue Object => e # catch-all; also catches non-StandardError exceptions such as TimeoutError
+        logger.error "Read error: #{e.inspect}"
+        logger.error e.backtrace.join("\n")
+      ensure
+        begin
+          client.close
+        rescue IOError
+          # Already closed (for example by the disconnect rescue above)
+        rescue Object => e
+          logger.error "Client error: #{e.inspect}"
+          logger.error e.backtrace.join("\n")
+        end
+        request.body.close! if request and request.body.class == Tempfile # Tempfile#close! closes and unlinks the file
+      end
+    end
+
+    # Used internally to kill off any worker threads that have taken too long
+    # to complete processing.  Only called if there are too many processors
+    # currently servicing.  It returns the count of workers still active
+    # after the reap is done.  It only runs if there are workers to reap.
+    def reap_dead_workers(reason='unknown')
+      if @workers.list.length > 0
+        logger.info "Reaping #{@workers.list.length} threads for slow workers because of '#{reason}'"
+        error_msg = "Unicorn timed out this thread: #{reason}"
+        mark = Time.now
+        @workers.list.each do |worker|
+          worker[:started_on] = Time.now if not worker[:started_on]
+
+          if mark - worker[:started_on] > @timeout
+            logger.info "Thread #{worker.inspect} is too old, killing."
+            worker.raise(TimeoutError.new(error_msg))
+          end
+        end
+      end
+
+      return @workers.list.length
+    end
+
+    # Performs a wait on all the currently running threads and kills any that take
+    # too long.  It waits by @timeout seconds, which can be set in .initialize or
+    # via mongrel_rails.
+    def graceful_shutdown
+      while reap_dead_workers("shutdown") > 0
+        logger.info "Waiting for #{@workers.list.length} requests to finish, could take #{@timeout} seconds."
+        sleep @timeout / 10
+      end
+    end
+
+    def configure_socket_options
+      case RUBY_PLATFORM
+      when /linux/
+        # 9 is currently TCP_DEFER_ACCEPT
+        $tcp_defer_accept_opts = [Socket::SOL_TCP, 9, 1]
+        $tcp_cork_opts = [Socket::SOL_TCP, 3, 1]
+      when /freebsd(([1-4]\..{1,2})|5\.[0-4])/
+        # Do nothing, just closing a bug when freebsd <= 5.4
+      when /freebsd/
+        # Use the HTTP accept filter if available.
+        # The struct made by pack() is defined in /usr/include/sys/socket.h as accept_filter_arg
+        unless `/sbin/sysctl -nq net.inet.accf.http`.empty?
+          $tcp_defer_accept_opts = [Socket::SOL_SOCKET, Socket::SO_ACCEPTFILTER, ['httpready', nil].pack('a16a240')]
+        end
+      end
+    end
+    
+    # Runs the thing.  It returns the thread used so you can "join" it.  You can also
+    # access the HttpServer::acceptor attribute to get the thread later.
+    def start
+      semaphore = Semaphore.new(@max_concurrent_threads)
+      BasicSocket.do_not_reverse_lookup = true
+
+      configure_socket_options
+
+      if defined?($tcp_defer_accept_opts) and $tcp_defer_accept_opts
+        @socket.setsockopt(*$tcp_defer_accept_opts) rescue nil
+      end
+
+      @acceptor = Thread.new do
+        begin
+          while true
+            begin
+              client = @socket.accept
+  
+              if defined?($tcp_cork_opts) and $tcp_cork_opts
+                client.setsockopt(*$tcp_cork_opts) rescue nil
+              end
+  
+              worker_list = @workers.list
+              if worker_list.length >= @max_queued_threads
+                logger.error "Server overloaded with #{worker_list.length} processors (#@max_queued_threads max). Dropping connection."
+                client.close rescue nil
+                reap_dead_workers("max processors")
+              else
+                thread = Thread.new(client) {|c| semaphore.synchronize { process_client(c) } }
+                thread[:started_on] = Time.now
+                @workers.add(thread)
+              end
+            rescue StopServer
+              break
+            rescue Errno::EMFILE
+              reap_dead_workers("too many open files")
+              sleep 0.5
+            rescue Errno::ECONNABORTED
+              # client closed the socket even before accept
+              client.close rescue nil
+            rescue Object => e
+              logger.error "Unhandled listen loop exception #{e.inspect}."
+              logger.error e.backtrace.join("\n")
+            end
+          end
+          graceful_shutdown
+        ensure
+          @socket.close
+          logger.info "Closed socket."
+        end
+      end
+
+      @acceptor
+    end
+
+    # Stops the acceptor thread and then causes the worker threads to finish
+    # off the request queue before finally exiting.
+    def stop(synchronous = false)
+      raise AcceptorError, "Server was not started." unless @acceptor
+      @acceptor.raise(StopServer.new)
+      (sleep(0.5) while @acceptor.alive?) if synchronous
+      @acceptor = nil
+    end
+  end
+end