about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2013-10-26 01:13:29 +0000
committerEric Wong <normalperson@yhbt.net>2013-10-26 02:00:17 +0000
commitf38e54f8d54f8cdfdc15f43b2394f0acfff5d413 (patch)
tree5a26e626a3fcee6787f62d1dd885137604db97c5
parent8671b632849216476305573e87b5626bc34fedf1 (diff)
downloadyahns-f38e54f8d54f8cdfdc15f43b2394f0acfff5d413.tar.gz
The tiny responses for check_client_connection and "100-Continue"
responses may occasionally fail with EAGAIN.  We must be prepared
for those corner cases and buffer the output appropriately.  We can
safely use a string for buffering here (for once(!)), since the
buffer sizes are bounded and known at buffer time, unlike the
response headers/bodies sent by the Rack application.
-rw-r--r--lib/yahns/http_client.rb69
-rw-r--r--lib/yahns/http_response.rb72
-rw-r--r--lib/yahns/queue_epoll.rb2
-rw-r--r--lib/yahns/wbuf_str.rb24
-rw-r--r--test/test_expect_100.rb94
5 files changed, 230 insertions, 31 deletions
diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb
index 1dcf381..738ad61 100644
--- a/lib/yahns/http_client.rb
+++ b/lib/yahns/http_client.rb
@@ -10,7 +10,6 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc:
   include Yahns::HttpResponse
   include Yahns::ClientExpire
   QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor
-  HTTP_RESPONSE_START = [ 'HTTP', '/1.1 ' ]
 
   # A frozen format for this is about 15% faster (note from Mongrel)
   REMOTE_ADDR = 'REMOTE_ADDR'.freeze
@@ -40,6 +39,9 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc:
       @state = rv # continue looping
     when true, false # done
       return http_response_done(rv)
+    when :ccc_done, :r100_done
+      @state = rv
+      return :wait_writable
     else
       raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}"
     end while true
@@ -57,9 +59,6 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc:
     @state = :body
     @input = k.tmpio_for(len)
 
-    # FIXME: we must buffer and not block here even if it's only a few bytes
-    http_100_response(@hs.env)
-
     rbuf = Thread.current[:yahns_rbuf]
     @hs.filter_body(rbuf, @hs.buf)
     @input.write(rbuf)
@@ -70,6 +69,8 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc:
     k = self.class
     case k.input_buffering
     when true
+      rv = http_100_response(@hs.env) and return rv
+
       # common case is an empty body
       return NULL_IO if empty_body
 
@@ -125,7 +126,12 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc:
     case @state
     when :pipelined
       if @hs.parse
-        input = input_ready and return app_call(input)
+        case input = input_ready
+        when :wait_readable, :wait_writable, :close then return input
+        when false # keep looping on @state
+        else
+          return app_call(input)
+        end
         # @state == :body if we get here point (input_ready -> mkinput_preread)
       else
         @state = :headers
@@ -135,8 +141,12 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc:
       case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
       when String
         if @hs.add_parse(rv)
-          input = input_ready and return app_call(input)
-          break # to outer loop to reevaluate @state == :body
+          case input = input_ready
+          when :wait_readable, :wait_writable, :close then return input
+          when false then break # to outer loop to reevaluate @state == :body
+          else
+            return app_call(input)
+          end
         end
         # keep looping on kgio_tryread
       when :wait_readable, :wait_writable, nil
@@ -157,30 +167,57 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc:
     when :trailers
       rv = read_trailers(k.client_header_buffer_size, rbuf)
       return true == rv ? app_call(@input) : rv
+    when :ccc_done # unlikely
+      return app_call(nil)
+    when :r100_done # unlikely
+      rv = r100_done
+      return rv unless rv == true
+      raise "BUG: body=#@state " if @state != :body
+      # @state == :body, keep looping
     end while true # outer loop
   rescue => e
     handle_error(e)
   end
 
+  # returns :wait_readable, :wait_writable, :ignore, or nil for epoll
+  # returns false to keep looping inside yahns_step
+  def r100_done
+    k = self.class
+    case k.input_buffering
+    when true
+      empty_body = 0 == @hs.content_length
+      # common case is an empty body
+      return app_call(NULL_IO) if empty_body
+
+      # content_length is nil (chunked) or len > 0
+      mkinput_preread # keep looping (@state == :body)
+      true
+    else # :lazy, false
+      http_response_write(*k.app.call(@hs.env))
+    end
+  end
+
   def app_call(input)
     env = @hs.env
