1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
| | # -*- encoding: binary -*-
# Copyright (C) 2013-2015 all contributors <yahns-public@yhbt.net>
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
require 'socket'
require 'kgio'
require 'kcar' # gem install kcar
require 'rack/request'
require 'timeout'
require_relative 'proxy_http_response'
class Yahns::ProxyPass # :nodoc:
class ReqRes < Kgio::Socket
attr_writer :resbuf
attr_accessor :proxy_trailers
def req_start(c, req, input, chunked)
@hdr = @resbuf = nil
@yahns_client = c
@rrstate = input ? [ req, input, chunked ] : req
Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
end
# we must reinitialize the thread-local rbuf if it may get beyond the
# current thread
def detach_rbuf!
Thread.current[:yahns_rbuf] = ''
end
def yahns_step # yahns event loop entry point
case req = @rrstate
when Kcar::Parser # reading response...
buf = Thread.current[:yahns_rbuf]
c = @yahns_client
case resbuf = @resbuf # where are we at the response?
when nil # common case, catch the response header in a single read
case rv = kgio_tryread(0x2000, buf)
when String
if res = req.headers(@hdr = [], rv)
return c.proxy_response_start(res, rv, req, self)
else # ugh, big headers or tricked response
buf = detach_rbuf!
@resbuf = rv
end
# continue looping in middle "case @resbuf" loop
when :wait_readable
return rv # spurious wakeup
when nil then return c.proxy_err_response(502, self, nil, nil)
end # NOT looping here
when String # continue reading trickled response headers from upstream
case rv = kgio_tryread(0x2000, buf)
when String then res = req.headers(@hdr, resbuf << rv) and break
when :wait_readable then return rv
when nil then return c.proxy_err_response(502, self, nil, nil)
end while true
return c.proxy_response_start(res, resbuf, req, self)
when Yahns::WbufCommon # streaming/buffering the response body
return c.proxy_response_finish(req, resbuf, self)
end while true # case @resbuf
when Array # [ (str|vec), rack.input, chunked? ]
send_req_body(req) # returns nil or :wait_writable
when String # buffered request header
send_req_buf(req)
end
rescue => e
c.proxy_err_response(502, self, e, nil)
end
# Called by the Rack server at the end of a successful response
def close
@hdr = @yahns_client = @rrstate = nil
super
end
# returns :wait_readable if complete, :wait_writable if not
def send_req_body(req)
buf, input, chunked = req
# get the first buffered chunk or vector
case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
when String, Array
buf = rv # retry inner loop
when :wait_writable
req[0] = buf
return :wait_writable
when nil
break # onto writing body
end while true
buf = Thread.current[:yahns_rbuf]
# Note: input (env['rack.input']) is fully-buffered by default so
# we should not be waiting on a slow network resource when reading
# input. However, some weird configs may disable this on LANs
if chunked
while input.read(0x2000, buf)
vec = [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
case rv = kgio_trywritev(vec)
when Array
vec = rv # partial write, retry in case loop
when :wait_writable
detach_rbuf!
req[0] = vec
return :wait_writable
when nil
break # continue onto reading next chunk
end while true
end
close_req_body(input)
# note: we do not send any trailer, they are folded into the header
# because this relies on full request buffering
send_req_buf("0\r\n\r\n".freeze)
# prepare_wait_readable already called by send_req_buf
else # identity request, easy:
while input.read(0x2000, buf)
case rv = kgio_trywrite(buf)
when String
buf = rv # partial write, retry in case loop
when :wait_writable
detach_rbuf!
req[0] = buf
return :wait_writable
when nil
break # continue onto reading next block
end while true
end
close_req_body(input)
prepare_wait_readable
end
end
def prepare_wait_readable
@rrstate = Kcar::Parser.new
:wait_readable # all done sending the request, wait for response
end
def close_req_body(input)
case input
when Yahns::TeeInput, IO, StringIO
input.close
end
end
# n.b. buf must be a detached string not shared with
# Thread.current[:yahns_rbuf] of any thread
def send_req_buf(buf)
case rv = kgio_trywrite(buf)
when String
buf = rv # retry inner loop
when :wait_writable
@rrstate = buf
return :wait_writable
when nil
return prepare_wait_readable
end while true
end
end # class ReqRes
def initialize(dest)
case dest
when %r{\Aunix:([^:]+)(?::(/.*))?\z}
path = $2
@sockaddr = Socket.sockaddr_un($1)
when %r{\Ahttp://([^/]+)(/.*)?\z}
path = $2
host, port = $1.split(':')
@sockaddr = Socket.sockaddr_in(port || 80, host)
else
raise ArgumentError, "destination must be an HTTP URL or unix: path"
end
init_path_vars(path)
end
def init_path_vars(path)
path ||= '$fullpath'
# methods from Rack::Request we want:
allow = %w(fullpath host_with_port host port url path)
want = path.scan(/\$(\w+)/).flatten! || []
diff = want - allow
diff.empty? or
raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
# kill leading slash just in case...
@path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
end
def call(env)
# 3-way handshake for TCP backends while we generate the request header
rr = ReqRes.start(@sockaddr)
c = env['rack.hijack'].call
req = Rack::Request.new(env)
req = @path.gsub(/\$(\w+)/) { req.__send__($1) }
# start the connection asynchronously and early so TCP can do a
case ver = env['HTTP_VERSION']
when 'HTTP/1.1' # leave alone, response may be chunked
else # no chunking for HTTP/1.0 and HTTP/0.9
ver = 'HTTP/1.0'.freeze
end
req = "#{env['REQUEST_METHOD']} #{req} #{ver}\r\n" \
"X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
# pass most HTTP_* headers through as-is
chunked = false
env.each do |key, val|
%r{\AHTTP_(\w+)\z} =~ key or next
key = $1
# trailers are folded into the header, so do not send the Trailer:
# header in the request
next if /\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR|TRAILER)/ =~
key
chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
key.tr!('_'.freeze, '-'.freeze)
req << "#{key}: #{val}\r\n"
end
# special cases which Rack does not prefix:
ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil
# finally, prepare to emit the headers
rr.req_start(c, req << "\r\n".freeze, input, chunked)
rescue => e
Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e)
[ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
end
end
|