yahns Ruby server user/dev discussion
 help / color / Atom feed
* [PATCH 0/7] proxy_pass cleanups
@ 2016-05-16  1:43 Eric Wong
  2016-05-16  1:43 ` [PATCH 1/7] proxy_pass: simplify writing request bodies upstream Eric Wong
                   ` (7 more replies)
  0 siblings, 8 replies; 9+ messages in thread
From: Eric Wong @ 2016-05-16  1:43 UTC (permalink / raw)
  To: yahns-public

A bunch of cleanups to hopefully make the proxy_pass-related
code a little saner and easier-to-follow.

I introduced at least one bug during this series which got fixed
in 7/7.

Extra sets of eyes to review would be greatly appreciated, thanks!

And this is running live and serving critical information to
readers of https://yhbt.net/ in all its glory!

Once I'm reasonably satisfied with this; I'll continue work
on making "proxy_buffering: false" work, so slow-client-capable
upstreams can generate gigantic (hundreds of megabytes!)
responses without filesystem overhead.

But first, I think I should work on making those gigantic
responses cheaper in terms of memory/CPU usage outside of
yahns.  This is git-http-backend for serving mega repos
over smart HTTP, yahns is already great for dumb HTTP
git clones.

 lib/yahns/proxy_http_response.rb | 236 +++++++++++++++------------------------
 lib/yahns/proxy_pass.rb          | 171 +---------------------------
 lib/yahns/req_res.rb             | 159 ++++++++++++++++++++++++++
 3 files changed, 252 insertions(+), 314 deletions(-)

Eric Wong (7):
      proxy_pass: simplify writing request bodies upstream
      proxy_pass: hoist out proxy_res_headers method
      proxy_pass: simplify proxy_http_response
      proxy_pass: split out body and trailer reading in response
      proxy_pass: trim down proxy_response_finish, too
      proxy_pass: split out req_res into a separate file
      proxy_pass: fix resumes after complete buffering is unblocked

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 1/7] proxy_pass: simplify writing request bodies upstream
  2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
@ 2016-05-16  1:43 ` Eric Wong
  2016-05-16  1:43 ` [PATCH 2/7] proxy_pass: hoist out proxy_res_headers method Eric Wong
                   ` (6 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2016-05-16  1:43 UTC (permalink / raw)
  To: yahns-public

The cost of extra branches inside a loop is negligible compared
to the cost of all the other method calls we make.  Favor
smaller code instead and inline some (now) single-use methods.

Furthermore, this allows us to reuse the request header buffer
instead of relying on thread-local storage and potentially
having to to swap buffers.
---
 lib/yahns/proxy_pass.rb | 112 +++++++++++++++++++++---------------------------
 1 file changed, 49 insertions(+), 63 deletions(-)

diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index 148957b..a2d7d81 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -22,12 +22,6 @@ def req_start(c, req, input, chunked)
       Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
     end
 
-    # we must reinitialize the thread-local rbuf if it may get beyond the
-    # current thread
-    def detach_rbuf!
-      Thread.current[:yahns_rbuf] = ''.dup
-    end
-
     def yahns_step # yahns event loop entry point
       c = @yahns_client
       case req = @rrstate
@@ -42,7 +36,9 @@ def yahns_step # yahns event loop entry point
             if res = req.headers(@hdr = [], rv)
               return c.proxy_response_start(res, rv, req, self)
             else # ugh, big headers or tricked response
-              buf = detach_rbuf!
+              # we must reinitialize the thread-local rbuf if it may
+              # live beyond the current thread
+              buf = Thread.current[:yahns_rbuf] = ''.dup
               @resbuf = rv
             end
             # continue looping in middle "case @resbuf" loop
@@ -83,64 +79,63 @@ def yahns_step # yahns event loop entry point
       c.proxy_err_response(502, self, e, wbuf)
     end
 
-    # returns :wait_readable if complete, :wait_writable if not
-    def send_req_body(req)
-      buf, input, chunked = req
-
-      # get the first buffered chunk or vector
+    def send_req_body_chunk(buf)
       case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
       when String, Array
-        buf = rv # retry inner loop
-      when :wait_writable
-        req[0] = buf
-        return :wait_writable
-      when nil
-        break # onto writing body
+        buf.replace(rv) # retry loop on partial write
+      when :wait_writable, nil
+        # :wait_writable = upstream is reading slowly and making us wait
+        return rv
+      else
+        abort "BUG: #{rv.inspect} from kgio_trywrite*"
       end while true
+    end
 
-      buf = Thread.current[:yahns_rbuf]
+    # returns :wait_readable if complete, :wait_writable if not
+    def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
+      buf, input, chunked = req
 
-      # Note: input (env['rack.input']) is fully-buffered by default so
-      # we should not be waiting on a slow network resource when reading
-      # input.  However, some weird configs may disable this on LANs
+      # send the first buffered chunk or vector
+      rv = send_req_body_chunk(buf) and return rv # :wait_writable
 
+      # yay, sent the first chunk, now read the body!
+      rbuf = buf
       if chunked
-        while input.read(0x2000, buf)
-          vec = [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
-          case rv = kgio_trywritev(vec)
-          when Array
-            vec = rv # partial write, retry in case loop
-          when :wait_writable
-            detach_rbuf!
-            req[0] = vec
-            return :wait_writable
-          when nil
-            break # continue onto reading next chunk
-          end while true
+        if String === buf # initial body
+          req[0] = buf = []
+        else
+          # try to reuse the biggest non-frozen buffer we just wrote;
+          rbuf = buf.max_by(&:size)
+          rbuf = ''.dup if rbuf.frozen? # unlikely...
         end
-        close_req_body(input)
-
-        # note: we do not send any trailer, they are folded into the header
-        # because this relies on full request buffering
-        send_req_buf("0\r\n\r\n".freeze)
-        # prepare_wait_readable already called by send_req_buf
-      else # identity request, easy:
-        while input.read(0x2000, buf)
-          case rv = kgio_trywrite(buf)
-          when String
-            buf = rv # partial write, retry in case loop
-          when :wait_writable
-            detach_rbuf!
-            req[0] = buf
-            return :wait_writable
-          when nil
-            break # continue onto reading next block
-          end while true
+      end
+
+      # Note: input (env['rack.input']) is fully-buffered by default so
+      # we should not be waiting on a slow network resource when reading
+      # input.  However, some weird configs may disable this on LANs
+      # and we may wait indefinitely on input.read here...
+      while input.read(0x2000, rbuf)
+        if chunked
+          buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
+          buf[1] = rbuf
+          buf[2] = "\r\n".freeze
         end
+        rv = send_req_body_chunk(buf) and return rv # :wait_writable
+      end
+
+      rbuf.clear # all done, clear the big buffer
 
-        close_req_body(input)
-        prepare_wait_readable
+      # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
+      # tries to prevent that (and hijack means all Rack specs go out the door)
+      case input
+      when Yahns::TeeInput, IO
+        input.close
       end
+
+      # note: we do not send any trailer, they are folded into the header
+      # because this relies on full request buffering
+      # prepare_wait_readable is called by send_req_buf
+      chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
     rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
       # no more reading off the client socket, just prepare to forward
       # the rejection response from the upstream (if any)
@@ -153,15 +148,6 @@ def prepare_wait_readable
       :wait_readable # all done sending the request, wait for response
     end
 
-    def close_req_body(input)
-      # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
-      # tries to prevent that (and hijack means all Rack specs go out the door)
-      case input
-      when Yahns::TeeInput, IO
-        input.close
-      end
-    end
-
     # n.b. buf must be a detached string not shared with
     # Thread.current[:yahns_rbuf] of any thread
     def send_req_buf(buf)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 2/7] proxy_pass: hoist out proxy_res_headers method
  2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
  2016-05-16  1:43 ` [PATCH 1/7] proxy_pass: simplify writing request bodies upstream Eric Wong
