diff options
author | Eric Wong <normalperson@yhbt.net> | 2009-11-06 19:45:17 -0800 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2009-11-06 19:45:17 -0800 |
commit | 1a9a718a3f9a5b582a4a339a9bb9249c2ca392d7 (patch) | |
tree | 0c95f2d8fc4de8542f7716832800614e1e7a8872 /lib | |
parent | e1dcadef6ca242e36e99aab19e3e040bf01070f9 (diff) | |
download | rainbows-1a9a718a3f9a5b582a4a339a9bb9249c2ca392d7.tar.gz |
It turns out neither the EventMachine and Rev classes checked for master death in its heartbeat mechanism. Since we managed to forget the same thing twice, we now have a test case for it and also centralized the code to remove duplication.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/rainbows.rb | 15 | ||||
-rw-r--r-- | lib/rainbows/base.rb | 19 | ||||
-rw-r--r-- | lib/rainbows/ev_core.rb | 4 | ||||
-rw-r--r-- | lib/rainbows/event_machine.rb | 21 | ||||
-rw-r--r-- | lib/rainbows/rev.rb | 8 | ||||
-rw-r--r-- | lib/rainbows/rev/heartbeat.rb | 8 | ||||
-rw-r--r-- | lib/rainbows/revactor.rb | 10 | ||||
-rw-r--r-- | lib/rainbows/thread_pool.rb | 11 | ||||
-rw-r--r-- | lib/rainbows/thread_spawn.rb | 9 |
9 files changed, 46 insertions, 59 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb index a8985c6..5bd8693 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -5,9 +5,20 @@ module Rainbows # global vars because class/instance variables are confusing me :< # this struct is only accessed inside workers and thus private to each - G = Struct.new(:cur, :max, :logger, :alive, :app).new # G.cur may not be used the network concurrency model - G.alive = true + class State < Struct.new(:alive,:m,:cur,:server,:tmp) + def tick + tmp.chmod(self.m = m == 0 ? 1 : 0) + alive && server.master_pid == Process.ppid or quit! + end + + def quit! + self.alive = false + server.class.const_get(:LISTENERS).map! { |s| s.close rescue nil } + false + end + end + G = State.new(true, 0, 0) require 'rainbows/const' require 'rainbows/http_server' diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 9da148c..9b50d9a 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -26,20 +26,14 @@ module Rainbows def init_worker_process(worker) super(worker) - G.cur = 0 - G.max = worker_connections - G.logger = logger - G.app = app + G.server = self + G.tmp = worker.tmp # we're don't use the self-pipe mechanism in the Rainbows! worker # since we don't defer reopening logs HttpServer::SELF_PIPE.each { |x| x.close }.clear trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil } - trap(:QUIT) do - G.alive = false - # closing anything we IO.select on will raise EBADF - HttpServer::LISTENERS.map! { |s| s.close rescue nil } - end + trap(:QUIT) { G.quit! } [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown logger.info "Rainbows! #@use worker_connections=#@worker_connections" end @@ -89,13 +83,12 @@ module Rainbows logger.error e.backtrace.join("\n") end - def join_threads(threads, worker) - Rainbows::G.alive = false + def join_threads(threads) + G.quit! expire = Time.now + (timeout * 2.0) - m = 0 until (threads.delete_if { |thr| ! thr.alive? }).empty? threads.each { |thr| - worker.tmp.chmod(m = 0 == m ? 1 : 0) + G.tick thr.join(1) break if Time.now >= expire } diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 2679b5a..244e726 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -28,8 +28,8 @@ module Rainbows when HttpParserError # try to tell the client they're bad ERROR_400_RESPONSE else - G.logger.error "Read error: #{e.inspect}" - G.logger.error e.backtrace.join("\n") + G.server.logger.error "Read error: #{e.inspect}" + G.server.logger.error e.backtrace.join("\n") ERROR_500_RESPONSE end write(msg) diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 196fbca..5c25ade 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -57,7 +57,7 @@ module Rainbows @env[REMOTE_ADDR] = @remote_addr @env[ASYNC_CALLBACK] = method(:response_write) - response = catch(:async) { G.app.call(@env.update(RACK_DEFAULTS)) } + response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } # too tricky to support pipelining with :async since the # second (pipelined) request could be a stuck behind a @@ -166,22 +166,17 @@ module Rainbows module Server - def initialize(conns) - @limit = Rainbows::G.max + HttpServer::LISTENERS.size - @em_conns = conns - end - def close detach @io.close end def notify_readable - return if @em_conns.size >= @limit + return if CUR.size >= MAX begin io = @io.accept_nonblock sig = EM.attach_fd(io.fileno, false) - @em_conns[sig] = Client.new(sig, io) + CUR[sig] = Client.new(sig, io) rescue Errno::EAGAIN, Errno::ECONNABORTED end end @@ -192,24 +187,26 @@ module Rainbows # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) - m = 0 # enable them both, should be non-fatal if not supported EM.epoll EM.kqueue logger.info "EventMachine: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}" + Client.const_set(:APP, G.server.app) + Server.const_set(:MAX, G.server.worker_connections + + HttpServer::LISTENERS.size) EM.run { conns = EM.instance_variable_get(:@conns) or raise RuntimeError, "EM @conns instance variable not accessible!" + Server.const_set(:CUR, conns) EM.add_periodic_timer(1) do - worker.tmp.chmod(m = 0 == m ? 1 : 0) - unless G.alive + unless G.tick conns.each_value { |client| Client === client and client.quit } EM.stop if conns.empty? && EM.reactor_running? end end LISTENERS.map! do |s| - EM.watch(s, Server, conns) { |c| c.notify_readable = true } + EM.watch(s, Server) { |c| c.notify_readable = true } end } end diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index c4c77bd..66f6ed1 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -51,7 +51,7 @@ module Rainbows (@env[RACK_INPUT] = @input).rewind alive = @hp.keepalive? @env[REMOTE_ADDR] = @remote_addr - response = G.app.call(@env.update(RACK_DEFAULTS)) + response = APP.call(@env.update(RACK_DEFAULTS)) alive &&= G.alive out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? @@ -97,7 +97,7 @@ module Rainbows G = Rainbows::G def on_readable - return if G.cur >= G.max + return if G.cur >= MAX begin Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) rescue Errno::EAGAIN, Errno::ECONNABORTED @@ -173,8 +173,10 @@ module Rainbows # given a INT, QUIT, or TERM signal) def worker_loop(worker) init_worker_process(worker) + Client.const_set(:APP, G.server.app) + Server.const_set(:MAX, G.server.worker_connections) rloop = ::Rev::Loop.default - Heartbeat.new(worker.tmp).attach(rloop) + Heartbeat.new(1, true).attach(rloop) LISTENERS.map! { |s| Server.new(s).attach(rloop) } rloop.run end diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb index 755b136..63eb71d 100644 --- a/lib/rainbows/rev/heartbeat.rb +++ b/lib/rainbows/rev/heartbeat.rb @@ -11,15 +11,9 @@ module Rainbows # will also detect and execute the graceful exit if triggered # by SIGQUIT class Heartbeat < ::Rev::TimerWatcher - # +tmp+ must be a +File+ that responds to +chmod+ - def initialize(tmp) - @m, @tmp = 0, tmp - super(1, true) - end def on_timer - @tmp.chmod(@m = 0 == @m ? 1 : 0) - exit if (! G.alive && G.cur <= 0) + exit if (! G.tick && G.cur <= 0) end end diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index ddcbc04..003b704 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -101,20 +101,14 @@ module Rainbows end end - m = 0 - check_quit = lambda do - worker.tmp.chmod(m = 0 == m ? 1 : 0) - G.alive = false if master_pid != Process.ppid - end - begin Actor.receive do |filter| - filter.after(1, &check_quit) + filter.after(1) { G.tick } filter.when(Case[:exit, Actor, Object]) do |_,actor,_| orig = clients.size clients.delete(actor.object_id) orig >= limit and listeners.each { |l| l << :resume } - check_quit.call + G.tick end end end while G.alive || clients.size > 0 diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index 280ba40..7934dc8 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -28,16 +28,15 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) pool = (1..worker_connections).map { new_worker_thread } - m = 0 - while G.alive && master_pid == Process.ppid + while G.alive + # if any worker dies, something is serious wrong, bail pool.each do |thr| - worker.tmp.chmod(m = 0 == m ? 1 : 0) - # if any worker dies, something is serious wrong, bail - thr.join(1) and break + G.tick + thr.join(1) and G.quit! end end - join_threads(pool, worker) + join_threads(pool) end def new_worker_thread diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 39934a6..a3068c9 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -22,21 +22,18 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) threads = ThreadGroup.new - alive = worker.tmp - m = 0 limit = worker_connections begin - G.alive && master_pid == Process.ppid or break ret = begin - alive.chmod(m = 0 == m ? 1 : 0) + G.tick or break IO.select(LISTENERS, nil, nil, 1) or next rescue Errno::EINTR retry rescue Errno::EBADF, TypeError break end - alive.chmod(m = 0 == m ? 1 : 0) + G.tick ret.first.each do |l| # Sleep if we're busy, another less busy worker process may @@ -57,7 +54,7 @@ module Rainbows rescue Object => e listen_loop_error(e) end while true - join_threads(threads.list, worker) + join_threads(threads.list) end end |