about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows.rb6
-rw-r--r--lib/rainbows/base.rb22
-rw-r--r--lib/rainbows/rev.rb32
-rw-r--r--lib/rainbows/revactor.rb18
-rw-r--r--lib/rainbows/thread_pool.rb19
-rw-r--r--lib/rainbows/thread_spawn.rb5
-rw-r--r--t/lib-graceful.sh47
-rwxr-xr-xt/t1002-thread-pool-graceful.sh37
-rwxr-xr-xt/t2002-thread-spawn-graceful.sh37
-rwxr-xr-xt/t3002-revactor-graceful.sh38
-rwxr-xr-xt/t4002-rev-graceful.sh52
11 files changed, 106 insertions, 207 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 7978288..096f700 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -3,6 +3,12 @@ require 'unicorn'
 
 module Rainbows
 
+  # global vars because class/instance variables are confusing me :<
+  # this struct is only accessed inside workers and thus private to each
+  G = Struct.new(:cur, :max, :logger, :alive, :app).new
+  # G.cur may not be used the network concurrency model
+  G.alive = true
+
   require 'rainbows/const'
   require 'rainbows/http_server'
   require 'rainbows/http_response'
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index a78262e..95d6545 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -8,6 +8,7 @@ module Rainbows
 
     include Unicorn
     include Rainbows::Const
+    G = Rainbows::G
 
     # write a response without caring if it went out or not for error
     # messages.
@@ -17,22 +18,28 @@ module Rainbows
       client.close rescue nil
     end
 
-    # TODO: migrate into Unicorn::HttpServer
     def listen_loop_error(e)
-      return if HttpServer::LISTENERS.first.nil? || IOError === e
+      G.alive or return
       logger.error "Unhandled listen loop exception #{e.inspect}."
       logger.error e.backtrace.join("\n")
     end
 
     def init_worker_process(worker)
       super(worker)
+      G.cur = 0
+      G.max = worker_connections
+      G.logger = logger
+      G.app = app
 
       # we're don't use the self-pipe mechanism in the Rainbows! worker
       # since we don't defer reopening logs
       HttpServer::SELF_PIPE.each { |x| x.close }.clear
       trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
-      # closing anything we IO.select on will raise EBADF
-      trap(:QUIT) { HttpServer::LISTENERS.map! { |s| s.close rescue nil } }
+      trap(:QUIT) do
+        G.alive = false
+        # closing anything we IO.select on will raise EBADF
+        HttpServer::LISTENERS.map! { |s| s.close rescue nil }
+      end
       [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
       logger.info "Rainbows! #@use worker_connections=#@worker_connections"
     end
@@ -63,7 +70,7 @@ module Rainbows
           response = app.call(env)
         end
 
-        alive = hp.keepalive? && ! Thread.current[:quit]
+        alive = hp.keepalive? && G.alive
         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
         HttpResponse.write(client, response, out)
       end while alive and hp.reset.nil? and env.clear
@@ -83,8 +90,7 @@ module Rainbows
     end
 
     def join_threads(threads, worker)
-      logger.info "Joining threads..."
-      threads.each { |thr| thr[:quit] = true }
+      Rainbows::G.alive = false
       expire = Time.now + (timeout * 2.0)
       m = 0
       while (nr = threads.count { |thr| thr.alive? }) > 0
@@ -94,11 +100,11 @@ module Rainbows
           break if Time.now >= expire
         }
       end
-      logger.info "Done joining threads. #{nr} left running"
     end
 
     def self.included(klass)
       klass.const_set :LISTENERS, HttpServer::LISTENERS
+      klass.const_set :G, Rainbows::G
     end
 
   end
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
index 7e5ca27..7d941f6 100644
--- a/lib/rainbows/rev.rb
+++ b/lib/rainbows/rev.rb
@@ -33,19 +33,15 @@ module Rainbows
 
   module Rev
 
-    # global vars because class/instance variables are confusing me :<
-    # this struct is only accessed inside workers and thus private to each
-    G = Struct.new(:nr, :max, :logger, :alive, :app).new
-
     include Base
 
     class Client < ::Rev::IO
       include Unicorn
       include Rainbows::Const
-      G = Rainbows::Rev::G
+      G = Rainbows::G
 
       def initialize(io)
-        G.nr += 1
+        G.cur += 1
         super(io)
         @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST
         @env = {}
