rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob 84c1e7c10e62b016c58a5ed3e05c1e164c1974ff 2578 bytes (raw)
$ git show HEAD:lib/rainbows/writer_thread_spawn/client.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
 
# -*- encoding: binary -*-
# :enddoc:
# used to wrap a BasicSocket to use with +q+ for all writes
# this is compatible with IO.select
#
# Struct members: +to_io+ is the wrapped socket, +q+ is the queue of pending
# write operations (created on first use by #queue_writer) and +thr+ is the
# thread that drains +q+.
class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr)
  include Rainbows::SocketProxy
  include Rainbows::ProcessClient
  include Rainbows::WorkerYield

  # writer thread => the queue feeding it, for each thread started by
  # #queue_writer that has not yet been removed
  CUR = {} # :nodoc:

  # Response-writing entry points.  Each of these pushes an operation onto
  # +q+; the writer thread later invokes the named method on the socket.
  module Methods
    # Enqueues a +write_body_each+ call for +body+.
    def write_body_each(body)
      q << [:write_body_each, body]
    end

    # Enqueues a +write_response+ call for a body that responds to +close+.
    # The body is wrapped in Rainbows::SyncClose and the wrapper, not the
    # original body, is what gets queued.
    # NOTE(review): SyncClose presumably waits for the writer thread to
    # close the body before returning -- confirm against its definition.
    def write_response_close(status, headers, body, alive)
      # the writer thread calls write_response on the socket itself, so the
      # socket is handed this client's @hp
      to_io.instance_variable_set(:@hp, @hp) # XXX ugh
      Rainbows::SyncClose.new(body) do |sync_body|
        q << [:write_response, status, headers, sync_body, alive]
      end
    end

    if Rainbows::Response::COPY_STREAM || IO.method_defined?(:trysendfile)
      # Ensures a writer queue exists, then picks how to write the response:
      # closeable bodies go through #write_response_close, bodies exposing
      # +to_path+ through +write_response_path+, anything else through the
      # inherited implementation.
      def write_response(status, headers, body, alive)
        self.q ||= queue_writer
        return write_response_close(status, headers, body, alive) if body.respond_to?(:close)
        return write_response_path(status, headers, body, alive) if body.respond_to?(:to_path)

        super
      end

      # Enqueues a +write_body_file+ call for +body+ and +range+.
      def write_body_file(body, range)
        q << [:write_body_file, body, range]
      end

      # Enqueues a +write_body_stream+ call for +body+.
      def write_body_stream(body)
        q << [:write_body_stream, body]
      end
    else # each-only body response
      # Ensures a writer queue exists, then sends closeable bodies through
      # #write_response_close and everything else through the inherited
      # implementation.
      def write_response(status, headers, body, alive)
        self.q ||= queue_writer
        return write_response_close(status, headers, body, alive) if body.respond_to?(:close)

        super
      end
    end # each-only body response
  end # module Methods
  include Methods

  # Shuts down every registered writer thread.  Each pass pushes +nil+ onto
  # every remaining queue (a +nil+ ends that writer's loop), calls
  # Rainbows.tick, and drops entries whose thread is dead or finishes within
  # a 0.01s join.  Passes repeat until CUR is empty.
  def self.quit
    until CUR.empty?
      CUR.delete_if do |writer, queue|
        queue << nil
        Rainbows.tick
        !writer.alive? || writer.join(0.01)
      end
    end
  end

  # Starts the writer thread for this client and returns its queue.
  #
  # While CUR holds MAX or more entries, finished threads are reaped first
  # and, if that does not free a slot, +worker_yield+ is called before
  # checking again.
  def queue_writer
    while CUR.size >= MAX
      # non-blocking reap: drop threads that are dead or already joinable
      CUR.delete_if { |writer, _| !writer.alive? || writer.join(0) }
      worker_yield if CUR.size >= MAX
    end

    queue = Queue.new
    writer = Thread.new(to_io) do |io|
      # a nil (or false) message terminates the loop
      while (msg = queue.shift)
        begin
          # messages are a String, the symbol :close, or [method, *args]
          cmd, *args = msg
          case cmd
          when String
            io.kgio_write(cmd)
          when :close
            io.close unless io.closed?
            break
          else
            io.__send__(cmd, *args)
          end
        rescue => e
          # report the failure and keep draining the queue
          Rainbows::Error.write(io, e)
        end
      end
      CUR.delete(Thread.current)
    end
    self.thr = writer
    CUR[writer] = queue
  end

  # Enqueues +buf+ for the writer thread, starting that thread first if this
  # client does not have a queue yet.
  def write(buf)
    self.q ||= queue_writer
    q << buf
  end

  # Closes the socket: directly when no writer queue exists, otherwise by
  # queueing :close so the writer thread does it after earlier operations.
  def close
    return to_io.close unless q

    q << :close
  end

  # True when the wrapped socket reports itself closed.
  def closed?
    to_io.closed?
  end
end

git clone https://yhbt.net/rainbows.git