yahns Ruby server user/dev discussion
 help / color / mirror / code / Atom feed
* [PATCH 2/3] proxy_pass: redo "proxy_buffering: false"
@ 2016-06-05 22:50 Eric Wong
  2016-06-05 22:50 ` [PATCH 3/3] wbuf: remove needless "busy" parameter Eric Wong
  2016-06-05 22:53 ` [PATCH 1/3] req_res: store proxy_pass object here, instead Eric Wong
  0 siblings, 2 replies; 3+ messages in thread
From: Eric Wong @ 2016-06-05 22:50 UTC (permalink / raw)
  To: yahns-public

Relying on @body.close in Yahns::WbufCommon#wbuf_close_common to
resume reading the upstream response was too subtle and
potentially racy.

Instead use a new Yahns::WbufLite class which does exactly what
we want for implementing this feature, and nothing more.
---
 lib/yahns/proxy_http_response.rb     | 80 +++++++++++++++++++-----------------
 lib/yahns/req_res.rb                 | 25 +++--------
 lib/yahns/wbuf.rb                    |  2 +-
 lib/yahns/wbuf_lite.rb               | 50 ++++++++++++++++++++++
 test/test_proxy_pass_no_buffering.rb | 15 ++++---
 5 files changed, 107 insertions(+), 65 deletions(-)
 create mode 100644 lib/yahns/wbuf_lite.rb

diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 765fe14..ea176d9 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -3,20 +3,30 @@
 # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
 # frozen_string_literal: true
 
+require_relative 'wbuf_lite'
+
 # loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for
 # constants.
 module Yahns::HttpResponse # :nodoc:
 
   # switch and yield
   def proxy_unbuffer(wbuf)
-    wbuf.body.resbuf = @state = wbuf
+    @state = wbuf
     tc = Thread.current
     tc[:yahns_fdmap].remember(self) # Yahns::HttpClient
-    tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_writable ?
-                               Yahns::Queue::QEV_WR : Yahns::Queue::QEV_RD)
+    tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_readable ?
+                               Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR)
     :ignore
   end
 
+  def wbuf_alloc(req_res, busy)
+    if req_res.proxy_pass.proxy_buffering
+      Yahns::Wbuf.new(nil, req_res.alive, self.class.output_buffer_tmpdir, busy)
+    else
+      Yahns::WbufLite.new(req_res)
+    end
+  end
+
   # write everything in buf to our client socket (or wbuf, if it exists)
   # it may return a newly-created wbuf or nil
   def proxy_write(wbuf, buf, req_res)
@@ -27,15 +37,7 @@ def proxy_write(wbuf, buf, req_res)
       when String, Array # partial write, hope the skb grows
         buf = rv
       when :wait_writable, :wait_readable
-        if req_res.proxy_pass.proxy_buffering
-          body = nil
-          alive = req_res.alive
-        else
-          req_res.paused = true
-          body = req_res
-          alive = :ignore
-        end
-        wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, rv)
+        wbuf = req_res.resbuf ||= wbuf_alloc(req_res, rv)
         break
       end while true
     end
@@ -44,7 +46,7 @@ def proxy_write(wbuf, buf, req_res)
     wbuf.busy ? wbuf : nil
   end
 
-  def proxy_err_response(code, req_res, exc, wbuf)
+  def proxy_err_response(code, req_res, exc)
     logger = @hs.env['rack.logger']
     case exc
     when nil
@@ -68,13 +70,12 @@ def proxy_err_response(code, req_res, exc, wbuf)
 
     nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb
   ensure
-    wbuf.wbuf_abort if wbuf
+    wbuf = req_res.resbuf
+    wbuf.wbuf_abort if wbuf.respond_to?(:wbuf_abort)
   end
 
-  def wait_on_upstream(req_res, wbuf)
-    req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, req_res.alive,
-                                             self.class.output_buffer_tmpdir,
-                                             false)
+  def wait_on_upstream(req_res)
+    req_res.resbuf ||= wbuf_alloc(req_res, false)
     :wait_readable # self remains in :ignore, wait on upstream
   end
 
