diff options
Diffstat (limited to 'lib/yahns/http_client.rb')
-rw-r--r-- | lib/yahns/http_client.rb | 196 |
1 files changed, 196 insertions, 0 deletions
diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb new file mode 100644 index 0000000..8171460 --- /dev/null +++ b/lib/yahns/http_client.rb @@ -0,0 +1,196 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'yahns/tiny_input' +class Yahns::HttpClient < Kgio::Socket # :nodoc: + NULL_IO = Yahns::TinyInput.new("") + + include Yahns::HttpResponse + include Yahns::ClientExpire + QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor + HTTP_RESPONSE_START = [ 'HTTP', '/1.1 ' ] + + # A frozen format for this is about 15% faster (note from Mongrel) + REMOTE_ADDR = 'REMOTE_ADDR'.freeze + RACK_INPUT = 'rack.input'.freeze + RACK_HIJACK = 'rack.hijack'.freeze + RACK_HIJACK_IO = "rack.hijack_io".freeze + + # called from acceptor thread + def yahns_init + @hs = Unicorn::HttpRequest.new + @response_start_sent = false + @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile + @input = nil + end + + # use if writes are deferred by buffering, this return value goes to + # the main epoll/kqueue worker loop + # returns :wait_readable, :wait_writable, or nil + def step_write + case rv = @state.wbuf_flush(self) + when :wait_writable, :wait_readable + return rv # tell epoll/kqueue to wait on this more + when :delete # :delete on hijack + @state = :delete + return :delete + when Yahns::StreamFile + @state = rv # continue looping + when true, false # done + return http_response_done(rv) + else + raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}" + end while true + end + + def mkinput_preread + @state = :body + @input = self.class.tmpio_for(@hs.content_length) + rbuf = Thread.current[:yahns_rbuf] + @hs.filter_body(rbuf, @hs.buf) + @input.write(rbuf) + end + + def input_ready + empty_body = 0 == @hs.content_length + k = self.class + case k.input_buffering + when true + # common case is an empty body + return NULL_IO if empty_body + + # content_length is nil (chunked) or len > 0 + mkinput_preread # keep looping + false + else # :lazy, false + empty_body ? NULL_IO : k.mkinput(self, @hs) + end + end + + # the main entry point of the epoll/kqueue worker loop + def yahns_step + # always write unwritten data first if we have any + return step_write if Yahns::WbufCommon === @state + + # only read if we had nothing to write in this event loop iteration + k = self.class + rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads + + case @state + when :pipelined + if @hs.parse + input = input_ready and return app_call(input) + # @state == :body if we get here point (input_ready -> mkinput_preread) + else + @state = :headers + end + # continue to outer loop + when :headers + case rv = kgio_tryread(k.client_header_buffer_size, rbuf) + when String + if @hs.add_parse(rv) + input = input_ready and return app_call(input) + break # to outer loop to reevaluate @state == :body + end + # keep looping on kgio_tryread + when :wait_readable, :wait_writable, nil + return rv + end while true + when :body + if @hs.body_eof? + if @hs.content_length || @hs.parse # hp.parse == trailers done! + @input.rewind + return app_call(@input) + else # possible Transfer-Encoding:chunked, keep looping + @state = :trailers + end + else + case rv = kgio_tryread(k.client_body_buffer_size, rbuf) + when String + @hs.filter_body(rbuf, @hs.buf << rbuf) + @input.write(rbuf) + # keep looping on kgio_tryread... + when :wait_readable, :wait_writable + return rv # have epoll/kqueue wait for more + when nil # unexpected EOF + return @input.close # nil + end # continue to outer loop (case @state) + end + when :trailers + case rv = kgio_tryread(k.client_header_buffer_size, rbuf) + when String + if @hs.add_parse(rbuf) + @input.rewind + return app_call(@input) + end + # keep looping on kgio_tryread... + when :wait_readable, :wait_writable + return rv # wait for more + when nil # unexpected EOF + return @input.close # nil + end while true + end while true # outer loop + rescue => e + handle_error(e) + end + + def app_call(input) + env = @hs.env + env[REMOTE_ADDR] = @kgio_addr + env[RACK_HIJACK] = hijack_proc(env) + env[RACK_INPUT] = @input ||= input + k = self.class + + if k.check_client_connection && @hs.headers? + @response_start_sent = true + # FIXME: we should buffer this just in case + HTTP_RESPONSE_START.each { |c| kgio_write(c) } + end + + # run the rack app + response = k.app.call(env.merge!(k.app_defaults)) + return :delete if env.include?(RACK_HIJACK_IO) + + # this returns :wait_readable, :wait_writable, :delete, or nil: + http_response_write(*response) + end + + def hijack_proc(env) + proc { env[RACK_HIJACK_IO] = self } + end + + # called automatically by kgio_write + def kgio_wait_writable(timeout = self.class.client_timeout) + super timeout + end + + # called automatically by kgio_read + def kgio_wait_readable(timeout = self.class.client_timeout) + super timeout + end + + # if we get any error, try to write something back to the client + # assuming we haven't closed the socket, but don't get hung up + # if the socket is already closed or broken. We'll always return + # nil to ensure the socket is closed at the end of this function + def handle_error(e) + code = case e + when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN + return # don't send response, drop the connection + when Unicorn::RequestURITooLongError + 414 + when Unicorn::RequestEntityTooLargeError + 413 + when Unicorn::HttpParserError # try to tell the client they're bad + 400 + else + Yahns::Log.exception(@hs.env["rack.logger"], "app error", e) + 500 + end + kgio_trywrite(err_response(code)) + rescue + ensure + shutdown rescue nil + return # always drop the connection on uncaught errors + end +end |