-    env[REMOTE_ADDR] = @kgio_addr
-    env[RACK_HIJACK] = hijack_proc(env)
-    env[RACK_INPUT] = input
     k = self.class
 
-    if k.check_client_connection && @hs.headers?
-      @response_start_sent = true
-      # FIXME: we should buffer this just in case
-      HTTP_RESPONSE_START.each { |c| kgio_write(c) }
+    # input is nil if we needed to wait for writability with
+    # check_client_connection
+    if input
+      env[REMOTE_ADDR] = @kgio_addr
+      env[RACK_HIJACK] = hijack_proc(env)
+      env[RACK_INPUT] = input
+
+      if k.check_client_connection && @hs.headers?
+        rv = do_ccc and return rv
+      end
     end
 
     # run the rack app
     status, headers, body = k.app.call(env.merge!(k.app_defaults))
     return :ignore if env.include?(RACK_HIJACK_IO)
     if status.to_i == 100
-      # FIXME: we must buffer and not wait here even if it's only a few bytes
-      http_100_response(env)
+      rv = http_100_response(env) and return rv
       status, headers, body = k.app.call(env)
     end
 
diff --git a/lib/yahns/http_response.rb b/lib/yahns/http_response.rb
index a34832a..df935e9 100644
--- a/lib/yahns/http_response.rb
+++ b/lib/yahns/http_response.rb
@@ -2,6 +2,7 @@
 # Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
 # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
 require_relative 'stream_file'
+require_relative 'wbuf_str'
 
 # Writes a Rack response to your client using the HTTP/1.1 specification.
 # You use it by simply doing:
@@ -18,7 +19,8 @@ module Yahns::HttpResponse # :nodoc:
   CONN_KA = "Connection: keep-alive\r\n\r\n"
   CONN_CLOSE = "Connection: close\r\n\r\n"
   Z = ""
-  RESPONSE_START = "HTTP/1.1 "
+  CCC_RESPONSE_START = [ 'HTTP', '/1.1 ' ].map!(&:freeze)
+  RESPONSE_START = CCC_RESPONSE_START.join('')
   R100_RAW = "HTTP/1.1 100 Continue\r\n\r\n"
   R100_CCC = "100 Continue\r\n\r\nHTTP/1.1 "
   HTTP_EXPECT = "HTTP_EXPECT"
@@ -27,22 +29,13 @@ module Yahns::HttpResponse # :nodoc:
     @response_start_sent ? Z : RESPONSE_START
   end
 
-  # only used if input_buffering is true (not :lazy or false)
-  # :lazy/false gives control to the app
-  def http_100_response(env)
-    if env[HTTP_EXPECT] =~ /\A100-continue\z/i
-      kgio_write(@response_start_sent ? R100_CCC : R100_RAW)
-      env.delete(HTTP_EXPECT)
-    end
-  end
-
   def response_wait_write(rv)
     # call the kgio_wait_readable or kgio_wait_writable method
     ok = __send__("kgio_#{rv}") and return ok
     k = self.class
     k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\
                   "#{k.client_timeout}s")
-    nil
+    false
   end
 
   def err_response(code)
@@ -93,7 +86,7 @@ module Yahns::HttpResponse # :nodoc:
       # shutdown is needed in case the app forked, we rescue here since
       # StreamInput may issue shutdown as well
       shutdown rescue nil
-      nil # trigger close
+      :close
     end
   end
 
@@ -145,7 +138,7 @@ module Yahns::HttpResponse # :nodoc:
           body = nil # ensure we do not close body in ensure
           return rv
         else
-          response_wait_write(rv) or return
+          response_wait_write(rv) or return :close
         end
       end while true
     end
@@ -176,7 +169,7 @@ module Yahns::HttpResponse # :nodoc:
             rv = wbuf.wbuf_write(self, chunk)
             break
           else
-            response_wait_write(rv) or return
+            response_wait_write(rv) or return :close
           end
         end while true
       end
@@ -193,4 +186,55 @@ module Yahns::HttpResponse # :nodoc:
   ensure
     body.respond_to?(:close) and body.close
   end
