From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=unavailable autolearn_force=no version=3.4.0 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 10B4F200E4 for ; Tue, 7 Jun 2016 07:39:11 +0000 (UTC) From: Eric Wong To: yahns-public@yhbt.net Subject: [PATCH 5/5] proxy_pass: fix HTTP/1.0 backends on EOF w/o buffering Date: Tue, 7 Jun 2016 07:39:08 +0000 Message-Id: <20160607073908.31035-6-e@80x24.org> In-Reply-To: <20160607073908.31035-1-e@80x24.org> References: <20160607073908.31035-1-e@80x24.org> List-Id: We must ensure we properly close connections to HTTP/1.0 backends even if we blocked writing on outgoing data. --- lib/yahns/proxy_http_response.rb | 9 ++- lib/yahns/wbuf_lite.rb | 7 +- test/test_proxy_pass_no_buffering.rb | 138 +++++++++++++++++++---------------- 3 files changed, 84 insertions(+), 70 deletions(-) diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb index 9867da2..316c310 100644 --- a/lib/yahns/proxy_http_response.rb +++ b/lib/yahns/proxy_http_response.rb @@ -10,13 +10,14 @@ module Yahns::HttpResponse # :nodoc: # switch and yield - def proxy_unbuffer(wbuf) + def proxy_unbuffer(wbuf, nxt = :ignore) @state = wbuf + wbuf.req_res = nil if nxt.nil? && wbuf.respond_to?(:req_res=) tc = Thread.current tc[:yahns_fdmap].remember(self) # Yahns::HttpClient tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR) - :ignore + nxt end def wbuf_alloc(req_res) @@ -175,9 +176,9 @@ def proxy_read_body(tip, kcar, req_res) # HTTP/1.0 upstream: wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive - return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf req_res.shutdown - break + return proxy_unbuffer(wbuf, nil) if Yahns::WbufLite === wbuf + return proxy_busy_mod(wbuf, req_res) when :wait_readable return wait_on_upstream(req_res) end until kcar.body_eof? diff --git a/lib/yahns/wbuf_lite.rb b/lib/yahns/wbuf_lite.rb index afee1e9..fa52f54 100644 --- a/lib/yahns/wbuf_lite.rb +++ b/lib/yahns/wbuf_lite.rb @@ -7,9 +7,11 @@ # This is only used for "proxy_buffering: false" class Yahns::WbufLite < Yahns::Wbuf # :nodoc: attr_reader :busy + attr_writer :req_res def initialize(req_res) - super(nil, :ignore) + alive = req_res.alive + super(nil, alive ? :ignore : false) @req_res = req_res end @@ -35,8 +37,9 @@ def wbuf_close(client) if @req_res client.hijack_cleanup Thread.current[:yahns_queue].queue_mod(@req_res, Yahns::Queue::QEV_RD) + return :ignore end - :ignore + @wbuf_persist rescue @req_res = @req_res.close if @req_res raise diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb index c60ccad..0afa4e1 100644 --- a/test/test_proxy_pass_no_buffering.rb +++ b/test/test_proxy_pass_no_buffering.rb @@ -18,8 +18,13 @@ def call(env) when 'GET' case env['PATH_INFO'] when '/giant-body' - h = [ %W(content-type text/pain), - %W(content-length #{NCHUNK * STR4.size}) ] + h = [ %W(content-type text/pain) ] + + # HTTP/1.0 is not Rack-compliant, so no Rack::Lint for us :) + if env['HTTP_VERSION'] == 'HTTP/1.1' + h << %W(content-length #{NCHUNK * STR4.size}) + end + body = Object.new def body.each NCHUNK.times { yield STR4 } @@ -53,6 +58,7 @@ def check_headers(io) end def test_proxy_pass_no_buffering + to_close = [] err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1] host2, port2 = @srv2.addr[3], @srv2.addr[1] pxp = Yahns::ProxyPass.new("http://#{host2}:#{port2}", @@ -81,79 +87,83 @@ def test_proxy_pass_no_buffering stderr_path err.path end end - s = TCPSocket.new(host, port) - req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \ - "Connection: close\r\n\r\n" - s.write(req) - bufs = [] - sleep 1 - 10.times do - sleep 0.1 - # ensure no files get created - if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4 - qtmpdir = Regexp.quote("#@tmpdir/") - deleted1 = `lsof -p #{pid}`.split("\n") - deleted1 = deleted1.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/) - deleted2 = `lsof -p #{pid2}`.split("\n") - deleted2 = deleted2.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/) - [ deleted1, deleted2 ].each do |ary| - ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ } + %w(1.0 1.1).each do |ver| + s = TCPSocket.new(host, port) + to_close << s + req = "GET /giant-body HTTP/#{ver}\r\nHost: example.com\r\n".dup + req << "Connection: close\r\n" if ver == '1.1' + req << "\r\n" + s.write(req) + bufs = [] + sleep 1 + 10.times do + sleep 0.1 + # ensure no files get created + if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4 + qtmpdir = Regexp.quote("#@tmpdir/") + deleted1 = `lsof -p #{pid}`.split("\n") + deleted1 = deleted1.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/) + deleted2 = `lsof -p #{pid2}`.split("\n") + deleted2 = deleted2.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/) + [ deleted1, deleted2 ].each do |ary| + ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ } + end + assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}" + assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}" + bufs.push(deleted1[0]) end - assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}" - assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}" - bufs.push(deleted1[0]) end - end - before = bufs.size - bufs.uniq! - assert bufs.size < before, 'unlinked buffer should not grow' - buf = ''.dup - slow = Digest::MD5.new - ft = Thread.new do + before = bufs.size + bufs.uniq! + assert bufs.size < before, 'unlinked buffer should not grow' + buf = ''.dup + slow = Digest::MD5.new + ft = Thread.new do + fast = Digest::MD5.new + f = TCPSocket.new(host2, port2) + f.write(req) + b2 = ''.dup + check_headers(f) + nf = 0 + begin + f.readpartial(1024 * 1024, b2) + nf += b2.bytesize + fast.update(b2) + rescue EOFError + f = f.close + end while f + b2.clear + [ nf, fast.hexdigest ] + end + Thread.abort_on_exception = true + check_headers(s) + n = 0 + begin + s.readpartial(1024 * 1024, buf) + slow.update(buf) + n += buf.bytesize + sleep 0.01 + rescue EOFError + s = s.close + end while s + ft.join(5) + assert_equal [n, slow.hexdigest ], ft.value + fast = Digest::MD5.new - f = TCPSocket.new(host2, port2) + f = TCPSocket.new(host, port) f.write(req) - b2 = ''.dup check_headers(f) - nf = 0 begin - f.readpartial(1024 * 1024, b2) - nf += b2.bytesize - fast.update(b2) + f.readpartial(1024 * 1024, buf) + fast.update(buf) rescue EOFError f = f.close end while f - b2.clear - [ nf, fast.hexdigest ] + buf.clear + assert_equal slow.hexdigest, fast.hexdigest end - Thread.abort_on_exception = true - check_headers(s) - n = 0 - begin - s.readpartial(1024 * 1024, buf) - slow.update(buf) - n += buf.bytesize - sleep 0.01 - rescue EOFError - s = s.close - end while s - ft.join(5) - assert_equal [n, slow.hexdigest ], ft.value - - fast = Digest::MD5.new - f = TCPSocket.new(host, port) - f.write(req) - check_headers(f) - begin - f.readpartial(1024 * 1024, buf) - fast.update(buf) - rescue EOFError - f = f.close - end while f - buf.clear - assert_equal slow.hexdigest, fast.hexdigest ensure - s.close if s + to_close.each { |io| io.close unless io.closed? } quit_wait(pid) quit_wait(pid2) end