diff options
author | Eric Wong <normalperson@yhbt.net> | 2010-12-26 23:12:18 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2010-12-26 23:12:18 +0000 |
commit | 20e8d57127f16da8e4242582dee3b99d54cbb729 (patch) | |
tree | 750fb8d19e7fc839b1a28de241e4aeee8698b1e7 /lib/rainbows/event_machine.rb | |
parent | a50c9d312b9d5274a95f2816b5f53a3738d0cb92 (diff) | |
download | rainbows-20e8d57127f16da8e4242582dee3b99d54cbb729.tar.gz |
This should make things easier to find
Diffstat (limited to 'lib/rainbows/event_machine.rb')
-rw-r--r-- | lib/rainbows/event_machine.rb | 147 |
1 files changed, 12 insertions, 135 deletions
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index a307f4c..b226cab 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -41,140 +41,11 @@ EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required' # "rack.input" will be fully buffered in memory or to a temporary file # before the application is entered. module Rainbows::EventMachine - - include Rainbows::Base autoload :ResponsePipe, 'rainbows/event_machine/response_pipe' autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe' autoload :TryDefer, 'rainbows/event_machine/try_defer' - class Client < EM::Connection # :nodoc: all - attr_writer :body - include Rainbows::EvCore - - def initialize(io) - @_io = io - @body = nil - end - - alias write send_data - - def receive_data(data) - # To avoid clobbering the current streaming response - # (often a static file), we do not attempt to process another - # request on the same connection until the first is complete - if @body - @buf << data - @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 - EM.next_tick { receive_data('') } - else - on_read(data) - end - end - - def quit - super - close_connection_after_writing - end - - def app_call - set_comm_inactivity_timeout 0 - @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @_io.kgio_addr - @env[ASYNC_CALLBACK] = method(:em_write_response) - @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new - - response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } - - # too tricky to support pipelining with :async since the - # second (pipelined) request could be a stuck behind a - # long-running async response - (response.nil? || -1 == response[0]) and return @state = :close - - alive = @hp.next? && G.alive && G.kato > 0 - em_write_response(response, alive) - if alive - @state = :headers - if @buf.empty? - set_comm_inactivity_timeout(G.kato) - else - EM.next_tick { receive_data('') } - end - end - end - - def em_write_response(response, alive = false) - status, headers, body = response - if @hp.headers? - headers = HH.new(headers) - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - else - headers = nil - end - - if body.respond_to?(:errback) && body.respond_to?(:callback) - @body = body - body.callback { quit } - body.errback { quit } - # async response, this could be a trickle as is in comet-style apps - headers[CONNECTION] = CLOSE if headers - alive = true - elsif body.respond_to?(:to_path) - st = File.stat(path = body.to_path) - - if st.file? - write(response_header(status, headers)) if headers - @body = stream_file_data(path) - @body.errback do - body.close if body.respond_to?(:close) - quit - end - @body.callback do - body.close if body.respond_to?(:close) - @body = nil - alive ? receive_data('') : quit - end - return - elsif st.socket? || st.pipe? - @body = io = body_to_io(body) - chunk = stream_response_headers(status, headers) if headers - m = chunk ? ResponseChunkPipe : ResponsePipe - return EM.watch(io, m, self, alive, body).notify_readable = true - end - # char or block device... WTF? fall through to body.each - end - - write(response_header(status, headers)) if headers - write_body_each(self, body) - quit unless alive - end - - def unbind - async_close = @env[ASYNC_CLOSE] and async_close.succeed - @body.respond_to?(:fail) and @body.fail - begin - @_io.close - rescue Errno::EBADF - # EventMachine's EventableDescriptor::Close() may close - # the underlying file descriptor without invalidating the - # associated IO object on errors, so @_io.closed? isn't - # sufficient. - end - end - end - - module Server # :nodoc: all - def close - detach - @io.close - end - - def notify_readable - return if CUR.size >= MAX - io = @io.kgio_tryaccept or return - sig = EM.attach_fd(io.fileno, false) - CUR[sig] = CL.new(sig, io) - end - end + include Rainbows::Base def init_worker_process(worker) # :nodoc: Rainbows::Response.setup(Rainbows::EventMachine::Client) @@ -187,21 +58,22 @@ module Rainbows::EventMachine def worker_loop(worker) # :nodoc: init_worker_process(worker) G.server.app.respond_to?(:deferred?) and - G.server.app = TryDefer[G.server.app] + G.server.app = Rainbows::EventMachine::TryDefer[G.server.app] # enable them both, should be non-fatal if not supported EM.epoll EM.kqueue logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}" client_class = Rainbows.const_get(@use).const_get(:Client) - Server.const_set(:MAX, worker_connections + LISTENERS.size) - Server.const_set(:CL, client_class) + max = worker_connections + LISTENERS.size + Rainbows::EventMachine::Server.const_set(:MAX, max) + Rainbows::EventMachine::Server.const_set(:CL, client_class) client_class.const_set(:APP, G.server.app) Rainbows::EvCore.setup EM.run { conns = EM.instance_variable_get(:@conns) or raise RuntimeError, "EM @conns instance variable not accessible!" - Server.const_set(:CUR, conns) + Rainbows::EventMachine::Server.const_set(:CUR, conns) EM.add_periodic_timer(1) do unless G.tick conns.each_value { |c| client_class === c and c.quit } @@ -209,8 +81,13 @@ module Rainbows::EventMachine end end LISTENERS.map! do |s| - EM.watch(s, Server) { |c| c.notify_readable = true } + EM.watch(s, Rainbows::EventMachine::Server) do |c| + c.notify_readable = true + end end } end end +# :enddoc: +require 'rainbows/event_machine/client' +require 'rainbows/event_machine/server' |