* [PATCH 2/3] extras/proxy_pass: support Unix domain sockets as backends
2015-03-14 3:17 [PATCH 0/3] towards becoming a fully-buffering reverse proxy Eric Wong
2015-03-14 3:17 ` [PATCH 1/3] extras/proxy_pass: implicit $fullpath expansion for upstreams Eric Wong
@ 2015-03-14 3:17 ` Eric Wong
2015-03-14 3:17 ` [PATCH 3/3] proxy_pass: officially become a part of yahns Eric Wong
2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2015-03-14 3:17 UTC (permalink / raw)
To: yahns-public
Of course, some users will prefer to bind HTTP application
servers to Unix domain sockets for better isolation and (maybe)
better performance.
---
| 7 +++--
| 69 ++++++++++++++++++++++++++++++++++++++++++
2 files changed, 74 insertions(+), 2 deletions(-)
--git a/extras/proxy_pass.rb b/extras/proxy_pass.rb
index d435ebe..00adf18 100644
--- a/extras/proxy_pass.rb
+++ b/extras/proxy_pass.rb
@@ -112,12 +112,15 @@ class ProxyPass # :nodoc:
def initialize(dest, timeout = 5)
case dest
+ when %r{\Aunix:([^:]+)(?::(/.*))?\z}
+ path = $2
+ @sockaddr = Socket.sockaddr_un($1)
when %r{\Ahttp://([^/]+)(/.*)?\z}
path = $2
host, port = $1.split(':')
@sockaddr = Socket.sockaddr_in(port || 80, host)
else
- raise ArgumentError, "destination must be an HTTP URL"
+ raise ArgumentError, "destination must be an HTTP URL or unix: path"
end
init_path_vars(path)
@pool = ConnPool.new
@@ -125,7 +128,7 @@ class ProxyPass # :nodoc:
end
def init_path_vars(path)
- path ||= '$(fullpath)'
+ path ||= '$fullpath'
# methods from Rack::Request we want:
allow = %w(fullpath host_with_port host port url path)
want = path.scan(/\$(\w+)/).flatten! || []
--git a/test/test_extras_proxy_pass.rb b/test/test_extras_proxy_pass.rb
index 8842683..5bbce73 100644
--- a/test/test_extras_proxy_pass.rb
+++ b/test/test_extras_proxy_pass.rb
@@ -1,6 +1,7 @@
# Copyright (C) 2015 all contributors <yahns-public@yhbt.net>
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
require_relative 'server_helper'
+require 'json'
class TestExtrasProxyPass < Testcase
ENV["N"].to_i > 1 and parallelize_me!
@@ -34,6 +35,74 @@ class TestExtrasProxyPass < Testcase
server_helper_teardown
end
+ def test_unix_socket_no_path
+ tmpdir = Dir.mktmpdir
+ unix_path = "#{tmpdir}/proxy_pass.sock"
+ unix_srv = UNIXServer.new(unix_path)
+ err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+ host2, port2 = @srv2.addr[3], @srv2.addr[1]
+ pid = mkserver(cfg) do
+ @srv.autoclose = @srv2.autoclose = false
+ ENV["YAHNS_FD"] = "#{@srv.fileno},#{@srv2.fileno}"
+ $LOAD_PATH.unshift "#{Dir.pwd}/extras"
+ require 'proxy_pass'
+ cfg.instance_eval do
+ app(:rack, ProxyPass.new("unix:#{unix_path}:/$fullpath")) do
+ listen "#{host}:#{port}"
+ end
+ app(:rack, ProxyPass.new("unix:#{unix_path}:/foo$fullpath")) do
+ listen "#{host2}:#{port2}"
+ end
+ stderr_path err.path
+ end
+ end
+
+ pid2 = mkserver(cfg, unix_srv) do
+ @srv.close
+ @srv2.close
+ cfg.instance_eval do
+ rapp = lambda do |env|
+ body = env.to_json
+ hdr = {
+ 'Content-Length' => body.bytesize.to_s,
+ 'Content-Type' => 'application/json',
+ }
+ [ 200, hdr, [ body ] ]
+ end
+ app(:rack, rapp) { listen unix_path }
+ stderr_path err.path
+ end
+ end
+ Net::HTTP.start(host, port) do |http|
+ res = http.request(Net::HTTP::Get.new('/f00'))
+ assert_equal 200, res.code.to_i
+ body = JSON.parse(res.body)
+ assert_equal '/f00', body['PATH_INFO']
+
+ res = http.request(Net::HTTP::Get.new('/f00foo'))
+ assert_equal 200, res.code.to_i
+ body = JSON.parse(res.body)
+ assert_equal '/f00foo', body['PATH_INFO']
+ end
+
+ Net::HTTP.start(host2, port2) do |http|
+ res = http.request(Net::HTTP::Get.new('/Foo'))
+ assert_equal 200, res.code.to_i
+ body = JSON.parse(res.body)
+ assert_equal '/foo/Foo', body['PATH_INFO']
+
+ res = http.request(Net::HTTP::Get.new('/Foofoo'))
+ assert_equal 200, res.code.to_i
+ body = JSON.parse(res.body)
+ assert_equal '/foo/Foofoo', body['PATH_INFO']
+ end
+ ensure
+ quit_wait(pid)
+ quit_wait(pid2)
+ unix_srv.close if unix_srv
+ FileUtils.rm_rf(tmpdir) if tmpdir
+ end
+
def test_proxy_pass
err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
host2, port2 = @srv2.addr[3], @srv2.addr[1]
--
EW
^ permalink raw reply related [flat|nested] 4+ messages in thread
* [PATCH 3/3] proxy_pass: officially become a part of yahns
2015-03-14 3:17 [PATCH 0/3] towards becoming a fully-buffering reverse proxy Eric Wong
2015-03-14 3:17 ` [PATCH 1/3] extras/proxy_pass: implicit $fullpath expansion for upstreams Eric Wong
2015-03-14 3:17 ` [PATCH 2/3] extras/proxy_pass: support Unix domain sockets as backends Eric Wong
@ 2015-03-14 3:17 ` Eric Wong
2 siblings, 0 replies; 4+ messages in thread
From: Eric Wong @ 2015-03-14 3:17 UTC (permalink / raw)
To: yahns-public
This will rely on rack.hijack in the future to support
asynchronous execution without tying up a thread when waiting
for upstreams. For now, this allows simpler code with fewer
checks and the use of monotonic time on newer versions of Ruby.
---
| 224 +--------------------
lib/yahns/proxy_pass.rb | 210 +++++++++++++++++++
...est_extras_proxy_pass.rb => test_proxy_pass.rb} | 10 +-
3 files changed, 224 insertions(+), 220 deletions(-)
create mode 100644 lib/yahns/proxy_pass.rb
rename test/{test_extras_proxy_pass.rb => test_proxy_pass.rb} (93%)
--git a/extras/proxy_pass.rb b/extras/proxy_pass.rb
index 00adf18..af6fb7c 100644
--- a/extras/proxy_pass.rb
+++ b/extras/proxy_pass.rb
@@ -1,215 +1,9 @@
-# -*- encoding: binary -*-
-# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
-# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
-require 'time'
-require 'socket'
-require 'kgio'
-require 'kcar' # gem install kcar
-require 'rack/request'
-require 'thread'
-require 'timeout'
-
-# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten.
-# to take advantage of rack.hijack and use the non-blocking I/O facilities
-# in yahns. yahns may have to grow a supported API for that...
-# For now, we this blocks a worker thread; fortunately threads are reasonably
-# cheap on GNU/Linux...
-# This is totally untested but currently doesn't serve anything important.
-class ProxyPass # :nodoc:
- class ConnPool
- def initialize
- @mtx = Mutex.new
- @objs = []
- end
-
- def get
- @mtx.synchronize { @objs.pop }
- end
-
- def put(obj)
- @mtx.synchronize { @objs << obj }
- end
- end
-
- class UpstreamSocket < Kgio::Socket # :nodoc:
- attr_writer :expiry
-
- # called automatically by kgio_read!
- def kgio_wait_readable(timeout = nil)
- super(timeout || wait_time)
- end
-
- def wait_time
- tout = @expiry ? @expiry - Time.now : @timeout
- raise Timeout::Error, "request timed out", [] if tout < 0
- tout
- end
-
- def readpartial(bytes, buf = Thread.current[:proxy_pass_buf] ||= "")
- case rv = kgio_read!(bytes, buf)
- when String
- @expiry += @timeout # bump expiry when we succeed
- end
- rv
- end
-
- def req_write(buf, timeout)
- @timeout = timeout
- @expiry = Time.now + timeout
- case rv = kgio_trywrite(buf)
- when :wait_writable
- kgio_wait_writable(wait_time)
- when nil
- return
- when String
- buf = rv
- end while true
- end
- end # class UpstreamSocket
-
- class UpstreamResponse < Kcar::Response # :nodoc:
- # Called by the Rack server at the end of a successful response
- def close
- reusable = @parser.keepalive? && @parser.body_eof?
- super
- @pool.put(self) if reusable
- nil
- end
-
- # req is just a string buffer of HTTP headers
- def req_write(req, timeout)
- @sock.req_write(req, timeout)
- end
-
- # returns true if the socket is still alive, nil if dead
- def sock_alive?
- @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
- true : @sock.close
- end
-
- # returns true if the socket was reused and thus retryable
- def fail_retryable?
- @sock.close
- @reused
- end
-
- def initialize(sock, pool)
- super(sock)
- @reused = false
- @pool = pool
- end
- end # class UpstreamResponse
-
- # take a responder from the pool, we'll add the object back to the
- # pool in UpstreamResponse#close
- def responder_get
- while obj = @pool.get
- return obj if obj.sock_alive?
- end
-
- UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool)
- end
-
- def initialize(dest, timeout = 5)
- case dest
- when %r{\Aunix:([^:]+)(?::(/.*))?\z}
- path = $2
- @sockaddr = Socket.sockaddr_un($1)
- when %r{\Ahttp://([^/]+)(/.*)?\z}
- path = $2
- host, port = $1.split(':')
- @sockaddr = Socket.sockaddr_in(port || 80, host)
- else
- raise ArgumentError, "destination must be an HTTP URL or unix: path"
- end
- init_path_vars(path)
- @pool = ConnPool.new
- @timeout = timeout
- end
-
- def init_path_vars(path)
- path ||= '$fullpath'
- # methods from Rack::Request we want:
- allow = %w(fullpath host_with_port host port url path)
- want = path.scan(/\$(\w+)/).flatten! || []
- diff = want - allow
- diff.empty? or
- raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
-
- # kill leading slash just in case...
- @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
- end
-
- def call(env)
- request_method = env['REQUEST_METHOD']
- req = Rack::Request.new(env)
- path = @path.gsub(/\$(\w+)/) { req.__send__($1) }
- req = "#{request_method} #{path} HTTP/1.1\r\n" \
- "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
-
- # pass most HTTP_* headers through as-is
- chunked = false
- env.each do |key, val|
- %r{\AHTTP_(\w+)\z} =~ key or next
- key = $1
- next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key
- chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
- key.tr!("_", "-")
- req << "#{key}: #{val}\r\n"
- end
-
- # special cases which Rack does not prefix:
- ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
- clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
- req << "\r\n"
-
- # get an open socket and send the headers
- ures = responder_get
- ures.req_write(req, @timeout)
-
- # send the request body if there was one
- send_body(env["rack.input"], ures, chunked) if chunked || clen
-
- # wait for the response here
- _, header, body = res = ures.rack
-
- # don't let the upstream Connection and Keep-Alive headers leak through
- header.delete_if do |k,_|
- k =~ /\A(?:Connection|Keep-Alive)\z/i
- end
-
- case request_method
- when "HEAD"
- # kcar doesn't know if it's a HEAD or GET response, and HEAD
- # responses have Content-Length in it which fools kcar...
- body.parser.body_bytes_left = 0
- res[1] = header.dup
- body.close # clobbers original header
- res[2] = body = []
- end
- res
- rescue => e
- retry if ures && ures.fail_retryable? && request_method != "POST"
- if defined?(Yahns::Log)
- logger = env['rack.logger'] and
- Yahns::Log.exception(logger, 'proxy_pass', e)
- end
- [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
- end
-
- def send_body(input, ures, chunked)
- buf = Thread.current[:proxy_pass_buf] ||= ""
-
- if chunked # unlikely
- while input.read(16384, buf)
- buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n")
- ures.req_write(buf, @timeout)
- end
- ures.req_write("0\r\n\r\n", @timeout)
- else # common if we hit uploads
- while input.read(16384, buf)
- ures.req_write(buf, @timeout)
- end
- end
- end
-end
+# compatibility class
+warn <<EOF
+ProxyPass is no longer a generic Rack app but relies on yahns internals
+via rack.hijack. Update require statements to point to 'yahns/proxy_pass'
+and change use the 'Yahns::ProxyPass' constant instead.
+EOF
+
+require 'yahns/proxy_pass'
+ProxyPass = Yahns::ProxyPass
diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
new file mode 100644
index 0000000..9c12e99
--- /dev/null
+++ b/lib/yahns/proxy_pass.rb
@@ -0,0 +1,210 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013-2015 all contributors <yahns-public@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'socket'
+require 'kgio'
+require 'kcar' # gem install kcar
+require 'rack/request'
+require 'thread'
+require 'timeout'
+
+# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten
+# to take advantage of rack.hijack and use the non-blocking I/O facilities
+# in yahns. yahns may have to grow a supported API for that...
+# For now, this blocks a worker thread; fortunately threads are reasonably
+# cheap on GNU/Linux...
+class Yahns::ProxyPass # :nodoc:
+ class ConnPool
+ def initialize
+ @mtx = Mutex.new
+ @objs = []
+ end
+
+ def get
+ @mtx.synchronize { @objs.pop }
+ end
+
+ def put(obj)
+ @mtx.synchronize { @objs << obj }
+ end
+ end
+
+ class UpstreamSocket < Kgio::Socket # :nodoc:
+ attr_writer :expiry
+
+ # called automatically by kgio_read!
+ def kgio_wait_readable(timeout = nil)
+ super(timeout || wait_time)
+ end
+
+ def wait_time
+ tout = @expiry ? @expiry - Yahns.now : @timeout
+ raise Timeout::Error, "request timed out", [] if tout < 0
+ tout
+ end
+
+ def readpartial(bytes, buf = Thread.current[:yahns_rbuf] ||= "")
+ case rv = kgio_read!(bytes, buf)
+ when String
+ @expiry += @timeout # bump expiry when we succeed
+ end
+ rv
+ end
+
+ def req_write(buf, timeout)
+ @timeout = timeout
+ @expiry = Yahns.now + timeout
+ case rv = kgio_trywrite(buf)
+ when :wait_writable
+ kgio_wait_writable(wait_time)
+ when nil
+ return
+ when String
+ buf = rv
+ end while true
+ end
+ end # class UpstreamSocket
+
+ class UpstreamResponse < Kcar::Response # :nodoc:
+ # Called by the Rack server at the end of a successful response
+ def close
+ reusable = @parser.keepalive? && @parser.body_eof?
+ super
+ @pool.put(self) if reusable
+ nil
+ end
+
+ # req is just a string buffer of HTTP headers
+ def req_write(req, timeout)
+ @sock.req_write(req, timeout)
+ end
+
+ # returns true if the socket is still alive, nil if dead
+ def sock_alive?
+ @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
+ true : @sock.close
+ end
+
+ # returns true if the socket was reused and thus retryable
+ def fail_retryable?
+ @sock.close
+ @reused
+ end
+
+ def initialize(sock, pool)
+ super(sock)
+ @reused = false
+ @pool = pool
+ end
+ end # class UpstreamResponse
+
+ # take a responder from the pool, we'll add the object back to the
+ # pool in UpstreamResponse#close
+ def responder_get
+ while obj = @pool.get
+ return obj if obj.sock_alive?
+ end
+
+ UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool)
+ end
+
+ def initialize(dest, timeout = 5)
+ case dest
+ when %r{\Aunix:([^:]+)(?::(/.*))?\z}
+ path = $2
+ @sockaddr = Socket.sockaddr_un($1)
+ when %r{\Ahttp://([^/]+)(/.*)?\z}
+ path = $2
+ host, port = $1.split(':')
+ @sockaddr = Socket.sockaddr_in(port || 80, host)
+ else
+ raise ArgumentError, "destination must be an HTTP URL or unix: path"
+ end
+ init_path_vars(path)
+ @pool = ConnPool.new
+ @timeout = timeout
+ end
+
+ def init_path_vars(path)
+ path ||= '$fullpath'
+ # methods from Rack::Request we want:
+ allow = %w(fullpath host_with_port host port url path)
+ want = path.scan(/\$(\w+)/).flatten! || []
+ diff = want - allow
+ diff.empty? or
+ raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
+
+ # kill leading slash just in case...
+ @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
+ end
+
+ def call(env)
+ request_method = env['REQUEST_METHOD']
+ req = Rack::Request.new(env)
+ path = @path.gsub(/\$(\w+)/) { req.__send__($1) }
+ req = "#{request_method} #{path} HTTP/1.1\r\n" \
+ "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
+
+ # pass most HTTP_* headers through as-is
+ chunked = false
+ env.each do |key, val|
+ %r{\AHTTP_(\w+)\z} =~ key or next
+ key = $1
+ next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key
+ chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
+ key.tr!("_", "-")
+ req << "#{key}: #{val}\r\n"
+ end
+
+ # special cases which Rack does not prefix:
+ ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
+ clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
+ req << "\r\n"
+
+ # get an open socket and send the headers
+ ures = responder_get
+ ures.req_write(req, @timeout)
+
+ # send the request body if there was one
+ send_body(env["rack.input"], ures, chunked) if chunked || clen
+
+ # wait for the response here
+ _, header, body = res = ures.rack
+
+ # don't let the upstream Connection and Keep-Alive headers leak through
+ header.delete_if do |k,_|
+ k =~ /\A(?:Connection|Keep-Alive)\z/i
+ end
+
+ case request_method
+ when "HEAD"
+ # kcar doesn't know if it's a HEAD or GET response, and HEAD
+ # responses have Content-Length in it which fools kcar...
+ body.parser.body_bytes_left = 0
+ res[1] = header.dup
+ body.close # clobbers original header
+ res[2] = body = []
+ end
+ res
+ rescue => e
+ retry if ures && ures.fail_retryable? && request_method != "POST"
+ Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e)
+ [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
+ end
+
+ def send_body(input, ures, chunked)
+ buf = Thread.current[:yahns_rbuf] ||= ""
+
+ if chunked # unlikely
+ while input.read(16384, buf)
+ buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n")
+ ures.req_write(buf, @timeout)
+ end
+ ures.req_write("0\r\n\r\n", @timeout)
+ else # common if we hit uploads
+ while input.read(16384, buf)
+ ures.req_write(buf, @timeout)
+ end
+ end
+ end
+end
diff --git a/test/test_extras_proxy_pass.rb b/test/test_proxy_pass.rb
similarity index 93%
rename from test/test_extras_proxy_pass.rb
rename to test/test_proxy_pass.rb
index 5bbce73..530b53c 100644
--- a/test/test_extras_proxy_pass.rb
+++ b/test/test_proxy_pass.rb
@@ -45,12 +45,12 @@ class TestExtrasProxyPass < Testcase
@srv.autoclose = @srv2.autoclose = false
ENV["YAHNS_FD"] = "#{@srv.fileno},#{@srv2.fileno}"
$LOAD_PATH.unshift "#{Dir.pwd}/extras"
- require 'proxy_pass'
+ require 'yahns/proxy_pass'
cfg.instance_eval do
- app(:rack, ProxyPass.new("unix:#{unix_path}:/$fullpath")) do
+ app(:rack, Yahns::ProxyPass.new("unix:#{unix_path}:/$fullpath")) do
listen "#{host}:#{port}"
end
- app(:rack, ProxyPass.new("unix:#{unix_path}:/foo$fullpath")) do
+ app(:rack, Yahns::ProxyPass.new("unix:#{unix_path}:/foo$fullpath")) do
listen "#{host2}:#{port2}"
end
stderr_path err.path
@@ -108,10 +108,10 @@ class TestExtrasProxyPass < Testcase
host2, port2 = @srv2.addr[3], @srv2.addr[1]
pid = mkserver(cfg) do
$LOAD_PATH.unshift "#{Dir.pwd}/extras"
- require 'proxy_pass'
+ require 'yahns/proxy_pass'
@srv2.close
cfg.instance_eval do
- app(:rack, ProxyPass.new("http://#{host2}:#{port2}/")) do
+ app(:rack, Yahns::ProxyPass.new("http://#{host2}:#{port2}/")) do
listen "#{host}:#{port}"
end
stderr_path err.path
--
EW
^ permalink raw reply related [flat|nested] 4+ messages in thread