yahns.git  about / heads / tags
sleepy, multi-threaded, non-blocking application server for Ruby
blob eb48647d914057b2d289ece5c2246f08337b3e71 8995 bytes (raw)
$ git show maint:lib/yahns/proxy_pass.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
 
# -*- encoding: binary -*-
# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
require 'socket'
require 'kgio'
require 'kcar' # gem install kcar
require 'rack/request'
require 'timeout'

require_relative 'proxy_http_response'

class Yahns::ProxyPass # :nodoc:
  class ReqRes < Kgio::Socket # :nodoc:
    attr_writer :resbuf
    attr_accessor :proxy_trailers

    # Begins proxying one request to the upstream.
    #
    # +c+ is the client object that later receives the proxy_* callbacks,
    # +req+ the buffered request header, +input+ the request body source
    # (nil/false when there is no body) and +chunked+ whether the body must
    # be re-framed with chunked encoding.
    #
    # Clears any response state from a previous use, records what has to be
    # written (header only, or header + body), and registers this socket for
    # writability with the current thread's queue.  Returns whatever
    # queue_add returns.
    def req_start(c, req, input, chunked)
      @hdr = nil
      @resbuf = nil
      @yahns_client = c
      if input
        @rrstate = [ req, input, chunked ]
      else
        @rrstate = req
      end
      queue = Thread.current[:yahns_queue]
      queue.queue_add(self, Yahns::Queue::QEV_WR)
    end

    # Installs a brand new, empty, mutable string as this thread's
    # :yahns_rbuf and returns it.  Call this whenever the previous read
    # buffer may be retained beyond the current thread's use of it.
    def detach_rbuf!
      replacement = ''.dup
      Thread.current[:yahns_rbuf] = replacement
    end

    # Event-loop callback: advances this upstream connection by one step,
    # dispatching on what @rrstate currently holds:
    #
    # * String       - request header (or the final chunk terminator) still
    #                  being written to the upstream
    # * Array        - request body still being written to the upstream
    # * Kcar::Parser - request fully sent, response being read back
    #
    # While reading the response, @resbuf tracks progress: nil before any
    # partial header has been kept, a String while an incomplete header is
    # being accumulated, and a Yahns::WbufCommon while the body is relayed
    # (presumably assigned through the resbuf= writer by the client code,
    # which is not visible in this file).
    #
    # Returns :wait_readable or :wait_writable when more I/O is needed,
    # otherwise whatever the client's proxy_response_start,
    # proxy_response_finish or proxy_err_response returns.  Any StandardError
    # raised along the way is reported to the client as a 502.
    def yahns_step # yahns event loop entry point
      c = @yahns_client
      case req = @rrstate
      when Kcar::Parser # reading response...
        buf = Thread.current[:yahns_rbuf]

        case resbuf = @resbuf # where are we at the response?
        when nil # common case, catch the response header in a single read

          case rv = kgio_tryread(0x2000, buf)
          when String
            if res = req.headers(@hdr = [], rv)
              return c.proxy_response_start(res, rv, req, self)
            else # ugh, big headers or trickled response
              # rv is kept in @resbuf past this call, so the thread gets a
              # fresh read buffer (presumably rv is the thread-local buffer
              # itself -- confirm against kgio_tryread)
              buf = detach_rbuf!
              @resbuf = rv
            end
            # continue looping in middle "case @resbuf" loop
          when :wait_readable
            return rv # spurious wakeup
          # nil from kgio_tryread: no header will arrive, report bad gateway
          when nil then return c.proxy_err_response(502, self, nil, nil)
          end # NOT looping here

        when String # continue reading trickled response headers from upstream

          # keep appending reads to resbuf until the parser accepts the
          # header (break), the socket would block, or the read returns nil
          case rv = kgio_tryread(0x2000, buf)
          when String then res = req.headers(@hdr, resbuf << rv) and break
          when :wait_readable then return rv
          when nil then return c.proxy_err_response(502, self, nil, nil)
          end while true

          return c.proxy_response_start(res, resbuf, req, self)

        when Yahns::WbufCommon # streaming/buffering the response body

          # we assign wbuf for rescue below:
          return c.proxy_response_finish(req, wbuf = resbuf, self)

        end while true # case @resbuf

      when Array # [ (str|vec), rack.input, chunked? ]
        send_req_body(req) # returns :wait_readable or :wait_writable
      when String # buffered request header
        send_req_buf(req)
      end
    rescue => e
      # avoid polluting logs with a giant backtrace when the problem isn't
      # fixable in code.
      case e
      when Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EPIPE
        e.set_backtrace([])
      end
      # wbuf is nil here unless the body-streaming branch above assigned it
      c.proxy_err_response(502, self, e, wbuf)
    end

    # Writes the request (leading buffer plus body) to the upstream.
    #
    # +req+ is the @rrstate Array [ buf, input, chunked ]: +buf+ is the
    # pending String or Array of Strings still to be written, +input+ is the
    # body source (env['rack.input']) and +chunked+ selects chunked framing.
    #
    # Whenever the socket would block, the unwritten remainder is stored
    # back into req[0] so the next call resumes by flushing it first.
    #
    # returns :wait_readable if complete, :wait_writable if not
    #
    # If the upstream rejects further writes (EPIPE, ECONNRESET, ENOTCONN),
    # reading from the client is shut down and the state switches to reading
    # the response.
    def send_req_body(req)
      buf, input, chunked = req

      # get the first buffered chunk or vector
      case rv = String === buf ? kgio_trywrite(buf) : kgio_trywritev(buf)
      when String, Array
        buf = rv # retry inner loop
      when :wait_writable
        req[0] = buf
        return :wait_writable
      when nil
        break # onto writing body
      end while true

      buf = Thread.current[:yahns_rbuf]

      # Note: input (env['rack.input']) is fully-buffered by default so
      # we should not be waiting on a slow network resource when reading
      # input.  However, some weird configs may disable this on LANs

      if chunked
        while input.read(0x2000, buf)
          # one chunk: hex size line, data, trailing CRLF
          vec = [ "#{buf.size.to_s(16)}\r\n", buf, "\r\n".freeze ]
          case rv = kgio_trywritev(vec)
          when Array
            vec = rv # partial write, retry in case loop
          when :wait_writable
            # vec may still reference the thread-local buffer; give the
            # thread a new one before parking vec in req[0]
            detach_rbuf!
            req[0] = vec
            return :wait_writable
          when nil
            break # continue onto reading next chunk
          end while true
        end
        close_req_body(input)

        # note: we do not send any trailer, they are folded into the header
        # because this relies on full request buffering
        send_req_buf("0\r\n\r\n".freeze)
        # send_req_buf calls prepare_wait_readable itself once the terminator
        # is fully written; otherwise it parks the remainder in @rrstate and
        # returns :wait_writable
      else # identity request, easy:
        while input.read(0x2000, buf)
          case rv = kgio_trywrite(buf)
          when String
            # NOTE(review): from here on later reads land in this remainder
            # string rather than the thread-local buffer -- looks harmless
            buf = rv # partial write, retry in case loop
          when :wait_writable
            detach_rbuf!
            req[0] = buf
            return :wait_writable
          when nil
            break # continue onto reading next block
          end while true
        end

        close_req_body(input)
        prepare_wait_readable
      end
    rescue Errno::EPIPE, Errno::ECONNRESET, Errno::ENOTCONN
      # no more reading off the client socket, just prepare to forward
      # the rejection response from the upstream (if any)
      @yahns_client.to_io.shutdown(Socket::SHUT_RD)
      prepare_wait_readable
    end

    # Switches this connection from "sending the request" to "reading the
    # response" by installing a fresh Kcar::Parser as the state.
    # Always returns :wait_readable.
    def prepare_wait_readable
      parser = Kcar::Parser.new
      @rrstate = parser
      :wait_readable # request fully sent; the response comes next
    end

    # Closes +input+ only when it is a Yahns::TeeInput or an IO; any other
    # object is left alone and nil is returned.
    def close_req_body(input)
      # respond_to?(:close) is deliberately not used: Rack::Lint::InputWrapper
      # tries to prevent that (and hijack means all Rack specs go out the
      # door), so the concrete classes are checked instead
      return unless Yahns::TeeInput === input || IO === input
      input.close
    end

    # Writes +buf+ to the upstream, retrying partial writes until the whole
    # string is out or the socket would block.
    #
    # Returns the result of prepare_wait_readable once everything is written.
    # If the socket would block, the unwritten remainder becomes @rrstate and
    # :wait_writable is returned.
    #
    # n.b. buf must be a detached string not shared with
    # Thread.current[:yahns_rbuf] of any thread
    def send_req_buf(buf)
      while true
        remaining = kgio_trywrite(buf)
        case remaining
        when nil
          return prepare_wait_readable
        when :wait_writable
          @rrstate = buf
          return :wait_writable
        when String
          buf = remaining # partial write, go around again with the rest
        end
      end
    end
  end # class ReqRes

  # Configures a proxy to a single upstream.
  #
  # +dest+ is one of:
  # * "unix:/path/to/socket", optionally followed by ":/request/path"
  # * "http://host[:port]", optionally followed by "/request/path";
  #   the port defaults to 80 and +host+ may be a bracketed IPv6 literal
  #   such as "[::1]"
  #
  # The optional request path is a template handed to init_path_vars.
  #
  # +opts+[:response_headers] is an optional Hash of response header
  # overrides.  It is copied, so the caller's Hash is never modified and
  # may be frozen.
  #
  # Raises ArgumentError when +dest+ matches neither form.
  def initialize(dest, opts = {})
    case dest
    when %r{\Aunix:([^:]+)(?::(/.*))?\z}
      path = $2
      @sockaddr = Socket.sockaddr_un($1)
    when %r{\Ahttp://([^/]+)(/.*)?\z}
      path = $2
      authority = $1
      # a bracketed IPv6 literal contains colons itself, so it cannot go
      # through the plain host:port split
      if /\A\[([^\]]+)\](?::(\d+))?\z/ =~ authority
        host = $1
        port = $2
      else
        host, port = authority.split(':')
      end
      @sockaddr = Socket.sockaddr_in(port || 80, host)
    else
      raise ArgumentError, "destination must be an HTTP URL or unix: path"
    end

    # work on a private copy: the default below must not leak into (or
    # fail on) a Hash the caller still owns
    @response_headers = (opts[:response_headers] || {}).dup

    # It's wrong to send the backend Server tag through.  Let users say
    # { "Server" => "yahns" } if they want to advertise for us, but don't
    # advertise by default (for security)
    @response_headers['Server'] ||= :ignore
    init_path_vars(path)
  end

  # Validates the request path template and stores it in @path.
  #
  # +path+ may contain $name placeholders; only a fixed set of names is
  # accepted.  A nil +path+ means '$fullpath'.  Raises ArgumentError listing
  # every placeholder that is not accepted.
  def init_path_vars(path)
    template = path || '$fullpath'

    # methods from Rack::Request we want:
    permitted = %w(fullpath host_with_port host port url path)
    requested = template.scan(/\$(\w+)/).flatten
    rejected = requested - permitted
    unless rejected.empty?
      raise ArgumentError, "vars not allowed: #{rejected.uniq.join(' ')}"
    end

    # a slash directly in front of a leading $fullpath/$path is dropped,
    # just in case...
    @path = template.gsub(%r{\A/(\$(?:fullpath|path))}, '\1')
  end

  # Rack application entry point.
  #
  # Opens a ReqRes connection to the configured upstream, hijacks the client
  # socket via env['rack.hijack'], rebuilds the HTTP request header from
  # +env+ and hands everything to ReqRes#req_start.
  #
  # Header handling:
  # * the request path comes from the @path template, each $var being
  #   replaced by the Rack::Request method of the same name
  # * anything other than HTTP/1.1 is forwarded as HTTP/1.0
  # * X-Forwarded-Proto is set from rack.url_scheme, and REMOTE_ADDR is
  #   appended to any incoming X-Forwarded-For
  # * Version, Connection, Keep-Alive, X-Forwarded-For and Trailer are not
  #   copied; every other HTTP_* entry is (names stay upper-case, with
  #   underscores turned into dashes)
  # * CONTENT_TYPE and CONTENT_LENGTH are forwarded when present
  #
  # Returns [ 500, [], [] ] after the hand-off (the connection has been
  # hijacked by then, so presumably this triplet never reaches the client).
  # On any StandardError the error is logged, the upstream socket is closed
  # and an empty 502 response is returned.
  def call(env)
    # start the connection asynchronously and early so the 3-way handshake
    # for TCP backends can proceed while we generate the request header
    rr = ReqRes.start(@sockaddr)
    c = env['rack.hijack'].call

    req = Rack::Request.new(env)
    req = @path.gsub(/\$(\w+)/) { req.__send__($1) }

    case ver = env['HTTP_VERSION']
    when 'HTTP/1.1' # leave alone, response may be chunked
    else # no chunking for HTTP/1.0 and HTTP/0.9
      ver = 'HTTP/1.0'.freeze
    end

    addr = env['REMOTE_ADDR']
    xff = env['HTTP_X_FORWARDED_FOR']
    xff = xff =~ /\S/ ? "#{xff}, #{addr}" : addr
    req = "#{env['REQUEST_METHOD']} #{req} #{ver}\r\n" \
          "X-Forwarded-Proto: #{env['rack.url_scheme']}\r\n" \
          "X-Forwarded-For: #{xff}\r\n".dup

    # pass most HTTP_* headers through as-is
    chunked = false
    env.each do |key, val|
      %r{\AHTTP_(\w+)\z} =~ key or next
      key = $1
      # trailers are folded into the header, so do not send the Trailer:
      # header in the request.  The match is anchored at both ends so that
      # only these exact names are dropped, not every header that merely
      # starts with one of them.
      next if /\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR|TRAILER)\z/ =~
         key
      'TRANSFER_ENCODING'.freeze == key && val =~ /\bchunked\b/i and
        chunked = true
      key.tr!('_'.freeze, '-'.freeze)
      req << "#{key}: #{val}\r\n"
    end

    # special cases which Rack does not prefix:
    ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
    clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
    # only forward a body when one is announced
    input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil
    env['yahns.proxy_pass.response_headers'] = @response_headers

    # finally, prepare to emit the headers
    rr.req_start(c, req << "\r\n".freeze, input, chunked)

    # this probably breaks fewer middlewares than returning whatever else...
    [ 500, [], [] ]
  rescue => e
    # req_start is the last step that can raise, so reaching this point
    # means the upstream socket was never handed off; close it instead of
    # leaking the descriptor.  rr is nil when ReqRes.start itself failed.
    rr.close if rr && !rr.closed?
    Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e)
    [ 502, { 'Content-Length' => '0', 'Content-Type' => 'text/plain' }, [] ]
  end
end

git clone git://yhbt.net/yahns.git
git clone https://yhbt.net/yahns.git