From 154e7af0225a0375274991ee7bd1fc8ad22c1c37 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 1 Dec 2009 23:39:32 -0800 Subject: add RevThreadPool concurrency model This should be like RevThreadSpawn except with more predictable performance (but higher memory usage under low load). --- lib/rainbows/rev/master.rb | 29 ++++++++++++++ lib/rainbows/rev/thread.rb | 53 +++++++++++++++++++++++++ lib/rainbows/rev_thread_pool.rb | 75 ++++++++++++++++++++++++++++++++++++ lib/rainbows/rev_thread_spawn.rb | 83 +++++++--------------------------------- 4 files changed, 170 insertions(+), 70 deletions(-) create mode 100644 lib/rainbows/rev/master.rb create mode 100644 lib/rainbows/rev/thread.rb create mode 100644 lib/rainbows/rev_thread_pool.rb (limited to 'lib/rainbows') diff --git a/lib/rainbows/rev/master.rb b/lib/rainbows/rev/master.rb new file mode 100644 index 0000000..5c112c6 --- /dev/null +++ b/lib/rainbows/rev/master.rb @@ -0,0 +1,29 @@ +# -*- encoding: binary -*- +require 'rainbows/rev' + +RUBY_VERSION =~ %r{\A1\.8} && ::Rev::VERSION < "0.3.2" and + warn "Rainbows::RevThreadSpawn + Rev (< 0.3.2)" \ + " does not work well under Ruby 1.8" + +module Rainbows + + module Rev + class Master < ::Rev::AsyncWatcher + + def initialize(queue) + super() + @queue = queue + end + + def <<(output) + @queue << output + signal + end + + def on_signal + client, response = @queue.pop + client.response_write(response) + end + end + end +end diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb new file mode 100644 index 0000000..8fa43ac --- /dev/null +++ b/lib/rainbows/rev/thread.rb @@ -0,0 +1,53 @@ +# -*- encoding: binary -*- +require 'thread' +require 'rainbows/rev/master' + +module Rainbows + module Rev + + class ThreadClient < Client + + def app_call + KATO.delete(self) + disable + @env[RACK_INPUT] = @input + @input = nil # not sure why, @input seems to get closed otherwise... + app_dispatch # must be implemented by subclass + end + + # this is only called in the master thread + def response_write(response) + enable + alive = @hp.keepalive? && G.alive + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? + DeferredResponse.write(self, response, out) + return quit unless alive && G.alive + + @env.clear + @hp.reset + @state = :headers + # keepalive requests are always body-less, so @input is unchanged + if @hp.headers(@env, @buf) + @input = HttpRequest::NULL_IO + app_call + else + KATO[self] = Time.now + end + end + + # fails-safe application dispatch, we absolutely cannot + # afford to fail or raise an exception (killing the thread) + # here because that could cause a deadlock and we'd leak FDs + def app_response + begin + @env[REMOTE_ADDR] = @remote_addr + APP.call(@env.update(RACK_DEFAULTS)) + rescue => e + Error.app(e) # we guarantee this does not raise + [ 500, {}, [] ] + end + end + + end + end +end diff --git a/lib/rainbows/rev_thread_pool.rb b/lib/rainbows/rev_thread_pool.rb new file mode 100644 index 0000000..47b451e --- /dev/null +++ b/lib/rainbows/rev_thread_pool.rb @@ -0,0 +1,75 @@ +# -*- encoding: binary -*- +require 'rainbows/rev/thread' + +module Rainbows + + # A combination of the Rev and ThreadPool models. This allows Ruby + # Thread-based concurrency for application processing. It DOES NOT + # expose a streamable "rack.input" for upload processing within the + # app. DevFdResponse should be used with this class to proxy + # asynchronous responses. All network I/O between the client and + # server are handled by the main thread and outside of the core + # application dispatch. + # + # Unlike ThreadPool, Rev makes this model highly suitable for + # slow clients and applications with medium-to-slow response times + # (I/O bound), but less suitable for sleepy applications. + # + # WARNING: this model does not currently perform well under 1.8 with + # Rev 0.3.1. Rev 0.3.2 should include significant performance + # improvements under Ruby 1.8. + + module RevThreadPool + + DEFAULTS = { + :pool_size => 10, # same default size as ThreadPool (w/o Rev) + } + + def self.setup + DEFAULTS.each { |k,v| O[k] ||= v } + Integer === O[:pool_size] && O[:pool_size] > 0 or + raise ArgumentError, "pool_size must a be an Integer > 0" + end + + class PoolWatcher < ::Rev::TimerWatcher + def initialize(threads) + @threads = threads + super(G.server.timeout, true) + end + + def on_timer + @threads.each { |t| t.join(0) and G.quit! } + end + end + + class Client < Rainbows::Rev::ThreadClient + def app_dispatch + QUEUE << self + end + end + + include Rainbows::Rev::Core + + def init_worker_threads(master, queue) + O[:pool_size].times.map do + Thread.new do + begin + client = queue.pop + master << [ client, client.app_response ] + rescue => e + Error.listen_loop(e) + end while true + end + end + end + + def init_worker_process(worker) + super + master = Rev::Master.new(Queue.new).attach(::Rev::Loop.default) + queue = Client.const_set(:QUEUE, Queue.new) + threads = init_worker_threads(master, queue) + PoolWatcher.new(threads).attach(::Rev::Loop.default) + logger.info "RevThreadPool pool_size=#{O[:pool_size]}" + end + end +end diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb index 0bfeb36..00d8b6b 100644 --- a/lib/rainbows/rev_thread_spawn.rb +++ b/lib/rainbows/rev_thread_spawn.rb @@ -1,9 +1,5 @@ # -*- encoding: binary -*- -require 'rainbows/rev' - -RUBY_VERSION =~ %r{\A1\.8} && ::Rev::VERSION < "0.3.2" and - warn "Rainbows::RevThreadSpawn + Rev (< 0.3.2)" \ - " does not work well under Ruby 1.8" +require 'rainbows/rev/thread' module Rainbows @@ -15,73 +11,19 @@ module Rainbows # server are handled by the main thread and outside of the core # application dispatch. # - # WARNING: this model does not currently perform well under 1.8. See the - # {rev-talk mailing list}[http://rubyforge.org/mailman/listinfo/rev-talk] - # for ongoing performance work that will hopefully make it into the - # next release of {Rev}[http://rev.rubyforge.org/]. + # Unlike ThreadSpawn, Rev makes this model highly suitable for + # slow clients and applications with medium-to-slow response times + # (I/O bound), but less suitable for sleepy applications. + # + # WARNING: this model does not currently perform well under 1.8 with + # Rev 0.3.1. Rev 0.3.2 should include significant performance + # improvements under Ruby 1.8. module RevThreadSpawn - class Master < ::Rev::AsyncWatcher - - def initialize - super - @queue = Queue.new - end - - def <<(output) - @queue << output - signal - end - - def on_signal - client, response = @queue.pop - client.response_write(response) - end - end - - class Client < Rainbows::Rev::Client - DR = Rainbows::Rev::DeferredResponse - KATO = Rainbows::Rev::KATO - - def response_write(response) - enable - alive = @hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - DR.write(self, response, out) - return quit unless alive && G.alive - - @env.clear - @hp.reset - @state = :headers - # keepalive requests are always body-less, so @input is unchanged - if @hp.headers(@env, @buf) - @input = HttpRequest::NULL_IO - app_call - else - KATO[self] = Time.now - end - end - - # fails-safe application dispatch, we absolutely cannot - # afford to fail or raise an exception (killing the thread) - # here because that could cause a deadlock and we'd leak FDs - def app_response - begin - @env[REMOTE_ADDR] = @remote_addr - APP.call(@env.update(RACK_DEFAULTS)) - rescue => e - Error.app(e) # we guarantee this does not raise - [ 500, {}, [] ] - end - end - - def app_call - KATO.delete(client = self) - disable - @env[RACK_INPUT] = @input - @input = nil # not sure why, @input seems to get closed otherwise... - Thread.new { MASTER << [ client, app_response ] } + class Client < Rainbows::Rev::ThreadClient + def app_dispatch + Thread.new(self) { |client| MASTER << [ client, app_response ] } end end @@ -89,7 +31,8 @@ module Rainbows def init_worker_process(worker) super - Client.const_set(:MASTER, Master.new.attach(::Rev::Loop.default)) + master = Rev::Master.new(Queue.new).attach(::Rev::Loop.default) + Client.const_set(:MASTER, master) end end -- cgit v1.2.3-24-ge0c7