diff options
-rw-r--r-- | lib/yahns/proxy_http_response.rb | 85 | ||||
-rw-r--r-- | lib/yahns/req_res.rb | 14 | ||||
-rw-r--r-- | lib/yahns/wbuf.rb | 5 | ||||
-rw-r--r-- | lib/yahns/wbuf_common.rb | 2 | ||||
-rw-r--r-- | test/test_proxy_pass_no_buffering.rb | 149 |
5 files changed, 217 insertions, 38 deletions
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb index c7a9447..79b995a 100644 --- a/lib/yahns/proxy_http_response.rb +++ b/lib/yahns/proxy_http_response.rb @@ -7,9 +7,19 @@ # constants. module Yahns::HttpResponse # :nodoc: + # switch and yield + def proxy_unbuffer(wbuf) + wbuf.body.resbuf = @state = wbuf + tc = Thread.current + tc[:yahns_fdmap].remember(self) # Yahns::HttpClient + tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_writable ? + Yahns::Queue::QEV_WR : Yahns::Queue::QEV_RD) + :ignore + end + # write everything in buf to our client socket (or wbuf, if it exists) # it may return a newly-created wbuf or nil - def proxy_write(wbuf, buf, alive) + def proxy_write(wbuf, buf, req_res) unless wbuf # no write buffer, try to write directly to the client socket case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf) @@ -17,8 +27,15 @@ module Yahns::HttpResponse # :nodoc: when String, Array # partial write, hope the skb grows buf = rv when :wait_writable, :wait_readable - wbuf = Yahns::Wbuf.new(nil, alive, self.class.output_buffer_tmpdir, rv) - buf = buf.join if Array === buf + if @hs.env['yahns.proxy_pass'].proxy_buffering + body = nil + alive = req_res.alive + else + req_res.paused = true + body = req_res + alive = :ignore + end + wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, rv) break end while true end @@ -54,14 +71,14 @@ module Yahns::HttpResponse # :nodoc: wbuf.wbuf_abort if wbuf end - def wait_on_upstream(req_res, alive, wbuf) - req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, alive, + def wait_on_upstream(req_res, wbuf) + req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, req_res.alive, self.class.output_buffer_tmpdir, false) :wait_readable # self remains in :ignore, wait on upstream end - def proxy_res_headers(res) + def proxy_res_headers(res, req_res) status, headers = res code = status.to_i msg = Rack::Utils::HTTP_STATUS_CODES[code] @@ -118,16 +135,18 @@ module Yahns::HttpResponse # :nodoc: flags = MSG_DONTWAIT res = rv # hope the skb grows when :wait_writable, :wait_readable # highly unlikely in real apps - wbuf = proxy_write(nil, res, alive) + wbuf = proxy_write(nil, res, req_res) break # keep buffering as much as possible end while true - [ alive, wbuf, have_body ] + req_res.alive = alive + [ wbuf, have_body ] end - def proxy_read_body(tip, kcar, req_res, alive, wbuf) + def proxy_read_body(tip, kcar, req_res, wbuf) chunk = ''.dup if kcar.chunked? len = kcar.body_bytes_left rbuf = Thread.current[:yahns_rbuf] + alive = req_res.alive case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) when String @@ -141,29 +160,31 @@ module Yahns::HttpResponse # :nodoc: tmp = chunk_out(tmp) # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing end - wbuf = proxy_write(wbuf, tmp, alive) + wbuf = proxy_write(wbuf, tmp, req_res) + return proxy_unbuffer(wbuf) if wbuf && wbuf.body chunk.clear if chunk when nil # EOF # HTTP/1.1 upstream, unexpected premature EOF: return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk # HTTP/1.0 upstream: - wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive + wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive + return proxy_unbuffer(wbuf) if wbuf && wbuf.body req_res.shutdown break when :wait_readable - return wait_on_upstream(req_res, alive, wbuf) + return wait_on_upstream(req_res, wbuf) end until kcar.body_eof? if chunk # tip is an empty array and becomes trailer storage req_res.proxy_trailers = [ rbuf.dup, tip ] - return proxy_read_trailers(kcar, req_res, alive, wbuf) + return proxy_read_trailers(kcar, req_res, wbuf) end - proxy_busy_mod(wbuf, alive) + proxy_busy_mod(wbuf, req_res) end - def proxy_read_trailers(kcar, req_res, alive, wbuf) + def proxy_read_trailers(kcar, req_res, wbuf) chunk, tlr = req_res.proxy_trailers rbuf = Thread.current[:yahns_rbuf] @@ -172,13 +193,14 @@ module Yahns::HttpResponse # :nodoc: when String chunk << rv when :wait_readable - return wait_on_upstream(req_res, alive, wbuf) + return wait_on_upstream(req_res, wbuf) when nil # premature EOF return proxy_err_response(nil, req_res, nil, wbuf) end # no loop here end - wbuf = proxy_write(wbuf, trailer_out(tlr), alive) - proxy_busy_mod(wbuf, alive) + wbuf = proxy_write(wbuf, trailer_out(tlr), req_res) + return proxy_unbuffer(wbuf) if wbuf && wbuf.body + proxy_busy_mod(wbuf, req_res) end # start streaming the response once upstream is done sending headers to us. @@ -186,25 +208,25 @@ module Yahns::HttpResponse # :nodoc: # returns :ignore if we yield control to the client(self) # returns nil if completely done def proxy_response_start(res, tip, kcar, req_res) - alive, wbuf, have_body = proxy_res_headers(res) + wbuf, have_body = proxy_res_headers(res, req_res) tip = tip.empty? ? [] : [ tip ] if have_body req_res.proxy_trailers = nil # define to avoid uninitialized warnings - return proxy_read_body(tip, kcar, req_res, alive, wbuf) + return proxy_read_body(tip, kcar, req_res, wbuf) end + return proxy_unbuffer(wbuf) if wbuf && wbuf.body # all done reading response from upstream, req_res will be discarded # when we return nil: - proxy_busy_mod(wbuf, alive) + proxy_busy_mod(wbuf, req_res) rescue => e proxy_err_response(502, req_res, e, wbuf) end def proxy_response_finish(kcar, wbuf, req_res) - alive = wbuf.wbuf_persist - req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, alive, wbuf) - : proxy_read_body([], kcar, req_res, alive, wbuf) + req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, wbuf) + : proxy_read_body([], kcar, req_res, wbuf) end def proxy_wait_next(qflags) @@ -238,23 +260,18 @@ module Yahns::HttpResponse # :nodoc: Thread.current[:yahns_queue].queue_mod(self, qflags) end - def proxy_busy_mod(wbuf, alive) - busy = wbuf.busy if wbuf - if busy + def proxy_busy_mod(wbuf, req_res) + if wbuf # we are completely done reading and buffering the upstream response, # but have not completely written the response to the client, # yield control to the client socket: @state = wbuf - proxy_wait_next(case busy - when :wait_readable then Yahns::Queue::QEV_RD - when :wait_writable then Yahns::Queue::QEV_WR - else - raise "BUG: invalid wbuf.busy: #{busy.inspect}" - end) + proxy_wait_next(wbuf.busy == :wait_readable ? Yahns::Queue::QEV_RD : + Yahns::Queue::QEV_WR) # no touching self after proxy_wait_next, we may be running # HttpClient#yahns_step in a different thread at this point else - case http_response_done(alive) + case http_response_done(req_res.alive) when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD) when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR) when :close then close diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb index 3b0d298..dd4ec87 100644 --- a/lib/yahns/req_res.rb +++ b/lib/yahns/req_res.rb @@ -8,15 +8,29 @@ require 'kgio' class Yahns::ReqRes < Kgio::Socket # :nodoc: attr_writer :resbuf + attr_writer :paused attr_accessor :proxy_trailers + attr_accessor :alive def req_start(c, req, input, chunked) @hdr = @resbuf = nil @yahns_client = c + @paused = false @rrstate = input ? [ req, input, chunked ] : req Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR) end + def close + if @paused # called by wbuf_close_common as @body.close + @paused = false + # we must cleanup and set yahns_client state before queue_mod below: + @yahns_client.hijack_cleanup + Thread.current[:yahns_queue].queue_mod(self, Yahns::Queue::QEV_RD) + else + super + end + end + def yahns_step # yahns event loop entry point c = @yahns_client case req = @rrstate diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb index 1b4ce6e..e6c794a 100644 --- a/lib/yahns/wbuf.rb +++ b/lib/yahns/wbuf.rb @@ -30,15 +30,14 @@ require_relative 'wbuf_common' # to be a scalability issue. class Yahns::Wbuf # :nodoc: include Yahns::WbufCommon - attr_reader :busy - attr_reader :wbuf_persist + attr_reader :body, :busy, :wbuf_persist def initialize(body, persist, tmpdir, busy) @tmpio = nil @tmpdir = tmpdir @sf_offset = @sf_count = 0 @wbuf_persist = persist # whether or not we keep the connection alive - @body = body + @body = body # something we call #close on when done writing @busy = busy # may be false end diff --git a/lib/yahns/wbuf_common.rb b/lib/yahns/wbuf_common.rb index ee18218..cded2e3 100644 --- a/lib/yahns/wbuf_common.rb +++ b/lib/yahns/wbuf_common.rb @@ -48,7 +48,7 @@ module Yahns::WbufCommon # :nodoc: if @wbuf_persist.respond_to?(:call) # hijack client.response_hijacked(@wbuf_persist) # :ignore else - @wbuf_persist # true or false or Yahns::StreamFile + @wbuf_persist # true, false, :ignore, or Yahns::StreamFile end end end diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb new file mode 100644 index 0000000..88b7c80 --- /dev/null +++ b/test/test_proxy_pass_no_buffering.rb @@ -0,0 +1,149 @@ +# Copyright (C) 2015-2016 all contributors <yahns-public@yhbt.net> +# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) +# frozen_string_literal: true +require_relative 'server_helper' +begin + require 'kcar' +rescue LoadError +end +require 'digest/md5' +class TestProxyPassNoBuffering < Testcase + ENV["N"].to_i > 1 and parallelize_me! + include ServerHelper + STR4 = 'abcd' * (256 * 1024) + NCHUNK = 50 + class ProxiedApp + def call(env) + case env['REQUEST_METHOD'] + when 'GET' + case env['PATH_INFO'] + when '/giant-body' + h = [ %W(content-type text/pain), + %W(content-length #{NCHUNK * STR4.size}) ] + body = Object.new + def body.each + NCHUNK.times { yield STR4 } + end + [ 200, h, body ] + end + end + end + end + + def setup + @srv2 = TCPServer.new(ENV["TEST_HOST"] || "127.0.0.1", 0) + server_helper_setup + skip "kcar missing yahns/proxy_pass" unless defined?(Kcar) + require 'yahns/proxy_pass' + end + + def teardown + @srv2.close if defined?(@srv2) && !@srv2.closed? + server_helper_teardown + end + + def check_headers(io) + l = io.gets + assert_match %r{\AHTTP/1\.[01] 200\b}, l + begin + l = io.gets + end until l == "\r\n" + end + + def test_proxy_pass_no_buffering + err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1] + host2, port2 = @srv2.addr[3], @srv2.addr[1] + pxp = Yahns::ProxyPass.new("http://#{host2}:#{port2}", + proxy_buffering: false) + pid = mkserver(cfg) do + ObjectSpace.each_object(Yahns::TmpIO) { |io| io.close unless io.closed? } + @srv2.close + cfg.instance_eval do + app(:rack, pxp) { listen "#{host}:#{port}" } + stderr_path err.path + end + end + + pid2 = mkserver(cfg, @srv2) do + ObjectSpace.each_object(Yahns::TmpIO) { |io| io.close unless io.closed? } + @srv.close + cfg.instance_eval do + app(:rack, ProxiedApp.new) do + output_buffering false + listen "#{host2}:#{port2}" + end + stderr_path err.path + end + end + s = TCPSocket.new(host, port) + req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \ + "Connection: close\r\n\r\n" + s.write(req) + bufs = [] + sleep 1 + 10.times do + sleep 0.1 + # ensure no files get created + if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4 + deleted1 = `lsof -p #{pid}`.split("\n") + deleted1 = deleted1.grep(/\bREG\b.* \(deleted\)/) + deleted2 = `lsof -p #{pid2}`.split("\n") + deleted2 = deleted2.grep(/\bREG\b.* \(deleted\)/) + [ deleted1, deleted2 ].each do |ary| + ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ } + end + assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}" + assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}" + bufs.push(deleted1[0]) + end + end + before = bufs.size + bufs.uniq! + assert bufs.size < before, 'unlinked buffer should not grow' + buf = ''.dup + slow = Digest::MD5.new + ft = Thread.new do + fast = Digest::MD5.new + f = TCPSocket.new(host2, port2) + f.write(req) + b2 = ''.dup + check_headers(f) + begin + f.readpartial(1024 * 1024, b2) + fast.update(b2) + rescue EOFError + f = f.close + end while f + b2.clear + fast + end + Thread.abort_on_exception = true + check_headers(s) + begin + s.readpartial(1024 * 1024, buf) + slow.update(buf) + sleep 0.01 + rescue EOFError + s = s.close + end while s + ft.join(5) + assert_equal slow.hexdigest, ft.value.hexdigest + + fast = Digest::MD5.new + f = TCPSocket.new(host, port) + f.write(req) + check_headers(f) + begin + f.readpartial(1024 * 1024, buf) + fast.update(buf) + rescue EOFError + f = f.close + end while f + buf.clear + assert_equal slow.hexdigest, fast.hexdigest + ensure + s.close if s + quit_wait(pid) + quit_wait(pid2) + end +end |