1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
| | # -*- encoding: binary -*-
# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
require 'socket'
require 'kgio'
require 'kcar' # gem install kcar
require 'rack/request'
require 'timeout'
require_relative 'proxy_http_response'
class Yahns::ProxyPass # :nodoc:
class ReqRes < Kgio::Socket # :nodoc:
attr_writer :resbuf
attr_accessor :proxy_trailers
def req_start(c, req, input, chunked)
@hdr = @resbuf = nil
@yahns_client = c
@rrstate = input ? [ req, input, chunked ] : req
Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
end
def yahns_step # yahns event loop entry point
c = @yahns_client
case req = @rrstate
when Kcar::Parser # reading response...
buf = Thread.current[:yahns_rbuf]
case resbuf = @resbuf # where are we at the response?
when nil # common case, catch the response header in a single read
case rv = kgio_tryread(0x2000, buf)
when String
if res = req.headers(@hdr = [], rv)
return c.proxy_response_start(res, rv, req, self)
else # ugh, big headers or tricked response
# we must reinitialize the thread-local rbuf if it may
# live beyond the current thread
buf = Thread.current[:yahns_rbuf] = ''.dup
@resbuf = rv
end
# continue looping in middle "case @resbuf" loop
when :wait_readable
return rv # spurious wakeup
when nil then return c.proxy_err_response(502, self, nil, nil)
end # NOT looping here
when String # continue reading trickled response headers from upstream
case rv = kgio_tryread(0x2000, buf)
when String then res = req.headers(@hdr, resbuf << rv) and break
when :wait_readable then return rv
when nil then return c.proxy_err_response(502, self, nil, nil)
end while true
return c.proxy_response_start(res, resbuf, req, self)
when Yahns::WbufCommon # streaming/buffering the response body
# we assign wbuf for rescue below:
return c.proxy_response_finish(req, wbuf = resbuf, self)
end while true # case @resbuf
when Array # [ (str|vec), rack.input, chunked? ]
send_req_body(req) # returns nil or :wait_writable
when String # buffered request header
send_req_buf(req)
end
rescue => e
# avoid polluting logs with a giant backtrace when the problem isn't
# fixable in code.
case e
when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
e.set_backtrace([])
end
c.proxy_err_response(502, self, e, wbuf)
end
def send_req_body_chunk(buf)
case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
when String, Array
buf.replace(rv) # retry loop on partial write
when :wait_writable, nil
# :wait_writable = upstream is reading slowly and making us wait
return rv
else
abort "BUG: #{rv.inspect} from kgio_trywrite*"
end while true
end
# returns :wait_readable if complete, :wait_writable if not
def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
buf, input, chunked = req
# send the first buffered chunk or vector
rv = send_req_body_chunk(buf) and return rv # :wait_writable
# yay, sent the first chunk, now read the body!
rbuf = buf
if chunked
if String === buf # initial body
req[0] = buf = []
else
# try to reuse the biggest non-frozen buffer we just wrote;
rbuf = buf.max_by(&:size)
rbuf = ''.dup if rbuf.frozen? # unlikely...
end
end
# Note: input (env['rack.input']) is fully-buffered by default so
# we should not be waiting on a slow network resource when reading
# input. However, some weird configs may disable this on LANs
# and we may wait indefinitely on input.read here...
while input.read(0x2000, rbuf)
if chunked
buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
buf[1] = rbuf
buf[2] = "\r\n".freeze
end
rv = send_req_body_chunk(buf) and return rv # :wait_writable
end
rbuf.clear # all done, clear the big buffer
# we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
# tries to prevent that (and hijack means all Rack specs go out the door)
case input
when Yahns::TeeInput, IO
input.close
end
# note: we do not send any trailer, they are folded into the header
# because this relies on full request buffering
# prepare_wait_readable is called by send_req_buf
chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
# no more reading off the client socket, just prepare to forward
# the rejection response from the upstream (if any)
@yahns_client.to_io.shutdown(Socket::SHUT_RD)
prepare_wait_readable
end
def prepare_wait_readable
@rrstate = Kcar::Parser.new
:wait_readable # all done sending the request, wait for response
end
# n.b. buf must be a detached string not shared with
# Thread.current[:yahns_rbuf] of any thread
def send_req_buf(buf)
case rv = kgio_trywrite(buf)
when String
buf = rv # retry inner loop
when :wait_writable
@rrstate = buf
return :wait_writable
when nil
return prepare_wait_readable
end while true
end
end # class ReqRes
def initialize(dest, opts = {})
case dest
when %r{\Aunix:([^:]+)(?::(/.*))?\z}
path = $2
@sockaddr = Socket.sockaddr_un($1)
when %r{\Ahttp://([^/]+)(/.*)?\z}
path = $2
host, port = $1.split(':')
@sockaddr = Socket.sockaddr_in(port || 80, host)
else
raise ArgumentError, "destination must be an HTTP URL or unix: path"
end
@response_headers = opts[:response_headers] || {}
# It's wrong to send the backend Server tag through. Let users say
# { "Server => "yahns" } if they want to advertise for us, but don't
# advertise by default (for security)
@response_headers['Server'] ||= :ignore
init_path_vars(path)
end
def init_path_vars(path)
path ||= '$fullpath'
# methods from Rack::Request we want:
allow = %w(fullpath host_with_port host port url path)
want = path.scan(/\$(\w+)/).flatten! || []
diff = want - allow
diff.empty? or
raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
# kill leading slash just in case...
@path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
end
def call(env)
# 3-way handshake for TCP backends while we generate the request header
rr = ReqRes.start(@sockaddr)
c = env['rack.hijack'].call
req = Rack::Request.new(env)
req = @path.gsub(/\$(\w+)/) { req.__send__($1) }
# start the connection asynchronously and early so TCP can do a
case ver = env['HTTP_VERSION']
when 'HTTP/1.1' # leave alone, response may be chunked
else # no chunking for HTTP/1.0 and HTTP/0.9
ver = 'HTTP/1.0'.freeze
end
req = "#{env['REQUEST_METHOD']} #{req} #{ver}\r\n" \
"X-Forwarded-Proto: #{env['rack.url_scheme']}\r\n" \
"X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n".dup
# pass most HTTP_* headers through as-is
chunked = false
env.each do |key, val|
%r{\AHTTP_(\w+)\z} =~ key or next
key = $1
# trailers are folded into the header, so do not send the Trailer:
# header in the request
next if /\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR|TRAILER)/ =~
key
'TRANSFER_ENCODING'.freeze == key && val =~ /\bchunked\b/i and
chunked = true
key.tr!('_'.freeze, '-'.freeze)
req << "#{key}: #{val}\r\n"
end
# special cases which Rack does not prefix:
ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil
env['yahns.proxy_pass.response_headers'] = @response_headers
# finally, prepare to emit the headers
rr.req_start(c, req << "\r\n".freeze, input, chunked)
# this probably breaks fewer middlewares than returning whatever else...
[ 500, [], [] ]
rescue => e
Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e)
[ 502, { 'Content-Length' => '0', 'Content-Type' => 'text/plain' }, [] ]
end
end
|