yahns Ruby server user/dev discussion
 help / color / mirror / code / Atom feed
* [PATCH] proxy_pass: rewrite to be async, using rack.hijack
@ 2015-04-03  1:53 Eric Wong
  0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2015-04-03  1:53 UTC (permalink / raw)
  To: yahns-public

This allows our reverse proxy to avoid having an innefficient 1:1
relationship between threads and upstream connections, reducing
memory usage when there are many upstream connections (possibly to
multiple backend machines).
---
 lib/yahns/proxy_http_response.rb | 226 ++++++++++++++++++++++++++++++++
 lib/yahns/proxy_pass.rb          | 276 +++++++++++++++++++++------------------
 lib/yahns/queue_epoll.rb         |   5 +
 lib/yahns/queue_kqueue.rb        |   5 +
 lib/yahns/wbuf.rb                |   9 +-
 test/test_proxy_pass.rb          | 120 ++++++++++++++++-
 6 files changed, 510 insertions(+), 131 deletions(-)
 create mode 100644 lib/yahns/proxy_http_response.rb

diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
new file mode 100644
index 0000000..31505d3
--- /dev/null
+++ b/lib/yahns/proxy_http_response.rb
@@ -0,0 +1,226 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2015 all contributors <yahns-public@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for
+# constants.
+module Yahns::HttpResponse # :nodoc:
+
+  # write everything in buf to our client socket (or wbuf, if it exists)
+  # it may return a newly-created wbuf or nil
+  def proxy_write(wbuf, buf, alive)
+    unless wbuf
+      # no write buffer, try to write directly to the client socket
+      case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
+      when nil then return # done writing buf, likely
+      when String, Array # partial write, hope the skb grows
+        buf = rv
+      when :wait_writable, :wait_readable
+        wbuf = Yahns::Wbuf.new(nil, alive, self.class.output_buffer_tmpdir, rv)
+        buf = buf.join if Array === buf
+        break
+      end while true
+    end
+
+    wbuf.wbuf_write(self, buf)
+    wbuf.busy ? wbuf : nil
+  end
+
+  def proxy_err_response(code, req_res, exc, wbuf)
+    logger = @hs.env['rack.logger']
+    if exc
+      Yahns::Log.exception(logger, 'upstream error', exc)
+    else
+      logger.error('premature upstream EOF')
+    end
+    # try to write something, but don't care if we fail
+    Integer === code and
+      kgio_trywrite("HTTP/1.1 #{CODES[code]}\r\n\r\n") rescue nil
+
+    shutdown rescue nil
+    req_res.shutdown rescue nil
+    nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb
+  ensure
+    wbuf.wbuf_abort if wbuf
+  end
+
+  # returns :wait_readable if we need to read more from req_res
+  # returns :ignore if we yield control to the client(self)
+  # returns nil if completely done
+  def proxy_response_start(res, tip, kcar, req_res)
+    status, headers = res
+    si = status.to_i
+    status = CODES[si] || status
+    env = @hs.env
+    have_body = !Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(si) &&
+                env[REQUEST_METHOD] != HEAD
+    flags = MSG_DONTWAIT
+    rechunk = false
+    k = self.class
+    alive = @hs.next? && k.persistent_connections
+
+    res = "HTTP/1.1 #{status}\r\n"
+    headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays
+      case key
+      when /\A(?:Connection|Keep-Alive)\z/i
+        next # do not let some upstream headers leak through
+      when %r{\AContent-Length\z}i
+        flags |= MSG_MORE if have_body && value.to_i > 0
+      when %r{\ATransfer-Encoding\z}i
+        if value =~ /\bchunked\b/i
+          case env['HTTP_VERSION'] # this is the original client request
+          when 'HTTP/1.1'
+            rechunk = true
+          else
+            # too expensive to calculate Content-Length for HTTP/1.0
+            # persistent clients, so we will drop persistence instead
+            alive = false # this should already be implied
+            next
+          end
+        end
+      end
+
+      res << "#{key}: #{value}\r\n"
+    end
+
+    # For now, do not add a Date: header, assume upstream already did it
+    # but do not care if they did not
+    res << (alive ? CONN_KA : CONN_CLOSE)
+
+    # send the headers
+    case rv = kgio_syssend(res, flags)
+    when nil then break # all done, likely
+    when String # partial write, highly unlikely
+      flags = MSG_DONTWAIT
+      res = rv # hope the skb grows
+    when :wait_writable, :wait_readable # highly unlikely in real apps
+      wbuf = proxy_write(nil, res, alive)
+      break # keep buffering as much as possible
+    end while true
+
+    rbuf = Thread.current[:yahns_rbuf]
+    tip = tip.empty? ? [] : [ tip ]
+
+    if have_body
+      if len = kcar.body_bytes_left
+
+        case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+        when String
+          len = kcar.body_bytes_left -= tmp.size
+          wbuf = proxy_write(wbuf, tmp, alive)
+        when nil # premature EOF
+          return proxy_err_response(nil, req_res, nil, wbuf)
+        when :wait_readable
+          # for ensure:
+          wbuf ||= Yahns::Wbuf.new(nil, alive, k.output_buffer_tmpdir, false)
+          return :wait_readable # self remains in :ignore, wait on upstream
+        end until len == 0
+
+      else # nasty chunked body
+
+        # Only HTTP/1.1 supports chunked responses, we must translate
+        # otherwise.  Otherwise, we must drop the connection to signal
+        # the end.  We only send HTTP/1.1 requests to the upstream so
+        # Rack/Rails can always send streaming responses.
+        buf = ''
+        case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+        when String
+          kcar.filter_body(buf, tmp)
+          unless buf.empty?
+            tmp = rechunk ? [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
+                          : buf
+            wbuf = proxy_write(wbuf, tmp, alive)
+          end
+        when nil # premature EOF
+          return proxy_err_response(nil, req_res, nil, wbuf)
+        when :wait_readable
+          # for ensure:
+          wbuf ||= Yahns::Wbuf.new(nil, alive, k.output_buffer_tmpdir, false)
+          return :wait_readable # self remains in :ignore, wait on upstream
+        end until kcar.body_eof?
+
+        # TODO: Trailer support
+        wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, alive) if rechunk
+      end
+    end
+
+    wbuf and return proxy_busy_mod_blocked(wbuf, wbuf.busy)
+    proxy_busy_mod_done(alive)
+  rescue => e
+    proxy_err_response(502, req_res, e, wbuf)
+  ensure
+    # this happens if this method returns :wait_readable
+    req_res.resbuf = wbuf if wbuf
+  end
+
+  def proxy_response_finish(kcar, wbuf, req_res)
+    rbuf = Thread.current[:yahns_rbuf]
+    if len = kcar.body_bytes_left
+
+      case tmp = req_res.kgio_tryread(0x2000, rbuf)
+      when String
+        len = kcar.body_bytes_left -= tmp.size
+        wbuf.wbuf_write(self, tmp)
+      when nil # premature EOF
+        return proxy_err_response(nil, req_res, nil, wbuf)
+      when :wait_readable
+        return :wait_readable # self remains in :ignore, wait on upstream
+      end while len != 0
+
+    else # nasty chunked body
+
+      # Only HTTP/1.1 supports chunked responses, we must translate
+      # otherwise.  Otherwise, we must drop the connection to signal
+      # the end.  We only send HTTP/1.1 requests to the upstream so
+      # Rack/Rails can always send streaming responses.
+      rechunk = @hs.env['HTTP_VERSION'] == 'HTTP/1.1'.freeze
+      buf = ''
+      case tmp = req_res.kgio_tryread(0x2000, rbuf)
+      when String
+        kcar.filter_body(buf, tmp)
+        unless buf.empty?
+          tmp = rechunk ? [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
+                        : buf
+          wbuf.wbuf_write(self, tmp)
+        end
+      when nil # premature EOF
+        return proxy_err_response(nil, req_res, nil, wbuf)
+      when :wait_readable
+        return :wait_readable # self remains in :ignore, wait on upstream
+      end until kcar.body_eof?
+
+      # TODO: Trailer support
+      wbuf.wbuf_write(self, "0\r\n\r\n".freeze) if rechunk
+    end
+
+    busy = wbuf.busy and return proxy_busy_mod_blocked(wbuf, busy)
+    proxy_busy_mod_done(wbuf.wbuf_persist)
+  end
+
+  def proxy_busy_mod_done(alive)
+    q = Thread.current[:yahns_queue]
+    case http_response_done(alive)
+    when :wait_readable then q.queue_mod(self, Yahns::Queue::QEV_RD)
+    when :wait_writable then q.queue_mod(self, Yahns::Queue::QEV_WR)
+    when :close then Thread.current[:yahns_fdmap].sync_close(self)
+    end
+
+    nil # close the req_res, too
+  end
+
+  def proxy_busy_mod_blocked(wbuf, busy)
+    q = Thread.current[:yahns_queue]
+    # we are completely done reading and buffering the upstream response,
+    # but have not completely written the response to the client,
+    # yield control to the client socket:
+    @state = wbuf
+    case busy
+    when :wait_readable then q.queue_mod(self, Yahns::Queue::QEV_RD)
+    when :wait_writable then q.queue_mod(self, Yahns::Queue::QEV_WR)
+    else
+      abort "BUG: invalid wbuf.busy: #{busy.inspect}"
+    end
+    # no touching self after queue_mod
+    :ignore
+  end
+end
diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index 9c12e99..09fb884 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -5,110 +5,168 @@
 require 'kgio'
 require 'kcar' # gem install kcar
 require 'rack/request'
-require 'thread'
 require 'timeout'
 
-# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten.
-# to take advantage of rack.hijack and use the non-blocking I/O facilities
-# in yahns.  yahns may have to grow a supported API for that...
-# For now, we this blocks a worker thread; fortunately threads are reasonably
-# cheap on GNU/Linux...
+require_relative 'proxy_http_response'
+
 class Yahns::ProxyPass # :nodoc:
-  class ConnPool
-    def initialize
-      @mtx = Mutex.new
-      @objs = []
-    end
+  class ReqRes < Kgio::Socket
+    attr_writer :resbuf
 
-    def get
-      @mtx.synchronize { @objs.pop }
+    def req_start(c, req, input, chunked)
+      @hdr = @resbuf = nil
+      @yahns_client = c
+      @rrstate = input ? [ req, input, chunked ] : req
+      Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
     end
 
-    def put(obj)
-      @mtx.synchronize { @objs << obj }
+    # we must reinitialize the thread-local rbuf if it may get beyond the
+    # current thread
+    def detach_rbuf!
+      Thread.current[:yahns_rbuf] = ''
     end
-  end
 
-  class UpstreamSocket < Kgio::Socket # :nodoc:
-    attr_writer :expiry
+    def yahns_step # yahns event loop entry point
+      case req = @rrstate
+      when Kcar::Parser # reading response...
+        buf = Thread.current[:yahns_rbuf]
+        c = @yahns_client
 
-    # called automatically by kgio_read!
-    def kgio_wait_readable(timeout = nil)
-      super(timeout || wait_time)
-    end
+        case resbuf = @resbuf # where are we at the response?
+        when nil # common case, catch the response header in a single read
 
-    def wait_time
-      tout = @expiry ? @expiry - Yahns.now : @timeout
-      raise Timeout::Error, "request timed out", [] if tout < 0
-      tout
-    end
+          case rv = kgio_tryread(0x2000, buf)
+          when String
+            if res = req.headers(@hdr = [], rv)
+              return c.proxy_response_start(res, rv, req, self)
+            else # ugh, big headers or tricked response
+              buf = detach_rbuf!
+              @resbuf = rv
+            end
+            # continue looping in middle "case @resbuf" loop
+          when :wait_readable
+            return rv # spurious wakeup
+          when nil then return c.proxy_err_response(502, self, nil, nil)
+          end # NOT looping here
 
-    def readpartial(bytes, buf = Thread.current[:yahns_rbuf] ||= "")
-      case rv = kgio_read!(bytes, buf)
-      when String
-        @expiry += @timeout # bump expiry when we succeed
-      end
-      rv
-    end
+        when String # continue reading trickled response headers from upstream
 
-    def req_write(buf, timeout)
-      @timeout = timeout
-      @expiry = Yahns.now + timeout
-      case rv = kgio_trywrite(buf)
-      when :wait_writable
-        kgio_wait_writable(wait_time)
-      when nil
-        return
-      when String
-        buf = rv
-      end while true
+          case rv = kgio_tryread(0x2000, buf)
+          when String then res = req.headers(@hdr, resbuf << rv) and break
+          when :wait_readable then return rv
+          when nil then return c.proxy_err_response(502, self, nil, nil)
+          end while true
+
+          return c.proxy_response_start(res, resbuf, req, self)
+
+        when Yahns::WbufCommon # streaming/buffering the response body
+
+          return c.proxy_response_finish(req, resbuf, self)
+
+        end while true # case @resbuf
+
+      when Array # [ (str|vec), rack.input, chunked? ]
+        send_req_body(req) # returns nil or :wait_writable
+      when String # buffered request header
+        send_req_buf(req)
+      end
+    rescue => e
+      c.proxy_err_response(502, self, e, nil)
     end
-  end # class UpstreamSocket
 
-  class UpstreamResponse < Kcar::Response # :nodoc:
     # Called by the Rack server at the end of a successful response
     def close
-      reusable = @parser.keepalive? && @parser.body_eof?
+      @hdr = @yahns_client = @rrstate = nil
       super
-      @pool.put(self) if reusable
-      nil
     end
 
-    # req is just a string buffer of HTTP headers
-    def req_write(req, timeout)
-      @sock.req_write(req, timeout)
-    end
+    # returns :wait_readable if complete, :wait_writable if not
+    def send_req_body(req)
+      buf, input, chunked = req
 
-    # returns true if the socket is still alive, nil if dead
-    def sock_alive?
-      @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
-                true : @sock.close
-    end
+      # get the first buffered chunk or vector
+      case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
+      when String, Array
+        buf = rv # retry inner loop
+      when :wait_writable
+        req[0] = buf
+        return :wait_writable
+      when nil
+        break # onto writing body
+      end while true
 
-    # returns true if the socket was reused and thus retryable
-    def fail_retryable?
-      @sock.close
-      @reused
+      buf = Thread.current[:yahns_rbuf]
+
+      # Note: input (env['rack.input']) is fully-buffered by default so
+      # we should not be waiting on a slow network resource when reading
+      # input.  However, some weird configs may disable this on LANs
+
+      if chunked
+        while input.read(0x2000, buf)
+          vec = [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
+          case rv = kgio_trywritev(vec)
+          when Array
+            vec = rv # partial write, retry in case loop
+          when :wait_writable
+            buf = detach_rbuf!
+            req[0] = vec
+            return :wait_writable
+          when nil
+            break # continue onto reading next chunk
+          end while true
+        end
+        close_req_body(input)
+
+        # note: we do not send any trailer, they are folded into the header
+        # because this relies on full request buffering
+        send_req_buf("0\r\n\r\n".freeze)
+      else # identity request, easy:
+        while input.read(0x2000, buf)
+          case rv = kgio_trywrite(buf)
+          when String
+            buf = rv # partial write, retry in case loop
+          when :wait_writable
+            buf = detach_rbuf!
+            req[0] = buf
+            return :wait_writable
+          when nil
+            break # continue onto reading next block
+          end while true
+        end
+
+        close_req_body(input)
+        prepare_wait_readable
+      end
     end
 
-    def initialize(sock, pool)
-      super(sock)
-      @reused = false
-      @pool = pool
+    def prepare_wait_readable
+      @rrstate = Kcar::Parser.new
+      :wait_readable # all done sending the request, wait for response
     end
-  end # class UpstreamResponse
 
-  # take a responder from the pool, we'll add the object back to the
-  # pool in UpstreamResponse#close
-  def responder_get
-    while obj = @pool.get
-      return obj if obj.sock_alive?
+    def close_req_body(input)
+      case input
+      when Yahns::TeeInput, IO, StringIO
+        input.close
+      end
     end
 
-    UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool)
-  end
+    # n.b. buf must be a detached string not shared with
+    # Thread.current[:yahns_rbuf] of any thread
+    def send_req_buf(buf)
+      case rv = kgio_trywrite(buf)
+      when String
+        buf = rv # retry inner loop
+      when :wait_writable
+        @rrstate = buf
+        return :wait_writable
+      when nil
+        return prepare_wait_readable
+      end while true
+    end
+  end # class ReqRes
 
-  def initialize(dest, timeout = 5)
+  def initialize(dest)
     case dest
     when %r{\Aunix:([^:]+)(?::(/.*))?\z}
       path = $2
@@ -121,8 +179,6 @@ def initialize(dest, timeout = 5)
       raise ArgumentError, "destination must be an HTTP URL or unix: path"
     end
     init_path_vars(path)
-    @pool = ConnPool.new
-    @timeout = timeout
   end
 
   def init_path_vars(path)
@@ -139,10 +195,15 @@ def init_path_vars(path)
   end
 
   def call(env)
-    request_method = env['REQUEST_METHOD']
+    # 3-way handshake for TCP backends while we generate the request header
+    rr = ReqRes.start(@sockaddr)
+    c = env['rack.hijack'].call
+
     req = Rack::Request.new(env)
-    path = @path.gsub(/\$(\w+)/) { req.__send__($1) }
-    req = "#{request_method} #{path} HTTP/1.1\r\n" \
+    req = @path.gsub(/\$(\w+)/) { req.__send__($1) }
+
+    # start the connection asynchronously and early so TCP can do a
+    req = "#{env['REQUEST_METHOD']} #{req} HTTP/1.1\r\n" \
           "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
 
     # pass most HTTP_* headers through as-is
@@ -150,61 +211,24 @@ def call(env)
     env.each do |key, val|
       %r{\AHTTP_(\w+)\z} =~ key or next
       key = $1
-      next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key
+      # trailers are folded into the header, so do not send the Trailer:
+      # header in the request
+      next if /\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR|TRAILER)/ =~
+         key
       chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
