yahns.git  about / heads / tags
sleepy, multi-threaded, non-blocking application server for Ruby
blob ce55525c0591e787d0fe7eaeea840dc2823c5bf6 6385 bytes (raw)
$ git show v0.0.1:lib/yahns/http_client.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
 
# -*- encoding: binary -*-
# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
class Yahns::HttpClient < Kgio::Socket # :nodoc:
  # Shared empty request body: input_ready returns this (and app_call then
  # exposes it as rack.input) when Content-Length is 0, so no per-request
  # input object is created for bodiless requests.
  NULL_IO = StringIO.new("")

  # FIXME: we shouldn't have this at all
  # NOTE(review): assigned once, when this file is loaded, on the
  # Unicorn::HttpParser class itself -- presumably process-wide; confirm.
  Unicorn::HttpParser.keepalive_requests = 0xffffffff

  # NOTE(review): helpers called below but not defined in this class
  # (http_response_write, http_response_done, err_response) presumably
  # come from these mixins -- confirm.
  include Yahns::HttpResponse
  include Yahns::ClientExpire
  QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor
  # the two pieces app_call writes to the socket before running the app
  # when check_client_connection is enabled and the request has headers
  HTTP_RESPONSE_START = [ 'HTTP', '/1.1 ' ]

  # Rack env keys used by app_call and hijack_proc.
  # A frozen format for this is about 15% faster (note from Mongrel)
  REMOTE_ADDR = 'REMOTE_ADDR'.freeze
  RACK_INPUT = 'rack.input'.freeze
  RACK_HIJACK = 'rack.hijack'.freeze
  RACK_HIJACK_IO = "rack.hijack_io".freeze

  # Per-connection setup, called from the acceptor thread.
  # Leaves the client waiting for request headers with a fresh parser,
  # no response bytes sent and no body input allocated.
  def yahns_init
    # other values @state takes later: :body, :trailers, :pipelined,
    # Wbuf, StreamFile
    @state = :headers
    @response_start_sent = false
    @hs = Unicorn::HttpRequest.new
    @input = nil
  end

  # Resumes a write that was deferred by buffering.  The return value is
  # handed back to the main epoll/kqueue worker loop:
  # - :wait_readable / :wait_writable when flushing cannot proceed yet
  # - :ignore once the connection was hijacked
  # - otherwise the result of http_response_done after the flush finished
  # Raises RuntimeError if wbuf_flush returns anything unexpected.
  def step_write
    while true
      rv = @state.wbuf_flush(self)
      case rv
      when :wait_writable, :wait_readable
        return rv # epoll/kqueue has to keep waiting on this socket
      when :ignore # hijacked
        @state = :ignore
        return :ignore
      when Yahns::StreamFile
        @state = rv # flush the replacement on the next pass of the loop
      when true, false # flush finished
        return http_response_done(rv)
      else
        raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
      end
    end
  end

  # Begins buffering a request body; used only with "input_buffering true".
  #
  # Raises Unicorn::RequestEntityTooLargeError (with an empty backtrace)
  # when both client_max_body_size and Content-Length are set and the
  # length exceeds the limit.  Otherwise switches to the :body state,
  # obtains the input from tmpio_for, runs the parser's current buffer
  # through filter_body into the thread-local read buffer and writes the
  # result to the new input.
  def mkinput_preread
    klass = self.class
    len = @hs.content_length
    mbs = klass.client_max_body_size
    # no limit configured, or no Content-Length (chunked): nothing to check
    oversized = mbs && len && len > mbs
    if oversized
      raise Unicorn::RequestEntityTooLargeError,
            "Content-Length:#{len} too large (>#{mbs})", []
    end

    @state = :body
    @input = klass.tmpio_for(len)
    scratch = Thread.current[:yahns_rbuf]
    @hs.filter_body(scratch, @hs.buf)
    @input.write(scratch)
  end

  # Called once request headers are parsed, to decide what the app gets
  # as its input.
  #
  # Returns NULL_IO when Content-Length is 0.  With "input_buffering true"
  # and a body to read (length > 0, or nil for chunked) it starts
  # pre-reading via mkinput_preread and returns false, so the caller keeps
  # looping instead of running the app.  For any other input_buffering
  # value (:lazy, false) it stores and returns the object built by mkinput.
  def input_ready
    no_body = @hs.content_length == 0
    klass = self.class

    # only the literal true selects full buffering; :lazy is truthy too
    if klass.input_buffering == true
      return NULL_IO if no_body # common case is an empty body

      mkinput_preread
      return false
    end

    return NULL_IO if no_body

    @input = klass.mkinput(self, @hs)
  end

  # The main entry point of the epoll/kqueue worker loop.
  #
  # Advances this client's state machine: flushes pending output when
  # @state holds a write buffer, otherwise parses/reads the request
  # according to @state (:pipelined, :headers, :body or :trailers) and
  # runs the app through app_call once the request is complete.
  #
  # Returns one of:
  # - the result of step_write or app_call
  # - :wait_readable / :wait_writable as returned by kgio_tryread, so
  #   that epoll/kqueue waits for more
  # - nil when kgio_tryread reports EOF (the :body and :trailers branches
  #   return the value of @input.close, which the author notes is nil)
  # - the result of handle_error for any StandardError raised in here
  def yahns_step
    # always write unwritten data first if we have any
    return step_write if Yahns::WbufCommon === @state

    # only read if we had nothing to write in this event loop iteration
    k = self.class
    rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads

    case @state
    when :pipelined
      # no read here: try to parse what the parser already holds
      if @hs.parse
        # input_ready returns false when it started pre-reading the body
        input = input_ready and return app_call(input)
        # @state == :body if we reach this point
        # (input_ready -> mkinput_preread)
      else
        @state = :headers
      end
      # continue to outer loop
    when :headers
      case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
      when String
        if @hs.add_parse(rv)
          input = input_ready and return app_call(input)
          break # to outer loop to reevaluate @state == :body
        end
        # keep looping on kgio_tryread
      when :wait_readable, :wait_writable, nil
        return rv
      end while true
    when :body
      if @hs.body_eof?
        if @hs.content_length || @hs.parse # hp.parse == trailers done!
          @input.rewind
          return app_call(@input)
        else # possible Transfer-Encoding:chunked, keep looping
          @state = :trailers
        end
      else
        # a single read per pass: this inner case has no loop of its own
        case rv = kgio_tryread(k.client_body_buffer_size, rbuf)
        when String
          @hs.filter_body(rbuf, @hs.buf << rbuf)
          @input.write(rbuf)
          # fall through to the outer loop, which checks body_eof? again
          # before the next kgio_tryread
        when :wait_readable, :wait_writable
          return rv # have epoll/kqueue wait for more
        when nil # unexpected EOF
          return @input.close # nil
        end # continue to outer loop (case @state)
      end
    when :trailers
      case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
      when String
        # NOTE(review): passes rbuf where the :headers branch passes rv;
        # presumably kgio_tryread returns the buffer it was given, making
        # the two equivalent -- confirm.
        if @hs.add_parse(rbuf)
          @input.rewind
          return app_call(@input)
        end
        # keep looping on kgio_tryread...
      when :wait_readable, :wait_writable
        return rv # wait for more
      when nil # unexpected EOF
        return @input.close # nil
      end while true
    end while true # outer loop
  rescue => e
    # covers errors from parsing and reading as well as from app_call
    handle_error(e)
  end

  # Completes the Rack env and runs the application.
  #
  # +input+ is stored as env["rack.input"].  When check_client_connection
  # is enabled and the request has headers, the start of the status line
  # is written to the socket before the app runs.
  #
  # Returns :ignore if the app hijacked the connection (env contains
  # "rack.hijack_io" afterwards); otherwise the result of
  # http_response_write, which is :wait_readable, :wait_writable, :ignore
  # or nil.
  def app_call(input)
    klass = self.class
    env = @hs.env
    env[REMOTE_ADDR] = @kgio_addr
    env[RACK_HIJACK] = hijack_proc(env)
    env[RACK_INPUT] = input

    early_start = klass.check_client_connection && @hs.headers?
    if early_start
      @response_start_sent = true
      # FIXME: we should buffer this just in case
      HTTP_RESPONSE_START.each { |piece| kgio_write(piece) }
    end

    # run the rack app with the class-wide defaults merged into env
    response = klass.app.call(env.merge!(klass.app_defaults))

    if env.key?(RACK_HIJACK_IO)
      :ignore
    else
      http_response_write(*response)
    end
  end

  # Builds the callable stored under env["rack.hijack"] by app_call.
  # Calling it records this client as env["rack.hijack_io"] and returns
  # the client; app_call checks for that key after the app has run.
  def hijack_proc(env)
    client = self
    proc { env[RACK_HIJACK_IO] = client }
  end

  # Hook called automatically by kgio_write.
  # Delegates to the inherited implementation, always with an explicit
  # +timeout+; when the caller gives none, the class-wide client_timeout
  # is used.
  def kgio_wait_writable(timeout = self.class.client_timeout)
    super(timeout)
  end

  # Hook called automatically by kgio_read.
  # Delegates to the inherited implementation, always with an explicit
  # +timeout+; when the caller gives none, the class-wide client_timeout
  # is used.
  def kgio_wait_readable(timeout = self.class.client_timeout)
    super(timeout)
  end

  # Error handler used by yahns_step.
  #
  # If we get any error, try to write something back to the client
  # assuming we haven't closed the socket, but don't get hung up
  # if the socket is already closed or broken:
  # - disconnects and network failures (EOF, ECONNRESET, EPIPE, ENOTCONN,
  #   ETIMEDOUT, EHOSTUNREACH) get no response and are not logged
  # - parser errors map to 414 / 413 / 400
  # - anything else is logged as "app error" and answered with a 500
  #
  # Failures while logging or writing are swallowed on purpose, and the
  # socket is shut down in every case.  We always return nil to ensure
  # the socket is closed at the end of this function.
  def handle_error(e)
    code = case e
    when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::ENOTCONN,
         Errno::ETIMEDOUT, Errno::EHOSTUNREACH
      # The peer or the network path is gone: nobody is left to read a
      # response and this is not an application failure, so neither
      # write nor log.  Just drop the connection.
      return
    when Unicorn::RequestURITooLongError
      414
    when Unicorn::RequestEntityTooLargeError
      413
    when Unicorn::HttpParserError # try to tell the client they're bad
      400
    else
      Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
      500
    end
    kgio_trywrite(err_response(code))
  rescue
    # best effort only: the socket may already be unusable
  ensure
    shutdown rescue nil
    return # always drop the connection on uncaught errors
  end
end

git clone git://yhbt.net/yahns.git
git clone https://yhbt.net/yahns.git