about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-11-07 12:23:26 -0800
committerEric Wong <normalperson@yhbt.net>2009-11-07 12:30:47 -0800
commit7e35ea595f4742ace9579402323515031d69fc87 (patch)
tree006c5a6a57c159da466afddc5a7edfb086f6b1cf
parent1266417999aeb939d4e2a7d01aa6730f13cae9fa (diff)
downloadrainbows-7e35ea595f4742ace9579402323515031d69fc87.tar.gz
This will make things easier to manage with more
Rev-based concurrency models.
-rw-r--r--lib/rainbows/rev.rb163
-rw-r--r--lib/rainbows/rev/client.rb75
-rw-r--r--lib/rainbows/rev/core.rb42
-rw-r--r--lib/rainbows/rev/deferred_response.rb70
-rw-r--r--lib/rainbows/rev/heartbeat.rb3
5 files changed, 191 insertions, 162 deletions
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
index 0d8b6c9..602545d 100644
--- a/lib/rainbows/rev.rb
+++ b/lib/rainbows/rev.rb
@@ -1,6 +1,7 @@
 # -*- encoding: binary -*-
-require 'rainbows/rev/heartbeat'
-require 'rainbows/ev_core'
+require 'rainbows/rev/core'
+require 'rainbows/rev/client'
+require 'rainbows/rev/deferred_response'
 
 module Rainbows
 
@@ -23,162 +24,6 @@ module Rainbows
   # temporary file before the application is entered.
 
   module Rev
