about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-12-26 23:12:18 +0000
committerEric Wong <normalperson@yhbt.net>2010-12-26 23:12:18 +0000
commit20e8d57127f16da8e4242582dee3b99d54cbb729 (patch)
tree750fb8d19e7fc839b1a28de241e4aeee8698b1e7 /lib
parenta50c9d312b9d5274a95f2816b5f53a3738d0cb92 (diff)
downloadrainbows-20e8d57127f16da8e4242582dee3b99d54cbb729.tar.gz
This should make things easier to find
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows/event_machine.rb147
-rw-r--r--lib/rainbows/event_machine/client.rb117
-rw-r--r--lib/rainbows/event_machine/server.rb15
3 files changed, 144 insertions, 135 deletions
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index a307f4c..b226cab 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -41,140 +41,11 @@ EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required'
 # "rack.input" will be fully buffered in memory or to a temporary file
 # before the application is entered.
 module Rainbows::EventMachine
-
-  include Rainbows::Base
   autoload :ResponsePipe, 'rainbows/event_machine/response_pipe'
   autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe'
   autoload :TryDefer, 'rainbows/event_machine/try_defer'
 
-  class Client < EM::Connection # :nodoc: all
-    attr_writer :body
-    include Rainbows::EvCore
-
-    def initialize(io)
-      @_io = io
-      @body = nil
-    end
-
-    alias write send_data
-
-    def receive_data(data)
-      # To avoid clobbering the current streaming response
-      # (often a static file), we do not attempt to process another
-      # request on the same connection until the first is complete
-      if @body
-        @buf << data
-        @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
-        EM.next_tick { receive_data('') }
-      else
-        on_read(data)
-      end
-    end
-
-    def quit
-      super
-      close_connection_after_writing
-    end
-
-    def app_call
-      set_comm_inactivity_timeout 0
-      @env[RACK_INPUT] = @input
-      @env[REMOTE_ADDR] = @_io.kgio_addr
-      @env[ASYNC_CALLBACK] = method(:em_write_response)
-      @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
-
-      response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
-
-      # too tricky to support pipelining with :async since the
-      # second (pipelined) request could be a stuck behind a
-      # long-running async response
-      (response.nil? || -1 == response[0]) and return @state = :close
-
-      alive = @hp.next? && G.alive && G.kato > 0
-      em_write_response(response, alive)
-      if alive
-        @state = :headers
-        if @buf.empty?
-          set_comm_inactivity_timeout(G.kato)
-        else
-          EM.next_tick { receive_data('') }
-        end
-      end
-    end
-
-    def em_write_response(response, alive = false)
-      status, headers, body = response
-      if @hp.headers?
-        headers = HH.new(headers)
-        headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
-      else
-        headers = nil
-      end
-
-      if body.respond_to?(:errback) && body.respond_to?(:callback)
-        @body = body
-        body.callback { quit }
-        body.errback { quit }
-        # async response, this could be a trickle as is in comet-style apps
-        headers[CONNECTION] = CLOSE if headers
-        alive = true
-      elsif body.respond_to?(:to_path)
-        st = File.stat(path = body.to_path)
-
-        if st.file?
-          write(response_header(status, headers)) if headers
-          @body = stream_file_data(path)
-          @body.errback do
-            body.close if body.respond_to?(:close)
-            quit
-          end
-          @body.callback do
-            body.close if body.respond_to?(:close)
-            @body = nil
-            alive ? receive_data('') : quit
-          end
-          return
-        elsif st.socket? || st.pipe?
-          @body = io = body_to_io(body)
-          chunk = stream_response_headers(status, headers) if headers
-          m = chunk ? ResponseChunkPipe : ResponsePipe
-          return EM.watch(io, m, self, alive, body).notify_readable = true
-        end
-        # char or block device... WTF? fall through to body.each
-      end
-
-      write(response_header(status, headers)) if headers
-      write_body_each(self, body)
-      quit unless alive
-    end
-
-    def unbind
-      async_close = @env[ASYNC_CLOSE] and async_close.succeed
-      @body.respond_to?(:fail) and @body.fail
-      begin
-        @_io.close
-      rescue Errno::EBADF
-        # EventMachine's EventableDescriptor::Close() may close
-        # the underlying file descriptor without invalidating the
-        # associated IO object on errors, so @_io.closed? isn't
-        # sufficient.
-      end
-    end
-  end
-
-  module Server # :nodoc: all
-    def close
-      detach
-      @io.close
-    end
-
-    def notify_readable
-      return if CUR.size >= MAX
-      io = @io.kgio_tryaccept or return
-      sig = EM.attach_fd(io.fileno, false)
-      CUR[sig] = CL.new(sig, io)
-    end
-  end
+  include Rainbows::Base
 
   def init_worker_process(worker) # :nodoc:
     Rainbows::Response.setup(Rainbows::EventMachine::Client)
@@ -187,21 +58,22 @@ module Rainbows::EventMachine
   def worker_loop(worker) # :nodoc:
     init_worker_process(worker)
     G.server.app.respond_to?(:deferred?) and
-      G.server.app = TryDefer[G.server.app]
+      G.server.app = Rainbows::EventMachine::TryDefer[G.server.app]
 
     # enable them both, should be non-fatal if not supported
     EM.epoll
     EM.kqueue
     logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}"
     client_class = Rainbows.const_get(@use).const_get(:Client)