@@ -135,18 +136,19 @@ def proxy_res_headers(res, req_res)
       flags = MSG_DONTWAIT
       res = rv # hope the skb grows
     when :wait_writable, :wait_readable # highly unlikely in real apps
-      wbuf = proxy_write(nil, res, req_res)
-      break # keep buffering as much as possible
+      proxy_write(nil, res, req_res)
+      break # keep buffering body...
     end while true
     req_res.alive = alive
-    [ wbuf, have_body ]
+    have_body
   end
 
-  def proxy_read_body(tip, kcar, req_res, wbuf)
+  def proxy_read_body(tip, kcar, req_res)
     chunk = ''.dup if kcar.chunked?
     len = kcar.body_bytes_left
     rbuf = Thread.current[:yahns_rbuf]
     alive = req_res.alive
+    wbuf = req_res.resbuf
 
     case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
     when String
@@ -161,45 +163,46 @@ def proxy_read_body(tip, kcar, req_res, wbuf)
       # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
       end
       wbuf = proxy_write(wbuf, tmp, req_res)
-      return proxy_unbuffer(wbuf) if wbuf && wbuf.body
       chunk.clear if chunk
+      return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
     when nil # EOF
       # HTTP/1.1 upstream, unexpected premature EOF:
-      return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
+      return proxy_err_response(nil, req_res, nil) if len || chunk
 
       # HTTP/1.0 upstream:
       wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive
-      return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+      return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
       req_res.shutdown
       break
     when :wait_readable
-      return wait_on_upstream(req_res, wbuf)
+      return wait_on_upstream(req_res)
     end until kcar.body_eof?
 
     if chunk
       # tip is an empty array and becomes trailer storage
       req_res.proxy_trailers = [ rbuf.dup, tip ]
-      return proxy_read_trailers(kcar, req_res, wbuf)
+      return proxy_read_trailers(kcar, req_res)
     end
     proxy_busy_mod(wbuf, req_res)
   end
 
-  def proxy_read_trailers(kcar, req_res, wbuf)
+  def proxy_read_trailers(kcar, req_res)
     chunk, tlr = req_res.proxy_trailers
     rbuf = Thread.current[:yahns_rbuf]
+    wbuf = req_res.resbuf
 
     until kcar.trailers(tlr, chunk)
       case rv = req_res.kgio_tryread(0x2000, rbuf)
       when String
         chunk << rv
       when :wait_readable
-        return wait_on_upstream(req_res, wbuf)
+        return wait_on_upstream(req_res)
       when nil # premature EOF
-        return proxy_err_response(nil, req_res, nil, wbuf)
+        return proxy_err_response(nil, req_res, nil)
       end # no loop here
     end
     wbuf = proxy_write(wbuf, trailer_out(tlr), req_res)
-    return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+    return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
     proxy_busy_mod(wbuf, req_res)
   end
 
@@ -208,23 +211,26 @@ def proxy_read_trailers(kcar, req_res, wbuf)
   # returns :ignore if we yield control to the client(self)
   # returns nil if completely done
   def proxy_response_start(res, tip, kcar, req_res)
-    wbuf, have_body = proxy_res_headers(res, req_res)
+    have_body = proxy_res_headers(res, req_res)
     tip = tip.empty? ? [] : [ tip ]
 
     if have_body
       req_res.proxy_trailers = nil # define to avoid uninitialized warnings
-      return proxy_read_body(tip, kcar, req_res, wbuf)
+      return proxy_read_body(tip, kcar, req_res)
     end
-    return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+
+    # unlikely
+    wbuf = req_res.resbuf
+    return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
 
     # all done reading response from upstream, req_res will be discarded
     # when we return nil:
     proxy_busy_mod(wbuf, req_res)
   end
 
-  def proxy_response_finish(kcar, wbuf, req_res)
-    req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, wbuf)
-                           : proxy_read_body([], kcar, req_res, wbuf)
+  def proxy_response_finish(kcar, req_res)
+    req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res)
+                           : proxy_read_body([], kcar, req_res)
   end
 
   def proxy_wait_next(qflags)
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
index f585bd9..9bb8f35 100644
--- a/lib/yahns/req_res.rb
+++ b/lib/yahns/req_res.rb
@@ -7,8 +7,7 @@
 require 'kgio'
 
 class Yahns::ReqRes < Kgio::Socket # :nodoc:
