From df204a05d3a5bda8f716fa9f51be464fa59a3af1 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 11 Oct 2009 12:15:47 -0700 Subject: cleanup thread models, threads no longer time out The process-based heartbeat continues, but we no longer time threads out just because a client is idle for any reason (for now). --- lib/rainbows/base.rb | 26 +++++++++++++++++---- lib/rainbows/thread_pool.rb | 55 ++++++++++++-------------------------------- lib/rainbows/thread_spawn.rb | 49 +++++++++++++-------------------------- 3 files changed, 53 insertions(+), 77 deletions(-) (limited to 'lib') diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 2da6d41..8a38117 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -2,7 +2,8 @@ module Rainbows - # base class for Rainbows concurrency models + # base class for Rainbows concurrency models, this is currently + # used by ThreadSpawn and ThreadPool models module Base include Unicorn @@ -41,10 +42,10 @@ module Rainbows buf = client.readpartial(CHUNK_SIZE) hp = HttpParser.new env = {} + alive = true remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST begin # loop - Thread.current[:t] = Time.now while ! hp.headers(env, buf) buf << client.readpartial(CHUNK_SIZE) end @@ -61,9 +62,10 @@ module Rainbows response = app.call(env) end - out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? + alive = hp.keepalive? && ! Thread.current[:quit] + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? HttpResponse.write(client, response, out) - end while hp.keepalive? and hp.reset.nil? and env.clear + end while alive and hp.reset.nil? and env.clear client.close # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up @@ -79,6 +81,22 @@ module Rainbows logger.error e.backtrace.join("\n") end + def join_threads(threads) + logger.info "Joining threads..." + threads.each { |thr| thr[:quit] = true } + t0 = Time.now + timeleft = timeout * 2.0 + m = 0 + while (nr = threads.count { |thr| thr.alive? }) > 0 && timeleft > 0 + threads.each { |thr| + worker.tmp.chmod(m = 0 == m ? 1 : 0) + thr.join(1) + break if (timeleft -= (Time.now - t0)) < 0 + } + end + logger.info "Done joining threads. #{nr} left running" + end + def self.included(klass) klass.const_set :LISTENERS, HttpServer::LISTENERS end diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index c26f47b..647436b 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -20,63 +20,38 @@ module Rainbows def worker_loop(worker) init_worker_process(worker) - threads = ThreadGroup.new - alive = worker.tmp + pool = (1..worker_connections).map { new_worker_thread } m = 0 while LISTENERS.first && master_pid == Process.ppid - maintain_thread_count(threads) - threads.list.each do |thr| - alive.chmod(m = 0 == m ? 1 : 0) + pool.each do |thr| + worker.tmp.chmod(m = 0 == m ? 1 : 0) + # if any worker dies, something is serious wrong, bail thr.join(timeout) and break end end - join_worker_threads(threads) - end - - def join_worker_threads(threads) - logger.info "Joining worker threads..." - t0 = Time.now - timeleft = timeout - threads.list.each { |thr| - thr.join(timeleft) - timeleft -= (Time.now - t0) - } - logger.info "Done joining worker threads." - end - - def maintain_thread_count(threads) - threads.list.each do |thr| - next if (Time.now - (thr[:t] || next)) < timeout - thr.kill - logger.error "killed #{thr.inspect} for being too old" - end - - while threads.list.size < worker_connections - threads.add(new_worker_thread) - end + join_threads(threads) end def new_worker_thread Thread.new { begin - ret = begin - Thread.current[:t] = Time.now - IO.select(LISTENERS, nil, nil, timeout) or next + begin + ret = IO.select(LISTENERS, nil, nil, timeout) or next + ret.first.each do |sock| + begin + process_client(sock.accept_nonblock) + rescue Errno::EAGAIN, Errno::ECONNABORTED + end + end rescue Errno::EINTR - retry + next rescue Errno::EBADF, TypeError return end - ret.first.each do |sock| - begin - process_client(sock.accept_nonblock) - rescue Errno::EAGAIN, Errno::ECONNABORTED - end - end rescue Object => e listen_loop_error(e) if LISTENERS.first - end while LISTENERS.first + end while ! Thread.current[:quit] && LISTENERS.first } end diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index c9fd23c..77cc3f2 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -29,45 +29,28 @@ module Rainbows rescue Errno::EBADF, TypeError break end + alive.chmod(m = 0 == m ? 1 : 0) ret.first.each do |l| - nuke_old_thread(threads, limit) - c = begin - l.accept_nonblock + # Sleep if we're busy, another less busy worker process may + # take it for us if we sleep. This is gross but other options + # still suck because they require expensive/complicated + # synchronization primitives for _every_ case, not just this + # unlikely one. Since this case is (or should be) uncommon, + # just busy wait when we have to. + while threads.list.size > limit # unlikely + sleep(0.1) # hope another process took it + break # back to IO.select + end + begin + threads.add(Thread.new(l.accept_nonblock) {|c| process_client(c) }) rescue Errno::EAGAIN, Errno::ECONNABORTED - next end - threads.add(Thread.new(c) { |c| process_client(c) }) end rescue Object => e - listen_loop_error(e) if alive - end while alive && master_pid == Process.ppid - join_spawned_threads(threads) - end - - def nuke_old_thread(threads, limit) - while (list = threads.list).size > limit - list.each do |thr| - thr.alive? or return # it _just_ died, we don't need it - next if (age = (Time.now - (thr[:t] || next))) < timeout - thr.kill # no-op if already dead - logger.error "killed #{thr.inspect} for being too old: #{age}" - return - end - # nothing to kill, yield to another thread - Thread.pass - end - end - - def join_spawned_threads(threads) - logger.info "Joining spawned threads..." - t0 = Time.now - timeleft = timeout - threads.list.each { |thr| - thr.join(timeleft) - timeleft -= (Time.now - t0) - } - logger.info "Done joining spawned threads." + listen_loop_error(e) if LISTENERS.first + end while LISTENERS.first && master_pid == Process.ppid + join_threads(threads.list) end end -- cgit v1.2.3-24-ge0c7