@ 2016-05-16  1:43 ` Eric Wong
  2016-05-16  1:43 ` [PATCH 3/7] proxy_pass: simplify proxy_http_response Eric Wong
                   ` (5 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2016-05-16  1:43 UTC (permalink / raw)
  To: yahns-public

proxy_response_start is gigantic an hard-to-read, we can
more clearly see the lifetimes of some objects, now, and
hopefully shorten some of them.
---
 lib/yahns/proxy_http_response.rb | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index bd274fe..693528f 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -61,11 +61,7 @@ def wait_on_upstream(req_res, alive, wbuf)
     :wait_readable # self remains in :ignore, wait on upstream
   end
 
-  # start streaming the response once upstream is done sending headers to us.
-  # returns :wait_readable if we need to read more from req_res
-  # returns :ignore if we yield control to the client(self)
-  # returns nil if completely done
-  def proxy_response_start(res, tip, kcar, req_res)
+  def proxy_res_headers(res)
     status, headers = res
     code = status.to_i
     msg = Rack::Utils::HTTP_STATUS_CODES[code]
@@ -125,7 +121,15 @@ def proxy_response_start(res, tip, kcar, req_res)
       wbuf = proxy_write(nil, res, alive)
       break # keep buffering as much as possible
     end while true
+    [ alive, wbuf, have_body ]
+  end
 
+  # start streaming the response once upstream is done sending headers to us.
+  # returns :wait_readable if we need to read more from req_res
+  # returns :ignore if we yield control to the client(self)
+  # returns nil if completely done
+  def proxy_response_start(res, tip, kcar, req_res)
+    alive, wbuf, have_body = proxy_res_headers(res)
     rbuf = Thread.current[:yahns_rbuf]
     tip = tip.empty? ? [] : [ tip ]
 

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 3/7] proxy_pass: simplify proxy_http_response
  2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
  2016-05-16  1:43 ` [PATCH 1/7] proxy_pass: simplify writing request bodies upstream Eric Wong
  2016-05-16  1:43 ` [PATCH 2/7] proxy_pass: hoist out proxy_res_headers method Eric Wong
@ 2016-05-16  1:43 ` Eric Wong
  2016-05-16  1:43 ` [PATCH 4/7] proxy_pass: split out body and trailer reading in response Eric Wong
                   ` (4 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2016-05-16  1:43 UTC (permalink / raw)
  To: yahns-public

Again, the cost of having extra branches within a loop is
probably neglible compared to having bigger bytecode resulting
in worse CPU cache performance and increased maintenance
overhead for extra code.
---
 lib/yahns/proxy_http_response.rb | 76 ++++++++++++++++++----------------------
 1 file changed, 34 insertions(+), 42 deletions(-)

diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 693528f..00915df 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -134,38 +134,45 @@ def proxy_response_start(res, tip, kcar, req_res)
     tip = tip.empty? ? [] : [ tip ]
 
     if have_body
-      if len = kcar.body_bytes_left
+      req_res.proxy_trailers = nil # define to avoid uninitialized warnings
+      chunk = ''.dup if kcar.chunked?
+      tlr = nil
+      len = kcar.body_bytes_left
 
-        case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
-        when String
-          len = kcar.body_bytes_left -= tmp.size
-          wbuf = proxy_write(wbuf, tmp, alive)
-        when nil # premature EOF
-          return proxy_err_response(nil, req_res, nil, wbuf)
-        when :wait_readable
-          return wait_on_upstream(req_res, alive, wbuf)
-        end until len == 0
-
-      elsif kcar.chunked? # nasty chunked body
-        req_res.proxy_trailers = nil # define to avoid warnings for now
-        buf = ''.dup
-        case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
-        when String
-          kcar.filter_body(buf, tmp)
-          wbuf = proxy_write(wbuf, chunk_out(buf), alive) unless buf.empty?
-        when nil # premature EOF
-          return proxy_err_response(nil, req_res, nil, wbuf)
-        when :wait_readable
-          return wait_on_upstream(req_res, alive, wbuf)
-        end until kcar.body_eof?
+      case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+      when String
+        if len
+          kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
+        elsif chunk
+          kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
+          next if chunk.empty? # read req_res.kgio_tryread for more
+          tmp = chunk_out(chunk)
+        elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
+          tmp = chunk_out(tmp)
+        # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
+        end
+        wbuf = proxy_write(wbuf, tmp, alive)
+        chunk.clear if chunk
+      when nil # EOF
+        # HTTP/1.1 upstream, unexpected premature EOF:
+        return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
+
+        # HTTP/1.0 upstream:
+        wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
+        req_res.shutdown
+        break
+      when :wait_readable
+        return wait_on_upstream(req_res, alive, wbuf)
+      end until kcar.body_eof?
 
-        buf = tmp
-        req_res.proxy_trailers = [ buf, tlr = [] ]
+      if chunk
+        chunk = rbuf
+        req_res.proxy_trailers = [ chunk, tlr = [] ]
         rbuf = Thread.current[:yahns_rbuf] = ''.dup
-        until kcar.trailers(tlr, buf)
+        until kcar.trailers(tlr, chunk)
           case rv = req_res.kgio_tryread(0x2000, rbuf)
           when String
-            buf << rv
+            chunk << rv
           when :wait_readable
             return wait_on_upstream(req_res, alive, wbuf)
           when nil # premature EOF
@@ -173,21 +180,6 @@ def proxy_response_start(res, tip, kcar, req_res)
           end # no loop here
         end
         wbuf = proxy_write(wbuf, trailer_out(tlr), alive)
-
-      else # no Content-Length or Transfer-Encoding: chunked, wait on EOF!
-
-        case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
-        when String
-          tmp = chunk_out(tmp) if alive
-          wbuf = proxy_write(wbuf, tmp, alive)
-        when nil
-          wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
-          req_res.shutdown
-          break
-        when :wait_readable
-          return wait_on_upstream(req_res, alive, wbuf)
-        end while true
-
       end
     end
 

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 4/7] proxy_pass: split out body and trailer reading in response
  2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
                   ` (2 preceding siblings ...)
  2016-05-16  1:43 ` [PATCH 3/7] proxy_pass: simplify proxy_http_response Eric Wong
