From 6bde32081338ce8075854f4c47ce8ca5347df919 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 5 Jan 2011 17:06:20 -0800 Subject: eliminate G constant and just use the Rainbows! module Code organization is hard :< --- lib/rainbows.rb | 47 ++++++++++++++++-------------- lib/rainbows/base.rb | 10 ++----- lib/rainbows/coolio/client.rb | 3 +- lib/rainbows/coolio/core.rb | 2 +- lib/rainbows/coolio/heartbeat.rb | 3 +- lib/rainbows/coolio_fiber_spawn.rb | 2 +- lib/rainbows/coolio_thread_pool/watcher.rb | 6 ++-- lib/rainbows/error.rb | 12 ++++---- lib/rainbows/ev_core.rb | 1 - lib/rainbows/event_machine.rb | 9 +++--- lib/rainbows/fiber/base.rb | 6 ++-- lib/rainbows/fiber/coolio/heartbeat.rb | 4 +-- lib/rainbows/fiber/coolio/server.rb | 8 ++--- lib/rainbows/fiber_pool.rb | 2 +- lib/rainbows/fiber_spawn.rb | 4 +-- lib/rainbows/http_server.rb | 8 ++--- lib/rainbows/max_body.rb | 4 +-- lib/rainbows/never_block/core.rb | 2 +- lib/rainbows/queue_pool.rb | 4 +-- lib/rainbows/revactor.rb | 4 +-- lib/rainbows/thread_pool.rb | 14 ++++----- lib/rainbows/thread_spawn.rb | 10 +++---- lib/rainbows/thread_timeout.rb | 2 +- lib/rainbows/writer_thread_spawn/client.rb | 3 +- 24 files changed, 77 insertions(+), 93 deletions(-) diff --git a/lib/rainbows.rb b/lib/rainbows.rb index f0b5a23..ae77dbe 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -13,27 +13,7 @@ Unicorn::SocketHelper::DEFAULTS.merge!({ module Rainbows - # global vars because class/instance variables are confusing me :< - # this struct is only accessed inside workers and thus private to each - # G.cur may not be used in the network concurrency model - # :stopdoc: - class State < Struct.new(:alive,:m,:cur,:server,:tmp,:expire) - def tick - tmp.chmod(self.m = m == 0 ? 1 : 0) - exit!(2) if expire && Time.now >= expire - alive && server.master_pid == Process.ppid or quit! - end - - def quit! - self.alive = false - Rainbows::HttpParser.quit - self.expire ||= Time.now + (server.timeout * 2.0) - server.class.const_get(:LISTENERS).map! { |s| s.close rescue nil } - false - end - end - G = State.new(true, 0, 0) - O = {} + O = {} # :nodoc: class Response416 < RangeError; end # map of numeric file descriptors to IO objects to avoid using IO.new @@ -65,7 +45,7 @@ module Rainbows # with the basic :Coolio or :EventMachine models is not recommended. # This should be used within your Rack application. def self.sleep(nr) - case G.server.use + case Rainbows.server.use when :FiberPool, :FiberSpawn Rainbows::Fiber.sleep(nr) when :RevFiberSpawn, :CoolioFiberSpawn @@ -86,6 +66,10 @@ module Rainbows # :stopdoc: class << self attr_accessor :max_bytes, :keepalive_timeout + attr_accessor :server + attr_accessor :cur # may not always be used + attr_reader :alive + attr_writer :tick_io end # :startdoc: @@ -96,6 +80,25 @@ module Rainbows @keepalive_timeout = 5 # :stopdoc: + @alive = true + @cur = 0 + @tick_mod = 0 + @expire = nil + + def self.tick + @tick_io.chmod(@tick_mod = 0 == @tick_mod ? 1 : 0) + exit!(2) if @expire && Time.now >= @expire + @alive && @server.master_pid == Process.ppid or quit! + end + + def self.quit! + @alive = false + Rainbows::HttpParser.quit + @expire ||= Time.now + (@server.timeout * 2.0) + @server.class.const_get(:LISTENERS).map! { |s| s.close rescue nil } + false + end + # maps models to default worker counts, default worker count numbers are # pretty arbitrary and tuning them to your application and hardware is # highly recommended diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 5d56063..b94ddc9 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -8,16 +8,13 @@ module Rainbows::Base # :stopdoc: - # shortcuts... - G = Rainbows::G - # this method is called by all current concurrency models def init_worker_process(worker) # :nodoc: super(worker) Rainbows::Response.setup(self.class) Rainbows::MaxBody.setup Rainbows::RackInput.setup - G.tmp = worker.tmp + Rainbows.tick_io = worker.tmp listeners = Rainbows::HttpServer::LISTENERS Rainbows::HttpServer::IO_PURGATORY.concat(listeners) @@ -26,9 +23,9 @@ module Rainbows::Base # since we don't defer reopening logs Rainbows::HttpServer::SELF_PIPE.each { |x| x.close }.clear trap(:USR1) { reopen_worker_logs(worker.nr) } - trap(:QUIT) { G.quit! } + trap(:QUIT) { Rainbows.quit! } [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown - Rainbows::ProcessClient.const_set(:APP, G.server.app) + Rainbows::ProcessClient.const_set(:APP, Rainbows.server.app) logger.info "Rainbows! #@use worker_connections=#@worker_connections" end @@ -38,7 +35,6 @@ module Rainbows::Base def self.included(klass) # :nodoc: klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS - klass.const_set :G, Rainbows::G end # :startdoc: diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb index 4a19f31..b5430f6 100644 --- a/lib/rainbows/coolio/client.rb +++ b/lib/rainbows/coolio/client.rb @@ -2,7 +2,6 @@ # :enddoc: class Rainbows::Coolio::Client < Coolio::IO include Rainbows::EvCore - G = Rainbows::G SF = Rainbows::StreamFile CONN = Rainbows::Coolio::CONN KATO = Rainbows::Coolio::KATO @@ -175,7 +174,7 @@ class Rainbows::Coolio::Client < Coolio::IO begin @deferred.close rescue => e - G.server.logger.error("closing #@deferred: #{e}") + Rainbows.server.logger.error("closing #@deferred: #{e}") end @deferred = nil end diff --git a/lib/rainbows/coolio/core.rb b/lib/rainbows/coolio/core.rb index 48907ab..b55cf7f 100644 --- a/lib/rainbows/coolio/core.rb +++ b/lib/rainbows/coolio/core.rb @@ -16,7 +16,7 @@ module Rainbows::Coolio::Core Rainbows::Coolio::Client.const_set(:LOOP, rloop) Rainbows::Coolio::Server.const_set(:MAX, @worker_connections) Rainbows::Coolio::Server.const_set(:CL, mod.const_get(:Client)) - Rainbows::EvCore.const_set(:APP, G.server.app) + Rainbows::EvCore.const_set(:APP, Rainbows.server.app) Rainbows::EvCore.setup Rainbows::Coolio::Heartbeat.new(1, true).attach(rloop) LISTENERS.map! { |s| Rainbows::Coolio::Server.new(s).attach(rloop) } diff --git a/lib/rainbows/coolio/heartbeat.rb b/lib/rainbows/coolio/heartbeat.rb index 4506b84..4657155 100644 --- a/lib/rainbows/coolio/heartbeat.rb +++ b/lib/rainbows/coolio/heartbeat.rb @@ -8,13 +8,12 @@ class Rainbows::Coolio::Heartbeat < Coolio::TimerWatcher KATO = Rainbows::Coolio::KATO CONN = Rainbows::Coolio::CONN - G = Rainbows::G def on_timer if (ot = Rainbows.keepalive_timeout) >= 0 ot = Time.now - ot KATO.delete_if { |client, time| time < ot and client.timeout? } end - exit if (! G.tick && CONN.size <= 0) + exit if (! Rainbows.tick && CONN.size <= 0) end end diff --git a/lib/rainbows/coolio_fiber_spawn.rb b/lib/rainbows/coolio_fiber_spawn.rb index 9c5af5f..094e18e 100644 --- a/lib/rainbows/coolio_fiber_spawn.rb +++ b/lib/rainbows/coolio_fiber_spawn.rb @@ -18,7 +18,7 @@ module Rainbows::CoolioFiberSpawn init_worker_process(worker) Server.const_set(:MAX, @worker_connections) Rainbows::Fiber::Base.setup(Server, nil) - Server.const_set(:APP, G.server.app) + Server.const_set(:APP, Rainbows.server.app) Heartbeat.new(1, true).attach(Coolio::Loop.default) LISTENERS.map! { |s| Server.new(s).attach(Coolio::Loop.default) } Coolio::Loop.default.run diff --git a/lib/rainbows/coolio_thread_pool/watcher.rb b/lib/rainbows/coolio_thread_pool/watcher.rb index 9b0e97e..6537ebc 100644 --- a/lib/rainbows/coolio_thread_pool/watcher.rb +++ b/lib/rainbows/coolio_thread_pool/watcher.rb @@ -1,14 +1,12 @@ # -*- encoding: binary -*- # :enddoc: class Rainbows::CoolioThreadPool::Watcher < Coolio::TimerWatcher - G = Rainbows::G - def initialize(threads) @threads = threads - super(G.server.timeout, true) + super(Rainbows.server.timeout, true) end def on_timer - @threads.each { |t| t.join(0) and G.quit! } + @threads.each { |t| t.join(0) and Rainbows.quit! } end end diff --git a/lib/rainbows/error.rb b/lib/rainbows/error.rb index 68cdec4..fa6da61 100644 --- a/lib/rainbows/error.rb +++ b/lib/rainbows/error.rb @@ -2,8 +2,6 @@ # :enddoc: module Rainbows::Error - G = Rainbows::G - # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up # if the socket is already closed or broken. We'll always ensure @@ -20,15 +18,15 @@ module Rainbows::Error end def self.app(e) - G.server.logger.error "app error: #{e.inspect}" - G.server.logger.error e.backtrace.join("\n") + Rainbows.server.logger.error "app error: #{e.inspect}" + Rainbows.server.logger.error e.backtrace.join("\n") rescue end def self.listen_loop(e) - G.alive or return - G.server.logger.error "listen loop error: #{e.inspect}." - G.server.logger.error e.backtrace.join("\n") + Rainbows.alive or return + Rainbows.server.logger.error "listen loop error: #{e.inspect}." + Rainbows.server.logger.error e.backtrace.join("\n") rescue end diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 471f6a3..a8bedce 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -4,7 +4,6 @@ module Rainbows::EvCore include Rainbows::Const include Rainbows::Response - G = Rainbows::G NULL_IO = Unicorn::HttpRequest::NULL_IO HttpParser = Rainbows::HttpParser autoload :CapInput, 'rainbows/ev_core/cap_input' diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index cb76669..990a186 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -58,8 +58,9 @@ module Rainbows::EventMachine # given a INT, QUIT, or TERM signal) def worker_loop(worker) # :nodoc: init_worker_process(worker) - G.server.app.respond_to?(:deferred?) and - G.server.app = Rainbows::EventMachine::TryDefer[G.server.app] + server = Rainbows.server + server.app.respond_to?(:deferred?) and + server.app = TryDefer.new(server.app) # enable them both, should be non-fatal if not supported EM.epoll @@ -69,14 +70,14 @@ module Rainbows::EventMachine max = worker_connections + LISTENERS.size Rainbows::EventMachine::Server.const_set(:MAX, max) Rainbows::EventMachine::Server.const_set(:CL, client_class) - client_class.const_set(:APP, G.server.app) + client_class.const_set(:APP, Rainbows.server.app) Rainbows::EvCore.setup EM.run { conns = EM.instance_variable_get(:@conns) or raise RuntimeError, "EM @conns instance variable not accessible!" Rainbows::EventMachine::Server.const_set(:CUR, conns) EM.add_periodic_timer(1) do - unless G.tick + unless Rainbows.tick conns.each_value { |c| client_class === c and c.quit } EM.stop if conns.empty? && EM.reactor_running? end diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index ae885b6..126f338 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -19,7 +19,7 @@ module Rainbows::Fiber::Base # will cause it. def schedule(&block) begin - G.tick + Rainbows.tick t = schedule_sleepers ret = select(RD.compact.concat(LISTENERS), WR.compact, nil, t) rescue Errno::EINTR @@ -56,10 +56,10 @@ module Rainbows::Fiber::Base end def process(client) - G.cur += 1 + Rainbows.cur += 1 client.process_loop ensure - G.cur -= 1 + Rainbows.cur -= 1 ZZ.delete(client.f) end diff --git a/lib/rainbows/fiber/coolio/heartbeat.rb b/lib/rainbows/fiber/coolio/heartbeat.rb index f48f7ef..6b1e4f9 100644 --- a/lib/rainbows/fiber/coolio/heartbeat.rb +++ b/lib/rainbows/fiber/coolio/heartbeat.rb @@ -1,12 +1,10 @@ # -*- encoding: binary -*- # :enddoc: class Rainbows::Fiber::Coolio::Heartbeat < Coolio::TimerWatcher - G = Rainbows::G - # ZZ gets populated by read_expire in rainbows/fiber/io/methods ZZ = Rainbows::Fiber::ZZ def on_timer - exit if (! G.tick && G.cur <= 0) + exit if (! Rainbows.tick && Rainbows.cur <= 0) now = Time.now fibs = [] ZZ.delete_if { |fib, time| now >= time ? fibs << fib : ! fib.alive? } diff --git a/lib/rainbows/fiber/coolio/server.rb b/lib/rainbows/fiber/coolio/server.rb index b064953..3d8d85e 100644 --- a/lib/rainbows/fiber/coolio/server.rb +++ b/lib/rainbows/fiber/coolio/server.rb @@ -1,8 +1,6 @@ # -*- encoding: binary -*- # :enddoc: class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher - G = Rainbows::G - def to_io @io end @@ -18,14 +16,14 @@ class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher end def on_readable - return if G.cur >= MAX + return if Rainbows.cur >= MAX c = @io.kgio_tryaccept and Fiber.new { process(c) }.resume end def process(io) - G.cur += 1 + Rainbows.cur += 1 io.process_loop ensure - G.cur -= 1 + Rainbows.cur -= 1 end end diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb index c62345d..229f560 100644 --- a/lib/rainbows/fiber_pool.rb +++ b/lib/rainbows/fiber_pool.rb @@ -34,6 +34,6 @@ module Rainbows::FiberPool end rescue => e Rainbows::Error.listen_loop(e) - end while G.alive || G.cur > 0 + end while Rainbows.alive || Rainbows.cur > 0 end end diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb index 17bd884..84df30d 100644 --- a/lib/rainbows/fiber_spawn.rb +++ b/lib/rainbows/fiber_spawn.rb @@ -17,12 +17,12 @@ module Rainbows::FiberSpawn begin schedule do |l| - break if G.cur >= limit + break if Rainbows.cur >= limit io = l.kgio_tryaccept or next Fiber.new { process(io) }.resume end rescue => e Rainbows::Error.listen_loop(e) - end while G.alive || G.cur > 0 + end while Rainbows.alive || Rainbows.cur > 0 end end diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb index 13b731b..84d5a32 100644 --- a/lib/rainbows/http_server.rb +++ b/lib/rainbows/http_server.rb @@ -2,14 +2,12 @@ # :enddoc: class Rainbows::HttpServer < Unicorn::HttpServer - G = Rainbows::G - def self.setup(block) - G.server.instance_eval(&block) + Rainbows.server.instance_eval(&block) end def initialize(app, options) - G.server = self + Rainbows.server = self @logger = Unicorn::Configurator::DEFAULTS[:logger] rv = super(app, options) defined?(@use) or use(:Base) @@ -21,7 +19,7 @@ class Rainbows::HttpServer < Unicorn::HttpServer Unicorn::Util.reopen_logs logger.info "worker=#{worker_nr} done reopening logs" rescue - G.quit! # let the master reopen and refork us + Rainbows.quit! # let the master reopen and refork us end # Add one second to the timeout since our fchmod heartbeat is less diff --git a/lib/rainbows/max_body.rb b/lib/rainbows/max_body.rb index 878b04d..808f762 100644 --- a/lib/rainbows/max_body.rb +++ b/lib/rainbows/max_body.rb @@ -52,7 +52,7 @@ class Rainbows::MaxBody # if it's reconfigured def self.setup # :nodoc: Rainbows.max_bytes or return - case Rainbows::G.server.use + case Rainbows.server.use when :Rev, :Coolio, :EventMachine, :NeverBlock, :RevThreadSpawn, :RevThreadPool, :CoolioThreadSpawn, :CoolioThreadPool @@ -60,7 +60,7 @@ class Rainbows::MaxBody end # force ourselves to the outermost middleware layer - Rainbows::G.server.app = self.new(Rainbows::G.server.app) + Rainbows.server.app = self.new(Rainbows.server.app) end # Rack response returned when there's an error diff --git a/lib/rainbows/never_block/core.rb b/lib/rainbows/never_block/core.rb index 7188f75..611b3c3 100644 --- a/lib/rainbows/never_block/core.rb +++ b/lib/rainbows/never_block/core.rb @@ -8,7 +8,7 @@ module Rainbows::NeverBlock::Core base = o[:backend].to_s.gsub!(/([a-z])([A-Z])/, '\1_\2').downcase! require "rainbows/never_block/#{base}" client_class = Rainbows::NeverBlock::Client - client_class.superclass.const_set(:APP, Rainbows::G.server.app) + client_class.superclass.const_set(:APP, Rainbows.server.app) client_class.const_set(:POOL, pool) logger.info "NeverBlock/#{o[:backend]} pool_size=#{o[:pool_size]}" end diff --git a/lib/rainbows/queue_pool.rb b/lib/rainbows/queue_pool.rb index 4a2ab8c..99cb9db 100644 --- a/lib/rainbows/queue_pool.rb +++ b/lib/rainbows/queue_pool.rb @@ -6,8 +6,6 @@ require 'thread' # This is NOT used for the ThreadPool class, since that class does not # need a userspace Queue. class Rainbows::QueuePool < Struct.new(:queue, :threads) - G = Rainbows::G - def initialize(size = 20, &block) q = Queue.new self.threads = (1..size).map do @@ -23,7 +21,7 @@ class Rainbows::QueuePool < Struct.new(:queue, :threads) def quit! threads.each { |_| queue << nil } threads.delete_if do |t| - G.tick + Rainbows.tick t.alive? ? t.join(0.01) : true end until threads.empty? end diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index be4badf..a335835 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -60,7 +60,7 @@ module Rainbows::Revactor end rescue => e Rainbows::Error.listen_loop(e) - end while G.alive + end while Rainbows.alive Actor.receive do |f| f.when(close) {} f.when(actor_exit) { nr -= 1 } @@ -68,7 +68,7 @@ module Rainbows::Revactor end end - Actor.sleep 1 while G.tick || nr > 0 + Actor.sleep 1 while Rainbows.tick || nr > 0 rescue Errno::EMFILE # ignore, let another worker process take it end diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index c82e22a..3b8e68e 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -28,11 +28,11 @@ module Rainbows::ThreadPool Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker } end - while G.alive + while Rainbows.alive # if any worker dies, something is serious wrong, bail pool.each do |thr| - G.tick or break - thr.join(1) and G.quit! + Rainbows.tick or break + thr.join(1) and Rainbows.quit! end end join_threads(pool) @@ -44,7 +44,7 @@ module Rainbows::ThreadPool c = s.kgio_accept and c.process_loop rescue => e Rainbows::Error.listen_loop(e) - end while G.alive + end while Rainbows.alive end def async_worker # :nodoc: @@ -60,13 +60,13 @@ module Rainbows::ThreadPool rescue Errno::EINTR rescue => e Rainbows::Error.listen_loop(e) - end while G.alive + end while Rainbows.alive end def join_threads(threads) # :nodoc: - G.quit! + Rainbows.quit! threads.delete_if do |thr| - G.tick + Rainbows.tick begin thr.run thr.join(0.01) diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index d2d41e8..a0520d1 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -25,24 +25,24 @@ module Rainbows::ThreadSpawn LISTENERS.each do |l| klass.new(l) do |l| begin - if lock.synchronize { G.cur >= limit } + if lock.synchronize { Rainbows.cur >= limit } worker_yield elsif c = l.kgio_accept klass.new(c) do |c| begin - lock.synchronize { G.cur += 1 } + lock.synchronize { Rainbows.cur += 1 } c.process_loop ensure - lock.synchronize { G.cur -= 1 } + lock.synchronize { Rainbows.cur -= 1 } end end end rescue => e Rainbows::Error.listen_loop(e) - end while G.alive + end while Rainbows.alive end end - sleep 1 while G.tick || lock.synchronize { G.cur > 0 } + sleep 1 while Rainbows.tick || lock.synchronize { Rainbows.cur > 0 } end def worker_loop(worker) #:nodoc: diff --git a/lib/rainbows/thread_timeout.rb b/lib/rainbows/thread_timeout.rb index dc34804..920270d 100644 --- a/lib/rainbows/thread_timeout.rb +++ b/lib/rainbows/thread_timeout.rb @@ -47,7 +47,7 @@ class Rainbows::ThreadTimeout @threshold == 0 and raise ArgumentError, "threshold=0 does not make sense" @threshold < 0 and - @threshold += Rainbows::G.server.worker_connections + @threshold += Rainbows.server.worker_connections end @app = app @active = {} diff --git a/lib/rainbows/writer_thread_spawn/client.rb b/lib/rainbows/writer_thread_spawn/client.rb index 15264d0..b4166fa 100644 --- a/lib/rainbows/writer_thread_spawn/client.rb +++ b/lib/rainbows/writer_thread_spawn/client.rb @@ -54,10 +54,9 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) include Methods def self.quit - g = Rainbows::G CUR.delete_if do |t,q| q << nil - g.tick + Rainbows.tick t.alive? ? t.join(0.01) : true end until CUR.empty? end -- cgit v1.2.3-24-ge0c7