From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=unavailable autolearn_force=no version=3.4.0 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 7402F1FEA4 for ; Sun, 5 Jun 2016 22:50:44 +0000 (UTC) From: Eric Wong To: yahns-public@yhbt.net Subject: [PATCH 2/3] proxy_pass: redo "proxy_buffering: false" Date: Sun, 5 Jun 2016 22:50:41 +0000 Message-Id: <20160605225042.7861-2-e@80x24.org> List-Id: Relying on @body.close in Yahns::WbufCommon#wbuf_close_common to resume reading the upstream response was too subtle and potentially racy. Instead use a new Yahns::WbufLite class which does exactly what we want for implementing this feature, and nothing more. --- lib/yahns/proxy_http_response.rb | 80 +++++++++++++++++++----------------- lib/yahns/req_res.rb | 25 +++-------- lib/yahns/wbuf.rb | 2 +- lib/yahns/wbuf_lite.rb | 50 ++++++++++++++++++++++ test/test_proxy_pass_no_buffering.rb | 15 ++++--- 5 files changed, 107 insertions(+), 65 deletions(-) create mode 100644 lib/yahns/wbuf_lite.rb diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb index 765fe14..ea176d9 100644 --- a/lib/yahns/proxy_http_response.rb +++ b/lib/yahns/proxy_http_response.rb @@ -3,20 +3,30 @@ # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) # frozen_string_literal: true +require_relative 'wbuf_lite' + # loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for # constants. module Yahns::HttpResponse # :nodoc: # switch and yield def proxy_unbuffer(wbuf) - wbuf.body.resbuf = @state = wbuf + @state = wbuf tc = Thread.current tc[:yahns_fdmap].remember(self) # Yahns::HttpClient - tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_writable ? - Yahns::Queue::QEV_WR : Yahns::Queue::QEV_RD) + tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_readable ? + Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR) :ignore end + def wbuf_alloc(req_res, busy) + if req_res.proxy_pass.proxy_buffering + Yahns::Wbuf.new(nil, req_res.alive, self.class.output_buffer_tmpdir, busy) + else + Yahns::WbufLite.new(req_res) + end + end + # write everything in buf to our client socket (or wbuf, if it exists) # it may return a newly-created wbuf or nil def proxy_write(wbuf, buf, req_res) @@ -27,15 +37,7 @@ def proxy_write(wbuf, buf, req_res) when String, Array # partial write, hope the skb grows buf = rv when :wait_writable, :wait_readable - if req_res.proxy_pass.proxy_buffering - body = nil - alive = req_res.alive - else - req_res.paused = true - body = req_res - alive = :ignore - end - wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, rv) + wbuf = req_res.resbuf ||= wbuf_alloc(req_res, rv) break end while true end @@ -44,7 +46,7 @@ def proxy_write(wbuf, buf, req_res) wbuf.busy ? wbuf : nil end - def proxy_err_response(code, req_res, exc, wbuf) + def proxy_err_response(code, req_res, exc) logger = @hs.env['rack.logger'] case exc when nil @@ -68,13 +70,12 @@ def proxy_err_response(code, req_res, exc, wbuf) nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb ensure - wbuf.wbuf_abort if wbuf + wbuf = req_res.resbuf + wbuf.wbuf_abort if wbuf.respond_to?(:wbuf_abort) end - def wait_on_upstream(req_res, wbuf) - req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, req_res.alive, - self.class.output_buffer_tmpdir, - false) + def wait_on_upstream(req_res) + req_res.resbuf ||= wbuf_alloc(req_res, false) :wait_readable # self remains in :ignore, wait on upstream end @@ -135,18 +136,19 @@ def proxy_res_headers(res, req_res) flags = MSG_DONTWAIT res = rv # hope the skb grows when :wait_writable, :wait_readable # highly unlikely in real apps - wbuf = proxy_write(nil, res, req_res) - break # keep buffering as much as possible + proxy_write(nil, res, req_res) + break # keep buffering body... end while true req_res.alive = alive - [ wbuf, have_body ] + have_body end - def proxy_read_body(tip, kcar, req_res, wbuf) + def proxy_read_body(tip, kcar, req_res) chunk = ''.dup if kcar.chunked? len = kcar.body_bytes_left rbuf = Thread.current[:yahns_rbuf] alive = req_res.alive + wbuf = req_res.resbuf case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) when String @@ -161,45 +163,46 @@ def proxy_read_body(tip, kcar, req_res, wbuf) # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing end wbuf = proxy_write(wbuf, tmp, req_res) - return proxy_unbuffer(wbuf) if wbuf && wbuf.body chunk.clear if chunk + return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf when nil # EOF # HTTP/1.1 upstream, unexpected premature EOF: - return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk + return proxy_err_response(nil, req_res, nil) if len || chunk # HTTP/1.0 upstream: wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive - return proxy_unbuffer(wbuf) if wbuf && wbuf.body + return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf req_res.shutdown break when :wait_readable - return wait_on_upstream(req_res, wbuf) + return wait_on_upstream(req_res) end until kcar.body_eof? if chunk # tip is an empty array and becomes trailer storage req_res.proxy_trailers = [ rbuf.dup, tip ] - return proxy_read_trailers(kcar, req_res, wbuf) + return proxy_read_trailers(kcar, req_res) end proxy_busy_mod(wbuf, req_res) end - def proxy_read_trailers(kcar, req_res, wbuf) + def proxy_read_trailers(kcar, req_res) chunk, tlr = req_res.proxy_trailers rbuf = Thread.current[:yahns_rbuf] + wbuf = req_res.resbuf until kcar.trailers(tlr, chunk) case rv = req_res.kgio_tryread(0x2000, rbuf) when String chunk << rv when :wait_readable - return wait_on_upstream(req_res, wbuf) + return wait_on_upstream(req_res) when nil # premature EOF - return proxy_err_response(nil, req_res, nil, wbuf) + return proxy_err_response(nil, req_res, nil) end # no loop here end wbuf = proxy_write(wbuf, trailer_out(tlr), req_res) - return proxy_unbuffer(wbuf) if wbuf && wbuf.body + return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf proxy_busy_mod(wbuf, req_res) end @@ -208,23 +211,26 @@ def proxy_read_trailers(kcar, req_res, wbuf) # returns :ignore if we yield control to the client(self) # returns nil if completely done def proxy_response_start(res, tip, kcar, req_res) - wbuf, have_body = proxy_res_headers(res, req_res) + have_body = proxy_res_headers(res, req_res) tip = tip.empty? ? [] : [ tip ] if have_body req_res.proxy_trailers = nil # define to avoid uninitialized warnings - return proxy_read_body(tip, kcar, req_res, wbuf) + return proxy_read_body(tip, kcar, req_res) end - return proxy_unbuffer(wbuf) if wbuf && wbuf.body + + # unlikely + wbuf = req_res.resbuf + return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf # all done reading response from upstream, req_res will be discarded # when we return nil: proxy_busy_mod(wbuf, req_res) end - def proxy_response_finish(kcar, wbuf, req_res) - req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, wbuf) - : proxy_read_body([], kcar, req_res, wbuf) + def proxy_response_finish(kcar, req_res) + req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res) + : proxy_read_body([], kcar, req_res) end def proxy_wait_next(qflags) diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb index f585bd9..9bb8f35 100644 --- a/lib/yahns/req_res.rb +++ b/lib/yahns/req_res.rb @@ -7,8 +7,7 @@ require 'kgio' class Yahns::ReqRes < Kgio::Socket # :nodoc: - attr_writer :resbuf - attr_writer :paused + attr_accessor :resbuf attr_accessor :proxy_trailers attr_accessor :alive attr_reader :proxy_pass @@ -16,23 +15,11 @@ class Yahns::ReqRes < Kgio::Socket # :nodoc: def req_start(c, req, input, chunked, proxy_pass) @hdr = @resbuf = nil @yahns_client = c - @paused = false @rrstate = input ? [ req, input, chunked ] : req @proxy_pass = proxy_pass Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR) end - def close - if @paused # called by wbuf_close_common as @body.close - @paused = false - # we must cleanup and set yahns_client state before queue_mod below: - @yahns_client.hijack_cleanup - Thread.current[:yahns_queue].queue_mod(self, Yahns::Queue::QEV_RD) - else - super - end - end - def yahns_step # yahns event loop entry point c = @yahns_client case req = @rrstate @@ -55,7 +42,7 @@ def yahns_step # yahns event loop entry point # continue looping in middle "case @resbuf" loop when :wait_readable return rv # spurious wakeup - when nil then return c.proxy_err_response(502, self, nil, nil) + when nil then return c.proxy_err_response(502, self, nil) end # NOT looping here when String # continue reading trickled response headers from upstream @@ -63,15 +50,15 @@ def yahns_step # yahns event loop entry point case rv = kgio_tryread(0x2000, buf) when String then res = req.headers(@hdr, resbuf << rv) and break when :wait_readable then return rv - when nil then return c.proxy_err_response(502, self, nil, nil) + when nil then return c.proxy_err_response(502, self, nil) end while true + @resbuf = false return c.proxy_response_start(res, resbuf, req, self) when Yahns::WbufCommon # streaming/buffering the response body - # we assign wbuf for rescue below: - return c.proxy_response_finish(req, wbuf = resbuf, self) + return c.proxy_response_finish(req, self) end while true # case @resbuf @@ -87,7 +74,7 @@ def yahns_step # yahns event loop entry point when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE e.set_backtrace([]) end - c.proxy_err_response(502, self, e, wbuf) + c.proxy_err_response(502, self, e) end def send_req_body_chunk(buf) diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb index e6c794a..f7b2ffa 100644 --- a/lib/yahns/wbuf.rb +++ b/lib/yahns/wbuf.rb @@ -30,7 +30,7 @@ # to be a scalability issue. class Yahns::Wbuf # :nodoc: include Yahns::WbufCommon - attr_reader :body, :busy, :wbuf_persist + attr_reader :busy def initialize(body, persist, tmpdir, busy) @tmpio = nil diff --git a/lib/yahns/wbuf_lite.rb b/lib/yahns/wbuf_lite.rb new file mode 100644 index 0000000..5f25b2e --- /dev/null +++ b/lib/yahns/wbuf_lite.rb @@ -0,0 +1,50 @@ +# -*- encoding: binary -*- +# Copyright (C) 2016 all contributors +# License: GPL-3.0+ +# frozen_string_literal: true +require_relative 'wbuf_common' + +# This is only used for "proxy_buffering: false" +class Yahns::WbufLite # :nodoc: + include Yahns::WbufCommon + attr_reader :busy + + def initialize(req_res) + @req_res = req_res + @busy = false + @buf = nil + end + + # called only by proxy_write in proxy_http_response + def wbuf_write(client, buf) + buf = buf.join if Array === buf + buf = @buf << buf if @buf # unlikely + do_write(client, buf) # try our best to avoid string copying/appending + end + + def do_write(client, buf) + case rv = client.kgio_trywrite(buf) + when String + buf = rv # continue looping + when :wait_writable, :wait_readable + @buf = buf + return @busy = rv + when nil + @buf = nil + return @busy = false + end while true + end + + # called by Yahns::HttpClient#step_write + def wbuf_flush(client) + sym = do_write(client, @buf) and return sym # :wait_writable/:wait_readable + + # resume reading + client.hijack_cleanup + Thread.current[:yahns_queue].queue_mod(@req_res, Yahns::Queue::QEV_RD) + :ignore + rescue + @req_res.close + raise + end +end diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb index 88b7c80..48b8241 100644 --- a/test/test_proxy_pass_no_buffering.rb +++ b/test/test_proxy_pass_no_buffering.rb @@ -79,7 +79,6 @@ def test_proxy_pass_no_buffering req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \ "Connection: close\r\n\r\n" s.write(req) - bufs = [] sleep 1 10.times do sleep 0.1 @@ -92,14 +91,10 @@ def test_proxy_pass_no_buffering [ deleted1, deleted2 ].each do |ary| ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ } end - assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}" + assert_equal 0, deleted1.size, "pid1=#{deleted1.inspect}" assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}" - bufs.push(deleted1[0]) end end - before = bufs.size - bufs.uniq! - assert bufs.size < before, 'unlinked buffer should not grow' buf = ''.dup slow = Digest::MD5.new ft = Thread.new do @@ -108,26 +103,30 @@ def test_proxy_pass_no_buffering f.write(req) b2 = ''.dup check_headers(f) + nf = 0 begin f.readpartial(1024 * 1024, b2) + nf += b2.bytesize fast.update(b2) rescue EOFError f = f.close end while f b2.clear - fast + [ nf, fast.hexdigest ] end Thread.abort_on_exception = true check_headers(s) + n = 0 begin s.readpartial(1024 * 1024, buf) slow.update(buf) + n += buf.bytesize sleep 0.01 rescue EOFError s = s.close end while s ft.join(5) - assert_equal slow.hexdigest, ft.value.hexdigest + assert_equal [n, slow.hexdigest ], ft.value fast = Digest::MD5.new f = TCPSocket.new(host, port)