From 20e8d57127f16da8e4242582dee3b99d54cbb729 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 26 Dec 2010 23:12:18 +0000 Subject: event_machine: split out server and client classes This should make things easier to find --- lib/rainbows/event_machine.rb | 147 +++-------------------------------- lib/rainbows/event_machine/client.rb | 117 ++++++++++++++++++++++++++++ lib/rainbows/event_machine/server.rb | 15 ++++ 3 files changed, 144 insertions(+), 135 deletions(-) create mode 100644 lib/rainbows/event_machine/client.rb create mode 100644 lib/rainbows/event_machine/server.rb diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index a307f4c..b226cab 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -41,140 +41,11 @@ EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required' # "rack.input" will be fully buffered in memory or to a temporary file # before the application is entered. module Rainbows::EventMachine - - include Rainbows::Base autoload :ResponsePipe, 'rainbows/event_machine/response_pipe' autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe' autoload :TryDefer, 'rainbows/event_machine/try_defer' - class Client < EM::Connection # :nodoc: all - attr_writer :body - include Rainbows::EvCore - - def initialize(io) - @_io = io - @body = nil - end - - alias write send_data - - def receive_data(data) - # To avoid clobbering the current streaming response - # (often a static file), we do not attempt to process another - # request on the same connection until the first is complete - if @body - @buf << data - @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 - EM.next_tick { receive_data('') } - else - on_read(data) - end - end - - def quit - super - close_connection_after_writing - end - - def app_call - set_comm_inactivity_timeout 0 - @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @_io.kgio_addr - @env[ASYNC_CALLBACK] = method(:em_write_response) - @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new - - response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } - - # too tricky to support pipelining with :async since the - # second (pipelined) request could be a stuck behind a - # long-running async response - (response.nil? || -1 == response[0]) and return @state = :close - - alive = @hp.next? && G.alive && G.kato > 0 - em_write_response(response, alive) - if alive - @state = :headers - if @buf.empty? - set_comm_inactivity_timeout(G.kato) - else - EM.next_tick { receive_data('') } - end - end - end - - def em_write_response(response, alive = false) - status, headers, body = response - if @hp.headers? - headers = HH.new(headers) - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - else - headers = nil - end - - if body.respond_to?(:errback) && body.respond_to?(:callback) - @body = body - body.callback { quit } - body.errback { quit } - # async response, this could be a trickle as is in comet-style apps - headers[CONNECTION] = CLOSE if headers - alive = true - elsif body.respond_to?(:to_path) - st = File.stat(path = body.to_path) - - if st.file? - write(response_header(status, headers)) if headers - @body = stream_file_data(path) - @body.errback do - body.close if body.respond_to?(:close) - quit - end - @body.callback do - body.close if body.respond_to?(:close) - @body = nil - alive ? receive_data('') : quit - end - return - elsif st.socket? || st.pipe? - @body = io = body_to_io(body) - chunk = stream_response_headers(status, headers) if headers - m = chunk ? ResponseChunkPipe : ResponsePipe - return EM.watch(io, m, self, alive, body).notify_readable = true - end - # char or block device... WTF? fall through to body.each - end - - write(response_header(status, headers)) if headers - write_body_each(self, body) - quit unless alive - end - - def unbind - async_close = @env[ASYNC_CLOSE] and async_close.succeed - @body.respond_to?(:fail) and @body.fail - begin - @_io.close - rescue Errno::EBADF - # EventMachine's EventableDescriptor::Close() may close - # the underlying file descriptor without invalidating the - # associated IO object on errors, so @_io.closed? isn't - # sufficient. - end - end - end - - module Server # :nodoc: all - def close - detach - @io.close - end - - def notify_readable - return if CUR.size >= MAX - io = @io.kgio_tryaccept or return - sig = EM.attach_fd(io.fileno, false) - CUR[sig] = CL.new(sig, io) - end - end + include Rainbows::Base def init_worker_process(worker) # :nodoc: Rainbows::Response.setup(Rainbows::EventMachine::Client) @@ -187,21 +58,22 @@ module Rainbows::EventMachine def worker_loop(worker) # :nodoc: init_worker_process(worker) G.server.app.respond_to?(:deferred?) and - G.server.app = TryDefer[G.server.app] + G.server.app = Rainbows::EventMachine::TryDefer[G.server.app] # enable them both, should be non-fatal if not supported EM.epoll EM.kqueue logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}" client_class = Rainbows.const_get(@use).const_get(:Client) - Server.const_set(:MAX, worker_connections + LISTENERS.size) - Server.const_set(:CL, client_class) + max = worker_connections + LISTENERS.size + Rainbows::EventMachine::Server.const_set(:MAX, max) + Rainbows::EventMachine::Server.const_set(:CL, client_class) client_class.const_set(:APP, G.server.app) Rainbows::EvCore.setup EM.run { conns = EM.instance_variable_get(:@conns) or raise RuntimeError, "EM @conns instance variable not accessible!" - Server.const_set(:CUR, conns) + Rainbows::EventMachine::Server.const_set(:CUR, conns) EM.add_periodic_timer(1) do unless G.tick conns.each_value { |c| client_class === c and c.quit } @@ -209,8 +81,13 @@ module Rainbows::EventMachine end end LISTENERS.map! do |s| - EM.watch(s, Server) { |c| c.notify_readable = true } + EM.watch(s, Rainbows::EventMachine::Server) do |c| + c.notify_readable = true + end end } end end +# :enddoc: +require 'rainbows/event_machine/client' +require 'rainbows/event_machine/server' diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb new file mode 100644 index 0000000..fab1dbc --- /dev/null +++ b/lib/rainbows/event_machine/client.rb @@ -0,0 +1,117 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::EventMachine::Client < EM::Connection + attr_writer :body + include Rainbows::EvCore + + def initialize(io) + @_io = io + @body = nil + end + + alias write send_data + + def receive_data(data) + # To avoid clobbering the current streaming response + # (often a static file), we do not attempt to process another + # request on the same connection until the first is complete + if @body + @buf << data + @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 + EM.next_tick { receive_data('') } + else + on_read(data) + end + end + + def quit + super + close_connection_after_writing + end + + def app_call + set_comm_inactivity_timeout 0 + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @_io.kgio_addr + @env[ASYNC_CALLBACK] = method(:em_write_response) + @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new + + response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } + + # too tricky to support pipelining with :async since the + # second (pipelined) request could be a stuck behind a + # long-running async response + (response.nil? || -1 == response[0]) and return @state = :close + + alive = @hp.next? && G.alive && G.kato > 0 + em_write_response(response, alive) + if alive + @state = :headers + if @buf.empty? + set_comm_inactivity_timeout(G.kato) + else + EM.next_tick { receive_data('') } + end + end + end + + def em_write_response(response, alive = false) + status, headers, body = response + if @hp.headers? + headers = HH.new(headers) + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + else + headers = nil + end + + if body.respond_to?(:errback) && body.respond_to?(:callback) + @body = body + body.callback { quit } + body.errback { quit } + # async response, this could be a trickle as is in comet-style apps + headers[CONNECTION] = CLOSE if headers + alive = true + elsif body.respond_to?(:to_path) + st = File.stat(path = body.to_path) + + if st.file? + write(response_header(status, headers)) if headers + @body = stream_file_data(path) + @body.errback do + body.close if body.respond_to?(:close) + quit + end + @body.callback do + body.close if body.respond_to?(:close) + @body = nil + alive ? receive_data('') : quit + end + return + elsif st.socket? || st.pipe? + @body = io = body_to_io(body) + chunk = stream_response_headers(status, headers) if headers + m = chunk ? Rainbows::EventMachine::ResponseChunkPipe : + Rainbows::EventMachine::ResponsePipe + return EM.watch(io, m, self, alive, body).notify_readable = true + end + # char or block device... WTF? fall through to body.each + end + + write(response_header(status, headers)) if headers + write_body_each(self, body) + quit unless alive + end + + def unbind + async_close = @env[ASYNC_CLOSE] and async_close.succeed + @body.respond_to?(:fail) and @body.fail + begin + @_io.close + rescue Errno::EBADF + # EventMachine's EventableDescriptor::Close() may close + # the underlying file descriptor without invalidating the + # associated IO object on errors, so @_io.closed? isn't + # sufficient. + end + end +end diff --git a/lib/rainbows/event_machine/server.rb b/lib/rainbows/event_machine/server.rb new file mode 100644 index 0000000..696bc8b --- /dev/null +++ b/lib/rainbows/event_machine/server.rb @@ -0,0 +1,15 @@ +# -*- encoding: binary -*- +module Rainbows::EventMachine::Server # :nodoc: all + def close + detach + @io.close + end + + # CL, CUR and MAX will be set when worker_loop starts + def notify_readable + return if CUR.size >= MAX + io = @io.kgio_tryaccept or return + sig = EM.attach_fd(io.fileno, false) + CUR[sig] = CL.new(sig, io) + end +end -- cgit v1.2.3-24-ge0c7