From: Eric Wong <e@80x24.org>
To: yahns-public@yhbt.net
Subject: [PATCH] extras/proxy_pass: reinstate synchronous version
Date: Tue, 7 Apr 2015 22:30:59 +0000
Message-ID: <1428445859-15815-1-git-send-email-e@80x24.org>

Since yahns/proxy_pass is not a drop-in replacement, reinstate
the old, synchronous version to avoid breaking existing setups
which require Rack middleware support.
---
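A note for reviewers: the destination string accepted by ProxyPass.new
(an "http://" URL or a "unix:" path, optionally followed by a path
template using "$fullpath", "$path" and similar variables) can be
sketched in isolation as below.  This is only an illustration that
mirrors the parsing in the patch; parse_dest, template_for, expand and
FakeRequest are hypothetical names that do not exist in yahns, and
FakeRequest merely stands in for Rack::Request.

```ruby
# Illustrative sketch only; not part of the patch.
# Variables permitted in the path template (methods of Rack::Request).
ALLOWED_VARS = %w(fullpath host_with_port host port url path)

# Split a destination into its address part and optional path template.
def parse_dest(dest)
  case dest
  when %r{\Aunix:([^:]+)(?::(/.*))?\z}
    { type: :unix, addr: $1, path: $2 }
  when %r{\Ahttp://([^/]+)(/.*)?\z}
    path = $2
    host, port = $1.split(':')
    { type: :tcp, host: host, port: (port || 80).to_i, path: path }
  else
    raise ArgumentError, "destination must be an HTTP URL or unix: path"
  end
end

# Validate the template against the allowlist and normalize it.
def template_for(path)
  path ||= '$fullpath'
  diff = path.scan(/\$(\w+)/).flatten - ALLOWED_VARS
  diff.empty? or
    raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
  # $fullpath and $path already begin with "/", so drop a leading slash
  path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
end

# Hypothetical stand-in for Rack::Request
FakeRequest = Struct.new(:fullpath, :host_with_port, :host, :port, :url, :path)

# Substitute each "$name" with the value of the matching request method.
def expand(template, req)
  template.gsub(/\$(\w+)/) { req.public_send($1) }
end
```

For example, a destination of 'http://127.0.0.1:8080/' yields the
template '/', while omitting the path entirely falls back to
'$fullpath', so the client's original path and query string are
forwarded unchanged.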
 extras/proxy_pass.rb           | 222 ++++++++++++++++++++++++++++++++++++++++-
 test/test_extras_proxy_pass.rb | 103 +++++++++++++++++++
2 files changed, 320 insertions(+), 5 deletions(-)
create mode 100644 test/test_extras_proxy_pass.rb
diff --git a/extras/proxy_pass.rb b/extras/proxy_pass.rb
index af6fb7c..b7fc87a 100644
--- a/extras/proxy_pass.rb
+++ b/extras/proxy_pass.rb
@@ -1,9 +1,221 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013-2015 all contributors <yahns-public@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'time'
+require 'socket'
+require 'kgio'
+require 'kcar' # gem install kcar
+require 'rack/request'
+require 'thread'
+require 'timeout'
+
# compatibility class
warn <<EOF
-ProxyPass is no longer a generic Rack app but relies on yahns internals
-via rack.hijack. Update require statements to point to 'yahns/proxy_pass'
-and change use the 'Yahns::ProxyPass' constant instead.
+See Yahns::ProxyPass for a less memory-hungry (but more complex) implementation
+using rack.hijack
EOF
-require 'yahns/proxy_pass'
-ProxyPass = Yahns::ProxyPass
+# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten
+# to take advantage of rack.hijack and use the non-blocking I/O facilities
+# in yahns. yahns may have to grow a supported API for that...
+# For now, this blocks a worker thread; fortunately threads are reasonably
+# cheap on GNU/Linux...
+# This is totally untested but currently doesn't serve anything important.
+class ProxyPass # :nodoc:
+ class ConnPool
+ def initialize
+ @mtx = Mutex.new
+ @objs = []
+ end
+
+ def get
+ @mtx.synchronize { @objs.pop }
+ end
+
+ def put(obj)
+ @mtx.synchronize { @objs << obj }
+ end
+ end
+
+ class UpstreamSocket < Kgio::Socket # :nodoc:
+ attr_writer :expiry
+
+ # called automatically by kgio_read!
+ def kgio_wait_readable(timeout = nil)
+ super(timeout || wait_time)
+ end
+
+ def wait_time
+ tout = @expiry ? @expiry - Time.now : @timeout
+ raise Timeout::Error, "request timed out", [] if tout < 0
+ tout
+ end
+
+ def readpartial(bytes, buf = Thread.current[:proxy_pass_buf] ||= "")
+ case rv = kgio_read!(bytes, buf)
+ when String
+ @expiry += @timeout # bump expiry when we succeed
+ end
+ rv
+ end
+
+ def req_write(buf, timeout)
+ @timeout = timeout
+ @expiry = Time.now + timeout
+ case rv = kgio_trywrite(buf)
+ when :wait_writable
+ kgio_wait_writable(wait_time)
+ when nil
+ return
+ when String
+ buf = rv
+ end while true
+ end
+ end # class UpstreamSocket
+
+ class UpstreamResponse < Kcar::Response # :nodoc:
+ # Called by the Rack server at the end of a successful response
+ def close
+ reusable = @parser.keepalive? && @parser.body_eof?
+ super
+ @pool.put(self) if reusable
+ nil
+ end
+
+ # req is just a string buffer of HTTP headers
+ def req_write(req, timeout)
+ @sock.req_write(req, timeout)
+ end
+
+ # returns true if the socket is still alive, nil if dead
+ def sock_alive?
+ @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
+ true : @sock.close
+ end
+
+ # returns true if the socket was reused and thus retryable
+ def fail_retryable?
+ @sock.close
+ @reused
+ end
+
+ def initialize(sock, pool)
+ super(sock)
+ @reused = false
+ @pool = pool
+ end
+ end # class UpstreamResponse
+
+ # take a responder from the pool, we'll add the object back to the
+ # pool in UpstreamResponse#close
+ def responder_get
+ while obj = @pool.get
+ return obj if obj.sock_alive?
+ end
+
+ UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool)
+ end
+
+ def initialize(dest, timeout = 5)
+ case dest
+ when %r{\Aunix:([^:]+)(?::(/.*))?\z}
+ path = $2
+ @sockaddr = Socket.sockaddr_un($1)
+ when %r{\Ahttp://([^/]+)(/.*)?\z}
+ path = $2
+ host, port = $1.split(':')
+ @sockaddr = Socket.sockaddr_in(port || 80, host)
+ else
+ raise ArgumentError, "destination must be an HTTP URL or unix: path"
+ end
+ init_path_vars(path)
+ @pool = ConnPool.new
+ @timeout = timeout
+ end
+
+ def init_path_vars(path)
+ path ||= '$fullpath'
+ # methods from Rack::Request we want:
+ allow = %w(fullpath host_with_port host port url path)
+ want = path.scan(/\$(\w+)/).flatten! || []
+ diff = want - allow
+ diff.empty? or
+ raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
+
+ # kill leading slash just in case...
+ @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
+ end
+
+ def call(env)
+ request_method = env['REQUEST_METHOD']
+ req = Rack::Request.new(env)
+ path = @path.gsub(/\$(\w+)/) { req.__send__($1) }
+ req = "#{request_method} #{path} HTTP/1.1\r\n" \
+ "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
+
+ # pass most HTTP_* headers through as-is
+ chunked = false
+ env.each do |key, val|
+ %r{\AHTTP_(\w+)\z} =~ key or next
+ key = $1
+ next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key
+ chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
+ key.tr!("_", "-")
+ req << "#{key}: #{val}\r\n"
+ end
+
+ # special cases which Rack does not prefix:
+ ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
+ clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
+ req << "\r\n"
+
+ # get an open socket and send the headers
+ ures = responder_get
+ ures.req_write(req, @timeout)
+
+ # send the request body if there was one
+ send_body(env["rack.input"], ures, chunked) if chunked || clen
+
+ # wait for the response here
+ _, header, body = res = ures.rack
+
+ # don't let the upstream Connection and Keep-Alive headers leak through
+ header.delete_if do |k,_|
+ k =~ /\A(?:Connection|Keep-Alive)\z/i
+ end
+
+ case request_method
+ when "HEAD"
+ # kcar doesn't know if it's a HEAD or GET response, and HEAD
+ # responses have Content-Length in them, which fools kcar...
+ body.parser.body_bytes_left = 0
+ res[1] = header.dup
+ body.close # clobbers original header
+ res[2] = body = []
+ end
+ res
+ rescue => e
+ retry if ures && ures.fail_retryable? && request_method != "POST"
+ if defined?(Yahns::Log)
+ logger = env['rack.logger'] and
+ Yahns::Log.exception(logger, 'proxy_pass', e)
+ end
+ [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
+ end
+
+ def send_body(input, ures, chunked)
+ buf = Thread.current[:proxy_pass_buf] ||= ""
+
+ if chunked # unlikely
+ while input.read(16384, buf)
+ buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n")
+ ures.req_write(buf, @timeout)
+ end
+ ures.req_write("0\r\n\r\n", @timeout)
+ else # common if we hit uploads
+ while input.read(16384, buf)
+ ures.req_write(buf, @timeout)
+ end
+ end
+ end
+end
diff --git a/test/test_extras_proxy_pass.rb b/test/test_extras_proxy_pass.rb
new file mode 100644
index 0000000..495a920
--- /dev/null
+++ b/test/test_extras_proxy_pass.rb
@@ -0,0 +1,103 @@
+# Copyright (C) 2015 all contributors <yahns-public@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'server_helper'
+
+class TestExtrasProxyPass < Testcase
+ ENV["N"].to_i > 1 and parallelize_me!
+ include ServerHelper
+
+ class ProxiedApp
+ def call(env)
+ h = [ %w(Content-Length 3), %w(Content-Type text/plain) ]
+ case env['REQUEST_METHOD']
+ when 'GET'
+ [ 200, h, [ "hi\n"] ]
+ when 'HEAD'
+ [ 200, h, [] ]
+ when 'PUT'
+ buf = env['rack.input'].read
+ [ 201, {
+ 'Content-Length' => buf.bytesize.to_s,
+ 'Content-Type' => 'text/plain',
+ }, [ buf ] ]
+ end
+ end
+ end
+
+ def setup
+ @srv2 = TCPServer.new(ENV["TEST_HOST"] || "127.0.0.1", 0)
+ server_helper_setup
+ end
+
+ def teardown
+ @srv2.close if defined?(@srv2) && !@srv2.closed?
+ server_helper_teardown
+ end
+
+ def test_proxy_pass
+ err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+ host2, port2 = @srv2.addr[3], @srv2.addr[1]
+ pid = mkserver(cfg) do
+ $LOAD_PATH.unshift "#{Dir.pwd}/extras"
+ olderr = $stderr
+ $stderr = StringIO.new
+ require 'proxy_pass'
+ $stderr = olderr
+ @srv2.close
+ cfg.instance_eval do
+ app(:rack, ProxyPass.new("http://#{host2}:#{port2}/")) do
+ listen "#{host}:#{port}"
+ end
+ stderr_path err.path
+ end
+ end
+
+ pid2 = mkserver(cfg, @srv2) do
+ @srv.close
+ cfg.instance_eval do
+ app(:rack, ProxiedApp.new) do
+ listen "#{host2}:#{port2}"
+ end
+ stderr_path err.path
+ end
+ end
+
+ gplv3 = File.open('COPYING')
+
+ Net::HTTP.start(host, port) do |http|
+ res = http.request(Net::HTTP::Get.new('/'))
+ assert_equal 200, res.code.to_i
+ n = res.body.bytesize
+ assert_operator n, :>, 1
+ res = http.request(Net::HTTP::Head.new('/'))
+ assert_equal 200, res.code.to_i
+ assert_equal n, res['Content-Length'].to_i
+ assert_nil res.body
+
+ # chunked encoding
+ req = Net::HTTP::Put.new('/')
+ req.body_stream = gplv3
+ req.content_type = 'application/octet-stream'
+ req['Transfer-Encoding'] = 'chunked'
+ res = http.request(req)
+ gplv3.rewind
+ assert_equal gplv3.read, res.body
+ assert_equal 201, res.code.to_i
+
+ # normal content-length
+ gplv3.rewind
+ req = Net::HTTP::Put.new('/')
+ req.body_stream = gplv3
+ req.content_type = 'application/octet-stream'
+ req.content_length = gplv3.size
+ res = http.request(req)
+ gplv3.rewind
+ assert_equal gplv3.read, res.body
+ assert_equal 201, res.code.to_i
+ end
+ ensure
+ gplv3.close if gplv3
+ quit_wait pid
+ quit_wait pid2
+ end
+end
--
EW