about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-11-07 20:15:03 -0800
committerEric Wong <normalperson@yhbt.net>2009-11-08 00:34:35 -0800
commitb4f1271320d38e83141dbb38463c3a368661aef7 (patch)
treebc0514443ebc62d2f24b2c690e3499c5a6dd9cb2 /lib
parent026219a98c0ecf919c3ecce32ba389254a571795 (diff)
downloadrainbows-b4f1271320d38e83141dbb38463c3a368661aef7.tar.gz
Seems to pass all tests, but that may only mean our
test cases are lacking...
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/ev_thread_core.rb74
-rw-r--r--lib/rainbows/rev/client.rb2
-rw-r--r--lib/rainbows/rev_thread_spawn.rb78
4 files changed, 154 insertions, 1 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 5bd8693..4686f2b 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -67,6 +67,7 @@ module Rainbows
     :ThreadSpawn => 30,
     :ThreadPool => 10,
     :Rev => 50,
+    :RevThreadSpawn => 50,
     :EventMachine => 50,
   }.each do |model, _|
     u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" }
diff --git a/lib/rainbows/ev_thread_core.rb b/lib/rainbows/ev_thread_core.rb
new file mode 100644
index 0000000..784d30a
--- /dev/null
+++ b/lib/rainbows/ev_thread_core.rb
@@ -0,0 +1,74 @@
+# -*- encoding: binary -*-
+require 'thread' # for Queue
+require 'rainbows/ev_core'
+
+module Rainbows
+
+  # base module for mixed Thread + evented models like RevThreadSpawn
+  module EvThreadCore
+    include EvCore
+
+    def post_init
+      super
+      @lock = Mutex.new
+      @thread = nil
+    end
+
+    # we pass ourselves off as a Socket to Unicorn::TeeInput and this
+    # is the only method Unicorn::TeeInput requires from the socket
+    def readpartial(length, buf = "")
+      buf.replace(@state.pop)
+      resume
+      buf
+    end
+
+    def app_spawn(input)
+      begin
+        @thread.nil? or @thread.join # only one thread per connection
+        env = @env.dup
+        alive, headers = @hp.keepalive?, @hp.headers?
+        @thread = Thread.new(self) do |client|
+          begin
+            env[REMOTE_ADDR] = @remote_addr
+            env[RACK_INPUT] = input || TeeInput.new(client, env, @hp, @buf)
+            response = APP.call(env.update(RACK_DEFAULTS))
+            if 100 == response.first.to_i
+              write(EXPECT_100_RESPONSE)
+              env.delete(HTTP_EXPECT)
+              response = APP.call(env)
+            end
+
+            alive &&= G.alive
+            out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if headers
+            response_write(response, out)
+          rescue => e
+            handle_error(e) rescue nil
+          end
+        end
+        if alive # in case we pipeline
+          @hp.reset
+          redo if @hp.headers(@env.clear, @buf)
+        end
+      end while false
+    end
+
+    def on_read(data)
+      case @state
+      when :headers
+        @hp.headers(@env, @buf << data) or return
+        if 0 == @hp.content_length
+          app_spawn(HttpRequest::NULL_IO) # common case
+        else # nil or len > 0
+          @state = Queue.new
+          app_spawn(nil)
+        end
+      when Queue
+        pause
+        @state << data
+      end
+      rescue => e
+        handle_error(e)
+    end
+
+  end
+end
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 52ddaab..9decac9 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -20,7 +20,7 @@ module Rainbows
       # here to prevent connections from being closed on us.
       def defer_body(io, out_headers)
         @deferred_bodies << io
-        on_write_complete unless out_headers # triggers a write
+        schedule_write unless out_headers # triggers a write
       end
 
       def app_call
diff --git a/lib/rainbows/rev_thread_spawn.rb b/lib/rainbows/rev_thread_spawn.rb
new file mode 100644
index 0000000..f0482fd
--- /dev/null
+++ b/lib/rainbows/rev_thread_spawn.rb
@@ -0,0 +1,78 @@
+# -*- encoding: binary -*-
+require 'rainbows/rev'
+require 'rainbows/ev_thread_core'
+
+module Rainbows
+
+  # A combination of the Rev and ThreadSpawn models.  This allows Ruby
+  # 1.8 and 1.9 to effectively serve more than ~1024 concurrent clients
+  # on systems that support kqueue or epoll while still using
+  # Thread-based concurrency for application processing.  It exposes
+  # Unicorn::TeeInput for a streamable "rack.input" for upload
+  # processing within the app.  Threads are spawned immediately after
+  # header processing is done for calling the application.  Rack
+  # applications running under this mode should be thread-safe.
+  # DevFdResponse should be used with this class to proxy asynchronous
+  # responses.  All network I/O between the client and server are
+  # handled by the main thread (even when streaming "rack.input").
+  #
+  # Caveats:
+  #
+  # * TeeInput performance is currently terrible under Ruby 1.9.1-p243
+  #   with few, fast clients.  This appears to be due the Queue
+  #   implementation in 1.9.
+
+  module RevThreadSpawn
+    class Client < Rainbows::Rev::Client
+      include EvThreadCore
+      LOOP = ::Rev::Loop.default
+      DR = Rainbows::Rev::DeferredResponse
+
+      def pause
+        @lock.synchronize { detach }
+      end
+
+      def resume
+        # we always attach to the loop belonging to the main thread
+        @lock.synchronize { attach(LOOP) }
+      end
+
+      def write(data)
+        if Thread.current != @thread && @lock.locked?
+          # we're being called inside on_writable
+          super
+        else
+          @lock.synchronize { super }
+        end
+      end
+
+      def defer_body(io, out_headers)
+        @lock.synchronize { super }
+      end
+
+      def response_write(response, out)
+        DR.write(self, response, out)
+        (out && CONN_ALIVE == out.first) or
+            @lock.synchronize {
+              quit
+              schedule_write
+            }
+      end
+
+      def on_writable
+        # don't ever want to block in the main loop with lots of clients,
+        # libev is level-triggered so we'll always get another chance later
+        if @lock.try_lock
+          begin
+            super
+          ensure
+            @lock.unlock
+          end
+        end
+      end
+
+    end
+
+    include Rainbows::Rev::Core
+  end
+end