about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2016-06-07 04:07:20 +0000
committerEric Wong <e@80x24.org>2016-06-07 07:22:42 +0000
commit616e42c8d609905d9355bb5db726a5348303ffae (patch)
treed1e85a659cf6097255cf16f4735e0d35f8dd3110
parent31a7777d0c646796e9a344c54f64269c51801fb6 (diff)
downloadyahns-616e42c8d609905d9355bb5db726a5348303ffae.tar.gz
We must ensure we properly close connections to HTTP/1.0
backends even if we blocked writing on outgoing data.
-rw-r--r--lib/yahns/proxy_http_response.rb9
-rw-r--r--lib/yahns/wbuf_lite.rb7
-rw-r--r--test/test_proxy_pass_no_buffering.rb138
3 files changed, 84 insertions, 70 deletions
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 9867da2..316c310 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -10,13 +10,14 @@ require_relative 'wbuf_lite'
 module Yahns::HttpResponse # :nodoc:
 
   # switch and yield
-  def proxy_unbuffer(wbuf)
+  def proxy_unbuffer(wbuf, nxt = :ignore)
     @state = wbuf
+    wbuf.req_res = nil if nxt.nil? && wbuf.respond_to?(:req_res=)
     tc = Thread.current
     tc[:yahns_fdmap].remember(self) # Yahns::HttpClient
     tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_readable ?
                                Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR)
-    :ignore
+    nxt
   end
 
   def wbuf_alloc(req_res)
@@ -175,9 +176,9 @@ module Yahns::HttpResponse # :nodoc:
 
       # HTTP/1.0 upstream:
       wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive
-      return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
       req_res.shutdown
-      break
+      return proxy_unbuffer(wbuf, nil) if Yahns::WbufLite === wbuf
+      return proxy_busy_mod(wbuf, req_res)
     when :wait_readable
       return wait_on_upstream(req_res)
     end until kcar.body_eof?
diff --git a/lib/yahns/wbuf_lite.rb b/lib/yahns/wbuf_lite.rb
index afee1e9..fa52f54 100644
--- a/lib/yahns/wbuf_lite.rb
+++ b/lib/yahns/wbuf_lite.rb
@@ -7,9 +7,11 @@ require_relative 'wbuf'
 # This is only used for "proxy_buffering: false"
 class Yahns::WbufLite < Yahns::Wbuf # :nodoc:
   attr_reader :busy
+  attr_writer :req_res
 
   def initialize(req_res)
-    super(nil, :ignore)
+    alive = req_res.alive
+    super(nil, alive ? :ignore : false)
     @req_res = req_res
   end
 
@@ -35,8 +37,9 @@ class Yahns::WbufLite < Yahns::Wbuf # :nodoc:
     if @req_res
       client.hijack_cleanup
       Thread.current[:yahns_queue].queue_mod(@req_res, Yahns::Queue::QEV_RD)
+      return :ignore
     end
-    :ignore
+    @wbuf_persist
   rescue
     @req_res = @req_res.close if @req_res
     raise
diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb
index c60ccad..0afa4e1 100644
--- a/test/test_proxy_pass_no_buffering.rb
+++ b/test/test_proxy_pass_no_buffering.rb
@@ -18,8 +18,13 @@ class TestProxyPassNoBuffering < Testcase
       when 'GET'
         case env['PATH_INFO']
         when '/giant-body'
