about summary refs log tree commit homepage
path: root/lib/rainbows/rev.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/rev.rb')
-rw-r--r--lib/rainbows/rev.rb150
1 files changed, 150 insertions, 0 deletions
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
new file mode 100644
index 0000000..8cd76ae
--- /dev/null
+++ b/lib/rainbows/rev.rb
@@ -0,0 +1,150 @@
+# -*- encoding: binary -*-
+require 'rev'
+
+# workaround revactor 0.1.4 still using the old Rev::Buffer
+# ref: http://rubyforge.org/pipermail/revactor-talk/2009-October/000034.html
+defined?(Rev::Buffer) or Rev::Buffer = IO::Buffer
+
+module Rainbows
+
+  module Rev
+
+    # global vars because class/instance variables are confusing me :<
+    # this struct is only accessed inside workers and thus private to each
+    G = Struct.new(:nr, :max, :logger, :alive, :app).new
+
+    include Base
+
+    class Client < ::Rev::IO
+      include Unicorn
+      include Rainbows::Const
+      G = Rainbows::Rev::G
+
+      def initialize(io)
+        G.nr += 1
+        super(io)
+        @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST
+        @env = {}
+        @hp = HttpParser.new
+        @state = :headers # [ :body [ :trailers ] ] :app_call :close
+        @buf = ""
+      end
+
+      def handle_error(e)
+        msg = case e
+        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+          ERROR_500_RESPONSE
+        when HttpParserError # try to tell the client they're bad
+          ERROR_400_RESPONSE
+        else
+          G.logger.error "Read error: #{e.inspect}"
+          G.logger.error e.backtrace.join("\n")
+          ERROR_500_RESPONSE
+        end
+        write(msg)
+        ensure
+          @state = :close
+      end
+
+      def app_call
+        @input.rewind
+        @env[RACK_INPUT] = @input
+        @env[REMOTE_ADDR] = @remote_addr
+        response = G.app.call(@env.update(RACK_DEFAULTS))
+        alive = @hp.keepalive? && G.alive
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
+        HttpResponse.write(self, response, out)
+        if alive
+          @env.clear
+          @hp.reset
+          @state = :headers
+        else
+          @state = :close
+        end
+        rescue Object => e
+          handle_error(e)
+      end
+
+      def on_write_complete
+        :close == @state and close
+      end
+
+      def on_close
+        G.nr -= 1
+      end
+
+      def tmpio
+        io = Util.tmpio
+        def io.size
+          # already sync=true at creation, so no need to flush before stat
+          stat.size
+        end
+        io
+      end
+
+      # TeeInput doesn't map too well to this right now...
+      def on_read(data)
+        case @state
+        when :headers
+          @hp.headers(@env, @buf << data) or return
+          @state = :body
+          len = @hp.content_length
+          if len == 0
+            @input = HttpRequest::NULL_IO
+            app_call # common case
+          else # nil or len > 0
+            # since we don't do streaming input, we have no choice but
+            # to take over 100-continue handling from the Rack application
+            if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
+              write(EXPECT_100_RESPONSE)
+              @env.delete(HTTP_EXPECT)
+            end
+            @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio
+            @hp.filter_body(@buf2 = @buf.dup, @buf)
+            @input << @buf2
+            on_read("")
+          end
+        when :body
+          if @hp.body_eof?
+            @state = :trailers
+            on_read(data)
+          elsif data.size > 0
+            @hp.filter_body(@buf2, @buf << data)
+            @input << @buf2
+            on_read("")
+          end
+        when :trailers
+          @hp.trailers(@env, @buf << data) and app_call
+        end
+      end
+    end
+
+    class Server < ::Rev::IO
+      G = Rainbows::Rev::G
+
+      def on_readable
+        return if G.nr >= G.max
+        begin
+          Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default)
+        rescue Errno::EAGAIN, Errno::ECONNBORTED
+        end
+      end
+
+    end
+
+    # runs inside each forked worker, this sits around and waits
+    # for connections and doesn't die until the parent dies (or is
+    # given a INT, QUIT, or TERM signal)
+    def worker_loop(worker)
+      init_worker_process(worker)
+      G.nr = 0
+      G.max = worker_connections
+      G.alive = true
+      G.logger = logger
+      G.app = app
+      LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) }
+      ::Rev::Loop.default.run
+    end
+
+  end
+end