about summary refs log tree commit homepage
path: root/lib/yahns/http_client.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/yahns/http_client.rb')
-rw-r--r--lib/yahns/http_client.rb196
1 files changed, 196 insertions, 0 deletions
diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb
new file mode 100644
index 0000000..8171460
--- /dev/null
+++ b/lib/yahns/http_client.rb
@@ -0,0 +1,196 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'yahns/tiny_input'
+class Yahns::HttpClient < Kgio::Socket # :nodoc:
+  NULL_IO = Yahns::TinyInput.new("")
+
+  include Yahns::HttpResponse
+  include Yahns::ClientExpire
+  QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor
+  HTTP_RESPONSE_START = [ 'HTTP', '/1.1 ' ]
+
+  # A frozen format for this is about 15% faster (note from Mongrel)
+  REMOTE_ADDR = 'REMOTE_ADDR'.freeze
+  RACK_INPUT = 'rack.input'.freeze
+  RACK_HIJACK = 'rack.hijack'.freeze
+  RACK_HIJACK_IO = "rack.hijack_io".freeze
+
+  # called from acceptor thread
+  def yahns_init
+    @hs = Unicorn::HttpRequest.new
+    @response_start_sent = false
+    @state = :headers # :body, :trailers, :pipelined, Wbuf, StreamFile
+    @input = nil
+  end
+
+  # use if writes are deferred by buffering, this return value goes to
+  # the main epoll/kqueue worker loop
+  # returns :wait_readable, :wait_writable, or nil
+  def step_write
+    case rv = @state.wbuf_flush(self)
+    when :wait_writable, :wait_readable
+      return rv # tell epoll/kqueue to wait on this more
+    when :delete # :delete on hijack
+      @state = :delete
+      return :delete
+    when Yahns::StreamFile
+      @state = rv # continue looping
+    when true, false # done
+      return http_response_done(rv)
+    else
+      raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
+    end while true
+  end
+
+  def mkinput_preread
+    @state = :body
+    @input = self.class.tmpio_for(@hs.content_length)
+    rbuf = Thread.current[:yahns_rbuf]
+    @hs.filter_body(rbuf, @hs.buf)
+    @input.write(rbuf)
+  end
+
+  def input_ready
+    empty_body = 0 == @hs.content_length
+    k = self.class
+    case k.input_buffering
+    when true
+      # common case is an empty body
+      return NULL_IO if empty_body
+
+      # content_length is nil (chunked) or len > 0
+      mkinput_preread # keep looping
+      false
+    else # :lazy, false
+      empty_body ? NULL_IO : k.mkinput(self, @hs)
+    end
+  end
+
+  # the main entry point of the epoll/kqueue worker loop
+  def yahns_step
+    # always write unwritten data first if we have any
+    return step_write if Yahns::WbufCommon === @state
+
+    # only read if we had nothing to write in this event loop iteration
+    k = self.class
+    rbuf = Thread.current[:yahns_rbuf] # running under spawn_worker_threads
+
+    case @state
+    when :pipelined
+      if @hs.parse
+        input = input_ready and return app_call(input)
+        # @state == :body if we get here point (input_ready -> mkinput_preread)
+      else
+        @state = :headers
+      end
+      # continue to outer loop
+    when :headers
+      case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
+      when String
+        if @hs.add_parse(rv)
+          input = input_ready and return app_call(input)
+          break # to outer loop to reevaluate @state == :body
+        end
+        # keep looping on kgio_tryread
+      when :wait_readable, :wait_writable, nil
+        return rv
+      end while true
+    when :body
+      if @hs.body_eof?
+        if @hs.content_length || @hs.parse # hp.parse == trailers done!
+          @input.rewind
+          return app_call(@input)
+        else # possible Transfer-Encoding:chunked, keep looping
+          @state = :trailers
+        end
+      else
+        case rv = kgio_tryread(k.client_body_buffer_size, rbuf)
+        when String
+          @hs.filter_body(rbuf, @hs.buf << rbuf)
+          @input.write(rbuf)
+          # keep looping on kgio_tryread...
+        when :wait_readable, :wait_writable
+          return rv # have epoll/kqueue wait for more
+        when nil # unexpected EOF
+          return @input.close # nil
+        end # continue to outer loop (case @state)
+      end
+    when :trailers
+      case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
+      when String
+        if @hs.add_parse(rbuf)
+          @input.rewind
+          return app_call(@input)
+        end
+        # keep looping on kgio_tryread...
+      when :wait_readable, :wait_writable
+        return rv # wait for more
+      when nil # unexpected EOF
+        return @input.close # nil
+      end while true
+    end while true # outer loop
+  rescue => e
+    handle_error(e)
+  end
+
+  def app_call(input)
+    env = @hs.env
+    env[REMOTE_ADDR] = @kgio_addr
+    env[RACK_HIJACK] = hijack_proc(env)
+    env[RACK_INPUT] = @input ||= input
+    k = self.class
+
+    if k.check_client_connection && @hs.headers?
+      @response_start_sent = true
+      # FIXME: we should buffer this just in case
+      HTTP_RESPONSE_START.each { |c| kgio_write(c) }
+    end
+
+    # run the rack app
+    response = k.app.call(env.merge!(k.app_defaults))
+    return :delete if env.include?(RACK_HIJACK_IO)
+
+    # this returns :wait_readable, :wait_writable, :delete, or nil:
+    http_response_write(*response)
+  end
+
+  def hijack_proc(env)
+    proc { env[RACK_HIJACK_IO] = self }
+  end
+
+  # called automatically by kgio_write
+  def kgio_wait_writable(timeout = self.class.client_timeout)
+    super timeout
+  end
+
+  # called automatically by kgio_read
+  def kgio_wait_readable(timeout = self.class.client_timeout)
+    super timeout
+  end
+
+  # if we get any error, try to write something back to the client
+  # assuming we haven't closed the socket, but don't get hung up
+  # if the socket is already closed or broken.  We'll always return
+  # nil to ensure the socket is closed at the end of this function
+  def handle_error(e)
+    code = case e
+    when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN
+      return # don't send response, drop the connection
+    when Unicorn::RequestURITooLongError
+      414
+    when Unicorn::RequestEntityTooLargeError
+      413
+    when Unicorn::HttpParserError # try to tell the client they're bad
+      400
+    else
+      Yahns::Log.exception(@hs.env["rack.logger"], "app error", e)
+      500
+    end
+    kgio_trywrite(err_response(code))
+  rescue
+  ensure
+    shutdown rescue nil
+    return # always drop the connection on uncaught errors
+  end
+end