From ff5201fb34b6ceb83cf0f795ab9639d5c0089695 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 3 May 2016 22:31:13 +0000 Subject: proxy_pass: simplify writing request bodies upstream The cost of extra branches inside a loop is negligible compared to the cost of all the other method calls we make. Favor smaller code instead and inline some (now) single-use methods. Furthermore, this allows us to reuse the request header buffer instead of relying on thread-local storage and potentially having to to swap buffers. --- lib/yahns/proxy_pass.rb | 112 +++++++++++++++++++++--------------------------- 1 file changed, 49 insertions(+), 63 deletions(-) (limited to 'lib') diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb index 148957b..a2d7d81 100644 --- a/lib/yahns/proxy_pass.rb +++ b/lib/yahns/proxy_pass.rb @@ -22,12 +22,6 @@ class Yahns::ProxyPass # :nodoc: Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR) end - # we must reinitialize the thread-local rbuf if it may get beyond the - # current thread - def detach_rbuf! - Thread.current[:yahns_rbuf] = ''.dup - end - def yahns_step # yahns event loop entry point c = @yahns_client case req = @rrstate @@ -42,7 +36,9 @@ class Yahns::ProxyPass # :nodoc: if res = req.headers(@hdr = [], rv) return c.proxy_response_start(res, rv, req, self) else # ugh, big headers or tricked response - buf = detach_rbuf! + # we must reinitialize the thread-local rbuf if it may + # live beyond the current thread + buf = Thread.current[:yahns_rbuf] = ''.dup @resbuf = rv end # continue looping in middle "case @resbuf" loop @@ -83,64 +79,63 @@ class Yahns::ProxyPass # :nodoc: c.proxy_err_response(502, self, e, wbuf) end - # returns :wait_readable if complete, :wait_writable if not - def send_req_body(req) - buf, input, chunked = req - - # get the first buffered chunk or vector + def send_req_body_chunk(buf) case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf) when String, Array - buf = rv # retry inner loop - when :wait_writable - req[0] = buf - return :wait_writable - when nil - break # onto writing body + buf.replace(rv) # retry loop on partial write + when :wait_writable, nil + # :wait_writable = upstream is reading slowly and making us wait + return rv + else + abort "BUG: #{rv.inspect} from kgio_trywrite*" end while true + end - buf = Thread.current[:yahns_rbuf] + # returns :wait_readable if complete, :wait_writable if not + def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ] + buf, input, chunked = req - # Note: input (env['rack.input']) is fully-buffered by default so - # we should not be waiting on a slow network resource when reading - # input. However, some weird configs may disable this on LANs + # send the first buffered chunk or vector + rv = send_req_body_chunk(buf) and return rv # :wait_writable + # yay, sent the first chunk, now read the body! + rbuf = buf if chunked - while input.read(0x2000, buf) - vec = [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ] - case rv = kgio_trywritev(vec) - when Array - vec = rv # partial write, retry in case loop - when :wait_writable - detach_rbuf! - req[0] = vec - return :wait_writable - when nil - break # continue onto reading next chunk - end while true + if String === buf # initial body + req[0] = buf = [] + else + # try to reuse the biggest non-frozen buffer we just wrote; + rbuf = buf.max_by(&:size) + rbuf = ''.dup if rbuf.frozen? # unlikely... end - close_req_body(input) - - # note: we do not send any trailer, they are folded into the header - # because this relies on full request buffering - send_req_buf("0\r\n\r\n".freeze) - # prepare_wait_readable already called by send_req_buf - else # identity request, easy: - while input.read(0x2000, buf) - case rv = kgio_trywrite(buf) - when String - buf = rv # partial write, retry in case loop - when :wait_writable - detach_rbuf! - req[0] = buf - return :wait_writable - when nil - break # continue onto reading next block - end while true + end + + # Note: input (env['rack.input']) is fully-buffered by default so + # we should not be waiting on a slow network resource when reading + # input. However, some weird configs may disable this on LANs + # and we may wait indefinitely on input.read here... + while input.read(0x2000, rbuf) + if chunked + buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze + buf[1] = rbuf + buf[2] = "\r\n".freeze end + rv = send_req_body_chunk(buf) and return rv # :wait_writable + end + + rbuf.clear # all done, clear the big buffer - close_req_body(input) - prepare_wait_readable + # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper + # tries to prevent that (and hijack means all Rack specs go out the door) + case input + when Yahns::TeeInput, IO + input.close end + + # note: we do not send any trailer, they are folded into the header + # because this relies on full request buffering + # prepare_wait_readable is called by send_req_buf + chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN # no more reading off the client socket, just prepare to forward # the rejection response from the upstream (if any) @@ -153,15 +148,6 @@ class Yahns::ProxyPass # :nodoc: :wait_readable # all done sending the request, wait for response end - def close_req_body(input) - # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper - # tries to prevent that (and hijack means all Rack specs go out the door) - case input - when Yahns::TeeInput, IO - input.close - end - end - # n.b. buf must be a detached string not shared with # Thread.current[:yahns_rbuf] of any thread def send_req_buf(buf) -- cgit v1.2.3-24-ge0c7