yahns Ruby server user/dev discussion
 help / color / mirror / code / Atom feed
* [PATCH] extras/proxy_pass: reinstate synchronous version
@ 2015-04-07 22:30 Eric Wong
  0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2015-04-07 22:30 UTC (permalink / raw)
  To: yahns-public

Since yahns/proxy_pass is not a drop-in replacement, reinstate
the old, synchronous version to avoid breaking existing setups
which require Rack middleware support.
---
 extras/proxy_pass.rb           | 222 ++++++++++++++++++++++++++++++++++++++++-
 test/test_extras_proxy_pass.rb | 103 +++++++++++++++++++
 2 files changed, 320 insertions(+), 5 deletions(-)
 create mode 100644 test/test_extras_proxy_pass.rb

diff --git a/extras/proxy_pass.rb b/extras/proxy_pass.rb
index af6fb7c..b7fc87a 100644
--- a/extras/proxy_pass.rb
+++ b/extras/proxy_pass.rb
@@ -1,9 +1,221 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013-2015 all contributors <yahns-public@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'time'
+require 'socket'
+require 'kgio'
+require 'kcar' # gem install kcar
+require 'rack/request'
+require 'thread'
+require 'timeout'
+
 # compatibility class
 warn <<EOF
-ProxyPass is no longer a generic Rack app but relies on yahns internals
-via rack.hijack.  Update require statements to point to 'yahns/proxy_pass'
-and change use the 'Yahns::ProxyPass' constant instead.
+See Yahns::ProxyPass for less memory-hungry (but more complex) implementating
+using rack.hijack
 EOF
 