@ 2016-05-16  1:43 ` Eric Wong
  2016-05-16  1:43 ` [PATCH 5/7] proxy_pass: trim down proxy_response_finish, too Eric Wong
                   ` (3 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2016-05-16  1:43 UTC (permalink / raw)
  To: yahns-public

Hopefully this increases readability as well and allows
us to make easier adjustments to support new features in
the future.
---
 lib/yahns/proxy_http_response.rb | 105 +++++++++++++++++++++------------------
 1 file changed, 58 insertions(+), 47 deletions(-)

diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 00915df..52a8aff 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -124,63 +124,74 @@ def proxy_res_headers(res)
     [ alive, wbuf, have_body ]
   end
 
+  def proxy_read_body(tip, kcar, req_res, alive, wbuf)
+    chunk = ''.dup if kcar.chunked?
+    len = kcar.body_bytes_left
+    rbuf = Thread.current[:yahns_rbuf]
+
+    case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+    when String
+      if len
+        kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
+      elsif chunk
+        kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
+        next if chunk.empty? # call req_res.kgio_tryread for more
+        tmp = chunk_out(chunk)
+      elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
+        tmp = chunk_out(tmp)
+      # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
+      end
+      wbuf = proxy_write(wbuf, tmp, alive)
+      chunk.clear if chunk
+    when nil # EOF
+      # HTTP/1.1 upstream, unexpected premature EOF:
+      return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
+
+      # HTTP/1.0 upstream:
+      wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
+      req_res.shutdown
+      break
+    when :wait_readable
+      return wait_on_upstream(req_res, alive, wbuf)
+    end until kcar.body_eof?
+
+    if chunk
+      # tip is an empty array and becomes trailer storage
+      req_res.proxy_trailers = [ rbuf.dup, tip ]
+      return proxy_read_trailers(kcar, req_res, alive, wbuf)
+    end
+    wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+  end
+
+  def proxy_read_trailers(kcar, req_res, alive, wbuf)
+    chunk, tlr = req_res.proxy_trailers
+    rbuf = Thread.current[:yahns_rbuf]
+
+    until kcar.trailers(tlr, chunk)
+      case rv = req_res.kgio_tryread(0x2000, rbuf)
+      when String
+        chunk << rv
+      when :wait_readable
+        return wait_on_upstream(req_res, alive, wbuf)
+      when nil # premature EOF
+        return proxy_err_response(nil, req_res, nil, wbuf)
+      end # no loop here
+    end
+    wbuf = proxy_write(wbuf, trailer_out(tlr), alive)
+    wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+  end
+
   # start streaming the response once upstream is done sending headers to us.
   # returns :wait_readable if we need to read more from req_res
   # returns :ignore if we yield control to the client(self)
   # returns nil if completely done
   def proxy_response_start(res, tip, kcar, req_res)
     alive, wbuf, have_body = proxy_res_headers(res)
-    rbuf = Thread.current[:yahns_rbuf]
     tip = tip.empty? ? [] : [ tip ]
 
     if have_body
       req_res.proxy_trailers = nil # define to avoid uninitialized warnings
-      chunk = ''.dup if kcar.chunked?
-      tlr = nil
-      len = kcar.body_bytes_left
-
-      case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
-      when String
-        if len
-          kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
-        elsif chunk
-          kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
-          next if chunk.empty? # read req_res.kgio_tryread for more
-          tmp = chunk_out(chunk)
-        elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
-          tmp = chunk_out(tmp)
-        # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
-        end
-        wbuf = proxy_write(wbuf, tmp, alive)
-        chunk.clear if chunk
-      when nil # EOF
-        # HTTP/1.1 upstream, unexpected premature EOF:
-        return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
-
-        # HTTP/1.0 upstream:
-        wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, true) if alive
-        req_res.shutdown
-        break
-      when :wait_readable
-        return wait_on_upstream(req_res, alive, wbuf)
-      end until kcar.body_eof?
-
-      if chunk
-        chunk = rbuf
-        req_res.proxy_trailers = [ chunk, tlr = [] ]
-        rbuf = Thread.current[:yahns_rbuf] = ''.dup
-        until kcar.trailers(tlr, chunk)
-          case rv = req_res.kgio_tryread(0x2000, rbuf)
-          when String
-            chunk << rv
-          when :wait_readable
-            return wait_on_upstream(req_res, alive, wbuf)
-          when nil # premature EOF
-            return proxy_err_response(nil, req_res, nil, wbuf)
-          end # no loop here
-        end
-        wbuf = proxy_write(wbuf, trailer_out(tlr), alive)
-      end
+      return proxy_read_body(tip, kcar, req_res, alive, wbuf)
     end
 
     # all done reading response from upstream, req_res will be discarded

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 5/7] proxy_pass: trim down proxy_response_finish, too
  2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
                   ` (3 preceding siblings ...)
  2016-05-16  1:43 ` [PATCH 4/7] proxy_pass: split out body and trailer reading in response Eric Wong
@ 2016-05-16  1:43 ` Eric Wong
  2016-05-16  1:43 ` [PATCH 6/7] proxy_pass: split out req_res into a separate file Eric Wong
                   ` (2 subsequent siblings)
  7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2016-05-16  1:43 UTC (permalink / raw)
  To: yahns-public

