diff options
author | Eric Wong <e@80x24.org> | 2016-06-02 00:52:13 +0000 |
---|---|---|
committer | Eric Wong <e@80x24.org> | 2016-06-03 00:12:12 +0000 |
commit | 50bd1a838eafa6cbca5e1b19fec1df4682da0bdb (patch) | |
tree | c39a08c710203920b541bf2bd8a8d0aee4d2c669 /lib/yahns/proxy_http_response.rb | |
parent | a3e3ab48a5a73e7d55bfcb271e5f2a1b666a4f8c (diff) | |
download | yahns-50bd1a838eafa6cbca5e1b19fec1df4682da0bdb.tar.gz |
This may be useful to avoid wasting resources when proxying for an upstream which can already handle slow clients itself. It is impossible to completely disable buffering, this merely prevents gigantic amounts of buffering. This may be useful when an upstream can generate a gigantic response which would cause excessive disk I/O traffic if buffered by yahns. An example of this would be an upstream dynamically-generating a pack for a giant git (clone|fetch) operation. In other words, this option allows the upstream to react to backpressure from slow clients. It is not recommended to enable this unless your upstream server is capable of supporting slow clients.
Diffstat (limited to 'lib/yahns/proxy_http_response.rb')
-rw-r--r-- | lib/yahns/proxy_http_response.rb | 85 |
1 files changed, 51 insertions, 34 deletions
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb index c7a9447..79b995a 100644 --- a/lib/yahns/proxy_http_response.rb +++ b/lib/yahns/proxy_http_response.rb @@ -7,9 +7,19 @@ # constants. module Yahns::HttpResponse # :nodoc: + # switch and yield + def proxy_unbuffer(wbuf) + wbuf.body.resbuf = @state = wbuf + tc = Thread.current + tc[:yahns_fdmap].remember(self) # Yahns::HttpClient + tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_writable ? + Yahns::Queue::QEV_WR : Yahns::Queue::QEV_RD) + :ignore + end + # write everything in buf to our client socket (or wbuf, if it exists) # it may return a newly-created wbuf or nil - def proxy_write(wbuf, buf, alive) + def proxy_write(wbuf, buf, req_res) unless wbuf # no write buffer, try to write directly to the client socket case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf) @@ -17,8 +27,15 @@ module Yahns::HttpResponse # :nodoc: when String, Array # partial write, hope the skb grows buf = rv when :wait_writable, :wait_readable - wbuf = Yahns::Wbuf.new(nil, alive, self.class.output_buffer_tmpdir, rv) - buf = buf.join if Array === buf + if @hs.env['yahns.proxy_pass'].proxy_buffering + body = nil + alive = req_res.alive + else + req_res.paused = true + body = req_res + alive = :ignore + end + wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, rv) break end while true end @@ -54,14 +71,14 @@ module Yahns::HttpResponse # :nodoc: wbuf.wbuf_abort if wbuf end - def wait_on_upstream(req_res, alive, wbuf) - req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, alive, + def wait_on_upstream(req_res, wbuf) + req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, req_res.alive, self.class.output_buffer_tmpdir, false) :wait_readable # self remains in :ignore, wait on upstream end - def proxy_res_headers(res) + def proxy_res_headers(res, req_res) status, headers = res code = status.to_i msg = Rack::Utils::HTTP_STATUS_CODES[code] @@ -118,16 +135,18 @@ module Yahns::HttpResponse # :nodoc: flags = MSG_DONTWAIT res = rv # hope the skb grows when :wait_writable, :wait_readable # highly unlikely in real apps - wbuf = proxy_write(nil, res, alive) + wbuf = proxy_write(nil, res, req_res) break # keep buffering as much as possible end while true - [ alive, wbuf, have_body ] + req_res.alive = alive + [ wbuf, have_body ] end - def proxy_read_body(tip, kcar, req_res, alive, wbuf) + def proxy_read_body(tip, kcar, req_res, wbuf) chunk = ''.dup if kcar.chunked? len = kcar.body_bytes_left rbuf = Thread.current[:yahns_rbuf] + alive = req_res.alive case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) when String @@ -141,29 +160,31 @@ module Yahns::HttpResponse # :nodoc: tmp = chunk_out(tmp) # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing end - wbuf = proxy_write(wbuf, tmp, alive) + wbuf = proxy_write(wbuf, tmp, req_res) + return proxy_unbuffer(wbuf) if wbuf && wbuf.body chunk.clear if chunk when nil # EOF # HTTP/1.1 upstream, unexpected premature EOF: return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk # HTTP/1.0 upstream: - wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive + wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive + return proxy_unbuffer(wbuf) if wbuf && wbuf.body req_res.shutdown break when :wait_readable - return wait_on_upstream(req_res, alive, wbuf) + return wait_on_upstream(req_res, wbuf) end until kcar.body_eof? if chunk # tip is an empty array and becomes trailer storage req_res.proxy_trailers = [ rbuf.dup, tip ] - return proxy_read_trailers(kcar, req_res, alive, wbuf) + return proxy_read_trailers(kcar, req_res, wbuf) end - proxy_busy_mod(wbuf, alive) + proxy_busy_mod(wbuf, req_res) end - def proxy_read_trailers(kcar, req_res, alive, wbuf) + def proxy_read_trailers(kcar, req_res, wbuf) chunk, tlr = req_res.proxy_trailers rbuf = Thread.current[:yahns_rbuf] @@ -172,13 +193,14 @@ module Yahns::HttpResponse # :nodoc: when String chunk << rv when :wait_readable - return wait_on_upstream(req_res, alive, wbuf) + return wait_on_upstream(req_res, wbuf) when nil # premature EOF return proxy_err_response(nil, req_res, nil, wbuf) end # no loop here end - wbuf = proxy_write(wbuf, trailer_out(tlr), alive) - proxy_busy_mod(wbuf, alive) + wbuf = proxy_write(wbuf, trailer_out(tlr), req_res) + return proxy_unbuffer(wbuf) if wbuf && wbuf.body + proxy_busy_mod(wbuf, req_res) end # start streaming the response once upstream is done sending headers to us. @@ -186,25 +208,25 @@ module Yahns::HttpResponse # :nodoc: # returns :ignore if we yield control to the client(self) # returns nil if completely done def proxy_response_start(res, tip, kcar, req_res) - alive, wbuf, have_body = proxy_res_headers(res) + wbuf, have_body = proxy_res_headers(res, req_res) tip = tip.empty? ? [] : [ tip ] if have_body req_res.proxy_trailers = nil # define to avoid uninitialized warnings - return proxy_read_body(tip, kcar, req_res, alive, wbuf) + return proxy_read_body(tip, kcar, req_res, wbuf) end + return proxy_unbuffer(wbuf) if wbuf && wbuf.body # all done reading response from upstream, req_res will be discarded # when we return nil: - proxy_busy_mod(wbuf, alive) + proxy_busy_mod(wbuf, req_res) rescue => e proxy_err_response(502, req_res, e, wbuf) end def proxy_response_finish(kcar, wbuf, req_res) - alive = wbuf.wbuf_persist - req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, alive, wbuf) - : proxy_read_body([], kcar, req_res, alive, wbuf) + req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, wbuf) + : proxy_read_body([], kcar, req_res, wbuf) end def proxy_wait_next(qflags) @@ -238,23 +260,18 @@ module Yahns::HttpResponse # :nodoc: Thread.current[:yahns_queue].queue_mod(self, qflags) end - def proxy_busy_mod(wbuf, alive) - busy = wbuf.busy if wbuf - if busy + def proxy_busy_mod(wbuf, req_res) + if wbuf # we are completely done reading and buffering the upstream response, # but have not completely written the response to the client, # yield control to the client socket: @state = wbuf - proxy_wait_next(case busy - when :wait_readable then Yahns::Queue::QEV_RD - when :wait_writable then Yahns::Queue::QEV_WR - else - raise "BUG: invalid wbuf.busy: #{busy.inspect}" - end) + proxy_wait_next(wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD : + Yahns::Queue::QEV_WR) # no touching self after proxy_wait_next, we may be running # HttpClient#yahns_step in a different thread at this point else - case http_response_done(alive) + case http_response_done(req_res.alive) when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD) when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR) when :close then close |