about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-11-06 19:45:17 -0800
committerEric Wong <normalperson@yhbt.net>2009-11-06 19:45:17 -0800
commit1a9a718a3f9a5b582a4a339a9bb9249c2ca392d7 (patch)
tree0c95f2d8fc4de8542f7716832800614e1e7a8872 /lib
parente1dcadef6ca242e36e99aab19e3e040bf01070f9 (diff)
downloadrainbows-1a9a718a3f9a5b582a4a339a9bb9249c2ca392d7.tar.gz
It turns out neither the EventMachine and Rev classes
checked for master death in its heartbeat mechanism.
Since we managed to forget the same thing twice, we
now have a test case for it and also centralized the
code to remove duplication.
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows.rb15
-rw-r--r--lib/rainbows/base.rb19
-rw-r--r--lib/rainbows/ev_core.rb4
-rw-r--r--lib/rainbows/event_machine.rb21
-rw-r--r--lib/rainbows/rev.rb8
-rw-r--r--lib/rainbows/rev/heartbeat.rb8
-rw-r--r--lib/rainbows/revactor.rb10
-rw-r--r--lib/rainbows/thread_pool.rb11
-rw-r--r--lib/rainbows/thread_spawn.rb9
9 files changed, 46 insertions, 59 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index a8985c6..5bd8693 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -5,9 +5,20 @@ module Rainbows
 
   # global vars because class/instance variables are confusing me :<
   # this struct is only accessed inside workers and thus private to each
-  G = Struct.new(:cur, :max, :logger, :alive, :app).new
   # G.cur may not be used the network concurrency model
-  G.alive = true
+  class State < Struct.new(:alive,:m,:cur,:server,:tmp)
+    def tick
+      tmp.chmod(self.m = m == 0 ? 1 : 0)
+      alive && server.master_pid == Process.ppid or quit!
+    end
+
+    def quit!
+      self.alive = false
+      server.class.const_get(:LISTENERS).map! { |s| s.close rescue nil }
+      false
+    end
+  end
+  G = State.new(true, 0, 0)
 
   require 'rainbows/const'
   require 'rainbows/http_server'
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index 9da148c..9b50d9a 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -26,20 +26,14 @@ module Rainbows
 
     def init_worker_process(worker)
       super(worker)
-      G.cur = 0
-      G.max = worker_connections
-      G.logger = logger
-      G.app = app
+      G.server = self
+      G.tmp = worker.tmp
 
       # we're don't use the self-pipe mechanism in the Rainbows! worker
       # since we don't defer reopening logs
       HttpServer::SELF_PIPE.each { |x| x.close }.clear
       trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
-      trap(:QUIT) do
-        G.alive = false
-        # closing anything we IO.select on will raise EBADF
-        HttpServer::LISTENERS.map! { |s| s.close rescue nil }
-      end
+      trap(:QUIT) { G.quit! }
       [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
       logger.info "Rainbows! #@use worker_connections=#@worker_connections"
     end
@@ -89,13 +83,12 @@ module Rainbows
       logger.error e.backtrace.join("\n")
     end
 
-    def join_threads(threads, worker)
-      Rainbows::G.alive = false
+    def join_threads(threads)
+      G.quit!
       expire = Time.now + (timeout * 2.0)
-      m = 0
       until (threads.delete_if { |thr| ! thr.alive? }).empty?
         threads.each { |thr|
-          worker.tmp.chmod(m = 0 == m ? 1 : 0)
+          G.tick
           thr.join(1)
           break if Time.now >= expire
         }
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
index 2679b5a..244e726 100644
--- a/lib/rainbows/ev_core.rb
+++ b/lib/rainbows/ev_core.rb
@@ -28,8 +28,8 @@ module Rainbows
       when HttpParserError # try to tell the client they're bad
         ERROR_400_RESPONSE
       else
-        G.logger.error "Read error: #{e.inspect}"
-        G.logger.error e.backtrace.join("\n")
+        G.server.logger.error "Read error: #{e.inspect}"
+        G.server.logger.error e.backtrace.join("\n")
         ERROR_500_RESPONSE
       end
       write(msg)
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 196fbca..5c25ade 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -57,7 +57,7 @@ module Rainbows
           @env[REMOTE_ADDR] = @remote_addr
           @env[ASYNC_CALLBACK] = method(:response_write)
 
-          response = catch(:async) { G.app.call(@env.update(RACK_DEFAULTS)) }
+          response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
 
           # too tricky to support pipelining with :async since the
           # second (pipelined) request could be a stuck behind a
@@ -166,22 +166,17 @@ module Rainbows
 
     module Server
 
-      def initialize(conns)
-        @limit = Rainbows::G.max + HttpServer::LISTENERS.size
-        @em_conns = conns
-      end
-
       def close
         detach
         @io.close
       end
 
       def notify_readable
-        return if @em_conns.size >= @limit
+        return if CUR.size >= MAX
         begin
           io = @io.accept_nonblock
           sig = EM.attach_fd(io.fileno, false)
-          @em_conns[sig] = Client.new(sig, io)
+          CUR[sig] = Client.new(sig, io)
         rescue Errno::EAGAIN, Errno::ECONNABORTED
         end
       end
@@ -192,24 +187,26 @@ module Rainbows
     # given a INT, QUIT, or TERM signal)
     def worker_loop(worker)
       init_worker_process(worker)
-      m = 0
 
       # enable them both, should be non-fatal if not supported
       EM.epoll
       EM.kqueue
       logger.info "EventMachine: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
+      Client.const_set(:APP, G.server.app)
+      Server.const_set(:MAX, G.server.worker_connections +
+                             HttpServer::LISTENERS.size)
       EM.run {
         conns = EM.instance_variable_get(:@conns) or
           raise RuntimeError, "EM @conns instance variable not accessible!"
+        Server.const_set(:CUR, conns)
         EM.add_periodic_timer(1) do
-          worker.tmp.chmod(m = 0 == m ? 1 : 0)
-          unless G.alive
+          unless G.tick
             conns.each_value { |client| Client === client and client.quit }
             EM.stop if conns.empty? && EM.reactor_running?
           end
         end
         LISTENERS.map! do |s|
-          EM.watch(s, Server, conns) { |c| c.notify_readable = true }
+          EM.watch(s, Server) { |c| c.notify_readable = true }
         end
       }
     end
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
index c4c77bd..66f6ed1 100644
--- a/lib/rainbows/rev.rb
+++ b/lib/rainbows/rev.rb
@@ -51,7 +51,7 @@ module Rainbows
           (@env[RACK_INPUT] = @input).rewind
           alive = @hp.keepalive?
           @env[REMOTE_ADDR] = @remote_addr
-          response = G.app.call(@env.update(RACK_DEFAULTS))
+          response = APP.call(@env.update(RACK_DEFAULTS))
           alive &&= G.alive
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
 
@@ -97,7 +97,7 @@ module Rainbows
       G = Rainbows::G
 
       def on_readable
-        return if G.cur >= G.max
+        return if G.cur >= MAX
         begin
           Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default)
         rescue Errno::EAGAIN, Errno::ECONNABORTED
