about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-01-05 18:01:36 -0800
committerEric Wong <normalperson@yhbt.net>2011-01-06 19:50:34 -0800
commit370fb8c7811704ed65384f599b52ac1b6d0c36c9 (patch)
treeff5024a3d1f507c6e88801f0c8f0c6e154bfe1e2
parent2cb26ba8084cd37996330616b885de1c780d848e (diff)
downloadrainbows-370fb8c7811704ed65384f599b52ac1b6d0c36c9.tar.gz
async.callback will be useful with Coolio (and more!) soon, so
ensure it works as well as the rest of Rainbows!
-rw-r--r--lib/rainbows/ev_core.rb10
-rw-r--r--lib/rainbows/event_machine/client.rb13
-rw-r--r--lib/rainbows/event_machine/response.rb44
-rw-r--r--t/async_chunk_app.ru44
-rw-r--r--t/t0402-em-async-keepalive.sh86
5 files changed, 171 insertions, 26 deletions
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
index 013df39..23505d3 100644
--- a/lib/rainbows/ev_core.rb
+++ b/lib/rainbows/ev_core.rb
@@ -13,7 +13,15 @@ module Rainbows::EvCore
 
   def write_async_response(response)
     status, headers, body = response
-    write_response(status, headers, body, false)
+    if alive = @hp.next?
+      # we can't do HTTP keepalive without Content-Length or
+      # "Transfer-Encoding: chunked", and the async.callback stuff
+      # isn't Rack::Lint-compatible, so we have to enforce it here.
+      headers = Rack::Utils::HeaderHash.new(headers) unless Hash === headers
+      alive = headers.include?("Content-Length") ||
+              !!(%r{\Achunked\z}i =~ headers["Transfer-Encoding"])
+    end
+    write_response(status, headers, body, alive)
   end
 
   ASYNC_CLOSE = "async.close".freeze
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
index 5abdc3b..22e5360 100644
--- a/lib/rainbows/event_machine/client.rb
+++ b/lib/rainbows/event_machine/client.rb
@@ -48,18 +48,7 @@ class Rainbows::EventMachine::Client < EM::Connection
     # second (pipelined) request could be a stuck behind a
     # long-running async response
     (status.nil? || -1 == status) and return @state = :close
-
-    if @hp.next?
-      @state = :headers
-      write_response(status, headers, body, true)
-      if @buf.empty?
-        set_comm_inactivity_timeout(Rainbows.keepalive_timeout)
-      elsif @body.nil?
-        EM.next_tick { receive_data(nil) }
-      end
-    else
-      write_response(status, headers, body, false)
-    end
+    write_response(status, headers, body, @hp.next?)
   end
 
   def next!
diff --git a/lib/rainbows/event_machine/response.rb b/lib/rainbows/event_machine/response.rb
index 49bcbd5..7b88261 100644
--- a/lib/rainbows/event_machine/response.rb
+++ b/lib/rainbows/event_machine/response.rb
@@ -1,27 +1,35 @@
 # -*- encoding: binary -*-
 # :enddoc:
 module Rainbows::EventMachine::Response
+  def deferred_errback(orig_body)
+    @body.errback do
+      orig_body.close if orig_body.respond_to?(:close)
+      quit
+    end
+  end
+
+  def deferred_callback(orig_body, alive)
+    @body.callback do
+      orig_body.close if orig_body.respond_to?(:close)
+      @body = nil
+      alive ? receive_data(nil) : quit
+    end
+  end
+
   def write_response(status, headers, body, alive)
+    @state = :headers if alive
     if body.respond_to?(:errback) && body.respond_to?(:callback)
       @body = body
-      body.callback { quit }
-      body.errback { quit }
-      alive = true
+      deferred_errback(body)
+      deferred_callback(body, alive)
     elsif body.respond_to?(:to_path)
       st = File.stat(path = body.to_path)
 
       if st.file?
         write_headers(status, headers, alive)
         @body = stream_file_data(path)
-        @body.errback do
-          body.close if body.respond_to?(:close)
-          quit
-        end
-        @body.callback do
-          body.close if body.respond_to?(:close)
-          @body = nil
-          alive ? receive_data(nil) : quit
-        end
+        deferred_errback(body)
+        deferred_callback(body, alive)
         return
       elsif st.socket? || st.pipe?
         io = body_to_io(@body = body)
@@ -33,6 +41,16 @@ module Rainbows::EventMachine::Response
       # char or block device... WTF? fall through to body.each
     end
     super(status, headers, body, alive)
-    quit unless alive
+    if alive
+      if @body.nil?
+        if @buf.empty?
+          set_comm_inactivity_timeout(Rainbows.keepalive_timeout)
+        else
+          EM.next_tick { receive_data(nil) }
+        end
+      end
+    else
+      quit unless @body
+    end
   end
 end
