about summary refs log tree commit homepage
path: root/lib/rainbows/event_machine/client.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/rainbows/event_machine/client.rb')
-rw-r--r--lib/rainbows/event_machine/client.rb13
1 files changed, 9 insertions, 4 deletions
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
index 26f0dbd..9871c09 100644
--- a/lib/rainbows/event_machine/client.rb
+++ b/lib/rainbows/event_machine/client.rb
@@ -10,6 +10,7 @@ class Rainbows::EventMachine::Client < EM::Connection
   end
 
   alias write send_data
+  alias hijacked detach
 
   def receive_data(data)
     # To avoid clobbering the current streaming response
@@ -37,9 +38,11 @@ class Rainbows::EventMachine::Client < EM::Connection
     @env[REMOTE_ADDR] = @_io.kgio_addr
     @env[ASYNC_CALLBACK] = method(:write_async_response)
     @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+    @hp.hijack_setup(@env, @_io)
     status, headers, body = catch(:async) {
       APP.call(@env.merge!(RACK_DEFAULTS))
     }
+    return hijacked if @hp.hijacked?
 
     if (nil == status || -1 == status)
       @deferred = true
@@ -67,8 +70,8 @@ class Rainbows::EventMachine::Client < EM::Connection
   def ev_write_response(status, headers, body, alive)
     @state = :headers if alive
     if body.respond_to?(:errback) && body.respond_to?(:callback)
+      write_headers(status, headers, alive, body) or return hijacked
       @deferred = body
-      write_headers(status, headers, alive)
       write_body_each(body)
       deferred_errback(body)
       deferred_callback(body, alive)
@@ -77,21 +80,22 @@ class Rainbows::EventMachine::Client < EM::Connection
       st = File.stat(path = body.to_path)
 
       if st.file?
-        write_headers(status, headers, alive)
+        write_headers(status, headers, alive, body) or return hijacked
         @deferred = stream_file_data(path)
         deferred_errback(body)
         deferred_callback(body, alive)
         return
       elsif st.socket? || st.pipe?
+        chunk = stream_response_headers(status, headers, alive, body)
+        return hijacked if nil == chunk
         io = body_to_io(@deferred = body)
-        chunk = stream_response_headers(status, headers, alive)
         m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
                     Rainbows::EventMachine::ResponsePipe
         return EM.watch(io, m, self).notify_readable = true
       end
       # char or block device... WTF? fall through to body.each
     end
-    write_response(status, headers, body, alive)
+    write_response(status, headers, body, alive) or return hijacked
     if alive
       if @deferred.nil?
         if @buf.empty?
@@ -112,6 +116,7 @@ class Rainbows::EventMachine::Client < EM::Connection
   end
 
   def unbind
+    return if @hp.hijacked?
     async_close = @env[ASYNC_CLOSE] and async_close.succeed
     @deferred.respond_to?(:fail) and @deferred.fail
     begin