After the previous commits, we've added more branches
and made the existing response handling more generic;
so we can remove some duplicated logic and increase
review-ability.
---
 lib/yahns/proxy_http_response.rb | 66 ++--------------------------------------
 1 file changed, 3 insertions(+), 63 deletions(-)

diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 52a8aff..65fd03b 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -202,69 +202,9 @@ def proxy_response_start(res, tip, kcar, req_res)
   end
 
   def proxy_response_finish(kcar, wbuf, req_res)
-    rbuf = Thread.current[:yahns_rbuf]
-    if len = kcar.body_bytes_left # known Content-Length
-
-      case tmp = req_res.kgio_tryread(0x2000, rbuf)
-      when String
-        len = kcar.body_bytes_left -= tmp.size
-        wbuf.wbuf_write(self, tmp)
-      when nil # premature EOF
-        return proxy_err_response(nil, req_res, nil, wbuf)
-      when :wait_readable
-        return :wait_readable # self remains in :ignore, wait on upstream
-      end while len != 0
-
-    elsif kcar.chunked? # nasty chunked response body
-      buf = ''.dup
-
-      unless req_res.proxy_trailers
-        # are we done dechunking the main body, yet?
-        case tmp = req_res.kgio_tryread(0x2000, rbuf)
-        when String
-          kcar.filter_body(buf, tmp)
-          buf.empty? or wbuf.wbuf_write(self, chunk_out(buf))
-        when nil # premature EOF
-          return proxy_err_response(nil, req_res, nil, wbuf)
-        when :wait_readable
-          return :wait_readable # self remains in :ignore, wait on upstream
-        end until kcar.body_eof?
-        req_res.proxy_trailers = [ tmp, [] ] # onto trailers!
-        rbuf = Thread.current[:yahns_rbuf] = ''.dup
-      end
-
-      buf, tlr = *req_res.proxy_trailers
-      until kcar.trailers(tlr, buf)
-        case rv = req_res.kgio_tryread(0x2000, rbuf)
-        when String
-          buf << rv
-        when :wait_readable
-          return :wait_readable
-        when nil # premature EOF
-          return proxy_err_response(nil, req_res, nil, wbuf)
-        end # no loop here
-      end
-      wbuf.wbuf_write(self, trailer_out(tlr))
-
-    else # no Content-Length or Transfer-Encoding: chunked, wait on EOF!
-
-      alive = wbuf.wbuf_persist
-      case tmp = req_res.kgio_tryread(0x2000, rbuf)
-      when String
-        tmp = chunk_out(tmp) if alive
-        wbuf.wbuf_write(self, tmp)
-      when nil
-        wbuf.wbuf_write(self, "0\r\n\r\n".freeze) if alive
-        req_res.shutdown
-        break
-      when :wait_readable
-        return :wait_readable # self remains in :ignore, wait on upstream
-      end while true
-
-    end
-
-    busy = wbuf.busy and return proxy_busy_mod_blocked(wbuf, busy)
-    proxy_busy_mod_done(wbuf.wbuf_persist) # returns nil to close req_res
+    alive = wbuf.wbuf_persist
+    req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, alive, wbuf)
+                           : proxy_read_body([], kcar, req_res, alive, wbuf)
   end
 
   def proxy_wait_next(qflags)

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 6/7] proxy_pass: split out req_res into a separate file
  2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
                   ` (4 preceding siblings ...)
  2016-05-16  1:43 ` [PATCH 5/7] proxy_pass: trim down proxy_response_finish, too Eric Wong
