about summary refs log tree commit homepage
path: root/lib/unicorn/tee_input.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-11-11 01:13:12 +0800
committerEric Wong <normalperson@yhbt.net>2010-11-11 07:18:20 +0800
commit7d44b5384758aeddcb49d7606a9908308df7c698 (patch)
tree28b399240b597ffce659ba10767b2ec143ceac6a /lib/unicorn/tee_input.rb
parent1493af7cc23afecc8592ce44f5226476afccd212 (diff)
downloadunicorn-7d44b5384758aeddcb49d7606a9908308df7c698.tar.gz
We will eventually expose a Unicorn::StreamInput object as
"rack.input" for Rack 2.x applications.  StreamInput allows
applications to avoid buffering input to disk, removing the
(potentially expensive) rewindability requirement of Rack 1.x.

TeeInput is also rewritten to build off StreamInput for
simplicity.  The only regression is that TeeInput#rewind forces
us to consume an unconsumed stream before returning, a
negligible price to pay for decreased complexity.
Diffstat (limited to 'lib/unicorn/tee_input.rb')
-rw-r--r--lib/unicorn/tee_input.rb147
1 files changed, 20 insertions, 127 deletions
diff --git a/lib/unicorn/tee_input.rb b/lib/unicorn/tee_input.rb
index 351d79f..74d9df6 100644
--- a/lib/unicorn/tee_input.rb
+++ b/lib/unicorn/tee_input.rb
@@ -11,34 +11,18 @@
 #
 # When processing uploads, Unicorn exposes a TeeInput object under
 # "rack.input" of the Rack environment.
-class Unicorn::TeeInput
-  attr_accessor :tmp, :socket, :parser, :env, :buf, :len, :buf2
-
+class Unicorn::TeeInput < Unicorn::StreamInput
   # The maximum size (in +bytes+) to buffer in memory before
   # resorting to a temporary file.  Default is 112 kilobytes.
   @@client_body_buffer_size = Unicorn::Const::MAX_BODY
 
-  # The I/O chunk size (in +bytes+) for I/O operations where
-  # the size cannot be user-specified when a method is called.
-  # The default is 16 kilobytes.
-  @@io_chunk_size = Unicorn::Const::CHUNK_SIZE
-
   # Initializes a new TeeInput object.  You normally do not have to call
   # this unless you are writing an HTTP server.
   def initialize(socket, request)
-    @socket = socket
-    @parser = request
-    @buf = request.buf
-    @env = request.env
     @len = request.content_length
+    super
     @tmp = @len && @len < @@client_body_buffer_size ?
            StringIO.new("") : Unicorn::TmpIO.new
-    @buf2 = ""
-    if @buf.size > 0
-      @parser.filter_body(@buf2, @buf) and finalize_input
-      @tmp.write(@buf2)
-      @tmp.rewind
-    end
   end
 
   # :call-seq:
@@ -59,15 +43,9 @@ class Unicorn::TeeInput
   # specified +length+ in a loop until it returns +nil+.
   def size
     @len and return @len
-
-    if socket
-      pos = @tmp.pos
-      while tee(@@io_chunk_size, @buf2)
-      end
-      @tmp.seek(pos)
-    end
-
-    @len = @tmp.size
+    consume!
+    @tmp.rewind
+    @len = @bytes_read
   end
 
   # :call-seq:
@@ -90,24 +68,7 @@ class Unicorn::TeeInput
   # any data and only block when nothing is available (providing
   # IO#readpartial semantics).
   def read(*args)
-    @socket or return @tmp.read(*args)
-
-    length = args.shift
-    if nil == length
-      rv = @tmp.read || ""
-      while tee(@@io_chunk_size, @buf2)
-        rv << @buf2
-      end
-      rv
-    else
-      rv = args.shift || ""
-      diff = @tmp.size - @tmp.pos
-      if 0 == diff
-        ensure_length(tee(length, rv), length)
-      else
-        ensure_length(@tmp.read(diff > length ? length : diff, rv), length)
-      end
-    end
+    @socket ? tee(super) : @tmp.read(*args)
   end
 
   # :call-seq:
