about summary refs log tree commit homepage
path: root/lib/rainbows/writer_thread_pool.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-12-27 02:13:32 +0000
committerEric Wong <normalperson@yhbt.net>2010-12-27 02:13:32 +0000
commit7f2cb1b56afda847c29e1e65fe0608a6f20a0fe6 (patch)
tree2d77ce76c1076deff32b963d4dd80bf7801a3899 /lib/rainbows/writer_thread_pool.rb
parenta5ff497e57bc6e8793c38bdd94ea9f1cfefd17fd (diff)
downloadrainbows-7f2cb1b56afda847c29e1e65fe0608a6f20a0fe6.tar.gz
Diffstat (limited to 'lib/rainbows/writer_thread_pool.rb')
-rw-r--r--lib/rainbows/writer_thread_pool.rb165
1 files changed, 81 insertions, 84 deletions
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index e8cad91..7b5e861 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -1,105 +1,102 @@
 # -*- encoding: binary -*-
 
-module Rainbows
-
-  # This concurrency model implements a single-threaded app dispatch
-  # with a separate thread pool for writing responses.
-  #
-  # Unlike most \Rainbows! concurrency models, WriterThreadPool is
-  # designed to run behind nginx just like Unicorn is.  This concurrency
-  # model may be useful for existing Unicorn users looking for more
-  # output concurrency than socket buffers can provide while still
-  # maintaining a single-threaded application dispatch (though if the
-  # response body is dynamically generated, it must be thread safe).
-  #
-  # For serving large or streaming responses, using more threads (via
-  # the +worker_connections+ setting) and setting "proxy_buffering off"
-  # in nginx is recommended.  If your application does not handle
-  # uploads, then using any HTTP-aware proxy like haproxy is fine.
-  # Using a non-HTTP-aware proxy will leave you vulnerable to
-  # slow client denial-of-service attacks.
-
-  module WriterThreadPool
-    # :stopdoc:
-    include Base
-
-    # used to wrap a BasicSocket to use with +q+ for all writes
-    # this is compatible with IO.select
-    class QueueSocket < Struct.new(:to_io, :q) # :nodoc:
-      def kgio_addr
-        to_io.kgio_addr
-      end
+# This concurrency model implements a single-threaded app dispatch
+# with a separate thread pool for writing responses.
+#
+# Unlike most \Rainbows! concurrency models, WriterThreadPool is
+# designed to run behind nginx just like Unicorn is.  This concurrency
+# model may be useful for existing Unicorn users looking for more
+# output concurrency than socket buffers can provide while still
+# maintaining a single-threaded application dispatch (though if the
+# response body is dynamically generated, it must be thread safe).
+#
+# For serving large or streaming responses, using more threads (via
+# the +worker_connections+ setting) and setting "proxy_buffering off"
+# in nginx is recommended.  If your application does not handle
+# uploads, then using any HTTP-aware proxy like haproxy is fine.
+# Using a non-HTTP-aware proxy will leave you vulnerable to
+# slow client denial-of-service attacks.
+module Rainbows::WriterThreadPool
+  # :stopdoc:
+  include Rainbows::Base
+
+  # used to wrap a BasicSocket to use with +q+ for all writes
+  # this is compatible with IO.select
+  class QueueSocket < Struct.new(:to_io, :q) # :nodoc:
+    def kgio_addr
+      to_io.kgio_addr
+    end
 
-      def kgio_read(size, buf = "")
-        to_io.kgio_read(size, buf)
-      end
+    def kgio_read(size, buf = "")
+      to_io.kgio_read(size, buf)
+    end
 
-      def kgio_read!(size, buf = "")
-        to_io.kgio_read!(size, buf)
-      end
+    def kgio_read!(size, buf = "")
+      to_io.kgio_read!(size, buf)
+    end
 
-      def kgio_trywrite(buf)
-        to_io.kgio_trywrite(buf)
-      end
+    def kgio_trywrite(buf)
+      to_io.kgio_trywrite(buf)
+    end
 
-      def timed_read(buf)
-        to_io.timed_read(buf)
-      end
+    def timed_read(buf)
+      to_io.timed_read(buf)
+    end
 
-      def write(buf)
-        q << [ to_io, buf ]
-      end
+    def write(buf)
+      q << [ to_io, buf ]
+    end
 
-      def close
-        q << [ to_io, :close ]
-      end
+    def close
+      q << [ to_io, :close ]
+    end
 
-      def closed?
-        false
-      end
+    def closed?
+      false
     end
+  end
 
-    @@nr = 0
-    @@q = nil
+  @@nr = 0
+  @@q = nil
 
-    def async_write_body(qclient, body, range)
-      qclient.q << [ qclient.to_io, :body, body, range ]
-    end
+  def async_write_body(qclient, body, range)
+    qclient.q << [ qclient.to_io, :body, body, range ]
+  end
 
-    def process_client(client) # :nodoc:
-      @@nr += 1
-      super(QueueSocket.new(client, @@q[@@nr %= @@q.size]))
-    end
+  def process_client(client) # :nodoc:
+    @@nr += 1
+    super(QueueSocket.new(client, @@q[@@nr %= @@q.size]))
+  end
 
-    def init_worker_process(worker)
-      super
-      self.class.__send__(:alias_method, :sync_write_body, :write_body)
-      WriterThreadPool.__send__(:alias_method, :write_body, :async_write_body)
-    end
+  def init_worker_process(worker)
+    super
+    self.class.__send__(:alias_method, :sync_write_body, :write_body)
+    Rainbows::WriterThreadPool.__send__(
+                        :alias_method, :write_body, :async_write_body)
+  end
 
-    def worker_loop(worker) # :nodoc:
-      # we have multiple, single-thread queues since we don't want to
-      # interleave writes from the same client
-      qp = (1..worker_connections).map do |n|
-        QueuePool.new(1) do |response|
-          begin
-            io, arg1, arg2, arg3 = response
-            case arg1
-            when :body then sync_write_body(io, arg2, arg3)
-            when :close then io.close unless io.closed?
-            else
-              io.write(arg1)
-            end
-          rescue => err
-            Error.write(io, err)
+  def worker_loop(worker) # :nodoc:
+    # we have multiple, single-thread queues since we don't want to
+    # interleave writes from the same client
+    qp = (1..worker_connections).map do |n|
+      Rainbows::QueuePool.new(1) do |response|
+        begin
+          io, arg1, arg2, arg3 = response
+          case arg1
+          when :body then sync_write_body(io, arg2, arg3)
+          when :close then io.close unless io.closed?
+          else
+            io.write(arg1)
           end
+        rescue => err
+          Rainbows::Error.write(io, err)
         end
       end
-
-      @@q = qp.map { |q| q.queue }
-      super(worker) # accept loop from Unicorn
-      qp.map { |q| q.quit! }
     end
-    # :startdoc:
+
+    @@q = qp.map { |q| q.queue }
+    super(worker) # accept loop from Unicorn
+    qp.map { |q| q.quit! }
   end
+  # :startdoc:
 end