diff options
-rw-r--r-- | lib/yahns/http_client.rb | 69 | ||||
-rw-r--r-- | lib/yahns/http_response.rb | 72 | ||||
-rw-r--r-- | lib/yahns/queue_epoll.rb | 2 | ||||
-rw-r--r-- | lib/yahns/wbuf_str.rb | 24 | ||||
-rw-r--r-- | test/test_expect_100.rb | 94 |
5 files changed, 230 insertions, 31 deletions
diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb index 1dcf381..738ad61 100644 --- a/lib/yahns/http_client.rb +++ b/lib/yahns/http_client.rb @@ -10,7 +10,6 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc: include Yahns::HttpResponse include Yahns::ClientExpire QEV_FLAGS = Yahns::Queue::QEV_RD # used by acceptor - HTTP_RESPONSE_START = [ 'HTTP', '/1.1 ' ] # A frozen format for this is about 15% faster (note from Mongrel) REMOTE_ADDR = 'REMOTE_ADDR'.freeze @@ -40,6 +39,9 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc: @state = rv # continue looping when true, false # done return http_response_done(rv) + when :ccc_done, :r100_done + @state = rv + return :wait_writable else raise "BUG: #{@state.inspect}#wbuf_flush returned #{rv.inspect}" end while true @@ -57,9 +59,6 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc: @state = :body @input = k.tmpio_for(len) - # FIXME: we must buffer and not block here even if it's only a few bytes - http_100_response(@hs.env) - rbuf = Thread.current[:yahns_rbuf] @hs.filter_body(rbuf, @hs.buf) @input.write(rbuf) @@ -70,6 +69,8 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc: k = self.class case k.input_buffering when true + rv = http_100_response(@hs.env) and return rv + # common case is an empty body return NULL_IO if empty_body @@ -125,7 +126,12 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc: case @state when :pipelined if @hs.parse - input = input_ready and return app_call(input) + case input = input_ready + when :wait_readable, :wait_writable, :close then return input + when false # keep looping on @state + else + return app_call(input) + end # @state == :body if we get here point (input_ready -> mkinput_preread) else @state = :headers @@ -135,8 +141,12 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc: case rv = kgio_tryread(k.client_header_buffer_size, rbuf) when String if @hs.add_parse(rv) - input = input_ready and return app_call(input) - break # to outer loop to reevaluate @state == :body + case input = input_ready + when :wait_readable, :wait_writable, :close then return input + when false then break # to outer loop to reevaluate @state == :body + else + return app_call(input) + end end # keep looping on kgio_tryread when :wait_readable, :wait_writable, nil @@ -157,30 +167,57 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc: when :trailers rv = read_trailers(k.client_header_buffer_size, rbuf) return true == rv ? app_call(@input) : rv + when :ccc_done # unlikely + return app_call(nil) + when :r100_done # unlikely + rv = r100_done + return rv unless rv == true + raise "BUG: body=#@state " if @state != :body + # @state == :body, keep looping end while true # outer loop rescue => e handle_error(e) end + # returns :wait_readable, :wait_writable, :ignore, or nil for epoll + # returns false to keep looping inside yahns_step + def r100_done + k = self.class + case k.input_buffering + when true + empty_body = 0 == @hs.content_length + # common case is an empty body + return app_call(NULL_IO) if empty_body + + # content_length is nil (chunked) or len > 0 + mkinput_preread # keep looping (@state == :body) + true + else # :lazy, false + http_response_write(*k.app.call(@hs.env)) + end + end + def app_call(input) env = @hs.env - env[REMOTE_ADDR] = @kgio_addr - env[RACK_HIJACK] = hijack_proc(env) - env[RACK_INPUT] = input k = self.class - if k.check_client_connection && @hs.headers? - @response_start_sent = true - # FIXME: we should buffer this just in case - HTTP_RESPONSE_START.each { |c| kgio_write(c) } + # input is nil if we needed to wait for writability with + # check_client_connection + if input + env[REMOTE_ADDR] = @kgio_addr + env[RACK_HIJACK] = hijack_proc(env) + env[RACK_INPUT] = input + + if k.check_client_connection && @hs.headers? + rv = do_ccc and return rv + end end # run the rack app status, headers, body = k.app.call(env.merge!(k.app_defaults)) return :ignore if env.include?(RACK_HIJACK_IO) if status.to_i == 100 - # FIXME: we must buffer and not wait here even if it's only a few bytes - http_100_response(env) + rv = http_100_response(env) and return rv status, headers, body = k.app.call(env) end diff --git a/lib/yahns/http_response.rb b/lib/yahns/http_response.rb index a34832a..df935e9 100644 --- a/lib/yahns/http_response.rb +++ b/lib/yahns/http_response.rb @@ -2,6 +2,7 @@ # Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) require_relative 'stream_file' +require_relative 'wbuf_str' # Writes a Rack response to your client using the HTTP/1.1 specification. # You use it by simply doing: @@ -18,7 +19,8 @@ module Yahns::HttpResponse # :nodoc: CONN_KA = "Connection: keep-alive\r\n\r\n" CONN_CLOSE = "Connection: close\r\n\r\n" Z = "" - RESPONSE_START = "HTTP/1.1 " + CCC_RESPONSE_START = [ 'HTTP', '/1.1 ' ].map!(&:freeze) + RESPONSE_START = CCC_RESPONSE_START.join('') R100_RAW = "HTTP/1.1 100 Continue\r\n\r\n" R100_CCC = "100 Continue\r\n\r\nHTTP/1.1 " HTTP_EXPECT = "HTTP_EXPECT" @@ -27,22 +29,13 @@ module Yahns::HttpResponse # :nodoc: @response_start_sent ? Z : RESPONSE_START end - # only used if input_buffering is true (not :lazy or false) - # :lazy/false gives control to the app - def http_100_response(env) - if env[HTTP_EXPECT] =~ /\A100-continue\z/i - kgio_write(@response_start_sent ? R100_CCC : R100_RAW) - env.delete(HTTP_EXPECT) - end - end - def response_wait_write(rv) # call the kgio_wait_readable or kgio_wait_writable method ok = __send__("kgio_#{rv}") and return ok k = self.class k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\ "#{k.client_timeout}s") - nil + false end def err_response(code) @@ -93,7 +86,7 @@ module Yahns::HttpResponse # :nodoc: # shutdown is needed in case the app forked, we rescue here since # StreamInput may issue shutdown as well shutdown rescue nil - nil # trigger close + :close end end @@ -145,7 +138,7 @@ module Yahns::HttpResponse # :nodoc: body = nil # ensure we do not close body in ensure return rv else - response_wait_write(rv) or return + response_wait_write(rv) or return :close end end while true end @@ -176,7 +169,7 @@ module Yahns::HttpResponse # :nodoc: rv = wbuf.wbuf_write(self, chunk) break else - response_wait_write(rv) or return + response_wait_write(rv) or return :close end end while true end @@ -193,4 +186,55 @@ module Yahns::HttpResponse # :nodoc: ensure body.respond_to?(:close) and body.close end + + # returns nil on success + # :wait_readable/:wait_writable/:close for epoll + def do_ccc + @response_start_sent = true + wbuf = nil + rv = nil + CCC_RESPONSE_START.each do |buf| + if wbuf + wbuf << buf + else + case rv = kgio_trywrite(buf) + when nil + break + when String + buf = rv + when :wait_writable, :wait_readable + if self.class.output_buffering + wbuf = buf.dup + @state = Yahns::WbufStr.new(wbuf, :ccc_done) + break + else + response_wait_write(rv) or return :close + end + end while true + end + end + rv + end + + # only used if input_buffering is true (not :lazy or false) + # input_buffering==:lazy/false gives control to the app + # returns nil on success + # returns :close, :wait_writable, or :wait_readable + def http_100_response(env) + env.delete(HTTP_EXPECT) =~ /\A100-continue\z/i or return nil + buf = @response_start_sent ? R100_CCC : R100_RAW + case rv = kgio_trywrite(buf) + when String + buf = rv + when :wait_writable, :wait_readable + if self.class.output_buffering + @state = Yahns::WbufStr.new(buf, :r100_done) + return rv + else + response_wait_write(rv) or return :close + end + else + return rv + end while true + end end diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb index 624e175..c4a80a3 100644 --- a/lib/yahns/queue_epoll.rb +++ b/lib/yahns/queue_epoll.rb @@ -37,7 +37,7 @@ class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc: epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR) when :ignore # only used by rack.hijack @fdmap.decr - when nil + when nil, :close # this is be the ONLY place where we call IO#close on # things inside the queue io.close diff --git a/lib/yahns/wbuf_str.rb b/lib/yahns/wbuf_str.rb new file mode 100644 index 0000000..414f3d5 --- /dev/null +++ b/lib/yahns/wbuf_str.rb @@ -0,0 +1,24 @@ +# -*- encoding: binary -*- +# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require_relative 'wbuf_common' + +class Yahns::WbufStr # :nodoc: + include Yahns::WbufCommon + + def initialize(str, next_state) + @str = str + @next = next_state # :check_client_connection, :http_100_response + end + + def wbuf_flush(client) + case rv = client.kgio_trywrite(@str) + when String + @str = rv + when :wait_writable, :wait_readable + return rv + when nil + return @next + end while true + end +end diff --git a/test/test_expect_100.rb b/test/test_expect_100.rb index 338a50a..5b1ffa8 100644 --- a/test/test_expect_100.rb +++ b/test/test_expect_100.rb @@ -14,10 +14,18 @@ class TestExpect100 < Testcase code = env["HTTP_X_FORCE_RCODE"] || 100 [ code, h, [] ] else + env["rack.input"].read [ 201, h, [] ] end end + module TrywriteBlocked + def kgio_trywrite(*args) + return :wait_writable if $_tw_block_on.include?($_tw_blocked += 1) + super + end + end + def test_buffer_noccc; _test_expect_100(true, false); end def test_nobuffer_noccc; _test_expect_100(false, false); end def test_lazybuffer_noccc; _test_expect_100(:lazy, false); end @@ -88,4 +96,90 @@ class TestExpect100 < Testcase def test_reject_true_noccc; _test_reject(false, false); end def test_reject_lazy_ccc; _test_reject(:lazy, true); end def test_reject_true_ccc; _test_reject(false, true); end + + def test_swait_t_t; _swait(true, true, [1]); end + def test_swait_f_f; _swait(false, false, [1]); end + def test_swait_t_f; _swait(true, false, [1]); end + def test_swait_f_t; _swait(false, true, [1]); end + def test_swait_l_t; _swait(:lazy, true, [1]); end + def test_swait_l_f; _swait(:lazy, false, [1]); end + + def test_swait2_t_t; _swait(true, true, [1,2]); end + def test_swait2_f_f; _swait(false, false, [1,2]); end + def test_swait2_t_f; _swait(true, false, [1,2]); end + def test_swait2_f_t; _swait(false, true, [1,2]); end + def test_swait2_l_t; _swait(:lazy, true, [1,2]); end + def test_swait2_l_f; _swait(:lazy, false, [1,2]); end + + def test_swait3_t_t; _swait(true, true, [1,3]); end + def test_swait3_f_f; _swait(false, false, [1,3]); end + def test_swait3_t_f; _swait(true, false, [1,3]); end + def test_swait3_f_t; _swait(false, true, [1,3]); end + def test_swait3_l_t; _swait(:lazy, true, [1,3]); end + def test_swait3_l_f; _swait(:lazy, false, [1,3]); end + + def test_swait_t_t_ccc; _swait(true, true, [1], true); end + def test_swait_f_f_ccc; _swait(false, false, [1], true); end + def test_swait_t_f_ccc; _swait(true, false, [1], true); end + def test_swait_f_t_ccc; _swait(false, true, [1], true); end + def test_swait_l_t_ccc; _swait(:lazy, true, [1], true); end + def test_swait_l_f_ccc; _swait(:lazy, false, [1], true); end + + def test_swait2_t_t_ccc; _swait(true, true, [1,2], true); end + def test_swait2_f_f_ccc; _swait(false, false, [1,2], true); end + def test_swait2_t_f_ccc; _swait(true, false, [1,2], true); end + def test_swait2_f_t_ccc; _swait(false, true, [1,2], true); end + def test_swait2_l_t_ccc; _swait(:lazy, true, [1,2], true); end + def test_swait2_l_f_ccc; _swait(:lazy, false, [1,2], true); end + + def test_swait3_t_t_ccc; _swait(true, true, [1,3], true); end + def test_swait3_f_f_ccc; _swait(false, false, [1,3], true); end + def test_swait3_t_f_ccc; _swait(true, false, [1,3], true); end + def test_swait3_f_t_ccc; _swait(false, true, [1,3], true); end + def test_swait3_l_t_ccc; _swait(:lazy, true, [1,3], true); end + def test_swait3_l_f_ccc; _swait(:lazy, false, [1,3], true); end + + def test_swait3_t_t_ccc_body; _swait(true, true, [1,3], true, "HI"); end + def test_swait3_f_f_ccc_body; _swait(false, false, [1,3], true, "HI"); end + def test_swait3_t_f_ccc_body; _swait(true, false, [1,3], true, "HI"); end + def test_swait3_f_t_ccc_body; _swait(false, true, [1,3], true, "HI"); end + def test_swait3_l_t_ccc_body; _swait(:lazy, true, [1,3], true, "HI"); end + def test_swait3_l_f_ccc_body; _swait(:lazy, false, [1,3], true, "HI"); end + + def _swait(ibtype, obtype, block_on, ccc = false, body = "") + err = @err + cfg = Yahns::Config.new + host, port = @srv.addr[3], @srv.addr[1] + cfg.instance_eval do + stderr_path err.path + GTL.synchronize { + app(:rack, APP) { + listen "#{host}:#{port}" + output_buffering obtype + input_buffering ibtype + check_client_connection ccc + } + } + end + pid = fork do + ENV["YAHNS_FD"] = @srv.fileno.to_s + $_tw_blocked = 0 + $_tw_block_on = block_on + Yahns::HttpClient.__send__(:include, TrywriteBlocked) + Yahns::Server.new(cfg).start.join + end + c = get_tcp_client(host, port) + if body.size > 0 + r = "PUT / HTTP/1.0\r\nExpect: 100-continue\r\n" + r << "Content-Length: #{body.size}\r\n\r\n#{body}" + else + r = "PUT / HTTP/1.0\r\nExpect: 100-continue\r\n\r\n" + end + c.write(r) + assert c.wait(10), "timed out" + buf = c.read + assert_match(%r{\AHTTP/1\.1 100 Continue\r\n\r\nHTTP/1\.1 201}, buf) + ensure + quit_wait(pid) + end end |