From 90a86c9822238f01e8d60c9303b9a0da64351c7f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 5 Feb 2011 10:44:52 +0000 Subject: *epoll: refactor common loop code acceptor thread pools could use some work, still --- lib/rainbows.rb | 1 + lib/rainbows/epoll.rb | 19 +++++++++++++++---- lib/rainbows/epoll/server.rb | 12 +++--------- lib/rainbows/join_threads.rb | 18 ++++++++++++++++++ lib/rainbows/thread_pool.rb | 2 +- lib/rainbows/xepoll/client.rb | 24 ++++-------------------- 6 files changed, 42 insertions(+), 34 deletions(-) create mode 100644 lib/rainbows/join_threads.rb diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 480f2d6..dac2952 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -144,6 +144,7 @@ module Rainbows autoload :WorkerYield, 'rainbows/worker_yield' autoload :SyncClose, 'rainbows/sync_close' autoload :ReverseProxy, 'rainbows/reverse_proxy' + autoload :JoinThreads, 'rainbows/join_threads' end require 'rainbows/error' diff --git a/lib/rainbows/epoll.rb b/lib/rainbows/epoll.rb index b567142..075fcfb 100644 --- a/lib/rainbows/epoll.rb +++ b/lib/rainbows/epoll.rb @@ -12,11 +12,22 @@ module Rainbows::Epoll autoload :Client, 'rainbows/epoll/client' autoload :ResponsePipe, 'rainbows/epoll/response_pipe' autoload :ResponseChunkPipe, 'rainbows/epoll/response_chunk_pipe' + class << self + attr_writer :nr_clients + end - def self.rerun - while obj = ReRun.shift - obj.epoll_run - end + def self.loop + timeout = Rainbows.server.timeout + begin + EP.wait(nil, timeout) { |flags, obj| obj.epoll_run } + while obj = ReRun.shift + obj.epoll_run + end + Rainbows::Epoll::Client.expire + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.tick || @nr_clients.call > 0 end def init_worker_process(worker) diff --git a/lib/rainbows/epoll/server.rb b/lib/rainbows/epoll/server.rb index a8be3e2..96b3308 100644 --- a/lib/rainbows/epoll/server.rb +++ b/lib/rainbows/epoll/server.rb @@ -1,8 +1,9 @@ # -*- encoding: binary -*- # :nodoc: module Rainbows::Epoll::Server - IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET @@nr = 0 + Rainbows::Epoll.nr_clients = lambda { @@nr } + IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET MAX = Rainbows.server.worker_connections THRESH = MAX - 1 LISTENERS = Rainbows::HttpServer::LISTENERS @@ -10,14 +11,7 @@ module Rainbows::Epoll::Server def self.run LISTENERS.each { |sock| EP.add(sock.extend(self), IN) } - begin - EP.wait(nil, 1000) { |_, obj| obj.epoll_run } - Rainbows::Epoll.rerun - Rainbows::Epoll::Client.expire - rescue Errno::EINTR - rescue => e - Rainbows::Error.listen_loop(e) - end while Rainbows.tick || @@nr > 0 + Rainbows::Epoll.loop end # rearms all listeners when there's a free slot diff --git a/lib/rainbows/join_threads.rb b/lib/rainbows/join_threads.rb new file mode 100644 index 0000000..6636e7c --- /dev/null +++ b/lib/rainbows/join_threads.rb @@ -0,0 +1,18 @@ +# -*- encoding: binary -*- +# :nodoc: +# This module only gets loaded on shutdown +module Rainbows::JoinThreads + + # blocking acceptor threads must be forced to run + def self.acceptors(threads) + threads.delete_if do |thr| + Rainbows.tick + begin + thr.run + thr.join(0.01) + rescue + true + end + end until threads.empty? + end +end diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index 3b8e68e..8f2b629 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -35,7 +35,7 @@ module Rainbows::ThreadPool thr.join(1) and Rainbows.quit! end end - join_threads(pool) + Rainbows::JoinThreads.acceptors(pool) end def sync_worker # :nodoc: diff --git a/lib/rainbows/xepoll/client.rb b/lib/rainbows/xepoll/client.rb index 2db684e..da0a0d1 100644 --- a/lib/rainbows/xepoll/client.rb +++ b/lib/rainbows/xepoll/client.rb @@ -2,12 +2,12 @@ # :enddoc: module Rainbows::XEpoll::Client + N = Raindrops.new(1) + Rainbows::Epoll.nr_clients = lambda { N[0] } include Rainbows::Epoll::Client MAX = Rainbows.server.worker_connections THRESH = MAX - 1 EP = Rainbows::Epoll::EP - N = Raindrops.new(1) - @timeout = Rainbows.server.timeout / 2.0 THREADS = Rainbows::HttpServer::LISTENERS.map do |sock| Thread.new(sock) do |sock| sleep @@ -25,24 +25,8 @@ module Rainbows::XEpoll::Client def self.run THREADS.each { |t| t.run } - begin - EP.wait(nil, @timeout) { |flags, obj| obj.epoll_run } - Rainbows::Epoll.rerun - Rainbows::Epoll::Client.expire - rescue Errno::EINTR - rescue => e - Rainbows::Error.listen_loop(e) - end while Rainbows.tick - - THREADS.delete_if do |thr| - Rainbows.tick - begin - thr.run - thr.join(0.01) - rescue - true - end - end until THREADS.empty? + Rainbows::Epoll.loop + Rainbows::JoinThreads.acceptors(THREADS) end # only call this once -- cgit v1.2.3-24-ge0c7