-          h = [ %W(content-type text/pain),
-                   %W(content-length #{NCHUNK * STR4.size}) ]
+          h = [ %W(content-type text/pain) ]
+
+          # HTTP/1.0 is not Rack-compliant, so no Rack::Lint for us :)
+          if env['HTTP_VERSION'] == 'HTTP/1.1'
+            h << %W(content-length #{NCHUNK * STR4.size})
+          end
+
           body = Object.new
           def body.each
             NCHUNK.times { yield STR4 }
@@ -53,6 +58,7 @@ class TestProxyPassNoBuffering < Testcase
   end
 
   def test_proxy_pass_no_buffering
+    to_close = []
     err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
     host2, port2 = @srv2.addr[3], @srv2.addr[1]
     pxp = Yahns::ProxyPass.new("http://#{host2}:#{port2}",
@@ -81,79 +87,83 @@ class TestProxyPassNoBuffering < Testcase
         stderr_path err.path
       end
     end
-    s = TCPSocket.new(host, port)
-    req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \
-          "Connection: close\r\n\r\n"
-    s.write(req)
-    bufs = []
-    sleep 1
-    10.times do
-      sleep 0.1
-      # ensure no files get created
-      if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4
-        qtmpdir = Regexp.quote("#@tmpdir/")
-        deleted1 = `lsof -p #{pid}`.split("\n")
-        deleted1 = deleted1.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/)
-        deleted2 = `lsof -p #{pid2}`.split("\n")
-        deleted2 = deleted2.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/)
-        [ deleted1, deleted2 ].each do |ary|
-          ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ }
+    %w(1.0 1.1).each do |ver|
+      s = TCPSocket.new(host, port)
+      to_close << s
+      req = "GET /giant-body HTTP/#{ver}\r\nHost: example.com\r\n".dup
+      req << "Connection: close\r\n" if ver == '1.1'
+      req << "\r\n"
+      s.write(req)
+      bufs = []
+      sleep 1
+      10.times do
+        sleep 0.1
+        # ensure no files get created
+        if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4
+          qtmpdir = Regexp.quote("#@tmpdir/")
+          deleted1 = `lsof -p #{pid}`.split("\n")
+          deleted1 = deleted1.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/)
+          deleted2 = `lsof -p #{pid2}`.split("\n")
+          deleted2 = deleted2.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/)
+          [ deleted1, deleted2 ].each do |ary|
+            ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ }
+          end
+          assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}"
+          assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}"
+          bufs.push(deleted1[0])
         end
-        assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}"
-        assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}"
-        bufs.push(deleted1[0])
       end
-    end
-    before = bufs.size
-    bufs.uniq!
-    assert bufs.size < before, 'unlinked buffer should not grow'
-    buf = ''.dup
-    slow = Digest::MD5.new
-    ft = Thread.new do
+      before = bufs.size
+      bufs.uniq!
+      assert bufs.size < before, 'unlinked buffer should not grow'
+      buf = ''.dup
+      slow = Digest::MD5.new
+      ft = Thread.new do
+        fast = Digest::MD5.new
+        f = TCPSocket.new(host2, port2)
+        f.write(req)
+        b2 = ''.dup
+        check_headers(f)
+        nf = 0
+        begin
+          f.readpartial(1024 * 1024, b2)
+          nf += b2.bytesize
+          fast.update(b2)
+        rescue EOFError
+          f = f.close
+        end while f
+        b2.clear
+        [ nf, fast.hexdigest ]
+      end
+      Thread.abort_on_exception = true
+      check_headers(s)
+      n = 0
+      begin
+        s.readpartial(1024 * 1024, buf)
+        slow.update(buf)
+        n += buf.bytesize
+        sleep 0.01
+      rescue EOFError
+        s = s.close
+      end while s
+      ft.join(5)
+      assert_equal [n, slow.hexdigest ], ft.value
+
       fast = Digest::MD5.new
-      f = TCPSocket.new(host2, port2)
+      f = TCPSocket.new(host, port)
       f.write(req)
-      b2 = ''.dup
       check_headers(f)
-      nf = 0
       begin
-        f.readpartial(1024 * 1024, b2)
-        nf += b2.bytesize
-        fast.update(b2)
+        f.readpartial(1024 * 1024, buf)
+        fast.update(buf)
       rescue EOFError
         f = f.close
       end while f
-      b2.clear
-      [ nf, fast.hexdigest ]
+      buf.clear
+      assert_equal slow.hexdigest, fast.hexdigest
     end
-    Thread.abort_on_exception = true
-    check_headers(s)
-    n = 0
-    begin
-      s.readpartial(1024 * 1024, buf)
-      slow.update(buf)
-      n += buf.bytesize
-      sleep 0.01
-    rescue EOFError
-      s = s.close
-    end while s
-    ft.join(5)
-    assert_equal [n, slow.hexdigest ], ft.value
-
-    fast = Digest::MD5.new
-    f = TCPSocket.new(host, port)
-    f.write(req)
-    check_headers(f)
-    begin
-      f.readpartial(1024 * 1024, buf)
-      fast.update(buf)
-    rescue EOFError
-      f = f.close
-    end while f
-    buf.clear
-    assert_equal slow.hexdigest, fast.hexdigest
   ensure
-    s.close if s
+    to_close.each { |io| io.close unless io.closed? }
     quit_wait(pid)
     quit_wait(pid2)
   end