From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.3.2 (2011-06-06) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-2.9 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, URIBL_BLOCKED shortcircuit=no autolearn=unavailable version=3.3.2 X-Original-To: yahns-public@yhbt.net Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 5D6EA1F5F1 for ; Fri, 3 Apr 2015 01:53:34 +0000 (UTC) From: Eric Wong To: yahns-public@yhbt.net Subject: [PATCH] proxy_pass: rewrite to be async, using rack.hijack Date: Fri, 3 Apr 2015 01:53:34 +0000 Message-Id: <1428026014-23421-1-git-send-email-e@80x24.org> List-Id: This allows our reverse proxy to avoid having an inefficient 1:1 relationship between threads and upstream connections, reducing memory usage when there are many upstream connections (possibly to multiple backend machines). --- lib/yahns/proxy_http_response.rb | 226 ++++++++++++++++++++++++++++++++ lib/yahns/proxy_pass.rb | 276 +++++++++++++++++++++------------------ lib/yahns/queue_epoll.rb | 5 + lib/yahns/queue_kqueue.rb | 5 + lib/yahns/wbuf.rb | 9 +- test/test_proxy_pass.rb | 120 ++++++++++++++++- 6 files changed, 510 insertions(+), 131 deletions(-) create mode 100644 lib/yahns/proxy_http_response.rb diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb new file mode 100644 index 0000000..31505d3 --- /dev/null +++ b/lib/yahns/proxy_http_response.rb @@ -0,0 +1,226 @@ +# -*- encoding: binary -*- +# Copyright (C) 2015 all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) + +# loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for +# constants. 
+module Yahns::HttpResponse # :nodoc: + + # write everything in buf to our client socket (or wbuf, if it exists) + # it may return a newly-created wbuf or nil + def proxy_write(wbuf, buf, alive) + unless wbuf + # no write buffer, try to write directly to the client socket + case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf) + when nil then return # done writing buf, likely + when String, Array # partial write, hope the skb grows + buf = rv + when :wait_writable, :wait_readable + wbuf = Yahns::Wbuf.new(nil, alive, self.class.output_buffer_tmpdir, rv) + buf = buf.join if Array === buf + break + end while true + end + + wbuf.wbuf_write(self, buf) + wbuf.busy ? wbuf : nil + end + + def proxy_err_response(code, req_res, exc, wbuf) + logger = @hs.env['rack.logger'] + if exc + Yahns::Log.exception(logger, 'upstream error', exc) + else + logger.error('premature upstream EOF') + end + # try to write something, but don't care if we fail + Integer === code and + kgio_trywrite("HTTP/1.1 #{CODES[code]}\r\n\r\n") rescue nil + + shutdown rescue nil + req_res.shutdown rescue nil + nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb + ensure + wbuf.wbuf_abort if wbuf + end + + # returns :wait_readable if we need to read more from req_res + # returns :ignore if we yield control to the client(self) + # returns nil if completely done + def proxy_response_start(res, tip, kcar, req_res) + status, headers = res + si = status.to_i + status = CODES[si] || status + env = @hs.env + have_body = !Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(si) && + env[REQUEST_METHOD] != HEAD + flags = MSG_DONTWAIT + rechunk = false + k = self.class + alive = @hs.next? 
&& k.persistent_connections + + res = "HTTP/1.1 #{status}\r\n" + headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays + case key + when /\A(?:Connection|Keep-Alive)\z/i + next # do not let some upstream headers leak through + when %r{\AContent-Length\z}i + flags |= MSG_MORE if have_body && value.to_i > 0 + when %r{\ATransfer-Encoding\z}i + if value =~ /\bchunked\b/i + case env['HTTP_VERSION'] # this is the original client request + when 'HTTP/1.1' + rechunk = true + else + # too expensive to calculate Content-Length for HTTP/1.0 + # persistent clients, so we will drop persistence instead + alive = false # this should already be implied + next + end + end + end + + res << "#{key}: #{value}\r\n" + end + + # For now, do not add a Date: header, assume upstream already did it + # but do not care if they did not + res << (alive ? CONN_KA : CONN_CLOSE) + + # send the headers + case rv = kgio_syssend(res, flags) + when nil then break # all done, likely + when String # partial write, highly unlikely + flags = MSG_DONTWAIT + res = rv # hope the skb grows + when :wait_writable, :wait_readable # highly unlikely in real apps + wbuf = proxy_write(nil, res, alive) + break # keep buffering as much as possible + end while true + + rbuf = Thread.current[:yahns_rbuf] + tip = tip.empty? ? [] : [ tip ] + + if have_body + if len = kcar.body_bytes_left + + case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) + when String + len = kcar.body_bytes_left -= tmp.size + wbuf = proxy_write(wbuf, tmp, alive) + when nil # premature EOF + return proxy_err_response(nil, req_res, nil, wbuf) + when :wait_readable + # for ensure: + wbuf ||= Yahns::Wbuf.new(nil, alive, k.output_buffer_tmpdir, false) + return :wait_readable # self remains in :ignore, wait on upstream + end until len == 0 + + else # nasty chunked body + + # Only HTTP/1.1 supports chunked responses, we must translate + # otherwise. Otherwise, we must drop the connection to signal + # the end. 
We only send HTTP/1.1 requests to the upstream so + # Rack/Rails can always send streaming responses. + buf = '' + case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) + when String + kcar.filter_body(buf, tmp) + unless buf.empty? + tmp = rechunk ? [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ] + : buf + wbuf = proxy_write(wbuf, tmp, alive) + end + when nil # premature EOF + return proxy_err_response(nil, req_res, nil, wbuf) + when :wait_readable + # for ensure: + wbuf ||= Yahns::Wbuf.new(nil, alive, k.output_buffer_tmpdir, false) + return :wait_readable # self remains in :ignore, wait on upstream + end until kcar.body_eof? + + # TODO: Trailer support + wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, alive) if rechunk + end + end + + wbuf and return proxy_busy_mod_blocked(wbuf, wbuf.busy) + proxy_busy_mod_done(alive) + rescue => e + proxy_err_response(502, req_res, e, wbuf) + ensure + # this happens if this method returns :wait_readable + req_res.resbuf = wbuf if wbuf + end + + def proxy_response_finish(kcar, wbuf, req_res) + rbuf = Thread.current[:yahns_rbuf] + if len = kcar.body_bytes_left + + case tmp = req_res.kgio_tryread(0x2000, rbuf) + when String + len = kcar.body_bytes_left -= tmp.size + wbuf.wbuf_write(self, tmp) + when nil # premature EOF + return proxy_err_response(nil, req_res, nil, wbuf) + when :wait_readable + return :wait_readable # self remains in :ignore, wait on upstream + end while len != 0 + + else # nasty chunked body + + # Only HTTP/1.1 supports chunked responses, we must translate + # otherwise. Otherwise, we must drop the connection to signal + # the end. We only send HTTP/1.1 requests to the upstream so + # Rack/Rails can always send streaming responses. + rechunk = @hs.env['HTTP_VERSION'] == 'HTTP/1.1'.freeze + buf = '' + case tmp = req_res.kgio_tryread(0x2000, rbuf) + when String + kcar.filter_body(buf, tmp) + unless buf.empty? + tmp = rechunk ? 
[ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ] + : buf + wbuf.wbuf_write(self, tmp) + end + when nil # premature EOF + return proxy_err_response(nil, req_res, nil, wbuf) + when :wait_readable + return :wait_readable # self remains in :ignore, wait on upstream + end until kcar.body_eof? + + # TODO: Trailer support + wbuf.wbuf_write(self, "0\r\n\r\n".freeze) if rechunk + end + + busy = wbuf.busy and return proxy_busy_mod_blocked(wbuf, busy) + proxy_busy_mod_done(wbuf.wbuf_persist) + end + + def proxy_busy_mod_done(alive) + q = Thread.current[:yahns_queue] + case http_response_done(alive) + when :wait_readable then q.queue_mod(self, Yahns::Queue::QEV_RD) + when :wait_writable then q.queue_mod(self, Yahns::Queue::QEV_WR) + when :close then Thread.current[:yahns_fdmap].sync_close(self) + end + + nil # close the req_res, too + end + + def proxy_busy_mod_blocked(wbuf, busy) + q = Thread.current[:yahns_queue] + # we are completely done reading and buffering the upstream response, + # but have not completely written the response to the client, + # yield control to the client socket: + @state = wbuf + case busy + when :wait_readable then q.queue_mod(self, Yahns::Queue::QEV_RD) + when :wait_writable then q.queue_mod(self, Yahns::Queue::QEV_WR) + else + abort "BUG: invalid wbuf.busy: #{busy.inspect}" + end + # no touching self after queue_mod + :ignore + end +end diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb index 9c12e99..09fb884 100644 --- a/lib/yahns/proxy_pass.rb +++ b/lib/yahns/proxy_pass.rb @@ -5,110 +5,168 @@ require 'kgio' require 'kcar' # gem install kcar require 'rack/request' -require 'thread' require 'timeout' -# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten. -# to take advantage of rack.hijack and use the non-blocking I/O facilities -# in yahns. yahns may have to grow a supported API for that... -# For now, we this blocks a worker thread; fortunately threads are reasonably -# cheap on GNU/Linux... 
+require_relative 'proxy_http_response' + class Yahns::ProxyPass # :nodoc: - class ConnPool - def initialize - @mtx = Mutex.new - @objs = [] - end + class ReqRes < Kgio::Socket + attr_writer :resbuf - def get - @mtx.synchronize { @objs.pop } + def req_start(c, req, input, chunked) + @hdr = @resbuf = nil + @yahns_client = c + @rrstate = input ? [ req, input, chunked ] : req + Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR) end - def put(obj) - @mtx.synchronize { @objs << obj } + # we must reinitialize the thread-local rbuf if it may get beyond the + # current thread + def detach_rbuf! + Thread.current[:yahns_rbuf] = '' end - end - class UpstreamSocket < Kgio::Socket # :nodoc: - attr_writer :expiry + def yahns_step # yahns event loop entry point + case req = @rrstate + when Kcar::Parser # reading response... + buf = Thread.current[:yahns_rbuf] + c = @yahns_client - # called automatically by kgio_read! - def kgio_wait_readable(timeout = nil) - super(timeout || wait_time) - end + case resbuf = @resbuf # where are we at the response? + when nil # common case, catch the response header in a single read - def wait_time - tout = @expiry ? @expiry - Yahns.now : @timeout - raise Timeout::Error, "request timed out", [] if tout < 0 - tout - end + case rv = kgio_tryread(0x2000, buf) + when String + if res = req.headers(@hdr = [], rv) + return c.proxy_response_start(res, rv, req, self) + else # ugh, big headers or tricked response + buf = detach_rbuf! 
+ @resbuf = rv + end + # continue looping in middle "case @resbuf" loop + when :wait_readable + return rv # spurious wakeup + when nil then return c.proxy_err_response(502, self, nil, nil) + end # NOT looping here - def readpartial(bytes, buf = Thread.current[:yahns_rbuf] ||= "") - case rv = kgio_read!(bytes, buf) - when String - @expiry += @timeout # bump expiry when we succeed - end - rv - end + when String # continue reading trickled response headers from upstream - def req_write(buf, timeout) - @timeout = timeout - @expiry = Yahns.now + timeout - case rv = kgio_trywrite(buf) - when :wait_writable - kgio_wait_writable(wait_time) - when nil - return - when String - buf = rv - end while true + case rv = kgio_tryread(0x2000, buf) + when String then res = req.headers(@hdr, resbuf << rv) and break + when :wait_readable then return rv + when nil then return c.proxy_err_response(502, self, nil, nil) + end while true + + return c.proxy_response_start(res, resbuf, req, self) + + when Yahns::WbufCommon # streaming/buffering the response body + + return c.proxy_response_finish(req, resbuf, self) + + end while true # case @resbuf + + when Array # [ (str|vec), rack.input, chunked? ] + send_req_body(req) # returns nil or :wait_writable + when String # buffered request header + send_req_buf(req) + end + rescue => e + c.proxy_err_response(502, self, e, nil) end - end # class UpstreamSocket - class UpstreamResponse < Kcar::Response # :nodoc: # Called by the Rack server at the end of a successful response def close - reusable = @parser.keepalive? && @parser.body_eof? + @hdr = @yahns_client = @rrstate = nil super - @pool.put(self) if reusable - nil end - # req is just a string buffer of HTTP headers - def req_write(req, timeout) - @sock.req_write(req, timeout) - end + # returns :wait_readable if complete, :wait_writable if not + def send_req_body(req) + buf, input, chunked = req - # returns true if the socket is still alive, nil if dead - def sock_alive? 
- @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ? - true : @sock.close - end + # get the first buffered chunk or vector + case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf) + when String, Array + buf = rv # retry inner loop + when :wait_writable + req[0] = buf + return :wait_writable + when nil + break # onto writing body + end while true - # returns true if the socket was reused and thus retryable - def fail_retryable? - @sock.close - @reused + buf = Thread.current[:yahns_rbuf] + + # Note: input (env['rack.input']) is fully-buffered by default so + # we should not be waiting on a slow network resource when reading + # input. However, some weird configs may disable this on LANs + + if chunked + while input.read(0x2000, buf) + vec = [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ] + case rv = kgio_trywritev(vec) + when Array + vec = rv # partial write, retry in case loop + when :wait_writable + buf = detach_rbuf! + req[0] = vec + return :wait_writable + when nil + break # continue onto reading next chunk + end while true + end + close_req_body(input) + + # note: we do not send any trailer, they are folded into the header + # because this relies on full request buffering + send_req_buf("0\r\n\r\n".freeze) + else # identity request, easy: + while input.read(0x2000, buf) + case rv = kgio_trywrite(buf) + when String + buf = rv # partial write, retry in case loop + when :wait_writable + buf = detach_rbuf! 
+ req[0] = buf + return :wait_writable + when nil + break # continue onto reading next block + end while true + end + + close_req_body(input) + prepare_wait_readable + end end - def initialize(sock, pool) - super(sock) - @reused = false - @pool = pool + def prepare_wait_readable + @rrstate = Kcar::Parser.new + :wait_readable # all done sending the request, wait for response end - end # class UpstreamResponse - # take a responder from the pool, we'll add the object back to the - # pool in UpstreamResponse#close - def responder_get - while obj = @pool.get - return obj if obj.sock_alive? + def close_req_body(input) + case input + when Yahns::TeeInput, IO, StringIO + input.close + end end - UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool) - end + # n.b. buf must be a detached string not shared with + # Thread.current[:yahns_rbuf] of any thread + def send_req_buf(buf) + case rv = kgio_trywrite(buf) + when String + buf = rv # retry inner loop + when :wait_writable + @rrstate = buf + return :wait_writable + when nil + return prepare_wait_readable + end while true + end + end # class ReqRes - def initialize(dest, timeout = 5) + def initialize(dest) case dest when %r{\Aunix:([^:]+)(?::(/.*))?\z} path = $2 @@ -121,8 +179,6 @@ def initialize(dest, timeout = 5) raise ArgumentError, "destination must be an HTTP URL or unix: path" end init_path_vars(path) - @pool = ConnPool.new - @timeout = timeout end def init_path_vars(path) @@ -139,10 +195,15 @@ def init_path_vars(path) end def call(env) - request_method = env['REQUEST_METHOD'] + # 3-way handshake for TCP backends while we generate the request header + rr = ReqRes.start(@sockaddr) + c = env['rack.hijack'].call + req = Rack::Request.new(env) - path = @path.gsub(/\$(\w+)/) { req.__send__($1) } - req = "#{request_method} #{path} HTTP/1.1\r\n" \ + req = @path.gsub(/\$(\w+)/) { req.__send__($1) } + + # start the connection asynchronously and early so TCP can do a + req = "#{env['REQUEST_METHOD']} #{req} HTTP/1.1\r\n" \ 
"X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n" # pass most HTTP_* headers through as-is @@ -150,61 +211,24 @@ def call(env) env.each do |key, val| %r{\AHTTP_(\w+)\z} =~ key or next key = $1 - next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key + # trailers are folded into the header, so do not send the Trailer: + # header in the request + next if /\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR|TRAILER)/ =~ + key chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i - key.tr!("_", "-") + key.tr!('_'.freeze, '-'.freeze) req << "#{key}: #{val}\r\n" end # special cases which Rack does not prefix: ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n" clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n" - req << "\r\n" - - # get an open socket and send the headers - ures = responder_get - ures.req_write(req, @timeout) + input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil - # send the request body if there was one - send_body(env["rack.input"], ures, chunked) if chunked || clen - - # wait for the response here - _, header, body = res = ures.rack - - # don't let the upstream Connection and Keep-Alive headers leak through - header.delete_if do |k,_| - k =~ /\A(?:Connection|Keep-Alive)\z/i - end - - case request_method - when "HEAD" - # kcar doesn't know if it's a HEAD or GET response, and HEAD - # responses have Content-Length in it which fools kcar... - body.parser.body_bytes_left = 0 - res[1] = header.dup - body.close # clobbers original header - res[2] = body = [] - end - res + # finally, prepare to emit the headers + rr.req_start(c, req << "\r\n".freeze, input, chunked) rescue => e - retry if ures && ures.fail_retryable? 
&& request_method != "POST" Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e) [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ] end - - def send_body(input, ures, chunked) - buf = Thread.current[:yahns_rbuf] ||= "" - - if chunked # unlikely - while input.read(16384, buf) - buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n") - ures.req_write(buf, @timeout) - end - ures.req_write("0\r\n\r\n", @timeout) - else # common if we hit uploads - while input.read(16384, buf) - ures.req_write(buf, @timeout) - end - end - end end diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb index da90a95..10ff607 100644 --- a/lib/yahns/queue_epoll.rb +++ b/lib/yahns/queue_epoll.rb @@ -27,9 +27,14 @@ def queue_add(io, flags) epoll_ctl(Epoll::CTL_ADD, io, flags) end + def queue_mod(io, flags) + epoll_ctl(Epoll::CTL_MOD, io, flags) + end + def thr_init Thread.current[:yahns_rbuf] = "" Thread.current[:yahns_fdmap] = @fdmap + Thread.current[:yahns_queue] = self end # returns an array of infinitely running threads diff --git a/lib/yahns/queue_kqueue.rb b/lib/yahns/queue_kqueue.rb index 4e5c133..c0667b7 100644 --- a/lib/yahns/queue_kqueue.rb +++ b/lib/yahns/queue_kqueue.rb @@ -36,9 +36,14 @@ def queue_add(io, flags) kevent(Kevent[io.fileno, flags, fflags, 0, 0, io]) end + def queue_mod(io, flags) + kevent(Kevent[io.fileno, flags, ADD_ONESHOT, 0, 0, io]) + end + def thr_init Thread.current[:yahns_rbuf] = "" Thread.current[:yahns_fdmap] = @fdmap + Thread.current[:yahns_queue] = self end # returns an array of infinitely running threads diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb index fa39b9b..42776cf 100644 --- a/lib/yahns/wbuf.rb +++ b/lib/yahns/wbuf.rb @@ -30,6 +30,7 @@ class Yahns::Wbuf # :nodoc: include Yahns::WbufCommon attr_reader :busy + attr_reader :wbuf_persist def initialize(body, persist, tmpdir, busy) @tmpio = nil @@ -71,7 +72,7 @@ def wbuf_write(c, buf) # we're all caught up, try to prevent dirty data from getting flushed # to disk if we can 
help it. - @tmpio = @tmpio.close + wbuf_abort @sf_offset = 0 @busy = false nil @@ -79,7 +80,11 @@ def wbuf_write(c, buf) # called by last wbuf_flush def wbuf_close(client) - @tmpio = @tmpio.close if @tmpio + wbuf_abort wbuf_close_common(client) end + + def wbuf_abort + @tmpio = @tmpio.close if @tmpio + end end diff --git a/test/test_proxy_pass.rb b/test/test_proxy_pass.rb index 5398b29..5bf8722 100644 --- a/test/test_proxy_pass.rb +++ b/test/test_proxy_pass.rb @@ -6,13 +6,45 @@ class TestProxyPass < Testcase ENV["N"].to_i > 1 and parallelize_me! include ServerHelper + OMFG = 'a' * (1024 * 1024 * 32) class ProxiedApp def call(env) h = [ %w(Content-Length 3), %w(Content-Type text/plain) ] case env['REQUEST_METHOD'] when 'GET' - [ 200, h, [ "hi\n"] ] + case env['PATH_INFO'] + when '/giant-body' + h = [ %W(Content-Length #{OMFG.size}), %w(Content-Type text/plain) ] + [ 200, h, [ OMFG ] ] + when %r{\A/slow-headers-(\d+(?:\.\d+)?)\z} + delay = $1.to_f + io = env['rack.hijack'].call + [ "HTTP/1.1 200 OK\r\n", + "Content-Length: 7\r\n", + "Content-Type: text/PAIN\r\n", + "connection: close\r\n\r\n", + "HIHIHI!" 
+ ].each do |l| + io.write(l) + sleep delay + end + io.close + when %r{\A/chunky-slow-(\d+(?:\.\d+)?)\z} + delay = $1.to_f + chunky = Object.new + chunky.instance_variable_set(:@delay, delay) + def chunky.each + sleep @delay + yield "3\r\nHI!\r\n" + sleep @delay + yield "0\r\n\r\n" + end + h = [ %w(Content-Type text/pain), %w(Transfer-Encoding chunked) ] + [ 200, h, chunky ] + else + [ 200, h, [ "hi\n"] ] + end when 'HEAD' [ 200, h, [] ] when 'PUT' @@ -72,6 +104,7 @@ def test_unix_socket_no_path stderr_path err.path end end + Net::HTTP.start(host, port) do |http| res = http.request(Net::HTTP::Get.new('/f00')) assert_equal 200, res.code.to_i @@ -109,7 +142,7 @@ def test_proxy_pass require 'yahns/proxy_pass' @srv2.close cfg.instance_eval do - app(:rack, Yahns::ProxyPass.new("http://#{host2}:#{port2}/")) do + app(:rack, Yahns::ProxyPass.new("http://#{host2}:#{port2}")) do listen "#{host}:#{port}" end stderr_path err.path @@ -126,6 +159,8 @@ def test_proxy_pass end end + check_pipelining(host, port) + gplv3 = File.open('COPYING') Net::HTTP.start(host, port) do |http| @@ -138,7 +173,7 @@ def test_proxy_pass assert_equal n, res['Content-Length'].to_i assert_nil res.body - # chunked encoding + # chunked encoding (PUT) req = Net::HTTP::Put.new('/') req.body_stream = gplv3 req.content_type = 'application/octet-stream' @@ -148,6 +183,18 @@ def test_proxy_pass assert_equal gplv3.read, res.body assert_equal 201, res.code.to_i + # chunked encoding (GET) + res = http.request(Net::HTTP::Get.new('/chunky-slow-0.1')) + assert_equal 200, res.code.to_i + assert_equal 'chunked', res['Transfer-encoding'] + assert_equal "HI!", res.body + + # slow headers (GET) + res = http.request(Net::HTTP::Get.new('/slow-headers-0.01')) + assert_equal 200, res.code.to_i + assert_equal 'text/PAIN', res['Content-Type'] + assert_equal 'HIHIHI!', res.body + # normal content-length gplv3.rewind req = Net::HTTP::Put.new('/') @@ -158,10 +205,77 @@ def test_proxy_pass gplv3.rewind assert_equal gplv3.read, 
res.body assert_equal 201, res.code.to_i + + # giant body + res = http.request(Net::HTTP::Get.new('/giant-body')) + assert_equal 200, res.code.to_i + assert_equal OMFG, res.body + end + + # ensure we do not chunk responses back to an HTTP/1.0 client even if + # the proxy <-> upstream connection is chunky + %w(0 0.1).each do |delay| + begin + h10 = TCPSocket.new(host, port) + h10.write "GET /chunky-slow-#{delay} HTTP/1.0\r\n\r\n" + res = Timeout.timeout(60) { h10.read } + assert_match %r{^Connection: close\r\n}, res + assert_match %r{^Content-Type: text/pain\r\n}, res + assert_match %r{\r\n\r\nHI!\z}, res + refute_match %r{^Transfer-Encoding:}, res + refute_match %r{\r0\r\n}, res + ensure + h10.close + end end ensure gplv3.close if gplv3 quit_wait pid quit_wait pid2 end + + def check_pipelining(host, port) + pl = TCPSocket.new(host, port) + r1 = '' + r2 = '' + r3 = '' + Timeout.timeout(60) do + pl.write "GET / HTTP/1.1\r\nHost: example.com\r\n\r\nGET /" + until r1 =~ /hi\n/ + r1 << pl.readpartial(666) + end + + pl.write "chunky-slow-0.1 HTTP/1.1\r\nHost: example.com\r\n\r\nP" + until r2 =~ /\r\n3\r\nHI!\r\n0\r\n\r\n/ + r2 << pl.readpartial(666) + end + + if false + pl.write "ET / HTTP/1.1\r\nHost: example.com\r\n\r\n" + until r3 =~ /hi\n/ + r3 << pl.readpartial(666) + end + else + pl.write "UT / HTTP/1.1\r\nHost: example.com\r\n" + pl.write "Transfer-Encoding: chunked\r\n\r\n" + pl.write "6\r\nchunky\r\n" + pl.write "0\r\n\r\n" + + until r3 =~ /chunky/ + r3 << pl.readpartial(666) + end + end + end + r1 = r1.split("\r\n").reject { |x| x =~ /^Date: / } + r2 = r2.split("\r\n").reject { |x| x =~ /^Date: / } + assert_equal 'HTTP/1.1 200 OK', r1[0] + assert_equal 'HTTP/1.1 200 OK', r2[0] + assert_match %r{\r\n\r\nchunky\z}, r3 + assert_match %r{\AHTTP/1\.1 201 Created\r\n}, r3 + rescue => e + warn [ e.class, e.message ].inspect + warn e.backtrace.join("\n") + ensure + pl.close + end end -- EW