yahns Ruby server user/dev discussion
 help / color / mirror / code / Atom feed
From: Eric Wong <e@80x24.org>
To: yahns-public@yhbt.net
Subject: [PATCH 5/5] proxy_pass: fix HTTP/1.0 backends on EOF w/o buffering
Date: Tue,  7 Jun 2016 07:39:08 +0000	[thread overview]
Message-ID: <20160607073908.31035-6-e@80x24.org> (raw)
In-Reply-To: <20160607073908.31035-1-e@80x24.org>

We must ensure we properly close connections to HTTP/1.0
backends even if we blocked writing on outgoing data.
---
 lib/yahns/proxy_http_response.rb     |   9 ++-
 lib/yahns/wbuf_lite.rb               |   7 +-
 test/test_proxy_pass_no_buffering.rb | 138 +++++++++++++++++++----------------
 3 files changed, 84 insertions(+), 70 deletions(-)

diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 9867da2..316c310 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -10,13 +10,14 @@
 module Yahns::HttpResponse # :nodoc:
 
   # switch and yield
-  def proxy_unbuffer(wbuf)
+  def proxy_unbuffer(wbuf, nxt = :ignore)
     @state = wbuf
+    wbuf.req_res = nil if nxt.nil? && wbuf.respond_to?(:req_res=)
     tc = Thread.current
     tc[:yahns_fdmap].remember(self) # Yahns::HttpClient
     tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_readable ?
                                Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR)
-    :ignore
+    nxt
   end
 
   def wbuf_alloc(req_res)
@@ -175,9 +176,9 @@ def proxy_read_body(tip, kcar, req_res)
 
       # HTTP/1.0 upstream:
       wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive
-      return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
       req_res.shutdown
-      break
+      return proxy_unbuffer(wbuf, nil) if Yahns::WbufLite === wbuf
+      return proxy_busy_mod(wbuf, req_res)
     when :wait_readable
       return wait_on_upstream(req_res)
     end until kcar.body_eof?
diff --git a/lib/yahns/wbuf_lite.rb b/lib/yahns/wbuf_lite.rb
index afee1e9..fa52f54 100644
--- a/lib/yahns/wbuf_lite.rb
+++ b/lib/yahns/wbuf_lite.rb
@@ -7,9 +7,11 @@
 # This is only used for "proxy_buffering: false"
 class Yahns::WbufLite < Yahns::Wbuf # :nodoc:
   attr_reader :busy
+  attr_writer :req_res
 
   def initialize(req_res)
-    super(nil, :ignore)
+    alive = req_res.alive
+    super(nil, alive ? :ignore : false)
     @req_res = req_res
   end
 
@@ -35,8 +37,9 @@ def wbuf_close(client)
     if @req_res
       client.hijack_cleanup
       Thread.current[:yahns_queue].queue_mod(@req_res, Yahns::Queue::QEV_RD)
+      return :ignore
     end
-    :ignore
+    @wbuf_persist
   rescue
     @req_res = @req_res.close if @req_res
     raise
diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb
index c60ccad..0afa4e1 100644
--- a/test/test_proxy_pass_no_buffering.rb
+++ b/test/test_proxy_pass_no_buffering.rb
@@ -18,8 +18,13 @@ def call(env)
       when 'GET'
         case env['PATH_INFO']
         when '/giant-body'
-          h = [ %W(content-type text/pain),
-                   %W(content-length #{NCHUNK * STR4.size}) ]
+          h = [ %W(content-type text/pain) ]
+
+          # HTTP/1.0 is not Rack-compliant, so no Rack::Lint for us :)
+          if env['HTTP_VERSION'] == 'HTTP/1.1'
+            h << %W(content-length #{NCHUNK * STR4.size})
+          end
+
           body = Object.new
           def body.each
             NCHUNK.times { yield STR4 }
@@ -53,6 +58,7 @@ def check_headers(io)
   end
 
   def test_proxy_pass_no_buffering
+    to_close = []
     err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
     host2, port2 = @srv2.addr[3], @srv2.addr[1]
     pxp = Yahns::ProxyPass.new("http://#{host2}:#{port2}",
@@ -81,79 +87,83 @@ def test_proxy_pass_no_buffering
         stderr_path err.path
       end
     end
-    s = TCPSocket.new(host, port)
-    req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \
-          "Connection: close\r\n\r\n"
-    s.write(req)
-    bufs = []
-    sleep 1
-    10.times do
-      sleep 0.1
-      # ensure no files get created
-      if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4
-        qtmpdir = Regexp.quote("#@tmpdir/")
-        deleted1 = `lsof -p #{pid}`.split("\n")
-        deleted1 = deleted1.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/)
-        deleted2 = `lsof -p #{pid2}`.split("\n")
-        deleted2 = deleted2.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/)
-        [ deleted1, deleted2 ].each do |ary|
-          ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ }
+    %w(1.0 1.1).each do |ver|
+      s = TCPSocket.new(host, port)
+      to_close << s
+      req = "GET /giant-body HTTP/#{ver}\r\nHost: example.com\r\n".dup
+      req << "Connection: close\r\n" if ver == '1.1'
+      req << "\r\n"
+      s.write(req)
+      bufs = []
+      sleep 1
+      10.times do
+        sleep 0.1
+        # ensure no files get created
+        if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4
+          qtmpdir = Regexp.quote("#@tmpdir/")
+          deleted1 = `lsof -p #{pid}`.split("\n")
+          deleted1 = deleted1.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/)
+          deleted2 = `lsof -p #{pid2}`.split("\n")
+          deleted2 = deleted2.grep(/\bREG\b.*#{qtmpdir}.* \(deleted\)/)
+          [ deleted1, deleted2 ].each do |ary|
+            ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ }
+          end
+          assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}"
+          assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}"
+          bufs.push(deleted1[0])
         end
