rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob bc85fbd167d80bdd866afeb03a978de117163e1b 5074 bytes (raw)
$ git show v2.0.0:lib/rainbows/rev/client.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
 
# -*- encoding: binary -*-
# :enddoc:
require 'rainbows/ev_core'
module Rainbows
  module Rev

    class Client < ::Rev::IO
      include Rainbows::EvCore
      G = Rainbows::G
      F = Rainbows::StreamFile

      def initialize(io)
        CONN[self] = false
        super(io)
        post_init
        @deferred = nil
      end

      def quit
        super
        close if @deferred.nil? && @_write_buffer.empty?
      end

      # override the ::Rev::IO#write method try to write directly to the
      # kernel socket buffers to avoid an extra userspace copy if
      # possible.
      def write(buf)
        if @_write_buffer.empty?
          begin
            case rv = @_io.kgio_trywrite(buf)
            when nil
              return enable_write_watcher
            when :wait_writable
              break # fall through to super(buf)
            when String
              buf = rv # retry, skb could grow or been drained
            end
          rescue => e
            return handle_error(e)
          end while true
        end
        super(buf)
      end

      def on_readable
        buf = @_io.kgio_tryread(16384)
        case buf
        when :wait_readable
        when nil # eof
          close
        else
          on_read buf
        end
      rescue Errno::ECONNRESET
        close
      end

      # queued, optional response bodies, it should only be unpollable "fast"
      # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
      # are also part of this.  We'll also stick DeferredResponse bodies in
      # here to prevent connections from being closed on us.
      def defer_body(io)
        @deferred = io
        enable_write_watcher
      end

      # allows enabling of write watcher even when read watcher is disabled
      def evloop
        Rainbows::Rev::Server::LOOP
      end

      def next!
        @deferred = nil
        enable_write_watcher
      end

      def timeout?
        @deferred.nil? && @_write_buffer.empty? and close.nil?
      end

      # used for streaming sockets and pipes
      def stream_response(status, headers, io, body)
        c = stream_response_headers(status, headers) if headers
        # we only want to attach to the Rev::Loop belonging to the
        # main thread in Ruby 1.9
        io = (c ? DeferredChunkResponse : DeferredResponse).new(io, self, body)
        defer_body(io.attach(Server::LOOP))
      end

      def rev_write_response(response, alive)
        status, headers, body = response
        headers = @hp.headers? ? HH.new(headers) : nil

        headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers
        if body.respond_to?(:to_path)
          io = body_to_io(body)
          st = io.stat

          if st.file?
            offset, count = 0, st.size
            if headers
              if range = make_range!(@env, status, headers)
                status, offset, count = range
              end
              write(response_header(status, headers))
            end
            return defer_body(F.new(offset, count, io, body))
          elsif st.socket? || st.pipe?
            return stream_response(status, headers, io, body)
          end
          # char or block device... WTF? fall through to body.each
        end
        write(response_header(status, headers)) if headers
        write_body_each(self, body, nil)
      end

      def app_call
        KATO.delete(self)
        @env[RACK_INPUT] = @input
        @env[REMOTE_ADDR] = @_io.kgio_addr
        response = APP.call(@env.update(RACK_DEFAULTS))

        rev_write_response(response, alive = @hp.keepalive? && G.alive)
        return quit unless alive && :close != @state
        @hp.reset
        @state = :headers
        disable if enabled?
      end

      def on_write_complete
        case @deferred
        when DeferredResponse then return
        when NilClass # fall through
        else
          begin
            return rev_sendfile(@deferred)
          rescue EOFError # expected at file EOF
            close_deferred
          end
        end

        case @state
        when :close
          close if @_write_buffer.empty?
        when :headers
          if @hp.parse
            app_call
          else
            unless enabled?
              enable
              KATO[self] = Time.now
            end
          end
        end
        rescue => e
          handle_error(e)
      end

      def handle_error(e)
        close_deferred
        if msg = Error.response(e)
          @_io.kgio_trywrite(msg) rescue nil
        end
        @_write_buffer.clear
        ensure
          quit
      end

      def close_deferred
        case @deferred
        when DeferredResponse, NilClass
        else
          begin
            @deferred.close
          rescue => e
            G.server.logger.error("closing #@deferred: #{e}")
          end
          @deferred = nil
        end
      end

      def on_close
        close_deferred
        CONN.delete(self)
        KATO.delete(self)
      end

    end # module Client
  end # module Rev
end # module Rainbows

git clone https://yhbt.net/rainbows.git