From 7d44b5384758aeddcb49d7606a9908308df7c698 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 11 Nov 2010 01:13:12 +0800 Subject: add stream_input class and build tee_input on it We will eventually expose a Unicorn::StreamInput object as "rack.input" for Rack 2.x applications. StreamInput allows applications to avoid buffering input to disk, removing the (potentially expensive) rewindability requirement of Rack 1.x. TeeInput is also rewritten to build off StreamInput for simplicity. The only regression is that TeeInput#rewind forces us to consume an unconsumed stream before returning, a negligible price to pay for decreased complexity. --- lib/unicorn.rb | 1 + lib/unicorn/stream_input.rb | 152 +++++++++++++++++++++++++++++++++++++++++ lib/unicorn/tee_input.rb | 147 ++++++--------------------------------- test/unit/test_stream_input.rb | 143 ++++++++++++++++++++++++++++++++++++++ test/unit/test_tee_input.rb | 30 ++++---- 5 files changed, 333 insertions(+), 140 deletions(-) create mode 100644 lib/unicorn/stream_input.rb create mode 100644 test/unit/test_stream_input.rb diff --git a/lib/unicorn.rb b/lib/unicorn.rb index 622dc6c..7891d67 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -73,6 +73,7 @@ class Unicorn::ClientShutdown < EOFError; end require 'unicorn/const' require 'unicorn/socket_helper' +require 'unicorn/stream_input' require 'unicorn/tee_input' require 'unicorn/http_request' require 'unicorn/configurator' diff --git a/lib/unicorn/stream_input.rb b/lib/unicorn/stream_input.rb new file mode 100644 index 0000000..2a05337 --- /dev/null +++ b/lib/unicorn/stream_input.rb @@ -0,0 +1,152 @@ +# -*- encoding: binary -*- + +# When processing uploads, Unicorn may expose a StreamInput object under +# "rack.input" of the Rack (2.x) environment. +class Unicorn::StreamInput + # The I/O chunk size (in +bytes+) for I/O operations where + # the size cannot be user-specified when a method is called. + # The default is 16 kilobytes. + @@io_chunk_size = Unicorn::Const::CHUNK_SIZE + + # Initializes a new StreamInput object. You normally do not have to call + # this unless you are writing an HTTP server. + def initialize(socket, request) + @chunked = request.content_length.nil? + @socket = socket + @parser = request + @buf = request.buf + @rbuf = '' + @bytes_read = 0 + filter_body(@rbuf, @buf) unless @buf.empty? + end + + # :call-seq: + # ios.read([length [, buffer ]]) => string, buffer, or nil + # + # Reads at most length bytes from the I/O stream, or to the end of + # file if length is omitted or is nil. length must be a non-negative + # integer or nil. If the optional buffer argument is present, it + # must reference a String, which will receive the data. + # + # At end of file, it returns nil or '' depend on length. + # ios.read() and ios.read(nil) returns ''. + # ios.read(length [, buffer]) returns nil. + # + # If the Content-Length of the HTTP request is known (as is the common + # case for POST requests), then ios.read(length [, buffer]) will block + # until the specified length is read (or it is the last chunk). + # Otherwise, for uncommon "Transfer-Encoding: chunked" requests, + # ios.read(length [, buffer]) will return immediately if there is + # any data and only block when nothing is available (providing + # IO#readpartial semantics). + def read(*args) + length = args.shift + rv = args.shift || '' + if length.nil? + rv.replace(@rbuf) + @rbuf.replace('') + @socket or return rv + until eof? + @socket.kgio_read(@@io_chunk_size, @buf) or eof! + filter_body(@rbuf, @buf) + rv << @rbuf + end + @rbuf.replace('') + else + if length <= @rbuf.size + rv.replace(@rbuf.slice(0, length)) + @rbuf.replace(@rbuf.slice(length, @rbuf.size) || '') + else + rv.replace(@rbuf) + length -= @rbuf.size + @rbuf.replace('') + until length == 0 || eof? || (rv.size > 0 && @chunked) + @socket.kgio_read(length, @buf) or eof! + filter_body(@rbuf, @buf) + rv << @rbuf + length -= @rbuf.size + @rbuf.replace('') + end + end + rv = nil if rv.empty? + end + rv + end + + # :call-seq: + # ios.gets => string or nil + # + # Reads the next ``line'' from the I/O stream; lines are separated + # by the global record separator ($/, typically "\n"). A global + # record separator of nil reads the entire unread contents of ios. + # Returns nil if called at the end of file. + # This takes zero arguments for strict Rack::Lint compatibility, + # unlike IO#gets. + def gets + sep = $/ + if sep.nil? + rv = read + return rv.empty? ? nil : rv + end + re = /\A(.*?#{Regexp.escape(sep)})/ + + begin + @rbuf.gsub!(re, '') and return $1 + if eof? + if @rbuf.empty? + return nil + else + rv = @rbuf.dup + @rbuf.replace('') + return rv + end + end + @socket.kgio_read(@@io_chunk_size, @buf) or eof! + filter_body(once = '', @buf) + @rbuf << once + end while true + end + + # :call-seq: + # ios.each { |line| block } => ios + # + # Executes the block for every ``line'' in *ios*, where lines are + # separated by the global record separator ($/, typically "\n"). + def each(&block) + while line = gets + yield line + end + + self # Rack does not specify what the return value is here + end + +private + + def eof? + if @parser.body_eof? + until @parser.parse + once = @socket.kgio_read(@@io_chunk_size) or eof! + @buf << once + end + @socket = nil + true + else + false + end + end + + def filter_body(dst, src) + rv = @parser.filter_body(dst, src) + @bytes_read += dst.size + rv + end + + def eof! + # in case client only did a premature shutdown(SHUT_WR) + # we do support clients that shutdown(SHUT_WR) after the + # _entire_ request has been sent, and those will not have + # raised EOFError on us. + @socket.close if @socket + raise Unicorn::ClientShutdown, "bytes_read=#{@bytes_read}", [] + end +end diff --git a/lib/unicorn/tee_input.rb b/lib/unicorn/tee_input.rb index 351d79f..74d9df6 100644 --- a/lib/unicorn/tee_input.rb +++ b/lib/unicorn/tee_input.rb @@ -11,34 +11,18 @@ # # When processing uploads, Unicorn exposes a TeeInput object under # "rack.input" of the Rack environment. -class Unicorn::TeeInput - attr_accessor :tmp, :socket, :parser, :env, :buf, :len, :buf2 - +class Unicorn::TeeInput < Unicorn::StreamInput # The maximum size (in +bytes+) to buffer in memory before # resorting to a temporary file. Default is 112 kilobytes. @@client_body_buffer_size = Unicorn::Const::MAX_BODY - # The I/O chunk size (in +bytes+) for I/O operations where - # the size cannot be user-specified when a method is called. - # The default is 16 kilobytes. - @@io_chunk_size = Unicorn::Const::CHUNK_SIZE - # Initializes a new TeeInput object. You normally do not have to call # this unless you are writing an HTTP server. def initialize(socket, request) - @socket = socket - @parser = request - @buf = request.buf - @env = request.env @len = request.content_length + super @tmp = @len && @len < @@client_body_buffer_size ? StringIO.new("") : Unicorn::TmpIO.new - @buf2 = "" - if @buf.size > 0 - @parser.filter_body(@buf2, @buf) and finalize_input - @tmp.write(@buf2) - @tmp.rewind - end end # :call-seq: @@ -59,15 +43,9 @@ class Unicorn::TeeInput # specified +length+ in a loop until it returns +nil+. def size @len and return @len - - if socket - pos = @tmp.pos - while tee(@@io_chunk_size, @buf2) - end - @tmp.seek(pos) - end - - @len = @tmp.size + consume! + @tmp.rewind + @len = @bytes_read end # :call-seq: @@ -90,24 +68,7 @@ class Unicorn::TeeInput # any data and only block when nothing is available (providing # IO#readpartial semantics). def read(*args) - @socket or return @tmp.read(*args) - - length = args.shift - if nil == length - rv = @tmp.read || "" - while tee(@@io_chunk_size, @buf2) - rv << @buf2 - end - rv - else - rv = args.shift || "" - diff = @tmp.size - @tmp.pos - if 0 == diff - ensure_length(tee(length, rv), length) - else - ensure_length(@tmp.read(diff > length ? length : diff, rv), length) - end - end + @socket ? tee(super) : @tmp.read(*args) end # :call-seq: @@ -121,42 +82,9 @@ class Unicorn::TeeInput # unlike IO#gets. def gets @socket or return @tmp.gets - sep = $/ or return read - - orig_size = @tmp.size - if @tmp.pos == orig_size - tee(@@io_chunk_size, @buf2) or return nil - @tmp.seek(orig_size) - end - - sep_size = Rack::Utils.bytesize(sep) - line = @tmp.gets # cannot be nil here since size > pos - sep == line[-sep_size, sep_size] and return line - - # unlikely, if we got here, then @tmp is at EOF - begin - orig_size = @tmp.pos - tee(@@io_chunk_size, @buf2) or break - @tmp.seek(orig_size) - line << @tmp.gets - sep == line[-sep_size, sep_size] and return line - # @tmp is at EOF again here, retry the loop - end while true - - line - end - - # :call-seq: - # ios.each { |line| block } => ios - # - # Executes the block for every ``line'' in *ios*, where lines are - # separated by the global record separator ($/, typically "\n"). - def each(&block) - while line = gets - yield line - end - - self # Rack does not specify what the return value is here + rv = super + # the $/.nil? case is implemented using read, so don't tee() again + $/.nil? ? rv : tee(rv) end # :call-seq: @@ -166,59 +94,24 @@ class Unicorn::TeeInput # the offset (zero) of the +ios+ pointer. Subsequent reads will # start from the beginning of the previously-buffered input. def rewind + return 0 if @bytes_read == 0 + consume! if @socket @tmp.rewind # Rack does not specify what the return value is here end private - # tees off a +length+ chunk of data from the input into the IO - # backing store as well as returning it. +dst+ must be specified. - # returns nil if reading from the input returns nil - def tee(length, dst) - unless @parser.body_eof? - r = @socket.kgio_read(length, @buf) or eof! - unless @parser.filter_body(dst, @buf) - @tmp.write(dst) - @tmp.seek(0, IO::SEEK_END) # workaround FreeBSD/OSX + MRI 1.8.x bug - return dst - end - end - finalize_input + # consumes the stream of the socket + def consume! + junk = "" + nil while read(@@io_chunk_size, junk) end - def finalize_input - until @parser.parse - r = @socket.kgio_read(@@io_chunk_size) or eof! - @buf << r + def tee(buffer) + if buffer && (n = buffer.size) > 0 + @tmp.write(buffer) + @tmp.seek(0, IO::SEEK_END) # workaround FreeBSD/OSX + MRI 1.8.x bug end - @socket = nil - end - - # tee()s into +dst+ until it is of +length+ bytes (or until - # we've reached the Content-Length of the request body). - # Returns +dst+ (the exact object, not a duplicate) - # To continue supporting applications that need near-real-time - # streaming input bodies, this is a no-op for - # "Transfer-Encoding: chunked" requests. - def ensure_length(dst, length) - # len is nil for chunked bodies, so we can't ensure length for those - # since they could be streaming bidirectionally and we don't want to - # block the caller in that case. - return dst if dst.nil? || @len.nil? - - while dst.size < length && tee(length - dst.size, @buf2) - dst << @buf2 - end - - dst - end - - def eof! - # in case client only did a premature shutdown(SHUT_WR) - # we do support clients that shutdown(SHUT_WR) after the - # _entire_ request has been sent, and those will not have - # raised EOFError on us. - @socket.close if @socket - raise Unicorn::ClientShutdown, "bytes_read=#{@tmp.size}", [] + buffer end end diff --git a/test/unit/test_stream_input.rb b/test/unit/test_stream_input.rb new file mode 100644 index 0000000..20e048d --- /dev/null +++ b/test/unit/test_stream_input.rb @@ -0,0 +1,143 @@ +# -*- encoding: binary -*- + +require 'test/unit' +require 'digest/sha1' +require 'unicorn' + +class TestStreamInput < Test::Unit::TestCase + def setup + @rs = $/ + @env = {} + @rd, @wr = Kgio::UNIXSocket.pair + @rd.sync = @wr.sync = true + @start_pid = $$ + end + + def teardown + return if $$ != @start_pid + $/ = @rs + @rd.close rescue nil + @wr.close rescue nil + Process.waitall + end + + def test_read_small + r = init_request('hello') + si = Unicorn::StreamInput.new(@rd, r) + assert_equal 'hello', si.read + assert_equal '', si.read + assert_nil si.read(5) + assert_nil si.gets + end + + def test_gets_oneliner + r = init_request('hello') + si = Unicorn::StreamInput.new(@rd, r) + assert_equal 'hello', si.gets + assert_nil si.gets + end + + def test_gets_multiline + r = init_request("a\nb\n\n") + si = Unicorn::StreamInput.new(@rd, r) + assert_equal "a\n", si.gets + assert_equal "b\n", si.gets + assert_equal "\n", si.gets + assert_nil si.gets + end + + def test_gets_empty_rs + $/ = nil + r = init_request("a\nb\n\n") + si = Unicorn::StreamInput.new(@rd, r) + assert_equal "a\nb\n\n", si.gets + assert_nil si.gets + end + + def test_read_with_equal_len + r = init_request("abcde") + si = Unicorn::StreamInput.new(@rd, r) + assert_equal "abcde", si.read(5) + assert_nil si.read(5) + end + + def test_big_body_multi + r = init_request('.', Unicorn::Const::MAX_BODY + 1) + si = Unicorn::StreamInput.new(@rd, r) + assert_equal Unicorn::Const::MAX_BODY, @parser.content_length + assert ! @parser.body_eof? + nr = Unicorn::Const::MAX_BODY / 4 + pid = fork { + @rd.close + nr.times { @wr.write('....') } + @wr.close + } + @wr.close + assert_equal '.', si.read(1) + nr.times { |x| + assert_equal '....', si.read(4), "nr=#{x}" + } + assert_nil si.read(1) + status = nil + assert_nothing_raised { pid, status = Process.waitpid2(pid) } + assert status.success? + end + + def test_gets_long + r = init_request("hello", 5 + (4096 * 4 * 3) + "#$/foo#$/".size) + si = Unicorn::StreamInput.new(@rd, r) + status = line = nil + pid = fork { + @rd.close + 3.times { @wr.write("ffff" * 4096) } + @wr.write "#$/foo#$/" + @wr.close + } + @wr.close + assert_nothing_raised { line = si.gets } + assert_equal(4096 * 4 * 3 + 5 + $/.size, line.size) + assert_equal("hello" << ("ffff" * 4096 * 3) << "#$/", line) + assert_nothing_raised { line = si.gets } + assert_equal "foo#$/", line + assert_nil si.gets + assert_nothing_raised { pid, status = Process.waitpid2(pid) } + assert status.success? + end + + def test_read_with_buffer + r = init_request('hello') + si = Unicorn::StreamInput.new(@rd, r) + buf = '' + rv = si.read(4, buf) + assert_equal 'hell', rv + assert_equal 'hell', buf + assert_equal rv.object_id, buf.object_id + assert_equal 'o', si.read + assert_equal nil, si.read(5, buf) + end + + def test_read_with_buffer_clobbers + r = init_request('hello') + si = Unicorn::StreamInput.new(@rd, r) + buf = 'foo' + assert_equal 'hello', si.read(nil, buf) + assert_equal 'hello', buf + assert_equal '', si.read(nil, buf) + assert_equal '', buf + buf = 'asdf' + assert_nil si.read(5, buf) + assert_equal '', buf + end + + def init_request(body, size = nil) + @parser = Unicorn::HttpParser.new + body = body.to_s.freeze + @buf = "POST / HTTP/1.1\r\n" \ + "Host: localhost\r\n" \ + "Content-Length: #{size || body.size}\r\n" \ + "\r\n#{body}" + assert_equal @env, @parser.headers(@env, @buf) + assert_equal body, @buf + @parser + end +end diff --git a/test/unit/test_tee_input.rb b/test/unit/test_tee_input.rb index a10ca34..b44f609 100644 --- a/test/unit/test_tee_input.rb +++ b/test/unit/test_tee_input.rb @@ -4,6 +4,10 @@ require 'test/unit' require 'digest/sha1' require 'unicorn' +class TeeInput < Unicorn::TeeInput + attr_accessor :tmp, :len +end + class TestTeeInput < Test::Unit::TestCase def setup @@ -28,7 +32,7 @@ class TestTeeInput < Test::Unit::TestCase def test_gets_long r = init_request("hello", 5 + (4096 * 4 * 3) + "#$/foo#$/".size) - ti = Unicorn::TeeInput.new(@rd, r) + ti = TeeInput.new(@rd, r) status = line = nil pid = fork { @rd.close @@ -49,7 +53,7 @@ class TestTeeInput < Test::Unit::TestCase def test_gets_short r = init_request("hello", 5 + "#$/foo".size) - ti = Unicorn::TeeInput.new(@rd, r) + ti = TeeInput.new(@rd, r) status = line = nil pid = fork { @rd.close @@ -68,7 +72,7 @@ class TestTeeInput < Test::Unit::TestCase def test_small_body r = init_request('hello') - ti = Unicorn::TeeInput.new(@rd, r) + ti = TeeInput.new(@rd, r) assert_equal 0, @parser.content_length assert @parser.body_eof? assert_equal StringIO, ti.tmp.class @@ -77,11 +81,12 @@ class TestTeeInput < Test::Unit::TestCase assert_equal 'hello', ti.read assert_equal '', ti.read assert_nil ti.read(4096) + assert_equal 5, ti.size end def test_read_with_buffer r = init_request('hello') - ti = Unicorn::TeeInput.new(@rd, r) + ti = TeeInput.new(@rd, r) buf = '' rv = ti.read(4, buf) assert_equal 'hell', rv @@ -96,7 +101,7 @@ class TestTeeInput < Test::Unit::TestCase def test_big_body r = init_request('.' * Unicorn::Const::MAX_BODY << 'a') - ti = Unicorn::TeeInput.new(@rd, r) + ti = TeeInput.new(@rd, r) assert_equal 0, @parser.content_length assert @parser.body_eof? assert_kind_of File, ti.tmp @@ -108,7 +113,7 @@ class TestTeeInput < Test::Unit::TestCase a, b = 300, 3 r = init_request('.' * b, 300) assert_equal 300, @parser.content_length - ti = Unicorn::TeeInput.new(@rd, r) + ti = TeeInput.new(@rd, r) pid = fork { @wr.write('.' * 197) sleep 1 # still a *potential* race here that would make the test moot... @@ -122,12 +127,11 @@ class TestTeeInput < Test::Unit::TestCase def test_big_body_multi r = init_request('.', Unicorn::Const::MAX_BODY + 1) - ti = Unicorn::TeeInput.new(@rd, r) + ti = TeeInput.new(@rd, r) assert_equal Unicorn::Const::MAX_BODY, @parser.content_length assert ! @parser.body_eof? assert_kind_of File, ti.tmp assert_equal 0, ti.tmp.pos - assert_equal 1, ti.tmp.size assert_equal Unicorn::Const::MAX_BODY + 1, ti.size nr = Unicorn::Const::MAX_BODY / 4 pid = fork { @@ -138,8 +142,8 @@ class TestTeeInput < Test::Unit::TestCase @wr.close assert_equal '.', ti.read(1) assert_equal Unicorn::Const::MAX_BODY + 1, ti.size - nr.times { - assert_equal '....', ti.read(4) + nr.times { |x| + assert_equal '....', ti.read(4), "nr=#{x}" assert_equal Unicorn::Const::MAX_BODY + 1, ti.size } assert_nil ti.read(1) @@ -163,7 +167,7 @@ class TestTeeInput < Test::Unit::TestCase @wr.write("0\r\n\r\n") } @wr.close - ti = Unicorn::TeeInput.new(@rd, @parser) + ti = TeeInput.new(@rd, @parser) assert_nil @parser.content_length assert_nil ti.len assert ! @parser.body_eof? @@ -201,7 +205,7 @@ class TestTeeInput < Test::Unit::TestCase end @wr.write("0\r\n\r\n") } - ti = Unicorn::TeeInput.new(@rd, @parser) + ti = TeeInput.new(@rd, @parser) assert_nil @parser.content_length assert_nil ti.len assert ! @parser.body_eof? @@ -230,7 +234,7 @@ class TestTeeInput < Test::Unit::TestCase @wr.write("Hello: World\r\n\r\n") } @wr.close - ti = Unicorn::TeeInput.new(@rd, @parser) + ti = TeeInput.new(@rd, @parser) assert_nil @parser.content_length assert_nil ti.len assert ! @parser.body_eof? -- cgit v1.2.3-24-ge0c7