-  attr_writer :resbuf
-  attr_writer :paused
+  attr_accessor :resbuf
   attr_accessor :proxy_trailers
   attr_accessor :alive
   attr_reader :proxy_pass
@@ -16,23 +15,11 @@ class Yahns::ReqRes < Kgio::Socket # :nodoc:
   def req_start(c, req, input, chunked, proxy_pass)
     @hdr = @resbuf = nil
     @yahns_client = c
-    @paused = false
     @rrstate = input ? [ req, input, chunked ] : req
     @proxy_pass = proxy_pass
     Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
   end
 
-  def close
-    if @paused # called by wbuf_close_common as @body.close
-      @paused = false
-      # we must cleanup and set yahns_client state before queue_mod below:
-      @yahns_client.hijack_cleanup
-      Thread.current[:yahns_queue].queue_mod(self, Yahns::Queue::QEV_RD)
-    else
-      super
-    end
-  end
-
   def yahns_step # yahns event loop entry point
     c = @yahns_client
     case req = @rrstate
@@ -55,7 +42,7 @@ def yahns_step # yahns event loop entry point
           # continue looping in middle "case @resbuf" loop
         when :wait_readable
           return rv # spurious wakeup
-        when nil then return c.proxy_err_response(502, self, nil, nil)
+        when nil then return c.proxy_err_response(502, self, nil)
         end # NOT looping here
 
       when String # continue reading trickled response headers from upstream
@@ -63,15 +50,15 @@ def yahns_step # yahns event loop entry point
         case rv = kgio_tryread(0x2000, buf)
         when String then res = req.headers(@hdr, resbuf << rv) and break
         when :wait_readable then return rv
-        when nil then return c.proxy_err_response(502, self, nil, nil)
+        when nil then return c.proxy_err_response(502, self, nil)
         end while true
+        @resbuf = false
 
         return c.proxy_response_start(res, resbuf, req, self)
 
       when Yahns::WbufCommon # streaming/buffering the response body
 
-        # we assign wbuf for rescue below:
-        return c.proxy_response_finish(req, wbuf = resbuf, self)
+        return c.proxy_response_finish(req, self)
 
       end while true # case @resbuf
 
@@ -87,7 +74,7 @@ def yahns_step # yahns event loop entry point
     when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
       e.set_backtrace([])
     end
-    c.proxy_err_response(502, self, e, wbuf)
+    c.proxy_err_response(502, self, e)
   end
 
   def send_req_body_chunk(buf)
diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb
index e6c794a..f7b2ffa 100644
--- a/lib/yahns/wbuf.rb
+++ b/lib/yahns/wbuf.rb
@@ -30,7 +30,7 @@
 # to be a scalability issue.
 class Yahns::Wbuf # :nodoc:
   include Yahns::WbufCommon
-  attr_reader :body, :busy, :wbuf_persist
+  attr_reader :busy
 
   def initialize(body, persist, tmpdir, busy)
     @tmpio = nil
diff --git a/lib/yahns/wbuf_lite.rb b/lib/yahns/wbuf_lite.rb
new file mode 100644
index 0000000..5f25b2e
--- /dev/null
+++ b/lib/yahns/wbuf_lite.rb
@@ -0,0 +1,50 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2016 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require_relative 'wbuf_common'
+
+# This is only used for "proxy_buffering: false"
+class Yahns::WbufLite # :nodoc:
+  include Yahns::WbufCommon
+  attr_reader :busy
+
+  def initialize(req_res)
+    @req_res = req_res
+    @busy = false
+    @buf = nil
+  end
+
+  # called only by proxy_write in proxy_http_response
+  def wbuf_write(client, buf)
+    buf = buf.join if Array === buf
+    buf = @buf << buf if @buf # unlikely
+    do_write(client, buf) # try our best to avoid string copying/appending
+  end
+
+  def do_write(client, buf)
+    case rv = client.kgio_trywrite(buf)
+    when String
+      buf = rv # continue looping
+    when :wait_writable, :wait_readable
+      @buf = buf
+      return @busy = rv
+    when nil
+      @buf = nil
+      return @busy = false
+    end while true
+  end
+
+  # called by Yahns::HttpClient#step_write
+  def wbuf_flush(client)
+    sym = do_write(client, @buf) and return sym # :wait_writable/:wait_readable
+
+    # resume reading
+    client.hijack_cleanup
+    Thread.current[:yahns_queue].queue_mod(@req_res, Yahns::Queue::QEV_RD)
+    :ignore
+  rescue
+    @req_res.close
+    raise
+  end
+end
diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb
index 88b7c80..48b8241 100644
--- a/test/test_proxy_pass_no_buffering.rb
+++ b/test/test_proxy_pass_no_buffering.rb
@@ -79,7 +79,6 @@ def test_proxy_pass_no_buffering
     req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \
           "Connection: close\r\n\r\n"
     s.write(req)
