# -*- encoding: binary -*-
# Copyright (C) 2013-2016 all contributors
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
require 'socket'
require 'kgio'
require 'kcar' # gem install kcar
require 'rack/request'
require 'timeout'

require_relative 'proxy_http_response'

# Rack application which forwards a hijacked client request to a single
# upstream (HTTP-over-TCP or HTTP-over-UNIX-socket) backend.
class Yahns::ProxyPass # :nodoc:
  # One connection to the upstream backend.  The progress of the exchange
  # is tracked in @rrstate, which yahns_step dispatches on:
  #
  #   Array        => [ (str|vec), rack.input, chunked? ], sending the body
  #   String       => buffered request header still to be written
  #   Kcar::Parser => request fully sent, reading the response
  class ReqRes < Kgio::Socket # :nodoc:
    attr_writer :resbuf
    attr_accessor :proxy_trailers

    # Records the client +c+ and the initial send state, then adds this
    # socket to the thread-local yahns queue with QEV_WR.
    #
    # +req+ is the request header string, +input+ is env['rack.input']
    # (or nil when there is no body to forward) and +chunked+ tells
    # whether the body must be re-encoded with chunked transfer encoding.
    def req_start(c, req, input, chunked)
      @hdr = @resbuf = nil
      @yahns_client = c
      @rrstate = input ? [ req, input, chunked ] : req
      Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
    end

    # Advances the exchange by one step according to @rrstate.  Any
    # StandardError is turned into a 502 error response for the client.
    def yahns_step # yahns event loop entry point
      c = @yahns_client
      case req = @rrstate
      when Kcar::Parser # reading response...
        buf = Thread.current[:yahns_rbuf]

        case resbuf = @resbuf # where are we at the response?
        when nil # common case, catch the response header in a single read

          case rv = kgio_tryread(0x2000, buf)
          when String
            if res = req.headers(@hdr = [], rv)
              return c.proxy_response_start(res, rv, req, self)
            else # ugh, big headers or trickled response
              # we must reinitialize the thread-local rbuf if it may
              # live beyond the current thread
              buf = Thread.current[:yahns_rbuf] = ''.dup
              @resbuf = rv
            end
            # continue looping in middle "case @resbuf" loop
          when :wait_readable
            return rv # spurious wakeup
          when nil then return c.proxy_err_response(502, self, nil, nil)
          end # NOT looping here

        when String # continue reading trickled response headers from upstream

          case rv = kgio_tryread(0x2000, buf)
          when String then res = req.headers(@hdr, resbuf << rv) and break
          when :wait_readable then return rv
          when nil then return c.proxy_err_response(502, self, nil, nil)
          end while true

          return c.proxy_response_start(res, resbuf, req, self)

        when Yahns::WbufCommon # streaming/buffering the response body

          # we assign wbuf for rescue below:
          return c.proxy_response_finish(req, wbuf = resbuf, self)

        end while true # case @resbuf

      when Array # [ (str|vec), rack.input, chunked? ]
        send_req_body(req) # returns nil or :wait_writable
      when String # buffered request header
        send_req_buf(req)
      end
    rescue => e
      # avoid polluting logs with a giant backtrace when the problem isn't
      # fixable in code.
      case e
      when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
        e.set_backtrace([])
      end
      c.proxy_err_response(502, self, e, wbuf)
    end

    # Writes +buf+ (a String, or an Array of Strings for a vectored write)
    # to the upstream until it is fully written.  On a partial write +buf+
    # is replaced in place with the unwritten remainder and the write is
    # retried.
    #
    # Returns nil once everything is written, or :wait_writable if the
    # socket is not writable yet.
    def send_req_body_chunk(buf)
      case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
      when String, Array
        buf.replace(rv) # retry loop on partial write
      when :wait_writable, nil
        # :wait_writable = upstream is reading slowly and making us wait
        return rv
      else
        abort "BUG: #{rv.inspect} from kgio_trywrite*"
      end while true
    end

    # Forwards the request body from rack.input to the upstream,
    # re-chunking it when +chunked+ is set.
    #
    # returns :wait_readable if complete, :wait_writable if not
    def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
      buf, input, chunked = req

      # send the first buffered chunk or vector
      rv = send_req_body_chunk(buf) and return rv # :wait_writable

      # yay, sent the first chunk, now read the body!
      rbuf = buf
      if chunked
        if String === buf # initial body
          # switch to a vector: [ chunk-size line, data, CRLF ]
          req[0] = buf = []
        else
          # try to reuse the biggest non-frozen buffer we just wrote;
          rbuf = buf.max_by(&:size)
          rbuf = ''.dup if rbuf.frozen? # unlikely...
        end
      end

      # Note: input (env['rack.input']) is fully-buffered by default so
      # we should not be waiting on a slow network resource when reading
      # input.  However, some weird configs may disable this on LANs
      # and we may wait indefinitely on input.read here...
      while input.read(0x2000, rbuf)
        if chunked
          buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
          buf[1] = rbuf
          buf[2] = "\r\n".freeze
        end
        rv = send_req_body_chunk(buf) and return rv # :wait_writable
      end

      rbuf.clear # all done, clear the big buffer

      # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
      # tries to prevent that (and hijack means all Rack specs go out the door)
      case input
      when Yahns::TeeInput, IO
        input.close
      end

      # note: we do not send any trailer, they are folded into the header
      # because this relies on full request buffering
      # prepare_wait_readable is called by send_req_buf
      chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
      # no more reading off the client socket, just prepare to forward
      # the rejection response from the upstream (if any)
      @yahns_client.to_io.shutdown(Socket::SHUT_RD)
      prepare_wait_readable
    end

    # Switches @rrstate to a fresh response parser and returns
    # :wait_readable.
    def prepare_wait_readable
      @rrstate = Kcar::Parser.new
      :wait_readable # all done sending the request, wait for response
    end

    # Writes the buffered request header (or the terminating chunk).
    # Returns :wait_writable (saving the unwritten remainder in @rrstate)
    # or :wait_readable once everything is written.
    #
    # n.b. buf must be a detached string not shared with
    # Thread.current[:yahns_rbuf] of any thread
    def send_req_buf(buf)
      case rv = kgio_trywrite(buf)
      when String
        buf = rv # retry inner loop
      when :wait_writable
        @rrstate = buf
        return :wait_writable
      when nil
        return prepare_wait_readable
      end while true
    end
  end # class ReqRes

  # +dest+ is either "unix:/path/to/socket[:/request/path]" or
  # "http://host[:port][/request/path]".  The optional request path may
  # contain the "$variables" accepted by init_path_vars.
  #
  # +opts+ may contain :response_headers, a Hash which is stored in
  # env['yahns.proxy_pass.response_headers'] for every proxied request.
  #
  # Raises ArgumentError if +dest+ matches neither form.
  def initialize(dest, opts = {})
    case dest
    when %r{\Aunix:([^:]+)(?::(/.*))?\z}
      path = $2
      @sockaddr = Socket.sockaddr_un($1)
    when %r{\Ahttp://([^/]+)(/.*)?\z}
      path = $2
      host, port = $1.split(':')
      @sockaddr = Socket.sockaddr_in(port || 80, host)
    else
      raise ArgumentError, "destination must be an HTTP URL or unix: path"
    end

    # work on a private copy: we add a default below and must neither
    # modify the caller's Hash nor fail if the caller froze it
    @response_headers = (opts[:response_headers] || {}).dup

    # It's wrong to send the backend Server tag through.  Let users say
    # { "Server" => "yahns" } if they want to advertise for us, but don't
    # advertise by default (for security)
    @response_headers['Server'] ||= :ignore
    init_path_vars(path)
  end

  # Validates the "$variable" references in +path+ (default: '$fullpath')
  # and stores the resulting template in @path.
  #
  # Raises ArgumentError if +path+ references a variable which is not in
  # the allowed list of Rack::Request methods.
  def init_path_vars(path)
    path ||= '$fullpath'
    # methods from Rack::Request we want:
    allow = %w(fullpath host_with_port host port url path)
    want = path.scan(/\$(\w+)/).flatten! || []
    diff = want - allow
    diff.empty? or
             raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"

    # kill leading slash just in case...
    @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
  end

  # Rack entry point: hijacks the client socket, builds the upstream
  # request header from +env+ and hands both to a ReqRes.
  #
  # Returns a placeholder Rack response after the hijack.  If anything
  # raises, the error is logged and an empty 502 response is returned.
  def call(env)
    # start the connection asynchronously and early so TCP can do a
    # 3-way handshake for TCP backends while we generate the request header
    rr = ReqRes.start(@sockaddr)
    c = env['rack.hijack'].call

    # expand the "$variables" validated by init_path_vars
    req = Rack::Request.new(env)
    req = @path.gsub(/\$(\w+)/) { req.__send__($1) }

    case ver = env['HTTP_VERSION']
    when 'HTTP/1.1' # leave alone, response may be chunked
    else # no chunking for HTTP/1.0 and HTTP/0.9
      ver = 'HTTP/1.0'.freeze
    end

    req = "#{env['REQUEST_METHOD']} #{req} #{ver}\r\n" \
          "X-Forwarded-Proto: #{env['rack.url_scheme']}\r\n" \
          "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n".dup

    # pass most HTTP_* headers through as-is
    chunked = false
    env.each do |key, val|
      %r{\AHTTP_(\w+)\z} =~ key or next
      key = $1
      # trailers are folded into the header, so do not send the Trailer:
      # header in the request.  The match is anchored at both ends so
      # that only these exact headers are dropped, not every header which
      # merely starts with one of these names.
      next if /\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR|TRAILER)\z/ =~
         key
      'TRANSFER_ENCODING'.freeze == key && val =~ /\bchunked\b/i and
        chunked = true
      key.tr!('_'.freeze, '-'.freeze)
      req << "#{key}: #{val}\r\n"
    end

    # special cases which Rack does not prefix:
    ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
    clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
    input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil
    env['yahns.proxy_pass.response_headers'] = @response_headers

    # finally, prepare to emit the headers
    rr.req_start(c, req << "\r\n".freeze, input, chunked)

    # this probably breaks fewer middlewares than returning whatever else...
    [ 500, [], [] ]
  rescue => e
    Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e)
    [ 502, { 'Content-Length' => '0', 'Content-Type' => 'text/plain' }, [] ]
  end
end