From 7f2cb1b56afda847c29e1e65fe0608a6f20a0fe6 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 27 Dec 2010 02:13:32 +0000 Subject: writer_thread_*: unindent --- lib/rainbows/writer_thread_spawn.rb | 201 ++++++++++++++++++------------------ 1 file changed, 99 insertions(+), 102 deletions(-) (limited to 'lib/rainbows/writer_thread_spawn.rb') diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 02ae0d5..4ee98dd 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -1,125 +1,122 @@ # -*- encoding: binary -*- require 'thread' -module Rainbows - - # This concurrency model implements a single-threaded app dispatch and - # spawns a new thread for writing responses. This concurrency model - # should be ideal for apps that serve large responses or stream - # responses slowly. - # - # Unlike most \Rainbows! concurrency models, WriterThreadSpawn is - # designed to run behind nginx just like Unicorn is. This concurrency - # model may be useful for existing Unicorn users looking for more - # output concurrency than socket buffers can provide while still - # maintaining a single-threaded application dispatch (though if the - # response body is generated on-the-fly, it must be thread safe). - # - # For serving large or streaming responses, setting - # "proxy_buffering off" in nginx is recommended. If your application - # does not handle uploads, then using any HTTP-aware proxy like - # haproxy is fine. Using a non-HTTP-aware proxy will leave you - # vulnerable to slow client denial-of-service attacks. - - module WriterThreadSpawn - # :stopdoc: - include Base - - CUR = {} # :nodoc: - - # used to wrap a BasicSocket to use with +q+ for all writes - # this is compatible with IO.select - class MySocket < Struct.new(:to_io, :q, :thr) # :nodoc: all - include Rainbows::Response - - def kgio_addr - to_io.kgio_addr - end +# This concurrency model implements a single-threaded app dispatch and +# spawns a new thread for writing responses. This concurrency model +# should be ideal for apps that serve large responses or stream +# responses slowly. +# +# Unlike most \Rainbows! concurrency models, WriterThreadSpawn is +# designed to run behind nginx just like Unicorn is. This concurrency +# model may be useful for existing Unicorn users looking for more +# output concurrency than socket buffers can provide while still +# maintaining a single-threaded application dispatch (though if the +# response body is generated on-the-fly, it must be thread safe). +# +# For serving large or streaming responses, setting +# "proxy_buffering off" in nginx is recommended. If your application +# does not handle uploads, then using any HTTP-aware proxy like +# haproxy is fine. Using a non-HTTP-aware proxy will leave you +# vulnerable to slow client denial-of-service attacks. + +module Rainbows::WriterThreadSpawn + # :stopdoc: + include Rainbows::Base + + CUR = {} # :nodoc: + + # used to wrap a BasicSocket to use with +q+ for all writes + # this is compatible with IO.select + class MySocket < Struct.new(:to_io, :q, :thr) # :nodoc: all + include Rainbows::Response + + def kgio_addr + to_io.kgio_addr + end - def kgio_read(size, buf = "") - to_io.kgio_read(size, buf) - end + def kgio_read(size, buf = "") + to_io.kgio_read(size, buf) + end - def kgio_read!(size, buf = "") - to_io.kgio_read!(size, buf) - end + def kgio_read!(size, buf = "") + to_io.kgio_read!(size, buf) + end - def kgio_trywrite(buf) - to_io.kgio_trywrite(buf) - end + def kgio_trywrite(buf) + to_io.kgio_trywrite(buf) + end - def timed_read(buf) - to_io.timed_read(buf) - end + def timed_read(buf) + to_io.timed_read(buf) + end - def queue_writer - # not using Thread.pass here because that spins the CPU during - # I/O wait and will eat cycles from other worker processes. - until CUR.size < MAX - CUR.delete_if { |t,_| - t.alive? ? t.join(0) : true - }.size >= MAX and sleep(0.01) - end + def queue_writer + # not using Thread.pass here because that spins the CPU during + # I/O wait and will eat cycles from other worker processes. + until CUR.size < MAX + CUR.delete_if { |t,_| + t.alive? ? t.join(0) : true + }.size >= MAX and sleep(0.01) + end - q = Queue.new - self.thr = Thread.new(to_io, q) do |io, q| - while response = q.shift - begin - arg1, arg2, arg3 = response - case arg1 - when :body then write_body(io, arg2, arg3) - when :close - io.close unless io.closed? - break - else - io.write(arg1) - end - rescue => e - Error.write(io, e) + q = Queue.new + self.thr = Thread.new(to_io, q) do |io, q| + while response = q.shift + begin + arg1, arg2, arg3 = response + case arg1 + when :body then write_body(io, arg2, arg3) + when :close + io.close unless io.closed? + break + else + io.write(arg1) end + rescue => e + Error.write(io, e) end - CUR.delete(Thread.current) end - CUR[thr] = q - end - - def write(buf) - (self.q ||= queue_writer) << buf + CUR.delete(Thread.current) end + CUR[thr] = q + end - def queue_body(body, range) - (self.q ||= queue_writer) << [ :body, body, range ] - end + def write(buf) + (self.q ||= queue_writer) << buf + end - def close - if q - q << :close - else - to_io.close - end - end + def queue_body(body, range) + (self.q ||= queue_writer) << [ :body, body, range ] + end - def closed? - false + def close + if q + q << :close + else + to_io.close end end - def write_body(my_sock, body, range) # :nodoc: - my_sock.queue_body(body, range) + def closed? + false end + end - def process_client(client) # :nodoc: - super(MySocket[client]) - end + def write_body(my_sock, body, range) # :nodoc: + my_sock.queue_body(body, range) + end - def worker_loop(worker) # :nodoc: - MySocket.const_set(:MAX, worker_connections) - super(worker) # accept loop from Unicorn - CUR.delete_if do |t,q| - q << nil - G.tick - t.alive? ? t.join(0.01) : true - end until CUR.empty? - end - # :startdoc: + def process_client(client) # :nodoc: + super(MySocket[client]) + end + + def worker_loop(worker) # :nodoc: + MySocket.const_set(:MAX, worker_connections) + super(worker) # accept loop from Unicorn + CUR.delete_if do |t,q| + q << nil + G.tick + t.alive? ? t.join(0.01) : true + end until CUR.empty? end + # :startdoc: end -- cgit v1.2.3-24-ge0c7