@ 2016-05-16  1:43 ` Eric Wong
  2016-05-16  1:43 ` [PATCH 7/7] proxy_pass: fix resumes after complete buffering is unblocked Eric Wong
  2016-05-16  2:05 ` false-positive spam [Re: [PATCH 0/7] proxy_pass cleanups] Eric Wong
  7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2016-05-16  1:43 UTC (permalink / raw)
  To: yahns-public

This makes the ReqRes class easier-to-find and hopefully
maintain when using with other parts of yahns, although there
may be no reason to use this class outside of ProxyPass.
---
 lib/yahns/proxy_pass.rb | 157 +----------------------------------------------
 lib/yahns/req_res.rb    | 159 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 161 insertions(+), 155 deletions(-)
 create mode 100644 lib/yahns/req_res.rb

diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index a2d7d81..8e0b742 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -3,166 +3,13 @@
 # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
 # frozen_string_literal: true
 require 'socket'
-require 'kgio'
-require 'kcar' # gem install kcar
 require 'rack/request'
 require 'timeout'
 
 require_relative 'proxy_http_response'
+require_relative 'req_res'
 
 class Yahns::ProxyPass # :nodoc:
-  class ReqRes < Kgio::Socket # :nodoc:
-    attr_writer :resbuf
-    attr_accessor :proxy_trailers
-
-    def req_start(c, req, input, chunked)
-      @hdr = @resbuf = nil
-      @yahns_client = c
-      @rrstate = input ? [ req, input, chunked ] : req
-      Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
-    end
-
-    def yahns_step # yahns event loop entry point
-      c = @yahns_client
-      case req = @rrstate
-      when Kcar::Parser # reading response...
-        buf = Thread.current[:yahns_rbuf]
-
-        case resbuf = @resbuf # where are we at the response?
-        when nil # common case, catch the response header in a single read
-
-          case rv = kgio_tryread(0x2000, buf)
-          when String
-            if res = req.headers(@hdr = [], rv)
-              return c.proxy_response_start(res, rv, req, self)
-            else # ugh, big headers or tricked response
-              # we must reinitialize the thread-local rbuf if it may
-              # live beyond the current thread
-              buf = Thread.current[:yahns_rbuf] = ''.dup
-              @resbuf = rv
-            end
-            # continue looping in middle "case @resbuf" loop
-          when :wait_readable
-            return rv # spurious wakeup
-          when nil then return c.proxy_err_response(502, self, nil, nil)
-          end # NOT looping here
-
-        when String # continue reading trickled response headers from upstream
-
-          case rv = kgio_tryread(0x2000, buf)
-          when String then res = req.headers(@hdr, resbuf << rv) and break
-          when :wait_readable then return rv
-          when nil then return c.proxy_err_response(502, self, nil, nil)
-          end while true
-
-          return c.proxy_response_start(res, resbuf, req, self)
-
-        when Yahns::WbufCommon # streaming/buffering the response body
-
-          # we assign wbuf for rescue below:
-          return c.proxy_response_finish(req, wbuf = resbuf, self)
-
-        end while true # case @resbuf
-
-      when Array # [ (str|vec), rack.input, chunked? ]
-        send_req_body(req) # returns nil or :wait_writable
-      when String # buffered request header
-        send_req_buf(req)
-      end
-    rescue => e
-      # avoid polluting logs with a giant backtrace when the problem isn't
-      # fixable in code.
-      case e
-      when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
-        e.set_backtrace([])
-      end
-      c.proxy_err_response(502, self, e, wbuf)
-    end
-
-    def send_req_body_chunk(buf)
-      case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
-      when String, Array
-        buf.replace(rv) # retry loop on partial write
-      when :wait_writable, nil
-        # :wait_writable = upstream is reading slowly and making us wait
-        return rv
-      else
-        abort "BUG: #{rv.inspect} from kgio_trywrite*"
-      end while true
-    end
-
-    # returns :wait_readable if complete, :wait_writable if not
-    def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
-      buf, input, chunked = req
-
-      # send the first buffered chunk or vector
-      rv = send_req_body_chunk(buf) and return rv # :wait_writable
-
-      # yay, sent the first chunk, now read the body!
-      rbuf = buf
-      if chunked
-        if String === buf # initial body
-          req[0] = buf = []
-        else
-          # try to reuse the biggest non-frozen buffer we just wrote;
-          rbuf = buf.max_by(&:size)
-          rbuf = ''.dup if rbuf.frozen? # unlikely...
-        end
-      end
-
-      # Note: input (env['rack.input']) is fully-buffered by default so
-      # we should not be waiting on a slow network resource when reading
-      # input.  However, some weird configs may disable this on LANs
-      # and we may wait indefinitely on input.read here...
-      while input.read(0x2000, rbuf)
-        if chunked
-          buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
-          buf[1] = rbuf
-          buf[2] = "\r\n".freeze
-        end
-        rv = send_req_body_chunk(buf) and return rv # :wait_writable
-      end
-
-      rbuf.clear # all done, clear the big buffer
-
-      # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
-      # tries to prevent that (and hijack means all Rack specs go out the door)
-      case input
-      when Yahns::TeeInput, IO
-        input.close
-      end
-
-      # note: we do not send any trailer, they are folded into the header
-      # because this relies on full request buffering
-      # prepare_wait_readable is called by send_req_buf
-      chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
-    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
-      # no more reading off the client socket, just prepare to forward
-      # the rejection response from the upstream (if any)
-      @yahns_client.to_io.shutdown(Socket::SHUT_RD)
-      prepare_wait_readable
-    end
-
-    def prepare_wait_readable
-      @rrstate = Kcar::Parser.new
-      :wait_readable # all done sending the request, wait for response
-    end
-
-    # n.b. buf must be a detached string not shared with
-    # Thread.current[:yahns_rbuf] of any thread
-    def send_req_buf(buf)
-      case rv = kgio_trywrite(buf)
-      when String
-        buf = rv # retry inner loop
-      when :wait_writable
-        @rrstate = buf
-        return :wait_writable
-      when nil
-        return prepare_wait_readable
-      end while true
-    end
-  end # class ReqRes
-
   def initialize(dest, opts = {})
     case dest
     when %r{\Aunix:([^:]+)(?::(/.*))?\z}
@@ -199,7 +46,7 @@ def init_path_vars(path)
 
   def call(env)
     # 3-way handshake for TCP backends while we generate the request header
-    rr = ReqRes.start(@sockaddr)
+    rr = Yahns::ReqRes.start(@sockaddr)
     c = env['rack.hijack'].call
 
     req = Rack::Request.new(env)
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
new file mode 100644
index 0000000..3b0d298
--- /dev/null
+++ b/lib/yahns/req_res.rb
@@ -0,0 +1,159 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
+# frozen_string_literal: true
+# Only used by Yahns::ProxyPass
+require 'kcar' # gem install kcar
+require 'kgio'
+
+class Yahns::ReqRes < Kgio::Socket # :nodoc:
+  attr_writer :resbuf
+  attr_accessor :proxy_trailers
+
+  def req_start(c, req, input, chunked)
+    @hdr = @resbuf = nil
+    @yahns_client = c
+    @rrstate = input ? [ req, input, chunked ] : req
+    Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
+  end
+
+  def yahns_step # yahns event loop entry point
+    c = @yahns_client
+    case req = @rrstate
+    when Kcar::Parser # reading response...
+      buf = Thread.current[:yahns_rbuf]
+
+      case resbuf = @resbuf # where are we at the response?
+      when nil # common case, catch the response header in a single read
+
+        case rv = kgio_tryread(0x2000, buf)
+        when String
+          if res = req.headers(@hdr = [], rv)
+            return c.proxy_response_start(res, rv, req, self)
+          else # ugh, big headers or tricked response
+            # we must reinitialize the thread-local rbuf if it may
+            # live beyond the current thread
+            buf = Thread.current[:yahns_rbuf] = ''.dup
+            @resbuf = rv
+          end
+          # continue looping in middle "case @resbuf" loop
+        when :wait_readable
+          return rv # spurious wakeup
+        when nil then return c.proxy_err_response(502, self, nil, nil)
+        end # NOT looping here
+
+      when String # continue reading trickled response headers from upstream
+
+        case rv = kgio_tryread(0x2000, buf)
+        when String then res = req.headers(@hdr, resbuf << rv) and break
+        when :wait_readable then return rv
+        when nil then return c.proxy_err_response(502, self, nil, nil)
+        end while true
+
+        return c.proxy_response_start(res, resbuf, req, self)
+
+      when Yahns::WbufCommon # streaming/buffering the response body
+
+        # we assign wbuf for rescue below:
+        return c.proxy_response_finish(req, wbuf = resbuf, self)
+
+      end while true # case @resbuf
+
+    when Array # [ (str|vec), rack.input, chunked? ]
+      send_req_body(req) # returns nil or :wait_writable
+    when String # buffered request header
+      send_req_buf(req)
+    end
+  rescue => e
+    # avoid polluting logs with a giant backtrace when the problem isn't
+    # fixable in code.
+    case e
+    when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
+      e.set_backtrace([])
+    end
+    c.proxy_err_response(502, self, e, wbuf)
+  end
+
+  def send_req_body_chunk(buf)
+    case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
+    when String, Array
+      buf.replace(rv) # retry loop on partial write
+    when :wait_writable, nil
+      # :wait_writable = upstream is reading slowly and making us wait
+      return rv
+    else
+      abort "BUG: #{rv.inspect} from kgio_trywrite*"
+    end while true
+  end
+
+  # returns :wait_readable if complete, :wait_writable if not
+  def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
+    buf, input, chunked = req
+
+    # send the first buffered chunk or vector
+    rv = send_req_body_chunk(buf) and return rv # :wait_writable
+
+    # yay, sent the first chunk, now read the body!
+    rbuf = buf
+    if chunked
+      if String === buf # initial body
+        req[0] = buf = []
+      else
+        # try to reuse the biggest non-frozen buffer we just wrote;
+        rbuf = buf.max_by(&:size)
+        rbuf = ''.dup if rbuf.frozen? # unlikely...
+      end
+    end
+
+    # Note: input (env['rack.input']) is fully-buffered by default so
+    # we should not be waiting on a slow network resource when reading
+    # input.  However, some weird configs may disable this on LANs
+    # and we may wait indefinitely on input.read here...
+    while input.read(0x2000, rbuf)
+      if chunked
+        buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
+        buf[1] = rbuf
+        buf[2] = "\r\n".freeze
+      end
+      rv = send_req_body_chunk(buf) and return rv # :wait_writable
+    end
+
+    rbuf.clear # all done, clear the big buffer
+
+    # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
+    # tries to prevent that (and hijack means all Rack specs go out the door)
+    case input
+    when Yahns::TeeInput, IO
+      input.close
+    end
+
+    # note: we do not send any trailer, they are folded into the header
+    # because this relies on full request buffering
+    # prepare_wait_readable is called by send_req_buf
+    chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
+  rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
+    # no more reading off the client socket, just prepare to forward
+    # the rejection response from the upstream (if any)
+    @yahns_client.to_io.shutdown(Socket::SHUT_RD)
+    prepare_wait_readable
+  end
+
+  def prepare_wait_readable
+    @rrstate = Kcar::Parser.new
+    :wait_readable # all done sending the request, wait for response
+  end
+
+  # n.b. buf must be a detached string not shared with
+  # Thread.current[:yahns_rbuf] of any thread
+  def send_req_buf(buf)
+    case rv = kgio_trywrite(buf)
+    when String
+      buf = rv # retry inner loop
+    when :wait_writable
+      @rrstate = buf
+      return :wait_writable
+    when nil
+      return prepare_wait_readable
+    end while true
+  end
+end # class ReqRes

^ permalink raw reply	[flat|nested] 9+ messages in thread

* [PATCH 7/7] proxy_pass: fix resumes after complete buffering is unblocked
  2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
                   ` (5 preceding siblings ...)
  2016-05-16  1:43 ` [PATCH 6/7] proxy_pass: split out req_res into a separate file Eric Wong
@ 2016-05-16  1:43 ` Eric Wong
  2016-05-16  2:05 ` false-positive spam [Re: [PATCH 0/7] proxy_pass cleanups] Eric Wong
  7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2016-05-16  1:43 UTC (permalink / raw)
  To: yahns-public

This fixes a bug where a cleared wbuf would kill the
process after the response got flushed out to the client
as `wbuf.busy' becomes `false'.

