From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-2.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, T_RP_MATCHES_RCVD,URIBL_BLOCKED shortcircuit=no autolearn=unavailable version=3.3.2 X-Original-To: yahns-public@yhbt.net Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id D86AD1F738 for ; Sat, 14 Mar 2015 03:18:02 +0000 (UTC) From: Eric Wong To: yahns-public@yhbt.net Subject: [PATCH 3/3] proxy_pass: officially become a part of yahns Date: Sat, 14 Mar 2015 03:17:58 +0000 Message-Id: <1426303078-4525-4-git-send-email-e@80x24.org> In-Reply-To: <1426303078-4525-1-git-send-email-e@80x24.org> References: <1426303078-4525-1-git-send-email-e@80x24.org> List-Id: This will rely on rack.hijack in the future to support asynchronous execution without tying up a thread when waiting for upstreams. For now, this allows simpler code with fewer checks and the use of monotonic time on newer versions of Ruby. --- extras/proxy_pass.rb | 224 +-------------------- lib/yahns/proxy_pass.rb | 210 +++++++++++++++++++ ...est_extras_proxy_pass.rb => test_proxy_pass.rb} | 10 +- 3 files changed, 224 insertions(+), 220 deletions(-) create mode 100644 lib/yahns/proxy_pass.rb rename test/{test_extras_proxy_pass.rb => test_proxy_pass.rb} (93%) diff --git a/extras/proxy_pass.rb b/extras/proxy_pass.rb index 00adf18..af6fb7c 100644 --- a/extras/proxy_pass.rb +++ b/extras/proxy_pass.rb @@ -1,215 +1,9 @@ -# -*- encoding: binary -*- -# Copyright (C) 2013, Eric Wong and all contributors -# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) -require 'time' -require 'socket' -require 'kgio' -require 'kcar' # gem install kcar -require 'rack/request' -require 'thread' -require 'timeout' - -# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten. -# to take advantage of rack.hijack and use the non-blocking I/O facilities -# in yahns. yahns may have to grow a supported API for that... -# For now, we this blocks a worker thread; fortunately threads are reasonably -# cheap on GNU/Linux... -# This is totally untested but currently doesn't serve anything important. -class ProxyPass # :nodoc: - class ConnPool - def initialize - @mtx = Mutex.new - @objs = [] - end - - def get - @mtx.synchronize { @objs.pop } - end - - def put(obj) - @mtx.synchronize { @objs << obj } - end - end - - class UpstreamSocket < Kgio::Socket # :nodoc: - attr_writer :expiry - - # called automatically by kgio_read! - def kgio_wait_readable(timeout = nil) - super(timeout || wait_time) - end - - def wait_time - tout = @expiry ? @expiry - Time.now : @timeout - raise Timeout::Error, "request timed out", [] if tout < 0 - tout - end - - def readpartial(bytes, buf = Thread.current[:proxy_pass_buf] ||= "") - case rv = kgio_read!(bytes, buf) - when String - @expiry += @timeout # bump expiry when we succeed - end - rv - end - - def req_write(buf, timeout) - @timeout = timeout - @expiry = Time.now + timeout - case rv = kgio_trywrite(buf) - when :wait_writable - kgio_wait_writable(wait_time) - when nil - return - when String - buf = rv - end while true - end - end # class UpstreamSocket - - class UpstreamResponse < Kcar::Response # :nodoc: - # Called by the Rack server at the end of a successful response - def close - reusable = @parser.keepalive? && @parser.body_eof? - super - @pool.put(self) if reusable - nil - end - - # req is just a string buffer of HTTP headers - def req_write(req, timeout) - @sock.req_write(req, timeout) - end - - # returns true if the socket is still alive, nil if dead - def sock_alive? - @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ? - true : @sock.close - end - - # returns true if the socket was reused and thus retryable - def fail_retryable? - @sock.close - @reused - end - - def initialize(sock, pool) - super(sock) - @reused = false - @pool = pool - end - end # class UpstreamResponse - - # take a responder from the pool, we'll add the object back to the - # pool in UpstreamResponse#close - def responder_get - while obj = @pool.get - return obj if obj.sock_alive? - end - - UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool) - end - - def initialize(dest, timeout = 5) - case dest - when %r{\Aunix:([^:]+)(?::(/.*))?\z} - path = $2 - @sockaddr = Socket.sockaddr_un($1) - when %r{\Ahttp://([^/]+)(/.*)?\z} - path = $2 - host, port = $1.split(':') - @sockaddr = Socket.sockaddr_in(port || 80, host) - else - raise ArgumentError, "destination must be an HTTP URL or unix: path" - end - init_path_vars(path) - @pool = ConnPool.new - @timeout = timeout - end - - def init_path_vars(path) - path ||= '$fullpath' - # methods from Rack::Request we want: - allow = %w(fullpath host_with_port host port url path) - want = path.scan(/\$(\w+)/).flatten! || [] - diff = want - allow - diff.empty? or - raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}" - - # kill leading slash just in case... - @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1') - end - - def call(env) - request_method = env['REQUEST_METHOD'] - req = Rack::Request.new(env) - path = @path.gsub(/\$(\w+)/) { req.__send__($1) } - req = "#{request_method} #{path} HTTP/1.1\r\n" \ - "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n" - - # pass most HTTP_* headers through as-is - chunked = false - env.each do |key, val| - %r{\AHTTP_(\w+)\z} =~ key or next - key = $1 - next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key - chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i - key.tr!("_", "-") - req << "#{key}: #{val}\r\n" - end - - # special cases which Rack does not prefix: - ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n" - clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n" - req << "\r\n" - - # get an open socket and send the headers - ures = responder_get - ures.req_write(req, @timeout) - - # send the request body if there was one - send_body(env["rack.input"], ures, chunked) if chunked || clen - - # wait for the response here - _, header, body = res = ures.rack - - # don't let the upstream Connection and Keep-Alive headers leak through - header.delete_if do |k,_| - k =~ /\A(?:Connection|Keep-Alive)\z/i - end - - case request_method - when "HEAD" - # kcar doesn't know if it's a HEAD or GET response, and HEAD - # responses have Content-Length in it which fools kcar... - body.parser.body_bytes_left = 0 - res[1] = header.dup - body.close # clobbers original header - res[2] = body = [] - end - res - rescue => e - retry if ures && ures.fail_retryable? && request_method != "POST" - if defined?(Yahns::Log) - logger = env['rack.logger'] and - Yahns::Log.exception(logger, 'proxy_pass', e) - end - [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ] - end - - def send_body(input, ures, chunked) - buf = Thread.current[:proxy_pass_buf] ||= "" - - if chunked # unlikely - while input.read(16384, buf) - buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n") - ures.req_write(buf, @timeout) - end - ures.req_write("0\r\n\r\n", @timeout) - else # common if we hit uploads - while input.read(16384, buf) - ures.req_write(buf, @timeout) - end - end - end -end +# compatibility class +warn < +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'socket' +require 'kgio' +require 'kcar' # gem install kcar +require 'rack/request' +require 'thread' +require 'timeout' + +# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten. +# to take advantage of rack.hijack and use the non-blocking I/O facilities +# in yahns. yahns may have to grow a supported API for that... +# For now, we this blocks a worker thread; fortunately threads are reasonably +# cheap on GNU/Linux... +class Yahns::ProxyPass # :nodoc: + class ConnPool + def initialize + @mtx = Mutex.new + @objs = [] + end + + def get + @mtx.synchronize { @objs.pop } + end + + def put(obj) + @mtx.synchronize { @objs << obj } + end + end + + class UpstreamSocket < Kgio::Socket # :nodoc: + attr_writer :expiry + + # called automatically by kgio_read! + def kgio_wait_readable(timeout = nil) + super(timeout || wait_time) + end + + def wait_time + tout = @expiry ? @expiry - Yahns.now : @timeout + raise Timeout::Error, "request timed out", [] if tout < 0 + tout + end + + def readpartial(bytes, buf = Thread.current[:yahns_rbuf] ||= "") + case rv = kgio_read!(bytes, buf) + when String + @expiry += @timeout # bump expiry when we succeed + end + rv + end + + def req_write(buf, timeout) + @timeout = timeout + @expiry = Yahns.now + timeout + case rv = kgio_trywrite(buf) + when :wait_writable + kgio_wait_writable(wait_time) + when nil + return + when String + buf = rv + end while true + end + end # class UpstreamSocket + + class UpstreamResponse < Kcar::Response # :nodoc: + # Called by the Rack server at the end of a successful response + def close + reusable = @parser.keepalive? && @parser.body_eof? + super + @pool.put(self) if reusable + nil + end + + # req is just a string buffer of HTTP headers + def req_write(req, timeout) + @sock.req_write(req, timeout) + end + + # returns true if the socket is still alive, nil if dead + def sock_alive? + @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ? + true : @sock.close + end + + # returns true if the socket was reused and thus retryable + def fail_retryable? + @sock.close + @reused + end + + def initialize(sock, pool) + super(sock) + @reused = false + @pool = pool + end + end # class UpstreamResponse + + # take a responder from the pool, we'll add the object back to the + # pool in UpstreamResponse#close + def responder_get + while obj = @pool.get + return obj if obj.sock_alive? + end + + UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool) + end + + def initialize(dest, timeout = 5) + case dest + when %r{\Aunix:([^:]+)(?::(/.*))?\z} + path = $2 + @sockaddr = Socket.sockaddr_un($1) + when %r{\Ahttp://([^/]+)(/.*)?\z} + path = $2 + host, port = $1.split(':') + @sockaddr = Socket.sockaddr_in(port || 80, host) + else + raise ArgumentError, "destination must be an HTTP URL or unix: path" + end + init_path_vars(path) + @pool = ConnPool.new + @timeout = timeout + end + + def init_path_vars(path) + path ||= '$fullpath' + # methods from Rack::Request we want: + allow = %w(fullpath host_with_port host port url path) + want = path.scan(/\$(\w+)/).flatten! || [] + diff = want - allow + diff.empty? or + raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}" + + # kill leading slash just in case... + @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1') + end + + def call(env) + request_method = env['REQUEST_METHOD'] + req = Rack::Request.new(env) + path = @path.gsub(/\$(\w+)/) { req.__send__($1) } + req = "#{request_method} #{path} HTTP/1.1\r\n" \ + "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n" + + # pass most HTTP_* headers through as-is + chunked = false + env.each do |key, val| + %r{\AHTTP_(\w+)\z} =~ key or next + key = $1 + next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key + chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i + key.tr!("_", "-") + req << "#{key}: #{val}\r\n" + end + + # special cases which Rack does not prefix: + ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n" + clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n" + req << "\r\n" + + # get an open socket and send the headers + ures = responder_get + ures.req_write(req, @timeout) + + # send the request body if there was one + send_body(env["rack.input"], ures, chunked) if chunked || clen + + # wait for the response here + _, header, body = res = ures.rack + + # don't let the upstream Connection and Keep-Alive headers leak through + header.delete_if do |k,_| + k =~ /\A(?:Connection|Keep-Alive)\z/i + end + + case request_method + when "HEAD" + # kcar doesn't know if it's a HEAD or GET response, and HEAD + # responses have Content-Length in it which fools kcar... + body.parser.body_bytes_left = 0 + res[1] = header.dup + body.close # clobbers original header + res[2] = body = [] + end + res + rescue => e + retry if ures && ures.fail_retryable? && request_method != "POST" + Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e) + [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ] + end + + def send_body(input, ures, chunked) + buf = Thread.current[:yahns_rbuf] ||= "" + + if chunked # unlikely + while input.read(16384, buf) + buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n") + ures.req_write(buf, @timeout) + end + ures.req_write("0\r\n\r\n", @timeout) + else # common if we hit uploads + while input.read(16384, buf) + ures.req_write(buf, @timeout) + end + end + end +end diff --git a/test/test_extras_proxy_pass.rb b/test/test_proxy_pass.rb similarity index 93% rename from test/test_extras_proxy_pass.rb rename to test/test_proxy_pass.rb index 5bbce73..530b53c 100644 --- a/test/test_extras_proxy_pass.rb +++ b/test/test_proxy_pass.rb @@ -45,12 +45,12 @@ class TestExtrasProxyPass < Testcase @srv.autoclose = @srv2.autoclose = false ENV["YAHNS_FD"] = "#{@srv.fileno},#{@srv2.fileno}" $LOAD_PATH.unshift "#{Dir.pwd}/extras" - require 'proxy_pass' + require 'yahns/proxy_pass' cfg.instance_eval do - app(:rack, ProxyPass.new("unix:#{unix_path}:/$fullpath")) do + app(:rack, Yahns::ProxyPass.new("unix:#{unix_path}:/$fullpath")) do listen "#{host}:#{port}" end - app(:rack, ProxyPass.new("unix:#{unix_path}:/foo$fullpath")) do + app(:rack, Yahns::ProxyPass.new("unix:#{unix_path}:/foo$fullpath")) do listen "#{host2}:#{port2}" end stderr_path err.path @@ -108,10 +108,10 @@ class TestExtrasProxyPass < Testcase host2, port2 = @srv2.addr[3], @srv2.addr[1] pid = mkserver(cfg) do $LOAD_PATH.unshift "#{Dir.pwd}/extras" - require 'proxy_pass' + require 'yahns/proxy_pass' @srv2.close cfg.instance_eval do - app(:rack, ProxyPass.new("http://#{host2}:#{port2}/")) do + app(:rack, Yahns::ProxyPass.new("http://#{host2}:#{port2}/")) do listen "#{host}:#{port}" end stderr_path err.path -- EW