@@ -91,7 +87,7 @@ module Rainbows
       end
 
       def on_close
-        G.nr -= 1
+        G.cur -= 1
       end
 
       def tmpio
@@ -143,10 +139,10 @@ module Rainbows
     end
 
     class Server < ::Rev::IO
-      G = Rainbows::Rev::G
+      G = Rainbows::G
 
       def on_readable
-        return if G.nr >= G.max
+        return if G.cur >= G.max
         begin
           Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default)
         rescue Errno::EAGAIN, Errno::ECONNBORTED
@@ -160,13 +156,21 @@ module Rainbows
     # given a INT, QUIT, or TERM signal)
     def worker_loop(worker)
       init_worker_process(worker)
-      G.nr = 0
-      G.max = worker_connections
-      G.alive = true
-      G.logger = logger
-      G.app = app
+      graceful_waiter = nil
+      trap(:QUIT) do
+        G.alive = false
+        LISTENERS.map! { |s| s.close rescue nil }
+        # Rev may get stuck in a loop with no events possible, spawn a new
+        # thread to join on graceful exits when our client count goes to zero
+        graceful_waiter = Thread.new {
+          sleep(0.1) while G.cur > 0
+          exit
+        }
+      end
+
       LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) }
       ::Rev::Loop.default.run
+      graceful_waiter.join(timeout * 2.0)
     end
 
   end
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index f61de97..3db1062 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -55,7 +55,7 @@ module Rainbows
           response = app.call(env)
         end
 
-        alive = hp.keepalive? && ! Actor.current[:quit]
+        alive = hp.keepalive? && G.alive
         out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
         HttpResponse.write(client, response, out)
       end while alive and hp.reset.nil? and env.clear
@@ -86,7 +86,6 @@ module Rainbows
       limit = worker_connections
       revactorize_listeners!
       clients = {}
-      alive = true
 
       listeners = LISTENERS.map do |s|
         Actor.spawn(s) do |l|
@@ -99,23 +98,16 @@ module Rainbows
             clients[actor.object_id] = actor
             root.link(actor)
           rescue Errno::EAGAIN, Errno::ECONNABORTED
-          rescue Errno::EBADF
-            break
           rescue Object => e
-            listen_loop_error(e) if alive
-          end while alive
+            listen_loop_error(e)
+          end while G.alive
         end
       end
 
       m = 0
       check_quit = lambda do
         worker.tmp.chmod(m = 0 == m ? 1 : 0)
-        if listeners.any? { |l| l.dead? } ||
-           master_pid != Process.ppid ||
-           LISTENERS.first.nil?
-          alive = false
-          clients.each_value { |a| a[:quit] = true }
-        end
+        G.alive = false if master_pid != Process.ppid
       end
 
       begin
@@ -128,7 +120,7 @@ module Rainbows
             check_quit.call
           end
         end
-      end while alive || clients.size > 0
+      end while G.alive || clients.size > 0
     end
 
   private
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
index c742e5d..30e8f69 100644
--- a/lib/rainbows/thread_pool.rb
+++ b/lib/rainbows/thread_pool.rb
@@ -30,7 +30,7 @@ module Rainbows
       pool = (1..worker_connections).map { new_worker_thread }
       m = 0
 
-      while LISTENERS.first && master_pid == Process.ppid
+      while G.alive && master_pid == Process.ppid
         pool.each do |thr|
           worker.tmp.chmod(m = 0 == m ? 1 : 0)
           # if any worker dies, something is serious wrong, bail
@@ -44,21 +44,20 @@ module Rainbows
       Thread.new {
         begin
           begin
-            ret = IO.select(LISTENERS, nil, nil, timeout) or next
-            ret.first.each do |sock|
-              begin
-                process_client(sock.accept_nonblock)
-              rescue Errno::EAGAIN, Errno::ECONNABORTED
-              end
-            end
+            ret = IO.select(LISTENERS, nil, nil, 1) and
+                  ret.first.each do |sock|
+                    begin
+                      process_client(sock.accept_nonblock)
+                    rescue Errno::EAGAIN, Errno::ECONNABORTED
+                    end
+                  end
           rescue Errno::EINTR
-            next
           rescue Errno::EBADF, TypeError
             break
           end
         rescue Object => e
           listen_loop_error(e)
-        end while ! Thread.current[:quit] && LISTENERS.first
+        end while G.alive
       }
     end
 
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
index 104e764..39934a6 100644
--- a/lib/rainbows/thread_spawn.rb
+++ b/lib/rainbows/thread_spawn.rb
@@ -27,9 +27,10 @@ module Rainbows
       limit = worker_connections
 
       begin
