about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-06-28 05:14:23 +0000
committerEric Wong <normalperson@yhbt.net>2010-06-28 05:14:23 +0000
commiteaf6cca3e1d38b7307c65c257b080041e65fd215 (patch)
tree3659becf4ece7f43d51726a2729484add4456cf6
parent5cde25e2fb6f7de9143a40da5bad4b91b582fcab (diff)
downloadrainbows-eaf6cca3e1d38b7307c65c257b080041e65fd215.tar.gz
-rw-r--r--lib/rainbows/revactor.rb294
1 files changed, 147 insertions, 147 deletions
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index 21bee8a..0ec23d8 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -2,174 +2,174 @@
 require 'revactor'
 Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
 
-module Rainbows
-
-  # Enables use of the Actor model through
-  # {Revactor}[http://revactor.org] under Ruby 1.9.  It spawns one
-  # long-lived Actor for every listen socket in the process and spawns a
-  # new Actor for every client connection accept()-ed.
-  # +worker_connections+ will limit the number of client Actors we have
-  # running at any one time.
-  #
-  # Applications using this model are required to be reentrant, but do
-  # not have to worry about race conditions unless they use threads
-  # internally.  \Rainbows! does not spawn threads under this model.
-  # Multiple instances of the same app may run in the same address space
-  # sequentially (but at interleaved points).  Any network dependencies
-  # in the application using this model should be implemented using the
-  # \Revactor library as well, to take advantage of the networking
-  # concurrency features this model provides.
-
-  module Revactor
-    RD_ARGS = {}
-
-    include Base
-
-    # once a client is accepted, it is processed in its entirety here
-    # in 3 easy steps: read request, call app, write app response
-    def process_client(client)
-      io = client.instance_variable_get(:@_io)
-      io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
-      rd_args = [ nil ]
-      remote_addr = if ::Revactor::TCP::Socket === client
-        rd_args << RD_ARGS
-        client.remote_addr
-      else
-        Unicorn::HttpRequest::LOCALHOST
-      end
-      buf = client.read(*rd_args)
-      hp = HttpParser.new
-      env = {}
-      alive = true
-
-      begin
-        while ! hp.headers(env, buf)
-          buf << client.read(*rd_args)
-        end
-
-        env[Const::CLIENT_IO] = client
-        env[Const::RACK_INPUT] = 0 == hp.content_length ?
-                 NULL_IO :
-                 TeeInput.new(PartialSocket.new(client), env, hp, buf)
-        env[Const::REMOTE_ADDR] = remote_addr
-        response = app.call(env.update(RACK_DEFAULTS))
-
-        if 100 == response[0].to_i
-          client.write(Const::EXPECT_100_RESPONSE)
-          env.delete(Const::HTTP_EXPECT)
-          response = app.call(env)
-        end
-
-        alive = hp.keepalive? && G.alive
-        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
-        HttpResponse.write(client, response, out)
-      end while alive and hp.reset.nil? and env.clear
-    rescue ::Revactor::TCP::ReadError
-    rescue => e
-      Error.write(io, e)
-    ensure
-      client.close
+# Enables use of the Actor model through
+# {Revactor}[http://revactor.org] under Ruby 1.9.  It spawns one
+# long-lived Actor for every listen socket in the process and spawns a
+# new Actor for every client connection accept()-ed.
+# +worker_connections+ will limit the number of client Actors we have
+# running at any one time.
+#
+# Applications using this model are required to be reentrant, but do
+# not have to worry about race conditions unless they use threads
+# internally.  \Rainbows! does not spawn threads under this model.
+# Multiple instances of the same app may run in the same address space
+# sequentially (but at interleaved points).  Any network dependencies
+# in the application using this model should be implemented using the
+# \Revactor library as well, to take advantage of the networking
+# concurrency features this model provides.
+module Rainbows::Revactor
+
+  # :stopdoc:
+  RD_ARGS = {}
+
+  include Rainbows::Base
+  LOCALHOST = Unicorn::HttpRequest::LOCALHOST
+  TCP = ::Revactor::TCP::Socket
+
+  # once a client is accepted, it is processed in its entirety here
+  # in 3 easy steps: read request, call app, write app response
+  def process_client(client)
+    io = client.instance_variable_get(:@_io)
+    io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
+    rd_args = [ nil ]
+    remote_addr = if TCP === client
+      rd_args << RD_ARGS
+      client.remote_addr
+    else
+      LOCALHOST
     end
