From b4f1271320d38e83141dbb38463c3a368661aef7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 7 Nov 2009 20:15:03 -0800 Subject: initial cut of the RevThreadSpawn model Seems to pass all tests, but that may only mean our test cases are lacking... --- lib/rainbows.rb | 1 + lib/rainbows/ev_thread_core.rb | 74 ++++++++++++++++++++++++++++++++++++++ lib/rainbows/rev/client.rb | 2 +- lib/rainbows/rev_thread_spawn.rb | 78 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 lib/rainbows/ev_thread_core.rb create mode 100644 lib/rainbows/rev_thread_spawn.rb (limited to 'lib') diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 5bd8693..4686f2b 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -67,6 +67,7 @@ module Rainbows :ThreadSpawn => 30, :ThreadPool => 10, :Rev => 50, + :RevThreadSpawn => 50, :EventMachine => 50, }.each do |model, _| u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" } diff --git a/lib/rainbows/ev_thread_core.rb b/lib/rainbows/ev_thread_core.rb new file mode 100644 index 0000000..784d30a --- /dev/null +++ b/lib/rainbows/ev_thread_core.rb @@ -0,0 +1,74 @@ +# -*- encoding: binary -*- +require 'thread' # for Queue +require 'rainbows/ev_core' + +module Rainbows + + # base module for mixed Thread + evented models like RevThreadSpawn + module EvThreadCore + include EvCore + + def post_init + super + @lock = Mutex.new + @thread = nil + end + + # we pass ourselves off as a Socket to Unicorn::TeeInput and this + # is the only method Unicorn::TeeInput requires from the socket + def readpartial(length, buf = "") + buf.replace(@state.pop) + resume + buf + end + + def app_spawn(input) + begin + @thread.nil? or @thread.join # only one thread per connection + env = @env.dup + alive, headers = @hp.keepalive?, @hp.headers? + @thread = Thread.new(self) do |client| + begin + env[REMOTE_ADDR] = @remote_addr + env[RACK_INPUT] = input || TeeInput.new(client, env, @hp, @buf) + response = APP.call(env.update(RACK_DEFAULTS)) + if 100 == response.first.to_i + write(EXPECT_100_RESPONSE) + env.delete(HTTP_EXPECT) + response = APP.call(env) + end + + alive &&= G.alive + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if headers + response_write(response, out) + rescue => e + handle_error(e) rescue nil + end + end + if alive # in case we pipeline + @hp.reset + redo if @hp.headers(@env.clear, @buf) + end + end while false + end + + def on_read(data) + case @state + when :headers + @hp.headers(@env, @buf << data) or return + if 0 == @hp.content_length + app_spawn(HttpRequest::NULL_IO) # common case + else # nil or len > 0 + @state = Queue.new + app_spawn(nil) + end + when Queue + pause + @state << data + end + rescue => e + handle_error(e) + end + + end +end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 52ddaab..9decac9 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -20,7 +20,7 @@ module Rainbows # here to prevent connections from being closed on us. def defer_body(io, out_headers) @deferred_bodies << io - on_write_complete unless out_headers # triggers a write + schedule_write unless out_headers # triggers a write end def app_call diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb new file mode 100644 index 0000000..f0482fd --- /dev/null +++ b/lib/rainbows/rev_thread_spawn.rb @@ -0,0 +1,78 @@ +# -*- encoding: binary -*- +require 'rainbows/rev' +require 'rainbows/ev_thread_core' + +module Rainbows + + # A combination of the Rev and ThreadSpawn models. This allows Ruby + # 1.8 and 1.9 to effectively serve more than ~1024 concurrent clients + # on systems that support kqueue or epoll while still using + # Thread-based concurrency for application processing. It exposes + # Unicorn::TeeInput for a streamable "rack.input" for upload + # processing within the app. Threads are spawned immediately after + # header processing is done for calling the application. Rack + # applications running under this mode should be thread-safe. + # DevFdResponse should be used with this class to proxy asynchronous + # responses. All network I/O between the client and server are + # handled by the main thread (even when streaming "rack.input"). + # + # Caveats: + # + # * TeeInput performance is currently terrible under Ruby 1.9.1-p243 + # with few, fast clients. This appears to be due the Queue + # implementation in 1.9. + + module RevThreadSpawn + class Client < Rainbows::Rev::Client + include EvThreadCore + LOOP = ::Rev::Loop.default + DR = Rainbows::Rev::DeferredResponse + + def pause + @lock.synchronize { detach } + end + + def resume + # we always attach to the loop belonging to the main thread + @lock.synchronize { attach(LOOP) } + end + + def write(data) + if Thread.current != @thread && @lock.locked? + # we're being called inside on_writable + super + else + @lock.synchronize { super } + end + end + + def defer_body(io, out_headers) + @lock.synchronize { super } + end + + def response_write(response, out) + DR.write(self, response, out) + (out && CONN_ALIVE == out.first) or + @lock.synchronize { + quit + schedule_write + } + end + + def on_writable + # don't ever want to block in the main loop with lots of clients, + # libev is level-triggered so we'll always get another chance later + if @lock.try_lock + begin + super + ensure + @lock.unlock + end + end + end + + end + + include Rainbows::Rev::Core + end +end -- cgit v1.2.3-24-ge0c7