about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-11-11 01:13:12 +0800
committerEric Wong <normalperson@yhbt.net>2010-11-11 07:18:20 +0800
commit7d44b5384758aeddcb49d7606a9908308df7c698 (patch)
tree28b399240b597ffce659ba10767b2ec143ceac6a
parent1493af7cc23afecc8592ce44f5226476afccd212 (diff)
downloadunicorn-7d44b5384758aeddcb49d7606a9908308df7c698.tar.gz
We will eventually expose a Unicorn::StreamInput object as
"rack.input" for Rack 2.x applications.  StreamInput allows
applications to avoid buffering input to disk, removing the
(potentially expensive) rewindability requirement of Rack 1.x.

TeeInput is also rewritten to build off StreamInput for
simplicity.  The only regression is that TeeInput#rewind forces
us to consume an unconsumed stream before returning, a
negligible price to pay for decreased complexity.
-rw-r--r--lib/unicorn.rb1
-rw-r--r--lib/unicorn/stream_input.rb152
-rw-r--r--lib/unicorn/tee_input.rb147
-rw-r--r--test/unit/test_stream_input.rb143
-rw-r--r--test/unit/test_tee_input.rb30
5 files changed, 333 insertions, 140 deletions
diff --git a/lib/unicorn.rb b/lib/unicorn.rb
index 622dc6c..7891d67 100644
--- a/lib/unicorn.rb
+++ b/lib/unicorn.rb
@@ -73,6 +73,7 @@ class Unicorn::ClientShutdown < EOFError; end
 
 require 'unicorn/const'
 require 'unicorn/socket_helper'
+require 'unicorn/stream_input'
 require 'unicorn/tee_input'
 require 'unicorn/http_request'
 require 'unicorn/configurator'