diff --git a/t/async_chunk_app.ru b/t/async_chunk_app.ru
new file mode 100644
index 0000000..26b9915
--- /dev/null
+++ b/t/async_chunk_app.ru
@@ -0,0 +1,44 @@
+# based on async_examples/async_app.ru by James Tucker
+class DeferrableChunkBody
+  include EventMachine::Deferrable
+
+  def call(*body)
+    body.each do |chunk|
+      @body_callback.call("#{chunk.size.to_s(16)}\r\n")
+      @body_callback.call(chunk)
+      @body_callback.call("\r\n")
+    end
+  end
+
+  def each(&block)
+    @body_callback = block
+  end
+
+  def finish
+    @body_callback.call("0\r\n\r\n")
+  end
+end
+
+class AsyncChunkApp
+  def call(env)
+    body = DeferrableChunkBody.new
+    body.callback { body.finish }
+    headers = {
+      'Content-Type' => 'text/plain',
+      'Transfer-Encoding' => 'chunked',
+    }
+    EM.next_tick {
+      env['async.callback'].call([ 200, headers, body ])
+    }
+    EM.add_timer(1) {
+      body.call "Hello "
+
+      EM.add_timer(1) {
+        body.call "World #{env['PATH_INFO']}\n"
+        body.succeed
+      }
+    }
+    nil
+  end
+end
+run AsyncChunkApp.new
diff --git a/t/t0402-em-async-keepalive.sh b/t/t0402-em-async-keepalive.sh
new file mode 100644
index 0000000..24eb678
--- /dev/null
+++ b/t/t0402-em-async-keepalive.sh
@@ -0,0 +1,86 @@
+#!/bin/sh
+. ./test-lib.sh
+case $model in
+NeverBlock|EventMachine) ;;
+*)
+        t_info "skipping $T since it's not compatible with $model"
+        exit 0
+        ;;
+esac
+
+t_plan 9 "async_chunk_app test for test for EM"
+
+CONFIG_RU=async_chunk_app.ru
+
+t_begin "setup and start" && {
+        rainbows_setup
+        rtmpfiles a b c curl_err expect
+
+        # this does not does not support Rack::Lint
+        rainbows -E none -D $CONFIG_RU -c $unicorn_config
+        rainbows_wait_start
+
+        echo 'Hello World /0' >> $expect
+        echo 'Hello World /1' >> $expect
+        echo 'Hello World /2' >> $expect
+}
+
+t_begin "async.callback supports pipelining" && {
+        rm -f $tmp
+        t0=$(date +%s)
+        (
+                cat $fifo > $tmp &
+                printf 'GET /0 HTTP/1.1\r\nHost: example.com\r\n\r\n'
+                printf 'GET /1 HTTP/1.1\r\nHost: example.com\r\n\r\n'
+                printf 'GET /2 HTTP/1.0\r\nHost: example.com\r\n\r\n'
+                wait
+        ) | socat - TCP:$listen > $fifo
+        t1=$(date +%s)
+        elapsed=$(( $t1 - $t0 ))
+        t_info "elapsed=$elapsed $model.$0 ($t_current)"
+        test 3 -eq "$(fgrep 'HTTP/1.1 200 OK' $tmp | wc -l)"
+}
+
+t_begin "async.callback supports keepalive" && {
+        t0=$(date +%s)
+        curl -v --no-buffer -sSf http://$listen/[0-2] > $tmp 2>> $curl_err
+        t1=$(date +%s)
+        elapsed=$(( $t1 - $t0 ))
+        t_info "elapsed=$elapsed $model.$0 ($t_current)"
+        cmp $expect $tmp
+        test 2 -eq "$(fgrep 'Re-using existing connection!' $curl_err |wc -l)"
+        rm -f $curl_err
+}
+
+t_begin "send async requests off in parallel" && {
+        t0=$(date +%s)
+        curl --no-buffer -sSf http://$listen/[0-2] > $a 2>> $curl_err &
+        curl --no-buffer -sSf http://$listen/[0-2] > $b 2>> $curl_err &
+        curl --no-buffer -sSf http://$listen/[0-2] > $c 2>> $curl_err &
+}
+
+t_begin "wait for curl terminations" && {
+        wait
+        t1=$(date +%s)
+        elapsed=$(( $t1 - $t0 ))
+        t_info "elapsed=$elapsed"
+}
+
+t_begin "termination signal sent" && {
+        kill $rainbows_pid
+}
+
+t_begin "no errors from curl" && {
+        test ! -s $curl_err
+}
+
+t_begin "no errors in stderr" && check_stderr
+
+t_begin "responses match expected" && {
+        cmp $expect $a
+        cmp $expect $b
+        cmp $expect $c
+}
+
+t_done
+