@@ -121,42 +82,9 @@ class Unicorn::TeeInput
   # unlike IO#gets.
   def gets
     @socket or return @tmp.gets
-    sep = $/ or return read
-
-    orig_size = @tmp.size
-    if @tmp.pos == orig_size
-      tee(@@io_chunk_size, @buf2) or return nil
-      @tmp.seek(orig_size)
-    end
-
-    sep_size = Rack::Utils.bytesize(sep)
-    line = @tmp.gets # cannot be nil here since size > pos
-    sep == line[-sep_size, sep_size] and return line
-
-    # unlikely, if we got here, then @tmp is at EOF
-    begin
-      orig_size = @tmp.pos
-      tee(@@io_chunk_size, @buf2) or break
-      @tmp.seek(orig_size)
-      line << @tmp.gets
-      sep == line[-sep_size, sep_size] and return line
-      # @tmp is at EOF again here, retry the loop
-    end while true
-
-    line
-  end
-
-  # :call-seq:
-  #   ios.each { |line| block }  => ios
-  #
-  # Executes the block for every ``line'' in *ios*, where lines are
-  # separated by the global record separator ($/, typically "\n").
-  def each(&block)
-    while line = gets
-      yield line
-    end
-
-    self # Rack does not specify what the return value is here
+    rv = super
+    # the $/.nil? case is implemented using read, so don't tee() again
+    $/.nil? ? rv : tee(rv)
   end
 
   # :call-seq:
@@ -166,59 +94,24 @@ class Unicorn::TeeInput
   # the offset (zero) of the +ios+ pointer.  Subsequent reads will
   # start from the beginning of the previously-buffered input.
   def rewind
+    return 0 if @bytes_read == 0
+    consume! if @socket
     @tmp.rewind # Rack does not specify what the return value is here
   end
 
 private
 
-  # tees off a +length+ chunk of data from the input into the IO
-  # backing store as well as returning it.  +dst+ must be specified.
-  # returns nil if reading from the input returns nil
-  def tee(length, dst)
-    unless @parser.body_eof?
-      r = @socket.kgio_read(length, @buf) or eof!
-      unless @parser.filter_body(dst, @buf)
-        @tmp.write(dst)
-        @tmp.seek(0, IO::SEEK_END) # workaround FreeBSD/OSX + MRI 1.8.x bug
-        return dst
-      end
-    end
-    finalize_input
+  # consumes the stream of the socket
+  def consume!
+    junk = ""
+    nil while read(@@io_chunk_size, junk)
   end
 
-  def finalize_input
-    until @parser.parse
-      r = @socket.kgio_read(@@io_chunk_size) or eof!
-      @buf << r
+  def tee(buffer)
+    if buffer && (n = buffer.size) > 0
+      @tmp.write(buffer)
+      @tmp.seek(0, IO::SEEK_END) # workaround FreeBSD/OSX + MRI 1.8.x bug
     end
-    @socket = nil
-  end
-
-  # tee()s into +dst+ until it is of +length+ bytes (or until
-  # we've reached the Content-Length of the request body).
-  # Returns +dst+ (the exact object, not a duplicate)
-  # To continue supporting applications that need near-real-time
-  # streaming input bodies, this is a no-op for
-  # "Transfer-Encoding: chunked" requests.
-  def ensure_length(dst, length)
-    # len is nil for chunked bodies, so we can't ensure length for those
-    # since they could be streaming bidirectionally and we don't want to
-    # block the caller in that case.
-    return dst if dst.nil? || @len.nil?
-
-    while dst.size < length && tee(length - dst.size, @buf2)
-      dst << @buf2
-    end
-
-    dst
-  end
-
-  def eof!
-    # in case client only did a premature shutdown(SHUT_WR)
-    # we do support clients that shutdown(SHUT_WR) after the
-    # _entire_ request has been sent, and those will not have
-    # raised EOFError on us.
-    @socket.close if @socket
-    raise Unicorn::ClientShutdown, "bytes_read=#{@tmp.size}", []
+    buffer
   end
 end