From a764983fccd6cce64043d76e09a5e1718e7f8fd6 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 17 Oct 2009 22:42:56 -0700 Subject: refactor graceful shutdowns again, harder We use the "G" global constant from the Rev model everywhere to simplify things a little. Test cases are more consistent now, too. --- lib/rainbows/base.rb | 22 ++++++++++++++-------- lib/rainbows/rev.rb | 32 ++++++++++++++++++-------------- lib/rainbows/revactor.rb | 18 +++++------------- lib/rainbows/thread_pool.rb | 19 +++++++++---------- lib/rainbows/thread_spawn.rb | 5 +++-- 5 files changed, 49 insertions(+), 47 deletions(-) (limited to 'lib/rainbows') diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index a78262e..95d6545 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -8,6 +8,7 @@ module Rainbows include Unicorn include Rainbows::Const + G = Rainbows::G # write a response without caring if it went out or not for error # messages. @@ -17,22 +18,28 @@ module Rainbows client.close rescue nil end - # TODO: migrate into Unicorn::HttpServer def listen_loop_error(e) - return if HttpServer::LISTENERS.first.nil? || IOError === e + G.alive or return logger.error "Unhandled listen loop exception #{e.inspect}." logger.error e.backtrace.join("\n") end def init_worker_process(worker) super(worker) + G.cur = 0 + G.max = worker_connections + G.logger = logger + G.app = app # we're don't use the self-pipe mechanism in the Rainbows! worker # since we don't defer reopening logs HttpServer::SELF_PIPE.each { |x| x.close }.clear trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil } - # closing anything we IO.select on will raise EBADF - trap(:QUIT) { HttpServer::LISTENERS.map! { |s| s.close rescue nil } } + trap(:QUIT) do + G.alive = false + # closing anything we IO.select on will raise EBADF + HttpServer::LISTENERS.map! { |s| s.close rescue nil } + end [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown logger.info "Rainbows! #@use worker_connections=#@worker_connections" end @@ -63,7 +70,7 @@ module Rainbows response = app.call(env) end - alive = hp.keepalive? && ! Thread.current[:quit] + alive = hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? HttpResponse.write(client, response, out) end while alive and hp.reset.nil? and env.clear @@ -83,8 +90,7 @@ module Rainbows end def join_threads(threads, worker) - logger.info "Joining threads..." - threads.each { |thr| thr[:quit] = true } + Rainbows::G.alive = false expire = Time.now + (timeout * 2.0) m = 0 while (nr = threads.count { |thr| thr.alive? }) > 0 @@ -94,11 +100,11 @@ module Rainbows break if Time.now >= expire } end - logger.info "Done joining threads. #{nr} left running" end def self.included(klass) klass.const_set :LISTENERS, HttpServer::LISTENERS + klass.const_set :G, Rainbows::G end end diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index 7e5ca27..7d941f6 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -33,19 +33,15 @@ module Rainbows module Rev - # global vars because class/instance variables are confusing me :< - # this struct is only accessed inside workers and thus private to each - G = Struct.new(:nr, :max, :logger, :alive, :app).new - include Base class Client < ::Rev::IO include Unicorn include Rainbows::Const - G = Rainbows::Rev::G + G = Rainbows::G def initialize(io) - G.nr += 1 + G.cur += 1 super(io) @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST @env = {} @@ -91,7 +87,7 @@ module Rainbows end def on_close - G.nr -= 1 + G.cur -= 1 end def tmpio @@ -143,10 +139,10 @@ module Rainbows end class Server < ::Rev::IO - G = Rainbows::Rev::G + G = Rainbows::G def on_readable - return if G.nr >= G.max + return if G.cur >= G.max begin Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) rescue Errno::EAGAIN, Errno::ECONNBORTED @@ -160,13 +156,21 @@ module Rainbows # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) - G.nr = 0 - G.max = worker_connections - G.alive = true - G.logger = logger - G.app = app + graceful_waiter = nil + trap(:QUIT) do + G.alive = false + LISTENERS.map! { |s| s.close rescue nil } + # Rev may get stuck in a loop with no events possible, spawn a new + # thread to join on graceful exits when our client count goes to zero + graceful_waiter = Thread.new { + sleep(0.1) while G.cur > 0 + exit + } + end + LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) } ::Rev::Loop.default.run + graceful_waiter.join(timeout * 2.0) end end diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index f61de97..3db1062 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -55,7 +55,7 @@ module Rainbows response = app.call(env) end - alive = hp.keepalive? && ! Actor.current[:quit] + alive = hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? HttpResponse.write(client, response, out) end while alive and hp.reset.nil? and env.clear @@ -86,7 +86,6 @@ module Rainbows limit = worker_connections revactorize_listeners! clients = {} - alive = true listeners = LISTENERS.map do |s| Actor.spawn(s) do |l| @@ -99,23 +98,16 @@ module Rainbows clients[actor.object_id] = actor root.link(actor) rescue Errno::EAGAIN, Errno::ECONNABORTED - rescue Errno::EBADF - break rescue Object => e - listen_loop_error(e) if alive - end while alive + listen_loop_error(e) + end while G.alive end end m = 0 check_quit = lambda do worker.tmp.chmod(m = 0 == m ? 1 : 0) - if listeners.any? { |l| l.dead? } || - master_pid != Process.ppid || - LISTENERS.first.nil? - alive = false - clients.each_value { |a| a[:quit] = true } - end + G.alive = false if master_pid != Process.ppid end begin @@ -128,7 +120,7 @@ module Rainbows check_quit.call end end - end while alive || clients.size > 0 + end while G.alive || clients.size > 0 end private diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index c742e5d..30e8f69 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -30,7 +30,7 @@ module Rainbows pool = (1..worker_connections).map { new_worker_thread } m = 0 - while LISTENERS.first && master_pid == Process.ppid + while G.alive && master_pid == Process.ppid pool.each do |thr| worker.tmp.chmod(m = 0 == m ? 1 : 0) # if any worker dies, something is serious wrong, bail @@ -44,21 +44,20 @@ module Rainbows Thread.new { begin begin - ret = IO.select(LISTENERS, nil, nil, timeout) or next - ret.first.each do |sock| - begin - process_client(sock.accept_nonblock) - rescue Errno::EAGAIN, Errno::ECONNABORTED - end - end + ret = IO.select(LISTENERS, nil, nil, 1) and + ret.first.each do |sock| + begin + process_client(sock.accept_nonblock) + rescue Errno::EAGAIN, Errno::ECONNABORTED + end + end rescue Errno::EINTR - next rescue Errno::EBADF, TypeError break end rescue Object => e listen_loop_error(e) - end while ! Thread.current[:quit] && LISTENERS.first + end while G.alive } end diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 104e764..39934a6 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -27,9 +27,10 @@ module Rainbows limit = worker_connections begin + G.alive && master_pid == Process.ppid or break ret = begin alive.chmod(m = 0 == m ? 1 : 0) - IO.select(LISTENERS, nil, nil, timeout) or next + IO.select(LISTENERS, nil, nil, 1) or next rescue Errno::EINTR retry rescue Errno::EBADF, TypeError @@ -55,7 +56,7 @@ module Rainbows end rescue Object => e listen_loop_error(e) - end while LISTENERS.first && master_pid == Process.ppid + end while true join_threads(threads.list, worker) end -- cgit v1.2.3-24-ge0c7