about summary refs log tree commit homepage
path: root/lib/rainbows/writer_thread_spawn.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-12-27 02:13:32 +0000
committerEric Wong <normalperson@yhbt.net>2010-12-27 02:13:32 +0000
commit7f2cb1b56afda847c29e1e65fe0608a6f20a0fe6 (patch)
tree2d77ce76c1076deff32b963d4dd80bf7801a3899 /lib/rainbows/writer_thread_spawn.rb
parenta5ff497e57bc6e8793c38bdd94ea9f1cfefd17fd (diff)
downloadrainbows-7f2cb1b56afda847c29e1e65fe0608a6f20a0fe6.tar.gz
Diffstat (limited to 'lib/rainbows/writer_thread_spawn.rb')
-rw-r--r--lib/rainbows/writer_thread_spawn.rb201
1 files changed, 99 insertions, 102 deletions
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 02ae0d5..4ee98dd 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -1,125 +1,122 @@
 # -*- encoding: binary -*-
 require 'thread'
-module Rainbows
-
-  # This concurrency model implements a single-threaded app dispatch and
-  # spawns a new thread for writing responses.  This concurrency model
-  # should be ideal for apps that serve large responses or stream
-  # responses slowly.
-  #
-  # Unlike most \Rainbows! concurrency models, WriterThreadSpawn is
-  # designed to run behind nginx just like Unicorn is.  This concurrency
-  # model may be useful for existing Unicorn users looking for more
-  # output concurrency than socket buffers can provide while still
-  # maintaining a single-threaded application dispatch (though if the
-  # response body is generated on-the-fly, it must be thread safe).
-  #
-  # For serving large or streaming responses, setting
-  # "proxy_buffering off" in nginx is recommended.  If your application
-  # does not handle uploads, then using any HTTP-aware proxy like
-  # haproxy is fine.  Using a non-HTTP-aware proxy will leave you
-  # vulnerable to slow client denial-of-service attacks.
-
-  module WriterThreadSpawn
-    # :stopdoc:
-    include Base
-
-    CUR = {} # :nodoc:
-
-    # used to wrap a BasicSocket to use with +q+ for all writes
-    # this is compatible with IO.select
-    class MySocket < Struct.new(:to_io, :q, :thr)  # :nodoc: all
-      include Rainbows::Response
-
-      def kgio_addr
-        to_io.kgio_addr
-      end
+# This concurrency model implements a single-threaded app dispatch and
+# spawns a new thread for writing responses.  This concurrency model
+# should be ideal for apps that serve large responses or stream
+# responses slowly.
+#
+# Unlike most \Rainbows! concurrency models, WriterThreadSpawn is
+# designed to run behind nginx just like Unicorn is.  This concurrency
+# model may be useful for existing Unicorn users looking for more
+# output concurrency than socket buffers can provide while still
+# maintaining a single-threaded application dispatch (though if the
+# response body is generated on-the-fly, it must be thread safe).
+#
+# For serving large or streaming responses, setting
+# "proxy_buffering off" in nginx is recommended.  If your application
+# does not handle uploads, then using any HTTP-aware proxy like
+# haproxy is fine.  Using a non-HTTP-aware proxy will leave you
+# vulnerable to slow client denial-of-service attacks.
+
+module Rainbows::WriterThreadSpawn
+  # :stopdoc:
+  include Rainbows::Base
+
+  CUR = {} # :nodoc:
+
+  # used to wrap a BasicSocket to use with +q+ for all writes
+  # this is compatible with IO.select
+  class MySocket < Struct.new(:to_io, :q, :thr)  # :nodoc: all
+    include Rainbows::Response
+
+    def kgio_addr
+      to_io.kgio_addr
+    end
 
-      def kgio_read(size, buf = "")
-        to_io.kgio_read(size, buf)
-      end
+    def kgio_read(size, buf = "")
+      to_io.kgio_read(size, buf)
+    end
 
