diff options
-rw-r--r-- | lib/yahns/proxy_http_response.rb | 9 | ||||
-rw-r--r-- | lib/yahns/wbuf_lite.rb | 7 | ||||
-rw-r--r-- | test/test_proxy_pass_no_buffering.rb | 138 |
3 files changed, 84 insertions, 70 deletions
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb index 9867da2..316c310 100644 --- a/lib/yahns/proxy_http_response.rb +++ b/lib/yahns/proxy_http_response.rb @@ -10,13 +10,14 @@ require_relative 'wbuf_lite' module Yahns::HttpResponse # :nodoc: # switch and yield - def proxy_unbuffer(wbuf) + def proxy_unbuffer(wbuf, nxt = :ignore) @state = wbuf + wbuf.req_res = nil if nxt.nil? && wbuf.respond_to?(:req_res=) tc = Thread.current tc[:yahns_fdmap].remember(self) # Yahns::HttpClient tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR) - :ignore + nxt end def wbuf_alloc(req_res) @@ -175,9 +176,9 @@ module Yahns::HttpResponse # :nodoc: # HTTP/1.0 upstream: wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive - return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf req_res.shutdown - break + return proxy_unbuffer(wbuf, nil) if Yahns::WbufLite === wbuf + return proxy_busy_mod(wbuf, req_res) when :wait_readable return wait_on_upstream(req_res) end until kcar.body_eof? diff --git a/lib/yahns/wbuf_lite.rb b/lib/yahns/wbuf_lite.rb index afee1e9..fa52f54 100644 --- a/lib/yahns/wbuf_lite.rb +++ b/lib/yahns/wbuf_lite.rb @@ -7,9 +7,11 @@ require_relative 'wbuf' # This is only used for "proxy_buffering: false" class Yahns::WbufLite < Yahns::Wbuf # :nodoc: attr_reader :busy + attr_writer :req_res def initialize(req_res) - super(nil, :ignore) + alive = req_res.alive + super(nil, alive ? :ignore : false) @req_res = req_res end @@ -35,8 +37,9 @@ class Yahns::WbufLite < Yahns::Wbuf # :nodoc: if @req_res client.hijack_cleanup Thread.current[:yahns_queue].queue_mod(@req_res, Yahns::Queue::QEV_RD) + return :ignore end - :ignore + @wbuf_persist rescue @req_res = @req_res.close if @req_res raise diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb index c60ccad..0afa4e1 100644 --- a/test/test_proxy_pass_no_buffering.rb +++ b/test/test_proxy_pass_no_buffering.rb @@ -18,8 +18,13 @@ class TestProxyPassNoBuffering < Testcase when 'GET' case env['PATH_INFO'] when '/giant-body' - h = [ %W(content-type text/pain), - %W(content-length #{NCHUNK * STR4.size}) ] + h = [ %W(content-type text/pain) ] + + # HTTP/1.0 is not Rack-compliant, so no Rack::Lint for us :) + if env['HTTP_VERSION'] == 'HTTP/1.1' + h << %W(content-length #{NCHUNK * STR4.size}) + end + body = Object.new def body.each NCHUNK.times { yield STR4 } @@ -53,6 +58,7 @@ class TestProxyPassNoBuffering < Testcase end def test_proxy_pass_no_buffering + to_close = [] err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1] host2, port2 = @srv2.addr[3], @srv2.addr[1] pxp = Yahns::ProxyPass.new("http://#{host2}:#{port2}", @@ -81,79 +87,83 @@ class TestProxyPassNoBuffering < Testcase stderr_path err.path end end - s = TCPSocket.new(host, port) - req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \ - "Connection: close\r\n\r\n" - s.write(req) - bufs = [] - sleep 1 - 10.times do - sleep 0.1 - # ensure no files get created - if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4 - qtmpdir = Regexp.quote("#@tmpdir/") - deleted1 = `lsof -p #{pid}`.split("\n") - deleted1 = deleted1.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/) - deleted2 = `lsof -p #{pid2}`.split("\n") - deleted2 = deleted2.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/) - [ deleted1, deleted2 ].each do |ary| - ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ } + %w(1.0 1.1).each do |ver| + s = TCPSocket.new(host, port) + to_close << s + req = "GET /giant-body HTTP/#{ver}\r\nHost: example.com\r\n".dup + req << "Connection: close\r\n" if ver == '1.1' + req << "\r\n" + s.write(req) + bufs = [] + sleep 1 + 10.times do + sleep 0.1 + # ensure no files get created + if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4 + qtmpdir = Regexp.quote("#@tmpdir/") + deleted1 = `lsof -p #{pid}`.split("\n") + deleted1 = deleted1.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/) + deleted2 = `lsof -p #{pid2}`.split("\n") + deleted2 = deleted2.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/) + [ deleted1, deleted2 ].each do |ary| + ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ } + end + assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}" + assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}" + bufs.push(deleted1[0]) end - assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}" - assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}" - bufs.push(deleted1[0]) end - end - before = bufs.size - bufs.uniq! - assert bufs.size < before, 'unlinked buffer should not grow' - buf = ''.dup - slow = Digest::MD5.new - ft = Thread.new do + before = bufs.size + bufs.uniq! + assert bufs.size < before, 'unlinked buffer should not grow' + buf = ''.dup + slow = Digest::MD5.new + ft = Thread.new do + fast = Digest::MD5.new + f = TCPSocket.new(host2, port2) + f.write(req) + b2 = ''.dup + check_headers(f) + nf = 0 + begin + f.readpartial(1024 * 1024, b2) + nf += b2.bytesize + fast.update(b2) + rescue EOFError + f = f.close + end while f + b2.clear + [ nf, fast.hexdigest ] + end + Thread.abort_on_exception = true + check_headers(s) + n = 0 + begin + s.readpartial(1024 * 1024, buf) + slow.update(buf) + n += buf.bytesize + sleep 0.01 + rescue EOFError + s = s.close + end while s + ft.join(5) + assert_equal [n, slow.hexdigest ], ft.value + fast = Digest::MD5.new - f = TCPSocket.new(host2, port2) + f = TCPSocket.new(host, port) f.write(req) - b2 = ''.dup check_headers(f) - nf = 0 begin - f.readpartial(1024 * 1024, b2) - nf += b2.bytesize - fast.update(b2) + f.readpartial(1024 * 1024, buf) + fast.update(buf) rescue EOFError f = f.close end while f - b2.clear - [ nf, fast.hexdigest ] + buf.clear + assert_equal slow.hexdigest, fast.hexdigest end - Thread.abort_on_exception = true - check_headers(s) - n = 0 - begin - s.readpartial(1024 * 1024, buf) - slow.update(buf) - n += buf.bytesize - sleep 0.01 - rescue EOFError - s = s.close - end while s - ft.join(5) - assert_equal [n, slow.hexdigest ], ft.value - - fast = Digest::MD5.new - f = TCPSocket.new(host, port) - f.write(req) - check_headers(f) - begin - f.readpartial(1024 * 1024, buf) - fast.update(buf) - rescue EOFError - f = f.close - end while f - buf.clear - assert_equal slow.hexdigest, fast.hexdigest ensure - s.close if s + to_close.each { |io| io.close unless io.closed? } quit_wait(pid) quit_wait(pid2) end |