-        assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}"
-        assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}"
-        bufs.push(deleted1[0])
       end
-    end
-    before = bufs.size
-    bufs.uniq!
-    assert bufs.size < before, 'unlinked buffer should not grow'
-    buf = ''.dup
-    slow = Digest::MD5.new
-    ft = Thread.new do
+      before = bufs.size
+      bufs.uniq!
+      assert bufs.size < before, 'unlinked buffer should not grow'
+      buf = ''.dup
+      slow = Digest::MD5.new
+      ft = Thread.new do
+        fast = Digest::MD5.new
+        f = TCPSocket.new(host2, port2)
+        f.write(req)
+        b2 = ''.dup
+        check_headers(f)
+        nf = 0
+        begin
+          f.readpartial(1024 * 1024, b2)
+          nf += b2.bytesize
+          fast.update(b2)
+        rescue EOFError
+          f = f.close
+        end while f
+        b2.clear
+        [ nf, fast.hexdigest ]
+      end
+      Thread.abort_on_exception = true
+      check_headers(s)
+      n = 0
+      begin
+        s.readpartial(1024 * 1024, buf)
+        slow.update(buf)
+        n += buf.bytesize
+        sleep 0.01
+      rescue EOFError
+        s = s.close
+      end while s
+      ft.join(5)
+      assert_equal [n, slow.hexdigest ], ft.value
+
       fast = Digest::MD5.new
-      f = TCPSocket.new(host2, port2)
+      f = TCPSocket.new(host, port)
       f.write(req)
-      b2 = ''.dup
       check_headers(f)
-      nf = 0
       begin
-        f.readpartial(1024 * 1024, b2)
-        nf += b2.bytesize
-        fast.update(b2)
+        f.readpartial(1024 * 1024, buf)
+        fast.update(buf)
       rescue EOFError
         f = f.close
       end while f
-      b2.clear
-      [ nf, fast.hexdigest ]
+      buf.clear
+      assert_equal slow.hexdigest, fast.hexdigest
     end
-    Thread.abort_on_exception = true
-    check_headers(s)
-    n = 0
-    begin
-      s.readpartial(1024 * 1024, buf)
-      slow.update(buf)
-      n += buf.bytesize
-      sleep 0.01
-    rescue EOFError
-      s = s.close
-    end while s
-    ft.join(5)
-    assert_equal [n, slow.hexdigest ], ft.value
-
-    fast = Digest::MD5.new
-    f = TCPSocket.new(host, port)
-    f.write(req)
-    check_headers(f)
-    begin
-      f.readpartial(1024 * 1024, buf)
-      fast.update(buf)
-    rescue EOFError
-      f = f.close
-    end while f
-    buf.clear
-    assert_equal slow.hexdigest, fast.hexdigest
   ensure
-    s.close if s
+    to_close.each { |io| io.close unless io.closed? }
     quit_wait(pid)
     quit_wait(pid2)
   end

      parent reply	other threads:[~2016-06-07  7:39 UTC|newest]

Thread overview: 6+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2016-06-07  7:39 [PATCH \0/5] another round of proxy-related bugfixes! Eric Wong
2016-06-07  7:39 ` [PATCH 1/5] test_proxy_pass_no_buffering: fix racy test Eric Wong
2016-06-07  7:39 ` [PATCH 2/5] queue_*: check for closed IO objects Eric Wong
2016-06-07  7:39 ` [PATCH 3/5] cleanup graceful shutdown handling Eric Wong
2016-06-07  7:39 ` [PATCH 4/5] proxy_pass: more descriptive error messages Eric Wong
2016-06-07  7:39 ` Eric Wong [this message]

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: http://yhbt.net/yahns/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20160607073908.31035-6-e@80x24.org \
    --to=e@80x24.org \
    --cc=yahns-public@yhbt.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	http://yhbt.net/yahns.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).