+    buf = client.read(*rd_args)
+    hp = HttpParser.new
+    env = {}
+    alive = true
+
+    begin
+      while ! hp.headers(env, buf)
+        buf << client.read(*rd_args)
+      end
 
-    # runs inside each forked worker, this sits around and waits
-    # for connections and doesn't die until the parent dies (or is
-    # given a INT, QUIT, or TERM signal)
-    def worker_loop(worker)
-      init_worker_process(worker)
-      RD_ARGS[:timeout] = G.kato if G.kato > 0
-      nr = 0
-      limit = worker_connections
-      actor_exit = Case[:exit, Actor, Object]
-
-      revactorize_listeners.each do |l, close, accept|
-        Actor.spawn(l, close, accept) do |l, close, accept|
-          Actor.current.trap_exit = true
-          l.controller = l.instance_variable_set(:@receiver, Actor.current)
-          begin
-            while nr >= limit
-              l.disable if l.enabled?
-              logger.info "busy: clients=#{nr} >= limit=#{limit}"
-              Actor.receive do |f|
-                f.when(close) {}
-                f.when(actor_exit) { nr -= 1 }
-                f.after(0.01) {} # another listener could've gotten an exit
-              end
-            end
+      env[CLIENT_IO] = client
+      env[RACK_INPUT] = 0 == hp.content_length ?
+               NULL_IO : TeeInput.new(PartialSocket.new(client), env, hp, buf)
+      env[REMOTE_ADDR] = remote_addr
+      response = app.call(env.update(RACK_DEFAULTS))
+
+      if 100 == response[0].to_i
+        client.write(EXPECT_100_RESPONSE)
+        env.delete(HTTP_EXPECT)
+        response = app.call(env)
+      end
 
-            l.enable unless l.enabled?
+      alive = hp.keepalive? && G.alive
+      out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
+      HttpResponse.write(client, response, out)
+    end while alive and hp.reset.nil? and env.clear
+  rescue ::Revactor::TCP::ReadError
+  rescue => e
+    Rainbows::Error.write(io, e)
+  ensure
+    client.close
+  end
+
+  # runs inside each forked worker, this sits around and waits
+  # for connections and doesn't die until the parent dies (or is
+  # given a INT, QUIT, or TERM signal)
+  def worker_loop(worker)
+    init_worker_process(worker)
+    RD_ARGS[:timeout] = G.kato if G.kato > 0
+    nr = 0
+    limit = worker_connections
+    actor_exit = Case[:exit, Actor, Object]
+
+    revactorize_listeners.each do |l, close, accept|
+      Actor.spawn(l, close, accept) do |l, close, accept|
+        Actor.current.trap_exit = true
+        l.controller = l.instance_variable_set(:@receiver, Actor.current)
+        begin
+          while nr >= limit
+            l.disable if l.enabled?
+            logger.info "busy: clients=#{nr} >= limit=#{limit}"
             Actor.receive do |f|
               f.when(close) {}
               f.when(actor_exit) { nr -= 1 }
-              f.when(accept) do |_, _, s|
-                nr += 1
-                Actor.spawn_link(s) { |c| process_client(c) }
-              end
+              f.after(0.01) {} # another listener could've gotten an exit
             end
-          rescue => e
-            Error.listen_loop(e)
-          end while G.alive
+          end
+
+          l.enable unless l.enabled?
           Actor.receive do |f|
             f.when(close) {}
             f.when(actor_exit) { nr -= 1 }
-          end while nr > 0
-        end
+            f.when(accept) do |_, _, s|
+              nr += 1
+              Actor.spawn_link(s) { |c| process_client(c) }
+            end
+          end
+        rescue => e
+          Rainbows::Error.listen_loop(e)
+        end while G.alive
+        Actor.receive do |f|
+          f.when(close) {}
+          f.when(actor_exit) { nr -= 1 }
+        end while nr > 0
       end
