From 1269cbb93d26ff938f443e8931e908481374bdc3 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 1 Dec 2009 01:08:23 -0800 Subject: revactor: avoid unbounded memory growth :x This model has basically been rewritten to avoid unbounded memory growth (slow without keepalive) due to listeners not properly handling :*_closed messages. Performance is much more stable as a result, too. --- lib/rainbows/revactor.rb | 70 +++++++++++++++++++++++++++--------------------- 1 file changed, 40 insertions(+), 30 deletions(-) (limited to 'lib/rainbows/revactor.rb') diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index 9a18157..125e148 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -78,42 +78,48 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) RD_ARGS[:timeout] = G.kato if G.kato > 0 - - root = Actor.current - root.trap_exit = true - + nr = 0 limit = worker_connections - revactorize_listeners! - clients = {} + actor_exit = Case[:exit, Actor, Object] - listeners = LISTENERS.map do |s| - Actor.spawn(s) do |l| + revactorize_listeners.each do |l, close, accept| + Actor.spawn(l, close, accept) do |l, close, accept| + Actor.current.trap_exit = true + l.controller = l.instance_eval { @receiver = Actor.current } begin - while clients.size >= limit - logger.info "busy: clients=#{clients.size} >= limit=#{limit}" - Actor.receive { |filter| filter.when(:resume) {} } + while nr >= limit + l.disable if l.enabled? + logger.info "busy: clients=#{nr} >= limit=#{limit}" + Actor.receive do |f| + f.when(close) {} + f.when(actor_exit) { nr -= 1 } + f.after(0.01) {} # another listener could've gotten an exit + end + end + + l.enable unless l.enabled? + Actor.receive do |f| + f.when(close) {} + f.when(actor_exit) { nr -= 1 } + f.when(accept) do |_, _, s| + nr += 1 + Actor.spawn_link(s) { |c| process_client(c) } + end end - actor = Actor.spawn(l.accept) { |c| process_client(c) } - clients[actor.object_id] = actor - root.link(actor) - rescue Errno::EAGAIN, Errno::ECONNABORTED rescue => e Error.listen_loop(e) end while G.alive + Actor.receive do |f| + f.when(close) {} + f.when(actor_exit) { nr -= 1 } + end while nr > 0 end end - begin - Actor.receive do |filter| - filter.after(1) { G.tick } - filter.when(Case[:exit, Actor, Object]) do |_,actor,_| - orig = clients.size - clients.delete(actor.object_id) - orig >= limit and listeners.each { |l| l << :resume } - G.tick - end - end - end while G.alive || clients.size > 0 + Actor.sleep 1 while G.tick + expire = Time.now + timeout * 2.0 + Actor.sleep 1 while nr > 0 && Time.now < expire + rescue Errno::EMFILE => e end # if we get any error, try to write something back to the client @@ -127,13 +133,17 @@ module Rainbows rescue end - def revactorize_listeners! - LISTENERS.map! do |s| + def revactorize_listeners + LISTENERS.map do |s| case s when TCPServer - ::Revactor::TCP.listen(s, nil) + l = ::Revactor::TCP.listen(s, nil) + [ l, T[:tcp_closed, ::Revactor::TCP::Socket], + T[:tcp_connection, l, ::Revactor::TCP::Socket] ] when UNIXServer - ::Revactor::UNIX.listen(s) + l = ::Revactor::UNIX.listen(s) + [ l, T[:unix_closed, ::Revactor::UNIX::Socket ], + T[:unix_connection, l, ::Revactor::UNIX::Socket] ] end end end -- cgit v1.2.3-24-ge0c7