about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/writer_thread_pool.rb105
-rw-r--r--t/GNUmakefile1
-rw-r--r--t/simple-http_WriterThreadPool.ru9
-rwxr-xr-xt/t0200-async-response.sh2
5 files changed, 117 insertions, 1 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 16e2662..d97262f 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -134,6 +134,7 @@ module Rainbows
   # highly recommended
   MODEL_WORKER_CONNECTIONS = {
     :Base => 1, # this one can't change
+    :WriterThreadPool => 1,
     :Revactor => 50,
     :ThreadSpawn => 30,
     :ThreadPool => 20,
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
new file mode 100644
index 0000000..f881187
--- /dev/null
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -0,0 +1,105 @@
+# -*- encoding: binary -*-
+
+module Rainbows
+
+  # This concurrency model implements a single-threaded app dispatch
+  # with a separate thread pool for writing responses.  By default, this
+  # thread pool is only a single thread: ideal for typical applications
+  # that do not serve large or streaming responses.
+  #
+  # Unlike most \Rainbows! concurrency models, WriterThreadPool is
+  # designed to run behind nginx just like Unicorn is.  This concurrency
+  # model may be useful for existing Unicorn users looking for more
+  # output concurrency than socket buffers can provide while still
+  # maintaining a single-threaded application dispatch (though if the
+  # response body is dynamically generated, it must be thread safe).
+  #
+  # For serving large or streaming responses, using more threads (via
+  # the +worker_connections+ setting) and setting "proxy_buffering off"
+  # in nginx is recommended.  If your application does not handle
+  # uploads, then using any HTTP-aware proxy like haproxy is fine.
+  # Using a non-HTTP-aware proxy will leave you vulnerable to
+  # slow client denial-of-service attacks.
+
+  module WriterThreadPool
+    include Base
+
+    # used to wrap a BasicSocket to use with +q+ for all writes
+    # this is compatible with IO.select
+    class QueueSocket < Struct.new(:to_io, :q)
+      def readpartial(size, buf = "")
+        to_io.readpartial(size, buf)
+      end
+
+      def write_nonblock(buf)
+        to_io.write_nonblock(buf)
+      end
+
+      def write(buf)
+        q << [ to_io, buf ]
+      end
+
+      def close
+        q << [ to_io, :close ]
+      end
+
+      def closed?
+        false
+      end
+    end
+
+    alias base_write_body write_body
+    if IO.respond_to?(:copy_stream)
+      undef_method :write_body
+
+      def write_body(qclient, body)
+        qclient.q << [ qclient.to_io, :body, body ]
+      end
+    end
+
+    @@nr = 0
+    @@q = nil
+
+    def worker_loop(worker)
+      # we have multiple, single-thread queues since we don't want to
+      # interleave writes from the same client
+      qp = (1..worker_connections).map do |n|
+        QueuePool.new(1) do |response|
+          begin
+            io, arg1, arg2 = response
+            case arg1
+            when :body then base_write_body(io, arg2)
+            when :close then io.close unless io.closed?
+            else
+              io.write(arg1)
+            end
+          rescue => err
+            Error.app(err)
+          end
+        end
+      end
+
+      if qp.size == 1
+        # avoid unnecessary calculations when there's only one queue,
+        # most users should only need one queue...
+        WriterThreadPool.module_eval do
+          def process_client(client)
+            super(QueueSocket[client, @@q])
+          end
+        end
+        @@q = qp.first.queue
+      else
+        WriterThreadPool.module_eval do
+          def process_client(client)
+            @@nr += 1
+            super(QueueSocket[client, @@q[@@nr %= @@q.size]])
+          end
+        end
+        @@q = qp.map { |q| q.queue }
+      end
+
+      super(worker) # accept loop from Unicorn
+      qp.map { |q| q.quit! }
+    end
+  end
+end
diff --git a/t/GNUmakefile b/t/GNUmakefile
index 1c979b3..6540aa0 100644
--- a/t/GNUmakefile
+++ b/t/GNUmakefile
@@ -22,6 +22,7 @@ else
 endif
 export RUBYLIB RUBY_VERSION
 
+models += WriterThreadPool
 models += ThreadPool
 models += ThreadSpawn
 models += Rev
diff --git a/t/simple-http_WriterThreadPool.ru b/t/simple-http_WriterThreadPool.ru
new file mode 100644
index 0000000..0514db4
--- /dev/null
+++ b/t/simple-http_WriterThreadPool.ru
@@ -0,0 +1,9 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env|
+  if env['rack.multithread'] && env['rainbows.model'] == :WriterThreadPool
+    [ 200, {}, [ Thread.current.inspect << "\n" ] ]
+  else
+    raise "rack.multithread is false"
+  end
+}
diff --git a/t/t0200-async-response.sh b/t/t0200-async-response.sh
index de5a7de..a1c5928 100755
--- a/t/t0200-async-response.sh
+++ b/t/t0200-async-response.sh
@@ -2,7 +2,7 @@
 CONFIG_RU=${CONFIG_RU-'async-response.ru'}
 . ./test-lib.sh
 
-skip_models Base
+skip_models Base WriterThreadPool
 
 case $CONFIG_RU in
 *no-autochunk.ru)