1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
| | # -*- encoding: binary -*-
# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
class Yahns::HttpClient < Kgio::Socket # :nodoc:
NULL_IO = StringIO.new("")
# FIXME: we shouldn't have this at all
Unicorn::HttpParser.keepalive_requests = 0xffffffff
include Yahns::HttpResponse
include Yahns::ClientExpire
QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor
HTTP_RESPONSE_START = [ 'HTTP', '/1.1 ' ]
# A frozen format for this is about 15% faster (note from Mongrel)
REMOTE_ADDR = 'REMOTE_ADDR'.freeze
RACK_INPUT = 'rack.input'.freeze
RACK_HIJACK = 'rack.hijack'.freeze
RACK_HIJACK_IO = "rack.hijack_io".freeze
# called from acceptor thread
def yahns_init
@hs = Unicorn::HttpRequest.new
@response_start_sent = false
@state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile
@input = nil
end
# use if writes are deferred by buffering, this return value goes to
# the main epoll/kqueue worker loop
# returns :wait_readable, :wait_writable, or nil
def step_write
case rv = @state.wbuf_flush(self)
when :wait_writable, :wait_readable
return rv # tell epoll/kqueue to wait on this more
when :ignore # :ignore on hijack
@state = :ignore
return :ignore
when Yahns::StreamFile
@state = rv # continue looping
when true, false # done
return http_response_done(rv)
else
raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
end while true
end
# used only with "input_buffering true"
def mkinput_preread
k = self.class
len = @hs.content_length
mbs = k.client_max_body_size
if mbs && len && len > mbs
raise Unicorn::RequestEntityTooLargeError,
"Content-Length:#{len} too large (>#{mbs})", []
end
@state = :body
@input = k.tmpio_for(len)
rbuf = Thread.current[:yahns_rbuf]
@hs.filter_body(rbuf, @hs.buf)
@input.write(rbuf)
end
def input_ready
empty_body = 0 == @hs.content_length
k = self.class
case k.input_buffering
when true
# common case is an empty body
return NULL_IO if empty_body
# content_length is nil (chunked) or len > 0
mkinput_preread # keep looping
false
else # :lazy, false
empty_body ? NULL_IO : (@input = k.mkinput(self, @hs))
end
end
# the main entry point of the epoll/kqueue worker loop
def yahns_step
# always write unwritten data first if we have any
return step_write if Yahns::WbufCommon === @state
# only read if we had nothing to write in this event loop iteration
k = self.class
rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads
case @state
when :pipelined
if @hs.parse
input = input_ready and return app_call(input)
# @state == :body if we get here point (input_ready -> mkinput_preread)
else
@state = :headers
end
# continue to outer loop
when :headers
case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
when String
if @hs.add_parse(rv)
input = input_ready and return app_call(input)
break # to outer loop to reevaluate @state == :body
end
# keep looping on kgio_tryread
when :wait_readable, :wait_writable, nil
return rv
end while true
when :body
if @hs.body_eof?
if @hs.content_length || @hs.parse # hp.parse == trailers done!
@input.rewind
return app_call(@input)
else # possible Transfer-Encoding:chunked, keep looping
@state = :trailers
end
else
case rv = kgio_tryread(k.client_body_buffer_size, rbuf)
when String
@hs.filter_body(rbuf, @hs.buf << rbuf)
@input.write(rbuf)
# keep looping on kgio_tryread...
when :wait_readable, :wait_writable
return rv # have epoll/kqueue wait for more
when nil # unexpected EOF
return @input.close # nil
end # continue to outer loop (case @state)
end
when :trailers
case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
when String
if @hs.add_parse(rbuf)
@input.rewind
return app_call(@input)
end
# keep looping on kgio_tryread...
when :wait_readable, :wait_writable
return rv # wait for more
when nil # unexpected EOF
return @input.close # nil
end while true
end while true # outer loop
rescue => e
handle_error(e)
end
def app_call(input)
env = @hs.env
env[REMOTE_ADDR] = @kgio_addr
env[RACK_HIJACK] = hijack_proc(env)
env[RACK_INPUT] = input
k = self.class
if k.check_client_connection && @hs.headers?
@response_start_sent = true
# FIXME: we should buffer this just in case
HTTP_RESPONSE_START.each { |c| kgio_write(c) }
end
# run the rack app
response = k.app.call(env.merge!(k.app_defaults))
return :ignore if env.include?(RACK_HIJACK_IO)
# this returns :wait_readable, :wait_writable, :ignore, or nil:
http_response_write(*response)
end
def hijack_proc(env)
proc { env[RACK_HIJACK_IO] = self }
end
# called automatically by kgio_write
def kgio_wait_writable(timeout = self.class.client_timeout)
super timeout
end
# called automatically by kgio_read
def kgio_wait_readable(timeout = self.class.client_timeout)
super timeout
end
# if we get any error, try to write something back to the client
# assuming we haven't closed the socket, but don't get hung up
# if the socket is already closed or broken. We'll always return
# nil to ensure the socket is closed at the end of this function
def handle_error(e)
code = case e
when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN
return # don't send response, drop the connection
when Unicorn::RequestURITooLongError
414
when Unicorn::RequestEntityTooLargeError
413
when Unicorn::HttpParserError # try to tell the client they're bad
400
else
Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
500
end
kgio_trywrite(err_response(code))
rescue
ensure
shutdown rescue nil
return # always drop the connection on uncaught errors
end
end
|