From 53b04c96d38bc6bb5fb3b4874fbf59aae81eb6f0 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 19 Jul 2010 10:09:57 +0000 Subject: rev + em: enable keepalive for pipe/socket responses This makes it easier to write proxies for slow clients that benefit from keep-alive. We also need to be careful about non-HTTP/1.1 connections that can't do keepalive, now. --- lib/rainbows/ev_core.rb | 1 - lib/rainbows/event_machine.rb | 34 ++++++++++++++-------------------- lib/rainbows/rev/client.rb | 15 +++++++-------- lib/rainbows/rev/deferred_response.rb | 2 +- 4 files changed, 22 insertions(+), 30 deletions(-) (limited to 'lib/rainbows') diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index dbcdeba..3e64ff9 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -42,7 +42,6 @@ module Rainbows rv = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) rv = false if headers.delete('X-Rainbows-Autochunk') == 'no' end - headers[CONNECTION] = CLOSE # TODO: allow keep-alive write(response_header(status, headers)) rv end diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 86cb4eb..d6d41a0 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -95,16 +95,14 @@ module Rainbows end while true end - # used for streaming sockets and pipes - def stream_response(status, headers, io) - do_chunk = stream_response_headers(status, headers) if headers - mod = do_chunk ? ResponseChunkPipe : ResponsePipe - EM.watch(io, mod, self).notify_readable = true - end - def em_write_response(response, alive = false) status, headers, body = response - headers = @hp.headers? ? HH.new(headers) : nil if headers + if @hp.headers? + headers = HH.new(headers) + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + else + headers = nil + end @body = body if body.respond_to?(:errback) && body.respond_to?(:callback) @@ -121,23 +119,19 @@ module Rainbows st = io.stat if st.file? - if headers - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - write(response_header(status, headers)) - end + write(response_header(status, headers)) if headers stream = stream_file_data(body.to_path) stream.callback { quit } unless alive return elsif st.socket? || st.pipe? - return stream_response(status, headers, io) + chunk = stream_response_headers(status, headers) if headers + m = chunk ? ResponseChunkPipe : ResponsePipe + return EM.watch(io, m, self, alive).notify_readable = true end # char or block device... WTF? fall through to body.each end - if headers - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - write(response_header(status, headers)) - end + write(response_header(status, headers)) if headers write_body_each(self, body) quit unless alive end @@ -154,8 +148,8 @@ module Rainbows # so a single buffer for all clients will work safely BUF = '' - def initialize(client) - @client = client + def initialize(client, alive) + @client, @alive = client, alive end def notify_readable @@ -172,8 +166,8 @@ module Rainbows end def unbind + @client.quit unless @alive @io.close - @client.quit end end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index f067d1b..5c61109 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -52,6 +52,10 @@ module Rainbows schedule_write end + def next + @deferred_bodies.shift + end + def timeout? @_write_buffer.empty? && @deferred_bodies.empty? and close.nil? end @@ -69,25 +73,20 @@ module Rainbows status, headers, body = response headers = @hp.headers? ? HH.new(headers) : nil + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers if body.respond_to?(:to_path) io = body_to_io(body) st = io.stat if st.file? - if headers - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - write(response_header(status, headers)) - end + write(response_header(status, headers)) if headers return defer_body(to_sendfile(io)) elsif st.socket? || st.pipe? return stream_response(status, headers, io, body) end # char or block device... WTF? fall through to body.each end - if headers - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - write(response_header(status, headers)) - end + write(response_header(status, headers)) if headers write_body_each(self, body) end diff --git a/lib/rainbows/rev/deferred_response.rb b/lib/rainbows/rev/deferred_response.rb index de348bb..cc4ea10 100644 --- a/lib/rainbows/rev/deferred_response.rb +++ b/lib/rainbows/rev/deferred_response.rb @@ -20,7 +20,7 @@ module Rainbows def on_close @do_chunk and @client.write("0\r\n\r\n") - @client.quit + @client.next @body.respond_to?(:close) and @body.close end end # class DeferredResponse -- cgit v1.2.3-24-ge0c7