+
+  # returns nil on success
+  # :wait_readable/:wait_writable/:close for epoll
+  def do_ccc
+    @response_start_sent = true
+    wbuf = nil
+    rv = nil
+    CCC_RESPONSE_START.each do |buf|
+      if wbuf
+        wbuf << buf
+      else
+        case rv = kgio_trywrite(buf)
+        when nil
+          break
+        when String
+          buf = rv
+        when :wait_writable, :wait_readable
+          if self.class.output_buffering
+            wbuf = buf.dup
+            @state = Yahns::WbufStr.new(wbuf, :ccc_done)
+            break
+          else
+            response_wait_write(rv) or return :close
+          end
+        end while true
+      end
+    end
+    rv
+  end
+
+  # only used if input_buffering is true (not :lazy or false)
+  # input_buffering==:lazy/false gives control to the app
+  # returns nil on success
+  # returns :close, :wait_writable, or :wait_readable
+  def http_100_response(env)
+    env.delete(HTTP_EXPECT) =~ /\A100-continue\z/i or return nil
+    buf = @response_start_sent ? R100_CCC : R100_RAW
+    case rv = kgio_trywrite(buf)
+    when String
+      buf = rv
+    when :wait_writable, :wait_readable
+      if self.class.output_buffering
+        @state = Yahns::WbufStr.new(buf, :r100_done)
+        return rv
+      else
+        response_wait_write(rv) or return :close
+      end
+    else
+      return rv
+    end while true
+  end
 end
diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb
index 624e175..c4a80a3 100644
--- a/lib/yahns/queue_epoll.rb
+++ b/lib/yahns/queue_epoll.rb
@@ -37,7 +37,7 @@ class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc:
             epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
           when :ignore # only used by rack.hijack
             @fdmap.decr
-          when nil
+          when nil, :close
             # this is be the ONLY place where we call IO#close on
             # things inside the queue
             io.close
diff --git a/lib/yahns/wbuf_str.rb b/lib/yahns/wbuf_str.rb
new file mode 100644
index 0000000..414f3d5
--- /dev/null
+++ b/lib/yahns/wbuf_str.rb
@@ -0,0 +1,24 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'wbuf_common'
+
+class Yahns::WbufStr # :nodoc:
+  include Yahns::WbufCommon
+
+  def initialize(str, next_state)
+    @str = str
+    @next = next_state # :check_client_connection, :http_100_response
+  end
+
+  def wbuf_flush(client)
+    case rv = client.kgio_trywrite(@str)
+    when String
+      @str = rv
+    when :wait_writable, :wait_readable
+      return rv
+    when nil
+      return @next
+    end while true
+  end
+end
diff --git a/test/test_expect_100.rb b/test/test_expect_100.rb
index 338a50a..5b1ffa8 100644
--- a/test/test_expect_100.rb
+++ b/test/test_expect_100.rb
@@ -14,10 +14,18 @@ class TestExpect100 < Testcase
       code = env["HTTP_X_FORCE_RCODE"] || 100
       [ code, h, [] ]
     else
+      env["rack.input"].read
       [ 201, h, [] ]
     end
   end
 
+  module TrywriteBlocked
+    def kgio_trywrite(*args)
+      return :wait_writable if $_tw_block_on.include?($_tw_blocked += 1)
+      super
+    end
+  end
+
   def test_buffer_noccc; _test_expect_100(true, false); end
   def test_nobuffer_noccc; _test_expect_100(false, false); end
   def test_lazybuffer_noccc; _test_expect_100(:lazy, false); end
@@ -88,4 +96,90 @@ class TestExpect100 < Testcase
   def test_reject_true_noccc; _test_reject(false, false); end
   def test_reject_lazy_ccc; _test_reject(:lazy, true); end
   def test_reject_true_ccc; _test_reject(false, true); end
