rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob 2f4d3799deb9fbeb4d83affed2cdd869a6b3cf8b 3428 bytes (raw)
$ git show v0.97.0:lib/rainbows/base.rb	# shows this blob on the CLI

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
 
# -*- encoding: binary -*-

# base class for \Rainbows! concurrency models, this is currently used by
# ThreadSpawn and ThreadPool models.  Base is also its own
# (non-)concurrency model which is basically Unicorn-with-keepalive, and
# not intended for production use, as keepalive with a pure prefork
# concurrency model is extremely expensive.
module Rainbows::Base

  # :stopdoc:
  include Rainbows::Const
  include Rainbows::Response

  # shortcuts to frequently-referenced project constants...
  G = Rainbows::G
  NULL_IO = Unicorn::HttpRequest::NULL_IO
  TeeInput = Rainbows::TeeInput
  HttpParser = Unicorn::HttpParser

  # One-time per-worker setup; this method is called by all current
  # concurrency models after fork.  Installs Rainbows!-specific signal
  # handlers and prepares listeners for the chosen concurrency model.
  def init_worker_process(worker) # :nodoc:
    super(worker)
    Rainbows::Response.setup(self.class)
    Rainbows::MaxBody.setup
    G.tmp = worker.tmp

    # avoid spurious wakeups and blocking-accept() with 1.8 green threads
    # (RUBY_ENGINE is undefined on MRI 1.8, so this matches only old MRI)
    if ! defined?(RUBY_ENGINE) && RUBY_VERSION.to_f < 1.9
      require "io/nonblock"
      Rainbows::HttpServer::LISTENERS.each { |l| l.nonblock = true }
    end

    # we don't use the self-pipe mechanism in the Rainbows! worker
    # since we don't defer reopening logs; close both ends and re-trap
    # the signals Unicorn's self-pipe loop would otherwise consume
    Rainbows::HttpServer::SELF_PIPE.each { |x| x.close }.clear
    trap(:USR1) { reopen_worker_logs(worker.nr) }
    trap(:QUIT) { G.quit! }
    [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
    logger.info "Rainbows! #@use worker_connections=#@worker_connections"
  end

  # Blocks until +client+ is readable or the timeout expires; returns
  # the IO.select result (nil on timeout, which callers treat as
  # "give up on this keepalive connection").  G.kato is presumably the
  # keep-alive timeout setting — TODO confirm against Rainbows::G.
  def wait_headers_readable(client)  # :nodoc:
    IO.select([client], nil, nil, G.kato)
  end

  # once a client is accepted, it is processed in its entirety here
  # in 3 easy steps: read request, call app, write app response
  # this is used by synchronous concurrency models
  #   Base, ThreadSpawn, ThreadPool
  def process_client(client) # :nodoc:
    buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here
    hp = HttpParser.new
    env = {}
    remote_addr = Rainbows.addr(client)

    begin # loop — one iteration per request on a keepalive connection
      # read until the parser has a complete set of request headers;
      # bail out (return) if the client goes idle past the timeout
      until hp.headers(env, buf)
        wait_headers_readable(client) or return
        buf << client.readpartial(CHUNK_SIZE)
      end

      env[CLIENT_IO] = client
      # zero Content-Length gets the shared NULL_IO object to avoid
      # allocating a TeeInput for bodyless requests
      env[RACK_INPUT] = 0 == hp.content_length ?
                        NULL_IO : TeeInput.new(client, env, hp, buf)
      env[REMOTE_ADDR] = remote_addr
      status, headers, body = app.call(env.update(RACK_DEFAULTS))

      # app asked us to send "100 Continue"; do so and re-dispatch
      # without the Expect header so the app produces a real response
      if 100 == status.to_i
        client.write(EXPECT_100_RESPONSE)
        env.delete(HTTP_EXPECT)
        status, headers, body = app.call(env)
      end

      # hp.headers? is false for HTTP/0.9-style requests, in which case
      # we write only the body (and +range+ below stays nil)
      if hp.headers?
        headers = HH.new(headers)
        range = make_range!(env, status, headers) and status = range.shift
        # NOTE: +env+ is reused as the keepalive flag from here on:
        # false means "close the connection after this response"
        env = false unless hp.keepalive? && G.alive
        headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE
        client.write(response_header(status, headers))
      end
      write_body(client, body, range)
    # keep looping while keepalive is still on (env truthy), clearing
    # the env hash for reuse (Hash#clear returns the truthy empty hash)
    # and resetting the parser; hp.reset returning non-nil indicates
    # the parser cannot be reused — presumably leftover pipelined data,
    # TODO confirm against Unicorn::HttpParser#reset
    end while env && env.clear && hp.reset.nil?
  # if we get any error, try to write something back to the client
  # assuming we haven't closed the socket, but don't get hung up
  # if the socket is already closed or broken.  We'll always ensure
  # the socket is closed at the end of this function
  rescue => e
    Rainbows::Error.write(client, e)
  ensure
    client.close unless client.closed?
  end

  # Mirror the LISTENERS and G shortcuts into any including concurrency
  # model class so its instance methods can reference them directly.
  def self.included(klass) # :nodoc:
    klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS
    klass.const_set :G, Rainbows::G
  end

  # :startdoc:
end

git clone https://yhbt.net/rainbows.git