about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r-- lib/yahns/proxy_http_response.rb 85
-rw-r--r-- lib/yahns/req_res.rb 14
-rw-r--r-- lib/yahns/wbuf.rb 5
-rw-r--r-- lib/yahns/wbuf_common.rb 2
-rw-r--r-- test/test_proxy_pass_no_buffering.rb 149
5 files changed, 217 insertions, 38 deletions
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index c7a9447..79b995a 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -7,9 +7,19 @@
 # constants.
 module Yahns::HttpResponse # :nodoc:
 
+  # unbuffered proxying: hand wbuf to the client (self) and yield control
+  def proxy_unbuffer(wbuf)
+    wbuf.body.resbuf = @state = wbuf # wbuf.body also gets wbuf as its resbuf
+    tc = Thread.current
+    tc[:yahns_fdmap].remember(self) # Yahns::HttpClient
+    tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_writable ?
+                               Yahns::Queue::QEV_WR : Yahns::Queue::QEV_RD)
+    :ignore # callers return this value as-is
+  end
+
   # write everything in buf to our client socket (or wbuf, if it exists)
   # it may return a newly-created wbuf or nil
-  def proxy_write(wbuf, buf, alive)
+  def proxy_write(wbuf, buf, req_res)
     unless wbuf
       # no write buffer, try to write directly to the client socket
       case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
@@ -17,8 +27,15 @@ module Yahns::HttpResponse # :nodoc:
       when String, Array # partial write, hope the skb grows
         buf = rv
       when :wait_writable, :wait_readable
-        wbuf = Yahns::Wbuf.new(nil, alive, self.class.output_buffer_tmpdir, rv)
-        buf = buf.join if Array === buf
+        if @hs.env['yahns.proxy_pass'].proxy_buffering
+          body = nil
+          alive = req_res.alive
+        else
+          req_res.paused = true
+          body = req_res
+          alive = :ignore
+        end
+        wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, rv)
         break
       end while true
     end
@@ -54,14 +71,14 @@ module Yahns::HttpResponse # :nodoc:
     wbuf.wbuf_abort if wbuf
   end
 
-  def wait_on_upstream(req_res, alive, wbuf)
-    req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, alive,
+  def wait_on_upstream(req_res, wbuf)
+    req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, req_res.alive,
                                              self.class.output_buffer_tmpdir,
                                              false)
     :wait_readable # self remains in :ignore, wait on upstream
   end
 
-  def proxy_res_headers(res)
+  def proxy_res_headers(res, req_res)
     status, headers = res
     code = status.to_i
     msg = Rack::Utils::HTTP_STATUS_CODES[code]
@@ -118,16 +135,18 @@ module Yahns::HttpResponse # :nodoc:
       flags = MSG_DONTWAIT
       res = rv # hope the skb grows
     when :wait_writable, :wait_readable # highly unlikely in real apps
-      wbuf = proxy_write(nil, res, alive)
+      wbuf = proxy_write(nil, res, req_res)
       break # keep buffering as much as possible
     end while true
-    [ alive, wbuf, have_body ]
+    req_res.alive = alive
+    [ wbuf, have_body ]
   end
 
-  def proxy_read_body(tip, kcar, req_res, alive, wbuf)
+  def proxy_read_body(tip, kcar, req_res, wbuf)
     chunk = ''.dup if kcar.chunked?
     len = kcar.body_bytes_left
     rbuf = Thread.current[:yahns_rbuf]
+    alive = req_res.alive
 
     case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
     when String
@@ -141,29 +160,31 @@ module Yahns::HttpResponse # :nodoc:
         tmp = chunk_out(tmp)
       # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
       end
-      wbuf = proxy_write(wbuf, tmp, alive)
+      wbuf = proxy_write(wbuf, tmp, req_res)
+      return proxy_unbuffer(wbuf) if wbuf && wbuf.body
       chunk.clear if chunk
     when nil # EOF
       # HTTP/1.1 upstream, unexpected premature EOF:
       return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
 
       # HTTP/1.0 upstream:
