* [PATCH 0/7] proxy_pass cleanups
From: Eric Wong @ 2016-05-16 1:43 UTC (permalink / raw)
To: yahns-public
A bunch of cleanups to hopefully make the proxy_pass-related
code a little saner and easier to follow.
I introduced at least one bug during this series, which is fixed
in 7/7.
Extra sets of eyes to review would be greatly appreciated, thanks!
And this is running live and serving critical information to
readers of https://yhbt.net/ in all its glory!
Once I'm reasonably satisfied with this, I'll continue
making "proxy_buffering: false" work, so slow-client-capable
upstreams can generate gigantic (hundreds of megabytes!)
responses without filesystem overhead.
But first, I think I should work on making those gigantic
responses cheaper in terms of memory/CPU usage outside of
yahns. The use case is git-http-backend serving mega repos
over smart HTTP; yahns is already great for dumb HTTP
git clones.
lib/yahns/proxy_http_response.rb | 236 +++++++++++++++------------------------
lib/yahns/proxy_pass.rb | 171 +---------------------------
lib/yahns/req_res.rb | 159 ++++++++++++++++++++++++++
3 files changed, 252 insertions(+), 314 deletions(-)
Eric Wong (7):
proxy_pass: simplify writing request bodies upstream
proxy_pass: hoist out proxy_res_headers method
proxy_pass: simplify proxy_http_response
proxy_pass: split out body and trailer reading in response
proxy_pass: trim down proxy_response_finish, too
proxy_pass: split out req_res into a separate file
proxy_pass: fix resumes after complete buffering is unblocked
* [PATCH 1/7] proxy_pass: simplify writing request bodies upstream
From: Eric Wong @ 2016-05-16 1:43 UTC (permalink / raw)
To: yahns-public
The cost of extra branches inside a loop is negligible compared
to the cost of all the other method calls we make. Favor
smaller code instead and inline some (now) single-use methods.
Furthermore, this allows us to reuse the request header buffer
instead of relying on thread-local storage and potentially
having to swap buffers.
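A minimal Ruby sketch of the single-loop idea, assuming a simplified writer: `FakeUpstream`, `send_chunk` and `send_body` are hypothetical names (not kgio or yahns APIs), the vector is joined rather than written with writev, and the :wait_writable path is left out.

```ruby
require 'stringio'

# Hypothetical stand-in for a non-blocking upstream socket: it accepts
# at most `limit` bytes per call and, like kgio_trywrite, returns nil
# once everything is written or the unwritten remainder otherwise.
class FakeUpstream
  attr_reader :data

  def initialize(limit)
    @limit = limit
    @data = String.new
  end

  def trywrite(str)
    n = [str.bytesize, @limit].min
    @data << str.byteslice(0, n)
    n == str.bytesize ? nil : str.byteslice(n, str.bytesize - n)
  end
end

# Write a String or an Array of Strings (a "vector"), retrying until
# the upstream has taken every byte.  This sketch joins the vector
# instead of using writev and never sees :wait_writable.
def send_chunk(io, buf)
  buf = buf.join if buf.is_a?(Array)
  while (rest = io.trywrite(buf))
    buf = rest # partial write: retry with what is left
  end
  nil
end

# One loop for both identity and chunked request bodies: the only
# difference is a branch that wraps each read in chunk framing.
def send_body(io, input, chunked)
  rbuf = String.new # reused for every read
  while input.read(0x2000, rbuf)
    buf = chunked ? ["#{rbuf.bytesize.to_s(16)}\r\n", rbuf, "\r\n"] : rbuf
    send_chunk(io, buf)
  end
  send_chunk(io, "0\r\n\r\n") if chunked # last-chunk, no trailers
  nil
end
```

With a 3-byte write limit, each chunk needs several partial writes, yet the upstream still receives `b\r\nhello world\r\n0\r\n\r\n` for a chunked "hello world" body.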
---
lib/yahns/proxy_pass.rb | 112 +++++++++++++++++++++---------------------------
1 file changed, 49 insertions(+), 63 deletions(-)
diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index 148957b..a2d7d81 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -22,12 +22,6 @@ def req_start(c, req, input, chunked)
Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
end
- # we must reinitialize the thread-local rbuf if it may get beyond the
- # current thread
- def detach_rbuf!
- Thread.current[:yahns_rbuf] = ''.dup
- end
-
def yahns_step # yahns event loop entry point
c = @yahns_client
case req = @rrstate
@@ -42,7 +36,9 @@ def yahns_step # yahns event loop entry point
if res = req.headers(@hdr = [], rv)
return c.proxy_response_start(res, rv, req, self)
else # ugh, big headers or tricked response
- buf = detach_rbuf!
+ # we must reinitialize the thread-local rbuf if it may
+ # live beyond the current thread
+ buf = Thread.current[:yahns_rbuf] = ''.dup
@resbuf = rv
end
# continue looping in middle "case @resbuf" loop
@@ -83,64 +79,63 @@ def yahns_step # yahns event loop entry point
c.proxy_err_response(502, self, e, wbuf)
end
- # returns :wait_readable if complete, :wait_writable if not
- def send_req_body(req)
- buf, input, chunked = req
-
- # get the first buffered chunk or vector
+ def send_req_body_chunk(buf)
case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
when String, Array
- buf = rv # retry inner loop
- when :wait_writable
- req[0] = buf
- return :wait_writable
- when nil
- break # onto writing body
+ buf.replace(rv) # retry loop on partial write
+ when :wait_writable, nil
+ # :wait_writable = upstream is reading slowly and making us wait
+ return rv
+ else
+ abort "BUG: #{rv.inspect} from kgio_trywrite*"
end while true
+ end
- buf = Thread.current[:yahns_rbuf]
+ # returns :wait_readable if complete, :wait_writable if not
+ def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
+ buf, input, chunked = req
- # Note: input (env['rack.input']) is fully-buffered by default so
- # we should not be waiting on a slow network resource when reading
- # input. However, some weird configs may disable this on LANs
+ # send the first buffered chunk or vector
+ rv = send_req_body_chunk(buf) and return rv # :wait_writable
+ # yay, sent the first chunk, now read the body!
+ rbuf = buf
if chunked
- while input.read(0x2000, buf)
- vec = [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
- case rv = kgio_trywritev(vec)
- when Array
- vec = rv # partial write, retry in case loop
- when :wait_writable
- detach_rbuf!
- req[0] = vec
- return :wait_writable
- when nil
- break # continue onto reading next chunk
- end while true
+ if String === buf # initial body
+ req[0] = buf = []
+ else
+ # try to reuse the biggest non-frozen buffer we just wrote;
+ rbuf = buf.max_by(&:size)
+ rbuf = ''.dup if rbuf.frozen? # unlikely...
end
- close_req_body(input)
-
- # note: we do not send any trailer, they are folded into the header
- # because this relies on full request buffering
- send_req_buf("0\r\n\r\n".freeze)
- # prepare_wait_readable already called by send_req_buf
- else # identity request, easy:
- while input.read(0x2000, buf)
- case rv = kgio_trywrite(buf)
- when String
- buf = rv # partial write, retry in case loop
- when :wait_writable
- detach_rbuf!
- req[0] = buf
- return :wait_writable
- when nil
- break # continue onto reading next block
- end while true
+ end
+
+ # Note: input (env['rack.input']) is fully-buffered by default so
+ # we should not be waiting on a slow network resource when reading
+ # input. However, some weird configs may disable this on LANs
+ # and we may wait indefinitely on input.read here...
+ while input.read(0x2000, rbuf)
+ if chunked
+ buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
+ buf[1] = rbuf
+ buf[2] = "\r\n".freeze
end
+ rv = send_req_body_chunk(buf) and return rv # :wait_writable
+ end
+
+ rbuf.clear # all done, clear the big buffer
- close_req_body(input)
- prepare_wait_readable
+ # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
+ # tries to prevent that (and hijack means all Rack specs go out the door)
+ case input
+ when Yahns::TeeInput, IO
+ input.close
end
+
+ # note: we do not send any trailer, they are folded into the header
+ # because this relies on full request buffering
+ # prepare_wait_readable is called by send_req_buf
+ chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
# no more reading off the client socket, just prepare to forward
# the rejection response from the upstream (if any)
@@ -153,15 +148,6 @@ def prepare_wait_readable
:wait_readable # all done sending the request, wait for response
end
- def close_req_body(input)
- # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
- # tries to prevent that (and hijack means all Rack specs go out the door)
- case input
- when Yahns::TeeInput, IO
- input.close
- end
- end
-
# n.b. buf must be a detached string not shared with
# Thread.current[:yahns_rbuf] of any thread
def send_req_buf(buf)
* [PATCH 2/7] proxy_pass: hoist out proxy_res_headers method
From: Eric Wong @ 2016-05-16 1:43 UTC (permalink / raw)
To: yahns-public
proxy_response_start is gigantic and hard to read; with the
header handling hoisted out, we can now see the lifetimes of
some objects more clearly and hopefully shorten some of them.
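To illustrate the shape of this refactoring, here is a hedged Ruby sketch: `res_headers` and `response_start` are hypothetical, heavily simplified stand-ins for proxy_res_headers and proxy_response_start. The persistence rule is an assumption for illustration only; the no-body rule follows RFC 7230.

```ruby
# Decide whether a response carries a body and whether the client
# connection may persist, handing both back as one Array.
def res_headers(status, headers, request_method, client_http11)
  code = status.to_i
  # RFC 7230 section 3.3.3: responses to HEAD, and 1xx, 204 and 304
  # responses, never carry a message body.
  have_body = request_method != 'HEAD' &&
              !(100..199).cover?(code) && code != 204 && code != 304
  # assumption for this sketch: persist unless told to close
  conn = headers['Connection'].to_s.downcase
  alive = client_http11 && conn != 'close'
  [alive, have_body]
end

# The caller destructures the result, so every local used only for
# header processing goes out of scope when res_headers returns.
def response_start(status, headers, request_method, client_http11)
  alive, have_body = res_headers(status, headers, request_method,
                                 client_http11)
  return :headers_only unless have_body
  alive ? :stream_keepalive : :stream_close
end
```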
---
lib/yahns/proxy_http_response.rb | 14 +++++++++-----
1 file changed, 9 insertions(+), 5 deletions(-)
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index bd274fe..693528f 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -61,11 +61,7 @@ def wait_on_upstream(req_res, alive, wbuf)
:wait_readable # self remains in :ignore, wait on upstream
end
- # start streaming the response once upstream is done sending headers to us.
- # returns :wait_readable if we need to read more from req_res
- # returns :ignore if we yield control to the client(self)
- # returns nil if completely done
- def proxy_response_start(res, tip, kcar, req_res)
+ def proxy_res_headers(res)
status, headers = res
code = status.to_i
msg = Rack::Utils::HTTP_STATUS_CODES[code]
@@ -125,7 +121,15 @@ def proxy_response_start(res, tip, kcar, req_res)
wbuf = proxy_write(nil, res, alive)
break # keep buffering as much as possible
end while true
+ [ alive, wbuf, have_body ]
+ end
+ # start streaming the response once upstream is done sending headers to us.
+ # returns :wait_readable if we need to read more from req_res
+ # returns :ignore if we yield control to the client(self)
+ # returns nil if completely done
+ def proxy_response_start(res, tip, kcar, req_res)
+ alive, wbuf, have_body = proxy_res_headers(res)
rbuf = Thread.current[:yahns_rbuf]
tip = tip.empty? ? [] : [ tip ]
* [PATCH 3/7] proxy_pass: simplify proxy_http_response
From: Eric Wong @ 2016-05-16 1:43 UTC (permalink / raw)
To: yahns-public
Again, the cost of having extra branches within a loop is
probably negligible compared to having bigger bytecode, which
results in worse CPU cache performance and increased maintenance
overhead for the extra code.
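A Ruby sketch of the merged loop, assuming upstream reads arrive as an Array of Strings. `relay_body` and `chunk_out` are hypothetical names, and the chunked-upstream branch is omitted because dechunking needs a parser such as kcar; the sketch only shows how the Content-Length and read-until-EOF cases share one loop.

```ruby
# Frame one piece of data as an HTTP/1.1 chunk (hypothetical helper
# in the spirit of yahns's chunk_out).
def chunk_out(data)
  "#{data.bytesize.to_s(16)}\r\n#{data}\r\n"
end

# pieces: simulated upstream reads (running out means EOF)
# len:    remaining Content-Length, or nil when the body ends at EOF
# alive:  the client is a persistent HTTP/1.1 client, so an
#         EOF-delimited body must be re-chunked for it
# Returns the bytes that would be written to the client.
def relay_body(pieces, len, alive)
  pieces = pieces.dup
  out = String.new
  loop do
    tmp = pieces.shift
    if tmp.nil? # EOF
      raise EOFError, 'premature EOF' if len # Content-Length not reached
      out << "0\r\n\r\n" if alive # terminate the re-chunked body
      break
    end
    if len
      len -= tmp.bytesize
      out << tmp
      break if len <= 0 # body complete
    elsif alive
      out << chunk_out(tmp)
    else
      out << tmp # HTTP/1.0 client: EOF delimits the body
    end
  end
  out
end
```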
---
lib/yahns/proxy_http_response.rb | 76 ++++++++++++++++++----------------------
1 file changed, 34 insertions(+), 42 deletions(-)
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 693528f..00915df 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -134,38 +134,45 @@ def proxy_response_start(res, tip, kcar, req_res)
tip = tip.empty? ? [] : [ tip ]
if have_body
- if len = kcar.body_bytes_left
+ req_res.proxy_trailers = nil # define to avoid uninitialized warnings
+ chunk = ''.dup if kcar.chunked?
+ tlr = nil
+ len = kcar.body_bytes_left
- case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
- when String
- len = kcar.body_bytes_left -= tmp.size
- wbuf = proxy_write(wbuf, tmp, alive)
- when nil # premature EOF
- return proxy_err_response(nil, req_res, nil, wbuf)
- when :wait_readable
- return wait_on_upstream(req_res, alive, wbuf)
- end until len == 0
-
- elsif kcar.chunked? # nasty chunked body
- req_res.proxy_trailers = nil # define to avoid warnings for now
- buf = ''.dup
- case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
- when String
- kcar.filter_body(buf, tmp)
- wbuf = proxy_write(wbuf, chunk_out(buf), alive) unless buf.empty?
- when nil # premature EOF
- return proxy_err_response(nil, req_res, nil, wbuf)
- when :wait_readable
- return wait_on_upstream(req_res, alive, wbuf)
- end until kcar.body_eof?
+ case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+ when String
+ if len
+ kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
+ elsif chunk
+ kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
+ next if chunk.empty? # read req_res.kgio_tryread for more
+ tmp = chunk_out(chunk)
+ elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
+ tmp = chunk_out(tmp)
+ # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
+ end
+ wbuf = proxy_write(wbuf, tmp, alive)
+ chunk.clear if chunk
+ when nil # EOF
+ # HTTP/1.1 upstream, unexpected premature EOF:
+ return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
+
+ # HTTP/1.0 upstream:
+ wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
+ req_res.shutdown
+ break
+ when :wait_readable
+ return wait_on_upstream(req_res, alive, wbuf)
+ end until kcar.body_eof?
- buf = tmp
- req_res.proxy_trailers = [ buf, tlr = [] ]
+ if chunk
+ chunk = rbuf
+ req_res.proxy_trailers = [ chunk, tlr = [] ]
rbuf = Thread.current[:yahns_rbuf] = ''.dup
- until kcar.trailers(tlr, buf)
+ until kcar.trailers(tlr, chunk)
case rv = req_res.kgio_tryread(0x2000, rbuf)
when String
- buf << rv
+ chunk << rv
when :wait_readable
return wait_on_upstream(req_res, alive, wbuf)
when nil # premature EOF
@@ -173,21 +180,6 @@ def proxy_response_start(res, tip, kcar, req_res)
end # no loop here
end
wbuf = proxy_write(wbuf, trailer_out(tlr), alive)
-
- else # no Content-Length or Transfer-Encoding: chunked, wait on EOF!
-
- case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
- when String
- tmp = chunk_out(tmp) if alive
- wbuf = proxy_write(wbuf, tmp, alive)
- when nil
- wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
- req_res.shutdown
- break
- when :wait_readable
- return wait_on_upstream(req_res, alive, wbuf)
- end while true
-
end
end
* [PATCH 4/7] proxy_pass: split out body and trailer reading in response
From: Eric Wong @ 2016-05-16 1:43 UTC (permalink / raw)
To: yahns-public
Hopefully this increases readability as well and makes it
easier to adjust the code to support new features in the
future.
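A hedged Ruby sketch of why a separate trailer-reading step helps: its partial state fits in one buffer, so it can be resumed on its own. `parse_trailers` is a hypothetical stand-in for kcar's trailer parsing, following the RFC 7230 chunked-body grammar, and `read_trailers` plus the `reads` queue are illustrative assumptions, not yahns code.

```ruby
# `buf` holds what follows the last-chunk ("0\r\n"): zero or more
# "Name: value\r\n" lines, then an empty line.  Returns nil until that
# empty line has arrived, then an Array of [name, value] pairs.
def parse_trailers(buf)
  return [] if buf.start_with?("\r\n") # no trailers at all
  idx = buf.index("\r\n\r\n") or return nil
  buf[0, idx].split("\r\n").map do |line|
    name, value = line.split(':', 2)
    [name, value.to_s.strip]
  end
end

# The partially-read trailer bytes live in state[:trailers] rather
# than in locals of one giant method, so a later call can continue
# where a :wait_readable return left off.
def read_trailers(state, reads)
  buf = state[:trailers]
  until (tlr = parse_trailers(buf))
    case rv = reads.shift
    when String then buf << rv
    when :wait_readable then return :wait_readable
    else raise EOFError, 'premature EOF in trailers'
    end
  end
  tlr
end
```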
---
lib/yahns/proxy_http_response.rb | 105 +++++++++++++++++++++------------------
1 file changed, 58 insertions(+), 47 deletions(-)
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 00915df..52a8aff 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -124,63 +124,74 @@ def proxy_res_headers(res)
[ alive, wbuf, have_body ]
end
+ def proxy_read_body(tip, kcar, req_res, alive, wbuf)
+ chunk = ''.dup if kcar.chunked?
+ len = kcar.body_bytes_left
+ rbuf = Thread.current[:yahns_rbuf]
+
+ case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+ when String
+ if len
+ kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
+ elsif chunk
+ kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
+ next if chunk.empty? # call req_res.kgio_tryread for more
+ tmp = chunk_out(chunk)
+ elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
+ tmp = chunk_out(tmp)
+ # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
+ end
+ wbuf = proxy_write(wbuf, tmp, alive)
+ chunk.clear if chunk
+ when nil # EOF
+ # HTTP/1.1 upstream, unexpected premature EOF:
+ return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
+
+ # HTTP/1.0 upstream:
+ wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
+ req_res.shutdown
+ break
+ when :wait_readable
+ return wait_on_upstream(req_res, alive, wbuf)
+ end until kcar.body_eof?
+
+ if chunk
+ # tip is an empty array and becomes trailer storage
+ req_res.proxy_trailers = [ rbuf.dup, tip ]
+ return proxy_read_trailers(kcar, req_res, alive, wbuf)
+ end
+ wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+ end
+
+ def proxy_read_trailers(kcar, req_res, alive, wbuf)
+ chunk, tlr = req_res.proxy_trailers
+ rbuf = Thread.current[:yahns_rbuf]
+
+ until kcar.trailers(tlr, chunk)
+ case rv = req_res.kgio_tryread(0x2000, rbuf)
+ when String
+ chunk << rv
+ when :wait_readable
+ return wait_on_upstream(req_res, alive, wbuf)
+ when nil # premature EOF
+ return proxy_err_response(nil, req_res, nil, wbuf)
+ end # no loop here
+ end
+ wbuf = proxy_write(wbuf, trailer_out(tlr), alive)
+ wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+ end
+
# start streaming the response once upstream is done sending headers to us.
# returns :wait_readable if we need to read more from req_res
# returns :ignore if we yield control to the client(self)
# returns nil if completely done
def proxy_response_start(res, tip, kcar, req_res)
alive, wbuf, have_body = proxy_res_headers(res)
- rbuf = Thread.current[:yahns_rbuf]
tip = tip.empty? ? [] : [ tip ]
if have_body
req_res.proxy_trailers = nil # define to avoid uninitialized warnings
- chunk = ''.dup if kcar.chunked?
- tlr = nil
- len = kcar.body_bytes_left
-
- case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
- when String
- if len
- kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
- elsif chunk
- kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
- next if chunk.empty? # read req_res.kgio_tryread for more
- tmp = chunk_out(chunk)
- elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
- tmp = chunk_out(tmp)
- # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
- end
- wbuf = proxy_write(wbuf, tmp, alive)
- chunk.clear if chunk
- when nil # EOF
- # HTTP/1.1 upstream, unexpected premature EOF:
- return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
-
- # HTTP/1.0 upstream:
- wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
- req_res.shutdown
- break
- when :wait_readable
- return wait_on_upstream(req_res, alive, wbuf)
- end until kcar.body_eof?
-
- if chunk
- chunk = rbuf
- req_res.proxy_trailers = [ chunk, tlr = [] ]
- rbuf = Thread.current[:yahns_rbuf] = ''.dup
- until kcar.trailers(tlr, chunk)
- case rv = req_res.kgio_tryread(0x2000, rbuf)
- when String
- chunk << rv
- when :wait_readable
- return wait_on_upstream(req_res, alive, wbuf)
- when nil # premature EOF
- return proxy_err_response(nil, req_res, nil, wbuf)
- end # no loop here
- end
- wbuf = proxy_write(wbuf, trailer_out(tlr), alive)
- end
+ return proxy_read_body(tip, kcar, req_res, alive, wbuf)
end
# all done reading response from upstream, req_res will be discarded
* [PATCH 5/7] proxy_pass: trim down proxy_response_finish, too
From: Eric Wong @ 2016-05-16 1:43 UTC (permalink / raw)
To: yahns-public
The previous commits added more branches and made the
existing response handling more generic, so we can now
remove some duplicated logic and make the code easier
to review.
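A minimal Ruby sketch of the resume dispatch, assuming a hypothetical `ResumableReader` class and a queue of simulated reads; it is not the yahns implementation. The point it illustrates is that the only extra state needed to resume is whether the body is done, recorded here (as with req_res.proxy_trailers) as a non-nil trailer buffer.

```ruby
class ResumableReader
  attr_reader :body, :trailers

  def initialize
    @body = String.new
    @trailers = nil # nil until the body is complete
  end

  # `reads`: Strings are data, :wait_readable means "no data yet",
  # :body_done marks the end of the (already dechunked) body.
  def read_body(reads)
    loop do
      case rv = reads.shift
      when :wait_readable then return :wait_readable
      when :body_done
        @trailers = String.new # switch phase: trailers come next
        return read_trailers(reads)
      when String then @body << rv
      else raise EOFError, 'premature EOF in body'
      end
    end
  end

  # Trailers end with an empty line.
  def read_trailers(reads)
    until @trailers.end_with?("\r\n\r\n") || @trailers == "\r\n"
      case rv = reads.shift
      when :wait_readable then return :wait_readable
      when String then @trailers << rv
      else raise EOFError, 'premature EOF in trailers'
      end
    end
    nil # completely done
  end

  # Same shape as the new proxy_response_finish: resume whichever
  # phase we were suspended in.
  def resume(reads)
    @trailers ? read_trailers(reads) : read_body(reads)
  end
end
```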
---
lib/yahns/proxy_http_response.rb | 66 ++--------------------------------------
1 file changed, 3 insertions(+), 63 deletions(-)
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 52a8aff..65fd03b 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -202,69 +202,9 @@ def proxy_response_start(res, tip, kcar, req_res)
end
def proxy_response_finish(kcar, wbuf, req_res)
- rbuf = Thread.current[:yahns_rbuf]
- if len = kcar.body_bytes_left # known Content-Length
-
- case tmp = req_res.kgio_tryread(0x2000, rbuf)
- when String
- len = kcar.body_bytes_left -= tmp.size
- wbuf.wbuf_write(self, tmp)
- when nil # premature EOF
- return proxy_err_response(nil, req_res, nil, wbuf)
- when :wait_readable
- return :wait_readable # self remains in :ignore, wait on upstream
- end while len != 0
-
- elsif kcar.chunked? # nasty chunked response body
- buf = ''.dup
-
- unless req_res.proxy_trailers
- # are we done dechunking the main body, yet?
- case tmp = req_res.kgio_tryread(0x2000, rbuf)
- when String
- kcar.filter_body(buf, tmp)
- buf.empty? or wbuf.wbuf_write(self, chunk_out(buf))
- when nil # premature EOF
- return proxy_err_response(nil, req_res, nil, wbuf)
- when :wait_readable
- return :wait_readable # self remains in :ignore, wait on upstream
- end until kcar.body_eof?
- req_res.proxy_trailers = [ tmp, [] ] # onto trailers!
- rbuf = Thread.current[:yahns_rbuf] = ''.dup
- end
-
- buf, tlr = *req_res.proxy_trailers
- until kcar.trailers(tlr, buf)
- case rv = req_res.kgio_tryread(0x2000, rbuf)
- when String
- buf << rv
- when :wait_readable
- return :wait_readable
- when nil # premature EOF
- return proxy_err_response(nil, req_res, nil, wbuf)
- end # no loop here
- end
- wbuf.wbuf_write(self, trailer_out(tlr))
-
- else # no Content-Length or Transfer-Encoding: chunked, wait on EOF!
-
- alive = wbuf.wbuf_persist
- case tmp = req_res.kgio_tryread(0x2000, rbuf)
- when String
- tmp = chunk_out(tmp) if alive
- wbuf.wbuf_write(self, tmp)
- when nil
- wbuf.wbuf_write(self, "0\r\n\r\n".freeze) if alive
- req_res.shutdown
- break
- when :wait_readable
- return :wait_readable # self remains in :ignore, wait on upstream
- end while true
-
- end
-
- busy = wbuf.busy and return proxy_busy_mod_blocked(wbuf, busy)
- proxy_busy_mod_done(wbuf.wbuf_persist) # returns nil to close req_res
+ alive = wbuf.wbuf_persist
+ req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, alive, wbuf)
+ : proxy_read_body([], kcar, req_res, alive, wbuf)
end
def proxy_wait_next(qflags)
* [PATCH 6/7] proxy_pass: split out req_res into a separate file
From: Eric Wong @ 2016-05-16 1:43 UTC (permalink / raw)
To: yahns-public
This makes the ReqRes class easier to find and, hopefully,
easier to maintain alongside other parts of yahns, although there
may be no reason to use this class outside of ProxyPass.
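Why the call site changes from `ReqRes.start` to `Yahns::ReqRes.start`: a Ruby sketch using a minimal mock `Yahns` namespace (the real classes live in yahns, and the real ReqRes subclasses Kgio::Socket). With the compact `class Yahns::ProxyPass` form, `Yahns` is not part of the lexical scope, so a bare `ReqRes` no longer resolves once the class moves out of ProxyPass. The `start` body and the address are placeholders.

```ruby
module Yahns; end # minimal mock namespace

# After the split, ReqRes is a sibling of ProxyPass under Yahns.
class Yahns::ReqRes
  def self.start(addr)
    "connecting to #{addr}" # placeholder for the real socket setup
  end
end

class Yahns::ProxyPass
  # Module.nesting here is [Yahns::ProxyPass] only, so a bare ReqRes
  # is looked up in Yahns::ProxyPass and its ancestors (Object, ...),
  # never in Yahns itself.
  def self.bare_lookup
    ReqRes
  rescue NameError
    :name_error
  end

  def self.qualified_lookup
    Yahns::ReqRes.start('127.0.0.1:8080')
  end
end
```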
---
lib/yahns/proxy_pass.rb | 157 +----------------------------------------------
lib/yahns/req_res.rb | 159 ++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 161 insertions(+), 155 deletions(-)
create mode 100644 lib/yahns/req_res.rb
diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index a2d7d81..8e0b742 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -3,166 +3,13 @@
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
require 'socket'
-require 'kgio'
-require 'kcar' # gem install kcar
require 'rack/request'
require 'timeout'
require_relative 'proxy_http_response'
+require_relative 'req_res'
class Yahns::ProxyPass # :nodoc:
- class ReqRes < Kgio::Socket # :nodoc:
- attr_writer :resbuf
- attr_accessor :proxy_trailers
-
- def req_start(c, req, input, chunked)
- @hdr = @resbuf = nil
- @yahns_client = c
- @rrstate = input ? [ req, input, chunked ] : req
- Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
- end
-
- def yahns_step # yahns event loop entry point
- c = @yahns_client
- case req = @rrstate
- when Kcar::Parser # reading response...
- buf = Thread.current[:yahns_rbuf]
-
- case resbuf = @resbuf # where are we at the response?
- when nil # common case, catch the response header in a single read
-
- case rv = kgio_tryread(0x2000, buf)
- when String
- if res = req.headers(@hdr = [], rv)
- return c.proxy_response_start(res, rv, req, self)
- else # ugh, big headers or tricked response
- # we must reinitialize the thread-local rbuf if it may
- # live beyond the current thread
- buf = Thread.current[:yahns_rbuf] = ''.dup
- @resbuf = rv
- end
- # continue looping in middle "case @resbuf" loop
- when :wait_readable
- return rv # spurious wakeup
- when nil then return c.proxy_err_response(502, self, nil, nil)
- end # NOT looping here
-
- when String # continue reading trickled response headers from upstream
-
- case rv = kgio_tryread(0x2000, buf)
- when String then res = req.headers(@hdr, resbuf << rv) and break
- when :wait_readable then return rv
- when nil then return c.proxy_err_response(502, self, nil, nil)
- end while true
-
- return c.proxy_response_start(res, resbuf, req, self)
-
- when Yahns::WbufCommon # streaming/buffering the response body
-
- # we assign wbuf for rescue below:
- return c.proxy_response_finish(req, wbuf = resbuf, self)
-
- end while true # case @resbuf
-
- when Array # [ (str|vec), rack.input, chunked? ]
- send_req_body(req) # returns nil or :wait_writable
- when String # buffered request header
- send_req_buf(req)
- end
- rescue => e
- # avoid polluting logs with a giant backtrace when the problem isn't
- # fixable in code.
- case e
- when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
- e.set_backtrace([])
- end
- c.proxy_err_response(502, self, e, wbuf)
- end
-
- def send_req_body_chunk(buf)
- case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
- when String, Array
- buf.replace(rv) # retry loop on partial write
- when :wait_writable, nil
- # :wait_writable = upstream is reading slowly and making us wait
- return rv
- else
- abort "BUG: #{rv.inspect} from kgio_trywrite*"
- end while true
- end
-
- # returns :wait_readable if complete, :wait_writable if not
- def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
- buf, input, chunked = req
-
- # send the first buffered chunk or vector
- rv = send_req_body_chunk(buf) and return rv # :wait_writable
-
- # yay, sent the first chunk, now read the body!
- rbuf = buf
- if chunked
- if String === buf # initial body
- req[0] = buf = []
- else
- # try to reuse the biggest non-frozen buffer we just wrote;
- rbuf = buf.max_by(&:size)
- rbuf = ''.dup if rbuf.frozen? # unlikely...
- end
- end
-
- # Note: input (env['rack.input']) is fully-buffered by default so
- # we should not be waiting on a slow network resource when reading
- # input. However, some weird configs may disable this on LANs
- # and we may wait indefinitely on input.read here...
- while input.read(0x2000, rbuf)
- if chunked
- buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
- buf[1] = rbuf
- buf[2] = "\r\n".freeze
- end
- rv = send_req_body_chunk(buf) and return rv # :wait_writable
- end
-
- rbuf.clear # all done, clear the big buffer
-
- # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
- # tries to prevent that (and hijack means all Rack specs go out the door)
- case input
- when Yahns::TeeInput, IO
- input.close
- end
-
- # note: we do not send any trailer, they are folded into the header
- # because this relies on full request buffering
- # prepare_wait_readable is called by send_req_buf
- chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
- rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
- # no more reading off the client socket, just prepare to forward
- # the rejection response from the upstream (if any)
- @yahns_client.to_io.shutdown(Socket::SHUT_RD)
- prepare_wait_readable
- end
-
- def prepare_wait_readable
- @rrstate = Kcar::Parser.new
- :wait_readable # all done sending the request, wait for response
- end
-
- # n.b. buf must be a detached string not shared with
- # Thread.current[:yahns_rbuf] of any thread
- def send_req_buf(buf)
- case rv = kgio_trywrite(buf)
- when String
- buf = rv # retry inner loop
- when :wait_writable
- @rrstate = buf
- return :wait_writable
- when nil
- return prepare_wait_readable
- end while true
- end
- end # class ReqRes
-
def initialize(dest, opts = {})
case dest
when %r{\Aunix:([^:]+)(?::(/.*))?\z}
@@ -199,7 +46,7 @@ def init_path_vars(path)
def call(env)
# 3-way handshake for TCP backends while we generate the request header
- rr = ReqRes.start(@sockaddr)
+ rr = Yahns::ReqRes.start(@sockaddr)
c = env['rack.hijack'].call
req = Rack::Request.new(env)
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
new file mode 100644
index 0000000..3b0d298
--- /dev/null
+++ b/lib/yahns/req_res.rb
@@ -0,0 +1,159 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
+# frozen_string_literal: true
+# Only used by Yahns::ProxyPass
+require 'kcar' # gem install kcar
+require 'kgio'
+
+class Yahns::ReqRes < Kgio::Socket # :nodoc:
+ attr_writer :resbuf
+ attr_accessor :proxy_trailers
+
+ def req_start(c, req, input, chunked)
+ @hdr = @resbuf = nil
+ @yahns_client = c
+ @rrstate = input ? [ req, input, chunked ] : req
+ Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
+ end
+
+ def yahns_step # yahns event loop entry point
+ c = @yahns_client
+ case req = @rrstate
+ when Kcar::Parser # reading response...
+ buf = Thread.current[:yahns_rbuf]
+
+ case resbuf = @resbuf # where are we at the response?
+ when nil # common case, catch the response header in a single read
+
+ case rv = kgio_tryread(0x2000, buf)
+ when String
+ if res = req.headers(@hdr = [], rv)
+ return c.proxy_response_start(res, rv, req, self)
+ else # ugh, big headers or tricked response
+ # we must reinitialize the thread-local rbuf if it may
+ # live beyond the current thread
+ buf = Thread.current[:yahns_rbuf] = ''.dup
+ @resbuf = rv
+ end
+ # continue looping in middle "case @resbuf" loop
+ when :wait_readable
+ return rv # spurious wakeup
+ when nil then return c.proxy_err_response(502, self, nil, nil)
+ end # NOT looping here
+
+ when String # continue reading trickled response headers from upstream
+
+ case rv = kgio_tryread(0x2000, buf)
+ when String then res = req.headers(@hdr, resbuf << rv) and break
+ when :wait_readable then return rv
+ when nil then return c.proxy_err_response(502, self, nil, nil)
+ end while true
+
+ return c.proxy_response_start(res, resbuf, req, self)
+
+ when Yahns::WbufCommon # streaming/buffering the response body
+
+ # we assign wbuf for rescue below:
+ return c.proxy_response_finish(req, wbuf = resbuf, self)
+
+ end while true # case @resbuf
+
+ when Array # [ (str|vec), rack.input, chunked? ]
+ send_req_body(req) # returns nil or :wait_writable
+ when String # buffered request header
+ send_req_buf(req)
+ end
+ rescue => e
+ # avoid polluting logs with a giant backtrace when the problem isn't
+ # fixable in code.
+ case e
+ when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
+ e.set_backtrace([])
+ end
+ c.proxy_err_response(502, self, e, wbuf)
+ end
+
+ def send_req_body_chunk(buf)
+ case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
+ when String, Array
+ buf.replace(rv) # retry loop on partial write
+ when :wait_writable, nil
+ # :wait_writable = upstream is reading slowly and making us wait
+ return rv
+ else
+ abort "BUG: #{rv.inspect} from kgio_trywrite*"
+ end while true
+ end
+
+ # returns :wait_readable if complete, :wait_writable if not
+ def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
+ buf, input, chunked = req
+
+ # send the first buffered chunk or vector
+ rv = send_req_body_chunk(buf) and return rv # :wait_writable
+
+ # yay, sent the first chunk, now read the body!
+ rbuf = buf
+ if chunked
+ if String === buf # initial body
+ req[0] = buf = []
+ else
+ # try to reuse the biggest non-frozen buffer we just wrote;
+ rbuf = buf.max_by(&:size)
+ rbuf = ''.dup if rbuf.frozen? # unlikely...
+ end
+ end
+
+ # Note: input (env['rack.input']) is fully-buffered by default so
+ # we should not be waiting on a slow network resource when reading
+ # input. However, some weird configs may disable this on LANs
+ # and we may wait indefinitely on input.read here...
+ while input.read(0x2000, rbuf)
+ if chunked
+ buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
+ buf[1] = rbuf
+ buf[2] = "\r\n".freeze
+ end
+ rv = send_req_body_chunk(buf) and return rv # :wait_writable
+ end
+
+ rbuf.clear # all done, clear the big buffer
+
+ # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
+ # tries to prevent that (and hijack means all Rack specs go out the door)
+ case input
+ when Yahns::TeeInput, IO
+ input.close
+ end
+
+ # note: we do not send any trailer, they are folded into the header
+ # because this relies on full request buffering
+ # prepare_wait_readable is called by send_req_buf
+ chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
+ rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
+ # no more reading off the client socket, just prepare to forward
+ # the rejection response from the upstream (if any)
+ @yahns_client.to_io.shutdown(Socket::SHUT_RD)
+ prepare_wait_readable
+ end
+
+ def prepare_wait_readable
+ @rrstate = Kcar::Parser.new
+ :wait_readable # all done sending the request, wait for response
+ end
+
+ # n.b. buf must be a detached string not shared with
+ # Thread.current[:yahns_rbuf] of any thread
+ def send_req_buf(buf)
+ case rv = kgio_trywrite(buf)
+ when String
+ buf = rv # retry inner loop
+ when :wait_writable
+ @rrstate = buf
+ return :wait_writable
+ when nil
+ return prepare_wait_readable
+ end while true
+ end
+end # class ReqRes
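The request-body path in send_req_body and send_req_buf above does two things. It frames each piece read from rack.input as an HTTP/1.1 chunk ("<hex size>\r\n<data>\r\n", ending with "0\r\n\r\n"). It also retries partial non-blocking writes until the upstream socket would block. The sketch below shows both ideas in isolation and is not yahns code. `each_chunk_frame` and `try_write_all` are hypothetical names. Plain `IO#write_nonblock(..., exception: false)` stands in for kgio's `kgio_trywrite`, which returns the unwritten remainder as a String instead of a byte count.

```ruby
require 'stringio'

READ_SIZE = 0x2000 # same 8 KiB read size used by send_req_body

# Hypothetical helper: yields one HTTP/1.1 chunk per read from +input+,
# framed as "<hex size>\r\n<data>\r\n", then the zero-length last-chunk.
# (The patch writes [size_line, rbuf, "\r\n"] as a vector instead of
# concatenating, which avoids copying rbuf.)
def each_chunk_frame(input, read_size = READ_SIZE)
  rbuf = ''.dup
  while input.read(read_size, rbuf)
    yield "#{rbuf.bytesize.to_s(16)}\r\n#{rbuf}\r\n"
  end
  yield "0\r\n\r\n".dup # no trailers are sent, as in the patch
end

# Hypothetical helper: writes +buf+ without blocking, retrying partial
# writes. Returns nil when everything was written, or :wait_writable with
# the unwritten remainder left in +buf+ so the caller can resume later.
def try_write_all(io, buf)
  loop do
    rv = io.write_nonblock(buf, exception: false)
    return :wait_writable if rv == :wait_writable
    return nil if rv == buf.bytesize
    buf.replace(buf.byteslice(rv, buf.bytesize - rv)) # keep the remainder
  end
end

# Usage: frame an 8193-byte body and push it through a pipe.
r, w = IO.pipe
each_chunk_frame(StringIO.new('a' * 0x2001)) do |frame|
  # the payload is small enough that the pipe should never fill up here
  raise 'pipe unexpectedly full' if try_write_all(w, frame)
end
w.close
out = r.read
r.close
```

Keeping the remainder in the caller's buffer mirrors how send_req_buf stores the unsent string in @rrstate on :wait_writable. The next readiness event can then resume exactly where the previous write stopped.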
* [PATCH 7/7] proxy_pass: fix resumes after complete buffering is unblocked
From: Eric Wong @ 2016-05-16 1:43 UTC (permalink / raw)
To: yahns-public
This fixes a bug where a cleared wbuf would kill the
process once the response got completely flushed out to
the client: `wbuf.busy' becomes `false' while wbuf itself
is still non-nil, so we still took the "blocked" path and
hit the abort.
This fixes a regression introduced in
"proxy_pass: trim down proxy_response_finish, too",
which was never in a stable release.
---
lib/yahns/proxy_http_response.rb | 51 ++++++++++++++++++++--------------------
1 file changed, 25 insertions(+), 26 deletions(-)
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 65fd03b..284a3c6 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -160,7 +160,7 @@ def proxy_read_body(tip, kcar, req_res, alive, wbuf)
req_res.proxy_trailers = [ rbuf.dup, tip ]
return proxy_read_trailers(kcar, req_res, alive, wbuf)
end
- wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+ proxy_busy_mod(wbuf, alive)
end
def proxy_read_trailers(kcar, req_res, alive, wbuf)
@@ -178,7 +178,7 @@ def proxy_read_trailers(kcar, req_res, alive, wbuf)
end # no loop here
end
wbuf = proxy_write(wbuf, trailer_out(tlr), alive)
- wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+ proxy_busy_mod(wbuf, alive)
end
# start streaming the response once upstream is done sending headers to us.
@@ -196,7 +196,7 @@ def proxy_response_start(res, tip, kcar, req_res)
# all done reading response from upstream, req_res will be discarded
# when we return nil:
- wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+ proxy_busy_mod(wbuf, alive)
rescue => e
proxy_err_response(502, req_res, e, wbuf)
end
@@ -218,7 +218,7 @@ def proxy_wait_next(qflags)
# <thread is scheduled away> | epoll_wait readiness
# | ReqRes#yahns_step
# | proxy dispatch ...
- # | proxy_busy_mod_done
+ # | proxy_busy_mod
# ************************** DANGER BELOW ********************************
# | HttpClient#yahns_step
# | # clears env
@@ -238,29 +238,28 @@ def proxy_wait_next(qflags)
Thread.current[:yahns_queue].queue_mod(self, qflags)
end
- def proxy_busy_mod_done(alive)
- case http_response_done(alive)
- when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD)
- when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR)
- when :close then close
+ def proxy_busy_mod(wbuf, alive)
+ busy = wbuf.busy if wbuf
+ if busy
+ # we are completely done reading and buffering the upstream response,
+ # but have not completely written the response to the client,
+ # yield control to the client socket:
+ @state = wbuf
+ proxy_wait_next(case busy
+ when :wait_readable then Yahns::Queue::QEV_RD
+ when :wait_writable then Yahns::Queue::QEV_WR
+ else
+ raise "BUG: invalid wbuf.busy: #{busy.inspect}"
+ end)
+ # no touching self after proxy_wait_next, we may be running
+ # HttpClient#yahns_step in a different thread at this point
+ else
+ case http_response_done(alive)
+ when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD)
+ when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR)
+ when :close then close
+ end
end
-
- nil # signal close for ReqRes#yahns_step
- end
-
- def proxy_busy_mod_blocked(wbuf, busy)
- # we are completely done reading and buffering the upstream response,
- # but have not completely written the response to the client,
- # yield control to the client socket:
- @state = wbuf
- proxy_wait_next(case busy
- when :wait_readable then Yahns::Queue::QEV_RD
- when :wait_writable then Yahns::Queue::QEV_WR
- else
- abort "BUG: invalid wbuf.busy: #{busy.inspect}"
- end)
- # no touching self after proxy_wait_next, we may be running
- # HttpClient#yahns_step in a different thread at this point
nil # signal close for ReqRes#yahns_step
end
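The core of the fix is that proxy_busy_mod now tests `wbuf.busy` instead of just `wbuf`. A buffer that has already been drained therefore falls through to http_response_done instead of the BUG branch. The queue-free sketch below contrasts the two dispatches. FakeWbuf is a hypothetical struct, and symbolic return values stand in for proxy_wait_next, http_response_done and close.

```ruby
# Hypothetical stand-in for the wbuf object; only #busy matters here.
FakeWbuf = Struct.new(:busy)

# Branch structure of the patched proxy_busy_mod.
def busy_mod_action(wbuf)
  busy = wbuf.busy if wbuf
  if busy
    # fully buffered but not yet flushed: wait on the client socket
    case busy
    when :wait_readable then :wait_client_readable
    when :wait_writable then :wait_client_writable
    else raise "BUG: invalid wbuf.busy: #{busy.inspect}"
    end
  else
    # no wbuf, or a drained one (busy == false): finish normally
    :response_done # stands in for http_response_done(alive)
  end
end

# Pre-fix dispatch for comparison: any non-nil wbuf took the "blocked"
# path, so a drained wbuf (busy == false) reached the BUG branch
# (abort in the original code, raise here so it can be observed).
def old_busy_mod_action(wbuf)
  if wbuf
    case wbuf.busy
    when :wait_readable then :wait_client_readable
    when :wait_writable then :wait_client_writable
    else raise "BUG: invalid wbuf.busy: #{wbuf.busy.inspect}"
    end
  else
    :response_done
  end
end
```

For example, `busy_mod_action(FakeWbuf.new(false))` yields `:response_done`, while `old_busy_mod_action(FakeWbuf.new(false))` raises. The patch also replaces `abort` with `raise` in that branch, so an unexpected value raises an exception instead of terminating the process outright.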
* false-positive spam [Re: [PATCH 0/7] proxy_pass cleanups]
From: Eric Wong @ 2016-05-16 2:05 UTC (permalink / raw)
To: yahns-public
Apologies if you get duplicates; I upgraded SpamAssassin and
wanted to make sure mail over Tor could still get through.
Bayesian filtering can be pretty weak (even after training), so
it depends entirely on which relays Tor lets you through.
I guess it's a luck-of-the-draw deal.
Code repositories for project(s) associated with this public inbox
https://yhbt.net/yahns.git/