yahns Ruby server user/dev discussion
 help / color / mirror / code / Atom feed
blob e3ba7f03269933866f3ad43da193333556844084 7774 bytes (raw)
name: lib/yahns/proxy_pass.rb 	 # note: path name is non-authoritative(*)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
 
# -*- encoding: binary -*-
# Copyright (C) 2013-2015 all contributors <yahns-public@yhbt.net>
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
require 'socket'
require 'kgio'
require 'kcar' # gem install kcar
require 'rack/request'
require 'timeout'

require_relative 'proxy_http_response'

class Yahns::ProxyPass # :nodoc:
  class ReqRes < Kgio::Socket
    attr_writer :resbuf
    attr_accessor :proxy_trailers

    # Records the hijacked client +c+ and the request to send, then
    # registers this socket with the current thread's queue for writability.
    #
    # +req+ is the buffered request header; when +input+ is given, the
    # state becomes the triple [ req, input, chunked ] so the body is
    # sent as well, otherwise the state is just the header string.
    # Returns whatever queue_add returns.
    def req_start(c, req, input, chunked)
      @hdr = nil
      @resbuf = nil
      @yahns_client = c
      @rrstate = if input
        [ req, input, chunked ]
      else
        req
      end
      queue = Thread.current[:yahns_queue]
      queue.queue_add(self, Yahns::Queue::QEV_WR)
    end

    # we must reinitialize the thread-local rbuf if it may get beyond the
    # current thread
    def detach_rbuf!
      # install a brand new empty string as this thread's read buffer and
      # return it; the previously installed string is left untouched
      thr = Thread.current
      thr[:yahns_rbuf] = ''
    end

    # yahns event loop entry point.  Dispatches on the current @rrstate:
    #
    # * Kcar::Parser - the request was sent, read (more of) the response
    # * Array        - [ (str|vec), rack.input, chunked? ], send the body
    # * String       - buffered request header still to be written
    #
    # Returns :wait_readable or :wait_writable when the socket is not
    # ready, otherwise the result of the client/helper method called.
    # Any StandardError raised along the way is reported to the client
    # through proxy_err_response(502, ...).
    def yahns_step
      # fetch the client before dispatching so that the rescue clause can
      # report failures from every state, including errors raised while
      # the request is still being written to the upstream
      c = @yahns_client

      case req = @rrstate
      when Kcar::Parser # reading response...
        buf = Thread.current[:yahns_rbuf]

        case resbuf = @resbuf # where are we at the response?
        when nil # common case, catch the response header in a single read

          case rv = kgio_tryread(0x2000, buf)
          when String
            if res = req.headers(@hdr = [], rv)
              return c.proxy_response_start(res, rv, req, self)
            else # ugh, big headers or trickled response
              # rv is kept in @resbuf, so the thread-local buffer
              # must be replaced before it is read into again
              buf = detach_rbuf!
              @resbuf = rv
            end
            # continue looping in middle "case @resbuf" loop
          when :wait_readable
            return rv # spurious wakeup
          when nil then return c.proxy_err_response(502, self, nil, nil)
          end # NOT looping here

        when String # continue reading trickled response headers from upstream

          case rv = kgio_tryread(0x2000, buf)
          when String then res = req.headers(@hdr, resbuf << rv) and break
          when :wait_readable then return rv
          when nil then return c.proxy_err_response(502, self, nil, nil)
          end while true

          return c.proxy_response_start(res, resbuf, req, self)

        when Yahns::WbufCommon # streaming/buffering the response body

          return c.proxy_response_finish(req, resbuf, self)

        end while true # case @resbuf

      when Array # [ (str|vec), rack.input, chunked? ]
        send_req_body(req) # returns nil or :wait_writable
      when String # buffered request header
        send_req_buf(req)
      end
    rescue => e
      c.proxy_err_response(502, self, e, nil)
    end

    # Called by the Rack server at the end of a successful response
    def close
      # forget the request state, the client and the parsed header array
      # before handing off to the superclass implementation
      @rrstate = nil
      @yahns_client = nil
      @hdr = nil
      super
    end

    # returns :wait_readable if complete, :wait_writable if not
    def send_req_body(req)
      pending, input, chunked = req

      # first flush what is left from an earlier attempt; this is either
      # a single string or a vector (array) of strings
      loop do
        rv = String === pending ? kgio_trywrite(pending) : kgio_trywritev(pending)
        case rv
        when nil
          break # everything written, onto the body
        when :wait_writable
          req[0] = pending # keep the unwritten remainder for the next call
          return :wait_writable
        when String, Array
          pending = rv # partial write, retry with what is left
        end
      end

      rbuf = Thread.current[:yahns_rbuf]

      # Note: input (env['rack.input']) is fully-buffered by default so
      # we should not be waiting on a slow network resource when reading
      # input.  However, some weird configs may disable this on LANs

      unless chunked
        # identity request, blocks are copied through unmodified
        while input.read(0x2000, rbuf)
          loop do
            case rv = kgio_trywrite(rbuf)
            when nil
              break # block fully written, read the next one
            when :wait_writable
              # rbuf is stored in req, so the thread-local buffer must
              # not be shared with it any longer
              detach_rbuf!
              req[0] = rbuf
              return :wait_writable
            when String
              rbuf = rv # partial write, retry with the remainder
            end
          end
        end

        close_req_body(input)
        return prepare_wait_readable
      end

      # chunked request, every block read becomes one chunk:
      # hexadecimal size line, data, CRLF
      while input.read(0x2000, rbuf)
        vec = [ "#{rbuf.size.to_s(16)}\r\n", rbuf, "\r\n".freeze ]
        loop do
          case rv = kgio_trywritev(vec)
          when nil
            break # chunk fully written, read the next one
          when :wait_writable
            detach_rbuf!
            req[0] = vec
            return :wait_writable
          when Array
            vec = rv # partial write, retry with the remainder
          end
        end
      end
      close_req_body(input)

      # note: we do not send any trailer, they are folded into the header
      # because this relies on full request buffering
      # (prepare_wait_readable is called by send_req_buf once written)
      send_req_buf("0\r\n\r\n".freeze)
    end

    # Switches the state to a fresh Kcar::Parser, which is the state
    # yahns_step treats as "reading response", and returns :wait_readable.
    def prepare_wait_readable
      parser = Kcar::Parser.new
      @rrstate = parser
      :wait_readable # all done sending the request, wait for response
    end

    # Closes +input+ when it is a Yahns::TeeInput, an IO or a StringIO;
    # any other object is left alone.  Returns the result of input.close,
    # or nil when nothing was closed.
    def close_req_body(input)
      closable = Yahns::TeeInput === input ||
                 IO === input ||
                 StringIO === input
      input.close if closable
    end

    # n.b. buf must be a detached string not shared with
    # Thread.current[:yahns_rbuf] of any thread
    def send_req_buf(buf)
      # keep writing until the buffer is gone or the socket would block
      loop do
        case rv = kgio_trywrite(buf)
        when nil
          # fully written, switch over to reading the response
          return prepare_wait_readable
        when :wait_writable
          @rrstate = buf # resume from the unwritten remainder later
          return :wait_writable
        when String
          buf = rv # partial write, retry with what is left
        end
      end
    end
  end # class ReqRes

  # +dest+ is either "unix:SOCKET_PATH[:/PATH]" or
  # "http://HOST[:PORT][/PATH]" (the port defaults to 80).  The socket
  # address is stored in @sockaddr and the optional path part is handed
  # to init_path_vars.  Raises ArgumentError for any other destination.
  def initialize(dest)
    if %r{\Aunix:([^:]+)(?::(/.*))?\z} === dest
      sock_path = Regexp.last_match(1)
      path = Regexp.last_match(2)
      @sockaddr = Socket.sockaddr_un(sock_path)
    elsif %r{\Ahttp://([^/]+)(/.*)?\z} === dest
      host_port = Regexp.last_match(1)
      path = Regexp.last_match(2)
      host, port = host_port.split(':')
      @sockaddr = Socket.sockaddr_in(port || 80, host)
    else
      raise ArgumentError, "destination must be an HTTP URL or unix: path"
    end
    init_path_vars(path)
  end

  # Validates the path template and stores it in @path.  +path+ may
  # contain $name variables; only the names listed below are accepted,
  # anything else raises ArgumentError.  A nil +path+ means '$fullpath'.
  def init_path_vars(path)
    template = path || '$fullpath'

    # methods from Rack::Request we want:
    permitted = %w(fullpath host_with_port host port url path)
    requested = template.scan(/\$(\w+)/).map(&:first)
    rejected = (requested - permitted).uniq
    unless rejected.empty?
      raise ArgumentError, "vars not allowed: #{rejected.join(' ')}"
    end

    # kill leading slash just in case...
    # (the pattern is anchored with \A, so it can match only once)
    @path = template.sub(%r{\A/(\$(?:fullpath|path))}, '\1')
  end

  # Rack entry point.  Hijacks the client connection, builds the request
  # header for the upstream from +env+ and hands both to ReqRes#req_start,
  # whose return value is returned.
  #
  # The request target is produced by expanding the $name variables of
  # @path through the matching Rack::Request methods.  If a StandardError
  # is raised, it is logged via Yahns::Log.exception and an empty 502
  # Rack response is returned instead.
  def call(env)
    # start the connection asynchronously and early so TCP can do its
    # 3-way handshake while we generate the request header
    rr = ReqRes.start(@sockaddr)
    c = env['rack.hijack'].call

    req = Rack::Request.new(env)
    req = @path.gsub(/\$(\w+)/) { req.__send__($1) }

    case ver = env['HTTP_VERSION']
    when 'HTTP/1.1' # leave alone, response may be chunked
    else # no chunking for HTTP/1.0 and HTTP/0.9
      ver = 'HTTP/1.0'.freeze
    end

    req = "#{env['REQUEST_METHOD']} #{req} #{ver}\r\n" \
          "X-Forwarded-For: #{env["REMOTE_ADDR"]}\r\n"

    # pass most HTTP_* headers through as-is
    chunked = false
    env.each do |key, val|
      %r{\AHTTP_(\w+)\z} =~ key or next
      key = $1
      # trailers are folded into the header, so do not send the Trailer:
      # header in the request.  The names are matched exactly: a header
      # which merely starts with one of them (e.g. Connection-Id) is an
      # unrelated header and must still be forwarded
      next if /\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR|TRAILER)\z/ =~
         key
      # only a real Transfer-Encoding header may switch the body to
      # chunked framing, otherwise the framing would not agree with the
      # headers sent to the upstream
      chunked = true if key == 'TRANSFER_ENCODING' && val =~ /\bchunked\b/i
      key.tr!('_'.freeze, '-'.freeze)
      req << "#{key}: #{val}\r\n"
    end

    # special cases which Rack does not prefix:
    ctype = env["CONTENT_TYPE"] and req << "Content-Type: #{ctype}\r\n"
    clen = env["CONTENT_LENGTH"] and req << "Content-Length: #{clen}\r\n"
    # a body is only sent for chunked requests or a positive Content-Length
    input = chunked || (clen && clen.to_i > 0) ? env['rack.input'] : nil

    # finally, prepare to emit the headers
    rr.req_start(c, req << "\r\n".freeze, input, chunked)
  rescue => e
    Yahns::Log.exception(env['rack.logger'], 'proxy_pass', e)
    [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
  end
end

debug log:

solving e3ba7f0 ...
found e3ba7f0 in https://yhbt.net/yahns.git/

(*) Git path names are given by the tree(s) the blob belongs to.
    Blobs themselves have no identifier aside from the hash of their contents.

Code repositories for project(s) associated with this public inbox

	https://yhbt.net/yahns.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).