about summary refs log tree commit homepage
path: root/lib/rainbows/writer_thread_spawn.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-12-27 02:30:58 +0000
committerEric Wong <normalperson@yhbt.net>2010-12-27 02:30:58 +0000
commit6ae020c9ac483d822902b5d33f038f79b44d3a50 (patch)
tree98b1ef14929bb868cb3cf32867c4e0962ee91432 /lib/rainbows/writer_thread_spawn.rb
parente7d295fd8e3628eba7a1ba52e95b7dee11532e98 (diff)
downloadrainbows-6ae020c9ac483d822902b5d33f038f79b44d3a50.tar.gz
Use a consistent "Client" naming to reduce confusion
Diffstat (limited to 'lib/rainbows/writer_thread_spawn.rb')
-rw-r--r--lib/rainbows/writer_thread_spawn.rb70
1 files changed, 6 insertions, 64 deletions
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 4215254..a11e82c 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -22,82 +22,24 @@ module Rainbows::WriterThreadSpawn
   # :stopdoc:
   include Rainbows::Base
 
-  CUR = {} # :nodoc:
-
-  # used to wrap a BasicSocket to use with +q+ for all writes
-  # this is compatible with IO.select
-  class MySocket < Struct.new(:to_io, :q, :thr)  # :nodoc: all
-    include Rainbows::Response
-    include Rainbows::SocketProxy
-
-    def queue_writer
-      # not using Thread.pass here because that spins the CPU during
-      # I/O wait and will eat cycles from other worker processes.
-      until CUR.size < MAX
-        CUR.delete_if { |t,_|
-          t.alive? ? t.join(0) : true
-        }.size >= MAX and sleep(0.01)
-      end
-
-      q = Queue.new
-      self.thr = Thread.new(to_io, q) do |io, q|
-        while response = q.shift
-          begin
-            arg1, arg2, arg3 = response
-            case arg1
-            when :body then write_body(io, arg2, arg3)
-            when :close
-              io.close unless io.closed?
-              break
-            else
-              io.write(arg1)
-            end
-          rescue => e
-            Error.write(io, e)
-          end
-        end
-        CUR.delete(Thread.current)
-      end
-      CUR[thr] = q
-    end
-
-    def write(buf)
-      (self.q ||= queue_writer) << buf
-    end
-
-    def queue_body(body, range)
-      (self.q ||= queue_writer) << [ :body, body, range ]
-    end
-
-    def close
-      if q
-        q << :close
-      else
-        to_io.close
-      end
-    end
-
-    def closed?
-      false
-    end
-  end
-
   def write_body(my_sock, body, range) # :nodoc:
     my_sock.queue_body(body, range)
   end
 
   def process_client(client) # :nodoc:
-    super(MySocket[client])
+    super(Client.new(client))
   end
 
   def worker_loop(worker)  # :nodoc:
-    MySocket.const_set(:MAX, worker_connections)
+    Client.const_set(:MAX, worker_connections)
     super(worker) # accept loop from Unicorn
-    CUR.delete_if do |t,q|
+    Client::CUR.delete_if do |t,q|
       q << nil
       G.tick
       t.alive? ? t.join(0.01) : true
-    end until CUR.empty?
+    end until Client::CUR.empty?
   end
   # :startdoc:
 end
+# :enddoc:
+require 'rainbows/writer_thread_spawn/client'