From 344f8cf1e000704d53d7841eb896d83b470d7a08 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sat, 24 Oct 2009 22:13:23 -0700 Subject: rev/evma: move common code for event models into ev_core EventMachine and Rev models seem to be able to share a lot of common code, so lets share. We may support Packet in the future, too, and end up with a similar programming model there as well. --- lib/rainbows/ev_core.rb | 89 +++++++++++++++++++++++++++++++++++++++++++ lib/rainbows/event_machine.rb | 84 ++-------------------------------------- lib/rainbows/rev.rb | 79 ++------------------------------------ 3 files changed, 95 insertions(+), 157 deletions(-) create mode 100644 lib/rainbows/ev_core.rb (limited to 'lib') diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb new file mode 100644 index 0000000..aa0b155 --- /dev/null +++ b/lib/rainbows/ev_core.rb @@ -0,0 +1,89 @@ +# -*- encoding: binary -*- + +module Rainbows + + # base module for evented models like Rev and EventMachine + module EvCore + include Unicorn + include Rainbows::Const + G = Rainbows::G + + def post_init + @remote_addr = ::TCPSocket === @_io ? @_io.peeraddr.last : LOCALHOST + @env = {} + @hp = HttpParser.new + @state = :headers # [ :body [ :trailers ] ] :app_call :close + @buf = "" + @deferred_bodies = [] # for (fast) regular files only + end + + # graceful exit, like SIGQUIT + def quit + @deferred_bodies.clear + @state = :close + end + + def handle_error(e) + quit + msg = case e + when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF + ERROR_500_RESPONSE + when HttpParserError # try to tell the client they're bad + ERROR_400_RESPONSE + else + G.logger.error "Read error: #{e.inspect}" + G.logger.error e.backtrace.join("\n") + ERROR_500_RESPONSE + end + write(msg) + end + + def tmpio + io = Util.tmpio + def io.size + # already sync=true at creation, so no need to flush before stat + stat.size + end + io + end + + # TeeInput doesn't map too well to this right now... + def on_read(data) + case @state + when :headers + @hp.headers(@env, @buf << data) or return + @state = :body + len = @hp.content_length + if len == 0 + @input = HttpRequest::NULL_IO + app_call # common case + else # nil or len > 0 + # since we don't do streaming input, we have no choice but + # to take over 100-continue handling from the Rack application + if @env[HTTP_EXPECT] =~ /\A100-continue\z/i + write(EXPECT_100_RESPONSE) + @env.delete(HTTP_EXPECT) + end + @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio + @hp.filter_body(@buf2 = @buf.dup, @buf) + @input << @buf2 + on_read("") + end + when :body + if @hp.body_eof? + @state = :trailers + on_read(data) + elsif data.size > 0 + @hp.filter_body(@buf2, @buf << data) + @input << @buf2 + on_read("") + end + when :trailers + @hp.trailers(@env, @buf << data) and app_call + end + rescue Object => e + handle_error(e) + end + + end +end diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 3a7349c..2cc0f15 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -1,5 +1,6 @@ # -*- encoding: binary -*- require 'eventmachine' +require 'rainbows/ev_core' module Rainbows @@ -26,45 +27,15 @@ module Rainbows include Base class Client < EM::Connection - include Unicorn - include Rainbows::Const + include Rainbows::EvCore G = Rainbows::G def initialize(io) @_io = io end - def post_init - @remote_addr = ::TCPSocket === @_io ? @_io.peeraddr.last : LOCALHOST - @env = {} - @hp = HttpParser.new - @state = :headers # [ :body [ :trailers ] ] :app_call :close - @buf = "" - @deferred_bodies = [] # for (fast) regular files only - end - - # graceful exit, like SIGQUIT - def quit - @deferred_bodies.clear - @state = :close - end - alias write send_data - - def handle_error(e) - quit - msg = case e - when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF - ERROR_500_RESPONSE - when HttpParserError # try to tell the client they're bad - ERROR_400_RESPONSE - else - G.logger.error "Read error: #{e.inspect}" - G.logger.error e.backtrace.join("\n") - ERROR_500_RESPONSE - end - write(msg) - end + alias receive_data on_read def app_call begin @@ -108,55 +79,6 @@ module Rainbows end end - def tmpio - io = Util.tmpio - def io.size - # already sync=true at creation, so no need to flush before stat - stat.size - end - io - end - - alias on_read receive_data - - # TeeInput doesn't map too well to this right now... - def receive_data(data) - case @state - when :headers - @hp.headers(@env, @buf << data) or return - @state = :body - len = @hp.content_length - if len == 0 - @input = HttpRequest::NULL_IO - app_call # common case - else # nil or len > 0 - # since we don't do streaming input, we have no choice but - # to take over 100-continue handling from the Rack application - if @env[HTTP_EXPECT] =~ /\A100-continue\z/i - write(EXPECT_100_RESPONSE) - @env.delete(HTTP_EXPECT) - end - @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio - @hp.filter_body(@buf2 = @buf.dup, @buf) - @input << @buf2 - on_read("") - end - when :body - if @hp.body_eof? - @state = :trailers - on_read(data) - elsif data.size > 0 - @hp.filter_body(@buf2, @buf << data) - @input << @buf2 - on_read("") - end - when :trailers - @hp.trailers(@env, @buf << data) and app_call - end - rescue Object => e - handle_error(e) - end - end module Server diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index d27538c..572b88a 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -1,5 +1,6 @@ # -*- encoding: binary -*- require 'rev' +require 'rainbows/ev_core' module Rainbows @@ -26,25 +27,13 @@ module Rainbows include Base class Client < ::Rev::IO - include Unicorn - include Rainbows::Const + include Rainbows::EvCore G = Rainbows::G def initialize(io) G.cur += 1 super(io) - @remote_addr = ::TCPSocket === io ? io.peeraddr.last : LOCALHOST - @env = {} - @hp = HttpParser.new - @state = :headers # [ :body [ :trailers ] ] :app_call :close - @buf = "" - @deferred_bodies = [] # for (fast) regular files only - end - - # graceful exit, like SIGQUIT - def quit - @deferred_bodies.clear - @state = :close + post_init end # queued, optional response bodies, it should only be unpollable "fast" @@ -56,21 +45,6 @@ module Rainbows on_write_complete unless @hp.headers? # triggers a write end - def handle_error(e) - quit - msg = case e - when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF - ERROR_500_RESPONSE - when HttpParserError # try to tell the client they're bad - ERROR_400_RESPONSE - else - G.logger.error "Read error: #{e.inspect}" - G.logger.error e.backtrace.join("\n") - ERROR_500_RESPONSE - end - write(msg) - end - def app_call begin (@env[RACK_INPUT] = @input).rewind @@ -116,53 +90,6 @@ module Rainbows def on_close G.cur -= 1 end - - def tmpio - io = Util.tmpio - def io.size - # already sync=true at creation, so no need to flush before stat - stat.size - end - io - end - - # TeeInput doesn't map too well to this right now... - def on_read(data) - case @state - when :headers - @hp.headers(@env, @buf << data) or return - @state = :body - len = @hp.content_length - if len == 0 - @input = HttpRequest::NULL_IO - app_call # common case - else # nil or len > 0 - # since we don't do streaming input, we have no choice but - # to take over 100-continue handling from the Rack application - if @env[HTTP_EXPECT] =~ /\A100-continue\z/i - write(EXPECT_100_RESPONSE) - @env.delete(HTTP_EXPECT) - end - @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio - @hp.filter_body(@buf2 = @buf.dup, @buf) - @input << @buf2 - on_read("") - end - when :body - if @hp.body_eof? - @state = :trailers - on_read(data) - elsif data.size > 0 - @hp.filter_body(@buf2, @buf << data) - @input << @buf2 - on_read("") - end - when :trailers - @hp.trailers(@env, @buf << data) and app_call - end - rescue Object => e - handle_error(e) - end end class Server < ::Rev::IO -- cgit v1.2.3-24-ge0c7