From: Eric Wong
To: yahns-public@yhbt.net
Subject: [PATCH 6/7] proxy_pass: split out req_res into a separate file
Date: Mon, 16 May 2016 01:43:39 +0000
Message-Id: <20160516014340.8258-7-e@80x24.org>
In-Reply-To: <20160516014340.8258-1-e@80x24.org>
References: <20160516014340.8258-1-e@80x24.org>

This makes the ReqRes class easier to find and, hopefully, easier
to maintain when used with other parts of yahns, although there
may be no reason to use this class outside of ProxyPass.
---
 lib/yahns/proxy_pass.rb | 157 +----------------------------------------------
 lib/yahns/req_res.rb    | 159 ++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 161 insertions(+), 155 deletions(-)
 create mode 100644 lib/yahns/req_res.rb

diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index a2d7d81..8e0b742 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -3,166 +3,13 @@
 # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
 # frozen_string_literal: true
 require 'socket'
-require 'kgio'
-require 'kcar' # gem install kcar
 require 'rack/request'
 require 'timeout'
 
 require_relative 'proxy_http_response'
+require_relative 'req_res'
 
 class Yahns::ProxyPass # :nodoc:
-  class ReqRes < Kgio::Socket # :nodoc:
-    attr_writer :resbuf
-    attr_accessor :proxy_trailers
-
-    def req_start(c, req, input, chunked)
-      @hdr = @resbuf = nil
-      @yahns_client = c
-      @rrstate = input ? [ req, input, chunked ] : req
-      Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
-    end
-
-    def yahns_step # yahns event loop entry point
-      c = @yahns_client
-      case req = @rrstate
-      when Kcar::Parser # reading response...
-        buf = Thread.current[:yahns_rbuf]
-
-        case resbuf = @resbuf # where are we at the response?
-        when nil # common case, catch the response header in a single read
-
-          case rv = kgio_tryread(0x2000, buf)
-          when String
-            if res = req.headers(@hdr = [], rv)
-              return c.proxy_response_start(res, rv, req, self)
-            else # ugh, big headers or tricked response
-              # we must reinitialize the thread-local rbuf if it may
-              # live beyond the current thread
-              buf = Thread.current[:yahns_rbuf] = ''.dup
-              @resbuf = rv
-            end
-            # continue looping in middle "case @resbuf" loop
-          when :wait_readable
-            return rv # spurious wakeup
-          when nil then return c.proxy_err_response(502, self, nil, nil)
-          end # NOT looping here
-
-        when String # continue reading trickled response headers from upstream
-
-          case rv = kgio_tryread(0x2000, buf)
-          when String then res = req.headers(@hdr, resbuf << rv) and break
-          when :wait_readable then return rv
-          when nil then return c.proxy_err_response(502, self, nil, nil)
-          end while true
-
-          return c.proxy_response_start(res, resbuf, req, self)
-
-        when Yahns::WbufCommon # streaming/buffering the response body
-
-          # we assign wbuf for rescue below:
-          return c.proxy_response_finish(req, wbuf = resbuf, self)
-
-        end while true # case @resbuf
-
-      when Array # [ (str|vec), rack.input, chunked? ]
-        send_req_body(req) # returns nil or :wait_writable
-      when String # buffered request header
-        send_req_buf(req)
-      end
-    rescue => e
-      # avoid polluting logs with a giant backtrace when the problem isn't
-      # fixable in code.
-      case e
-      when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
-        e.set_backtrace([])
-      end
-      c.proxy_err_response(502, self, e, wbuf)
-    end
-
-    def send_req_body_chunk(buf)
-      case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
-      when String, Array
-        buf.replace(rv) # retry loop on partial write
-      when :wait_writable, nil
-        # :wait_writable = upstream is reading slowly and making us wait
-        return rv
-      else
-        abort "BUG: #{rv.inspect} from kgio_trywrite*"
-      end while true
-    end
-
-    # returns :wait_readable if complete, :wait_writable if not
-    def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
-      buf, input, chunked = req
-
-      # send the first buffered chunk or vector
-      rv = send_req_body_chunk(buf) and return rv # :wait_writable
-
-      # yay, sent the first chunk, now read the body!
-      rbuf = buf
-      if chunked
-        if String === buf # initial body
-          req[0] = buf = []
-        else
-          # try to reuse the biggest non-frozen buffer we just wrote;
-          rbuf = buf.max_by(&:size)
-          rbuf = ''.dup if rbuf.frozen? # unlikely...
-        end
-      end
-
-      # Note: input (env['rack.input']) is fully-buffered by default so
-      # we should not be waiting on a slow network resource when reading
-      # input. However, some weird configs may disable this on LANs
-      # and we may wait indefinitely on input.read here...
-      while input.read(0x2000, rbuf)
-        if chunked
-          buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
-          buf[1] = rbuf
-          buf[2] = "\r\n".freeze
-        end
-        rv = send_req_body_chunk(buf) and return rv # :wait_writable
-      end
-
-      rbuf.clear # all done, clear the big buffer
-
-      # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
-      # tries to prevent that (and hijack means all Rack specs go out the door)
-      case input
-      when Yahns::TeeInput, IO
-        input.close
-      end
-
-      # note: we do not send any trailer, they are folded into the header
-      # because this relies on full request buffering
-      # prepare_wait_readable is called by send_req_buf
-      chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
-    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
-      # no more reading off the client socket, just prepare to forward
-      # the rejection response from the upstream (if any)
-      @yahns_client.to_io.shutdown(Socket::SHUT_RD)
-      prepare_wait_readable
-    end
-
-    def prepare_wait_readable
-      @rrstate = Kcar::Parser.new
-      :wait_readable # all done sending the request, wait for response
-    end
-
-    # n.b. buf must be a detached string not shared with
-    # Thread.current[:yahns_rbuf] of any thread
-    def send_req_buf(buf)
-      case rv = kgio_trywrite(buf)
-      when String
-        buf = rv # retry inner loop
-      when :wait_writable
-        @rrstate = buf
-        return :wait_writable
-      when nil
-        return prepare_wait_readable
-      end while true
-    end
-  end # class ReqRes
-
   def initialize(dest, opts = {})
     case dest
     when %r{\Aunix:([^:]+)(?::(/.*))?\z}
@@ -199,7 +46,7 @@ def init_path_vars(path)
 
   def call(env)
     # 3-way handshake for TCP backends while we generate the request header
-    rr = ReqRes.start(@sockaddr)
+    rr = Yahns::ReqRes.start(@sockaddr)
     c = env['rack.hijack'].call
 
     req = Rack::Request.new(env)
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
new file mode 100644
index 0000000..3b0d298
--- /dev/null
+++ b/lib/yahns/req_res.rb
@@ -0,0 +1,159 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013-2016 all contributors
+# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
+# frozen_string_literal: true
+# Only used by Yahns::ProxyPass
+require 'kcar' # gem install kcar
+require 'kgio'
+
+class Yahns::ReqRes < Kgio::Socket # :nodoc:
+  attr_writer :resbuf
+  attr_accessor :proxy_trailers
+
+  def req_start(c, req, input, chunked)
+    @hdr = @resbuf = nil
+    @yahns_client = c
+    @rrstate = input ? [ req, input, chunked ] : req
+    Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
+  end
+
+  def yahns_step # yahns event loop entry point
+    c = @yahns_client
+    case req = @rrstate
+    when Kcar::Parser # reading response...
+      buf = Thread.current[:yahns_rbuf]
+
+      case resbuf = @resbuf # where are we at the response?
+      when nil # common case, catch the response header in a single read
+
+        case rv = kgio_tryread(0x2000, buf)
+        when String
+          if res = req.headers(@hdr = [], rv)
+            return c.proxy_response_start(res, rv, req, self)
+          else # ugh, big headers or tricked response
+            # we must reinitialize the thread-local rbuf if it may
+            # live beyond the current thread
+            buf = Thread.current[:yahns_rbuf] = ''.dup
+            @resbuf = rv
+          end
+          # continue looping in middle "case @resbuf" loop
+        when :wait_readable
+          return rv # spurious wakeup
+        when nil then return c.proxy_err_response(502, self, nil, nil)
+        end # NOT looping here
+
+      when String # continue reading trickled response headers from upstream
+
+        case rv = kgio_tryread(0x2000, buf)
+        when String then res = req.headers(@hdr, resbuf << rv) and break
+        when :wait_readable then return rv
+        when nil then return c.proxy_err_response(502, self, nil, nil)
+        end while true
+
+        return c.proxy_response_start(res, resbuf, req, self)
+
+      when Yahns::WbufCommon # streaming/buffering the response body
+
+        # we assign wbuf for rescue below:
+        return c.proxy_response_finish(req, wbuf = resbuf, self)
+
+      end while true # case @resbuf
+
+    when Array # [ (str|vec), rack.input, chunked? ]
+      send_req_body(req) # returns nil or :wait_writable
+    when String # buffered request header
+      send_req_buf(req)
+    end
+  rescue => e
+    # avoid polluting logs with a giant backtrace when the problem isn't
+    # fixable in code.
+    case e
+    when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
+      e.set_backtrace([])
+    end
+    c.proxy_err_response(502, self, e, wbuf)
+  end
+
+  def send_req_body_chunk(buf)
+    case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
+    when String, Array
+      buf.replace(rv) # retry loop on partial write
+    when :wait_writable, nil
+      # :wait_writable = upstream is reading slowly and making us wait
+      return rv
+    else
+      abort "BUG: #{rv.inspect} from kgio_trywrite*"
+    end while true
+  end
+
+  # returns :wait_readable if complete, :wait_writable if not
+  def send_req_body(req) # @rrstate == [ (str|vec), rack.input, chunked? ]
+    buf, input, chunked = req
+
+    # send the first buffered chunk or vector
+    rv = send_req_body_chunk(buf) and return rv # :wait_writable
+
+    # yay, sent the first chunk, now read the body!
+    rbuf = buf
+    if chunked
+      if String === buf # initial body
+        req[0] = buf = []
+      else
+        # try to reuse the biggest non-frozen buffer we just wrote;
+        rbuf = buf.max_by(&:size)
+        rbuf = ''.dup if rbuf.frozen? # unlikely...
+      end
+    end
+
+    # Note: input (env['rack.input']) is fully-buffered by default so
+    # we should not be waiting on a slow network resource when reading
+    # input. However, some weird configs may disable this on LANs
+    # and we may wait indefinitely on input.read here...
+    while input.read(0x2000, rbuf)
+      if chunked
+        buf[0] = "#{rbuf.size.to_s(16)}\r\n".freeze
+        buf[1] = rbuf
+        buf[2] = "\r\n".freeze
+      end
+      rv = send_req_body_chunk(buf) and return rv # :wait_writable
+    end
+
+    rbuf.clear # all done, clear the big buffer
+
+    # we cannot use respond_to?(:close) here since Rack::Lint::InputWrapper
+    # tries to prevent that (and hijack means all Rack specs go out the door)
+    case input
+    when Yahns::TeeInput, IO
+      input.close
+    end
+
+    # note: we do not send any trailer, they are folded into the header
+    # because this relies on full request buffering
+    # prepare_wait_readable is called by send_req_buf
+    chunked ? send_req_buf("0\r\n\r\n".freeze) : prepare_wait_readable
+  rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
+    # no more reading off the client socket, just prepare to forward
+    # the rejection response from the upstream (if any)
+    @yahns_client.to_io.shutdown(Socket::SHUT_RD)
+    prepare_wait_readable
+  end
+
+  def prepare_wait_readable
+    @rrstate = Kcar::Parser.new
+    :wait_readable # all done sending the request, wait for response
+  end
+
+  # n.b. buf must be a detached string not shared with
+  # Thread.current[:yahns_rbuf] of any thread
+  def send_req_buf(buf)
+    case rv = kgio_trywrite(buf)
+    when String
+      buf = rv # retry inner loop
+    when :wait_writable
+      @rrstate = buf
+      return :wait_writable
+    when nil
+      return prepare_wait_readable
+    end while true
+  end
+end # class ReqRes
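
For illustration only, and not part of this patch: send_req_body
frames each read from rack.input as one HTTP/1.1 chunk (hex size,
CRLF, data, CRLF) and finishes with "0\r\n\r\n". The standalone
sketch below shows only that framing. It assumes a hypothetical
helper named chunk_frames and uses StringIO in place of rack.input;
it does not touch kgio, kcar or any yahns class, and it builds one
string instead of the three-element write vector used in the patch.

```ruby
require 'stringio'

# Hypothetical helper (not in yahns): read +input+ in +read_size+ pieces
# and frame each piece as an HTTP/1.1 chunk, mirroring the
# [ size_line, data, CRLF ] triple that send_req_body fills in per read.
def chunk_frames(input, read_size = 0x2000)
  out = ''.b   # accumulated wire bytes (binary, unfrozen copy)
  rbuf = ''.b  # reusable read buffer, like rbuf in send_req_body
  while input.read(read_size, rbuf) # returns nil at EOF
    out << "#{rbuf.size.to_s(16)}\r\n" << rbuf << "\r\n"
  end
  out << "0\r\n\r\n" # last-chunk marker; no trailers are sent
end

framed = chunk_frames(StringIO.new('hello world'), 5)
# framed == "5\r\nhello\r\n5\r\n worl\r\n1\r\nd\r\n0\r\n\r\n"
```

An empty body yields just the terminator, which matches the
send_req_buf("0\r\n\r\n") call made once input.read returns nil.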