-    bufs = []
     sleep 1
     10.times do
       sleep 0.1
@@ -92,14 +91,10 @@ def test_proxy_pass_no_buffering
         [ deleted1, deleted2 ].each do |ary|
           ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ }
         end
-        assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}"
+        assert_equal 0, deleted1.size, "pid1=#{deleted1.inspect}"
         assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}"
-        bufs.push(deleted1[0])
       end
     end
-    before = bufs.size
-    bufs.uniq!
-    assert bufs.size < before, 'unlinked buffer should not grow'
     buf = ''.dup
     slow = Digest::MD5.new
     ft = Thread.new do
@@ -108,26 +103,30 @@ def test_proxy_pass_no_buffering
       f.write(req)
       b2 = ''.dup
       check_headers(f)
+      nf = 0
       begin
         f.readpartial(1024 * 1024, b2)
+        nf += b2.bytesize
         fast.update(b2)
       rescue EOFError
         f = f.close
       end while f
       b2.clear
-      fast
+      [ nf, fast.hexdigest ]
     end
     Thread.abort_on_exception = true
     check_headers(s)
+    n = 0
     begin
       s.readpartial(1024 * 1024, buf)
       slow.update(buf)
+      n += buf.bytesize
       sleep 0.01
     rescue EOFError
       s = s.close
     end while s
     ft.join(5)
-    assert_equal slow.hexdigest, ft.value.hexdigest
+    assert_equal [n, slow.hexdigest ], ft.value
 
     fast = Digest::MD5.new
     f = TCPSocket.new(host, port)

^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [PATCH 3/3] wbuf: remove needless "busy" parameter
  2016-06-05 22:50 [PATCH 2/3] proxy_pass: redo "proxy_buffering: false" Eric Wong
@ 2016-06-05 22:50 ` Eric Wong
  2016-06-05 22:53 ` [PATCH 1/3] req_res: store proxy_pass object here, instead Eric Wong
  1 sibling, 0 replies; 3+ messages in thread
From: Eric Wong @ 2016-06-05 22:50 UTC (permalink / raw)
  To: yahns-public

@busy will be reset on wbuf_write anyways, since there is no
initial data and we will always attempt to write to the socket
aggressively.
---
 lib/yahns/http_response.rb       | 8 ++++----
 lib/yahns/proxy_http_response.rb | 8 ++++----
 lib/yahns/wbuf.rb                | 4 ++--
 test/test_wbuf.rb                | 8 ++++----
 4 files changed, 14 insertions(+), 14 deletions(-)

diff --git a/lib/yahns/http_response.rb b/lib/yahns/http_response.rb
index 4b36db2..531194f 100644
--- a/lib/yahns/http_response.rb
+++ b/lib/yahns/http_response.rb
@@ -57,12 +57,12 @@ def err_response(code)
     "#{response_start}#{code} #{Rack::Utils::HTTP_STATUS_CODES[code]}\r\n\r\n"
   end
 
-  def response_header_blocked(ret, header, body, alive, offset, count)
+  def response_header_blocked(header, body, alive, offset, count)
     if body.respond_to?(:to_path)
       alive = Yahns::StreamFile.new(body, alive, offset, count)
       body = nil
     end
-    wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, ret)
+    wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir)
     rv = wbuf.wbuf_write(self, header)
     if body && ! alive.respond_to?(:call) # skip body.each if hijacked
       body.each { |chunk| rv = wbuf.wbuf_write(self, chunk) }