This fixes a regression introduced in
"proxy_pass: trim down proxy_response_finish, too"
which was never in a stable release
---
 lib/yahns/proxy_http_response.rb | 51 ++++++++++++++++++++--------------------
 1 file changed, 25 insertions(+), 26 deletions(-)

diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 65fd03b..284a3c6 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -160,7 +160,7 @@ def proxy_read_body(tip, kcar, req_res, alive, wbuf)
       req_res.proxy_trailers = [ rbuf.dup, tip ]
       return proxy_read_trailers(kcar, req_res, alive, wbuf)
     end
-    wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+    proxy_busy_mod(wbuf, alive)
   end
 
   def proxy_read_trailers(kcar, req_res, alive, wbuf)
@@ -178,7 +178,7 @@ def proxy_read_trailers(kcar, req_res, alive, wbuf)
       end # no loop here
     end
     wbuf = proxy_write(wbuf, trailer_out(tlr), alive)
-    wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+    proxy_busy_mod(wbuf, alive)
   end
 
   # start streaming the response once upstream is done sending headers to us.
@@ -196,7 +196,7 @@ def proxy_response_start(res, tip, kcar, req_res)
 
     # all done reading response from upstream, req_res will be discarded
     # when we return nil:
-    wbuf ? proxy_busy_mod_blocked(wbuf, wbuf.busy) : proxy_busy_mod_done(alive)
+    proxy_busy_mod(wbuf, alive)
   rescue => e
     proxy_err_response(502, req_res, e, wbuf)
   end