+        G.alive && master_pid == Process.ppid or break
         ret = begin
           alive.chmod(m = 0 == m ? 1 : 0)
-          IO.select(LISTENERS, nil, nil, timeout) or next
+          IO.select(LISTENERS, nil, nil, 1) or next
         rescue Errno::EINTR
           retry
         rescue Errno::EBADF, TypeError
@@ -55,7 +56,7 @@ module Rainbows
         end
       rescue Object => e
         listen_loop_error(e)
-      end while LISTENERS.first && master_pid == Process.ppid
+      end while true
       join_threads(threads.list, worker)
     end
 
diff --git a/t/lib-graceful.sh b/t/lib-graceful.sh
new file mode 100644
index 0000000..d2a6be8
--- /dev/null
+++ b/t/lib-graceful.sh
@@ -0,0 +1,47 @@
+model=$1
+. ./test-lib.sh
+echo "graceful test for model=$model"
+case $model in
+Rev) require_rev ;;
+Revactor) require_revactor ;;
+esac
+
+eval $(unused_listen)
+rtmpfiles unicorn_config curl_out pid r_err r_out fifo
+rm -f $fifo
+mkfifo $fifo
+
+cat > $unicorn_config <<EOF
+listen "$listen"
+stderr_path "$r_err"
+stdout_path "$r_out"
+pid "$pid"
+Rainbows! { use :$model }
+EOF
+
+rainbows -D sleep.ru -c $unicorn_config
+wait_for_pid $pid
+rainbows_pid=$(cat $pid)
+
+curl -sSfv -T- </dev/null http://$listen/5 > $curl_out 2> $fifo &
+
+awk -v rainbows_pid=$rainbows_pid '
+{ print $0 }
+/100 Continue/ {
+        print "awk: sending SIGQUIT to", rainbows_pid
+        system("kill -QUIT "rainbows_pid)
+}' $fifo
+wait
+
+dbgcat r_err
+
+test x"$(wc -l < $curl_out)" = x1
+nr=$(sort < $curl_out | uniq | wc -l)
+
+test "$nr" -eq 1
+test x$(sort < $curl_out | uniq) = xHello
+! grep Error $r_err
+while kill -0 $rainbows_pid >/dev/null 2>&1
+do
+        sleep 1
+done
diff --git a/t/t1002-thread-pool-graceful.sh b/t/t1002-thread-pool-graceful.sh
index 0830bc7..c5d1f87 100755
--- a/t/t1002-thread-pool-graceful.sh
+++ b/t/t1002-thread-pool-graceful.sh
@@ -1,37 +1,2 @@
 #!/bin/sh
-. ./test-lib.sh
-
-eval $(unused_listen)
-rtmpfiles unicorn_config curl_out curl_err pid r_err r_out
-nr_thread=10
-nr_client=10
-cat > $unicorn_config <<EOF
-listen "$listen"
-stderr_path "$r_err"
-stdout_path "$r_out"
-pid "$pid"
-Rainbows! do
-  use :ThreadPool
-  worker_connections $nr_thread
-end
-EOF
-
-rainbows -D sleep.ru -c $unicorn_config
-wait_for_pid $pid
-
-for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null)
-do
-        curl -sSf http://$listen/5 >> $curl_out 2>> $curl_err &
-done
-sleep 2
-kill -QUIT $(cat $pid)
-wait
-
-dbgcat r_err
-! test -s $curl_err
-test x"$(wc -l < $curl_out)" = x$nr_client
-nr=$(sort < $curl_out | uniq | wc -l)
-
-test "$nr" -eq 1
-test x$(sort < $curl_out | uniq) = xHello
-! grep Error $r_err
+. ./lib-graceful.sh ThreadPool
diff --git a/t/t2002-thread-spawn-graceful.sh b/t/t2002-thread-spawn-graceful.sh
index 5a02670..b263306 100755
--- a/t/t2002-thread-spawn-graceful.sh
+++ b/t/t2002-thread-spawn-graceful.sh
@@ -1,37 +1,2 @@
 #!/bin/sh
