yahns.git  about / heads / tags
sleepy, multi-threaded, non-blocking application server for Ruby
blob 826eb8d5df6341b28d0d39c764553317471bfe60 10253 bytes (raw)
$ git show HEAD:lib/yahns/http_client.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
 
# -*- encoding: binary -*-
# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
begin
  raise LoadError, 'SENDFILE_BROKEN env set' if ENV['SENDFILE_BROKEN']
  require 'sendfile'
rescue LoadError
end

class Yahns::HttpClient < Kgio::Socket # :nodoc:
  NULL_IO = StringIO.new(''.dup) # :nodoc:

  include Yahns::HttpResponse
  QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor

  # called from acceptor thread
  def yahns_init
    @hs = Unicorn::HttpRequest.new
    @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile
    @input = nil
  end

  # use if writes are deferred by buffering, this return value goes to
  # the main epoll/kqueue worker loop
  # returns :wait_readable, :wait_writable, or nil
  def step_write
    case rv = @state.wbuf_flush(self)
    when :wait_writable, :wait_readable
      return rv # tell epoll/kqueue to wait on this more
    when :ignore # :ignore on hijack, @state already set in hijack_cleanup
      return :ignore
    when Yahns::StreamFile
      @state = rv # continue looping
    when true, false # done
      return http_response_done(rv)
    when :ccc_done, :r100_done
      @state = rv
      return :wait_writable
    else
      raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
    end while true
  end

  # used only with "input_buffering true"
  def mkinput_preread
    k = self.class
    len = @hs.content_length
    mbs = k.client_max_body_size
    if mbs && len && len > mbs
      raise Unicorn::RequestEntityTooLargeError,
            "Content-Length:#{len} too large (>#{mbs})", []
    end
    @state = :body
    @input = k.tmpio_for(len, @hs.env)

    rbuf = Thread.current[:yahns_rbuf]
    @hs.filter_body(rbuf, @hs.buf)
    @input.write(rbuf)
  end

  def input_ready
    empty_body = 0 == @hs.content_length
    k = self.class
    case k.input_buffering
    when true
      rv = http_100_response(@hs.env) and return rv

      # common case is an empty body
      return NULL_IO if empty_body

      # content_length is nil (chunked) or len > 0
      mkinput_preread # keep looping
      false
    else # :lazy, false
      empty_body ? NULL_IO : (@input = k.mkinput(self, @hs))
    end
  end

  # returns true if we want to keep looping on this
  # returns :wait_readable/wait_writable/nil to yield back to epoll
  def fill_body(rsize, rbuf)
    case rv = kgio_tryread(rsize, rbuf)
    when String
      @hs.filter_body(rbuf, @hs.buf << rbuf)
      @input.write(rbuf)
      true # keep looping on kgio_tryread (but check body_eof? first)
    when :wait_readable, :wait_writable
      rv # have epoll/kqueue wait for more
    when nil # unexpected EOF
      @input.close # nil
    end
  end

  # returns true if we are ready to dispatch the app
  # returns :wait_readable/wait_writable/nil to yield back to epoll
  def read_trailers(rsize, rbuf)
    case rv = kgio_tryread(rsize, rbuf)
    when String
      if @hs.add_parse(rbuf)
        @input.rewind
        return true
      end
      # keep looping on kgio_tryread...
    when :wait_readable, :wait_writable
      return rv # wait for more
    when nil # unexpected EOF
      return @input.close # nil
    end while true
  end

  # the main entry point of the epoll/kqueue worker loop
  def yahns_step
    # always write unwritten data first if we have any
    return step_write if Yahns::WbufCommon === @state

    # only read if we had nothing to write in this event loop iteration
    k = self.class
    rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads

    case @state
    when :pipelined
      if @hs.parse
        case input = input_ready
        when :wait_readable, :wait_writable, :close then return input
        when false # keep looping on @state
        else
          return app_call(input)
        end
        # @state == :body if we get here point (input_ready -> mkinput_preread)
      else
        @state = :headers
      end
      # continue to outer loop
    when :headers
      case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
      when String
        if @hs.add_parse(rv)
          case input = input_ready
          when :wait_readable, :wait_writable, :close then return input
          when false then break # to outer loop to reevaluate @state == :body
          else
            return app_call(input)
          end
        end
        # keep looping on kgio_tryread
      when :wait_readable, :wait_writable, nil
        return rv
      end while true
    when :body
      if @hs.body_eof?
        if @hs.content_length || @hs.parse # hp.parse == trailers done!
          @input.rewind
          return app_call(@input)
        else # possible Transfer-Encoding:chunked, keep looping
          @state = :trailers
        end
      else
        rv = fill_body(k.client_body_buffer_size, rbuf)
        return rv unless true == rv
      end
    when :trailers
      rv = read_trailers(k.client_header_buffer_size, rbuf)
      return true == rv ? app_call(@input) : rv
    when :ccc_done # unlikely
      return app_call(nil)
    when :r100_done # unlikely
      rv = r100_done
      return rv unless rv == true
      raise "BUG: body=#@state " if @state != :body
      # @state == :body, keep looping
    end while true # outer loop
  rescue => e
    handle_error(e)
  end

  # only called when buffering slow clients
  # returns :wait_readable, :wait_writable, :ignore, or nil for epoll
  # returns true to keep looping inside yahns_step
  def r100_done
    k = self.class
    case k.input_buffering
    when true
      empty_body = 0 == @hs.content_length
      # common case is an empty body
      return app_call(NULL_IO) if empty_body

      # content_length is nil (chunked) or len > 0
      mkinput_preread # keep looping (@state == :body)
      true
    else # :lazy, false
      env = @hs.env
      opt = http_response_prep(env)
      res = k.app.call(env)
      return :ignore if app_hijacked?(env, res)
      http_response_write(res, opt)
    end
  end

  def app_call(input)
    env = @hs.env
    k = self.class

    # input is nil if we needed to wait for writability with
    # check_client_connection
    if input
      env['REMOTE_ADDR'] = @kgio_addr
      env['rack.hijack'] = self
      env['rack.input'] = input

      if k.check_client_connection && @hs.headers?
        rv = do_ccc and return rv
      end
    end

    env.merge!(k.app_defaults)

    # workaround stupid unicorn_http parser behavior when it parses HTTP_HOST
    if env['HTTPS'] == 'on'.freeze &&
        env['HTTP_HOST'] &&
        env['SERVER_PORT'] == '80'.freeze
      env['SERVER_PORT'] = '443'.freeze
    end

    opt = http_response_prep(env)
    # run the rack app
    res = k.app.call(env)
    return :ignore if app_hijacked?(env, res)
    if res[0].to_i == 100
      rv = http_100_response(env) and return rv
      res = k.app.call(env)
    end

    # this returns :wait_readable, :wait_writable, :ignore, or nil:
    http_response_write(res, opt)
  end

  # used by StreamInput (and thus TeeInput) for input_buffering {false|:lazy}
  def yahns_read(bytes, buf)
    case rv = kgio_tryread(bytes, buf)
    when String, nil
      return rv
    when :wait_readable
      wait_readable(self.class.client_timeout) or
        raise Yahns::ClientTimeout, "waiting for read", []
    when :wait_writable
      wait_writable(self.class.client_timeout) or
        raise Yahns::ClientTimeout, "waiting for write", []
    end while true
  end

  # allow releasing some memory if rack.hijack is used
  # n.b. we no longer issue EPOLL_CTL_DEL because it becomes more expensive
  # (and complicated) as our hijack support will allow "un-hijacking"
  # the socket.
  def hijack_cleanup
    # prevent socket from holding process exit up
    Thread.current[:yahns_fdmap].forget(self)
    @state = :ignore
    @input = nil # keep env["rack.input"] accessible, though
  end

  # this is the env["rack.hijack"] callback exposed to the Rack app
  def call
    hijack_cleanup
    @hs.env['rack.hijack_io'] = self
  end

  def response_hijacked(fn)
    hijack_cleanup
    fn.call(self)
    :ignore
  end

  # if we get any error, try to write something back to the client
  # assuming we haven't closed the socket, but don't get hung up
  # if the socket is already closed or broken.  We'll always return
  # nil to ensure the socket is closed at the end of this function
  def handle_error(e)
    code = case e
    when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN,
         Errno::ETIMEDOUT,Errno::EHOSTUNREACH
      return # don't send response, drop the connection
    when Yahns::ClientTimeout
      408
    when Unicorn::RequestURITooLongError
      414
    when Unicorn::RequestEntityTooLargeError
      413
    when Unicorn::HttpParserError # try to tell the client they're bad
      400
    else
      n = 500
      case e.class.to_s
      when 'OpenSSL::SSL::SSLError'
        if e.message.include?('wrong version number')
          n = nil
          e.set_backtrace([])
        end
      end
      Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
      n
    end
    kgio_trywrite(err_response(code)) if code
  rescue
  ensure
    shutdown rescue nil
    return # always drop the connection on uncaught errors
  end

  def app_hijacked?(env, res)
    return false unless env.include?('rack.hijack_io'.freeze)
    res[2].close if res && res[2].respond_to?(:close)
    true
  end

  def do_pread(io, count, offset)
    count = 16384 if count > 16384
    buf = Thread.current[:yahns_sfbuf] ||= ''.dup
    if io.respond_to?(:pread)
      io.pread(count, offset, buf)
    else
      io.pos = offset
      io.read(count, buf)
    end
  rescue EOFError
    nil
  end

  def trysendio(io, offset, count)
    return 0 if count == 0
    str = do_pread(io, count, offset) or return # nil for EOF
    n = 0
    case rv = kgio_trywrite(str)
    when String # partial write, keep trying
      n += (str.size - rv.size)
      str = rv
    when :wait_writable, :wait_readable
      return n > 0 ? n : rv
    when nil
      return n + str.size # yay!
    end while true
  end

  alias trysendfile trysendio unless IO.instance_methods.include?(:trysendfile)
end

git clone git://yhbt.net/yahns.git
git clone https://yhbt.net/yahns.git