* [PATCH] proxy_pass: rewrite to be async, using rack.hijack
@ 2015-04-03 1:53 Eric Wong
0 siblings, 0 replies; only message in thread
From: Eric Wong @ 2015-04-03 1:53 UTC (permalink / raw)
To: yahns-public
This allows our reverse proxy to avoid having an innefficient 1:1
relationship between threads and upstream connections, reducing
memory usage when there are many upstream connections (possibly to
multiple backend machines).
---
lib/yahns/proxy_http_response.rb | 226 ++++++++++++++++++++++++++++++++
lib/yahns/proxy_pass.rb | 276 +++++++++++++++++++++------------------
lib/yahns/queue_epoll.rb | 5 +
lib/yahns/queue_kqueue.rb | 5 +
lib/yahns/wbuf.rb | 9 +-
test/test_proxy_pass.rb | 120 ++++++++++++++++-
6 files changed, 510 insertions(+), 131 deletions(-)
create mode 100644 lib/yahns/proxy_http_response.rb
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
new file mode 100644
index 0000000..31505d3
--- /dev/null
+++ b/lib/yahns/proxy_http_response.rb
@@ -0,0 +1,226 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2015 all contributors <yahns-public@yhbt.net>
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+
+# loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for
+# constants.
+module Yahns::HttpResponse # :nodoc:
+
+ # write everything in buf to our client socket (or wbuf, if it exists)
+ # it may return a newly-created wbuf or nil
+ def proxy_write(wbuf, buf, alive)
+ unless wbuf
+ # no write buffer, try to write directly to the client socket
+ case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
+ when nil then return # done writing buf, likely
+ when String, Array # partial write, hope the skb grows
+ buf = rv
+ when :wait_writable, :wait_readable
+ wbuf = Yahns::Wbuf.new(nil, alive, self.class.output_buffer_tmpdir, rv)
+ buf = buf.join if Array === buf
+ break
+ end while true
+ end
+
+ wbuf.wbuf_write(self, buf)
+ wbuf.busy ? wbuf : nil
+ end
+
+ def proxy_err_response(code, req_res, exc, wbuf)
+ logger = @hs.env['rack.logger']
+ if exc
+ Yahns::Log.exception(logger, 'upstream error', exc)
+ else
+ logger.error('premature upstream EOF')
+ end
+ # try to write something, but don't care if we fail
+ Integer === code and
+ kgio_trywrite("HTTP/1.1 #{CODES[code]}\r\n\r\n") rescue nil
+
+ shutdown rescue nil
+ req_res.shutdown rescue nil
+ nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb
+ ensure
+ wbuf.wbuf_abort if wbuf
+ end
+
+ # returns :wait_readable if we need to read more from req_res
+ # returns :ignore if we yield control to the client(self)
+ # returns nil if completely done
+ def proxy_response_start(res, tip, kcar, req_res)
+ status, headers = res
+ si = status.to_i
+ status = CODES[si] || status
+ env = @hs.env
+ have_body = !Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(si) &&
+ env[REQUEST_METHOD] != HEAD
+ flags = MSG_DONTWAIT
+ rechunk = false
+ k = self.class
+ alive = @hs.next? && k.persistent_connections
+
+ res = "HTTP/1.1 #{status}\r\n"
+ headers.each do |key,value| # n.b.: headers is an Array of 2-element Arrays
+ case key
+ when /\A(?:Connection|Keep-Alive)\z/i
+ next # do not let some upstream headers leak through
+ when %r{\AContent-Length\z}i
+ flags |= MSG_MORE if have_body && value.to_i > 0
+ when %r{\ATransfer-Encoding\z}i
+ if value =~ /\bchunked\b/i
+ case env['HTTP_VERSION'] # this is the original client request
+ when 'HTTP/1.1'
+ rechunk = true
+ else
+ # too expensive to calculate Content-Length for HTTP/1.0
+ # persistent clients, so we will drop persistence instead
+ alive = false # this should already be implied
+ next
+ end
+ end
+ end
+
+ res << "#{key}: #{value}\r\n"
+ end
+
+ # For now, do not add a Date: header, assume upstream already did it
+ # but do not care if they did not
+ res << (alive ? CONN_KA : CONN_CLOSE)
+
+ # send the headers
+ case rv = kgio_syssend(res, flags)
+ when nil then break # all done, likely
+ when String # partial write, highly unlikely
+ flags = MSG_DONTWAIT
+ res = rv # hope the skb grows
+ when :wait_writable, :wait_readable # highly unlikely in real apps
+ wbuf = proxy_write(nil, res, alive)
+ break # keep buffering as much as possible
+ end while true
+
+ rbuf = Thread.current[:yahns_rbuf]
+ tip = tip.empty? ? [] : [ tip ]
+
+ if have_body
+ if len = kcar.body_bytes_left
+
+ case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+ when String
+ len = kcar.body_bytes_left -= tmp.size
+ wbuf = proxy_write(wbuf, tmp, alive)
+ when nil # premature EOF
+ return proxy_err_response(nil, req_res, nil, wbuf)
+ when :wait_readable
+ # for ensure:
+ wbuf ||= Yahns::Wbuf.new(nil, alive, k.output_buffer_tmpdir, false)
+ return :wait_readable # self remains in :ignore, wait on upstream
+ end until len == 0
+
+ else # nasty chunked body
+
+ # Only HTTP/1.1 supports chunked responses, we must translate
+ # otherwise. Otherwise, we must drop the connection to signal
+ # the end. We only send HTTP/1.1 requests to the upstream so
+ # Rack/Rails can always send streaming responses.
+ buf = ''
+ case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+ when String
+ kcar.filter_body(buf, tmp)
+ unless buf.empty?
+ tmp = rechunk ? [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
+ : buf
+ wbuf = proxy_write(wbuf, tmp, alive)
+ end
+ when nil # premature EOF
+ return proxy_err_response(nil, req_res, nil, wbuf)
+ when :wait_readable
+ # for ensure:
+ wbuf ||= Yahns::Wbuf.new(nil, alive, k.output_buffer_tmpdir, false)
+ return :wait_readable # self remains in :ignore, wait on upstream
+ end until kcar.body_eof?
+
+ # TODO: Trailer support
+ wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, alive) if rechunk
+ end
+ end
+
+ wbuf and return proxy_busy_mod_blocked(wbuf, wbuf.busy)
+ proxy_busy_mod_done(alive)
+ rescue => e
+ proxy_err_response(502, req_res, e, wbuf)
+ ensure
+ # this happens if this method returns :wait_readable
+ req_res.resbuf = wbuf if wbuf
+ end
+
+ def proxy_response_finish(kcar, wbuf, req_res)
+ rbuf = Thread.current[:yahns_rbuf]
+ if len = kcar.body_bytes_left
+
+ case tmp = req_res.kgio_tryread(0x2000, rbuf)
+ when String
+ len = kcar.body_bytes_left -= tmp.size
+ wbuf.wbuf_write(self, tmp)
+ when nil # premature EOF
+ return proxy_err_response(nil, req_res, nil, wbuf)
+ when :wait_readable
+ return :wait_readable # self remains in :ignore, wait on upstream
+ end while len != 0
+
+ else # nasty chunked body
+
+ # Only HTTP/1.1 supports chunked responses, we must translate
+ # otherwise. Otherwise, we must drop the connection to signal
+ # the end. We only send HTTP/1.1 requests to the upstream so
+ # Rack/Rails can always send streaming responses.
+ rechunk = @hs.env['HTTP_VERSION'] == 'HTTP/1.1'.freeze
+ buf = ''
+ case tmp = req_res.kgio_tryread(0x2000, rbuf)
+ when String
+ kcar.filter_body(buf, tmp)
+ unless buf.empty?
+ tmp = rechunk ? [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
+ : buf
+ wbuf.wbuf_write(self, tmp)
+ end
+ when nil # premature EOF
+ return proxy_err_response(nil, req_res, nil, wbuf)
+ when :wait_readable
+ return :wait_readable # self remains in :ignore, wait on upstream
+ end until kcar.body_eof?
+
+ # TODO: Trailer support
+ wbuf.wbuf_write(self, "0\r\n\r\n".freeze) if rechunk
+ end
+
+ busy = wbuf.busy and return proxy_busy_mod_blocked(wbuf, busy)
+ proxy_busy_mod_done(wbuf.wbuf_persist)
+ end
+
+ def proxy_busy_mod_done(alive)
+ q = Thread.current[:yahns_queue]
+ case http_response_done(alive)
+ when :wait_readable then q.queue_mod(self, Yahns::Queue::QEV_RD)
+ when :wait_writable then q.queue_mod(self, Yahns::Queue::QEV_WR)
+ when :close then Thread.current[:yahns_fdmap].sync_close(self)
+ end
+
+ nil # close the req_res, too
+ end
+
+ def proxy_busy_mod_blocked(wbuf, busy)
+ q = Thread.current[:yahns_queue]
+ # we are completely done reading and buffering the upstream response,
+ # but have not completely written the response to the client,
+ # yield control to the client socket:
+ @state = wbuf
+ case busy
+ when :wait_readable then q.queue_mod(self, Yahns::Queue::QEV_RD)
+ when :wait_writable then q.queue_mod(self, Yahns::Queue::QEV_WR)
+ else
+ abort "BUG: invalid wbuf.busy: #{busy.inspect}"
+ end
+ # no touching self after queue_mod
+ :ignore
+ end
+end
diff --git a/lib/yahns/proxy_pass.rb b/lib/yahns/proxy_pass.rb
index 9c12e99..09fb884 100644
--- a/lib/yahns/proxy_pass.rb
+++ b/lib/yahns/proxy_pass.rb
@@ -5,110 +5,168 @@
require 'kgio'
require 'kcar' # gem install kcar
require 'rack/request'
-require 'thread'
require 'timeout'
-# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten.
-# to take advantage of rack.hijack and use the non-blocking I/O facilities
-# in yahns. yahns may have to grow a supported API for that...
-# For now, we this blocks a worker thread; fortunately threads are reasonably
-# cheap on GNU/Linux...
+require_relative 'proxy_http_response'
+
class Yahns::ProxyPass # :nodoc:
- class ConnPool
- def initialize
- @mtx = Mutex.new
- @objs = []
- end
+ class ReqRes < Kgio::Socket
+ attr_writer :resbuf
- def get
- @mtx.synchronize { @objs.pop }
+ def req_start(c, req, input, chunked)
+ @hdr = @resbuf = nil
+ @yahns_client = c
+ @rrstate = input ? [ req, input, chunked ] : req
+ Thread.current[:yahns_queue].queue_add(self, Yahns::Queue::QEV_WR)
end
- def put(obj)
- @mtx.synchronize { @objs << obj }
+ # we must reinitialize the thread-local rbuf if it may get beyond the
+ # current thread
+ def detach_rbuf!
+ Thread.current[:yahns_rbuf] = ''
end
- end
- class UpstreamSocket < Kgio::Socket # :nodoc:
- attr_writer :expiry
+ def yahns_step # yahns event loop entry point
+ case req = @rrstate
+ when Kcar::Parser # reading response...
+ buf = Thread.current[:yahns_rbuf]
+ c = @yahns_client
- # called automatically by kgio_read!
- def kgio_wait_readable(timeout = nil)
- super(timeout || wait_time)
- end
+ case resbuf = @resbuf # where are we at the response?
+ when nil # common case, catch the response header in a single read
- def wait_time
- tout = @expiry ? @expiry - Yahns.now : @timeout
- raise Timeout::Error, "request timed out", [] if tout < 0
- tout
- end
+ case rv = kgio_tryread(0x2000, buf)
+ when String
+ if res = req.headers(@hdr = [], rv)
+ return c.proxy_response_start(res, rv, req, self)
+ else # ugh, big headers or tricked response
+ buf = detach_rbuf!
+ @resbuf = rv
+ end
+ # continue looping in middle "case @resbuf" loop
+ when :wait_readable
+ return rv # spurious wakeup
+ when nil then return c.proxy_err_response(502, self, nil, nil)
+ end # NOT looping here
- def readpartial(bytes, buf = Thread.current[:yahns_rbuf] ||= "")
- case rv = kgio_read!(bytes, buf)
- when String
- @expiry += @timeout # bump expiry when we succeed
- end
- rv
- end
+ when String # continue reading trickled response headers from upstream
- def req_write(buf, timeout)
- @timeout = timeout
- @expiry = Yahns.now + timeout
- case rv = kgio_trywrite(buf)
- when :wait_writable
- kgio_wait_writable(wait_time)
- when nil
- return
- when String
- buf = rv
- end while true
+ case rv = kgio_tryread(0x2000, buf)
+ when String then res = req.headers(@hdr, resbuf << rv) and break
+ when :wait_readable then return rv
+ when nil then return c.proxy_err_response(502, self, nil, nil)
+ end while true
+
+ return c.proxy_response_start(res, resbuf, req, self)
+
+ when Yahns::WbufCommon # streaming/buffering the response body
+
+ return c.proxy_response_finish(req, resbuf, self)
+
+ end while true # case @resbuf
+
+ when Array # [ (str|vec), rack.input, chunked? ]
+ send_req_body(req) # returns nil or :wait_writable
+ when String # buffered request header
+ send_req_buf(req)
+ end
+ rescue => e
+ c.proxy_err_response(502, self, e, nil)
end
- end # class UpstreamSocket
- class UpstreamResponse < Kcar::Response # :nodoc:
# Called by the Rack server at the end of a successful response
def close
- reusable = @parser.keepalive? && @parser.body_eof?
+ @hdr = @yahns_client = @rrstate = nil
super
- @pool.put(self) if reusable
- nil
end
- # req is just a string buffer of HTTP headers
- def req_write(req, timeout)
- @sock.req_write(req, timeout)
- end
+ # returns :wait_readable if complete, :wait_writable if not
+ def send_req_body(req)
+ buf, input, chunked = req
- # returns true if the socket is still alive, nil if dead
- def sock_alive?
- @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
- true : @sock.close
- end
+ # get the first buffered chunk or vector
+ case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
+ when String, Array
+ buf = rv # retry inner loop
+ when :wait_writable
+ req[0] = buf
+ return :wait_writable
+ when nil
+ break # onto writing body
+ end while true
- # returns true if the socket was reused and thus retryable
- def fail_retryable?
- @sock.close
- @reused
+ buf = Thread.current[:yahns_rbuf]
+
+ # Note: input (env['rack.input']) is fully-buffered by default so
+ # we should not be waiting on a slow network resource when reading
+ # input. However, some weird configs may disable this on LANs
+
+ if chunked
+ while input.read(0x2000, buf)
+ vec = [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
+ case rv = kgio_trywritev(vec)
+ when Array
+ vec = rv # partial write, retry in case loop
+ when :wait_writable
+ buf = detach_rbuf!
+ req[0] = vec
+ return :wait_writable
+ when nil
+ break # continue onto reading next chunk
+ end while true
+ end
+ close_req_body(input)
+
+ # note: we do not send any trailer, they are folded into the header
+ # because this relies on full request buffering
+ send_req_buf("0\r\n\r\n".freeze)
+ else # identity request, easy:
+ while input.read(0x2000, buf)
+ case rv = kgio_trywrite(buf)
+ when String
+ buf = rv # partial write, retry in case loop
+ when :wait_writable
+ buf = detach_rbuf!
+ req[0] = buf
+ return :wait_writable
+ when nil
+ break # continue onto reading next block
+ end while true
+ end
+
+ close_req_body(input)
+ prepare_wait_readable
+ end
end
- def initialize(sock, pool)
- super(sock)
- @reused = false
- @pool = pool
+ def prepare_wait_readable
+ @rrstate = Kcar::Parser.new
+ :wait_readable # all done sending the request, wait for response
end
- end # class UpstreamResponse
- # take a responder from the pool, we'll add the object back to the
- # pool in UpstreamResponse#close
- def responder_get
- while obj = @pool.get
- return obj if obj.sock_alive?
+ def close_req_body(input)
+ case input
+ when Yahns::TeeInput, IO, StringIO
+ input.close
+ end
end
- UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool)
- end
+ # n.b. buf must be a detached string not shared with
+ # Thread.current[:yahns_rbuf] of any thread
+ def send_req_buf(buf)
+ case rv = kgio_trywrite(buf)
+ when String
+ buf = rv # retry inner loop
+ when :wait_writable
+ @rrstate = buf
+ return :wait_writable
+ when nil
+ return prepare_wait_readable
+ end while true
+ end
+ end # class ReqRes
- def initialize(dest, timeout = 5)
+ def initialize(dest)
case dest
when %r{\Aunix:([^:]+)(?::(/.*))?\z}
path = $2
@@ -121,8 +179,6 @@ def initialize(dest, timeout = 5)
raise ArgumentError, "destination must be an HTTP URL or unix: path"
end
init_path_vars(path)
- @pool = ConnPool.new
- @timeout = timeout
end
def init_path_vars(path)
@@ -139,10 +195,15 @@ def init_path_vars(path)
end
def call(env)
- request_method = env['REQUEST_METHOD']
+ # 3-way handshake for TCP backends while we generate the request header
+ rr = ReqRes.start(@sockaddr)
+ c = env['rack.hijack'].call
+
req = Rack::Request.new(env)
- path = @path.gsub(/\$(\w+)/) { req.__send__($1) }
- req = "#{request_method} #{path} HTTP/1.1\r\n" \
+ req = @path.gsub(/\$(\w+)/) { req.__send__($1) }
+
+ # start the connection asynchronously and early so TCP can do a
+ req = "#{env['REQUEST_METHOD']} #{req} HTTP/1.1\r\n" \
"X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
# pass most HTTP_* headers through as-is
@@ -150,61 +211,24 @@ def call(env)
env.each do |key, val|
%r{\AHTTP_(\w+)\z} =~ key or next
key = $1
- next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key
+ # trailers are folded into the header, so do not send the Trailer:
+ # header in the request
+ next if /\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR|TRAILER)/ =~
+ key
chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
- key.tr!("_", "-")
+ key.tr!('_'.freeze, '-'.freeze)
req << "#{key}: #{val}\r\n"
end
# special cases which Rack does not prefix:
ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
- req << "\r\n"
-
- # get an open socket and send the headers
- ures = responder_get
- ures.req_write(req, @timeout)
+ input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil
- # send the request body if there was one
- send_body(env["rack.input"], ures, chunked) if chunked || clen
-
- # wait for the response here
- _, header, body = res = ures.rack
-
- # don't let the upstream Connection and Keep-Alive headers leak through
- header.delete_if do |k,_|
- k =~ /\A(?:Connection|Keep-Alive)\z/i
- end
-
- case request_method
- when "HEAD"
- # kcar doesn't know if it's a HEAD or GET response, and HEAD
- # responses have Content-Length in it which fools kcar...
- body.parser.body_bytes_left = 0
- res[1] = header.dup
- body.close # clobbers original header
- res[2] = body = []
- end
- res
+ # finally, prepare to emit the headers
+ rr.req_start(c, req << "\r\n".freeze, input, chunked)
rescue => e
- retry if ures && ures.fail_retryable? && request_method != "POST"
Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e)
[ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
end
-
- def send_body(input, ures, chunked)
- buf = Thread.current[:yahns_rbuf] ||= ""
-
- if chunked # unlikely
- while input.read(16384, buf)
- buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n")
- ures.req_write(buf, @timeout)
- end
- ures.req_write("0\r\n\r\n", @timeout)
- else # common if we hit uploads
- while input.read(16384, buf)
- ures.req_write(buf, @timeout)
- end
- end
- end
end
diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb
index da90a95..10ff607 100644
--- a/lib/yahns/queue_epoll.rb
+++ b/lib/yahns/queue_epoll.rb
@@ -27,9 +27,14 @@ def queue_add(io, flags)
epoll_ctl(Epoll::CTL_ADD, io, flags)
end
+ def queue_mod(io, flags)
+ epoll_ctl(Epoll::CTL_MOD, io, flags)
+ end
+
def thr_init
Thread.current[:yahns_rbuf] = ""
Thread.current[:yahns_fdmap] = @fdmap
+ Thread.current[:yahns_queue] = self
end
# returns an array of infinitely running threads
diff --git a/lib/yahns/queue_kqueue.rb b/lib/yahns/queue_kqueue.rb
index 4e5c133..c0667b7 100644
--- a/lib/yahns/queue_kqueue.rb
+++ b/lib/yahns/queue_kqueue.rb
@@ -36,9 +36,14 @@ def queue_add(io, flags)
kevent(Kevent[io.fileno, flags, fflags, 0, 0, io])
end
+ def queue_mod(io, flags)
+ kevent(Kevent[io.fileno, flags, ADD_ONESHOT, 0, 0, io])
+ end
+
def thr_init
Thread.current[:yahns_rbuf] = ""
Thread.current[:yahns_fdmap] = @fdmap
+ Thread.current[:yahns_queue] = self
end
# returns an array of infinitely running threads
diff --git a/lib/yahns/wbuf.rb b/lib/yahns/wbuf.rb
index fa39b9b..42776cf 100644
--- a/lib/yahns/wbuf.rb
+++ b/lib/yahns/wbuf.rb
@@ -30,6 +30,7 @@
class Yahns::Wbuf # :nodoc:
include Yahns::WbufCommon
attr_reader :busy
+ attr_reader :wbuf_persist
def initialize(body, persist, tmpdir, busy)
@tmpio = nil
@@ -71,7 +72,7 @@ def wbuf_write(c, buf)
# we're all caught up, try to prevent dirty data from getting flushed
# to disk if we can help it.
- @tmpio = @tmpio.close
+ wbuf_abort
@sf_offset = 0
@busy = false
nil
@@ -79,7 +80,11 @@ def wbuf_write(c, buf)
# called by last wbuf_flush
def wbuf_close(client)
- @tmpio = @tmpio.close if @tmpio
+ wbuf_abort
wbuf_close_common(client)
end
+
+ def wbuf_abort
+ @tmpio = @tmpio.close if @tmpio
+ end
end
diff --git a/test/test_proxy_pass.rb b/test/test_proxy_pass.rb
index 5398b29..5bf8722 100644
--- a/test/test_proxy_pass.rb
+++ b/test/test_proxy_pass.rb
@@ -6,13 +6,45 @@
class TestProxyPass < Testcase
ENV["N"].to_i > 1 and parallelize_me!
include ServerHelper
+ OMFG = 'a' * (1024 * 1024 * 32)
class ProxiedApp
def call(env)
h = [ %w(Content-Length 3), %w(Content-Type text/plain) ]
case env['REQUEST_METHOD']
when 'GET'
- [ 200, h, [ "hi\n"] ]
+ case env['PATH_INFO']
+ when '/giant-body'
+ h = [ %W(Content-Length #{OMFG.size}), %w(Content-Type text/plain) ]
+ [ 200, h, [ OMFG ] ]
+ when %r{\A/slow-headers-(\d+(?:\.\d+)?)\z}
+ delay = $1.to_f
+ io = env['rack.hijack'].call
+ [ "HTTP/1.1 200 OK\r\n",
+ "Content-Length: 7\r\n",
+ "Content-Type: text/PAIN\r\n",
+ "connection: close\r\n\r\n",
+ "HIHIHI!"
+ ].each do |l|
+ io.write(l)
+ sleep delay
+ end
+ io.close
+ when %r{\A/chunky-slow-(\d+(?:\.\d+)?)\z}
+ delay = $1.to_f
+ chunky = Object.new
+ chunky.instance_variable_set(:@delay, delay)
+ def chunky.each
+ sleep @delay
+ yield "3\r\nHI!\r\n"
+ sleep @delay
+ yield "0\r\n\r\n"
+ end
+ h = [ %w(Content-Type text/pain), %w(Transfer-Encoding chunked) ]
+ [ 200, h, chunky ]
+ else
+ [ 200, h, [ "hi\n"] ]
+ end
when 'HEAD'
[ 200, h, [] ]
when 'PUT'
@@ -72,6 +104,7 @@ def test_unix_socket_no_path
stderr_path err.path
end
end
+
Net::HTTP.start(host, port) do |http|
res = http.request(Net::HTTP::Get.new('/f00'))
assert_equal 200, res.code.to_i
@@ -109,7 +142,7 @@ def test_proxy_pass
require 'yahns/proxy_pass'
@srv2.close
cfg.instance_eval do
- app(:rack, Yahns::ProxyPass.new("http://#{host2}:#{port2}/")) do
+ app(:rack, Yahns::ProxyPass.new("http://#{host2}:#{port2}")) do
listen "#{host}:#{port}"
end
stderr_path err.path
@@ -126,6 +159,8 @@ def test_proxy_pass
end
end
+ check_pipelining(host, port)
+
gplv3 = File.open('COPYING')
Net::HTTP.start(host, port) do |http|
@@ -138,7 +173,7 @@ def test_proxy_pass
assert_equal n, res['Content-Length'].to_i
assert_nil res.body
- # chunked encoding
+ # chunked encoding (PUT)
req = Net::HTTP::Put.new('/')
req.body_stream = gplv3
req.content_type = 'application/octet-stream'
@@ -148,6 +183,18 @@ def test_proxy_pass
assert_equal gplv3.read, res.body
assert_equal 201, res.code.to_i
+ # chunked encoding (GET)
+ res = http.request(Net::HTTP::Get.new('/chunky-slow-0.1'))
+ assert_equal 200, res.code.to_i
+ assert_equal 'chunked', res['Transfer-encoding']
+ assert_equal "HI!", res.body
+
+ # slow headers (GET)
+ res = http.request(Net::HTTP::Get.new('/slow-headers-0.01'))
+ assert_equal 200, res.code.to_i
+ assert_equal 'text/PAIN', res['Content-Type']
+ assert_equal 'HIHIHI!', res.body
+
# normal content-length
gplv3.rewind
req = Net::HTTP::Put.new('/')
@@ -158,10 +205,77 @@ def test_proxy_pass
gplv3.rewind
assert_equal gplv3.read, res.body
assert_equal 201, res.code.to_i
+
+ # giant body
+ res = http.request(Net::HTTP::Get.new('/giant-body'))
+ assert_equal 200, res.code.to_i
+ assert_equal OMFG, res.body
+ end
+
+ # ensure we do not chunk responses back to an HTTP/1.0 client even if
+ # the proxy <-> upstream connection is chunky
+ %w(0 0.1).each do |delay|
+ begin
+ h10 = TCPSocket.new(host, port)
+ h10.write "GET /chunky-slow-#{delay} HTTP/1.0\r\n\r\n"
+ res = Timeout.timeout(60) { h10.read }
+ assert_match %r{^Connection: close\r\n}, res
+ assert_match %r{^Content-Type: text/pain\r\n}, res
+ assert_match %r{\r\n\r\nHI!\z}, res
+ refute_match %r{^Transfer-Encoding:}, res
+ refute_match %r{\r0\r\n}, res
+ ensure
+ h10.close
+ end
end
ensure
gplv3.close if gplv3
quit_wait pid
quit_wait pid2
end
+
+ def check_pipelining(host, port)
+ pl = TCPSocket.new(host, port)
+ r1 = ''
+ r2 = ''
+ r3 = ''
+ Timeout.timeout(60) do
+ pl.write "GET / HTTP/1.1\r\nHost: example.com\r\n\r\nGET /"
+ until r1 =~ /hi\n/
+ r1 << pl.readpartial(666)
+ end
+
+ pl.write "chunky-slow-0.1 HTTP/1.1\r\nHost: example.com\r\n\r\nP"
+ until r2 =~ /\r\n3\r\nHI!\r\n0\r\n\r\n/
+ r2 << pl.readpartial(666)
+ end
+
+ if false
+ pl.write "ET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
+ until r3 =~ /hi\n/
+ r3 << pl.readpartial(666)
+ end
+ else
+ pl.write "UT / HTTP/1.1\r\nHost: example.com\r\n"
+ pl.write "Transfer-Encoding: chunked\r\n\r\n"
+ pl.write "6\r\nchunky\r\n"
+ pl.write "0\r\n\r\n"
+
+ until r3 =~ /chunky/
+ r3 << pl.readpartial(666)
+ end
+ end
+ end
+ r1 = r1.split("\r\n").reject { |x| x =~ /^Date: / }
+ r2 = r2.split("\r\n").reject { |x| x =~ /^Date: / }
+ assert_equal 'HTTP/1.1 200 OK', r1[0]
+ assert_equal 'HTTP/1.1 200 OK', r2[0]
+ assert_match %r{\r\n\r\nchunky\z}, r3
+ assert_match %r{\AHTTP/1\.1 201 Created\r\n}, r3
+ rescue => e
+ warn [ e.class, e.message ].inspect
+ warn e.backtrace.join("\n")
+ ensure
+ pl.close
+ end
end
--
EW
^ permalink raw reply related [flat|nested] only message in thread
only message in thread, other threads:[~2015-04-03 1:53 UTC | newest]
Thread overview: (only message) (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2015-04-03 1:53 [PATCH] proxy_pass: rewrite to be async, using rack.hijack Eric Wong
Code repositories for project(s) associated with this public inbox
https://yhbt.net/yahns.git/
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).