diff options
-rw-r--r-- | lib/rainbows.rb | 15 | ||||
-rw-r--r-- | lib/rainbows/base.rb | 19 | ||||
-rw-r--r-- | lib/rainbows/ev_core.rb | 4 | ||||
-rw-r--r-- | lib/rainbows/event_machine.rb | 21 | ||||
-rw-r--r-- | lib/rainbows/rev.rb | 8 | ||||
-rw-r--r-- | lib/rainbows/rev/heartbeat.rb | 8 | ||||
-rw-r--r-- | lib/rainbows/revactor.rb | 10 | ||||
-rw-r--r-- | lib/rainbows/thread_pool.rb | 11 | ||||
-rw-r--r-- | lib/rainbows/thread_spawn.rb | 9 | ||||
-rwxr-xr-x | t/t0007-worker-follows-master-to-death.sh | 50 | ||||
-rw-r--r-- | t/worker-follows-master-to-death.ru | 17 |
11 files changed, 113 insertions, 59 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb index a8985c6..5bd8693 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -5,9 +5,20 @@ module Rainbows # global vars because class/instance variables are confusing me :< # this struct is only accessed inside workers and thus private to each - G = Struct.new(:cur, :max, :logger, :alive, :app).new # G.cur may not be used the network concurrency model - G.alive = true + class State < Struct.new(:alive,:m,:cur,:server,:tmp) + def tick + tmp.chmod(self.m = m == 0 ? 1 : 0) + alive && server.master_pid == Process.ppid or quit! + end + + def quit! + self.alive = false + server.class.const_get(:LISTENERS).map! { |s| s.close rescue nil } + false + end + end + G = State.new(true, 0, 0) require 'rainbows/const' require 'rainbows/http_server' diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 9da148c..9b50d9a 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -26,20 +26,14 @@ module Rainbows def init_worker_process(worker) super(worker) - G.cur = 0 - G.max = worker_connections - G.logger = logger - G.app = app + G.server = self + G.tmp = worker.tmp # we're don't use the self-pipe mechanism in the Rainbows! worker # since we don't defer reopening logs HttpServer::SELF_PIPE.each { |x| x.close }.clear trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil } - trap(:QUIT) do - G.alive = false - # closing anything we IO.select on will raise EBADF - HttpServer::LISTENERS.map! { |s| s.close rescue nil } - end + trap(:QUIT) { G.quit! } [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown logger.info "Rainbows! #@use worker_connections=#@worker_connections" end @@ -89,13 +83,12 @@ module Rainbows logger.error e.backtrace.join("\n") end - def join_threads(threads, worker) - Rainbows::G.alive = false + def join_threads(threads) + G.quit! expire = Time.now + (timeout * 2.0) - m = 0 until (threads.delete_if { |thr| ! thr.alive? }).empty? threads.each { |thr| - worker.tmp.chmod(m = 0 == m ? 1 : 0) + G.tick thr.join(1) break if Time.now >= expire } diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 2679b5a..244e726 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -28,8 +28,8 @@ module Rainbows when HttpParserError # try to tell the client they're bad ERROR_400_RESPONSE else - G.logger.error "Read error: #{e.inspect}" - G.logger.error e.backtrace.join("\n") + G.server.logger.error "Read error: #{e.inspect}" + G.server.logger.error e.backtrace.join("\n") ERROR_500_RESPONSE end write(msg) diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 196fbca..5c25ade 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -57,7 +57,7 @@ module Rainbows @env[REMOTE_ADDR] = @remote_addr @env[ASYNC_CALLBACK] = method(:response_write) - response = catch(:async) { G.app.call(@env.update(RACK_DEFAULTS)) } + response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } # too tricky to support pipelining with :async since the # second (pipelined) request could be a stuck behind a @@ -166,22 +166,17 @@ module Rainbows module Server - def initialize(conns) - @limit = Rainbows::G.max + HttpServer::LISTENERS.size - @em_conns = conns - end - def close detach @io.close end def notify_readable - return if @em_conns.size >= @limit + return if CUR.size >= MAX begin io = @io.accept_nonblock sig = EM.attach_fd(io.fileno, false) - @em_conns[sig] = Client.new(sig, io) + CUR[sig] = Client.new(sig, io) rescue Errno::EAGAIN, Errno::ECONNABORTED end end @@ -192,24 +187,26 @@ module Rainbows # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) - m = 0 # enable them both, should be non-fatal if not supported EM.epoll EM.kqueue logger.info "EventMachine: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}" + Client.const_set(:APP, G.server.app) + Server.const_set(:MAX, G.server.worker_connections + + HttpServer::LISTENERS.size) EM.run { conns = EM.instance_variable_get(:@conns) or raise RuntimeError, "EM @conns instance variable not accessible!" + Server.const_set(:CUR, conns) EM.add_periodic_timer(1) do - worker.tmp.chmod(m = 0 == m ? 1 : 0) - unless G.alive + unless G.tick conns.each_value { |client| Client === client and client.quit } EM.stop if conns.empty? && EM.reactor_running? end end LISTENERS.map! do |s| - EM.watch(s, Server, conns) { |c| c.notify_readable = true } + EM.watch(s, Server) { |c| c.notify_readable = true } end } end diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index c4c77bd..66f6ed1 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -51,7 +51,7 @@ module Rainbows (@env[RACK_INPUT] = @input).rewind alive = @hp.keepalive? @env[REMOTE_ADDR] = @remote_addr - response = G.app.call(@env.update(RACK_DEFAULTS)) + response = APP.call(@env.update(RACK_DEFAULTS)) alive &&= G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? @@ -97,7 +97,7 @@ module Rainbows G = Rainbows::G def on_readable - return if G.cur >= G.max + return if G.cur >= MAX begin Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) rescue Errno::EAGAIN, Errno::ECONNABORTED @@ -173,8 +173,10 @@ module Rainbows # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) + Client.const_set(:APP, G.server.app) + Server.const_set(:MAX, G.server.worker_connections) rloop = ::Rev::Loop.default - Heartbeat.new(worker.tmp).attach(rloop) + Heartbeat.new(1, true).attach(rloop) LISTENERS.map! { |s| Server.new(s).attach(rloop) } rloop.run end diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb index 755b136..63eb71d 100644 --- a/lib/rainbows/rev/heartbeat.rb +++ b/lib/rainbows/rev/heartbeat.rb @@ -11,15 +11,9 @@ module Rainbows # will also detect and execute the graceful exit if triggered # by SIGQUIT class Heartbeat < ::Rev::TimerWatcher - # +tmp+ must be a +File+ that responds to +chmod+ - def initialize(tmp) - @m, @tmp = 0, tmp - super(1, true) - end def on_timer - @tmp.chmod(@m = 0 == @m ? 1 : 0) - exit if (! G.alive && G.cur <= 0) + exit if (! G.tick && G.cur <= 0) end end diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index ddcbc04..003b704 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -101,20 +101,14 @@ module Rainbows end end - m = 0 - check_quit = lambda do - worker.tmp.chmod(m = 0 == m ? 1 : 0) - G.alive = false if master_pid != Process.ppid - end - begin Actor.receive do |filter| - filter.after(1, &check_quit) + filter.after(1) { G.tick } filter.when(Case[:exit, Actor, Object]) do |_,actor,_| orig = clients.size clients.delete(actor.object_id) orig >= limit and listeners.each { |l| l << :resume } - check_quit.call + G.tick end end end while G.alive || clients.size > 0 diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index 280ba40..7934dc8 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -28,16 +28,15 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) pool = (1..worker_connections).map { new_worker_thread } - m = 0 - while G.alive && master_pid == Process.ppid + while G.alive + # if any worker dies, something is serious wrong, bail pool.each do |thr| - worker.tmp.chmod(m = 0 == m ? 1 : 0) - # if any worker dies, something is serious wrong, bail - thr.join(1) and break + G.tick + thr.join(1) and G.quit! end end - join_threads(pool, worker) + join_threads(pool) end def new_worker_thread diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 39934a6..a3068c9 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -22,21 +22,18 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) threads = ThreadGroup.new - alive = worker.tmp - m = 0 limit = worker_connections begin - G.alive && master_pid == Process.ppid or break ret = begin - alive.chmod(m = 0 == m ? 1 : 0) + G.tick or break IO.select(LISTENERS, nil, nil, 1) or next rescue Errno::EINTR retry rescue Errno::EBADF, TypeError break end - alive.chmod(m = 0 == m ? 1 : 0) + G.tick ret.first.each do |l| # Sleep if we're busy, another less busy worker process may @@ -57,7 +54,7 @@ module Rainbows rescue Object => e listen_loop_error(e) end while true - join_threads(threads.list, worker) + join_threads(threads.list) end end diff --git a/t/t0007-worker-follows-master-to-death.sh b/t/t0007-worker-follows-master-to-death.sh new file mode 100755 index 0000000..ac40277 --- /dev/null +++ b/t/t0007-worker-follows-master-to-death.sh @@ -0,0 +1,50 @@ +#!/bin/sh +. ./test-lib.sh +t_plan 7 "ensure worker follows master to death" + +t_begin "setup" && { + rtmpfiles curl_err curl_out + rainbows_setup + echo timeout 3 >> $unicorn_config + rainbows -D -c $unicorn_config worker-follows-master-to-death.ru + rainbows_wait_start +} + +t_begin "read worker PID" && { + worker_pid=$(curl -sSf http://$listen/pid) + t_info "worker_pid=$worker_pid" +} + +t_begin "start a long sleeping request" && { + curl -sSfv -T- </dev/null http://$listen/sleep/2 >$curl_out 2> $fifo & + curl_pid=$! + t_info "curl_pid=$curl_pid" +} + +t_begin "nuke the master once we're connected" && { + awk -v rainbows_pid=$rainbows_pid ' +{ print $0 } +/100 Continue/ { + print "awk: sending SIGKILL to", rainbows_pid + system("kill -9 "rainbows_pid) +}' < $fifo > $curl_err + wait +} + +t_begin "worker is no longer running" && { + sleep 6 + kill -0 $worker_pid 2> $tmp && false + test -s $tmp +} + +t_begin "sleepy curl request is no longer running" && { + kill -0 $curl_pid 2> $tmp && false + test -s $tmp +} + +t_begin "sleepy curl request completed gracefully" && { + test x$(cat $curl_out) = x$worker_pid + dbgcat curl_err +} + +t_done diff --git a/t/worker-follows-master-to-death.ru b/t/worker-follows-master-to-death.ru new file mode 100644 index 0000000..ed2a519 --- /dev/null +++ b/t/worker-follows-master-to-death.ru @@ -0,0 +1,17 @@ +use Rack::ContentLength +headers = { 'Content-Type' => 'text/plain' } +run lambda { |env| + /\A100-continue\z/i =~ env['HTTP_EXPECT'] and return [ 100, {}, [] ] + env['rack.input'].read + + case env["PATH_INFO"] + when %r{/sleep/(\d+)} + (case env['rainbows.model'] + when :Revactor + Actor + else + Kernel + end).sleep($1.to_i) + end + [ 200, headers, [ "#$$\n" ] ] +} |