-
-      Actor.sleep 1 while G.tick || nr > 0
-      rescue Errno::EMFILE
-        # ignore, let another worker process take it
     end
 
-    def revactorize_listeners
-      LISTENERS.map do |s|
-        case s
-        when TCPServer
-          l = ::Revactor::TCP.listen(s, nil)
-          [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
-            T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
-        when UNIXServer
-          l = ::Revactor::UNIX.listen(s)
-          [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
-            T[:unix_connection, l, ::Revactor::UNIX::Socket] ]
-        end
-      end
-    end
+    Actor.sleep 1 while G.tick || nr > 0
+    rescue Errno::EMFILE
+      # ignore, let another worker process take it
+  end
 
-    # Revactor Sockets do not implement readpartial, so we emulate just
-    # enough to avoid mucking with TeeInput internals.  Fortunately
-    # this code is not heavily used so we can usually avoid the overhead
-    # of adding a userspace buffer.
-    class PartialSocket < Struct.new(:socket, :rbuf)
-      def initialize(socket)
-        # IO::Buffer is used internally by Rev which Revactor is based on
-        # so we'll always have it available
-        super(socket, IO::Buffer.new)
+  def revactorize_listeners
+    LISTENERS.map do |s|
+      case s
+      when TCPServer
+        l = ::Revactor::TCP.listen(s, nil)
+        [ l, T[:tcp_closed, ::Revactor::TCP::Socket],
+          T[:tcp_connection, l, ::Revactor::TCP::Socket] ]
+      when UNIXServer
+        l = ::Revactor::UNIX.listen(s)
+        [ l, T[:unix_closed, ::Revactor::UNIX::Socket ],
+          T[:unix_connection, l, ::Revactor::UNIX::Socket] ]
       end
+    end
+  end
 
-      # Revactor socket reads always return an unspecified amount,
-      # sometimes too much
-      def readpartial(length, dst = "")
-        return dst if length == 0
-        # always check and return from the userspace buffer first
-        rbuf.size > 0 and return dst.replace(rbuf.read(length))
+  # Revactor Sockets do not implement readpartial, so we emulate just
+  # enough to avoid mucking with TeeInput internals.  Fortunately
+  # this code is not heavily used so we can usually avoid the overhead
+  # of adding a userspace buffer.
+  class PartialSocket < Struct.new(:socket, :rbuf)
+    def initialize(socket)
+      # IO::Buffer is used internally by Rev which Revactor is based on
+      # so we'll always have it available
+      super(socket, IO::Buffer.new)
+    end
 
-        # read off the socket since there was nothing in rbuf
-        tmp = socket.read
+    # Revactor socket reads always return an unspecified amount,
+    # sometimes too much
+    def readpartial(length, dst = "")
+      return dst if length == 0
+      # always check and return from the userspace buffer first
+      rbuf.size > 0 and return dst.replace(rbuf.read(length))
 
-        # we didn't read too much, good, just return it straight back
-        # to avoid needlessly wasting memory bandwidth
-        tmp.size <= length and return dst.replace(tmp)
+      # read off the socket since there was nothing in rbuf
+      tmp = socket.read
 
-        # ugh, read returned too much, copy + reread to avoid slicing
-        rbuf << tmp[length, tmp.size]
-        dst.replace(tmp[0, length])
-      end
+      # we didn't read too much, good, just return it straight back
+      # to avoid needlessly wasting memory bandwidth
+      tmp.size <= length and return dst.replace(tmp)
 
-      # just proxy any remaining methods TeeInput may use
-      def close
-        socket.close
-      end
+      # ugh, read returned too much, copy + reread to avoid slicing
+      rbuf << tmp[length, tmp.size]
+      dst.replace(tmp[0, length])
     end
 
+    # just proxy any remaining methods TeeInput may use
+    def close
+      socket.close
+    end
   end
+
+  # :startdoc:
 end