-require 'yahns/proxy_pass'
-ProxyPass = Yahns::ProxyPass
+# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten.
+# to take advantage of rack.hijack and use the non-blocking I/O facilities
+# in yahns.  yahns may have to grow a supported API for that...
+# For now, we this blocks a worker thread; fortunately threads are reasonably
+# cheap on GNU/Linux...
+# This is totally untested but currently doesn't serve anything important.
+class ProxyPass # :nodoc:
+  class ConnPool
+    def initialize
+      @mtx = Mutex.new
+      @objs = []
+    end
+
+    def get
+      @mtx.synchronize { @objs.pop }
+    end
+
+    def put(obj)
+      @mtx.synchronize { @objs << obj }
+    end
+  end
+
+  class UpstreamSocket < Kgio::Socket # :nodoc:
+    attr_writer :expiry
+
+    # called automatically by kgio_read!
+    def kgio_wait_readable(timeout = nil)
+      super(timeout || wait_time)
+    end
+
+    def wait_time
+      tout = @expiry ? @expiry - Time.now : @timeout
+      raise Timeout::Error, "request timed out", [] if tout < 0
+      tout
+    end
+
+    def readpartial(bytes, buf = Thread.current[:proxy_pass_buf] ||= "")
+      case rv = kgio_read!(bytes, buf)
+      when String
+        @expiry += @timeout # bump expiry when we succeed
+      end
+      rv
+    end
+
+    def req_write(buf, timeout)
+      @timeout = timeout
+      @expiry = Time.now + timeout
+      case rv = kgio_trywrite(buf)
+      when :wait_writable
+        kgio_wait_writable(wait_time)
+      when nil
+        return
+      when String
+        buf = rv
+      end while true
+    end
+  end # class UpstreamSocket
+
+  class UpstreamResponse < Kcar::Response # :nodoc:
+    # Called by the Rack server at the end of a successful response
+    def close
+      reusable = @parser.keepalive? && @parser.body_eof?
+      super
+      @pool.put(self) if reusable
+      nil
+    end
+
+    # req is just a string buffer of HTTP headers
+    def req_write(req, timeout)
+      @sock.req_write(req, timeout)
+    end
+
+    # returns true if the socket is still alive, nil if dead
+    def sock_alive?
+      @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
+                true : @sock.close
+    end
+
+    # returns true if the socket was reused and thus retryable
+    def fail_retryable?
+      @sock.close
+      @reused
+    end
+
+    def initialize(sock, pool)
+      super(sock)
+      @reused = false
+      @pool = pool
+    end
+  end # class UpstreamResponse
+
+  # take a responder from the pool, we'll add the object back to the
+  # pool in UpstreamResponse#close
+  def responder_get
+    while obj = @pool.get
+      return obj if obj.sock_alive?
+    end
+
+    UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool)
+  end
+
+  def initialize(dest, timeout = 5)
+    case dest
+    when %r{\Aunix:([^:]+)(?::(/.*))?\z}
+      path = $2
+      @sockaddr = Socket.sockaddr_un($1)
+    when %r{\Ahttp://([^/]+)(/.*)?\z}
+      path = $2
+      host, port = $1.split(':')
+      @sockaddr = Socket.sockaddr_in(port || 80, host)
+    else
+      raise ArgumentError, "destination must be an HTTP URL or unix: path"
+    end
+    init_path_vars(path)
+    @pool = ConnPool.new
+    @timeout = timeout
+  end
+
+  def init_path_vars(path)
+    path ||= '$fullpath'
+    # methods from Rack::Request we want:
+    allow = %w(fullpath host_with_port host port url path)
+    want = path.scan(/\$(\w+)/).flatten! || []
+    diff = want - allow
+    diff.empty? or
+             raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
+
+    # kill leading slash just in case...
+    @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
+  end
+
+  def call(env)
+    request_method = env['REQUEST_METHOD']
+    req = Rack::Request.new(env)
+    path = @path.gsub(/\$(\w+)/) { req.__send__($1) }
+    req = "#{request_method} #{path} HTTP/1.1\r\n" \
+          "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
+
+    # pass most HTTP_* headers through as-is
+    chunked = false
+    env.each do |key, val|
+      %r{\AHTTP_(\w+)\z} =~ key or next
+      key = $1
+      next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key
+      chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
+      key.tr!("_", "-")
+      req << "#{key}: #{val}\r\n"
+    end
+
+    # special cases which Rack does not prefix:
+    ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
+    clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
+    req << "\r\n"
+
+    # get an open socket and send the headers
+    ures = responder_get
+    ures.req_write(req, @timeout)
+
+    # send the request body if there was one
+    send_body(env["rack.input"], ures, chunked) if chunked || clen
+
+    # wait for the response here
+    _, header, body = res = ures.rack
+
+    # don't let the upstream Connection and Keep-Alive headers leak through
+    header.delete_if do |k,_|
+      k =~ /\A(?:Connection|Keep-Alive)\z/i
+    end
+
+    case request_method
+    when "HEAD"
+      # kcar doesn't know if it's a HEAD or GET response, and HEAD
+      # responses have Content-Length in it which fools kcar...
+      body.parser.body_bytes_left = 0
+      res[1] = header.dup
+      body.close # clobbers original header
+      res[2] = body = []
+    end
+    res
+  rescue => e
+    retry if ures && ures.fail_retryable? && request_method != "POST"
+    if defined?(Yahns::Log)
+      logger = env['rack.logger'] and
+        Yahns::Log.exception(logger, 'proxy_pass', e)
+    end
+    [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
+  end
+
+  def send_body(input, ures, chunked)
+    buf = Thread.current[:proxy_pass_buf] ||= ""
+
+    if chunked # unlikely
+      while input.read(16384, buf)
+        buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n")
+        ures.req_write(buf, @timeout)
+      end
+      ures.req_write("0\r\n\r\n", @timeout)
+    else # common if we hit uploads
+      while input.read(16384, buf)
+        ures.req_write(buf, @timeout)
+      end
+    end
+  end
+end
diff --git a/test/test_extras_proxy_pass.rb b/test/test_extras_proxy_pass.rb
new file mode 100644
index 0000000..495a920
--- /dev/null
+++ b/test/test_extras_proxy_pass.rb
@@ -0,0 +1,103 @@
+# Copyright (C) 2015 all contributors <yahns-public@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'server_helper'
+
+class TestExtrasProxyPass < Testcase
+  ENV["N"].to_i > 1 and parallelize_me!
+  include ServerHelper
+
+  class ProxiedApp
+    def call(env)
+      h = [ %w(Content-Length 3), %w(Content-Type text/plain) ]
+      case env['REQUEST_METHOD']
+      when 'GET'
+        [ 200, h, [ "hi\n"] ]
+      when 'HEAD'
+        [ 200, h, [] ]
+      when 'PUT'
+        buf = env['rack.input'].read
+        [ 201, {
+          'Content-Length' => buf.bytesize.to_s,
+          'Content-Type' => 'text/plain',
+          }, [ buf ] ]
+      end
+    end
+  end
+
+  def setup
+    @srv2 = TCPServer.new(ENV["TEST_HOST"] || "127.0.0.1", 0)
+    server_helper_setup
+  end
+
+  def teardown
+    @srv2.close if defined?(@srv2) && !@srv2.closed?
+    server_helper_teardown
+  end
+
+  def test_proxy_pass
+    err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+    host2, port2 = @srv2.addr[3], @srv2.addr[1]
+    pid = mkserver(cfg) do
+      $LOAD_PATH.unshift "#{Dir.pwd}/extras"
+      olderr = $stderr
+      $stderr = StringIO.new
+      require 'proxy_pass'
+      $stderr = olderr
+      @srv2.close
+      cfg.instance_eval do
+        app(:rack, ProxyPass.new("http://#{host2}:#{port2}/")) do
+          listen "#{host}:#{port}"
+        end
+        stderr_path err.path
+      end
+    end
+
+    pid2 = mkserver(cfg, @srv2) do
+      @srv.close
+      cfg.instance_eval do
+        app(:rack, ProxiedApp.new) do
+          listen "#{host2}:#{port2}"
+        end
+        stderr_path err.path
+      end
+    end
+
+    gplv3 = File.open('COPYING')
+
+    Net::HTTP.start(host, port) do |http|
+      res = http.request(Net::HTTP::Get.new('/'))
+      assert_equal 200, res.code.to_i
+      n = res.body.bytesize
+      assert_operator n, :>, 1
+      res = http.request(Net::HTTP::Head.new('/'))
+      assert_equal 200, res.code.to_i
+      assert_equal n, res['Content-Length'].to_i
+      assert_nil res.body
+
+      # chunked encoding
+      req = Net::HTTP::Put.new('/')
+      req.body_stream = gplv3
+      req.content_type = 'application/octet-stream'
+      req['Transfer-Encoding'] = 'chunked'
+      res = http.request(req)
+      gplv3.rewind
+      assert_equal gplv3.read, res.body
+      assert_equal 201, res.code.to_i
+
+      # normal content-length
+      gplv3.rewind
+      req = Net::HTTP::Put.new('/')
+      req.body_stream = gplv3
+      req.content_type = 'application/octet-stream'
+      req.content_length = gplv3.size
+      res = http.request(req)
+      gplv3.rewind
+      assert_equal gplv3.read, res.body
+      assert_equal 201, res.code.to_i
+    end
+  ensure
+    gplv3.close if gplv3
+    quit_wait pid
+    quit_wait pid2
+  end
+end
-- 
EW


^ permalink raw reply related	[flat|nested] only message in thread

only message in thread, other threads:[~2015-04-07 22:30 UTC | newest]

Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-04-07 22:30 [PATCH] extras/proxy_pass: reinstate synchronous version Eric Wong

Code repositories for project(s) associated with this public inbox

	https://yhbt.net/yahns.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).