rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob 95d654599a3c135cee233983f8750167c6464382 3692 bytes (raw)
$ git show v0.3.0:lib/rainbows/base.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
 
# -*- encoding: binary -*-

module Rainbows

  # Base module mixed into Rainbows concurrency models; it is currently
  # used by the ThreadSpawn and ThreadPool models.
  module Base

    include Unicorn
    include Rainbows::Const
    G = Rainbows::G

    # Best-effort error reply: try a non-blocking write of +response_str+
    # to +client+, then try to close it.  Failures of either step are
    # swallowed, so it does not matter whether the message went out.
    # TODO: merge into Unicorn::HttpServer
    def emergency_response(client, response_str)
      begin
        client.write_nonblock(response_str)
      rescue
        # the peer may already be gone; nothing useful can be done
      end

      begin
        client.close
      rescue
        # socket already closed or broken
        nil
      end
    end

    # Logs an exception +e+ that escaped the listen loop: first its
    # +inspect+ output, then its backtrace joined by newlines.  Nothing is
    # logged (and nil is returned) once G.alive is no longer set.
    def listen_loop_error(e)
      return unless G.alive

      logger.error("Unhandled listen loop exception #{e.inspect}.")
      logger.error(e.backtrace.join("\n"))
    end

    # Per-worker initialisation.  Runs the inherited setup via +super+,
    # seeds the shared G state (connection counter, connection limit,
    # logger and app), discards the self-pipe and installs the worker's
    # signal handlers:
    #
    # USR1::      reopen this worker's logs (errors ignored)
    # QUIT::      clear G.alive and close every listener
    # TERM, INT:: exit immediately with status 0
    def init_worker_process(worker)
      super(worker)
      G.cur = 0
      G.max = worker_connections
      G.logger = logger
      G.app = app

      # we don't use the self-pipe mechanism in the Rainbows! worker
      # since we don't defer reopening logs
      self_pipe = HttpServer::SELF_PIPE
      self_pipe.each { |io| io.close }
      self_pipe.clear

      trap(:USR1) do
        begin
          reopen_worker_logs(worker.nr)
        rescue
          nil
        end
      end

      trap(:QUIT) {
        G.alive = false
        # closing anything we IO.select on will raise EBADF
        HttpServer::LISTENERS.map! do |sock|
          begin
            sock.close
          rescue
            nil
          end
        end
      }

      # instant shutdown
      [:TERM, :INT].each do |sig|
        trap(sig) { exit!(0) }
      end

      logger.info "Rainbows! #@use worker_connections=#@worker_connections"
    end

    # once a client is accepted, it is processed in its entirety here
    # in 3 easy steps: read request, call app, write app response
    #
    # The three steps repeat on the same socket for as long as the parser
    # reports keepalive and G.alive is still set; afterwards the socket is
    # closed.
    #
    # client:: the accepted socket; it is used through +readpartial+,
    #          +write+ and +close+.  REMOTE_ADDR is taken from +peeraddr+
    #          when the client is a TCPSocket and is LOCALHOST otherwise.
    #
    # Errors raised while serving are handled by the rescue clauses at the
    # bottom; each of them makes a best-effort attempt to close the socket.
    def process_client(client)
      buf = client.readpartial(CHUNK_SIZE)
      hp = HttpParser.new
      env = {}
      alive = true
      remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST

      begin # loop
        # keep reading until the parser has a complete set of headers
        while ! hp.headers(env, buf)
          buf << client.readpartial(CHUNK_SIZE)
        end

        # requests without a body share the NULL_IO object, everything else
        # gets a TeeInput wrapped around the socket
        env[RACK_INPUT] = 0 == hp.content_length ?
                 HttpRequest::NULL_IO :
                 Unicorn::TeeInput.new(client, env, hp, buf)
        env[REMOTE_ADDR] = remote_addr
        response = app.call(env.update(RACK_DEFAULTS))

        # the app answered with a 100 status: send the interim response,
        # drop the Expect header and call the app again for the real one
        if 100 == response.first.to_i
          client.write(EXPECT_100_RESPONSE)
          env.delete(HTTP_EXPECT)
          response = app.call(env)
        end

        alive = hp.keepalive? && G.alive
        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
        HttpResponse.write(client, response, out)
      # for the next request on this connection: reset the parser and
      # empty env
      end while alive and hp.reset.nil? and env.clear
      client.close
    # if we get any error, try to write something back to the client
    # assuming we haven't closed the socket, but don't get hung up
    # if the socket is already closed or broken.  We'll always ensure
    # the socket is closed at the end of this function
    rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
      emergency_response(client, ERROR_500_RESPONSE)
    rescue HttpParserError # try to tell the client they're bad
      if buf.empty?
        # no 400 is sent when the buffer is empty, but the socket must
        # still be closed here or it would be left open
        client.close rescue nil
      else
        emergency_response(client, ERROR_400_RESPONSE)
      end
    rescue Object => e
      emergency_response(client, ERROR_500_RESPONSE)
      logger.error "Read error: #{e.inspect}"
      logger.error e.backtrace.join("\n")
    end

    # Clears Rainbows::G.alive and then waits for +threads+ to finish,
    # giving up once twice the configured +timeout+ has elapsed.
    #
    # threads:: collection of objects responding to +alive?+ and
    #           +join(seconds)+
    # worker::  object whose +tmp+ responds to +chmod+; its mode is flipped
    #           between 1 and 0 before every join attempt (presumably the
    #           heartbeat the master process watches -- TODO confirm)
    #
    # Returns nil, both when all threads have finished and when the
    # deadline has passed.
    def join_threads(threads, worker)
      Rainbows::G.alive = false
      expire = Time.now + (timeout * 2.0)
      m = 0
      while threads.any? { |thr| thr.alive? }
        threads.each { |thr|
          worker.tmp.chmod(m = 0 == m ? 1 : 0)
          thr.join(1)
          # +return+ rather than +break+: a break only leaves this inner
          # #each, so the surrounding while loop would keep spinning
          # forever on a thread that never finishes
          return if Time.now >= expire
        }
      end
    end

    # Mixin hook: gives the including +klass+ its own LISTENERS and G
    # constants, pointing at HttpServer::LISTENERS and Rainbows::G.
    def self.included(klass)
      shared_listeners = HttpServer::LISTENERS
      klass.const_set(:LISTENERS, shared_listeners)

      globals = Rainbows::G
      klass.const_set(:G, globals)
    end

  end
end

git clone https://yhbt.net/rainbows.git