From 1a9a718a3f9a5b582a4a339a9bb9249c2ca392d7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 6 Nov 2009 19:45:17 -0800 Subject: cleanup worker heartbeat and master deathwatch It turns out neither the EventMachine and Rev classes checked for master death in its heartbeat mechanism. Since we managed to forget the same thing twice, we now have a test case for it and also centralized the code to remove duplication. --- lib/rainbows.rb | 15 +++++++++++++-- lib/rainbows/base.rb | 19 ++++++------------- lib/rainbows/ev_core.rb | 4 ++-- lib/rainbows/event_machine.rb | 21 +++++++++------------ lib/rainbows/rev.rb | 8 +++++--- lib/rainbows/rev/heartbeat.rb | 8 +------- lib/rainbows/revactor.rb | 10 ++-------- lib/rainbows/thread_pool.rb | 11 +++++------ lib/rainbows/thread_spawn.rb | 9 +++------ 9 files changed, 46 insertions(+), 59 deletions(-) (limited to 'lib') diff --git a/lib/rainbows.rb b/lib/rainbows.rb index a8985c6..5bd8693 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -5,9 +5,20 @@ module Rainbows # global vars because class/instance variables are confusing me :< # this struct is only accessed inside workers and thus private to each - G = Struct.new(:cur, :max, :logger, :alive, :app).new # G.cur may not be used the network concurrency model - G.alive = true + class State < Struct.new(:alive,:m,:cur,:server,:tmp) + def tick + tmp.chmod(self.m = m == 0 ? 1 : 0) + alive && server.master_pid == Process.ppid or quit! + end + + def quit! + self.alive = false + server.class.const_get(:LISTENERS).map! { |s| s.close rescue nil } + false + end + end + G = State.new(true, 0, 0) require 'rainbows/const' require 'rainbows/http_server' diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 9da148c..9b50d9a 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -26,20 +26,14 @@ module Rainbows def init_worker_process(worker) super(worker) - G.cur = 0 - G.max = worker_connections - G.logger = logger - G.app = app + G.server = self + G.tmp = worker.tmp # we're don't use the self-pipe mechanism in the Rainbows! worker # since we don't defer reopening logs HttpServer::SELF_PIPE.each { |x| x.close }.clear trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil } - trap(:QUIT) do - G.alive = false - # closing anything we IO.select on will raise EBADF - HttpServer::LISTENERS.map! { |s| s.close rescue nil } - end + trap(:QUIT) { G.quit! } [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown logger.info "Rainbows! #@use worker_connections=#@worker_connections" end @@ -89,13 +83,12 @@ module Rainbows logger.error e.backtrace.join("\n") end - def join_threads(threads, worker) - Rainbows::G.alive = false + def join_threads(threads) + G.quit! expire = Time.now + (timeout * 2.0) - m = 0 until (threads.delete_if { |thr| ! thr.alive? }).empty? threads.each { |thr| - worker.tmp.chmod(m = 0 == m ? 1 : 0) + G.tick thr.join(1) break if Time.now >= expire } diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 2679b5a..244e726 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -28,8 +28,8 @@ module Rainbows when HttpParserError # try to tell the client they're bad ERROR_400_RESPONSE else - G.logger.error "Read error: #{e.inspect}" - G.logger.error e.backtrace.join("\n") + G.server.logger.error "Read error: #{e.inspect}" + G.server.logger.error e.backtrace.join("\n") ERROR_500_RESPONSE end write(msg) diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 196fbca..5c25ade 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -57,7 +57,7 @@ module Rainbows @env[REMOTE_ADDR] = @remote_addr @env[ASYNC_CALLBACK] = method(:response_write) - response = catch(:async) { G.app.call(@env.update(RACK_DEFAULTS)) } + response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } # too tricky to support pipelining with :async since the # second (pipelined) request could be a stuck behind a @@ -166,22 +166,17 @@ module Rainbows module Server - def initialize(conns) - @limit = Rainbows::G.max + HttpServer::LISTENERS.size - @em_conns = conns - end - def close detach @io.close end def notify_readable - return if @em_conns.size >= @limit + return if CUR.size >= MAX begin io = @io.accept_nonblock sig = EM.attach_fd(io.fileno, false) - @em_conns[sig] = Client.new(sig, io) + CUR[sig] = Client.new(sig, io) rescue Errno::EAGAIN, Errno::ECONNABORTED end end @@ -192,24 +187,26 @@ module Rainbows # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) - m = 0 # enable them both, should be non-fatal if not supported EM.epoll EM.kqueue logger.info "EventMachine: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}" + Client.const_set(:APP, G.server.app) + Server.const_set(:MAX, G.server.worker_connections + + HttpServer::LISTENERS.size) EM.run { conns = EM.instance_variable_get(:@conns) or raise RuntimeError, "EM @conns instance variable not accessible!" + Server.const_set(:CUR, conns) EM.add_periodic_timer(1) do - worker.tmp.chmod(m = 0 == m ? 1 : 0) - unless G.alive + unless G.tick conns.each_value { |client| Client === client and client.quit } EM.stop if conns.empty? && EM.reactor_running? end end LISTENERS.map! do |s| - EM.watch(s, Server, conns) { |c| c.notify_readable = true } + EM.watch(s, Server) { |c| c.notify_readable = true } end } end diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index c4c77bd..66f6ed1 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -51,7 +51,7 @@ module Rainbows (@env[RACK_INPUT] = @input).rewind alive = @hp.keepalive? @env[REMOTE_ADDR] = @remote_addr - response = G.app.call(@env.update(RACK_DEFAULTS)) + response = APP.call(@env.update(RACK_DEFAULTS)) alive &&= G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? @@ -97,7 +97,7 @@ module Rainbows G = Rainbows::G def on_readable - return if G.cur >= G.max + return if G.cur >= MAX begin Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) rescue Errno::EAGAIN, Errno::ECONNABORTED @@ -173,8 +173,10 @@ module Rainbows # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) + Client.const_set(:APP, G.server.app) + Server.const_set(:MAX, G.server.worker_connections) rloop = ::Rev::Loop.default - Heartbeat.new(worker.tmp).attach(rloop) + Heartbeat.new(1, true).attach(rloop) LISTENERS.map! { |s| Server.new(s).attach(rloop) } rloop.run end diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb index 755b136..63eb71d 100644 --- a/lib/rainbows/rev/heartbeat.rb +++ b/lib/rainbows/rev/heartbeat.rb @@ -11,15 +11,9 @@ module Rainbows # will also detect and execute the graceful exit if triggered # by SIGQUIT class Heartbeat < ::Rev::TimerWatcher - # +tmp+ must be a +File+ that responds to +chmod+ - def initialize(tmp) - @m, @tmp = 0, tmp - super(1, true) - end def on_timer - @tmp.chmod(@m = 0 == @m ? 1 : 0) - exit if (! G.alive && G.cur <= 0) + exit if (! G.tick && G.cur <= 0) end end diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index ddcbc04..003b704 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -101,20 +101,14 @@ module Rainbows end end - m = 0 - check_quit = lambda do - worker.tmp.chmod(m = 0 == m ? 1 : 0) - G.alive = false if master_pid != Process.ppid - end - begin Actor.receive do |filter| - filter.after(1, &check_quit) + filter.after(1) { G.tick } filter.when(Case[:exit, Actor, Object]) do |_,actor,_| orig = clients.size clients.delete(actor.object_id) orig >= limit and listeners.each { |l| l << :resume } - check_quit.call + G.tick end end end while G.alive || clients.size > 0 diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index 280ba40..7934dc8 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -28,16 +28,15 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) pool = (1..worker_connections).map { new_worker_thread } - m = 0 - while G.alive && master_pid == Process.ppid + while G.alive + # if any worker dies, something is serious wrong, bail pool.each do |thr| - worker.tmp.chmod(m = 0 == m ? 1 : 0) - # if any worker dies, something is serious wrong, bail - thr.join(1) and break + G.tick + thr.join(1) and G.quit! end end - join_threads(pool, worker) + join_threads(pool) end def new_worker_thread diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 39934a6..a3068c9 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -22,21 +22,18 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) threads = ThreadGroup.new - alive = worker.tmp - m = 0 limit = worker_connections begin - G.alive && master_pid == Process.ppid or break ret = begin - alive.chmod(m = 0 == m ? 1 : 0) + G.tick or break IO.select(LISTENERS, nil, nil, 1) or next rescue Errno::EINTR retry rescue Errno::EBADF, TypeError break end - alive.chmod(m = 0 == m ? 1 : 0) + G.tick ret.first.each do |l| # Sleep if we're busy, another less busy worker process may @@ -57,7 +54,7 @@ module Rainbows rescue Object => e listen_loop_error(e) end while true - join_threads(threads.list, worker) + join_threads(threads.list) end end -- cgit v1.2.3-24-ge0c7