* [PATCH 3/3] proxy_pass: officially become a part of yahns
From: Eric Wong @ 2015-03-14 3:17 UTC
To: yahns-public
This will rely on rack.hijack in the future to support
asynchronous execution without tying up a thread when waiting
for upstreams. For now, this allows simpler code with fewer
checks and the use of monotonic time on newer versions of Ruby.
---
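Note on the monotonic time mentioned in the commit message: the sketch
below shows one way to compute a deadline from a monotonic clock, which is
the idea behind replacing Time.now in wait_time. `monotonic_now` and
`Deadline` are hypothetical names used only for illustration. It assumes
Yahns.now behaves roughly like
Process.clock_gettime(Process::CLOCK_MONOTONIC) on Ruby 2.1+; this patch
does not show its definition.

```ruby
require 'timeout'

# Hypothetical helper (assumption: similar in spirit to Yahns.now).
# Returns seconds from a clock that does not jump when the wall clock
# is adjusted.
def monotonic_now
  Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

# Hypothetical stand-in for the @expiry/@timeout pair in UpstreamSocket.
class Deadline
  def initialize(timeout, now = monotonic_now)
    @timeout = timeout
    @expiry = now + timeout
  end

  # Seconds left before expiry; raises once the deadline has passed.
  def wait_time(now = monotonic_now)
    tout = @expiry - now
    raise Timeout::Error, "request timed out", [] if tout < 0
    tout
  end

  # Push the deadline out after a successful read, as readpartial does.
  def bump
    @expiry += @timeout
  end
end

d = Deadline.new(5, 100.0)
puts d.wait_time(102.0) # 3.0 seconds remain
d.bump
puts d.wait_time(108.0) # 2.0 seconds remain after the bump
```

With Time.now, a wall-clock adjustment in the middle of a request could
make the remaining time negative (or far too large); a monotonic clock
avoids that.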
extras/proxy_pass.rb | 224 +--------------------
lib/yahns/proxy_pass.rb | 210 +++++++++++++++++++
...est_extras_proxy_pass.rb => test_proxy_pass.rb} | 10 +-
3 files changed, 224 insertions(+), 220 deletions(-)
create mode 100644 lib/yahns/proxy_pass.rb
rename test/{test_extras_proxy_pass.rb => test_proxy_pass.rb} (93%)
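For readers updating their configs, the destination strings accepted by
the constructor can be illustrated outside of yahns. This is only a
sketch: `parse_dest` is a hypothetical name, and the regular expressions
are copied from ProxyPass#initialize in the diff below. It assumes a
platform with Unix domain socket support.

```ruby
require 'socket'

# Hypothetical helper mirroring the case statement in ProxyPass#initialize.
# Returns [packed_sockaddr, path_template_or_nil].
def parse_dest(dest)
  case dest
  when %r{\Aunix:([^:]+)(?::(/.*))?\z}
    path = $2
    [Socket.sockaddr_un($1), path]
  when %r{\Ahttp://([^/]+)(/.*)?\z}
    path = $2
    host, port = $1.split(':')
    [Socket.sockaddr_in(port || 80, host), path]
  else
    raise ArgumentError, "destination must be an HTTP URL or unix: path"
  end
end

addr, path = parse_dest("http://127.0.0.1:8080/foo$fullpath")
p Socket.unpack_sockaddr_in(addr) # port and host of the upstream
p path                            # path template, may contain $variables

addr, path = parse_dest("unix:/tmp/proxy_pass.sock")
p Socket.unpack_sockaddr_un(addr) # socket path
p path                            # nil; the real code then uses '$fullpath'
```

The optional trailing part ("/foo$fullpath" above) is the path template
that init_path_vars validates against its list of allowed variables.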
diff --git a/extras/proxy_pass.rb b/extras/proxy_pass.rb
index 00adf18..af6fb7c 100644
--- a/extras/proxy_pass.rb
+++ b/extras/proxy_pass.rb
@@ -1,215 +1,9 @@
-# -*- encoding: binary -*-
-# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
-# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
-require 'time'
-require 'socket'
-require 'kgio'
-require 'kcar' # gem install kcar
-require 'rack/request'
-require 'thread'
-require 'timeout'
-
-# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten.
-# to take advantage of rack.hijack and use the non-blocking I/O facilities
-# in yahns. yahns may have to grow a supported API for that...
-# For now, we this blocks a worker thread; fortunately threads are reasonably
-# cheap on GNU/Linux...
-# This is totally untested but currently doesn't serve anything important.
-class ProxyPass # :nodoc:
-  class ConnPool
-    def initialize
-      @mtx = Mutex.new
-      @objs = []
-    end
-
-    def get
-      @mtx.synchronize { @objs.pop }
-    end
-
-    def put(obj)
-      @mtx.synchronize { @objs << obj }
-    end
-  end
-
-  class UpstreamSocket < Kgio::Socket # :nodoc:
-    attr_writer :expiry
-
-    # called automatically by kgio_read!
-    def kgio_wait_readable(timeout = nil)
-      super(timeout || wait_time)
-    end
-
-    def wait_time
-      tout = @expiry ? @expiry - Time.now : @timeout
-      raise Timeout::Error, "request timed out", [] if tout < 0
-      tout
-    end
-
-    def readpartial(bytes, buf = Thread.current[:proxy_pass_buf] ||= "")
-      case rv = kgio_read!(bytes, buf)
-      when String
-        @expiry += @timeout # bump expiry when we succeed
-      end
-      rv
-    end
-
-    def req_write(buf, timeout)
-      @timeout = timeout
-      @expiry = Time.now + timeout
-      case rv = kgio_trywrite(buf)
-      when :wait_writable
-        kgio_wait_writable(wait_time)
-      when nil
-        return
-      when String
-        buf = rv
-      end while true
-    end
-  end # class UpstreamSocket
-
-  class UpstreamResponse < Kcar::Response # :nodoc:
-    # Called by the Rack server at the end of a successful response
-    def close
-      reusable = @parser.keepalive? && @parser.body_eof?
-      super
-      @pool.put(self) if reusable
-      nil
-    end
-
-    # req is just a string buffer of HTTP headers
-    def req_write(req, timeout)
-      @sock.req_write(req, timeout)
-    end
-
-    # returns true if the socket is still alive, nil if dead
-    def sock_alive?
-      @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
-                true : @sock.close
-    end
-
-    # returns true if the socket was reused and thus retryable
-    def fail_retryable?
-      @sock.close
-      @reused
-    end
-
-    def initialize(sock, pool)
-      super(sock)
-      @reused = false
-      @pool = pool
-    end
-  end # class UpstreamResponse
-
-  # take a responder from the pool, we'll add the object back to the
-  # pool in UpstreamResponse#close
-  def responder_get
-    while obj = @pool.get
-      return obj if obj.sock_alive?
-    end
-
-    UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool)
-  end
-
-  def initialize(dest, timeout = 5)
-    case dest
-    when %r{\Aunix:([^:]+)(?::(/.*))?\z}
-      path = $2
-      @sockaddr = Socket.sockaddr_un($1)
-    when %r{\Ahttp://([^/]+)(/.*)?\z}
-      path = $2
-      host, port = $1.split(':')
-      @sockaddr = Socket.sockaddr_in(port || 80, host)
-    else
-      raise ArgumentError, "destination must be an HTTP URL or unix: path"
-    end
-    init_path_vars(path)
-    @pool = ConnPool.new
-    @timeout = timeout
-  end
-
-  def init_path_vars(path)
-    path ||= '$fullpath'
-    # methods from Rack::Request we want:
-    allow = %w(fullpath host_with_port host port url path)
-    want = path.scan(/\$(\w+)/).flatten! || []
-    diff = want - allow
-    diff.empty? or
-      raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
-
-    # kill leading slash just in case...
-    @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
-  end
-
-  def call(env)
-    request_method = env['REQUEST_METHOD']
-    req = Rack::Request.new(env)
-    path = @path.gsub(/\$(\w+)/) { req.__send__($1) }
-    req = "#{request_method} #{path} HTTP/1.1\r\n" \
-          "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
-
-    # pass most HTTP_* headers through as-is
-    chunked = false
-    env.each do |key, val|
-      %r{\AHTTP_(\w+)\z} =~ key or next
-      key = $1
-      next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key
-      chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
-      key.tr!("_", "-")
-      req << "#{key}: #{val}\r\n"
-    end
-
-    # special cases which Rack does not prefix:
-    ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
-    clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
-    req << "\r\n"
-
-    # get an open socket and send the headers
-    ures = responder_get
-    ures.req_write(req, @timeout)
-
-    # send the request body if there was one
-    send_body(env["rack.input"], ures, chunked) if chunked || clen
-
-    # wait for the response here
-    _, header, body = res = ures.rack
-
-    # don't let the upstream Connection and Keep-Alive headers leak through
-    header.delete_if do |k,_|
-      k =~ /\A(?:Connection|Keep-Alive)\z/i
-    end
-
-    case request_method
-    when "HEAD"
-      # kcar doesn't know if it's a HEAD or GET response, and HEAD
-      # responses have Content-Length in it which fools kcar...
-      body.parser.body_bytes_left = 0
-      res[1] = header.dup
-      body.close # clobbers original header
-      res[2] = body = []
-    end
-    res
-  rescue => e
-    retry if ures && ures.fail_retryable? && request_method != "POST"
-    if defined?(Yahns::Log)
-      logger = env['rack.logger'] and
-        Yahns::Log.exception(logger, 'proxy_pass', e)
-    end
-    [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
-  end
-
-  def send_body(input, ures, chunked)
-    buf = Thread.current[:proxy_pass_buf] ||= ""
-
-    if chunked # unlikely
-      while input.read(16384, buf)
-        buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n")
-        ures.req_write(buf, @timeout)
-      end
-      ures.req_write("0\r\n\r\n", @timeout)
-    else # common if we hit uploads
-      while input.read(16384, buf)
-        ures.req_write(buf, @timeout)
-      end
-    end
-  end
-end
+# compatibility class
+warn <<EOF
+ProxyPass is no longer a generic Rack app but relies on yahns internals
+via rack.hijack. Update require statements to point to 'yahns/proxy_pass'
+and use the 'Yahns::ProxyPass' constant instead.
+EOF
+
+require 'yahns/proxy_pass'
+ProxyPass = Yahns::ProxyPass
diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
new file mode 100644
index 0000000..9c12e99
--- /dev/null
+++ b/lib/yahns/proxy_pass.rb
@@ -0,0 +1,210 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013-2015 all contributors <yahns-public@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'socket'
+require 'kgio'
+require 'kcar' # gem install kcar
+require 'rack/request'
+require 'thread'
+require 'timeout'
+
+# Totally synchronous and Rack 1.1-compatible; this will probably be rewritten
+# to take advantage of rack.hijack and use the non-blocking I/O facilities
+# in yahns. yahns may have to grow a supported API for that...
+# For now, this blocks a worker thread; fortunately threads are reasonably
+# cheap on GNU/Linux...
+class Yahns::ProxyPass # :nodoc:
+  class ConnPool
+    def initialize
+      @mtx = Mutex.new
+      @objs = []
+    end
+
+    def get
+      @mtx.synchronize { @objs.pop }
+    end
+
+    def put(obj)
+      @mtx.synchronize { @objs << obj }
+    end
+  end
+
+  class UpstreamSocket < Kgio::Socket # :nodoc:
+    attr_writer :expiry
+
+    # called automatically by kgio_read!
+    def kgio_wait_readable(timeout = nil)
+      super(timeout || wait_time)
+    end
+
+    def wait_time
+      tout = @expiry ? @expiry - Yahns.now : @timeout
+      raise Timeout::Error, "request timed out", [] if tout < 0
+      tout
+    end
+
+    def readpartial(bytes, buf = Thread.current[:yahns_rbuf] ||= "")
+      case rv = kgio_read!(bytes, buf)
+      when String
+        @expiry += @timeout # bump expiry when we succeed
+      end
+      rv
+    end
+
+    def req_write(buf, timeout)
+      @timeout = timeout
+      @expiry = Yahns.now + timeout
+      case rv = kgio_trywrite(buf)
+      when :wait_writable
+        kgio_wait_writable(wait_time)
+      when nil
+        return
+      when String
+        buf = rv
+      end while true
+    end
+  end # class UpstreamSocket
+
+  class UpstreamResponse < Kcar::Response # :nodoc:
+    # Called by the Rack server at the end of a successful response
+    def close
+      reusable = @parser.keepalive? && @parser.body_eof?
+      super
+      @pool.put(self) if reusable
+      nil
+    end
+
+    # req is just a string buffer of HTTP headers
+    def req_write(req, timeout)
+      @sock.req_write(req, timeout)
+    end
+
+    # returns true if the socket is still alive, nil if dead
+    def sock_alive?
+      @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
+                true : @sock.close
+    end
+
+    # returns true if the socket was reused and thus retryable
+    def fail_retryable?
+      @sock.close
+      @reused
+    end
+
+    def initialize(sock, pool)
+      super(sock)
+      @reused = false
+      @pool = pool
+    end
+  end # class UpstreamResponse
+
+  # take a responder from the pool, we'll add the object back to the
+  # pool in UpstreamResponse#close
+  def responder_get
+    while obj = @pool.get
+      return obj if obj.sock_alive?
+    end
+
+    UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool)
+  end
+
+  def initialize(dest, timeout = 5)
+    case dest
+    when %r{\Aunix:([^:]+)(?::(/.*))?\z}
+      path = $2
+      @sockaddr = Socket.sockaddr_un($1)
+    when %r{\Ahttp://([^/]+)(/.*)?\z}
+      path = $2
+      host, port = $1.split(':')
+      @sockaddr = Socket.sockaddr_in(port || 80, host)
+    else
+      raise ArgumentError, "destination must be an HTTP URL or unix: path"
+    end
+    init_path_vars(path)
+    @pool = ConnPool.new
+    @timeout = timeout
+  end
+
+  def init_path_vars(path)
+    path ||= '$fullpath'
+    # methods from Rack::Request we want:
+    allow = %w(fullpath host_with_port host port url path)
+    want = path.scan(/\$(\w+)/).flatten! || []
+    diff = want - allow
+    diff.empty? or
+      raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
+
+    # kill leading slash just in case...
+    @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
+  end
+
+  def call(env)
+    request_method = env['REQUEST_METHOD']
+    req = Rack::Request.new(env)
+    path = @path.gsub(/\$(\w+)/) { req.__send__($1) }
+    req = "#{request_method} #{path} HTTP/1.1\r\n" \
+          "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
+
+    # pass most HTTP_* headers through as-is
+    chunked = false
+    env.each do |key, val|
+      %r{\AHTTP_(\w+)\z} =~ key or next
+      key = $1
+      next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key
+      chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
+      key.tr!("_", "-")
+      req << "#{key}: #{val}\r\n"
+    end
+
+    # special cases which Rack does not prefix:
+    ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
+    clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
+    req << "\r\n"
+
+    # get an open socket and send the headers
+    ures = responder_get
+    ures.req_write(req, @timeout)
+
+    # send the request body if there was one
+    send_body(env["rack.input"], ures, chunked) if chunked || clen
+
+    # wait for the response here
+    _, header, body = res = ures.rack
+
+    # don't let the upstream Connection and Keep-Alive headers leak through
+    header.delete_if do |k,_|
+      k =~ /\A(?:Connection|Keep-Alive)\z/i
+    end
+
+    case request_method
+    when "HEAD"
+      # kcar doesn't know if it's a HEAD or GET response, and HEAD
+      # responses have Content-Length in it which fools kcar...
+      body.parser.body_bytes_left = 0
+      res[1] = header.dup
+      body.close # clobbers original header
+      res[2] = body = []
+    end
+    res
+  rescue => e
+    retry if ures && ures.fail_retryable? && request_method != "POST"
+    Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e)
+    [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
+  end
+
+  def send_body(input, ures, chunked)
+    buf = Thread.current[:yahns_rbuf] ||= ""
+
+    if chunked # unlikely
+      while input.read(16384, buf)
+        buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n")
+        ures.req_write(buf, @timeout)
+      end
+      ures.req_write("0\r\n\r\n", @timeout)
+    else # common if we hit uploads
+      while input.read(16384, buf)
+        ures.req_write(buf, @timeout)
+      end
+    end
+  end
+end
diff --git a/test/test_extras_proxy_pass.rb b/test/test_proxy_pass.rb
similarity index 93%
rename from test/test_extras_proxy_pass.rb
rename to test/test_proxy_pass.rb
index 5bbce73..530b53c 100644
--- a/test/test_extras_proxy_pass.rb
+++ b/test/test_proxy_pass.rb
@@ -45,12 +45,12 @@ class TestExtrasProxyPass < Testcase
       @srv.autoclose = @srv2.autoclose = false
       ENV["YAHNS_FD"] = "#{@srv.fileno},#{@srv2.fileno}"
       $LOAD_PATH.unshift "#{Dir.pwd}/extras"
-      require 'proxy_pass'
+      require 'yahns/proxy_pass'
       cfg.instance_eval do
-        app(:rack, ProxyPass.new("unix:#{unix_path}:/$fullpath")) do
+        app(:rack, Yahns::ProxyPass.new("unix:#{unix_path}:/$fullpath")) do
           listen "#{host}:#{port}"
         end
-        app(:rack, ProxyPass.new("unix:#{unix_path}:/foo$fullpath")) do
+        app(:rack, Yahns::ProxyPass.new("unix:#{unix_path}:/foo$fullpath")) do
           listen "#{host2}:#{port2}"
         end
         stderr_path err.path
@@ -108,10 +108,10 @@ class TestExtrasProxyPass < Testcase
     host2, port2 = @srv2.addr[3], @srv2.addr[1]
     pid = mkserver(cfg) do
       $LOAD_PATH.unshift "#{Dir.pwd}/extras"
-      require 'proxy_pass'
+      require 'yahns/proxy_pass'
       @srv2.close
       cfg.instance_eval do
-        app(:rack, ProxyPass.new("http://#{host2}:#{port2}/")) do
+        app(:rack, Yahns::ProxyPass.new("http://#{host2}:#{port2}/")) do
           listen "#{host}:#{port}"
         end
         stderr_path err.path
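
For reference, the chunked branch of send_body in the diff above frames
each read as "<hex size>\r\n<data>\r\n" and finishes with a zero-length
chunk. A stand-alone sketch of that framing follows; `write_chunked` is a
hypothetical name, it appends to a string instead of writing to an
upstream socket, and it uses bytesize so the result does not depend on the
"encoding: binary" magic comment the real file relies on.

```ruby
require 'stringio'

# Hypothetical stand-alone version of the chunked branch of send_body:
# reads +input+ in pieces of at most +bufsize+ bytes and appends one
# framed chunk per read to +out+.
def write_chunked(input, out, bufsize = 16384)
  buf = "".b
  while input.read(bufsize, buf)
    out << "#{buf.bytesize.to_s(16)}\r\n#{buf}\r\n"
  end
  out << "0\r\n\r\n" # a zero-length chunk terminates the body
end

out = "".b
write_chunked(StringIO.new("hello world"), out, 4)
p out
```

A small buffer size is used here only to force several chunks; the patch
itself reads 16384 bytes at a time.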
--
EW
2015-03-14 3:17 [PATCH 0/3] towards becoming a fully-buffering reverse proxy Eric Wong
2015-03-14 3:17 7% ` [PATCH 3/3] proxy_pass: officially become a part of yahns Eric Wong
Code repositories for project(s) associated with this public inbox
https://yhbt.net/yahns.git/