about summary refs log tree commit homepage
path: root/extras
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2015-03-14 01:39:23 +0000
committerEric Wong <e@80x24.org>2015-03-14 02:57:04 +0000
commit7542c8365613d0104b8a3171418bb8c7f1a518cb (patch)
treea706db76615c9ecb48f14c73c9d315fc5b0ab7a4 /extras
parent001ad1c290a89e6d3c7e4d66283197bd003f367a (diff)
downloadyahns-7542c8365613d0104b8a3171418bb8c7f1a518cb.tar.gz
This will rely on rack.hijack in the future to support
asynchronous execution without tying up a thread when waiting
for upstreams.  For now, this allows simpler code with fewer
checks and the use of monotonic time on newer versions of Ruby.
Diffstat (limited to 'extras')
-rw-r--r--extras/proxy_pass.rb224
1 files changed, 9 insertions, 215 deletions
diff --git a/extras/proxy_pass.rb b/extras/proxy_pass.rb
index 00adf18..af6fb7c 100644
--- a/extras/proxy_pass.rb
+++ b/extras/proxy_pass.rb
@@ -1,215 +1,9 @@
-# -*- encoding: binary -*-
-# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
-# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
-require 'time'
-require 'socket'
-require 'kgio'
-require 'kcar' # gem install kcar
-require 'rack/request'
-require 'thread'
-require 'timeout'
-
-# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten.
-# to take advantage of rack.hijack and use the non-blocking I/O facilities
-# in yahns.  yahns may have to grow a supported API for that...
-# For now, we this blocks a worker thread; fortunately threads are reasonably
-# cheap on GNU/Linux...
-# This is totally untested but currently doesn't serve anything important.
-class ProxyPass # :nodoc:
-  class ConnPool
-    def initialize
-      @mtx = Mutex.new
-      @objs = []
-    end
-
-    def get
-      @mtx.synchronize { @objs.pop }
-    end
-
-    def put(obj)
-      @mtx.synchronize { @objs << obj }
-    end
-  end
-
-  class UpstreamSocket < Kgio::Socket # :nodoc:
-    attr_writer :expiry
-
-    # called automatically by kgio_read!
-    def kgio_wait_readable(timeout = nil)
-      super(timeout || wait_time)
-    end
-
-    def wait_time
-      tout = @expiry ? @expiry - Time.now : @timeout
-      raise Timeout::Error, "request timed out", [] if tout < 0
-      tout
-    end
-
-    def readpartial(bytes, buf = Thread.current[:proxy_pass_buf] ||= "")
-      case rv = kgio_read!(bytes, buf)
-      when String
-        @expiry += @timeout # bump expiry when we succeed
-      end
-      rv
-    end
-
-    def req_write(buf, timeout)
-      @timeout = timeout
-      @expiry = Time.now + timeout
-      case rv = kgio_trywrite(buf)
-      when :wait_writable
-        kgio_wait_writable(wait_time)
-      when nil
-        return
-      when String
-        buf = rv
-      end while true
-    end
-  end # class UpstreamSocket
-
-  class UpstreamResponse < Kcar::Response # :nodoc:
-    # Called by the Rack server at the end of a successful response
-    def close
-      reusable = @parser.keepalive? && @parser.body_eof?
-      super
-      @pool.put(self) if reusable
-      nil
-    end
-
-    # req is just a string buffer of HTTP headers
-    def req_write(req, timeout)
-      @sock.req_write(req, timeout)
-    end
-
-    # returns true if the socket is still alive, nil if dead
-    def sock_alive?
-      @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
-                true : @sock.close
-    end
-
-    # returns true if the socket was reused and thus retryable
-    def fail_retryable?
-      @sock.close
-      @reused
-    end
-
-    def initialize(sock, pool)
-      super(sock)
-      @reused = false
-      @pool = pool
-    end
-  end # class UpstreamResponse
-
-  # take a responder from the pool, we'll add the object back to the
-  # pool in UpstreamResponse#close
-  def responder_get
-    while obj = @pool.get
-      return obj if obj.sock_alive?
-    end
-
-    UpstreamResponse.new(UpstreamSocket.start(@sockaddr), @pool)
-  end
-
-  def initialize(dest, timeout = 5)
-    case dest
-    when %r{\Aunix:([^:]+)(?::(/.*))?\z}
-      path = $2
-      @sockaddr = Socket.sockaddr_un($1)
-    when %r{\Ahttp://([^/]+)(/.*)?\z}
-      path = $2
-      host, port = $1.split(':')
-      @sockaddr = Socket.sockaddr_in(port || 80, host)
-    else
-      raise ArgumentError, "destination must be an HTTP URL or unix: path"
-    end
-    init_path_vars(path)
-    @pool = ConnPool.new
-    @timeout = timeout
-  end
-
-  def init_path_vars(path)
-    path ||= '$fullpath'
-    # methods from Rack::Request we want:
-    allow = %w(fullpath host_with_port host port url path)
-    want = path.scan(/\$(\w+)/).flatten! || []
-    diff = want - allow
-    diff.empty? or
-             raise ArgumentError, "vars not allowed: #{diff.uniq.join(' ')}"
-
-    # kill leading slash just in case...
-    @path = path.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
-  end
-
-  def call(env)
-    request_method = env['REQUEST_METHOD']
-    req = Rack::Request.new(env)
-    path = @path.gsub(/\$(\w+)/) { req.__send__($1) }
-    req = "#{request_method} #{path} HTTP/1.1\r\n" \
-          "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"
-
-    # pass most HTTP_* headers through as-is
-    chunked = false
-    env.each do |key, val|
-      %r{\AHTTP_(\w+)\z} =~ key or next
-      key = $1
-      next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)} =~ key
-      chunked = true if %r{\ATRANSFER_ENCODING} =~ key && val =~ /\bchunked\b/i
-      key.tr!("_", "-")
-      req << "#{key}: #{val}\r\n"
-    end
-
-    # special cases which Rack does not prefix:
-    ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
-    clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
-    req << "\r\n"
-
-    # get an open socket and send the headers
-    ures = responder_get
-    ures.req_write(req, @timeout)
-
-    # send the request body if there was one
-    send_body(env["rack.input"], ures, chunked) if chunked || clen
-
-    # wait for the response here
-    _, header, body = res = ures.rack
-
-    # don't let the upstream Connection and Keep-Alive headers leak through
-    header.delete_if do |k,_|
-      k =~ /\A(?:Connection|Keep-Alive)\z/i
-    end
-
-    case request_method
-    when "HEAD"
-      # kcar doesn't know if it's a HEAD or GET response, and HEAD
-      # responses have Content-Length in it which fools kcar...
-      body.parser.body_bytes_left = 0
-      res[1] = header.dup
-      body.close # clobbers original header
-      res[2] = body = []
-    end
-    res
-  rescue => e
-    retry if ures && ures.fail_retryable? && request_method != "POST"
-    if defined?(Yahns::Log)
-      logger = env['rack.logger'] and
-        Yahns::Log.exception(logger, 'proxy_pass', e)
-    end
-    [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
-  end
-
-  def send_body(input, ures, chunked)
-    buf = Thread.current[:proxy_pass_buf] ||= ""
-
-    if chunked # unlikely
-      while input.read(16384, buf)
-        buf.replace("#{buf.size.to_s(16)}\r\n#{buf}\r\n")
-        ures.req_write(buf, @timeout)
-      end
-      ures.req_write("0\r\n\r\n", @timeout)
-    else # common if we hit uploads
-      while input.read(16384, buf)
-        ures.req_write(buf, @timeout)
-      end
-    end
-  end
-end
+# compatibility class
+warn <<EOF
+ProxyPass is no longer a generic Rack app but relies on yahns internals
+via rack.hijack.  Update require statements to point to 'yahns/proxy_pass'
+and change use the 'Yahns::ProxyPass' constant instead.
+EOF
+
+require 'yahns/proxy_pass'
+ProxyPass = Yahns::ProxyPass