about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/thread_spawn.rb71
2 files changed, 72 insertions, 0 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 611c0ec..b2c315d 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -15,6 +15,7 @@ module Rainbows
 
   autoload :Revactor, 'rainbows/revactor'
   autoload :ThreadPool, 'rainbows/thread_pool'
+  autoload :ThreadSpawn, 'rainbows/thread_spawn'
 
   class << self
     def run(app, options = {})
diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb
new file mode 100644
index 0000000..085da39
--- /dev/null
+++ b/lib/rainbows/thread_spawn.rb
@@ -0,0 +1,71 @@
+# -*- encoding: binary -*-
+module Rainbows
+
+  module ThreadSpawn
+
+    include Base
+
+    def worker_loop(worker)
+      init_worker_process(worker)
+      threads = ThreadGroup.new
+      alive = worker.tmp
+      nr = 0
+      limit = worker_connections
+
+      # closing anything we IO.select on will raise EBADF
+      trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
+      trap(:QUIT) { alive = false; LISTENERS.map! { |s| s.close rescue nil } }
+      [:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown
+      logger.info "worker=#{worker.nr} ready with ThreadSpawn"
+
+      while alive && master_pid == Process.ppid
+        ret = begin
+          IO.select(LISTENERS, nil, nil, timeout/2.0) or next
+        rescue Errno::EINTR
+          retry
+        rescue Errno::EBADF
+          alive = false
+        end
+
+        ret.first.each do |l|
+          while threads.list.size >= limit
+            nuke_old_thread(threads)
+          end
+          c = begin
+            l.accept_nonblock
+          rescue Errno::EINTR, Errno::ECONNABORTED
+            next
+          end
+          threads.add(Thread.new(c) { |c|
+            Thread.current[:t] = Time.now
+            process_client(c)
+          })
+        end
+      end
+      join_spawned_threads(threads)
+    end
+
+    def nuke_old_thread(threads)
+      threads.list.each do |thr|
+        next if (Time.now - (thr[:t] || next)) < timeout
+        thr.kill
+        logger.error "killed #{thr.inspect} for being too old"
+        return
+      end
+      # nothing to kill, yield to another thread
+      Thread.pass
+    end
+
+    def join_spawned_threads(threads)
+      logger.info "Joining spawned threads..."
+      t0 = Time.now
+      timeleft = timeout
+      threads.list.each { |thr|
+        thr.join(timeleft)
+        timeleft -= (Time.now - t0)
+      }
+      logger.info "Done joining spawned threads."
+    end
+
+  end
+end