@@ -218,7 +218,7 @@ def proxy_wait_next(qflags)
     # <thread is scheduled away>                       | epoll_wait readiness
     #                                                  | ReqRes#yahns_step
     #                                                  | proxy dispatch ...
-    #                                                  | proxy_busy_mod_done
+    #                                                  | proxy_busy_mod
     # ************************** DANGER BELOW ********************************
     #                                                  | HttpClient#yahns_step
     #                                                  | # clears env
@@ -238,29 +238,28 @@ def proxy_wait_next(qflags)
     Thread.current[:yahns_queue].queue_mod(self, qflags)
   end
 
-  def proxy_busy_mod_done(alive)
-    case http_response_done(alive)
-    when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD)
-    when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR)
-    when :close then close
+  def proxy_busy_mod(wbuf, alive)
+    busy = wbuf.busy if wbuf
+    if busy
+      # we are completely done reading and buffering the upstream response,
+      # but have not completely written the response to the client,
+      # yield control to the client socket:
+      @state = wbuf
+      proxy_wait_next(case busy
+        when :wait_readable then Yahns::Queue::QEV_RD
+        when :wait_writable then Yahns::Queue::QEV_WR
+        else
+          raise "BUG: invalid wbuf.busy: #{busy.inspect}"
+        end)
+      # no touching self after proxy_wait_next, we may be running
+      # HttpClient#yahns_step in a different thread at this point
+    else
+      case http_response_done(alive)
+      when :wait_readable then proxy_wait_next(Yahns::Queue::QEV_RD)
+      when :wait_writable then proxy_wait_next(Yahns::Queue::QEV_WR)
+      when :close then close
+      end
     end
