about summary refs log tree commit homepage
path: root/lib/rainbows/ev_core.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-24 22:13:23 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-26 02:26:22 -0700
commit344f8cf1e000704d53d7841eb896d83b470d7a08 (patch)
tree90c62aaab04269aa34b9728070d56f5551109722 /lib/rainbows/ev_core.rb
parentbb9939c2397b76628e862c93799fdc57909504f4 (diff)
downloadrainbows-344f8cf1e000704d53d7841eb896d83b470d7a08.tar.gz
EventMachine and Rev models seem to be able to share a lot of
common code, so lets share.  We may support Packet in the
future, too, and end up with a similar programming model there
as well.
Diffstat (limited to 'lib/rainbows/ev_core.rb')
-rw-r--r--lib/rainbows/ev_core.rb89
1 files changed, 89 insertions, 0 deletions
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
new file mode 100644
index 0000000..aa0b155
--- /dev/null
+++ b/lib/rainbows/ev_core.rb
@@ -0,0 +1,89 @@
+# -*- encoding: binary -*-
+
+module Rainbows
+
+  # base module for evented models like Rev and EventMachine
+  module EvCore
+    include Unicorn
+    include Rainbows::Const
+    G = Rainbows::G
+
+    def post_init
+      @remote_addr = ::TCPSocket === @_io ? @_io.peeraddr.last : LOCALHOST
+      @env = {}
+      @hp = HttpParser.new
+      @state = :headers # [ :body [ :trailers ] ] :app_call :close
+      @buf = ""
+      @deferred_bodies = [] # for (fast) regular files only
+    end
+
+    # graceful exit, like SIGQUIT
+    def quit
+      @deferred_bodies.clear
+      @state = :close
+    end
+
+    def handle_error(e)
+      quit
+      msg = case e
+      when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+        ERROR_500_RESPONSE
+      when HttpParserError # try to tell the client they're bad
+        ERROR_400_RESPONSE
+      else
+        G.logger.error "Read error: #{e.inspect}"
+        G.logger.error e.backtrace.join("\n")
+        ERROR_500_RESPONSE
+      end
+      write(msg)
+    end
+
+    def tmpio
+      io = Util.tmpio
+      def io.size
+        # already sync=true at creation, so no need to flush before stat
+        stat.size
+      end
+      io
+    end
+
+    # TeeInput doesn't map too well to this right now...
+    def on_read(data)
+      case @state
+      when :headers
+        @hp.headers(@env, @buf << data) or return
+        @state = :body
+        len = @hp.content_length
+        if len == 0
+          @input = HttpRequest::NULL_IO
+          app_call # common case
+        else # nil or len > 0
+          # since we don't do streaming input, we have no choice but
+          # to take over 100-continue handling from the Rack application
+          if @env[HTTP_EXPECT] =~ /\A100-continue\z/i
+            write(EXPECT_100_RESPONSE)
+            @env.delete(HTTP_EXPECT)
+          end
+          @input = len && len <= MAX_BODY ? StringIO.new("") : tmpio
+          @hp.filter_body(@buf2 = @buf.dup, @buf)
+          @input << @buf2
+          on_read("")
+        end
+      when :body
+        if @hp.body_eof?
+          @state = :trailers
+          on_read(data)
+        elsif data.size > 0
+          @hp.filter_body(@buf2, @buf << data)
+          @input << @buf2
+          on_read("")
+        end
+      when :trailers
+        @hp.trailers(@env, @buf << data) and app_call
+      end
+      rescue Object => e
+        handle_error(e)
+    end
+
+  end
+end