about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <e@80x24.org>2013-10-29 02:36:06 +0000
committerEric Wong <normalperson@yhbt.net>2013-10-29 02:42:26 +0000
commit01cef69c94033ecc2cb22b209f79deab2b0d55d6 (patch)
tree210f762f44bb0eaa6987d40c9600f0278d8f4f8a
parent66d4e85c5d8c5fa2874c6b5c73cf936e51e8537c (diff)
downloadyahns-01cef69c94033ecc2cb22b209f79deab2b0d55d6.tar.gz
We may also return HTTP 408 errors in this case.
-rw-r--r--lib/yahns.rb3
-rw-r--r--lib/yahns/client_expire_portable.rb5
-rw-r--r--lib/yahns/http_client.rb14
-rw-r--r--lib/yahns/stream_input.rb9
-rw-r--r--test/test_input.rb52
5 files changed, 74 insertions, 9 deletions
diff --git a/lib/yahns.rb b/lib/yahns.rb
index 0499640..51a6c4c 100644
--- a/lib/yahns.rb
+++ b/lib/yahns.rb
@@ -48,6 +48,9 @@ module Yahns # :nodoc:
   # for client shutdowns/disconnects.
   class ClientShutdown < EOFError # :nodoc:
   end
+
+  class ClientTimeout < RuntimeError # :nodoc:
+  end
 end
 
 # FIXME: require lazily
diff --git a/lib/yahns/client_expire_portable.rb b/lib/yahns/client_expire_portable.rb
index 2ea7706..daf396c 100644
--- a/lib/yahns/client_expire_portable.rb
+++ b/lib/yahns/client_expire_portable.rb
@@ -17,11 +17,6 @@ module Yahns::ClientExpire # :nodoc:
     0
   end
 
-  def kgio_read(*args)
-    @last_io_at = __timestamp
-    super
-  end
-
   def kgio_write(*args)
     @last_io_at = __timestamp
     super
diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb
index 738ad61..3af18cf 100644
--- a/lib/yahns/http_client.rb
+++ b/lib/yahns/http_client.rb
@@ -239,6 +239,18 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc:
     super timeout
   end
 
+  # used by StreamInput (and thus TeeInput) for input_buffering {false|:lazy}
+  def yahns_read(bytes, buf)
+    case rv = kgio_tryread(bytes, buf)
+    when String, nil
+      return rv
+    when :wait_readable
+      kgio_wait_readable or raise Yahns::ClientTimeout, "waiting for read", []
+    when :wait_writable
+      kgio_wait_writable or raise Yahns::ClientTimeout, "waiting for write", []
+    end while true
+  end
+
   # if we get any error, try to write something back to the client
   # assuming we haven't closed the socket, but don't get hung up
   # if the socket is already closed or broken.  We'll always return
@@ -247,6 +259,8 @@ class Yahns::HttpClient < Kgio::Socket # :nodoc:
     code = case e
     when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::ENOTCONN
       return # don't send response, drop the connection
+    when Yahns::ClientTimeout
+      408
     when Unicorn::RequestURITooLongError
       414
     when Unicorn::RequestEntityTooLargeError
diff --git a/lib/yahns/stream_input.rb b/lib/yahns/stream_input.rb
index b8a3a8f..03f81be 100644
--- a/lib/yahns/stream_input.rb
+++ b/lib/yahns/stream_input.rb
@@ -45,7 +45,7 @@ class Yahns::StreamInput # :nodoc:
         to_read = length - @rbuf.size
         rv.replace(@rbuf.slice!(0, @rbuf.size))
         until to_read == 0 || eof? || (rv.size > 0 && @chunked)
-          @client.kgio_read(to_read, @buf) or eof!
+          @client.yahns_read(to_read, @buf) or eof!
           filter_body(@rbuf, @buf)
           rv << @rbuf
           to_read -= @rbuf.size
@@ -83,7 +83,7 @@ class Yahns::StreamInput # :nodoc:
     begin
       @rbuf.sub!(re, '') and return $1
       return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size) if eof?
-      @client.kgio_read(rsize, @buf) or eof!
+      @client.yahns_read(rsize, @buf) or eof!
       filter_body(once = '', @buf)
       @rbuf << once
     end while true
@@ -105,8 +105,9 @@ class Yahns::StreamInput # :nodoc:
   def eof?
     if @parser.body_eof?
       rsize = __rsize
+      buf = Thread.current[:yahns_rbuf]
       while @chunked && ! @parser.parse
-        once = @client.kgio_read(rsize) or eof!
+        once = @client.yahns_read(rsize, buf) or eof!
         @buf << once
       end
       @client = nil
@@ -126,7 +127,7 @@ class Yahns::StreamInput # :nodoc:
     dst.replace(@rbuf)
     rsize = __rsize or return
     until eof?
-      @client.kgio_read(rsize, @buf) or eof!
+      @client.yahns_read(rsize, @buf) or eof!
       filter_body(@rbuf, @buf)
       dst << @rbuf
     end
diff --git a/test/test_input.rb b/test/test_input.rb
new file mode 100644
index 0000000..608ff23
--- /dev/null
+++ b/test/test_input.rb
@@ -0,0 +1,52 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'server_helper'
+require 'digest/md5'
+
+class TestInput < Testcase
+  ENV["N"].to_i > 1 and parallelize_me!
+  include ServerHelper
+  alias setup server_helper_setup
+  alias teardown server_helper_teardown
+
+  MD5 = lambda do |e|
+    input = e["rack.input"]
+    buf = ""
+    md5 = Digest::MD5.new
+    while input.read(16384, buf)
+      md5 << buf
+    end
+    body = md5.hexdigest
+    h = { "Content-Length" => body.size.to_s, "Content-Type" => 'text/plain' }
+    [ 200, h, [body] ]
+  end
+
+  def test_input_timeout_lazybuffer
+    stream_input_timeout(:lazy)
+  end
+
+  def test_input_timeout_nobuffer
+    stream_input_timeout(false)
+  end
+
+  def stream_input_timeout(ibtype)
+    err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+    cfg.instance_eval do
+      GTL.synchronize do
+        app(:rack, MD5) do
+          listen "#{host}:#{port}"
+          input_buffering ibtype
+          client_timeout 1
+        end
+      end
+      stderr_path err.path
+    end
+    pid = mkserver(cfg)
+    c = get_tcp_client(host, port)
+    c.write "PUT / HTTP/1.1\r\nContent-Length: 666\r\n\r\n"
+    assert_equal c, c.wait(6)
+    Timeout.timeout(30) { assert_match %r{HTTP/1\.1 408 }, c.read }
+  ensure
+    quit_wait(pid)
+  end
+end