-      def kgio_read!(size, buf = "")
-        to_io.kgio_read!(size, buf)
-      end
+    def kgio_read!(size, buf = "")
+      to_io.kgio_read!(size, buf)
+    end
 
-      def kgio_trywrite(buf)
-        to_io.kgio_trywrite(buf)
-      end
+    def kgio_trywrite(buf)
+      to_io.kgio_trywrite(buf)
+    end
 
-      def timed_read(buf)
-        to_io.timed_read(buf)
-      end
+    def timed_read(buf)
+      to_io.timed_read(buf)
+    end
 
-      def queue_writer
-        # not using Thread.pass here because that spins the CPU during
-        # I/O wait and will eat cycles from other worker processes.
-        until CUR.size < MAX
-          CUR.delete_if { |t,_|
-            t.alive? ? t.join(0) : true
-          }.size >= MAX and sleep(0.01)
-        end
+    def queue_writer
+      # not using Thread.pass here because that spins the CPU during
+      # I/O wait and will eat cycles from other worker processes.
+      until CUR.size < MAX
+        CUR.delete_if { |t,_|
+          t.alive? ? t.join(0) : true
+        }.size >= MAX and sleep(0.01)
+      end
 
-        q = Queue.new
-        self.thr = Thread.new(to_io, q) do |io, q|
-          while response = q.shift
-            begin
-              arg1, arg2, arg3 = response
-              case arg1
-              when :body then write_body(io, arg2, arg3)
-              when :close
-                io.close unless io.closed?
-                break
-              else
-                io.write(arg1)
-              end
-            rescue => e
-              Error.write(io, e)
+      q = Queue.new
+      self.thr = Thread.new(to_io, q) do |io, q|
+        while response = q.shift
+          begin
+            arg1, arg2, arg3 = response
+            case arg1
+            when :body then write_body(io, arg2, arg3)
+            when :close
+              io.close unless io.closed?
+              break
+            else
+              io.write(arg1)
             end
+          rescue => e
+            Error.write(io, e)
           end
-          CUR.delete(Thread.current)
         end
-        CUR[thr] = q
-      end
-
-      def write(buf)
-        (self.q ||= queue_writer) << buf
+        CUR.delete(Thread.current)
       end
+      CUR[thr] = q
+    end
 
-      def queue_body(body, range)
-        (self.q ||= queue_writer) << [ :body, body, range ]
-      end
+    def write(buf)
+      (self.q ||= queue_writer) << buf
+    end
 
-      def close
-        if q
-          q << :close
-        else
-          to_io.close
-        end
-      end
+    def queue_body(body, range)
+      (self.q ||= queue_writer) << [ :body, body, range ]
+    end
 
-      def closed?
-        false
+    def close
+      if q
+        q << :close
+      else
+        to_io.close
       end
     end
 
-    def write_body(my_sock, body, range) # :nodoc:
-      my_sock.queue_body(body, range)
+    def closed?
+      false
     end
+  end
 
-    def process_client(client) # :nodoc:
-      super(MySocket[client])
-    end
+  def write_body(my_sock, body, range) # :nodoc:
+    my_sock.queue_body(body, range)
+  end
 
-    def worker_loop(worker)  # :nodoc:
-      MySocket.const_set(:MAX, worker_connections)
-      super(worker) # accept loop from Unicorn
-      CUR.delete_if do |t,q|
-        q << nil
-        G.tick
-        t.alive? ? t.join(0.01) : true
-      end until CUR.empty?
-    end
-    # :startdoc:
+  def process_client(client) # :nodoc:
+    super(MySocket[client])
+  end
+
+  def worker_loop(worker)  # :nodoc:
+    MySocket.const_set(:MAX, worker_connections)
+    super(worker) # accept loop from Unicorn
+    CUR.delete_if do |t,q|
+      q << nil
+      G.tick
+      t.alive? ? t.join(0.01) : true
+    end until CUR.empty?
   end
+  # :startdoc:
 end