1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
| | # -*- encoding: binary -*-
module Rainbows
# base class for Rainbows concurrency models, this is currently
# used by ThreadSpawn and ThreadPool models
module Base
include Unicorn
include Rainbows::Const
G = Rainbows::G
# write a response without caring if it went out or not for error
# messages.
# TODO: merge into Unicorn::HttpServer
def emergency_response(client, response_str)
client.write_nonblock(response_str) rescue nil
client.close rescue nil
end
def listen_loop_error(e)
G.alive or return
logger.error "Unhandled listen loop exception #{e.inspect}."
logger.error e.backtrace.join("\n")
end
def init_worker_process(worker)
super(worker)
G.cur = 0
G.max = worker_connections
G.logger = logger
G.app = app
# we're don't use the self-pipe mechanism in the Rainbows! worker
# since we don't defer reopening logs
HttpServer::SELF_PIPE.each { |x| x.close }.clear
trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
trap(:QUIT) do
G.alive = false
# closing anything we IO.select on will raise EBADF
HttpServer::LISTENERS.map! { |s| s.close rescue nil }
end
[:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
logger.info "Rainbows! #@use worker_connections=#@worker_connections"
end
# once a client is accepted, it is processed in its entirety here
# in 3 easy steps: read request, call app, write app response
def process_client(client)
buf = client.readpartial(CHUNK_SIZE)
hp = HttpParser.new
env = {}
alive = true
remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST
begin # loop
while ! hp.headers(env, buf)
buf << client.readpartial(CHUNK_SIZE)
end
env[RACK_INPUT] = 0 == hp.content_length ?
HttpRequest::NULL_IO :
Unicorn::TeeInput.new(client, env, hp, buf)
env[REMOTE_ADDR] = remote_addr
response = app.call(env.update(RACK_DEFAULTS))
if 100 == response.first.to_i
client.write(EXPECT_100_RESPONSE)
env.delete(HTTP_EXPECT)
response = app.call(env)
end
alive = hp.keepalive? && G.alive
out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
HttpResponse.write(client, response, out)
end while alive and hp.reset.nil? and env.clear
client.close
# if we get any error, try to write something back to the client
# assuming we haven't closed the socket, but don't get hung up
# if the socket is already closed or broken. We'll always ensure
# the socket is closed at the end of this function
rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
emergency_response(client, ERROR_500_RESPONSE)
rescue HttpParserError # try to tell the client they're bad
buf.empty? or emergency_response(client, ERROR_400_RESPONSE)
rescue Object => e
emergency_response(client, ERROR_500_RESPONSE)
logger.error "Read error: #{e.inspect}"
logger.error e.backtrace.join("\n")
end
def join_threads(threads, worker)
Rainbows::G.alive = false
expire = Time.now + (timeout * 2.0)
m = 0
until (threads.delete_if { |thr| ! thr.alive? }).empty?
threads.each { |thr|
worker.tmp.chmod(m = 0 == m ? 1 : 0)
thr.join(1)
break if Time.now >= expire
}
end
end
def self.included(klass)
klass.const_set :LISTENERS, HttpServer::LISTENERS
klass.const_set :G, Rainbows::G
end
end
end
|