-
-    nil # signal close for ReqRes#yahns_step
-  end
-
-  def proxy_busy_mod_blocked(wbuf, busy)
-    # we are completely done reading and buffering the upstream response,
-    # but have not completely written the response to the client,
-    # yield control to the client socket:
-    @state = wbuf
-    proxy_wait_next(case busy
-      when :wait_readable then Yahns::Queue::QEV_RD
-      when :wait_writable then Yahns::Queue::QEV_WR
-      else
-        abort "BUG: invalid wbuf.busy: #{busy.inspect}"
-      end)
-    # no touching self after proxy_wait_next, we may be running
-    # HttpClient#yahns_step in a different thread at this point
     nil # signal close for ReqRes#yahns_step
   end
 

^ permalink raw reply	[flat|nested] 9+ messages in thread

* false-positive spam [Re: [PATCH 0/7] proxy_pass cleanups]
  2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
                   ` (6 preceding siblings ...)
  2016-05-16  1:43 ` [PATCH 7/7] proxy_pass: fix resumes after complete buffering is unblocked Eric Wong
@ 2016-05-16  2:05 ` Eric Wong
  7 siblings, 0 replies; 9+ messages in thread
From: Eric Wong @ 2016-05-16  2:05 UTC (permalink / raw)
  To: yahns-public

Apologies if you get duplicates, upgraded SpamAssassin and
wanted to make sure mail over Tor could still get through.
Bayesian can be pretty weak (even after training), so I guess
it totally depends which relays Tor lets you through.
I guess it's a luck-of-the-draw deal.

^ permalink raw reply	[flat|nested] 9+ messages in thread

end of thread, back to index

Thread overview: 9+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
2016-05-16  1:43 ` [PATCH 1/7] proxy_pass: simplify writing request bodies upstream Eric Wong
2016-05-16  1:43 ` [PATCH 2/7] proxy_pass: hoist out proxy_res_headers method Eric Wong
2016-05-16  1:43 ` [PATCH 3/7] proxy_pass: simplify proxy_http_response Eric Wong
2016-05-16  1:43 ` [PATCH 4/7] proxy_pass: split out body and trailer reading in response Eric Wong
2016-05-16  1:43 ` [PATCH 5/7] proxy_pass: trim down proxy_response_finish, too Eric Wong
2016-05-16  1:43 ` [PATCH 6/7] proxy_pass: split out req_res into a separate file Eric Wong
2016-05-16  1:43 ` [PATCH 7/7] proxy_pass: fix resumes after complete buffering is unblocked Eric Wong
2016-05-16  2:05 ` false-positive spam [Re: [PATCH 0/7] proxy_pass cleanups] Eric Wong

yahns Ruby server user/dev discussion

Archives are clonable:
	git clone --mirror https://yhbt.net/yahns-public
	git clone --mirror http://ou63pmih66umazou.onion/yahns-public

Example config snippet for mirrors

Newsgroups are available over NNTP:
	nntp://news.public-inbox.org/inbox.comp.lang.ruby.yahns
	nntp://ou63pmih66umazou.onion/inbox.comp.lang.ruby.yahns

 note: .onion URLs require Tor: https://www.torproject.org/

AGPL code for this site: git clone https://public-inbox.org/ public-inbox