about summary refs log tree commit homepage
path: root/lib/rainbows/rev.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-14 17:39:08 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-14 17:39:51 -0700
commit0a47b9209b6c677ad03ad2075f671883ca2b7474 (patch)
tree2c2fc415f527c9f3f8b96912b20a76bbe0420647 /lib/rainbows/rev.rb
parenta42148fe4d62f812bc57418daecdb95f3c4d18cd (diff)
downloadrainbows-0a47b9209b6c677ad03ad2075f671883ca2b7474.tar.gz
There is no TeeInput (streaming request body) support, yet,
as that does not seem fun nor easy to do (or even possible
without using Threads or Fibers or something to save/restore
the stack...)
Diffstat (limited to 'lib/rainbows/rev.rb')
-rw-r--r--lib/rainbows/rev.rb150
1 files changed, 150 insertions, 0 deletions
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb
new file mode 100644
index 0000000..8cd76ae
--- /dev/null
+++ b/lib/rainbows/rev.rb
@@ -0,0 +1,150 @@
+# -*- encoding: binary -*-
+require 'rev'
+
+# workaround revactor 0.1.4 still using the old Rev::Buffer
+# ref: http://rubyforge.org/pipermail/revactor-talk/2009-October/000034.html
+defined?(Rev::Buffer) or Rev::Buffer = IO::Buffer
+
+module Rainbows
+
+  module Rev
+
+    # global vars because class/instance variables are confusing me :<
+    # this struct is only accessed inside workers and thus private to each
+    G = Struct.new(:nr, :max, :logger, :alive, :app).new
+
+    include Base
+
+    class Client < ::Rev::IO
+      include Unicorn
+      include Rainbows::Const
+      G = Rainbows::Rev::G
+
+      def initialize(io)
+        G.nr += 1
+        super(io)
+        @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST
+        @env = {}
+        @hp = HttpParser.new
+        @state = :headers # [ :body [ :trailers ] ] :app_call :close
+        @buf = ""
+      end
+
+      def handle_error(e)
+        msg = case e
+        when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+          ERROR_500_RESPONSE
+        when HttpParserError # try to tell the client they're bad
+          ERROR_400_RESPONSE
+        else
+          G.logger.error "Read error: #{e.inspect}"
+          G.logger.error e.backtrace.join("\n")
+          ERROR_500_RESPONSE
+        end
+        write(msg)
+        ensure
+          @state = :close
+      end
+
+      def app_call
+        @input.rewind
+        @env[RACK_INPUT] = @input
+        @env[REMOTE_ADDR] = @remote_addr
+        response = G.app.call(@env.update(RACK_DEFAULTS))
+        alive = @hp.keepalive? && G.alive
+        out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
+        HttpResponse.write(self, response, out)
+        if alive
+          @env.clear
+          @hp.reset
+          @state = :headers
+        else
+          @state = :close
+        end
+        rescue Object => e
+          handle_error(e)
+      end
+
+      def on_write_complete
+        :close == @state and close
+      end
+
+      def on_close
+        G.nr -= 1
+      end
+
+      def tmpio
+        io = Util.tmpio
+        def io.size
+          # already sync=true at creation, so no need to flush before stat
+          stat.size
+        end
+        io
+      end
+
+      # TeeInput doesn't map too well to this right now...
+      def on_read(data)
+        case @state
+        when :headers
+          @hp.headers(@env, @buf << data) or return
+          @state = :body
+          len = @hp.content_length
+          if len == 0
+            @input = HttpRequest::NULL_IO
+            app_call # common case
+          else # nil or len > 0
+            # since we don't do streaming input, we have no choice but
+            # to take over 100-continue handling from the Rack application
+            if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
+              write(EXPECT_100_RESPONSE)
+              @env.delete(HTTP_EXPECT)
+            end
+            @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio
+            @hp.filter_body(@buf2 = @buf.dup, @buf)
+            @input << @buf2
+            on_read("")
+          end
+        when :body
+          if @hp.body_eof?
+            @state = :trailers
+            on_read(data)
+          elsif data.size > 0
+            @hp.filter_body(@buf2, @buf << data)
+            @input << @buf2
+            on_read("")
+          end
+        when :trailers
+          @hp.trailers(@env, @buf << data) and app_call
+        end
+      end
+    end
+
+    class Server < ::Rev::IO
+      G = Rainbows::Rev::G
+
+      def on_readable
+        return if G.nr >= G.max
+        begin
+          Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default)
+        rescue Errno::EAGAIN, Errno::ECONNBORTED
+        end
+      end
+
+    end
+
+    # runs inside each forked worker, this sits around and waits
+    # for connections and doesn't die until the parent dies (or is
+    # given a INT, QUIT, or TERM signal)
+    def worker_loop(worker)
+      init_worker_process(worker)
+      G.nr = 0
+      G.max = worker_connections
+      G.alive = true
+      G.logger = logger
+      G.app = app
+      LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) }
+      ::Rev::Loop.default.run
+    end
+
+  end
+end