diff options
author | Eric Wong <normalperson@yhbt.net> | 2010-05-26 22:20:57 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2010-05-26 22:34:42 +0000 |
commit | 58661617ab802010ecbc45ce3afbca1d63cb9189 (patch) | |
tree | 6015af3842630d5b50278689630497810103e816 /lib | |
parent | c6ecda097af9cc559b2d38b01ae23daf733b3786 (diff) | |
download | rainbows-58661617ab802010ecbc45ce3afbca1d63cb9189.tar.gz |
Diffstat (limited to 'lib')
-rw-r--r-- | lib/rainbows.rb | 1 | ||||
-rw-r--r-- | lib/rainbows/base.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_spawn.rb | 104 |
3 files changed, 107 insertions, 0 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb index f01c942..41d436e 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -135,6 +135,7 @@ module Rainbows MODEL_WORKER_CONNECTIONS = { :Base => 1, # this one can't change :WriterThreadPool => 20, + :WriterThreadSpawn => 1, :Revactor => 50, :ThreadSpawn => 30, :ThreadPool => 20, diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index faec951..a773722 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -49,6 +49,8 @@ module Rainbows end end + module_function :write_body + # once a client is accepted, it is processed in its entirety here # in 3 easy steps: read request, call app, write app response # this is used by synchronous concurrency models diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb new file mode 100644 index 0000000..3b1356a --- /dev/null +++ b/lib/rainbows/writer_thread_spawn.rb @@ -0,0 +1,104 @@ +# -*- encoding: binary -*- +require 'thread' +module Rainbows + + # This concurrency model implements a single-threaded app dispatch and + # spawns a new thread for writing responses. This concurrency model + # should be ideal for apps that serve large responses or stream + # responses slowly. + # + # Unlike most \Rainbows! concurrency models, WriterThreadSpawn is + # designed to run behind nginx just like Unicorn is. This concurrency + # model may be useful for existing Unicorn users looking for more + # output concurrency than socket buffers can provide while still + # maintaining a single-threaded application dispatch (though if the + # response body is generated on-the-fly, it must be thread safe). + # + # For serving large or streaming responses, setting + # "proxy_buffering off" in nginx is recommended. If your application + # does not handle uploads, then using any HTTP-aware proxy like + # haproxy is fine. Using a non-HTTP-aware proxy will leave you + # vulnerable to slow client denial-of-service attacks. + + module WriterThreadSpawn + include Base + + CUR = {} + + # used to wrap a BasicSocket to use with +q+ for all writes + # this is compatible with IO.select + class MySocket < Struct.new(:to_io, :q, :thr) + def readpartial(size, buf = "") + to_io.readpartial(size, buf) + end + + def write_nonblock(buf) + to_io.write_nonblock(buf) + end + + def queue_writer + q = Queue.new + self.thr = Thread.new(to_io, q) do |io, q| + while response = q.shift + begin + arg1, arg2 = response + case arg1 + when :body then Base.write_body(io, arg2) + when :close + io.close unless io.closed? + break + else + io.write(arg1) + end + rescue => e + Error.app(e) + end + end + CUR.delete(Thread.current) + end + CUR[thr] = q + end + + def write(buf) + (self.q ||= queue_writer) << buf + end + + def write_body(body) + (self.q ||= queue_writer) << [ :body, body ] + end + + def close + if q + q << :close + else + to_io.close + end + end + + def closed? + false + end + end + + if IO.respond_to?(:copy_stream) + undef_method :write_body + + def write_body(my_sock, body) + my_sock.write_body(body) + end + end + + def process_client(client) + super(MySocket[client]) + end + + def worker_loop(worker) + super(worker) # accept loop from Unicorn + CUR.delete_if do |t,q| + q << nil + G.tick + t.alive? ? thr.join(0.01) : true + end until CUR.empty? + end + end +end |