rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob 8bfeb31f01db6919ee0dedda6a69a21414c668a4 4975 bytes (raw)
$ git show v0.97.0:lib/rainbows/rev/client.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
 
# -*- encoding: binary -*-
# :enddoc:
require 'rainbows/ev_core'
module Rainbows
  module Rev

    class Client < ::Rev::IO
      include Rainbows::ByteSlice
      include Rainbows::EvCore
      G = Rainbows::G
      F = Rainbows::StreamFile

      def initialize(io)
        CONN[self] = false
        super(io)
        post_init
        @deferred = nil
      end

      def quit
        super
        close if @deferred.nil? && @_write_buffer.empty?
      end

      # override the ::Rev::IO#write method try to write directly to the
      # kernel socket buffers to avoid an extra userspace copy if
      # possible.
      def write(buf)
        if @_write_buffer.empty?
          begin
            w = @_io.write_nonblock(buf)
            return enable_write_watcher if w == Rack::Utils.bytesize(buf)
            # we never care for the return value, but yes, we may return
            # a "fake" short write from super(buf) if anybody cares.
            buf = byte_slice(buf, w..-1)
          rescue Errno::EAGAIN
            break # fall through to super(buf)
          rescue => e
            return handle_error(e)
          end while true
        end
        super(buf)
      end

      # queued, optional response bodies, it should only be unpollable "fast"
      # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
      # are also part of this.  We'll also stick DeferredResponse bodies in
      # here to prevent connections from being closed on us.
      def defer_body(io)
        @deferred = io
        enable_write_watcher
      end

      # allows enabling of write watcher even when read watcher is disabled
      def evloop
        Rainbows::Rev::Server::LOOP
      end

      def next!
        @deferred = nil
        enable_write_watcher
      end

      def timeout?
        @deferred.nil? && @_write_buffer.empty? and close.nil?
      end

      # used for streaming sockets and pipes
      def stream_response(status, headers, io, body)
        c = stream_response_headers(status, headers) if headers
        # we only want to attach to the Rev::Loop belonging to the
        # main thread in Ruby 1.9
        io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body)
        defer_body(io.attach(Server::LOOP))
      end

      def rev_write_response(response, alive)
        status, headers, body = response
        headers = @hp.headers? ? HH.new(headers) : nil

        headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
        if body.respond_to?(:to_path)
          io = body_to_io(body)
          st = io.stat

          if st.file?
            offset, count = 0, st.size
            if headers
              if range = make_range!(@env, status, headers)
                status, offset, count = range
              end
              write(response_header(status, headers))
            end
            return defer_body(F.new(offset, count, io, body))
          elsif st.socket? || st.pipe?
            return stream_response(status, headers, io, body)
          end
          # char or block device... WTF? fall through to body.each
        end
        write(response_header(status, headers)) if headers
        write_body_each(self, body, nil)
      end

      def app_call
        KATO.delete(self)
        @env[RACK_INPUT] = @input
        @env[REMOTE_ADDR] = @remote_addr
        response = APP.call(@env.update(RACK_DEFAULTS))

        rev_write_response(response, alive = @hp.keepalive? && G.alive)
        return quit unless alive && :close != @state
        @env.clear
        @hp.reset
        @state = :headers
        disable if enabled?
      end

      def on_write_complete
        case @deferred
        when DeferredResponse then return
        when NilClass # fall through
        else
          begin
            return rev_sendfile(@deferred)
          rescue EOFError # expected at file EOF
            close_deferred
          end
        end

        case @state
        when :close
          close if @_write_buffer.empty?
        when :headers
          if @hp.headers(@env, @buf)
            app_call
          else
            unless enabled?
              enable
              KATO[self] = Time.now
            end
          end
        end
        rescue => e
          handle_error(e)
      end

      def handle_error(e)
        close_deferred
        if msg = Error.response(e)
          @_io.write_nonblock(msg) rescue nil
        end
        @_write_buffer.clear
        ensure
          quit
      end

      def close_deferred
        case @deferred
        when DeferredResponse, NilClass
        else
          begin
            @deferred.close
          rescue => e
            G.server.logger.error("closing #@deferred: #{e}")
          end
          @deferred = nil
        end
      end

      def on_close
        close_deferred
        CONN.delete(self)
        KATO.delete(self)
      end

    end # module Client
  end # module Rev
end # module Rainbows

git clone https://yhbt.net/rainbows.git