about summary refs log tree commit homepage
path: root/lib/rainbows/revactor.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-12-01 01:08:23 -0800
committerEric Wong <normalperson@yhbt.net>2009-12-01 21:13:03 -0800
commit1269cbb93d26ff938f443e8931e908481374bdc3 (patch)
treee1bcc63f511b81a2a5a5253e001845fdac313c85 /lib/rainbows/revactor.rb
parentc8972f91b2b2e1c49c7c8fdc070430435c38de4e (diff)
downloadrainbows-1269cbb93d26ff938f443e8931e908481374bdc3.tar.gz
This model has basically been rewritten to avoid unbounded
memory growth (slow without keepalive) due to listeners
not properly handling :*_closed messages.

Performance is much more stable as a result, too.
Diffstat (limited to 'lib/rainbows/revactor.rb')
-rw-r--r--lib/rainbows/revactor.rb70
1 files changed, 40 insertions, 30 deletions
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index 9a18157..125e148 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -78,42 +78,48 @@ module Rainbows
     def worker_loop(worker)
       init_worker_process(worker)
       RD_ARGS[:timeout] = G.kato if G.kato > 0
-
-      root = Actor.current
-      root.trap_exit = true
-
+      nr = 0
       limit = worker_connections
-      revactorize_listeners!
-      clients = {}
+      actor_exit = Case[:exit, Actor, Object]
 
-      listeners = LISTENERS.map do |s|
-        Actor.spawn(s) do |l|
+      revactorize_listeners.each do |l, close, accept|
+        Actor.spawn(l, close, accept) do |l, close, accept|
+          Actor.current.trap_exit = true
+          l.controller = l.instance_eval { @receiver = Actor.current }
           begin
-            while clients.size >= limit
-              logger.info "busy: clients=#{clients.size} >= limit=#{limit}"
-              Actor.receive { |filter| filter.when(:resume) {} }
+            while nr >= limit
+              l.disable if l.enabled?
+              logger.info "busy: clients=#{nr} >= limit=#{limit}"
+              Actor.receive do |f|
+                f.when(close) {}
+                f.when(actor_exit) { nr -= 1 }
+                f.after(0.01) {} # another listener could've gotten an exit
+              end
+            end
+
+            l.enable unless l.enabled?
+            Actor.receive do |f|
+              f.when(close) {}
+              f.when(actor_exit) { nr -= 1 }
+              f.when(accept) do |_, _, s|
+                nr += 1
+                Actor.spawn_link(s) { |c| process_client(c) }
+              end
             end
-            actor = Actor.spawn(l.accept) { |c| process_client(c) }
-            clients[actor.object_id] = actor
-            root.link(actor)
-          rescue Errno::EAGAIN, Errno::ECONNABORTED
           rescue => e
             Error.listen_loop(e)
           end while G.alive
+          Actor.receive do |f|
+            f.when(close) {}
+            f.when(actor_exit) { nr -= 1 }
+          end while nr > 0
         end
       end
 
-      begin
-        Actor.receive do |filter|
-          filter.after(1) { G.tick }
-          filter.when(Case[:exit, Actor, Object]) do |_,actor,_|
-            orig = clients.size
-            clients.delete(actor.object_id)
-            orig >= limit and listeners.each { |l| l << :resume }
-            G.tick
-          end
-        end
-      end while G.alive || clients.size > 0
+      Actor.sleep 1 while G.tick
+      expire = Time.now + timeout * 2.0
+      Actor.sleep 1 while nr > 0 && Time.now < expire
+      rescue Errno::EMFILE => e
     end
 
     # if we get any error, try to write something back to the client
@@ -127,13 +133,17 @@ module Rainbows
       rescue
     end
 
-    def revactorize_listeners!
-      LISTENERS.map! do |s|
+    def revactorize_listeners
+      LISTENERS.map do |s|
         case s
         when TCPServer
-          ::Revactor::TCP.listen(s, nil)
+          l = ::Revactor::TCP.listen(s, nil)
+          [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
+            T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
         when UNIXServer
-          ::Revactor::UNIX.listen(s)
+          l = ::Revactor::UNIX.listen(s)
+          [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
+            T[:unix_connection, l, ::Revactor::UNIX::Socket] ]
         end
       end
     end