rainbows.git  about / heads / tags
Unicorn for sleepy apps and slow clients
blob 896fdacab19b0941520b36bcac35555a1a79c66a 4522 bytes (raw)
$ git show HEAD:lib/rainbows/event_machine.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
 
# -*- encoding: binary -*-
require 'eventmachine'
# Compare the version numerically, segment by segment.  A plain String
# comparison is lexicographic, so too-old releases such as '0.12.9' or
# '0.9.0' would sort after '0.12.10' and slip through the check.
em_version = EM::VERSION.scan(/\d+/).map { |segment| segment.to_i }
(em_version <=> [0, 12, 10]) >= 0 or abort 'eventmachine 0.12.10 is required'

# Implements a basic single-threaded event model with
# {EventMachine}[http://rubyeventmachine.com/].  It is capable of
# handling thousands of simultaneous client connections, but with only
# a single-threaded app dispatch.  It is suited for slow clients,
# and can work with slow applications via asynchronous libraries such as
# {async_sinatra}[http://github.com/raggi/async_sinatra],
# {Cramp}[http://cramp.in/],
# and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool].
#
# It does not require your Rack application to be thread-safe;
# reentrancy is only required for the DevFdResponse body
# generator.
#
# Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both
# support, currently Ruby 1.8/1.9.
#
# This model is compatible with users of "async.callback" in the Rack
# environment such as
# {async_sinatra}[http://github.com/raggi/async_sinatra].
#
# For a complete asynchronous framework,
# {Cramp}[http://cramp.in/] is fully
# supported when using this concurrency model.
#
# This model is fully-compatible with
# {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]
# which allows each request to run inside its own \Fiber after
# all request processing is complete.
#
# Merb (and other frameworks/apps) that implement +deferred?+ are also
# supported, as documented at Rainbows::EventMachine::TryDefer.
#
# This model does not implement a streaming "rack.input", which would allow
# the Rack application to process data as it arrives.  This means
# "rack.input" will be fully buffered in memory or to a temporary file
# before the application is entered.
#
# === RubyGem Requirements
#
# * eventmachine 0.12.10
module Rainbows::EventMachine
  autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
  autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
  autoload :TryDefer, 'rainbows/event_machine/try_defer'
  autoload :Client, 'rainbows/event_machine/client'

  include Rainbows::Base

  # Returns the class used to handle client connections.
  #
  # The :em_client_class option lets Cramp (and possibly others) supply
  # their own subclass of Rainbows::EventMachine::Client.  The option may be
  # a Proc (called to obtain the class) or a Symbol/String (evaluated as a
  # constant path).  Without the option, the Client class of the
  # concurrency model named by @use is returned.
  #
  # The lookup is done lazily here because Rainbows::EventMachine::Client
  # must /not/ be loaded in the master process, which needs reloadability.
  def em_client_class
    configured = Rainbows::O[:em_client_class]
    case configured
    when Proc then configured.call # e.g.: proc { Cramp::WebSocket::Rainbows }
    when Symbol, String
      # Object.const_get won't resolve multi-level paths, hence eval
      eval(configured.to_s)
    else
      # @use should be either :EventMachine or :NeverBlock
      Rainbows.const_get(@use).const_get(:Client)
    end
  end

  # True when EventMachine reports that all deferred work is finished.
  # EM.defers_finished? only exists in EventMachine 1.0.0+; when it is
  # missing we always answer true.
  def defers_finished?
    return true unless EM.respond_to?(:defers_finished?)
    EM.defers_finished?
  end

  # Main loop of each forked worker.  It sits around waiting for
  # connections and doesn't die until the parent dies (or it is
  # given a INT, QUIT, or TERM signal).
  def worker_loop(worker) # :nodoc:
    init_worker_process(worker)
    server = Rainbows.server
    if server.app.respond_to?(:deferred?)
      # apps offering +deferred?+ get wrapped in TryDefer
      server.app = TryDefer.new(server.app)
    end

    # ask for both epoll and kqueue, should be non-fatal if not supported
    EM.epoll
    EM.kqueue
    logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
    client_class = em_client_class
    max = worker_connections + LISTENERS.size
    acceptor = Rainbows::EventMachine::Server
    acceptor.const_set(:MAX, max)
    acceptor.const_set(:CL, client_class)
    Rainbows::EventMachine::Client.const_set(:APP, Rainbows.server.app)
    EM.run do
      # EM exposes no public accessor for its connection table
      conns = EM.instance_variable_get(:@conns)
      unless conns
        raise RuntimeError, "EM @conns instance variable not accessible!"
      end
      acceptor.const_set(:CUR, conns)

      # when quitting: forget the listeners, tell every client connection
      # to quit and detach every listener, all on the next reactor tick
      Rainbows.at_quit do
        EM.next_tick do
          LISTENERS.clear
          conns.each_value do |conn|
            case conn
            when client_class then conn.quit
            when acceptor then conn.detach
            end
          end
        end
      end

      # once a second, stop the reactor if Rainbows.tick is false/nil,
      # no connections remain and no deferred work is outstanding
      EM.add_periodic_timer(1) do
        drained = !Rainbows.tick && conns.empty? && defers_finished? &&
                  EM.reactor_running?
        EM.stop if drained
      end

      # swap each listener in LISTENERS for its EM watcher
      LISTENERS.map! do |sock|
        EM.watch(sock, acceptor) { |watcher| watcher.notify_readable = true }
      end
    end
    EM.reactor_thread.join if EM.reactor_running?
  end
end
# :enddoc:
require 'rainbows/event_machine/server'

git clone https://yhbt.net/rainbows.git