From 6ae020c9ac483d822902b5d33f038f79b44d3a50 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 27 Dec 2010 02:30:58 +0000 Subject: writer_thread_*: split out classes into separate files Use a consistent "Client" naming to reduce confusion --- lib/rainbows/response.rb | 2 +- lib/rainbows/writer_thread_pool.rb | 22 ++-------- lib/rainbows/writer_thread_pool/client.rb | 19 ++++++++ lib/rainbows/writer_thread_spawn.rb | 70 +++--------------------------- lib/rainbows/writer_thread_spawn/client.rb | 60 +++++++++++++++++++++++++ 5 files changed, 89 insertions(+), 84 deletions(-) create mode 100644 lib/rainbows/writer_thread_pool/client.rb create mode 100644 lib/rainbows/writer_thread_spawn/client.rb (limited to 'lib/rainbows') diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb index ac09d6c..8be4177 100644 --- a/lib/rainbows/response.rb +++ b/lib/rainbows/response.rb @@ -38,7 +38,7 @@ module Rainbows::Response range_class = body_class = klass case Rainbows::Const::RACK_DEFAULTS['rainbows.model'] when :WriterThreadSpawn - body_class = Rainbows::WriterThreadSpawn::MySocket + body_class = Rainbows::WriterThreadSpawn::Client range_class = Rainbows::HttpServer when :EventMachine, :NeverBlock range_class = nil # :< diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index e4e0228..6896787 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -20,24 +20,6 @@ module Rainbows::WriterThreadPool # :stopdoc: include Rainbows::Base - # used to wrap a BasicSocket to use with +q+ for all writes - # this is compatible with IO.select - class QueueSocket < Struct.new(:to_io, :q) # :nodoc: - include Rainbows::SocketProxy - - def write(buf) - q << [ to_io, buf ] - end - - def close - q << [ to_io, :close ] - end - - def closed? - false - end - end - @@nr = 0 @@q = nil @@ -47,7 +29,7 @@ module Rainbows::WriterThreadPool def process_client(client) # :nodoc: @@nr += 1 - super(QueueSocket.new(client, @@q[@@nr %= @@q.size])) + super(Client.new(client, @@q[@@nr %= @@q.size])) end def init_worker_process(worker) @@ -82,3 +64,5 @@ module Rainbows::WriterThreadPool end # :startdoc: end +# :enddoc: +require 'rainbows/writer_thread_pool/client' diff --git a/lib/rainbows/writer_thread_pool/client.rb b/lib/rainbows/writer_thread_pool/client.rb new file mode 100644 index 0000000..5889e84 --- /dev/null +++ b/lib/rainbows/writer_thread_pool/client.rb @@ -0,0 +1,19 @@ +# -*- encoding: binary -*- +# +# used to wrap a BasicSocket to use with +q+ for all writes +# this is compatible with IO.select +class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q) # :nodoc: + include Rainbows::SocketProxy + + def write(buf) + q << [ to_io, buf ] + end + + def close + q << [ to_io, :close ] + end + + def closed? + false + end +end diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 4215254..a11e82c 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -22,82 +22,24 @@ module Rainbows::WriterThreadSpawn # :stopdoc: include Rainbows::Base - CUR = {} # :nodoc: - - # used to wrap a BasicSocket to use with +q+ for all writes - # this is compatible with IO.select - class MySocket < Struct.new(:to_io, :q, :thr) # :nodoc: all - include Rainbows::Response - include Rainbows::SocketProxy - - def queue_writer - # not using Thread.pass here because that spins the CPU during - # I/O wait and will eat cycles from other worker processes. - until CUR.size < MAX - CUR.delete_if { |t,_| - t.alive? ? t.join(0) : true - }.size >= MAX and sleep(0.01) - end - - q = Queue.new - self.thr = Thread.new(to_io, q) do |io, q| - while response = q.shift - begin - arg1, arg2, arg3 = response - case arg1 - when :body then write_body(io, arg2, arg3) - when :close - io.close unless io.closed? - break - else - io.write(arg1) - end - rescue => e - Error.write(io, e) - end - end - CUR.delete(Thread.current) - end - CUR[thr] = q - end - - def write(buf) - (self.q ||= queue_writer) << buf - end - - def queue_body(body, range) - (self.q ||= queue_writer) << [ :body, body, range ] - end - - def close - if q - q << :close - else - to_io.close - end - end - - def closed? - false - end - end - def write_body(my_sock, body, range) # :nodoc: my_sock.queue_body(body, range) end def process_client(client) # :nodoc: - super(MySocket[client]) + super(Client.new(client)) end def worker_loop(worker) # :nodoc: - MySocket.const_set(:MAX, worker_connections) + Client.const_set(:MAX, worker_connections) super(worker) # accept loop from Unicorn - CUR.delete_if do |t,q| + Client::CUR.delete_if do |t,q| q << nil G.tick t.alive? ? t.join(0.01) : true - end until CUR.empty? + end until Client::CUR.empty? end # :startdoc: end +# :enddoc: +require 'rainbows/writer_thread_spawn/client' diff --git a/lib/rainbows/writer_thread_spawn/client.rb b/lib/rainbows/writer_thread_spawn/client.rb new file mode 100644 index 0000000..4341b9a --- /dev/null +++ b/lib/rainbows/writer_thread_spawn/client.rb @@ -0,0 +1,60 @@ +# -*- encoding: binary -*- +# used to wrap a BasicSocket to use with +q+ for all writes +# this is compatible with IO.select +class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) + include Rainbows::Response + include Rainbows::SocketProxy + + CUR = {} # :nodoc: + + def queue_writer + # not using Thread.pass here because that spins the CPU during + # I/O wait and will eat cycles from other worker processes. + until CUR.size < MAX + CUR.delete_if { |t,_| + t.alive? ? t.join(0) : true + }.size >= MAX and sleep(0.01) + end + + q = Queue.new + self.thr = Thread.new(to_io, q) do |io, q| + while response = q.shift + begin + arg1, arg2, arg3 = response + case arg1 + when :body then write_body(io, arg2, arg3) + when :close + io.close unless io.closed? + break + else + io.write(arg1) + end + rescue => e + Rainbows::Error.write(io, e) + end + end + CUR.delete(Thread.current) + end + CUR[thr] = q + end + + def write(buf) + (self.q ||= queue_writer) << buf + end + + def queue_body(body, range) + (self.q ||= queue_writer) << [ :body, body, range ] + end + + def close + if q + q << :close + else + to_io.close + end + end + + def closed? + false + end +end -- cgit v1.2.3-24-ge0c7