-    Server.const_set(:MAX, worker_connections + LISTENERS.size)
-    Server.const_set(:CL, client_class)
+    max = worker_connections + LISTENERS.size
+    Rainbows::EventMachine::Server.const_set(:MAX, max)
+    Rainbows::EventMachine::Server.const_set(:CL, client_class)
     client_class.const_set(:APP, G.server.app)
     Rainbows::EvCore.setup
     EM.run {
       conns = EM.instance_variable_get(:@conns) or
         raise RuntimeError, "EM @conns instance variable not accessible!"
-      Server.const_set(:CUR, conns)
+      Rainbows::EventMachine::Server.const_set(:CUR, conns)
       EM.add_periodic_timer(1) do
         unless G.tick
           conns.each_value { |c| client_class === c and c.quit }
@@ -209,8 +81,13 @@ module Rainbows::EventMachine
         end
       end
       LISTENERS.map! do |s|
-        EM.watch(s, Server) { |c| c.notify_readable = true }
+        EM.watch(s, Rainbows::EventMachine::Server) do |c|
+          c.notify_readable = true
+        end
       end
     }
   end
 end
+# :enddoc:
+require 'rainbows/event_machine/client'
+require 'rainbows/event_machine/server'
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
new file mode 100644
index 0000000..fab1dbc
--- /dev/null
+++ b/lib/rainbows/event_machine/client.rb
@@ -0,0 +1,117 @@
+# -*- encoding: binary -*-
+# :enddoc:
+class Rainbows::EventMachine::Client < EM::Connection
+  attr_writer :body
+  include Rainbows::EvCore
+
+  def initialize(io)
+    @_io = io
+    @body = nil
+  end
+
+  alias write send_data
+
+  def receive_data(data)
+    # To avoid clobbering the current streaming response
+    # (often a static file), we do not attempt to process another
+    # request on the same connection until the first is complete
+    if @body
+      @buf << data
+      @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
+      EM.next_tick { receive_data('') }
+    else
+      on_read(data)
+    end
+  end
+
+  def quit
+    super
+    close_connection_after_writing
+  end
+
+  def app_call
+    set_comm_inactivity_timeout 0
+    @env[RACK_INPUT] = @input
+    @env[REMOTE_ADDR] = @_io.kgio_addr
+    @env[ASYNC_CALLBACK] = method(:em_write_response)
+    @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+
+    response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
+
+    # too tricky to support pipelining with :async since the
+    # second (pipelined) request could be a stuck behind a
+    # long-running async response
+    (response.nil? || -1 == response[0]) and return @state = :close
+
+    alive = @hp.next? && G.alive && G.kato > 0
+    em_write_response(response, alive)
+    if alive
+      @state = :headers
+      if @buf.empty?
+        set_comm_inactivity_timeout(G.kato)
+      else
+        EM.next_tick { receive_data('') }
+      end
+    end
+  end
+
+  def em_write_response(response, alive = false)
+    status, headers, body = response
+    if @hp.headers?
+      headers = HH.new(headers)
+      headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE
+    else
+      headers = nil
+    end
+
+    if body.respond_to?(:errback) && body.respond_to?(:callback)
+      @body = body
+      body.callback { quit }
+      body.errback { quit }
+      # async response, this could be a trickle as is in comet-style apps
+      headers[CONNECTION] = CLOSE if headers
+      alive = true
+    elsif body.respond_to?(:to_path)
+      st = File.stat(path = body.to_path)
+
+      if st.file?
+        write(response_header(status, headers)) if headers
+        @body = stream_file_data(path)
+        @body.errback do
+          body.close if body.respond_to?(:close)
+          quit
+        end
+        @body.callback do
+          body.close if body.respond_to?(:close)
+          @body = nil
+          alive ? receive_data('') : quit
+        end
+        return
+      elsif st.socket? || st.pipe?
+        @body = io = body_to_io(body)
+        chunk = stream_response_headers(status, headers) if headers
+        m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
+                    Rainbows::EventMachine::ResponsePipe
+        return EM.watch(io, m, self, alive, body).notify_readable = true
+      end
+      # char or block device... WTF? fall through to body.each
+    end
+
+    write(response_header(status, headers)) if headers
+    write_body_each(self, body)
+    quit unless alive
+  end
+
+  def unbind
+    async_close = @env[ASYNC_CLOSE] and async_close.succeed
+    @body.respond_to?(:fail) and @body.fail
+    begin
+      @_io.close
+    rescue Errno::EBADF
+      # EventMachine's EventableDescriptor::Close() may close
+      # the underlying file descriptor without invalidating the
+      # associated IO object on errors, so @_io.closed? isn't
+      # sufficient.
+    end
+  end
+end
diff --git a/lib/rainbows/event_machine/server.rb b/lib/rainbows/event_machine/server.rb
new file mode 100644
index 0000000..696bc8b
--- /dev/null
+++ b/lib/rainbows/event_machine/server.rb
@@ -0,0 +1,15 @@
+# -*- encoding: binary -*-
+module Rainbows::EventMachine::Server # :nodoc: all
+  def close
+    detach
+    @io.close
+  end
+
+  # CL, CUR and MAX will be set when worker_loop starts
+  def notify_readable
+    return if CUR.size >= MAX
+    io = @io.kgio_tryaccept or return
+    sig = EM.attach_fd(io.fileno, false)
+    CUR[sig] = CL.new(sig, io)
+  end
+end