From 6945342a1f0a4caaa918f2b0b1efef88824439e0 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 5 Jun 2009 18:03:46 -0700 Subject: Transfer-Encoding: chunked streaming input support This adds support for handling POST/PUT request bodies sent with chunked transfer encodings ("Transfer-Encoding: chunked"). Attention has been paid to ensure that a client cannot OOM us by sending an extremely large chunk. This implementation is pure Ruby as the Ragel-based implementation in rfuzz didn't offer a streaming interface. It should be reasonably close to RFC-compliant [1] but please test it in an attempt to break it. The more interesting part is the ability to stream data to the hosted Rack application as it is being transferred to the server. This can be done regardless of whether the input is chunked or not; enabling the streaming of POST/PUT bodies can allow the hosted Rack application to process input as it receives it. See examples/echo.ru for an example echo server over HTTP. Enabling streaming also allows Rack applications to support upload progress monitoring previously supported by Mongrel handlers. Since Rack specifies that the input needs to be rewindable, this input is written to a temporary file (a la tee(1)) as it is streamed to the application the first time. Subsequent rewound reads will read from the temporary file instead of the socket. Streaming input to the application is disabled by default since applications may not necessarily read the entire input body before returning. Since this is a completely new feature we've never seen in any Ruby HTTP application server before, we're taking the safe route by leaving it disabled by default. Enabling this can only be done globally by changing the Unicorn HttpRequest::DEFAULTS hash: Unicorn::HttpRequest::DEFAULTS["unicorn.stream_input"] = true Similarly, a Rack application can check if streaming input is enabled by checking the value of the "unicorn.stream_input" key in the environment hash passed to it. 
All of this code has only been lightly tested and test coverage is lacking at the moment. [1] - http://tools.ietf.org/html/rfc2616#section-3.6.1 --- test/test_helper.rb | 26 +++++++ test/unit/test_chunked_reader.rb | 145 +++++++++++++++++++++++++++++++++++++++ test/unit/test_request.rb | 1 + test/unit/test_upload.rb | 109 ++--------------------------- 4 files changed, 179 insertions(+), 102 deletions(-) create mode 100644 test/unit/test_chunked_reader.rb (limited to 'test') diff --git a/test/test_helper.rb b/test/test_helper.rb index 787adbf..0f2f311 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -262,3 +262,29 @@ def wait_for_death(pid) end raise "PID:#{pid} never died!" end + +# executes +cmd+ and chunks its STDOUT +def chunked_spawn(stdout, *cmd) + fork { + crd, cwr = IO.pipe + crd.binmode + cwr.binmode + crd.sync = cwr.sync = true + + pid = fork { + STDOUT.reopen(cwr) + crd.close + cwr.close + exec(*cmd) + } + cwr.close + begin + buf = crd.readpartial(16384) + stdout.write("#{'%x' % buf.size}\r\n#{buf}") + rescue EOFError + stdout.write("0\r\n") + pid, status = Process.waitpid(pid) + exit status.exitstatus + end while true + } +end diff --git a/test/unit/test_chunked_reader.rb b/test/unit/test_chunked_reader.rb new file mode 100644 index 0000000..d9fc56f --- /dev/null +++ b/test/unit/test_chunked_reader.rb @@ -0,0 +1,145 @@ +require 'test/unit' +require 'unicorn' +require 'tempfile' +require 'io/nonblock' +require 'digest/sha1' + +class TestChunkedReader < Test::Unit::TestCase + + def setup + @cr = Unicorn::ChunkedReader.new + @rd, @wr = IO.pipe + @rd.binmode + @wr.binmode + @rd.sync = @wr.sync = true + @start_pid = $$ + end + + def teardown + return if $$ != @start_pid + @rd.close rescue nil + @wr.close rescue nil + end + + def test_eof1 + @cr.reopen(@rd, "0\r\n") + assert_raises(EOFError) { @cr.readpartial(8192) } + end + + def test_eof2 + @cr.reopen(@rd, "0\r\n\r\n") + assert_raises(EOFError) { @cr.readpartial(8192) } + end + + def 
test_readpartial1 + @cr.reopen(@rd, "4\r\nasdf\r\n0\r\n") + assert_equal 'asdf', @cr.readpartial(8192) + assert_raises(EOFError) { @cr.readpartial(8192) } + end + + def test_gets1 + @cr.reopen(@rd, "4\r\nasdf\r\n0\r\n") + STDOUT.sync = true + assert_equal 'asdf', @cr.gets + assert_raises(EOFError) { @cr.readpartial(8192) } + end + + def test_gets2 + @cr.reopen(@rd, "4\r\nasd\n\r\n0\r\n\r\n") + assert_equal "asd\n", @cr.gets + assert_nil @cr.gets + end + + def test_gets3 + max = Unicorn::Const::CHUNK_SIZE * 2 + str = ('a' * max).freeze + first = 5 + last = str.size - first + @cr.reopen(@rd, + "#{'%x' % first}\r\n#{str[0, first]}\r\n" \ + "#{'%x' % last}\r\n#{str[-last, last]}\r\n" \ + "0\r\n") + assert_equal str, @cr.gets + assert_nil @cr.gets + end + + def test_readpartial_gets_mixed1 + max = Unicorn::Const::CHUNK_SIZE * 2 + str = ('a' * max).freeze + first = 5 + last = str.size - first + @cr.reopen(@rd, + "#{'%x' % first}\r\n#{str[0, first]}\r\n" \ + "#{'%x' % last}\r\n#{str[-last, last]}\r\n" \ + "0\r\n") + partial = @cr.readpartial(16384) + assert String === partial + + len = max - partial.size + assert_equal(str[-len, len], @cr.gets) + assert_raises(EOFError) { @cr.readpartial(1) } + assert_nil @cr.gets + end + + def test_gets_mixed_readpartial + max = 10 + str = ("z\n" * max).freeze + first = 5 + last = str.size - first + @cr.reopen(@rd, + "#{'%x' % first}\r\n#{str[0, first]}\r\n" \ + "#{'%x' % last}\r\n#{str[-last, last]}\r\n" \ + "0\r\n") + assert_equal("z\n", @cr.gets) + assert_equal("z\n", @cr.gets) + end + + def test_dd + @cr.reopen(@rd, "6\r\nhello\n\r\n") + tmp = Tempfile.new('test_dd') + tmp.sync = true + + pid = fork { + crd, cwr = IO.pipe + crd.binmode + cwr.binmode + crd.sync = cwr.sync = true + + pid = fork { + STDOUT.reopen(cwr) + crd.close + cwr.close + exec('dd', 'if=/dev/urandom', 'bs=93390', 'count=16') + } + cwr.close + begin + buf = crd.readpartial(16384) + tmp.write(buf) + @wr.write("#{'%x' % buf.size}\r\n#{buf}\r\n") + rescue EOFError + 
@wr.write("0\r\n\r\n") + Process.waitpid(pid) + exit 0 + end while true + } + assert_equal "hello\n", @cr.gets + sha1 = Digest::SHA1.new + buf = '' + begin + @cr.readpartial(16384, buf) + sha1.update(buf) + rescue EOFError + break + end while true + + assert_nothing_raised { Process.waitpid(pid) } + sha1_file = Digest::SHA1.new + File.open(tmp.path, 'rb') { |fp| + while fp.read(16384, buf) + sha1_file.update(buf) + end + } + assert_equal sha1_file.hexdigest, sha1.hexdigest + end + +end diff --git a/test/unit/test_request.rb b/test/unit/test_request.rb index 0bfff7d..598a0f5 100644 --- a/test/unit/test_request.rb +++ b/test/unit/test_request.rb @@ -16,6 +16,7 @@ class RequestTest < Test::Unit::TestCase class MockRequest < StringIO alias_method :readpartial, :sysread + alias_method :read_nonblock, :sysread end def setup diff --git a/test/unit/test_upload.rb b/test/unit/test_upload.rb index 9ef3ed7..adc036d 100644 --- a/test/unit/test_upload.rb +++ b/test/unit/test_upload.rb @@ -18,24 +18,20 @@ class UploadTest < Test::Unit::TestCase @sha1 = Digest::SHA1.new @sha1_app = lambda do |env| input = env['rack.input'] - resp = { :pos => input.pos, :size => input.size, :class => input.class } + resp = { :size => input.size } - # sysread @sha1.reset - begin - loop { @sha1.update(input.sysread(@bs)) } - rescue EOFError + while buf = input.read(@bs) + @sha1.update(buf) end resp[:sha1] = @sha1.hexdigest - # read - input.sysseek(0) if input.respond_to?(:sysseek) + # rewind and read again input.rewind @sha1.reset - loop { - buf = input.read(@bs) or break + while buf = input.read(@bs) @sha1.update(buf) - } + end if resp[:sha1] == @sha1.hexdigest resp[:sysread_read_byte_match] = true @@ -54,7 +50,7 @@ class UploadTest < Test::Unit::TestCase start_server(@sha1_app) sock = TCPSocket.new(@addr, @port) sock.syswrite("PUT / HTTP/1.0\r\nContent-Length: #{length}\r\n\r\n") - @count.times do + @count.times do |i| buf = @random.sysread(@bs) @sha1.update(buf) sock.syswrite(buf) @@ -63,7 +59,6 
@@ class UploadTest < Test::Unit::TestCase assert_equal "HTTP/1.1 200 OK", read[0] resp = eval(read.grep(/^X-Resp: /).first.sub!(/X-Resp: /, '')) assert_equal length, resp[:size] - assert_equal 0, resp[:pos] assert_equal @sha1.hexdigest, resp[:sha1] end @@ -85,42 +80,7 @@ class UploadTest < Test::Unit::TestCase assert_equal "HTTP/1.1 200 OK", read[0] resp = eval(read.grep(/^X-Resp: /).first.sub!(/X-Resp: /, '')) assert_equal length, resp[:size] - assert_equal 0, resp[:pos] assert_equal @sha1.hexdigest, resp[:sha1] - assert_equal StringIO, resp[:class] - end - - def test_tempfile_unlinked - spew_path = lambda do |env| - if orig = env['HTTP_X_OLD_PATH'] - assert orig != env['rack.input'].path - end - assert_equal length, env['rack.input'].size - [ 200, @hdr.merge('X-Tempfile-Path' => env['rack.input'].path), [] ] - end - start_server(spew_path) - sock = TCPSocket.new(@addr, @port) - sock.syswrite("PUT / HTTP/1.0\r\nContent-Length: #{length}\r\n\r\n") - @count.times { sock.syswrite(' ' * @bs) } - path = sock.read[/^X-Tempfile-Path: (\S+)/, 1] - sock.close - - # send another request to ensure we hit the next request - sock = TCPSocket.new(@addr, @port) - sock.syswrite("PUT / HTTP/1.0\r\nX-Old-Path: #{path}\r\n" \ - "Content-Length: #{length}\r\n\r\n") - @count.times { sock.syswrite(' ' * @bs) } - path2 = sock.read[/^X-Tempfile-Path: (\S+)/, 1] - sock.close - assert path != path2 - - # make sure the next request comes in so the unlink got processed - sock = TCPSocket.new(@addr, @port) - sock.syswrite("GET ?lasdf\r\n\r\n\r\n\r\n") - sock.sysread(4096) rescue nil - sock.close - - assert ! 
File.exist?(path) end def test_put_keepalive_truncates_small_overwrite @@ -140,7 +100,6 @@ class UploadTest < Test::Unit::TestCase assert_equal "HTTP/1.1 200 OK", read[0] resp = eval(read.grep(/^X-Resp: /).first.sub!(/X-Resp: /, '')) assert_equal to_upload, resp[:size] - assert_equal 0, resp[:pos] assert_equal @sha1.hexdigest, resp[:sha1] end @@ -155,58 +114,6 @@ class UploadTest < Test::Unit::TestCase end end - def test_put_handler_closed_file - nr = '0' - start_server(lambda { |env| - env['rack.input'].close - resp = { :nr => nr.succ! } - [ 200, @hdr.merge({ 'X-Resp' => resp.inspect}), [] ] - }) - sock = TCPSocket.new(@addr, @port) - buf = ' ' * @bs - sock.syswrite("PUT / HTTP/1.0\r\nContent-Length: #{length}\r\n\r\n") - @count.times { sock.syswrite(buf) } - read = sock.read.split(/\r\n/) - assert_equal "HTTP/1.1 200 OK", read[0] - resp = eval(read.grep(/^X-Resp: /).first.sub!(/X-Resp: /, '')) - assert_equal '1', resp[:nr] - - # server still alive? - sock = TCPSocket.new(@addr, @port) - sock.syswrite("GET / HTTP/1.0\r\n\r\n") - read = sock.read.split(/\r\n/) - assert_equal "HTTP/1.1 200 OK", read[0] - resp = eval(read.grep(/^X-Resp: /).first.sub!(/X-Resp: /, '')) - assert_equal '2', resp[:nr] - end - - def test_renamed_file_not_closed - start_server(lambda { |env| - new_tmp = Tempfile.new('unicorn_test') - input = env['rack.input'] - File.rename(input.path, new_tmp.path) - resp = { - :inode => input.stat.ino, - :size => input.stat.size, - :new_tmp => new_tmp.path, - :old_tmp => input.path, - } - [ 200, @hdr.merge({ 'X-Resp' => resp.inspect}), [] ] - }) - sock = TCPSocket.new(@addr, @port) - buf = ' ' * @bs - sock.syswrite("PUT / HTTP/1.0\r\nContent-Length: #{length}\r\n\r\n") - @count.times { sock.syswrite(buf) } - read = sock.read.split(/\r\n/) - assert_equal "HTTP/1.1 200 OK", read[0] - resp = eval(read.grep(/^X-Resp: /).first.sub!(/X-Resp: /, '')) - new_tmp = File.open(resp[:new_tmp]) - assert_equal resp[:inode], new_tmp.stat.ino - assert_equal length, 
resp[:size] - assert ! File.exist?(resp[:old_tmp]) - assert_equal resp[:size], new_tmp.stat.size - end - # Despite reading numerous articles and inspecting the 1.9.1-p0 C # source, Eric Wong will never trust that we're always handling # encoding-aware IO objects correctly. Thus this test uses shell @@ -233,7 +140,6 @@ class UploadTest < Test::Unit::TestCase resp = `curl -isSfN -T#{tmp.path} http://#@addr:#@port/` assert $?.success?, 'curl ran OK' assert_match(%r!\b#{sha1}\b!, resp) - assert_match(/Tempfile/, resp) assert_match(/sysread_read_byte_match/, resp) # small StringIO path @@ -249,7 +155,6 @@ class UploadTest < Test::Unit::TestCase resp = `curl -isSfN -T#{tmp.path} http://#@addr:#@port/` assert $?.success?, 'curl ran OK' assert_match(%r!\b#{sha1}\b!, resp) - assert_match(/StringIO/, resp) assert_match(/sysread_read_byte_match/, resp) end -- cgit v1.2.3-24-ge0c7