@@ -171,7 +171,7 @@ def http_response_write(status, headers, body)
       when :wait_writable, :wait_readable # unlikely
         if k.output_buffering
           alive = hijack ? hijack : alive
-          rv = response_header_blocked(rv, buf, body, alive, offset, count)
+          rv = response_header_blocked(buf, body, alive, offset, count)
           body = nil # ensure we do not close body in ensure
           return rv
         else
@@ -199,7 +199,7 @@ def http_response_write(status, headers, body)
           chunk = rv # hope the skb grows when we loop into the trywrite
         when :wait_writable, :wait_readable
           if k.output_buffering
-            wbuf = Yahns::Wbuf.new(body, alive, k.output_buffer_tmpdir, rv)
+            wbuf = Yahns::Wbuf.new(body, alive, k.output_buffer_tmpdir)
             rv = wbuf.wbuf_write(self, chunk)
             break
           else
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index ea176d9..61f1539 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -19,9 +19,9 @@ def proxy_unbuffer(wbuf)
     :ignore
   end
 
-  def wbuf_alloc(req_res, busy)
+  def wbuf_alloc(req_res)
     if req_res.proxy_pass.proxy_buffering
-      Yahns::Wbuf.new(nil, req_res.alive, self.class.output_buffer_tmpdir, busy)
+      Yahns::Wbuf.new(nil, req_res.alive, self.class.output_buffer_tmpdir)
     else
       Yahns::WbufLite.new(req_res)
     end
@@ -37,7 +37,7 @@ def proxy_write(wbuf, buf, req_res)
       when String, Array # partial write, hope the skb grows
         buf = rv
       when :wait_writable, :wait_readable
-        wbuf = req_res.resbuf ||= wbuf_alloc(req_res, rv)
+        wbuf = req_res.resbuf ||= wbuf_alloc(req_res)
         break
       end while true
     end
@@ -75,7 +75,7 @@ def proxy_err_response(code, req_res, exc)
   end
 
   def wait_on_upstream(req_res)
-    req_res.resbuf ||= wbuf_alloc(req_res, false)
+    req_res.resbuf ||= wbuf_alloc(req_res)
     :wait_readable # self remains in :ignore, wait on upstream
   end
 
diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb
index f7b2ffa..1010c86 100644
--- a/lib/yahns/wbuf.rb
+++ b/lib/yahns/wbuf.rb
@@ -32,13 +32,13 @@ class Yahns::Wbuf # :nodoc:
   include Yahns::WbufCommon
   attr_reader :busy
 
-  def initialize(body, persist, tmpdir, busy)
+  def initialize(body, persist, tmpdir)
     @tmpio = nil
     @tmpdir = tmpdir
     @sf_offset = @sf_count = 0
     @wbuf_persist = persist # whether or not we keep the connection alive
     @body = body # something we call #close on when done writing
-    @busy = busy # may be false
+    @busy = false
   end
 
   def wbuf_writev(buf)
diff --git a/test/test_wbuf.rb b/test/test_wbuf.rb
index 1c8c0d0..990ad9d 100644
--- a/test/test_wbuf.rb
+++ b/test/test_wbuf.rb
@@ -20,8 +20,8 @@ def test_wbuf
     buf = "*" * (16384 * 2)
     nr = 1000
     [ true, false ].each do |persist|
-      wbuf = Yahns::Wbuf.new([], persist, Dir.tmpdir, :wait_writable)
-      assert_equal :wait_writable, wbuf.busy
+      wbuf = Yahns::Wbuf.new([], persist, Dir.tmpdir)
+      assert_equal false, wbuf.busy
       a, b = socketpair
       assert_nil wbuf.wbuf_write(a, "HIHI")
       assert_equal "HIHI", b.read(4)
@@ -71,7 +71,7 @@ def test_wbuf_blocked
         break
       end while true
     end
-    wbuf = Yahns::Wbuf.new([], true, Dir.tmpdir, :wait_writable)
+    wbuf = Yahns::Wbuf.new([], true, Dir.tmpdir)
 
     rv1 = wbuf.wbuf_write(a, buf)
     rv2 = wbuf.wbuf_flush(a)
