yahns Ruby server user/dev discussion
From: Eric Wong <e@80x24.org>
To: yahns-public@yhbt.net
Subject: [PATCH 6/7] proxy_pass: split out req_res into a separate file
Date: Mon, 16 May 2016 01:43:39 +0000
Message-ID: <20160516014340.8258-7-e@80x24.org>
In-Reply-To: <20160516014340.8258-1-e@80x24.org>

This makes the ReqRes class easier to find and, hopefully,
easier to maintain when used with other parts of yahns, although
there may be no reason to use this class outside of ProxyPass.
---
 lib/yahns/proxy_pass.rb | 157 +----------------------------------------------
 lib/yahns/req_res.rb    | 159 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 161 insertions(+), 155 deletions(-)
 create mode 100644 lib/yahns/req_res.rb

diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index a2d7d81..8e0b742 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -3,166 +3,13 @@
 # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
 # frozen_string_literal: true
 require 'socket'
-require 'kgio'
-require 'kcar' # gem install kcar
 require 'rack/request'
 require 'timeout'
 
 require_relative 'proxy_http_response'
+require_relative 'req_res'
 
 class Yahns::ProxyPass # :nodoc:
-  class ReqRes < Kgio::Socket # :nodoc:
-    attr_writer :resbuf
-    attr_accessor :proxy_trailers
-
-    def req_start(c, req, input, chunked)
-      @hdr = @resbuf = nil
-      @yahns_client = c
-      @rrstate = input ? [ req, input, chunked ] : req
-      Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
-    end
-
-    def yahns_step # yahns event loop entry point
-      c = @yahns_client
-      case req = @rrstate
-      when Kcar::Parser # reading response...
-        buf = Thread.current[:yahns_rbuf]
-
-        case resbuf = @resbuf # where are we at the response?
-        when nil # common case, catch the response header in a single read
-
-          case rv = kgio_tryread(0x2000, buf)
-          when String
-            if res = req.headers(@hdr = [], rv)
-              return c.proxy_response_start(res, rv, req, self)
-            else # ugh, big headers or tricked response
-              # we must reinitialize the thread-local rbuf if it may
-              # live beyond the current thread
-              buf = Thread.current[:yahns_rbuf] = ''.dup
-              @resbuf = rv
-            end
-            # continue looping in middle "case @resbuf" loop
-          when :wait_readable
-            return rv # spurious wakeup
-          when nil then return c.proxy_err_response(502, self, nil, nil)
-          end # NOT looping here
-
-        when String # continue reading trickled response headers from upstream
-
-          case rv = kgio_tryread(0x2000, buf)
-          when String then res = req.headers(@hdr, resbuf << rv) and break
-          when :wait_readable then return rv
-          when nil then return c.proxy_err_response(502, self, nil, nil)
-          end while true
-
-          return c.proxy_response_start(res, resbuf, req, self)
-
-        when Yahns::WbufCommon # streaming/buffering the response body
-
-          # we assign wbuf for rescue below:
-          return c.proxy_response_finish(req, wbuf = resbuf, self)
-
-        end while true # case @resbuf
-
-      when Array # [ (str|vec), rack.input, chunked? ]
-        send_req_body(req) # returns nil or :wait_writable
-      when String # buffered request header
-        send_req_buf(req)
-      end
-    rescue => e
-      # avoid polluting logs with a giant backtrace when the problem isn't
-      # fixable in code.
-      case e
-      when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
-        e.set_backtrace([])
-      end
-      c.proxy_err_response(502, self, e, wbuf)
-    end
-
-    def send_req_body_chunk(buf)
-      case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
-      when String, Array
-        buf.replace(rv) # retry loop on partial write
-      when :wait_writable, nil
-        # :wait_writable = upstream is reading slowly and making us wait
-        return rv
-      else
-        abort "BUG: #{rv.inspect} from kgio_trywrite*"
-      end while true
-    end
-
-    # returns :wait_readable if complete, :wait_writable if not
-    def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
-      buf, input, chunked = req
-
-      # send the first buffered chunk or vector
-      rv = send_req_body_chunk(buf) and return rv # :wait_writable
-
-      # yay, sent the first chunk, now read the body!
-      rbuf = buf
-      if chunked
-        if String === buf # initial body
-          req[0] = buf = []
-        else
-          # try to reuse the biggest non-frozen buffer we just wrote;
-          rbuf = buf.max_by(&:size)
-          rbuf = ''.dup if rbuf.frozen? # unlikely...
-        end
-      end
-
-      # Note: input (env['rack.input']) is fully-buffered by default so
-      # we should not be waiting on a slow network resource when reading
-      # input.  However, some weird configs may disable this on LANs
-      # and we may wait indefinitely on input.read here...
-      while input.read(0x2000, rbuf)
-        if chunked
-          buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
-          buf[1] = rbuf
-          buf[2] = "\r\n".freeze
-        end
-        rv = send_req_body_chunk(buf) and return rv # :wait_writable
-      end
-
-      rbuf.clear # all done, clear the big buffer
-
-      # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
-      # tries to prevent that (and hijack means all Rack specs go out the door)
-      case input
-      when Yahns::TeeInput, IO
-        input.close
-      end
-
-      # note: we do not send any trailer, they are folded into the header
-      # because this relies on full request buffering
-      # prepare_wait_readable is called by send_req_buf
-      chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
-    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
-      # no more reading off the client socket, just prepare to forward
-      # the rejection response from the upstream (if any)
-      @yahns_client.to_io.shutdown(Socket::SHUT_RD)
-      prepare_wait_readable
-    end
-
-    def prepare_wait_readable
-      @rrstate = Kcar::Parser.new
-      :wait_readable # all done sending the request, wait for response
-    end
-
-    # n.b. buf must be a detached string not shared with
-    # Thread.current[:yahns_rbuf] of any thread
-    def send_req_buf(buf)
-      case rv = kgio_trywrite(buf)
-      when String
-        buf = rv # retry inner loop
-      when :wait_writable
-        @rrstate = buf
-        return :wait_writable
-      when nil
-        return prepare_wait_readable
-      end while true
-    end
-  end # class ReqRes
-
   def initialize(dest, opts = {})
     case dest
     when %r{\Aunix:([^:]+)(?::(/.*))?\z}
@@ -199,7 +46,7 @@ def init_path_vars(path)
 
   def call(env)
     # 3-way handshake for TCP backends while we generate the request header
-    rr = ReqRes.start(@sockaddr)
+    rr = Yahns::ReqRes.start(@sockaddr)
     c = env['rack.hijack'].call
 
     req = Rack::Request.new(env)
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
new file mode 100644
index 0000000..3b0d298
--- /dev/null
+++ b/lib/yahns/req_res.rb
@@ -0,0 +1,159 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
+# frozen_string_literal: true
+# Only used by Yahns::ProxyPass
+require 'kcar' # gem install kcar
+require 'kgio'
+
+class Yahns::ReqRes < Kgio::Socket # :nodoc:
+  attr_writer :resbuf
+  attr_accessor :proxy_trailers
+
+  def req_start(c, req, input, chunked)
+    @hdr = @resbuf = nil
+    @yahns_client = c
+    @rrstate = input ? [ req, input, chunked ] : req
+    Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
+  end
+
+  def yahns_step # yahns event loop entry point
+    c = @yahns_client
+    case req = @rrstate
+    when Kcar::Parser # reading response...
+      buf = Thread.current[:yahns_rbuf]
+
+      case resbuf = @resbuf # where are we at the response?
+      when nil # common case, catch the response header in a single read
+
+        case rv = kgio_tryread(0x2000, buf)
+        when String
+          if res = req.headers(@hdr = [], rv)
+            return c.proxy_response_start(res, rv, req, self)
+          else # ugh, big headers or trickled response
+            # we must reinitialize the thread-local rbuf if it may
+            # live beyond the current thread
+            buf = Thread.current[:yahns_rbuf] = ''.dup
+            @resbuf = rv
+          end
+          # continue looping in middle "case @resbuf" loop
+        when :wait_readable
+          return rv # spurious wakeup
+        when nil then return c.proxy_err_response(502, self, nil, nil)
+        end # NOT looping here
+
+      when String # continue reading trickled response headers from upstream
+
+        case rv = kgio_tryread(0x2000, buf)
+        when String then res = req.headers(@hdr, resbuf << rv) and break
+        when :wait_readable then return rv
+        when nil then return c.proxy_err_response(502, self, nil, nil)
+        end while true
+
+        return c.proxy_response_start(res, resbuf, req, self)
+
+      when Yahns::WbufCommon # streaming/buffering the response body
+
+        # we assign wbuf for rescue below:
+        return c.proxy_response_finish(req, wbuf = resbuf, self)
+
+      end while true # case @resbuf
+
+    when Array # [ (str|vec), rack.input, chunked? ]
+      send_req_body(req) # returns nil or :wait_writable
+    when String # buffered request header
+      send_req_buf(req)
+    end
+  rescue => e
+    # avoid polluting logs with a giant backtrace when the problem isn't
+    # fixable in code.
+    case e
+    when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
+      e.set_backtrace([])
+    end
+    c.proxy_err_response(502, self, e, wbuf)
+  end
+
+  def send_req_body_chunk(buf)
+    case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
+    when String, Array
+      buf.replace(rv) # retry loop on partial write
+    when :wait_writable, nil
+      # :wait_writable = upstream is reading slowly and making us wait
+      return rv
+    else
+      abort "BUG: #{rv.inspect} from kgio_trywrite*"
+    end while true
+  end
+
+  # returns :wait_readable if complete, :wait_writable if not
+  def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
+    buf, input, chunked = req
+
+    # send the first buffered chunk or vector
+    rv = send_req_body_chunk(buf) and return rv # :wait_writable
+
+    # yay, sent the first chunk, now read the body!
+    rbuf = buf
+    if chunked
+      if String === buf # initial body
+        req[0] = buf = []
+      else
+        # try to reuse the biggest non-frozen buffer we just wrote;
+        rbuf = buf.max_by(&:size)
+        rbuf = ''.dup if rbuf.frozen? # unlikely...
+      end
+    end
+
+    # Note: input (env['rack.input']) is fully-buffered by default so
+    # we should not be waiting on a slow network resource when reading
+    # input.  However, some weird configs may disable this on LANs
+    # and we may wait indefinitely on input.read here...
+    while input.read(0x2000, rbuf)
+      if chunked
+        buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
+        buf[1] = rbuf
+        buf[2] = "\r\n".freeze
+      end
+      rv = send_req_body_chunk(buf) and return rv # :wait_writable
+    end
+
+    rbuf.clear # all done, clear the big buffer
+
+    # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
+    # tries to prevent that (and hijack means all Rack specs go out the door)
+    case input
+    when Yahns::TeeInput, IO
+      input.close
+    end
+
+    # note: we do not send any trailer, they are folded into the header
+    # because this relies on full request buffering
+    # prepare_wait_readable is called by send_req_buf
+    chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
+  rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
+    # no more reading off the client socket, just prepare to forward
+    # the rejection response from the upstream (if any)
+    @yahns_client.to_io.shutdown(Socket::SHUT_RD)
+    prepare_wait_readable
+  end
+
+  def prepare_wait_readable
+    @rrstate = Kcar::Parser.new
+    :wait_readable # all done sending the request, wait for response
+  end
+
+  # n.b. buf must be a detached string not shared with
+  # Thread.current[:yahns_rbuf] of any thread
+  def send_req_buf(buf)
+    case rv = kgio_trywrite(buf)
+    when String
+      buf = rv # retry inner loop
+    when :wait_writable
+      @rrstate = buf
+      return :wait_writable
+    when nil
+      return prepare_wait_readable
+    end while true
+  end
+end # class ReqRes
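
The retry loop in send_req_buf above can be sketched with the Ruby
standard library alone.  This is only an illustration, not code from
yahns: trywrite and send_buf are hypothetical helpers, and trywrite
assumes the kgio_trywrite convention visible in the patch (nil once
everything is written, the unwritten String on a partial write,
:wait_writable when the write would block).

```ruby
# Hypothetical stand-in for kgio_trywrite, built on IO#write_nonblock.
# Assumed return values, mirroring how the patch uses kgio_trywrite:
#   nil            - buf was written completely
#   String         - partial write, the unwritten remainder
#   :wait_writable - nothing could be written without blocking
def trywrite(io, buf)
  case rv = io.write_nonblock(buf, exception: false)
  when :wait_writable
    rv
  when Integer
    rv == buf.bytesize ? nil : buf.byteslice(rv, buf.bytesize)
  end
end

# Same "case ... end while true" shape as send_req_buf, but it returns
# the state and leftover buffer instead of storing them in @rrstate.
def send_buf(io, buf)
  case rv = trywrite(io, buf)
  when String
    buf = rv # partial write, retry with the remainder
  when :wait_writable
    return [ :wait_writable, buf ] # caller keeps buf and retries later
  when nil
    return [ :wait_readable, nil ] # all sent, wait for the response
  end while true
end
```

Calling send_buf on the write end of IO.pipe with a small string
yields [ :wait_readable, nil ]; a buffer larger than the pipe capacity
yields :wait_writable along with the unsent remainder, which is the
value send_req_buf stashes in @rrstate for the next yahns_step.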

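The chunked request body framing used by send_req_body (hex size,
CRLF, data, CRLF, then a final "0\r\n\r\n" with no trailers) can be
sketched as follows.  chunk_frame, encode_chunked and LAST_CHUNK are
hypothetical names for illustration only; the sketch uses bytesize,
whereas the patch uses size on strings in a binary-encoded file.

```ruby
# Hypothetical helper: frame one piece of body data as an HTTP/1.1
# chunk, as the three-element vector that send_req_body builds.
def chunk_frame(data)
  [ "#{data.bytesize.to_s(16)}\r\n", data, "\r\n" ]
end

# Zero-sized last chunk followed by an empty trailer section.
LAST_CHUNK = "0\r\n\r\n"

# Hypothetical helper: encode all pieces into one binary string.
def encode_chunked(pieces)
  out = ''.b # mutable binary string
  pieces.each do |piece|
    next if piece.empty? # a zero-sized chunk would end the body early
    chunk_frame(piece).each { |part| out << part }
  end
  out << LAST_CHUNK
end
```

The patch writes each three-element vector with kgio_trywritev rather
than joining it, which avoids copying the large read buffer.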

Thread overview: 9+ messages
2016-05-16  1:43 [PATCH 0/7] proxy_pass cleanups Eric Wong
2016-05-16  1:43 ` [PATCH 1/7] proxy_pass: simplify writing request bodies upstream Eric Wong
2016-05-16  1:43 ` [PATCH 2/7] proxy_pass: hoist out proxy_res_headers method Eric Wong
2016-05-16  1:43 ` [PATCH 3/7] proxy_pass: simplify proxy_http_response Eric Wong
2016-05-16  1:43 ` [PATCH 4/7] proxy_pass: split out body and trailer reading in response Eric Wong
2016-05-16  1:43 ` [PATCH 5/7] proxy_pass: trim down proxy_response_finish, too Eric Wong
2016-05-16  1:43 ` Eric Wong [this message]
2016-05-16  1:43 ` [PATCH 7/7] proxy_pass: fix resumes after complete buffering is unblocked Eric Wong
2016-05-16  2:05 ` false-positive spam [Re: [PATCH 0/7] proxy_pass cleanups] Eric Wong