diff --git a/lib/unicorn/stream_input.rb b/lib/unicorn/stream_input.rb
new file mode 100644
index 0000000..2a05337
--- /dev/null
+++ b/lib/unicorn/stream_input.rb
@@ -0,0 +1,152 @@
+# -*- encoding: binary -*-
+
+# When processing uploads, Unicorn may expose a StreamInput object under
+# "rack.input" of the Rack (2.x) environment.
+class Unicorn::StreamInput
+  # The I/O chunk size (in +bytes+) for I/O operations where
+  # the size cannot be user-specified when a method is called.
+  # The default is 16 kilobytes.
+  @@io_chunk_size = Unicorn::Const::CHUNK_SIZE
+
+  # Initializes a new StreamInput object.  You normally do not have to call
+  # this unless you are writing an HTTP server.
+  def initialize(socket, request)
+    @chunked = request.content_length.nil?
+    @socket = socket
+    @parser = request
+    @buf = request.buf
+    @rbuf = ''
+    @bytes_read = 0
+    filter_body(@rbuf, @buf) unless @buf.empty?
+  end
+
+  # :call-seq:
+  #   ios.read([length [, buffer ]]) => string, buffer, or nil
+  #
+  # Reads at most length bytes from the I/O stream, or to the end of
+  # file if length is omitted or is nil. length must be a non-negative
+  # integer or nil. If the optional buffer argument is present, it
+  # must reference a String, which will receive the data.
+  #
+  # At end of file, it returns nil or '' depend on length.
+  # ios.read() and ios.read(nil) returns ''.
+  # ios.read(length [, buffer]) returns nil.
+  #
+  # If the Content-Length of the HTTP request is known (as is the common
+  # case for POST requests), then ios.read(length [, buffer]) will block
+  # until the specified length is read (or it is the last chunk).
+  # Otherwise, for uncommon "Transfer-Encoding: chunked" requests,
+  # ios.read(length [, buffer]) will return immediately if there is
+  # any data and only block when nothing is available (providing
+  # IO#readpartial semantics).
+  def read(*args)
+    length = args.shift
+    rv = args.shift || ''
+    if length.nil?
+      rv.replace(@rbuf)
+      @rbuf.replace('')
+      @socket or return rv
+      until eof?
+        @socket.kgio_read(@@io_chunk_size, @buf) or eof!
+        filter_body(@rbuf, @buf)
+        rv << @rbuf
+      end
+      @rbuf.replace('')
+    else
+      if length <= @rbuf.size
+        rv.replace(@rbuf.slice(0, length))
+        @rbuf.replace(@rbuf.slice(length, @rbuf.size) || '')
+      else
+        rv.replace(@rbuf)
+        length -= @rbuf.size
+        @rbuf.replace('')
+        until length == 0 || eof? || (rv.size > 0 && @chunked)
+          @socket.kgio_read(length, @buf) or eof!
+          filter_body(@rbuf, @buf)
+          rv << @rbuf
+          length -= @rbuf.size
+          @rbuf.replace('')
+        end
+      end
+      rv = nil if rv.empty?
+    end
+    rv
+  end
+
+  # :call-seq:
+  #   ios.gets   => string or nil
+  #
+  # Reads the next ``line'' from the I/O stream; lines are separated
+  # by the global record separator ($/, typically "\n"). A global
+  # record separator of nil reads the entire unread contents of ios.
+  # Returns nil if called at the end of file.
+  # This takes zero arguments for strict Rack::Lint compatibility,
+  # unlike IO#gets.
+  def gets
+    sep = $/
+    if sep.nil?
+      rv = read
+      return rv.empty? ? nil : rv
+    end
+    re = /\A(.*?#{Regexp.escape(sep)})/
+
+    begin
+      @rbuf.gsub!(re, '') and return $1
+      if eof?
+        if @rbuf.empty?
+          return nil
+        else
+          rv = @rbuf.dup
+          @rbuf.replace('')
+          return rv
+        end
+      end
+      @socket.kgio_read(@@io_chunk_size, @buf) or eof!
+      filter_body(once = '', @buf)
+      @rbuf << once
+    end while true
+  end
+
+  # :call-seq:
+  #   ios.each { |line| block }  => ios
+  #
+  # Executes the block for every ``line'' in *ios*, where lines are
+  # separated by the global record separator ($/, typically "\n").
+  def each(&block)
+    while line = gets
+      yield line
+    end
+
+    self # Rack does not specify what the return value is here
+  end
+
+private
+
+  def eof?
+    if @parser.body_eof?
+      until @parser.parse
+        once = @socket.kgio_read(@@io_chunk_size) or eof!
+        @buf << once
+      end
+      @socket = nil
+      true
+    else
+      false
+    end
+  end
+
+  def filter_body(dst, src)
+    rv = @parser.filter_body(dst, src)
+    @bytes_read += dst.size
+    rv
+  end
+
+  def eof!
+    # in case client only did a premature shutdown(SHUT_WR)
+    # we do support clients that shutdown(SHUT_WR) after the
+    # _entire_ request has been sent, and those will not have
+    # raised EOFError on us.
+    @socket.close if @socket
+    raise Unicorn::ClientShutdown, "bytes_read=#{@bytes_read}", []
+  end
+end
diff --git a/lib/unicorn/tee_input.rb b/lib/unicorn/tee_input.rb
index 351d79f..74d9df6 100644
--- a/lib/unicorn/tee_input.rb
+++ b/lib/unicorn/tee_input.rb
@@ -11,34 +11,18 @@
 #
 # When processing uploads, Unicorn exposes a TeeInput object under
 # "rack.input" of the Rack environment.
-class Unicorn::TeeInput
-  attr_accessor :tmp, :socket, :parser, :env, :buf, :len, :buf2
-
+class Unicorn::TeeInput < Unicorn::StreamInput
   # The maximum size (in +bytes+) to buffer in memory before
   # resorting to a temporary file.  Default is 112 kilobytes.
   @@client_body_buffer_size = Unicorn::Const::MAX_BODY
 
-  # The I/O chunk size (in +bytes+) for I/O operations where
-  # the size cannot be user-specified when a method is called.
-  # The default is 16 kilobytes.
-  @@io_chunk_size = Unicorn::Const::CHUNK_SIZE
-
   # Initializes a new TeeInput object.  You normally do not have to call
   # this unless you are writing an HTTP server.
   def initialize(socket, request)
-    @socket = socket
-    @parser = request
-    @buf = request.buf
-    @env = request.env
     @len = request.content_length
+    super
     @tmp = @len && @len < @@client_body_buffer_size ?
            StringIO.new("") : Unicorn::TmpIO.new
-    @buf2 = ""
-    if @buf.size > 0
-      @parser.filter_body(@buf2, @buf) and finalize_input
-      @tmp.write(@buf2)
-      @tmp.rewind
-    end
   end
 
   # :call-seq:
@@ -59,15 +43,9 @@ class Unicorn::TeeInput
   # specified +length+ in a loop until it returns +nil+.
   def size
     @len and return @len
-
-    if socket
-      pos = @tmp.pos
-      while tee(@@io_chunk_size, @buf2)
-      end
-      @tmp.seek(pos)
-    end
-
-    @len = @tmp.size
+    consume!
+    @tmp.rewind
+    @len = @bytes_read
   end
 
   # :call-seq:
@@ -90,24 +68,7 @@ class Unicorn::TeeInput
   # any data and only block when nothing is available (providing
   # IO#readpartial semantics).
   def read(*args)
-    @socket or return @tmp.read(*args)
-
-    length = args.shift
-    if nil == length
-      rv = @tmp.read || ""
-      while tee(@@io_chunk_size, @buf2)
-        rv << @buf2
-      end
-      rv
-    else
-      rv = args.shift || ""
-      diff = @tmp.size - @tmp.pos
-      if 0 == diff
-        ensure_length(tee(length, rv), length)
-      else
-        ensure_length(@tmp.read(diff > length ? length : diff, rv), length)
-      end
-    end
+    @socket ? tee(super) : @tmp.read(*args)
   end
 
   # :call-seq:
@@ -121,42 +82,9 @@ class Unicorn::TeeInput
   # unlike IO#gets.
   def gets
     @socket or return @tmp.gets
-    sep = $/ or return read
-
-    orig_size = @tmp.size
-    if @tmp.pos == orig_size
-      tee(@@io_chunk_size, @buf2) or return nil
-      @tmp.seek(orig_size)
-    end
-
-    sep_size = Rack::Utils.bytesize(sep)
-    line = @tmp.gets # cannot be nil here since size > pos
-    sep == line[-sep_size, sep_size] and return line
-
-    # unlikely, if we got here, then @tmp is at EOF
-    begin
-      orig_size = @tmp.pos
-      tee(@@io_chunk_size, @buf2) or break
-      @tmp.seek(orig_size)
-      line << @tmp.gets
-      sep == line[-sep_size, sep_size] and return line
-      # @tmp is at EOF again here, retry the loop
-    end while true
-
-    line
-  end
-
-  # :call-seq:
-  #   ios.each { |line| block }  => ios
-  #
-  # Executes the block for every ``line'' in *ios*, where lines are
-  # separated by the global record separator ($/, typically "\n").
-  def each(&block)
-    while line = gets
-      yield line
-    end
-
-    self # Rack does not specify what the return value is here
+    rv = super
+    # the $/.nil? case is implemented using read, so don't tee() again
+    $/.nil? ? rv : tee(rv)
   end
 
   # :call-seq:
@@ -166,59 +94,24 @@ class Unicorn::TeeInput
   # the offset (zero) of the +ios+ pointer.  Subsequent reads will
   # start from the beginning of the previously-buffered input.
   def rewind
+    return 0 if @bytes_read == 0
+    consume! if @socket
     @tmp.rewind # Rack does not specify what the return value is here
   end
 
 private
 
-  # tees off a +length+ chunk of data from the input into the IO
-  # backing store as well as returning it.  +dst+ must be specified.
-  # returns nil if reading from the input returns nil
-  def tee(length, dst)
-    unless @parser.body_eof?
-      r = @socket.kgio_read(length, @buf) or eof!
-      unless @parser.filter_body(dst, @buf)
-        @tmp.write(dst)
-        @tmp.seek(0, IO::SEEK_END) # workaround FreeBSD/OSX + MRI 1.8.x bug
-        return dst
-      end
-    end
-    finalize_input
+  # consumes the stream of the socket
+  def consume!
+    junk = ""
+    nil while read(@@io_chunk_size, junk)
   end
 
-  def finalize_input
-    until @parser.parse
-      r = @socket.kgio_read(@@io_chunk_size) or eof!
-      @buf << r
+  def tee(buffer)
+    if buffer && (n = buffer.size) > 0
+      @tmp.write(buffer)
+      @tmp.seek(0, IO::SEEK_END) # workaround FreeBSD/OSX + MRI 1.8.x bug
     end
-    @socket = nil
-  end
-
-  # tee()s into +dst+ until it is of +length+ bytes (or until
-  # we've reached the Content-Length of the request body).
-  # Returns +dst+ (the exact object, not a duplicate)
-  # To continue supporting applications that need near-real-time
-  # streaming input bodies, this is a no-op for
-  # "Transfer-Encoding: chunked" requests.
-  def ensure_length(dst, length)
-    # len is nil for chunked bodies, so we can't ensure length for those
-    # since they could be streaming bidirectionally and we don't want to
-    # block the caller in that case.
-    return dst if dst.nil? || @len.nil?
-
-    while dst.size < length && tee(length - dst.size, @buf2)
-      dst << @buf2
-    end
-
-    dst
-  end
-
-  def eof!
-    # in case client only did a premature shutdown(SHUT_WR)
-    # we do support clients that shutdown(SHUT_WR) after the
-    # _entire_ request has been sent, and those will not have
-    # raised EOFError on us.
-    @socket.close if @socket
-    raise Unicorn::ClientShutdown, "bytes_read=#{@tmp.size}", []
+    buffer
   end
 end
diff --git a/test/unit/test_stream_input.rb b/test/unit/test_stream_input.rb
new file mode 100644
index 0000000..20e048d
--- /dev/null
+++ b/test/unit/test_stream_input.rb
@@ -0,0 +1,143 @@
+# -*- encoding: binary -*-
+
+require 'test/unit'
+require 'digest/sha1'
+require 'unicorn'
+
+class TestStreamInput < Test::Unit::TestCase
+  def setup
+    @rs = $/
+    @env = {}
+    @rd, @wr = Kgio::UNIXSocket.pair
+    @rd.sync = @wr.sync = true
+    @start_pid = $$
+  end
+
+  def teardown
+    return if $$ != @start_pid
+    $/ = @rs
+    @rd.close rescue nil
+    @wr.close rescue nil
+    Process.waitall
+  end
+
+  def test_read_small
+    r = init_request('hello')
+    si = Unicorn::StreamInput.new(@rd, r)
+    assert_equal 'hello', si.read
+    assert_equal '', si.read
+    assert_nil si.read(5)
+    assert_nil si.gets
+  end
+
+  def test_gets_oneliner
+    r = init_request('hello')
+    si = Unicorn::StreamInput.new(@rd, r)
+    assert_equal 'hello', si.gets
+    assert_nil si.gets
+  end
+
+  def test_gets_multiline
+    r = init_request("a\nb\n\n")
+    si = Unicorn::StreamInput.new(@rd, r)
+    assert_equal "a\n", si.gets
+    assert_equal "b\n", si.gets
+    assert_equal "\n", si.gets
+    assert_nil si.gets
+  end
+
+  def test_gets_empty_rs
+    $/ = nil
+    r = init_request("a\nb\n\n")
+    si = Unicorn::StreamInput.new(@rd, r)
+    assert_equal "a\nb\n\n", si.gets
+    assert_nil si.gets
+  end
+
+  def test_read_with_equal_len
+    r = init_request("abcde")
+    si = Unicorn::StreamInput.new(@rd, r)
+    assert_equal "abcde", si.read(5)
+    assert_nil si.read(5)
+  end
+
+  def test_big_body_multi
+    r = init_request('.', Unicorn::Const::MAX_BODY + 1)
+    si = Unicorn::StreamInput.new(@rd, r)
+    assert_equal Unicorn::Const::MAX_BODY, @parser.content_length
+    assert ! @parser.body_eof?
+    nr = Unicorn::Const::MAX_BODY / 4
+    pid = fork {
+      @rd.close
+      nr.times { @wr.write('....') }
+      @wr.close
+    }
+    @wr.close
+    assert_equal '.', si.read(1)
+    nr.times { |x|
+      assert_equal '....', si.read(4), "nr=#{x}"
+    }
+    assert_nil si.read(1)
+    status = nil
+    assert_nothing_raised { pid, status = Process.waitpid2(pid) }
+    assert status.success?
+  end
+
+  def test_gets_long
+    r = init_request("hello", 5 + (4096 * 4 * 3) + "#$/foo#$/".size)
+    si = Unicorn::StreamInput.new(@rd, r)
+    status = line = nil
+    pid = fork {
+      @rd.close
+      3.times { @wr.write("ffff" * 4096) }
+      @wr.write "#$/foo#$/"
+      @wr.close
+    }
+    @wr.close
+    assert_nothing_raised { line = si.gets }
+    assert_equal(4096 * 4 * 3 + 5 + $/.size, line.size)
+    assert_equal("hello" << ("ffff" * 4096 * 3) << "#$/", line)
+    assert_nothing_raised { line = si.gets }
+    assert_equal "foo#$/", line
+    assert_nil si.gets
+    assert_nothing_raised { pid, status = Process.waitpid2(pid) }
+    assert status.success?
+  end
+
+  def test_read_with_buffer
+    r = init_request('hello')
+    si = Unicorn::StreamInput.new(@rd, r)
+    buf = ''
+    rv = si.read(4, buf)
+    assert_equal 'hell', rv
+    assert_equal 'hell', buf
+    assert_equal rv.object_id, buf.object_id
+    assert_equal 'o', si.read
+    assert_equal nil, si.read(5, buf)
+  end
+
+  def test_read_with_buffer_clobbers
+    r = init_request('hello')
+    si = Unicorn::StreamInput.new(@rd, r)
+    buf = 'foo'
+    assert_equal 'hello', si.read(nil, buf)
+    assert_equal 'hello', buf
+    assert_equal '', si.read(nil, buf)
+    assert_equal '', buf
+    buf = 'asdf'
+    assert_nil si.read(5, buf)
+    assert_equal '', buf
+  end
+
+  def init_request(body, size = nil)
+    @parser = Unicorn::HttpParser.new
+    body = body.to_s.freeze
+    @buf = "POST / HTTP/1.1\r\n" \
+           "Host: localhost\r\n" \
+           "Content-Length: #{size || body.size}\r\n" \
+           "\r\n#{body}"
+    assert_equal @env, @parser.headers(@env, @buf)
+    assert_equal body, @buf
+    @parser
+  end
+end
diff --git a/test/unit/test_tee_input.rb b/test/unit/test_tee_input.rb
index a10ca34..b44f609 100644
--- a/test/unit/test_tee_input.rb
+++ b/test/unit/test_tee_input.rb
@@ -4,6 +4,10 @@ require 'test/unit'
 require 'digest/sha1'
 require 'unicorn'
 
+class TeeInput < Unicorn::TeeInput
+  attr_accessor :tmp, :len
+end
+
 class TestTeeInput < Test::Unit::TestCase
 
   def setup
@@ -28,7 +32,7 @@ class TestTeeInput < Test::Unit::TestCase
 
   def test_gets_long
     r = init_request("hello", 5 + (4096 * 4 * 3) + "#$/foo#$/".size)
-    ti = Unicorn::TeeInput.new(@rd, r)
+    ti = TeeInput.new(@rd, r)
     status = line = nil
     pid = fork {
       @rd.close
@@ -49,7 +53,7 @@ class TestTeeInput < Test::Unit::TestCase
 
   def test_gets_short
     r = init_request("hello", 5 + "#$/foo".size)
-    ti = Unicorn::TeeInput.new(@rd, r)
+    ti = TeeInput.new(@rd, r)
     status = line = nil
     pid = fork {
       @rd.close
@@ -68,7 +72,7 @@ class TestTeeInput < Test::Unit::TestCase
 
   def test_small_body
     r = init_request('hello')
-    ti = Unicorn::TeeInput.new(@rd, r)
+    ti = TeeInput.new(@rd, r)
     assert_equal 0, @parser.content_length
     assert @parser.body_eof?
     assert_equal StringIO, ti.tmp.class
@@ -77,11 +81,12 @@ class TestTeeInput < Test::Unit::TestCase
     assert_equal 'hello', ti.read
     assert_equal '', ti.read
     assert_nil ti.read(4096)
+    assert_equal 5, ti.size
   end
 
   def test_read_with_buffer
     r = init_request('hello')
-    ti = Unicorn::TeeInput.new(@rd, r)
+    ti = TeeInput.new(@rd, r)
     buf = ''
     rv = ti.read(4, buf)
     assert_equal 'hell', rv
@@ -96,7 +101,7 @@ class TestTeeInput < Test::Unit::TestCase
 
   def test_big_body
     r = init_request('.' * Unicorn::Const::MAX_BODY << 'a')
-    ti = Unicorn::TeeInput.new(@rd, r)
+    ti = TeeInput.new(@rd, r)
     assert_equal 0, @parser.content_length
     assert @parser.body_eof?
     assert_kind_of File, ti.tmp
@@ -108,7 +113,7 @@ class TestTeeInput < Test::Unit::TestCase
     a, b = 300, 3
     r = init_request('.' * b, 300)
     assert_equal 300, @parser.content_length
-    ti = Unicorn::TeeInput.new(@rd, r)
+    ti = TeeInput.new(@rd, r)
     pid = fork {
       @wr.write('.' * 197)
       sleep 1 # still a *potential* race here that would make the test moot...
@@ -122,12 +127,11 @@ class TestTeeInput < Test::Unit::TestCase
 
   def test_big_body_multi
     r = init_request('.', Unicorn::Const::MAX_BODY + 1)
-    ti = Unicorn::TeeInput.new(@rd, r)
+    ti = TeeInput.new(@rd, r)
     assert_equal Unicorn::Const::MAX_BODY, @parser.content_length
     assert ! @parser.body_eof?
     assert_kind_of File, ti.tmp
     assert_equal 0, ti.tmp.pos
-    assert_equal 1, ti.tmp.size
     assert_equal Unicorn::Const::MAX_BODY + 1, ti.size
     nr = Unicorn::Const::MAX_BODY / 4
     pid = fork {
@@ -138,8 +142,8 @@ class TestTeeInput < Test::Unit::TestCase
     @wr.close
     assert_equal '.', ti.read(1)
     assert_equal Unicorn::Const::MAX_BODY + 1, ti.size
-    nr.times {
-      assert_equal '....', ti.read(4)
+    nr.times { |x|
+      assert_equal '....', ti.read(4), "nr=#{x}"
       assert_equal Unicorn::Const::MAX_BODY + 1, ti.size
     }
     assert_nil ti.read(1)
@@ -163,7 +167,7 @@ class TestTeeInput < Test::Unit::TestCase
       @wr.write("0\r\n\r\n")
     }
     @wr.close
-    ti = Unicorn::TeeInput.new(@rd, @parser)
+    ti = TeeInput.new(@rd, @parser)
     assert_nil @parser.content_length
     assert_nil ti.len
     assert ! @parser.body_eof?
@@ -201,7 +205,7 @@ class TestTeeInput < Test::Unit::TestCase
       end
       @wr.write("0\r\n\r\n")
     }
-    ti = Unicorn::TeeInput.new(@rd, @parser)
+    ti = TeeInput.new(@rd, @parser)
     assert_nil @parser.content_length
     assert_nil ti.len
     assert ! @parser.body_eof?
@@ -230,7 +234,7 @@ class TestTeeInput < Test::Unit::TestCase
       @wr.write("Hello: World\r\n\r\n")
     }
     @wr.close
-    ti = Unicorn::TeeInput.new(@rd, @parser)
+    ti = TeeInput.new(@rd, @parser)
     assert_nil @parser.content_length
     assert_nil ti.len
     assert ! @parser.body_eof?