-      wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
+      wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive
+      return proxy_unbuffer(wbuf) if wbuf && wbuf.body
       req_res.shutdown
       break
     when :wait_readable
-      return wait_on_upstream(req_res, alive, wbuf)
+      return wait_on_upstream(req_res, wbuf)
     end until kcar.body_eof?
 
     if chunk
       # tip is an empty array and becomes trailer storage
       req_res.proxy_trailers = [ rbuf.dup, tip ]
-      return proxy_read_trailers(kcar, req_res, alive, wbuf)
+      return proxy_read_trailers(kcar, req_res, wbuf)
     end
-    proxy_busy_mod(wbuf, alive)
+    proxy_busy_mod(wbuf, req_res)
   end
 
-  def proxy_read_trailers(kcar, req_res, alive, wbuf)
+  def proxy_read_trailers(kcar, req_res, wbuf)
     chunk, tlr = req_res.proxy_trailers
     rbuf = Thread.current[:yahns_rbuf]
 
@@ -172,13 +193,14 @@ module Yahns::HttpResponse # :nodoc:
       when String
         chunk << rv
       when :wait_readable
-        return wait_on_upstream(req_res, alive, wbuf)
+        return wait_on_upstream(req_res, wbuf)
       when nil # premature EOF
         return proxy_err_response(nil, req_res, nil, wbuf)
       end # no loop here
     end
-    wbuf = proxy_write(wbuf, trailer_out(tlr), alive)
-    proxy_busy_mod(wbuf, alive)
+    wbuf = proxy_write(wbuf, trailer_out(tlr), req_res)
+    return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+    proxy_busy_mod(wbuf, req_res)
   end
 
   # start streaming the response once upstream is done sending headers to us.
@@ -186,25 +208,25 @@ module Yahns::HttpResponse # :nodoc:
   # returns :ignore if we yield control to the client(self)
   # returns nil if completely done
   def proxy_response_start(res, tip, kcar, req_res)
-    alive, wbuf, have_body = proxy_res_headers(res)
+    wbuf, have_body = proxy_res_headers(res, req_res)
     tip = tip.empty? ? [] : [ tip ]
 
     if have_body
       req_res.proxy_trailers = nil # define to avoid uninitialized warnings
-      return proxy_read_body(tip, kcar, req_res, alive, wbuf)
+      return proxy_read_body(tip, kcar, req_res, wbuf)
     end
+    return proxy_unbuffer(wbuf) if wbuf && wbuf.body
 
     # all done reading response from upstream, req_res will be discarded
     # when we return nil:
-    proxy_busy_mod(wbuf, alive)
+    proxy_busy_mod(wbuf, req_res)
   rescue => e
     proxy_err_response(502, req_res, e, wbuf)
   end
 
   def proxy_response_finish(kcar, wbuf, req_res)
-    alive = wbuf.wbuf_persist
-    req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, alive, wbuf)
-                           : proxy_read_body([], kcar, req_res, alive, wbuf)
+    req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, wbuf)
+                           : proxy_read_body([], kcar, req_res, wbuf)
   end
 
   def proxy_wait_next(qflags)
@@ -238,23 +260,18 @@ module Yahns::HttpResponse # :nodoc:
     Thread.current[:yahns_queue].queue_mod(self, qflags)
   end
 
-  def proxy_busy_mod(wbuf, alive)
-    busy = wbuf.busy if wbuf
-    if busy
+  def proxy_busy_mod(wbuf, req_res)
+    if wbuf
       # we are completely done reading and buffering the upstream response,
       # but have not completely written the response to the client,
       # yield control to the client socket:
       @state = wbuf
-      proxy_wait_next(case busy
-        when :wait_readable then Yahns::Queue::QEV_RD
-        when :wait_writable then Yahns::Queue::QEV_WR
-        else
-          raise "BUG: invalid wbuf.busy: #{busy.inspect}"
-        end)
+      proxy_wait_next(wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD :
+                      Yahns::Queue::QEV_WR)
       # no touching self after proxy_wait_next, we may be running
       # HttpClient#yahns_step in a different thread at this point
     else