@@ -173,8 +173,10 @@ module Rainbows
     # given a INT, QUIT, or TERM signal)
     def worker_loop(worker)
       init_worker_process(worker)
+      Client.const_set(:APP, G.server.app)
+      Server.const_set(:MAX, G.server.worker_connections)
       rloop = ::Rev::Loop.default
-      Heartbeat.new(worker.tmp).attach(rloop)
+      Heartbeat.new(1, true).attach(rloop)
       LISTENERS.map! { |s| Server.new(s).attach(rloop) }
       rloop.run
     end
diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb
index 755b136..63eb71d 100644
--- a/lib/rainbows/rev/heartbeat.rb
+++ b/lib/rainbows/rev/heartbeat.rb
@@ -11,15 +11,9 @@ module Rainbows
     # will also detect and execute the graceful exit if triggered
     # by SIGQUIT
     class Heartbeat < ::Rev::TimerWatcher
-      # +tmp+ must be a +File+ that responds to +chmod+
-      def initialize(tmp)
-        @m, @tmp = 0, tmp
-        super(1, true)
-      end
 
       def on_timer
-        @tmp.chmod(@m = 0 == @m ? 1 : 0)
-        exit if (! G.alive && G.cur <= 0)
+        exit if (! G.tick && G.cur <= 0)
       end
 
     end
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index ddcbc04..003b704 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -101,20 +101,14 @@ module Rainbows
         end
       end
 
-      m = 0
-      check_quit = lambda do
-        worker.tmp.chmod(m = 0 == m ? 1 : 0)
-        G.alive = false if master_pid != Process.ppid
-      end
-
       begin
         Actor.receive do |filter|
-          filter.after(1, &check_quit)
+          filter.after(1) { G.tick }
           filter.when(Case[:exit, Actor, Object]) do |_,actor,_|
             orig = clients.size
             clients.delete(actor.object_id)
             orig >= limit and listeners.each { |l| l << :resume }
-            check_quit.call
+            G.tick
           end
         end
       end while G.alive || clients.size > 0
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
index 280ba40..7934dc8 100644
--- a/lib/rainbows/thread_pool.rb
+++ b/lib/rainbows/thread_pool.rb
@@ -28,16 +28,15 @@ module Rainbows
     def worker_loop(worker)
       init_worker_process(worker)
       pool = (1..worker_connections).map { new_worker_thread }
-      m = 0
 
-      while G.alive && master_pid == Process.ppid
+      while G.alive
+        # if any worker dies, something is serious wrong, bail
         pool.each do |thr|
-          worker.tmp.chmod(m = 0 == m ? 1 : 0)
-          # if any worker dies, something is serious wrong, bail
-          thr.join(1) and break
+          G.tick
+          thr.join(1) and G.quit!
         end
       end
-      join_threads(pool, worker)
+      join_threads(pool)
     end
 
     def new_worker_thread
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index 39934a6..a3068c9 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -22,21 +22,18 @@ module Rainbows
     def worker_loop(worker)
       init_worker_process(worker)
       threads = ThreadGroup.new
-      alive = worker.tmp
-      m = 0
       limit = worker_connections
 
       begin
-        G.alive && master_pid == Process.ppid or break
         ret = begin
-          alive.chmod(m = 0 == m ? 1 : 0)
+          G.tick or break
           IO.select(LISTENERS, nil, nil, 1) or next
         rescue Errno::EINTR
           retry
         rescue Errno::EBADF, TypeError
           break
         end
-        alive.chmod(m = 0 == m ? 1 : 0)
+        G.tick
 
         ret.first.each do |l|
           # Sleep if we're busy, another less busy worker process may
@@ -57,7 +54,7 @@ module Rainbows
       rescue Object => e
         listen_loop_error(e)
       end while true
-      join_threads(threads.list, worker)
+      join_threads(threads.list)
     end
 
   end