about summary refs log tree commit homepage
path: root/lib/rainbows/event_machine.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-07-27 06:36:07 +0000
committerEric Wong <normalperson@yhbt.net>2010-07-27 07:00:55 +0000
commit71ecfee987f13ba447abe97cac14274f38ff70f4 (patch)
tree9c255960dca3fa1440ff024f85fec5ce93142687 /lib/rainbows/event_machine.rb
parent052e2b3608071d9cd9d6b1d12f8cb69ac29124af (diff)
downloadrainbows-71ecfee987f13ba447abe97cac14274f38ff70f4.tar.gz
EM::FileStreamer writes may be intermingled with the headers
in the subsequent response if we enable processing of the
second pipelined response right away, so wait until the
first response is complete before hitting the second one.

This also avoids potential deep stack recursion in the unlikely
case where too many requests are pipelined.
Diffstat (limited to 'lib/rainbows/event_machine.rb')
-rw-r--r--lib/rainbows/event_machine.rb71
1 files changed, 39 insertions, 32 deletions
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index 4402c72..c290a07 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -52,6 +52,7 @@ module Rainbows
     autoload :TryDefer, 'rainbows/event_machine/try_defer'
 
     class Client < EM::Connection # :nodoc: all
+      attr_writer :body
       include Rainbows::EvCore
       G = Rainbows::G
 
@@ -69,33 +70,35 @@ module Rainbows
       end
 
       def app_call
+        # To avoid clobbering the current streaming response
+        # (often a static file), we do not attempt to process another
+        # request on the same connection until the first is complete
+        return EM.next_tick { app_call } if @body
+
         set_comm_inactivity_timeout 0
-        begin
-          @env[RACK_INPUT] = @input
-          @env[REMOTE_ADDR] = @remote_addr
-          @env[ASYNC_CALLBACK] = method(:em_write_response)
-
-          # we're not sure if anybody uses this, but Thin sets it, too
-          @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
-
-          response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
-
-          # too tricky to support pipelining with :async since the
-          # second (pipelined) request could be a stuck behind a
-          # long-running async response
-          (response.nil? || -1 == response[0]) and return @state = :close
-
-          em_write_response(response, alive = @hp.keepalive? && G.alive)
-          if alive
-            @env.clear
-            @hp.reset
-            @state = :headers
-            # keepalive requests are always body-less, so @input is unchanged
-            @hp.headers(@env, @buf) and next
-            set_comm_inactivity_timeout G.kato
+        @env[RACK_INPUT] = @input
+        @env[REMOTE_ADDR] = @remote_addr
+        @env[ASYNC_CALLBACK] = method(:em_write_response)
+        @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+
+        response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
+
+        # too tricky to support pipelining with :async since the
+        # second (pipelined) request could be a stuck behind a
+        # long-running async response
+        (response.nil? || -1 == response[0]) and return @state = :close
+
+        em_write_response(response, alive = @hp.keepalive? && G.alive)
+        if alive
+          @env.clear
+          @hp.reset
+          @state = :headers
+          if @body.nil? && @hp.headers(@env, @buf)
+            EM.next_tick { on_read('') }
+          else
+            set_comm_inactivity_timeout(G.kato)
           end
-          return
-        end while true
+        end
       end
 
       def em_write_response(response, alive = false)
@@ -118,16 +121,20 @@ module Rainbows
           st = File.stat(path = body.to_path)
 
           if st.file?
-            cb = lambda do
+            write(response_header(status, headers)) if headers
+            @body = stream_file_data(path)
+            @body.errback do
               body.close if body.respond_to?(:close)
-              quit unless alive
+              quit
             end
-            write(response_header(status, headers)) if headers
-            @body = stream = stream_file_data(path)
-            stream.errback(&cb)
-            return stream.callback(&cb)
+            @body.callback do
+              body.close if body.respond_to?(:close)
+              @body = nil
+              alive ? on_read('') : quit
+            end
+            return
           elsif st.socket? || st.pipe?
-            io = body_to_io(body)
+            @body = io = body_to_io(body)
             chunk = stream_response_headers(status, headers) if headers
             m = chunk ? ResponseChunkPipe : ResponsePipe
             return EM.watch(io, m, self, alive, body).notify_readable = true