unicorn.git  about / heads / tags
Rack HTTP server for Unix and fast clients
blob dceb1382a84d9fe41e4b899f0fc97340f754658c 11083 bytes (raw)
$ git show v0.0.0:lib/mongrel.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
 

# Standard libraries
require 'socket'
require 'tempfile'
require 'yaml'
require 'time'
require 'etc'
require 'uri'
require 'stringio'
require 'fcntl'
require 'logger'

# Compiled Mongrel extension
require 'http11'

# Gem conditional loader
require 'thread'
require 'rack'

# Ruby Mongrel
require 'mongrel/tcphack'
require 'mongrel/const'
require 'mongrel/http_request'
require 'mongrel/header_out'
require 'mongrel/http_response'
require 'mongrel/semaphore'

# Mongrel module containing all of the classes (include C extensions) for running
# a Mongrel web server.  It contains a minimalist HTTP server with just enough
# functionality to service web application requests fast as possible.
module Mongrel
  class << self
    # A logger instance that conforms to the API of stdlib's Logger.
    attr_accessor :logger
    
    def run(app, options = {})
      HttpServer.new(app, options).start.join
    end
  end

  # Used to stop the HttpServer via Thread.raise.
  class StopServer < Exception; end

  # Thrown at a thread when it is timed out.
  class TimeoutError < Exception; end

  # Thrown by HttpServer#stop if the server is not started.
  class AcceptorError < StandardError; end

  #
  # This is the main driver of Mongrel, while the Mongrel::HttpParser and Mongrel::URIClassifier
  # make up the majority of how the server functions.  It's a very simple class that just
  # has a thread accepting connections and a simple HttpServer.process_client function
  # to do the heavy lifting with the IO and Ruby.  
  #
  class HttpServer
    attr_reader :acceptor, :workers, :logger, :host, :port, :timeout, :max_queued_threads, :max_concurrent_threads
    
    DEFAULTS = {
      :timeout => 60,
      :host => '0.0.0.0',
      :port => 8080,
      :logger => Logger.new(STDERR),
      :max_queued_threads => 12, 
      :max_concurrent_threads => 4
    }

    # Creates a working server on host:port (strange things happen if port isn't a Number).
    # Use HttpServer::run to start the server and HttpServer.acceptor.join to 
    # join the thread that's processing incoming requests on the socket.
    #
    # The max_queued_threads optional argument is the maximum number of concurrent
    # processors to accept, anything over this is closed immediately to maintain
    # server processing performance.  This may seem mean but it is the most efficient
    # way to deal with overload.  Other schemes involve still parsing the client's request
    # which defeats the point of an overload handling system.
    # 
    def initialize(app, options = {})
      @app = app      
      @workers = ThreadGroup.new
      
      (DEFAULTS.to_a + options.to_a).each do |key, value|
        instance_variable_set("@#{key.to_s.downcase}", value)
      end

      @socket = TCPServer.new(@host, @port)       
      @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) if defined?(Fcntl::FD_CLOEXEC)
    end

    # Does the majority of the IO processing.  It has been written in Ruby using
    # about 7 different IO processing strategies and no matter how it's done 
    # the performance just does not improve.  It is currently carefully constructed
    # to make sure that it gets the best possible performance, but anyone who
    # thinks they can make it faster is more than welcome to take a crack at it.
    def process_client(client)
      begin
        parser = HttpParser.new
        params = Hash.new
        request = nil
        data = client.readpartial(Const::CHUNK_SIZE)
        nparsed = 0

        # Assumption: nparsed will always be less since data will get filled with more
        # after each parsing.  If it doesn't get more then there was a problem
        # with the read operation on the client socket.  Effect is to stop processing when the
        # socket can't fill the buffer for further parsing.
        while nparsed < data.length
          nparsed = parser.execute(params, data, nparsed)

          if parser.finished?
            if !params[Const::REQUEST_PATH]
              # It might be a dumbass full host request header
              uri = URI.parse(params[Const::REQUEST_URI])
              params[Const::REQUEST_PATH] = uri.path
            end

            raise "No REQUEST PATH" if !params[Const::REQUEST_PATH]
 
            params[Const::PATH_INFO] = params[Const::REQUEST_PATH]
            params[Const::SCRIPT_NAME] = Const::SLASH

            # From http://www.ietf.org/rfc/rfc3875 :
            # "Script authors should be aware that the REMOTE_ADDR and REMOTE_HOST
            #  meta-variables (see sections 4.1.8 and 4.1.9) may not identify the
            #  ultimate source of the request.  They identify the client for the
            #  immediate request to the server; that client may be a proxy, gateway,
            #  or other intermediary acting on behalf of the actual source client."
            params[Const::REMOTE_ADDR] = client.peeraddr.last

            # Select handlers that want more detailed request notification
            request = HttpRequest.new(params, client, logger)

            # in the case of large file uploads the user could close the socket, so skip those requests
            break if request.body == nil  # nil signals from HttpRequest::initialize that the request was aborted
            app_response = @app.call(request.env)
            response = HttpResponse.new(client, app_response).start
          break #done
          else
            # Parser is not done, queue up more data to read and continue parsing
            chunk = client.readpartial(Const::CHUNK_SIZE)
            break if !chunk or chunk.length == 0  # read failed, stop processing

            data << chunk
            if data.length >= Const::MAX_HEADER
              raise HttpParserError.new("HEADER is longer than allowed, aborting client early.")
            end
          end
        end
      rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
        client.close rescue nil
      rescue HttpParserError => e
        logger.error "HTTP parse error, malformed request (#{params[Const::HTTP_X_FORWARDED_FOR] || client.peeraddr.last}): #{e.inspect}"
        logger.error "REQUEST DATA: #{data.inspect}\n---\nPARAMS: #{params.inspect}\n---\n"
      rescue Errno::EMFILE
        reap_dead_workers('too many files')
      rescue Object => e
        logger.error "Read error: #{e.inspect}"
        logger.error e.backtrace.join("\n")
      ensure
        begin
          client.close
        rescue IOError
          # Already closed
        rescue Object => e
          logger.error "Client error: #{e.inspect}"
          logger.error e.backtrace.join("\n")
        end
        request.body.close! if request and request.body.class == Tempfile
      end
    end

    # Used internally to kill off any worker threads that have taken too long
    # to complete processing.  Only called if there are too many processors
    # currently servicing.  It returns the count of workers still active
    # after the reap is done.  It only runs if there are workers to reap.
    def reap_dead_workers(reason='unknown')
      if @workers.list.length > 0
        logger.info "Reaping #{@workers.list.length} threads for slow workers because of '#{reason}'"
        error_msg = "Mongrel timed out this thread: #{reason}"
        mark = Time.now
        @workers.list.each do |worker|
          worker[:started_on] = Time.now if not worker[:started_on]

          if mark - worker[:started_on] > @timeout
            logger.info "Thread #{worker.inspect} is too old, killing."
            worker.raise(TimeoutError.new(error_msg))
          end
        end
      end

      return @workers.list.length
    end

    # Performs a wait on all the currently running threads and kills any that take
    # too long.  It waits by @timeout seconds, which can be set in .initialize or
    # via mongrel_rails.
    def graceful_shutdown
      while reap_dead_workers("shutdown") > 0
        logger.info "Waiting for #{@workers.list.length} requests to finish, could take #{@timeout} seconds."
        sleep @timeout / 10
      end
    end

    def configure_socket_options
      case RUBY_PLATFORM
      when /linux/
        # 9 is currently TCP_DEFER_ACCEPT
        $tcp_defer_accept_opts = [Socket::SOL_TCP, 9, 1]
        $tcp_cork_opts = [Socket::SOL_TCP, 3, 1]
      when /freebsd(([1-4]\..{1,2})|5\.[0-4])/
        # Do nothing, just closing a bug when freebsd <= 5.4
      when /freebsd/
        # Use the HTTP accept filter if available.
        # The struct made by pack() is defined in /usr/include/sys/socket.h as accept_filter_arg
        unless `/sbin/sysctl -nq net.inet.accf.http`.empty?
          $tcp_defer_accept_opts = [Socket::SOL_SOCKET, Socket::SO_ACCEPTFILTER, ['httpready', nil].pack('a16a240')]
        end
      end
    end
    
    # Runs the thing.  It returns the thread used so you can "join" it.  You can also
    # access the HttpServer::acceptor attribute to get the thread later.
    def start
      semaphore = Semaphore.new(@max_concurrent_threads)
      BasicSocket.do_not_reverse_lookup = true

      configure_socket_options

      if defined?($tcp_defer_accept_opts) and $tcp_defer_accept_opts
        @socket.setsockopt(*$tcp_defer_accept_opts) rescue nil
      end

      @acceptor = Thread.new do
        begin
          while true
            begin
              client = @socket.accept
  
              if defined?($tcp_cork_opts) and $tcp_cork_opts
                client.setsockopt(*$tcp_cork_opts) rescue nil
              end
  
              worker_list = @workers.list
              if worker_list.length >= @max_queued_threads
                logger.error "Server overloaded with #{worker_list.length} processors (#@max_queued_threads max). Dropping connection."
                client.close rescue nil
                reap_dead_workers("max processors")
              else
                thread = Thread.new(client) {|c| semaphore.synchronize { process_client(c) } }
                thread[:started_on] = Time.now
                @workers.add(thread)
              end
            rescue StopServer
              break
            rescue Errno::EMFILE
              reap_dead_workers("too many open files")
              sleep 0.5
            rescue Errno::ECONNABORTED
              # client closed the socket even before accept
              client.close rescue nil
            rescue Object => e
              logger.error "Unhandled listen loop exception #{e.inspect}."
              logger.error e.backtrace.join("\n")
            end
          end
          graceful_shutdown
        ensure
          @socket.close
          logger.info "Closed socket."
        end
      end

      @acceptor
    end

    # Stops the acceptor thread and then causes the worker threads to finish
    # off the request queue before finally exiting.
    def stop(synchronous = false)
      raise AcceptorError, "Server was not started." unless @acceptor
      @acceptor.raise(StopServer.new)
      (sleep(0.5) while @acceptor.alive?) if synchronous
      @acceptor = nil
    end
  end
end

git clone https://yhbt.net/unicorn.git