-      key.tr!("_", "-")
+      key.tr!('_'.freeze, '-'.freeze)
       req << "#{key}: #{val}\r\n"
     end
 
     # special cases which Rack does not prefix:
     ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
     clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
-    req << "\r\n"
-
-    # get an open socket and send the headers
-    ures = responder_get
-    ures.req_write(req, @timeout)
+    input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil
 
-    # send the request body if there was one
-    send_body(env["rack.input"], ures, chunked) if chunked || clen
-
-    # wait for the response here
-    _, header, body = res = ures.rack
-
-    # don't let the upstream Connection and Keep-Alive headers leak through
-    header.delete_if do |k,_|
-      k =~ /\A(?:Connection|Keep-Alive)\z/i
-    end
-
-    case request_method
-    when "HEAD"
-      # kcar doesn't know if it's a HEAD or GET response, and HEAD
-      # responses have Content-Length in it which fools kcar...
-      body.parser.body_bytes_left = 0
-      res[1] = header.dup
-      body.close # clobbers original header
-      res[2] = body = []
-    end
-    res
+    # finally, prepare to emit the headers
+    rr.req_start(c, req << "\r\n".freeze, input, chunked)
   rescue => e
-    retry if ures && ures.fail_retryable? && request_method != "POST"
     Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e)
     [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
   end
-
-  def send_body(input, ures, chunked)
-    buf = Thread.current[:yahns_rbuf] ||= ""
-
-    if chunked # unlikely
-      while input.read(16384, buf)
-        buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n")
-        ures.req_write(buf, @timeout)
-      end
-      ures.req_write("0\r\n\r\n", @timeout)
-    else # common if we hit uploads
-      while input.read(16384, buf)
-        ures.req_write(buf, @timeout)
-      end
-    end
-  end
 end
diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb
index da90a95..10ff607 100644
--- a/lib/yahns/queue_epoll.rb
+++ b/lib/yahns/queue_epoll.rb
@@ -27,9 +27,14 @@ def queue_add(io, flags)
     epoll_ctl(Epoll::CTL_ADD, io, flags)
   end
 
+  def queue_mod(io, flags)
+    epoll_ctl(Epoll::CTL_MOD, io, flags)
+  end
+
   def thr_init
     Thread.current[:yahns_rbuf] = ""
     Thread.current[:yahns_fdmap] = @fdmap
+    Thread.current[:yahns_queue] = self
   end
 
   # returns an array of infinitely running threads
diff --git a/lib/yahns/queue_kqueue.rb b/lib/yahns/queue_kqueue.rb
index 4e5c133..c0667b7 100644
--- a/lib/yahns/queue_kqueue.rb
+++ b/lib/yahns/queue_kqueue.rb
@@ -36,9 +36,14 @@ def queue_add(io, flags)
     kevent(Kevent[io.fileno, flags, fflags, 0, 0, io])
   end
 
+  def queue_mod(io, flags)
+    kevent(Kevent[io.fileno, flags, ADD_ONESHOT, 0, 0, io])
+  end
+
   def thr_init
     Thread.current[:yahns_rbuf] = ""
     Thread.current[:yahns_fdmap] = @fdmap
+    Thread.current[:yahns_queue] = self
   end
 
   # returns an array of infinitely running threads
diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb
index fa39b9b..42776cf 100644
--- a/lib/yahns/wbuf.rb
+++ b/lib/yahns/wbuf.rb
@@ -30,6 +30,7 @@
 class Yahns::Wbuf # :nodoc:
   include Yahns::WbufCommon
   attr_reader :busy
+  attr_reader :wbuf_persist
 
   def initialize(body, persist, tmpdir, busy)
     @tmpio = nil
@@ -71,7 +72,7 @@ def wbuf_write(c, buf)
 
     # we're all caught up, try to prevent dirty data from getting flushed
     # to disk if we can help it.
-    @tmpio = @tmpio.close
+    wbuf_abort
     @sf_offset = 0
     @busy = false
     nil
@@ -79,7 +80,11 @@ def wbuf_write(c, buf)
 
   # called by last wbuf_flush
   def wbuf_close(client)
-    @tmpio = @tmpio.close if @tmpio
+    wbuf_abort
     wbuf_close_common(client)
   end
+
+  def wbuf_abort
+    @tmpio = @tmpio.close if @tmpio
+  end
 end
diff --git a/test/test_proxy_pass.rb b/test/test_proxy_pass.rb
index 5398b29..5bf8722 100644
--- a/test/test_proxy_pass.rb
+++ b/test/test_proxy_pass.rb
@@ -6,13 +6,45 @@
 class TestProxyPass < Testcase
   ENV["N"].to_i > 1 and parallelize_me!
   include ServerHelper
+  OMFG = 'a' * (1024 * 1024 * 32)
 
   class ProxiedApp
     def call(env)
       h = [ %w(Content-Length 3), %w(Content-Type text/plain) ]
       case env['REQUEST_METHOD']
       when 'GET'
-        [ 200, h, [ "hi\n"] ]
+        case env['PATH_INFO']
+        when '/giant-body'
+          h = [ %W(Content-Length #{OMFG.size}), %w(Content-Type text/plain) ]
+          [ 200, h, [ OMFG ] ]
+        when %r{\A/slow-headers-(\d+(?:\.\d+)?)\z}
+          delay = $1.to_f
+          io = env['rack.hijack'].call
+          [ "HTTP/1.1 200 OK\r\n",
+            "Content-Length: 7\r\n",
+            "Content-Type: text/PAIN\r\n",
+            "connection: close\r\n\r\n",
+            "HIHIHI!"
+          ].each do |l|
+            io.write(l)
+            sleep delay
+          end
+          io.close
+        when %r{\A/chunky-slow-(\d+(?:\.\d+)?)\z}
+          delay = $1.to_f
+          chunky = Object.new
+          chunky.instance_variable_set(:@delay, delay)
+          def chunky.each
+            sleep @delay
+            yield "3\r\nHI!\r\n"
+            sleep @delay
+            yield "0\r\n\r\n"
+          end
+          h = [ %w(Content-Type text/pain), %w(Transfer-Encoding chunked) ]
+          [ 200, h, chunky ]
+        else
+          [ 200, h, [ "hi\n"] ]
+        end
       when 'HEAD'
         [ 200, h, [] ]
       when 'PUT'
@@ -72,6 +104,7 @@ def test_unix_socket_no_path
         stderr_path err.path
       end
     end
+
     Net::HTTP.start(host, port) do |http|
       res = http.request(Net::HTTP::Get.new('/f00'))
       assert_equal 200, res.code.to_i
@@ -109,7 +142,7 @@ def test_proxy_pass
       require 'yahns/proxy_pass'
       @srv2.close
       cfg.instance_eval do
-        app(:rack, Yahns::ProxyPass.new("http://#{host2}:#{port2}/")) do
+        app(:rack, Yahns::ProxyPass.new("http://#{host2}:#{port2}")) do
           listen "#{host}:#{port}"
         end
         stderr_path err.path
@@ -126,6 +159,8 @@ def test_proxy_pass
       end
     end
 
+    check_pipelining(host, port)
+
     gplv3 = File.open('COPYING')
 
     Net::HTTP.start(host, port) do |http|
@@ -138,7 +173,7 @@ def test_proxy_pass
       assert_equal n, res['Content-Length'].to_i
       assert_nil res.body
 
-      # chunked encoding
+      # chunked encoding (PUT)
       req = Net::HTTP::Put.new('/')
       req.body_stream = gplv3
       req.content_type = 'application/octet-stream'
@@ -148,6 +183,18 @@ def test_proxy_pass
       assert_equal gplv3.read, res.body
       assert_equal 201, res.code.to_i
 
+      # chunked encoding (GET)
+      res = http.request(Net::HTTP::Get.new('/chunky-slow-0.1'))
+      assert_equal 200, res.code.to_i
+      assert_equal 'chunked', res['Transfer-encoding']
+      assert_equal "HI!", res.body
+
+      # slow headers (GET)
+      res = http.request(Net::HTTP::Get.new('/slow-headers-0.01'))
+      assert_equal 200, res.code.to_i
+      assert_equal 'text/PAIN', res['Content-Type']
+      assert_equal 'HIHIHI!', res.body
+
       # normal content-length
       gplv3.rewind
       req = Net::HTTP::Put.new('/')
@@ -158,10 +205,77 @@ def test_proxy_pass
       gplv3.rewind
       assert_equal gplv3.read, res.body
       assert_equal 201, res.code.to_i
+
+      # giant body
+      res = http.request(Net::HTTP::Get.new('/giant-body'))
+      assert_equal 200, res.code.to_i
+      assert_equal OMFG, res.body
+    end
+
+    # ensure we do not chunk responses back to an HTTP/1.0 client even if
+    # the proxy <-> upstream connection is chunky
+    %w(0 0.1).each do |delay|
+      begin
+        h10 = TCPSocket.new(host, port)
+        h10.write "GET /chunky-slow-#{delay} HTTP/1.0\r\n\r\n"
+        res = Timeout.timeout(60) { h10.read }
+        assert_match %r{^Connection: close\r\n}, res
+        assert_match %r{^Content-Type: text/pain\r\n}, res
+        assert_match %r{\r\n\r\nHI!\z}, res
+        refute_match %r{^Transfer-Encoding:}, res
+        refute_match %r{\r0\r\n}, res
+      ensure
+        h10.close
+      end
     end
   ensure
     gplv3.close if gplv3
     quit_wait pid
     quit_wait pid2
   end
+
+  def check_pipelining(host, port)
+    pl = TCPSocket.new(host, port)
+    r1 = ''
+    r2 = ''
+    r3 = ''
+    Timeout.timeout(60) do
+      pl.write "GET / HTTP/1.1\r\nHost: example.com\r\n\r\nGET /"
+      until r1 =~ /hi\n/
+        r1 << pl.readpartial(666)
+      end
+
+      pl.write "chunky-slow-0.1 HTTP/1.1\r\nHost: example.com\r\n\r\nP"
+      until r2 =~ /\r\n3\r\nHI!\r\n0\r\n\r\n/
+        r2 << pl.readpartial(666)
+      end
+
+      if false
+        pl.write "ET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
+        until r3 =~ /hi\n/
+          r3 << pl.readpartial(666)
+        end
+      else
+        pl.write "UT / HTTP/1.1\r\nHost: example.com\r\n"
+        pl.write "Transfer-Encoding: chunked\r\n\r\n"
+        pl.write "6\r\nchunky\r\n"
+        pl.write "0\r\n\r\n"
+
+        until r3 =~ /chunky/
+          r3 << pl.readpartial(666)
+        end
+      end
+    end
+    r1 = r1.split("\r\n").reject { |x| x =~ /^Date: / }
+    r2 = r2.split("\r\n").reject { |x| x =~ /^Date: / }
+    assert_equal 'HTTP/1.1 200 OK', r1[0]
+    assert_equal 'HTTP/1.1 200 OK', r2[0]
+    assert_match %r{\r\n\r\nchunky\z}, r3
+    assert_match %r{\AHTTP/1\.1 201 Created\r\n}, r3
+  rescue => e
+    warn [ e.class, e.message ].inspect
+    warn e.backtrace.join("\n")
+  ensure
+    pl.close
+  end
 end
-- 
EW


^ permalink raw reply	[flat|nested] only message in thread

only message in thread, other threads:[~2015-04-03  1:53 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-04-03  1:53 [PATCH] proxy_pass: rewrite to be async, using rack.hijack Eric Wong

Code repositories for project(s) associated with this inbox:

	../../../yahns.git

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).