yahns.git  about / heads / tags
sleepy, multi-threaded, non-blocking application server for Ruby
blob 496faa2322a09546f525df03cb5490e198a5cca5 4573 bytes (raw)
$ git show maint:lib/yahns/stream_input.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
 
# -*- encoding: binary -*-
# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
# License: GPLv2 or later (https://www.gnu.org/licenses/gpl-2.0.txt)
# frozen_string_literal: true

# When processing uploads, Yahns may expose a StreamInput object under
# "rack.input" of the (future) Rack (2.x) environment.
class Yahns::StreamInput # :nodoc:
  # Initializes a new StreamInput object.  You normally do not have to call
  # this unless you are writing an HTTP server.
  def initialize(client, request)
    @chunked = request.content_length.nil?
    @client = client
    @parser = request
    @buf = request.buf
    @rbuf = ''.dup
    @bytes_read = 0
    filter_body(@rbuf, @buf) unless @buf.empty?
  end

  # :call-seq:
  #   ios.read([length [, buffer ]]) => string, buffer, or nil
  #
  # Reads at most length bytes from the I/O stream, or to the end of
  # file if length is omitted or is nil. length must be a non-negative
  # integer or nil. If the optional buffer argument is present, it
  # must reference a String, which will receive the data.
  #
  # At end of file, it returns nil or '' depend on length.
  # ios.read() and ios.read(nil) returns ''.
  # ios.read(length [, buffer]) returns nil.
  #
  # If the Content-Length of the HTTP request is known (as is the common
  # case for POST requests), then ios.read(length [, buffer]) will block
  # until the specified length is read (or it is the last chunk).
  # Otherwise, for uncommon "Transfer-Encoding: chunked" requests,
  # ios.read(length [, buffer]) will return immediately if there is
  # any data and only block when nothing is available (providing
  # IO#readpartial semantics).
  def read(length = nil, rv = ''.dup)
    if length
      if length <= @rbuf.size
        length < 0 and raise ArgumentError, "negative length #{length} given"
        rv.replace(@rbuf.slice!(0, length))
      else
        to_read = length - @rbuf.size
        rv.replace(@rbuf.slice!(0, @rbuf.size))
        until to_read == 0 || eof? || (rv.size > 0 && @chunked)
          @client.yahns_read(to_read, @buf) or eof!
          filter_body(@rbuf, @buf)
          rv << @rbuf
          to_read -= @rbuf.size
        end
        @rbuf.clear
      end
      rv = nil if rv.empty? && length != 0
    else
      read_all(rv)
    end
    rv
  end

  def __rsize
    @client ? @client.class.client_body_buffer_size : nil
  end

  def __tlsbuf
    Thread.current[:yahns_rbuf]
  end

  # :call-seq:
  #   ios.gets   => string or nil
  #
  # Reads the next ``line'' from the I/O stream; lines are separated
  # by the global record separator ($/, typically "\n"). A global
  # record separator of nil reads the entire unread contents of ios.
  # Returns nil if called at the end of file.
  # This takes zero arguments for strict Rack::Lint compatibility,
  # unlike IO#gets.
  def gets
    sep = $/
    if sep.nil?
      read_all(rv = ''.dup)
      return rv.empty? ? nil : rv
    end
    re = /\A(.*?#{Regexp.escape(sep)})/
    rsize = __rsize or return
    tlsbuf = __tlsbuf
    begin
      @rbuf.sub!(re, '') and return $1
      return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size) if eof?
      @client.yahns_read(rsize, @buf) or eof!
      filter_body(tlsbuf, @buf)
      @rbuf << tlsbuf
    end while true
  end

  # :call-seq:
  #   ios.each { |line| block }  => ios
  #
  # Executes the block for every ``line'' in *ios*, where lines are
  # separated by the global record separator ($/, typically "\n").
  def each
    while line = gets
      yield line
    end

    self # Rack does not specify what the return value is here
  end

  def eof?
    if @parser.body_eof?
      rsize = __rsize
      tlsbuf = __tlsbuf
      while @chunked && ! @parser.parse
        @client.yahns_read(rsize, tlsbuf) or eof!
        @buf << tlsbuf
      end
      @client = nil
      true
    else
      false
    end
  end

  def filter_body(dst, src)
    rv = @parser.filter_body(dst, src)
    @bytes_read += dst.size
    rv
  end

  def read_all(dst)
    dst.replace(@rbuf)
    rsize = __rsize or return
    until eof?
      @client.yahns_read(rsize, @buf) or eof!
      filter_body(@rbuf, @buf)
      dst << @rbuf
    end
  ensure
    @rbuf.clear
  end

  def eof!
    # in case client only did a premature shutdown(SHUT_WR)
    # we do support clients that shutdown(SHUT_WR) after the
    # _entire_ request has been sent, and those will not have
    # raised EOFError on us.
    @client.shutdown if @client
  ensure
    raise Yahns::ClientShutdown, "bytes_read=#{@bytes_read}", []
  end

  def close # return nil
  end
end

git clone git://yhbt.net/yahns.git
git clone https://yhbt.net/yahns.git