-. ./test-lib.sh
-
-eval $(unused_listen)
-rtmpfiles unicorn_config curl_out curl_err pid r_err r_out
-nr_thread=10
-nr_client=10
-cat > $unicorn_config <<EOF
-listen "$listen"
-stderr_path "$r_err"
-stdout_path "$r_out"
-pid "$pid"
-Rainbows! do
-  use :ThreadSpawn
-  worker_connections $nr_thread
-end
-EOF
-
-rainbows -D sleep.ru -c $unicorn_config
-wait_for_pid $pid
-
-for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null)
-do
-        curl -sSf http://$listen/5 >> $curl_out 2>> $curl_err &
-done
-sleep 2
-kill -QUIT $(cat $pid)
-wait
-
-dbgcat r_err
-! test -s $curl_err
-test x"$(wc -l < $curl_out)" = x$nr_client
-nr=$(sort < $curl_out | uniq | wc -l)
-
-test "$nr" -eq 1
-test x$(sort < $curl_out | uniq) = xHello
-! grep Error $r_err
+. ./lib-graceful.sh ThreadSpawn
diff --git a/t/t3002-revactor-graceful.sh b/t/t3002-revactor-graceful.sh
index 67c6ba3..8696e06 100755
--- a/t/t3002-revactor-graceful.sh
+++ b/t/t3002-revactor-graceful.sh
@@ -1,38 +1,2 @@
 #!/bin/sh
-. ./test-lib.sh
-require_revactor
-
-eval $(unused_listen)
-rtmpfiles unicorn_config curl_out curl_err pid r_err r_out
-nr_actor=10
-nr_client=10
-cat > $unicorn_config <<EOF
-listen "$listen"
-stderr_path "$r_err"
-stdout_path "$r_out"
-pid "$pid"
-Rainbows! do
-  use :Revactor
-  worker_connections $nr_actor
-end
-EOF
-
-rainbows -D sleep.ru -c $unicorn_config
-wait_for_pid $pid
-
-for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null)
-do
-        curl -sSf http://$listen/5 >> $curl_out 2>> $curl_err &
-done
-sleep 2
-kill -QUIT $(cat $pid)
-wait
-
-dbgcat r_err
-! test -s $curl_err
-test x"$(wc -l < $curl_out)" = x$nr_client
-nr=$(sort < $curl_out | uniq | wc -l)
-
-test "$nr" -eq 1
-test x$(sort < $curl_out | uniq) = xHello
-! grep Error $r_err
+. ./lib-graceful.sh Revactor
diff --git a/t/t4002-rev-graceful.sh b/t/t4002-rev-graceful.sh
index e286886..788bc6b 100755
--- a/t/t4002-rev-graceful.sh
+++ b/t/t4002-rev-graceful.sh
@@ -1,52 +1,2 @@
 #!/bin/sh
-. ./test-lib.sh
-require_rev
-
-eval $(unused_listen)
-rtmpfiles unicorn_config tmp pid r_err r_out out
-nr_client=10
-cat > $unicorn_config <<EOF
-listen "$listen"
-stderr_path "$r_err"
-stdout_path "$r_out"
-pid "$pid"
-Rainbows! do
-  use :Rev
-end
-EOF
-
-rainbows -D sleep.ru -c $unicorn_config
-wait_for_pid $pid
-
-for i in $(awk "BEGIN{for(i=0;i<$nr_client;++i) print i}" </dev/null)
-do
-        (
-                rtmpfiles fifo tmp
-                rm -f $fifo
-                mkfifo $fifo
-                (
-                        printf 'GET /0 HTTP/1.1\r\n'
-                        cat $fifo > $tmp &
-                        sleep 1
-                        printf 'Host: example.com\r\n'
-                        sleep 1
-                        printf 'Connection: close\r\n'
-                        sleep 1
-                        printf '\r\n'
-                        wait
-                ) | socat - TCP:$listen > $fifo
-                fgrep 'Hello' $tmp >> $out || :
-                rm -f $fifo $tmp
-        ) &
-done
-
-sleep 2 # potentially racy :<
-kill -QUIT $(cat $pid)
-wait
-
-test x"$(wc -l < $out)" = x$nr_client
-nr=$(sort < $out | uniq | wc -l)
-test "$nr" -eq 1
-
-test x$(sort < $out | uniq) = xHello
-! grep Error $r_err
+. ./lib-graceful.sh Rev