From 0a47b9209b6c677ad03ad2075f671883ca2b7474 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 14 Oct 2009 17:39:08 -0700 Subject: preliminary Rev support There is no TeeInput (streaming request body) support, yet, as that does not seem fun nor easy to do (or even possible without using Threads or Fibers or something to save/restore the stack...) --- lib/rainbows/rev.rb | 150 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) create mode 100644 lib/rainbows/rev.rb (limited to 'lib/rainbows') diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb new file mode 100644 index 0000000..8cd76ae --- /dev/null +++ b/lib/rainbows/rev.rb @@ -0,0 +1,150 @@ +# -*- encoding: binary -*- +require 'rev' + +# workaround revactor 0.1.4 still using the old Rev::Buffer +# ref: http://rubyforge.org/pipermail/revactor-talk/2009-October/000034.html +defined?(Rev::Buffer) or Rev::Buffer = IO::Buffer + +module Rainbows + + module Rev + + # global vars because class/instance variables are confusing me :< + # this struct is only accessed inside workers and thus private to each + G = Struct.new(:nr, :max, :logger, :alive, :app).new + + include Base + + class Client < ::Rev::IO + include Unicorn + include Rainbows::Const + G = Rainbows::Rev::G + + def initialize(io) + G.nr += 1 + super(io) + @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST + @env = {} + @hp = HttpParser.new + @state = :headers # [ :body [ :trailers ] ] :app_call :close + @buf = "" + end + + def handle_error(e) + msg = case e + when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF + ERROR_500_RESPONSE + when HttpParserError # try to tell the client they're bad + ERROR_400_RESPONSE + else + G.logger.error "Read error: #{e.inspect}" + G.logger.error e.backtrace.join("\n") + ERROR_500_RESPONSE + end + write(msg) + ensure + @state = :close + end + + def app_call + @input.rewind + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @remote_addr + response = G.app.call(@env.update(RACK_DEFAULTS)) + alive = @hp.keepalive? && G.alive + out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? + HttpResponse.write(self, response, out) + if alive + @env.clear + @hp.reset + @state = :headers + else + @state = :close + end + rescue Object => e + handle_error(e) + end + + def on_write_complete + :close == @state and close + end + + def on_close + G.nr -= 1 + end + + def tmpio + io = Util.tmpio + def io.size + # already sync=true at creation, so no need to flush before stat + stat.size + end + io + end + + # TeeInput doesn't map too well to this right now... + def on_read(data) + case @state + when :headers + @hp.headers(@env, @buf << data) or return + @state = :body + len = @hp.content_length + if len == 0 + @input = HttpRequest::NULL_IO + app_call # common case + else # nil or len > 0 + # since we don't do streaming input, we have no choice but + # to take over 100-continue handling from the Rack application + if @env[HTTP_EXPECT] =~ /\A100-continue\z/i + write(EXPECT_100_RESPONSE) + @env.delete(HTTP_EXPECT) + end + @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio + @hp.filter_body(@buf2 = @buf.dup, @buf) + @input << @buf2 + on_read("") + end + when :body + if @hp.body_eof? + @state = :trailers + on_read(data) + elsif data.size > 0 + @hp.filter_body(@buf2, @buf << data) + @input << @buf2 + on_read("") + end + when :trailers + @hp.trailers(@env, @buf << data) and app_call + end + end + end + + class Server < ::Rev::IO + G = Rainbows::Rev::G + + def on_readable + return if G.nr >= G.max + begin + Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) + rescue Errno::EAGAIN, Errno::ECONNBORTED + end + end + + end + + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) + init_worker_process(worker) + G.nr = 0 + G.max = worker_connections + G.alive = true + G.logger = logger + G.app = app + LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) } + ::Rev::Loop.default.run + end + + end +end -- cgit v1.2.3-24-ge0c7