@@ -104,7 +104,7 @@ def test_wbuf_blocked
   def test_wbuf_flush_close
     pipe = cloexec_pipe
     persist = true
-    wbuf = Yahns::Wbuf.new(pipe[0], persist, Dir.tmpdir, :wait_writable)
+    wbuf = Yahns::Wbuf.new(pipe[0], persist, Dir.tmpdir)
     refute wbuf.respond_to?(:close) # we don't want this for HttpResponse body
     sp = socketpair
     rv = nil

^ permalink raw reply related	[flat|nested] 3+ messages in thread

* [PATCH 1/3] req_res: store proxy_pass object here, instead
  2016-06-05 22:50 [PATCH 2/3] proxy_pass: redo "proxy_buffering: false" Eric Wong
  2016-06-05 22:50 ` [PATCH 3/3] wbuf: remove needless "busy" parameter Eric Wong
@ 2016-06-05 22:53 ` Eric Wong
  1 sibling, 0 replies; 3+ messages in thread
From: Eric Wong @ 2016-06-05 22:53 UTC (permalink / raw)
  To: yahns-public

We cannot rely on env being available after proxy_wait_next
---
 Oops, fat-fingered sending the mail :x

 Correct patch order should be:
        req_res: store proxy_pass object here, instead
        proxy_pass: redo "proxy_buffering: false"
        wbuf: remove needless "busy" parameter


 lib/yahns/proxy_http_response.rb | 4 ++--
 lib/yahns/proxy_pass.rb          | 3 +--
 lib/yahns/req_res.rb             | 4 +++-
 3 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 2968062..765fe14 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -27,7 +27,7 @@ def proxy_write(wbuf, buf, req_res)
       when String, Array # partial write, hope the skb grows
         buf = rv
       when :wait_writable, :wait_readable
-        if @hs.env['yahns.proxy_pass'].proxy_buffering
+        if req_res.proxy_pass.proxy_buffering
           body = nil
           alive = req_res.alive
         else
@@ -88,7 +88,7 @@ def proxy_res_headers(res, req_res)
     flags = MSG_DONTWAIT
     alive = @hs.next? && self.class.persistent_connections
     term = false
-    response_headers = env['yahns.proxy_pass'].response_headers
+    response_headers = req_res.proxy_pass.response_headers
 
     res = "HTTP/1.1 #{msg ? %Q(#{code} #{msg}) : status}\r\n".dup
     headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays
diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index ed37da5..fcd0cf7 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -89,10 +89,9 @@ def call(env)
     ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
     clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
     input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil
-    env['yahns.proxy_pass'] = self
 
     # finally, prepare to emit the headers
-    rr.req_start(c, req << "\r\n".freeze, input, chunked)
+    rr.req_start(c, req << "\r\n".freeze, input, chunked, self)
 
     # this probably breaks fewer middlewares than returning whatever else...
     [ 500, [], [] ]
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
index dd4ec87..f585bd9 100644
--- a/lib/yahns/req_res.rb
+++ b/lib/yahns/req_res.rb
@@ -11,12 +11,14 @@ class Yahns::ReqRes < Kgio::Socket # :nodoc:
   attr_writer :paused
   attr_accessor :proxy_trailers
   attr_accessor :alive
+  attr_reader :proxy_pass
 
-  def req_start(c, req, input, chunked)
+  def req_start(c, req, input, chunked, proxy_pass)
     @hdr = @resbuf = nil
     @yahns_client = c
     @paused = false
     @rrstate = input ? [ req, input, chunked ] : req
+    @proxy_pass = proxy_pass
     Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
   end
 

^ permalink raw reply related	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2016-06-05 22:53 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-06-05 22:50 [PATCH 2/3] proxy_pass: redo "proxy_buffering: false" Eric Wong
2016-06-05 22:50 ` [PATCH 3/3] wbuf: remove needless "busy" parameter Eric Wong
2016-06-05 22:53 ` [PATCH 1/3] req_res: store proxy_pass object here, instead Eric Wong

Code repositories for project(s) associated with this public inbox

	https://yhbt.net/yahns.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).