From a3d3d13711869d420b4473d492bd788ebe493053 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 27 Dec 2010 02:58:31 +0000 Subject: thread_*: unindent Hopefully this will make our code easier to follow. --- lib/rainbows/thread_pool.rb | 136 +++++++++++++++++++++---------------------- lib/rainbows/thread_spawn.rb | 82 +++++++++++++------------- 2 files changed, 106 insertions(+), 112 deletions(-) diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index 321d3e4..f6420ae 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -1,82 +1,78 @@ # -*- encoding: binary -*- -module Rainbows +# Implements a worker thread pool model. This is suited for platforms +# like Ruby 1.9, where the cost of dynamically spawning a new thread +# for every new client connection is higher than with the ThreadSpawn +# model. +# +# This model should provide a high level of compatibility with all +# Ruby implementations, and most libraries and applications. +# Applications running under this model should be thread-safe +# but not necessarily reentrant. +# +# Applications using this model are required to be thread-safe. +# Threads are never spawned dynamically under this model. If you're +# connecting to external services and need to perform DNS lookups, +# consider using the "resolv-replace" library which replaces parts of +# the core Socket package with concurrent DNS lookup capabilities. +# +# This model probably less suited for many slow clients than the +# others and thus a lower +worker_connections+ setting is recommended. - # Implements a worker thread pool model. This is suited for platforms - # like Ruby 1.9, where the cost of dynamically spawning a new thread - # for every new client connection is higher than with the ThreadSpawn - # model. - # - # This model should provide a high level of compatibility with all - # Ruby implementations, and most libraries and applications. - # Applications running under this model should be thread-safe - # but not necessarily reentrant. - # - # Applications using this model are required to be thread-safe. - # Threads are never spawned dynamically under this model. If you're - # connecting to external services and need to perform DNS lookups, - # consider using the "resolv-replace" library which replaces parts of - # the core Socket package with concurrent DNS lookup capabilities. - # - # This model probably less suited for many slow clients than the - # others and thus a lower +worker_connections+ setting is recommended. +module Rainbows::ThreadPool + include Rainbows::Base - module ThreadPool - include Base - - def worker_loop(worker) # :nodoc: - init_worker_process(worker) - pool = (1..worker_connections).map do - Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker } - end - - while G.alive - # if any worker dies, something is serious wrong, bail - pool.each do |thr| - G.tick or break - thr.join(1) and G.quit! - end - end - join_threads(pool) + def worker_loop(worker) # :nodoc: + init_worker_process(worker) + pool = (1..worker_connections).map do + Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker } end - def sync_worker # :nodoc: - s = LISTENERS[0] - begin - c = s.kgio_accept and process_client(c) - rescue => e - Error.listen_loop(e) - end while G.alive + while G.alive + # if any worker dies, something is serious wrong, bail + pool.each do |thr| + G.tick or break + thr.join(1) and G.quit! + end end + join_threads(pool) + end - def async_worker # :nodoc: - begin - # TODO: check if select() or accept() is a problem on large - # SMP systems under Ruby 1.9. Hundreds of native threads - # all working off the same socket could be a thundering herd - # problem. On the other hand, a thundering herd may not - # even incur as much overhead as an extra Mutex#synchronize - ret = IO.select(LISTENERS, nil, nil, 1) and ret[0].each do |s| - s = s.kgio_tryaccept and process_client(s) - end - rescue Errno::EINTR - rescue => e - Error.listen_loop(e) - end while G.alive - end + def sync_worker # :nodoc: + s = LISTENERS[0] + begin + c = s.kgio_accept and process_client(c) + rescue => e + Rainbows::Error.listen_loop(e) + end while G.alive + end - def join_threads(threads) # :nodoc: - G.quit! - threads.delete_if do |thr| - G.tick - begin - thr.run - thr.join(0.01) - rescue - true - end - end until threads.empty? - end + def async_worker # :nodoc: + begin + # TODO: check if select() or accept() is a problem on large + # SMP systems under Ruby 1.9. Hundreds of native threads + # all working off the same socket could be a thundering herd + # problem. On the other hand, a thundering herd may not + # even incur as much overhead as an extra Mutex#synchronize + ret = IO.select(LISTENERS, nil, nil, 1) and ret[0].each do |s| + s = s.kgio_tryaccept and process_client(s) + end + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while G.alive + end + def join_threads(threads) # :nodoc: + G.quit! + threads.delete_if do |thr| + G.tick + begin + thr.run + thr.join(0.01) + rescue + true + end + end until threads.empty? end end diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 9da75f1..acdaa69 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -1,54 +1,52 @@ # -*- encoding: binary -*- require 'thread' -module Rainbows - # Spawns a new thread for every client connection we accept(). This - # model is recommended for platforms like Ruby 1.8 where spawning new - # threads is inexpensive. - # - # This model should provide a high level of compatibility with all - # Ruby implementations, and most libraries and applications. - # Applications running under this model should be thread-safe - # but not necessarily reentrant. - # - # If you're connecting to external services and need to perform DNS - # lookups, consider using the "resolv-replace" library which replaces - # parts of the core Socket package with concurrent DNS lookup - # capabilities +# Spawns a new thread for every client connection we accept(). This +# model is recommended for platforms like Ruby 1.8 where spawning new +# threads is inexpensive. +# +# This model should provide a high level of compatibility with all +# Ruby implementations, and most libraries and applications. +# Applications running under this model should be thread-safe +# but not necessarily reentrant. +# +# If you're connecting to external services and need to perform DNS +# lookups, consider using the "resolv-replace" library which replaces +# parts of the core Socket package with concurrent DNS lookup +# capabilities - module ThreadSpawn - include Base - include Rainbows::WorkerYield +module Rainbows::ThreadSpawn + include Rainbows::Base + include Rainbows::WorkerYield - def accept_loop(klass) #:nodoc: - lock = Mutex.new - limit = worker_connections - LISTENERS.each do |l| - klass.new(l) do |l| - begin - if lock.synchronize { G.cur >= limit } - worker_yield - elsif c = l.kgio_accept - klass.new(c) do |c| - begin - lock.synchronize { G.cur += 1 } - process_client(c) - ensure - lock.synchronize { G.cur -= 1 } - end + def accept_loop(klass) #:nodoc: + lock = Mutex.new + limit = worker_connections + LISTENERS.each do |l| + klass.new(l) do |l| + begin + if lock.synchronize { G.cur >= limit } + worker_yield + elsif c = l.kgio_accept + klass.new(c) do |c| + begin + lock.synchronize { G.cur += 1 } + process_client(c) + ensure + lock.synchronize { G.cur -= 1 } end end - rescue => e - Error.listen_loop(e) - end while G.alive - end + end + rescue => e + Rainbows::Error.listen_loop(e) + end while G.alive end - sleep 1 while G.tick || lock.synchronize { G.cur > 0 } end + sleep 1 while G.tick || lock.synchronize { G.cur > 0 } + end - def worker_loop(worker) #:nodoc: - init_worker_process(worker) - accept_loop(Thread) - end + def worker_loop(worker) #:nodoc: + init_worker_process(worker) + accept_loop(Thread) end end -- cgit v1.2.3-24-ge0c7