diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/unicorn.rb | 1 | ||||
-rw-r--r-- | lib/unicorn/stream_input.rb | 152 | ||||
-rw-r--r-- | lib/unicorn/tee_input.rb | 147 |
3 files changed, 173 insertions, 127 deletions
diff --git a/lib/unicorn.rb b/lib/unicorn.rb index 622dc6c..7891d67 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -73,6 +73,7 @@ class Unicorn::ClientShutdown < EOFError; end require 'unicorn/const' require 'unicorn/socket_helper' +require 'unicorn/stream_input' require 'unicorn/tee_input' require 'unicorn/http_request' require 'unicorn/configurator' diff --git a/lib/unicorn/stream_input.rb b/lib/unicorn/stream_input.rb new file mode 100644 index 0000000..2a05337 --- /dev/null +++ b/lib/unicorn/stream_input.rb @@ -0,0 +1,152 @@ +# -*- encoding: binary -*- + +# When processing uploads, Unicorn may expose a StreamInput object under +# "rack.input" of the Rack (2.x) environment. +class Unicorn::StreamInput + # The I/O chunk size (in +bytes+) for I/O operations where + # the size cannot be user-specified when a method is called. + # The default is 16 kilobytes. + @@io_chunk_size = Unicorn::Const::CHUNK_SIZE + + # Initializes a new StreamInput object. You normally do not have to call + # this unless you are writing an HTTP server. + def initialize(socket, request) + @chunked = request.content_length.nil? + @socket = socket + @parser = request + @buf = request.buf + @rbuf = '' + @bytes_read = 0 + filter_body(@rbuf, @buf) unless @buf.empty? + end + + # :call-seq: + # ios.read([length [, buffer ]]) => string, buffer, or nil + # + # Reads at most length bytes from the I/O stream, or to the end of + # file if length is omitted or is nil. length must be a non-negative + # integer or nil. If the optional buffer argument is present, it + # must reference a String, which will receive the data. + # + # At end of file, it returns nil or '' depend on length. + # ios.read() and ios.read(nil) returns ''. + # ios.read(length [, buffer]) returns nil. + # + # If the Content-Length of the HTTP request is known (as is the common + # case for POST requests), then ios.read(length [, buffer]) will block + # until the specified length is read (or it is the last chunk). + # Otherwise, for uncommon "Transfer-Encoding: chunked" requests, + # ios.read(length [, buffer]) will return immediately if there is + # any data and only block when nothing is available (providing + # IO#readpartial semantics). + def read(*args) + length = args.shift + rv = args.shift || '' + if length.nil? + rv.replace(@rbuf) + @rbuf.replace('') + @socket or return rv + until eof? + @socket.kgio_read(@@io_chunk_size, @buf) or eof! + filter_body(@rbuf, @buf) + rv << @rbuf + end + @rbuf.replace('') + else + if length <= @rbuf.size + rv.replace(@rbuf.slice(0, length)) + @rbuf.replace(@rbuf.slice(length, @rbuf.size) || '') + else + rv.replace(@rbuf) + length -= @rbuf.size + @rbuf.replace('') + until length == 0 || eof? || (rv.size > 0 && @chunked) + @socket.kgio_read(length, @buf) or eof! + filter_body(@rbuf, @buf) + rv << @rbuf + length -= @rbuf.size + @rbuf.replace('') + end + end + rv = nil if rv.empty? + end + rv + end + + # :call-seq: + # ios.gets => string or nil + # + # Reads the next ``line'' from the I/O stream; lines are separated + # by the global record separator ($/, typically "\n"). A global + # record separator of nil reads the entire unread contents of ios. + # Returns nil if called at the end of file. + # This takes zero arguments for strict Rack::Lint compatibility, + # unlike IO#gets. + def gets + sep = $/ + if sep.nil? + rv = read + return rv.empty? ? nil : rv + end + re = /\A(.*?#{Regexp.escape(sep)})/ + + begin + @rbuf.gsub!(re, '') and return $1 + if eof? + if @rbuf.empty? + return nil + else + rv = @rbuf.dup + @rbuf.replace('') + return rv + end + end + @socket.kgio_read(@@io_chunk_size, @buf) or eof! + filter_body(once = '', @buf) + @rbuf << once + end while true + end + + # :call-seq: + # ios.each { |line| block } => ios + # + # Executes the block for every ``line'' in *ios*, where lines are + # separated by the global record separator ($/, typically "\n"). + def each(&block) + while line = gets + yield line + end + + self # Rack does not specify what the return value is here + end + +private + + def eof? + if @parser.body_eof? + until @parser.parse + once = @socket.kgio_read(@@io_chunk_size) or eof! + @buf << once + end + @socket = nil + true + else + false + end + end + + def filter_body(dst, src) + rv = @parser.filter_body(dst, src) + @bytes_read += dst.size + rv + end + + def eof! + # in case client only did a premature shutdown(SHUT_WR) + # we do support clients that shutdown(SHUT_WR) after the + # _entire_ request has been sent, and those will not have + # raised EOFError on us. + @socket.close if @socket + raise Unicorn::ClientShutdown, "bytes_read=#{@bytes_read}", [] + end +end diff --git a/lib/unicorn/tee_input.rb b/lib/unicorn/tee_input.rb index 351d79f..74d9df6 100644 --- a/lib/unicorn/tee_input.rb +++ b/lib/unicorn/tee_input.rb @@ -11,34 +11,18 @@ # # When processing uploads, Unicorn exposes a TeeInput object under # "rack.input" of the Rack environment. -class Unicorn::TeeInput - attr_accessor :tmp, :socket, :parser, :env, :buf, :len, :buf2 - +class Unicorn::TeeInput < Unicorn::StreamInput # The maximum size (in +bytes+) to buffer in memory before # resorting to a temporary file. Default is 112 kilobytes. @@client_body_buffer_size = Unicorn::Const::MAX_BODY - # The I/O chunk size (in +bytes+) for I/O operations where - # the size cannot be user-specified when a method is called. - # The default is 16 kilobytes. - @@io_chunk_size = Unicorn::Const::CHUNK_SIZE - # Initializes a new TeeInput object. You normally do not have to call # this unless you are writing an HTTP server. def initialize(socket, request) - @socket = socket - @parser = request - @buf = request.buf - @env = request.env @len = request.content_length + super @tmp = @len && @len < @@client_body_buffer_size ? StringIO.new("") : Unicorn::TmpIO.new - @buf2 = "" - if @buf.size > 0 - @parser.filter_body(@buf2, @buf) and finalize_input - @tmp.write(@buf2) - @tmp.rewind - end end # :call-seq: @@ -59,15 +43,9 @@ class Unicorn::TeeInput # specified +length+ in a loop until it returns +nil+. def size @len and return @len - - if socket - pos = @tmp.pos - while tee(@@io_chunk_size, @buf2) - end - @tmp.seek(pos) - end - - @len = @tmp.size + consume! + @tmp.rewind + @len = @bytes_read end # :call-seq: @@ -90,24 +68,7 @@ class Unicorn::TeeInput # any data and only block when nothing is available (providing # IO#readpartial semantics). def read(*args) - @socket or return @tmp.read(*args) - - length = args.shift - if nil == length - rv = @tmp.read || "" - while tee(@@io_chunk_size, @buf2) - rv << @buf2 - end - rv - else - rv = args.shift || "" - diff = @tmp.size - @tmp.pos - if 0 == diff - ensure_length(tee(length, rv), length) - else - ensure_length(@tmp.read(diff > length ? length : diff, rv), length) - end - end + @socket ? tee(super) : @tmp.read(*args) end # :call-seq: @@ -121,42 +82,9 @@ class Unicorn::TeeInput # unlike IO#gets. def gets @socket or return @tmp.gets - sep = $/ or return read - - orig_size = @tmp.size - if @tmp.pos == orig_size - tee(@@io_chunk_size, @buf2) or return nil - @tmp.seek(orig_size) - end - - sep_size = Rack::Utils.bytesize(sep) - line = @tmp.gets # cannot be nil here since size > pos - sep == line[-sep_size, sep_size] and return line - - # unlikely, if we got here, then @tmp is at EOF - begin - orig_size = @tmp.pos - tee(@@io_chunk_size, @buf2) or break - @tmp.seek(orig_size) - line << @tmp.gets - sep == line[-sep_size, sep_size] and return line - # @tmp is at EOF again here, retry the loop - end while true - - line - end - - # :call-seq: - # ios.each { |line| block } => ios - # - # Executes the block for every ``line'' in *ios*, where lines are - # separated by the global record separator ($/, typically "\n"). - def each(&block) - while line = gets - yield line - end - - self # Rack does not specify what the return value is here + rv = super + # the $/.nil? case is implemented using read, so don't tee() again + $/.nil? ? rv : tee(rv) end # :call-seq: @@ -166,59 +94,24 @@ class Unicorn::TeeInput # the offset (zero) of the +ios+ pointer. Subsequent reads will # start from the beginning of the previously-buffered input. def rewind + return 0 if @bytes_read == 0 + consume! if @socket @tmp.rewind # Rack does not specify what the return value is here end private - # tees off a +length+ chunk of data from the input into the IO - # backing store as well as returning it. +dst+ must be specified. - # returns nil if reading from the input returns nil - def tee(length, dst) - unless @parser.body_eof? - r = @socket.kgio_read(length, @buf) or eof! - unless @parser.filter_body(dst, @buf) - @tmp.write(dst) - @tmp.seek(0, IO::SEEK_END) # workaround FreeBSD/OSX + MRI 1.8.x bug - return dst - end - end - finalize_input + # consumes the stream of the socket + def consume! + junk = "" + nil while read(@@io_chunk_size, junk) end - def finalize_input - until @parser.parse - r = @socket.kgio_read(@@io_chunk_size) or eof! - @buf << r + def tee(buffer) + if buffer && (n = buffer.size) > 0 + @tmp.write(buffer) + @tmp.seek(0, IO::SEEK_END) # workaround FreeBSD/OSX + MRI 1.8.x bug end - @socket = nil - end - - # tee()s into +dst+ until it is of +length+ bytes (or until - # we've reached the Content-Length of the request body). - # Returns +dst+ (the exact object, not a duplicate) - # To continue supporting applications that need near-real-time - # streaming input bodies, this is a no-op for - # "Transfer-Encoding: chunked" requests. - def ensure_length(dst, length) - # len is nil for chunked bodies, so we can't ensure length for those - # since they could be streaming bidirectionally and we don't want to - # block the caller in that case. - return dst if dst.nil? || @len.nil? - - while dst.size < length && tee(length - dst.size, @buf2) - dst << @buf2 - end - - dst - end - - def eof! - # in case client only did a premature shutdown(SHUT_WR) - # we do support clients that shutdown(SHUT_WR) after the - # _entire_ request has been sent, and those will not have - # raised EOFError on us. - @socket.close if @socket - raise Unicorn::ClientShutdown, "bytes_read=#{@tmp.size}", [] + buffer end end |