From a764983fccd6cce64043d76e09a5e1718e7f8fd6 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 17 Oct 2009 22:42:56 -0700 Subject: refactor graceful shutdowns again, harder We use the "G" global constant from the Rev model everywhere to simplify things a little. Test cases are more consistent now, too. --- lib/rainbows.rb | 6 +++++ lib/rainbows/base.rb | 22 ++++++++++------- lib/rainbows/rev.rb | 32 ++++++++++++++----------- lib/rainbows/revactor.rb | 18 ++++---------- lib/rainbows/thread_pool.rb | 19 +++++++-------- lib/rainbows/thread_spawn.rb | 5 ++-- t/lib-graceful.sh | 47 ++++++++++++++++++++++++++++++++++++ t/t1002-thread-pool-graceful.sh | 37 +--------------------------- t/t2002-thread-spawn-graceful.sh | 37 +--------------------------- t/t3002-revactor-graceful.sh | 38 +---------------------------- t/t4002-rev-graceful.sh | 52 +--------------------------------------- 11 files changed, 106 insertions(+), 207 deletions(-) create mode 100644 t/lib-graceful.sh diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 7978288..096f700 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -3,6 +3,12 @@ require 'unicorn' module Rainbows + # global vars because class/instance variables are confusing me :< + # this struct is only accessed inside workers and thus private to each + G = Struct.new(:cur, :max, :logger, :alive, :app).new + # G.cur may not be used the network concurrency model + G.alive = true + require 'rainbows/const' require 'rainbows/http_server' require 'rainbows/http_response' diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index a78262e..95d6545 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -8,6 +8,7 @@ module Rainbows include Unicorn include Rainbows::Const + G = Rainbows::G # write a response without caring if it went out or not for error # messages. @@ -17,22 +18,28 @@ module Rainbows client.close rescue nil end - # TODO: migrate into Unicorn::HttpServer def listen_loop_error(e) - return if HttpServer::LISTENERS.first.nil? || IOError === e + G.alive or return logger.error "Unhandled listen loop exception #{e.inspect}." logger.error e.backtrace.join("\n") end def init_worker_process(worker) super(worker) + G.cur = 0 + G.max = worker_connections + G.logger = logger + G.app = app # we're don't use the self-pipe mechanism in the Rainbows! worker # since we don't defer reopening logs HttpServer::SELF_PIPE.each { |x| x.close }.clear trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil } - # closing anything we IO.select on will raise EBADF - trap(:QUIT) { HttpServer::LISTENERS.map! { |s| s.close rescue nil } } + trap(:QUIT) do + G.alive = false + # closing anything we IO.select on will raise EBADF + HttpServer::LISTENERS.map! { |s| s.close rescue nil } + end [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown logger.info "Rainbows! #@use worker_connections=#@worker_connections" end @@ -63,7 +70,7 @@ module Rainbows response = app.call(env) end - alive = hp.keepalive? && ! Thread.current[:quit] + alive = hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? HttpResponse.write(client, response, out) end while alive and hp.reset.nil? and env.clear @@ -83,8 +90,7 @@ module Rainbows end def join_threads(threads, worker) - logger.info "Joining threads..." - threads.each { |thr| thr[:quit] = true } + Rainbows::G.alive = false expire = Time.now + (timeout * 2.0) m = 0 while (nr = threads.count { |thr| thr.alive? }) > 0 @@ -94,11 +100,11 @@ module Rainbows break if Time.now >= expire } end - logger.info "Done joining threads. #{nr} left running" end def self.included(klass) klass.const_set :LISTENERS, HttpServer::LISTENERS + klass.const_set :G, Rainbows::G end end diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index 7e5ca27..7d941f6 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -33,19 +33,15 @@ module Rainbows module Rev - # global vars because class/instance variables are confusing me :< - # this struct is only accessed inside workers and thus private to each - G = Struct.new(:nr, :max, :logger, :alive, :app).new - include Base class Client < ::Rev::IO include Unicorn include Rainbows::Const - G = Rainbows::Rev::G + G = Rainbows::G def initialize(io) - G.nr += 1 + G.cur += 1 super(io) @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST @env = {} @@ -91,7 +87,7 @@ module Rainbows end def on_close - G.nr -= 1 + G.cur -= 1 end def tmpio @@ -143,10 +139,10 @@ module Rainbows end class Server < ::Rev::IO - G = Rainbows::Rev::G + G = Rainbows::G def on_readable - return if G.nr >= G.max + return if G.cur >= G.max begin Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) rescue Errno::EAGAIN, Errno::ECONNBORTED @@ -160,13 +156,21 @@ module Rainbows # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) - G.nr = 0 - G.max = worker_connections - G.alive = true - G.logger = logger - G.app = app + graceful_waiter = nil + trap(:QUIT) do + G.alive = false + LISTENERS.map! { |s| s.close rescue nil } + # Rev may get stuck in a loop with no events possible, spawn a new + # thread to join on graceful exits when our client count goes to zero + graceful_waiter = Thread.new { + sleep(0.1) while G.cur > 0 + exit + } + end + LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) } ::Rev::Loop.default.run + graceful_waiter.join(timeout * 2.0) end end diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index f61de97..3db1062 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -55,7 +55,7 @@ module Rainbows response = app.call(env) end - alive = hp.keepalive? && ! Actor.current[:quit] + alive = hp.keepalive? && G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? HttpResponse.write(client, response, out) end while alive and hp.reset.nil? and env.clear @@ -86,7 +86,6 @@ module Rainbows limit = worker_connections revactorize_listeners! clients = {} - alive = true listeners = LISTENERS.map do |s| Actor.spawn(s) do |l| @@ -99,23 +98,16 @@ module Rainbows clients[actor.object_id] = actor root.link(actor) rescue Errno::EAGAIN, Errno::ECONNABORTED - rescue Errno::EBADF - break rescue Object => e - listen_loop_error(e) if alive - end while alive + listen_loop_error(e) + end while G.alive end end m = 0 check_quit = lambda do worker.tmp.chmod(m = 0 == m ? 1 : 0) - if listeners.any? { |l| l.dead? } || - master_pid != Process.ppid || - LISTENERS.first.nil? - alive = false - clients.each_value { |a| a[:quit] = true } - end + G.alive = false if master_pid != Process.ppid end begin @@ -128,7 +120,7 @@ module Rainbows check_quit.call end end - end while alive || clients.size > 0 + end while G.alive || clients.size > 0 end private diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index c742e5d..30e8f69 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -30,7 +30,7 @@ module Rainbows pool = (1..worker_connections).map { new_worker_thread } m = 0 - while LISTENERS.first && master_pid == Process.ppid + while G.alive && master_pid == Process.ppid pool.each do |thr| worker.tmp.chmod(m = 0 == m ? 1 : 0) # if any worker dies, something is serious wrong, bail @@ -44,21 +44,20 @@ module Rainbows Thread.new { begin begin - ret = IO.select(LISTENERS, nil, nil, timeout) or next - ret.first.each do |sock| - begin - process_client(sock.accept_nonblock) - rescue Errno::EAGAIN, Errno::ECONNABORTED - end - end + ret = IO.select(LISTENERS, nil, nil, 1) and + ret.first.each do |sock| + begin + process_client(sock.accept_nonblock) + rescue Errno::EAGAIN, Errno::ECONNABORTED + end + end rescue Errno::EINTR - next rescue Errno::EBADF, TypeError break end rescue Object => e listen_loop_error(e) - end while ! Thread.current[:quit] && LISTENERS.first + end while G.alive } end diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 104e764..39934a6 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -27,9 +27,10 @@ module Rainbows limit = worker_connections begin + G.alive && master_pid == Process.ppid or break ret = begin alive.chmod(m = 0 == m ? 1 : 0) - IO.select(LISTENERS, nil, nil, timeout) or next + IO.select(LISTENERS, nil, nil, 1) or next rescue Errno::EINTR retry rescue Errno::EBADF, TypeError @@ -55,7 +56,7 @@ module Rainbows end rescue Object => e listen_loop_error(e) - end while LISTENERS.first && master_pid == Process.ppid + end while true join_threads(threads.list, worker) end diff --git a/t/lib-graceful.sh b/t/lib-graceful.sh new file mode 100644 index 0000000..d2a6be8 --- /dev/null +++ b/t/lib-graceful.sh @@ -0,0 +1,47 @@ +model=$1 +. ./test-lib.sh +echo "graceful test for model=$model" +case $model in +Rev) require_rev ;; +Revactor) require_revactor ;; +esac + +eval $(unused_listen) +rtmpfiles unicorn_config curl_out pid r_err r_out fifo +rm -f $fifo +mkfifo $fifo + +cat > $unicorn_config < $curl_out 2> $fifo & + +awk -v rainbows_pid=$rainbows_pid ' +{ print $0 } +/100 Continue/ { + print "awk: sending SIGQUIT to", rainbows_pid + system("kill -QUIT "rainbows_pid) +}' $fifo +wait + +dbgcat r_err + +test x"$(wc -l < $curl_out)" = x1 +nr=$(sort < $curl_out | uniq | wc -l) + +test "$nr" -eq 1 +test x$(sort < $curl_out | uniq) = xHello +! grep Error $r_err +while kill -0 $rainbows_pid >/dev/null 2>&1 +do + sleep 1 +done diff --git a/t/t1002-thread-pool-graceful.sh b/t/t1002-thread-pool-graceful.sh index 0830bc7..c5d1f87 100755 --- a/t/t1002-thread-pool-graceful.sh +++ b/t/t1002-thread-pool-graceful.sh @@ -1,37 +1,2 @@ #!/bin/sh -. ./test-lib.sh - -eval $(unused_listen) -rtmpfiles unicorn_config curl_out curl_err pid r_err r_out -nr_thread=10 -nr_client=10 -cat > $unicorn_config <> $curl_out 2>> $curl_err & -done -sleep 2 -kill -QUIT $(cat $pid) -wait - -dbgcat r_err -! test -s $curl_err -test x"$(wc -l < $curl_out)" = x$nr_client -nr=$(sort < $curl_out | uniq | wc -l) - -test "$nr" -eq 1 -test x$(sort < $curl_out | uniq) = xHello -! grep Error $r_err +. ./lib-graceful.sh ThreadPool diff --git a/t/t2002-thread-spawn-graceful.sh b/t/t2002-thread-spawn-graceful.sh index 5a02670..b263306 100755 --- a/t/t2002-thread-spawn-graceful.sh +++ b/t/t2002-thread-spawn-graceful.sh @@ -1,37 +1,2 @@ #!/bin/sh -. ./test-lib.sh - -eval $(unused_listen) -rtmpfiles unicorn_config curl_out curl_err pid r_err r_out -nr_thread=10 -nr_client=10 -cat > $unicorn_config <> $curl_out 2>> $curl_err & -done -sleep 2 -kill -QUIT $(cat $pid) -wait - -dbgcat r_err -! test -s $curl_err -test x"$(wc -l < $curl_out)" = x$nr_client -nr=$(sort < $curl_out | uniq | wc -l) - -test "$nr" -eq 1 -test x$(sort < $curl_out | uniq) = xHello -! grep Error $r_err +. ./lib-graceful.sh ThreadSpawn diff --git a/t/t3002-revactor-graceful.sh b/t/t3002-revactor-graceful.sh index 67c6ba3..8696e06 100755 --- a/t/t3002-revactor-graceful.sh +++ b/t/t3002-revactor-graceful.sh @@ -1,38 +1,2 @@ #!/bin/sh -. ./test-lib.sh -require_revactor - -eval $(unused_listen) -rtmpfiles unicorn_config curl_out curl_err pid r_err r_out -nr_actor=10 -nr_client=10 -cat > $unicorn_config <> $curl_out 2>> $curl_err & -done -sleep 2 -kill -QUIT $(cat $pid) -wait - -dbgcat r_err -! test -s $curl_err -test x"$(wc -l < $curl_out)" = x$nr_client -nr=$(sort < $curl_out | uniq | wc -l) - -test "$nr" -eq 1 -test x$(sort < $curl_out | uniq) = xHello -! grep Error $r_err +. ./lib-graceful.sh Revactor diff --git a/t/t4002-rev-graceful.sh b/t/t4002-rev-graceful.sh index e286886..788bc6b 100755 --- a/t/t4002-rev-graceful.sh +++ b/t/t4002-rev-graceful.sh @@ -1,52 +1,2 @@ #!/bin/sh -. ./test-lib.sh -require_rev - -eval $(unused_listen) -rtmpfiles unicorn_config tmp pid r_err r_out out -nr_client=10 -cat > $unicorn_config < $tmp & - sleep 1 - printf 'Host: example.com\r\n' - sleep 1 - printf 'Connection: close\r\n' - sleep 1 - printf '\r\n' - wait - ) | socat - TCP:$listen > $fifo - fgrep 'Hello' $tmp >> $out || : - rm -f $fifo $tmp - ) & -done - -sleep 2 # potentially racy :< -kill -QUIT $(cat $pid) -wait - -test x"$(wc -l < $out)" = x$nr_client -nr=$(sort < $out | uniq | wc -l) -test "$nr" -eq 1 - -test x$(sort < $out | uniq) = xHello -! grep Error $r_err +. ./lib-graceful.sh Rev -- cgit v1.2.3-24-ge0c7