+
+  def test_swait_t_t; _swait(true, true, [1]); end
+  def test_swait_f_f; _swait(false, false, [1]); end
+  def test_swait_t_f; _swait(true, false, [1]); end
+  def test_swait_f_t; _swait(false, true, [1]); end
+  def test_swait_l_t; _swait(:lazy, true, [1]); end
+  def test_swait_l_f; _swait(:lazy, false, [1]); end
+
+  def test_swait2_t_t; _swait(true, true, [1,2]); end
+  def test_swait2_f_f; _swait(false, false, [1,2]); end
+  def test_swait2_t_f; _swait(true, false, [1,2]); end
+  def test_swait2_f_t; _swait(false, true, [1,2]); end
+  def test_swait2_l_t; _swait(:lazy, true, [1,2]); end
+  def test_swait2_l_f; _swait(:lazy, false, [1,2]); end
+
+  def test_swait3_t_t; _swait(true, true, [1,3]); end
+  def test_swait3_f_f; _swait(false, false, [1,3]); end
+  def test_swait3_t_f; _swait(true, false, [1,3]); end
+  def test_swait3_f_t; _swait(false, true, [1,3]); end
+  def test_swait3_l_t; _swait(:lazy, true, [1,3]); end
+  def test_swait3_l_f; _swait(:lazy, false, [1,3]); end
+
+  def test_swait_t_t_ccc; _swait(true, true, [1], true); end
+  def test_swait_f_f_ccc; _swait(false, false, [1], true); end
+  def test_swait_t_f_ccc; _swait(true, false, [1], true); end
+  def test_swait_f_t_ccc; _swait(false, true, [1], true); end
+  def test_swait_l_t_ccc; _swait(:lazy, true, [1], true); end
+  def test_swait_l_f_ccc; _swait(:lazy, false, [1], true); end
+
+  def test_swait2_t_t_ccc; _swait(true, true, [1,2], true); end
+  def test_swait2_f_f_ccc; _swait(false, false, [1,2], true); end
+  def test_swait2_t_f_ccc; _swait(true, false, [1,2], true); end
+  def test_swait2_f_t_ccc; _swait(false, true, [1,2], true); end
+  def test_swait2_l_t_ccc; _swait(:lazy, true, [1,2], true); end
+  def test_swait2_l_f_ccc; _swait(:lazy, false, [1,2], true); end
+
+  def test_swait3_t_t_ccc; _swait(true, true, [1,3], true); end
+  def test_swait3_f_f_ccc; _swait(false, false, [1,3], true); end
+  def test_swait3_t_f_ccc; _swait(true, false, [1,3], true); end
+  def test_swait3_f_t_ccc; _swait(false, true, [1,3], true); end
+  def test_swait3_l_t_ccc; _swait(:lazy, true, [1,3], true); end
+  def test_swait3_l_f_ccc; _swait(:lazy, false, [1,3], true); end
+
+  def test_swait3_t_t_ccc_body; _swait(true, true, [1,3], true, "HI"); end
+  def test_swait3_f_f_ccc_body; _swait(false, false, [1,3], true, "HI"); end
+  def test_swait3_t_f_ccc_body; _swait(true, false, [1,3], true, "HI"); end
+  def test_swait3_f_t_ccc_body; _swait(false, true, [1,3], true, "HI"); end
+  def test_swait3_l_t_ccc_body; _swait(:lazy, true, [1,3], true, "HI"); end
+  def test_swait3_l_f_ccc_body; _swait(:lazy, false, [1,3], true, "HI"); end
+
+  def _swait(ibtype, obtype, block_on, ccc = false, body = "")
+    err = @err
+    cfg = Yahns::Config.new
+    host, port = @srv.addr[3], @srv.addr[1]
+    cfg.instance_eval do
+      stderr_path err.path
+      GTL.synchronize {
+        app(:rack, APP) {
+         listen "#{host}:#{port}"
+         output_buffering obtype
+         input_buffering ibtype
+         check_client_connection ccc
+        }
+      }
+    end
+    pid = fork do
+      ENV["YAHNS_FD"] = @srv.fileno.to_s
+      $_tw_blocked = 0
+      $_tw_block_on = block_on
+      Yahns::HttpClient.__send__(:include, TrywriteBlocked)
+      Yahns::Server.new(cfg).start.join
+    end
+    c = get_tcp_client(host, port)
+    if body.size > 0
+      r = "PUT / HTTP/1.0\r\nExpect: 100-continue\r\n"
+      r << "Content-Length: #{body.size}\r\n\r\n#{body}"
+    else
+      r = "PUT / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n"
+    end
+    c.write(r)
+    assert c.wait(10), "timed out"
+    buf = c.read
+    assert_match(%r{\AHTTP/1\.1 100 Continue\r\n\r\nHTTP/1\.1 201}, buf)
+  ensure
+    quit_wait(pid)
+  end
 end