-      case http_response_done(alive)
+      case http_response_done(req_res.alive)
       when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD)
       when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR)
       when :close then close
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
index 3b0d298..dd4ec87 100644
--- a/lib/yahns/req_res.rb
+++ b/lib/yahns/req_res.rb
@@ -8,15 +8,29 @@ require 'kgio'
 
 class Yahns::ReqRes < Kgio::Socket # :nodoc:
   attr_writer :resbuf
+  attr_writer :paused
   attr_accessor :proxy_trailers
+  attr_accessor :alive
 
   def req_start(c, req, input, chunked)
     @hdr = @resbuf = nil
     @yahns_client = c
+    @paused = false
     @rrstate = input ? [ req, input, chunked ] : req
     Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
   end
 
+  def close # when paused, re-arm for reading instead of closing
+    if @paused # called by wbuf_close_common as @body.close
+      @paused = false # one-shot: the next close goes to super
+      # we must cleanup and set yahns_client state before queue_mod below:
+      @yahns_client.hijack_cleanup
+      Thread.current[:yahns_queue].queue_mod(self, Yahns::Queue::QEV_RD)
+    else
+      super # not paused: defer to the parent class #close
+    end
+  end
+
   def yahns_step # yahns event loop entry point
     c = @yahns_client
     case req = @rrstate
diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb
index 1b4ce6e..e6c794a 100644
--- a/lib/yahns/wbuf.rb
+++ b/lib/yahns/wbuf.rb
@@ -30,15 +30,14 @@ require_relative 'wbuf_common'
 # to be a scalability issue.
 class Yahns::Wbuf # :nodoc:
   include Yahns::WbufCommon
-  attr_reader :busy
-  attr_reader :wbuf_persist
+  attr_reader :body, :busy, :wbuf_persist
 
   def initialize(body, persist, tmpdir, busy)
     @tmpio = nil
     @tmpdir = tmpdir
     @sf_offset = @sf_count = 0
     @wbuf_persist = persist # whether or not we keep the connection alive
-    @body = body
+    @body = body # something we call #close on when done writing
     @busy = busy # may be false
   end
 
diff --git a/lib/yahns/wbuf_common.rb b/lib/yahns/wbuf_common.rb
index ee18218..cded2e3 100644
--- a/lib/yahns/wbuf_common.rb
+++ b/lib/yahns/wbuf_common.rb
@@ -48,7 +48,7 @@ module Yahns::WbufCommon # :nodoc:
     if @wbuf_persist.respond_to?(:call) # hijack
       client.response_hijacked(@wbuf_persist) # :ignore
     else
-      @wbuf_persist # true or false or Yahns::StreamFile
+      @wbuf_persist # true, false, :ignore, or Yahns::StreamFile
     end
   end
 end
diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb
new file mode 100644
index 0000000..88b7c80
--- /dev/null
+++ b/test/test_proxy_pass_no_buffering.rb
@@ -0,0 +1,149 @@
+# Copyright (C) 2015-2016 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
+# frozen_string_literal: true
+require_relative 'server_helper'
+begin
+  require 'kcar'
+rescue LoadError
+end
+require 'digest/md5'
+class TestProxyPassNoBuffering < Testcase
+  ENV["N"].to_i > 1 and parallelize_me!
+  include ServerHelper
+  STR4 = 'abcd' * (256 * 1024)
+  NCHUNK = 50
+  class ProxiedApp # Rack app run by the upstream server in this test
+    def call(env) # returns nil for anything other than GET /giant-body
+      case env['REQUEST_METHOD']
+      when 'GET'
+        case env['PATH_INFO']
+        when '/giant-body'
+          h = [ %W(content-type text/pain),
+                   %W(content-length #{NCHUNK * STR4.size}) ]
+          body = Object.new
+          def body.each # yields STR4 NCHUNK times, one chunk per yield
+            NCHUNK.times { yield STR4 }
+          end
+          [ 200, h, body ]
+        end
+      end
+    end
+  end
+
+  def setup # binds the second (upstream) listener, then the shared setup
+    @srv2 = TCPServer.new(ENV["TEST_HOST"] || "127.0.0.1", 0) # any port
+    server_helper_setup
+    skip "kcar missing yahns/proxy_pass" unless defined?(Kcar)
+    require 'yahns/proxy_pass' # loaded only once kcar is known to exist
+  end
+
+  def teardown # closes the upstream listener, then the shared teardown
+    @srv2.close if defined?(@srv2) && !@srv2.closed? # may be unset/closed
+    server_helper_teardown
+  end
+
+  def check_headers(io) # reads the 200 status line and headers from io
+    l = io.gets
+    assert_match %r{\AHTTP/1\.[01] 200\b}, l
+    begin # fail on EOF (nil) instead of looping forever
+      assert(l = io.gets, 'EOF before end of headers')
+    end until l == "\r\n"
+  end
+
+  def test_proxy_pass_no_buffering
+    err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+    host2, port2 = @srv2.addr[3], @srv2.addr[1]
+    pxp = Yahns::ProxyPass.new("http://#{host2}:#{port2}",
+                               proxy_buffering: false)
+    pid = mkserver(cfg) do # front-end: proxies to host2:port2
+      ObjectSpace.each_object(Yahns::TmpIO) { |io| io.close unless io.closed? }
+      @srv2.close
+      cfg.instance_eval do
+        app(:rack, pxp) { listen "#{host}:#{port}" }
+        stderr_path err.path
+      end
+    end
+
+    pid2 = mkserver(cfg, @srv2) do # upstream: serves ProxiedApp
+      ObjectSpace.each_object(Yahns::TmpIO) { |io| io.close unless io.closed? }
+      @srv.close
+      cfg.instance_eval do
+        app(:rack, ProxiedApp.new) do
+          output_buffering false
+          listen "#{host2}:#{port2}"
+        end
+        stderr_path err.path
+      end
+    end
+    s = TCPSocket.new(host, port)
+    req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \
+          "Connection: close\r\n\r\n"
+    s.write(req)
+    bufs = []
+    sleep 1 # NOTE(review): presumably lets socket buffers fill; confirm
+    10.times do
+      sleep 0.1
+      # ensure no files get created
+      if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4
+        deleted1 = `lsof -p #{pid}`.split("\n")
+        deleted1 = deleted1.grep(/\bREG\b.* \(deleted\)/)
+        deleted2 = `lsof -p #{pid2}`.split("\n")
+        deleted2 = deleted2.grep(/\bREG\b.* \(deleted\)/)
+        [ deleted1, deleted2 ].each do |ary|
+          ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ }
+        end
+        assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}"
+        assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}"
+        bufs.push(deleted1[0])
+      end
+    end
+    before = bufs.size # 0 when the lsof check above was skipped
+    bufs.uniq!
+    assert bufs.size < before, 'unlinked buffer should not grow' if before > 0
+    buf = ''.dup
+    slow = Digest::MD5.new
+    ft = Thread.new do # fast reader talking to the upstream directly
+      fast = Digest::MD5.new
+      f = TCPSocket.new(host2, port2)
+      f.write(req)
+      b2 = ''.dup
+      check_headers(f)
+      begin
+        f.readpartial(1024 * 1024, b2)
+        fast.update(b2)
+      rescue EOFError
+        f = f.close
+      end while f
+      b2.clear
+      fast
+    end
+    Thread.abort_on_exception = true
+    check_headers(s)
+    begin
+      s.readpartial(1024 * 1024, buf)
+      slow.update(buf)
+      sleep 0.01 # keep this reader slower than the fast thread
+    rescue EOFError
+      s = s.close
+    end while s
+    ft.join(5)
+    assert_equal slow.hexdigest, ft.value.hexdigest
+
+    fast = Digest::MD5.new # same request via the proxy, read without sleeps
+    f = TCPSocket.new(host, port)
+    f.write(req)
+    check_headers(f)
+    begin
+      f.readpartial(1024 * 1024, buf)
+      fast.update(buf)
+    rescue EOFError
+      f = f.close
+    end while f
+    buf.clear
+    assert_equal slow.hexdigest, fast.hexdigest
+  ensure
+    s.close if s
+    quit_wait(pid)
+    quit_wait(pid2)
+  end
+end