-
-    include Base
-
-    class Client < ::Rev::IO
-      include Rainbows::EvCore
-      G = Rainbows::G
-
-      def initialize(io)
-        G.cur += 1
-        super(io)
-        post_init
-        @deferred_bodies = [] # for (fast) regular files only
-      end
-
-      # queued, optional response bodies, it should only be unpollable "fast"
-      # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
-      # are also part of this.  We'll also stick DeferredResponse bodies in
-      # here to prevent connections from being closed on us.
-      def defer_body(io)
-        @deferred_bodies << io
-        on_write_complete unless @hp.headers? # triggers a write
-      end
-
-      def app_call
-        begin
-          (@env[RACK_INPUT] = @input).rewind
-          @env[REMOTE_ADDR] = @remote_addr
-          response = APP.call(@env.update(RACK_DEFAULTS))
-          alive = @hp.keepalive? && G.alive
-          out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
-
-          DeferredResponse.write(self, response, out)
-          if alive
-            @env.clear
-            @hp.reset
-            @state = :headers
-            # keepalive requests are always body-less, so @input is unchanged
-            @hp.headers(@env, @buf) and next
-          else
-            quit
-          end
-          return
-        end while true
-      end
-
-      def on_write_complete
-        if body = @deferred_bodies.first
-          return if DeferredResponse === body
-          begin
-            begin
-              write(body.sysread(CHUNK_SIZE))
-            rescue EOFError # expected at file EOF
-              @deferred_bodies.shift
-              body.close
-              close if :close == @state && @deferred_bodies.empty?
-            end
-          rescue Object => e
-            handle_error(e)
-          end
-        else
-          close if :close == @state
-        end
-      end
-
-      def on_close
-        G.cur -= 1
-      end
-    end
-
-    class Server < ::Rev::IO
-      G = Rainbows::G
-
-      def on_readable
-        return if G.cur >= MAX
-        begin
-          Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default)
-        rescue Errno::EAGAIN, Errno::ECONNABORTED
-        end
-      end
-
-    end
-
-    class DeferredResponse < ::Rev::IO
-      include Unicorn
-      include Rainbows::Const
-      G = Rainbows::G
-
-      def self.defer!(client, response, out)
-        body = response.last
-        headers = Rack::Utils::HeaderHash.new(response[1])
-
-        # to_io is not part of the Rack spec, but make an exception
-        # here since we can't get here without checking to_path first
-        io = body.to_io if body.respond_to?(:to_io)
-        io ||= ::IO.new($1.to_i) if body.to_path =~ %r{\A/dev/fd/(\d+)\z}
-        io ||= File.open(body.to_path, 'rb')
-        st = io.stat
-
-        if st.socket? || st.pipe?
-          do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
-          do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
-          # too tricky to support keepalive/pipelining when a response can
-          # take an indeterminate amount of time here.
-          if out.nil?
-            do_chunk = false
-          else
-            out[0] = CONN_CLOSE
-          end
-
-          io = new(io, client, do_chunk, body).attach(::Rev::Loop.default)
-        elsif st.file?
-          headers.delete('Transfer-Encoding')
-          headers['Content-Length'] ||= st.size.to_s
-        else # char/block device, directory, whatever... nobody cares
-          return response
-        end
-        client.defer_body(io)
-        [ response.first, headers.to_hash, [] ]
-      end
-
-      def self.write(client, response, out)
-        response.last.respond_to?(:to_path) and
-          response = defer!(client, response, out)
-        HttpResponse.write(client, response, out)
-      end
-
-      def initialize(io, client, do_chunk, body)
-        super(io)
-        @client, @do_chunk, @body = client, do_chunk, body
-      end
-
-      def on_read(data)
-        @do_chunk and @client.write(sprintf("%x\r\n", data.size))
-        @client.write(data)
-        @do_chunk and @client.write("\r\n")
-      end
-
-      def on_close
-        @do_chunk and @client.write("0\r\n\r\n")
-        @client.quit
-        @body.respond_to?(:close) and @body.close
-      end
-    end
-
-    # runs inside each forked worker, this sits around and waits
-    # for connections and doesn't die until the parent dies (or is
-    # given a INT, QUIT, or TERM signal)
-    def worker_loop(worker)
-      init_worker_process(worker)
-      Client.const_set(:APP, G.server.app)
-      Server.const_set(:MAX, G.server.worker_connections)
-      rloop = ::Rev::Loop.default
-      Heartbeat.new(1, true).attach(rloop)
-      LISTENERS.map! { |s| Server.new(s).attach(rloop) }
-      rloop.run
-    end
-
+    include Core
   end
 end
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
new file mode 100644
index 0000000..d191787
--- /dev/null
+++ b/lib/rainbows/rev/client.rb
@@ -0,0 +1,75 @@
+# -*- encoding: binary -*-
+require 'rainbows/ev_core'
+module Rainbows
+  module Rev
+
+    include Base
+
+    class Client < ::Rev::IO
+      include Rainbows::EvCore
+      G = Rainbows::G
+
+      def initialize(io)
+        G.cur += 1
+        super(io)
+        post_init
+        @deferred_bodies = [] # for (fast) regular files only
+      end
+
+      # queued, optional response bodies, it should only be unpollable "fast"
+      # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
+      # are also part of this.  We'll also stick DeferredResponse bodies in
+      # here to prevent connections from being closed on us.
+      def defer_body(io)
+        @deferred_bodies << io
+        on_write_complete unless @hp.headers? # triggers a write
+      end
+
+      def app_call
+        begin
+          (@env[RACK_INPUT] = @input).rewind
+          @env[REMOTE_ADDR] = @remote_addr
+          response = APP.call(@env.update(RACK_DEFAULTS))
+          alive = @hp.keepalive? && G.alive
+          out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
+
+          DeferredResponse.write(self, response, out)
+          if alive
+            @env.clear
+            @hp.reset
+            @state = :headers
+            # keepalive requests are always body-less, so @input is unchanged
+            @hp.headers(@env, @buf) and next
+          else
+            quit
+          end
+          return
+        end while true
+      end
+
+      def on_write_complete
+        if body = @deferred_bodies.first
+          return if DeferredResponse === body
+          begin
+            begin
+              write(body.sysread(CHUNK_SIZE))
+            rescue EOFError # expected at file EOF
+              @deferred_bodies.shift
+              body.close
+              close if :close == @state && @deferred_bodies.empty?
+            end
+          rescue => e
+            handle_error(e)
+          end
+        else
+          close if :close == @state
+        end
+      end
+
+      def on_close
+        G.cur -= 1
+      end
+
+    end # module Client
+  end # module Rev
+end # module Rainbows
diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb
new file mode 100644
index 0000000..0d1add5
--- /dev/null
+++ b/lib/rainbows/rev/core.rb
@@ -0,0 +1,42 @@
+# -*- encoding: binary -*-
+require 'rev'
+Rev::VERSION >= '0.3.0' or abort 'rev >= 0.3.0 is required'
+require 'rainbows/rev/heartbeat'
+
+module Rainbows
+  module Rev
+    class Server < ::Rev::IO
+      G = Rainbows::G
+      LOOP = ::Rev::Loop.default
+      # CL and MAX will be defined in the corresponding worker loop
+
+      def on_readable
+        return if G.cur >= MAX
+        begin
+          CL.new(@_io.accept_nonblock).attach(LOOP)
+        rescue Errno::EAGAIN, Errno::ECONNABORTED
+        end
+      end
+    end # class Server
+
+    module Core
+
+      # runs inside each forked worker, this sits around and waits
+      # for connections and doesn't die until the parent dies (or is
+      # given a INT, QUIT, or TERM signal)
+      def worker_loop(worker)
+        init_worker_process(worker)
+        mod = self.class.const_get(@use)
+        client = mod.const_get(:Client)
+        client.const_set(:APP, G.server.app)
+        Server.const_set(:MAX, G.server.worker_connections)
+        Server.const_set(:CL, client)
+        rloop = ::Rev::Loop.default
+        Heartbeat.new(1, true).attach(rloop)
+        LISTENERS.map! { |s| Server.new(s).attach(rloop) }
+        rloop.run
+      end
+
+    end # module Core
+  end # module Rev
+end # module Rainbows
diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb
new file mode 100644
index 0000000..d97abbe
--- /dev/null
+++ b/lib/rainbows/rev/deferred_response.rb
@@ -0,0 +1,70 @@
+# -*- encoding: binary -*-
+module Rainbows
+  module Rev
+
+    # this is class is specific to Rev for writing large static files
+    # or proxying IO-derived objects
+    class DeferredResponse < ::Rev::IO
+      include Unicorn
+      include Rainbows::Const
+      G = Rainbows::G
+      HH = Rack::Utils::HeaderHash
+
+      def self.defer!(client, response, out)
+        body = response.last
+        headers = HH.new(response[1])
+
+        # to_io is not part of the Rack spec, but make an exception
+        # here since we can't get here without checking to_path first
+        io = body.to_io if body.respond_to?(:to_io)
+        io ||= ::IO.new($1.to_i) if body.to_path =~ %r{\A/dev/fd/(\d+)\z}
+        io ||= File.open(body.to_path, 'rb')
+        st = io.stat
+
+        if st.socket? || st.pipe?
+          do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i)
+          do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no'
+          # too tricky to support keepalive/pipelining when a response can
+          # take an indeterminate amount of time here.
+          if out.nil?
+            do_chunk = false
+          else
+            out[0] = CONN_CLOSE
+          end
+
+          io = new(io, client, do_chunk, body).attach(::Rev::Loop.default)
+        elsif st.file?
+          headers.delete('Transfer-Encoding')
+          headers['Content-Length'] ||= st.size.to_s
+        else # char/block device, directory, whatever... nobody cares
+          return response
+        end
+        client.defer_body(io)
+        [ response.first, headers.to_hash, [] ]
+      end
+
+      def self.write(client, response, out)
+        response.last.respond_to?(:to_path) and
+          response = defer!(client, response, out)
+        HttpResponse.write(client, response, out)
+      end
+
+      def initialize(io, client, do_chunk, body)
+        super(io)
+        @client, @do_chunk, @body = client, do_chunk, body
+      end
+
+      def on_read(data)
+        @do_chunk and @client.write(sprintf("%x\r\n", data.size))
+        @client.write(data)
+        @do_chunk and @client.write("\r\n")
+      end
+
+      def on_close
+        @do_chunk and @client.write("0\r\n\r\n")
+        @client.quit
+        @body.respond_to?(:close) and @body.close
+      end
+    end # class DeferredResponse
+  end # module Rev
+end # module Rainbows
diff --git a/lib/rainbows/rev/heartbeat.rb b/lib/rainbows/rev/heartbeat.rb
index 63eb71d..1f07b2d 100644
--- a/lib/rainbows/rev/heartbeat.rb
+++ b/lib/rainbows/rev/heartbeat.rb
@@ -1,7 +1,4 @@
 # -*- encoding: binary -*-
-require 'rev'
-Rev::VERSION >= '0.3.0' or abort 'rev >= 0.3.0 is required'
-
 module Rainbows
   module Rev