about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-12-30 01:27:15 -0800
committerEric Wong <normalperson@yhbt.net>2009-12-30 01:53:46 -0800
commit96fbc5e91017c4912169629abc7dbb56cda9082c (patch)
tree763cc51001ef8459e8e65b315740701b15a2b48d
parent44a80df0c4c0d47fd2ac503396cb9accfe770f0a (diff)
downloadrainbows-96fbc5e91017c4912169629abc7dbb56cda9082c.tar.gz
Some async apps rely on more than just "async.callback" and
make full use of Deferrables provided by the EM::Deferrable
module.  Thanks to James Tucker for bringing this to our
attention.
-rw-r--r--lib/rainbows/ev_core.rb2
-rw-r--r--lib/rainbows/event_machine.rb17
-rw-r--r--t/async_examples/README3
-rw-r--r--t/async_examples/async_app.ru126
-rw-r--r--t/async_examples/async_tailer.ru105
-rwxr-xr-xt/t0400-em-async-app.sh57
-rwxr-xr-xt/t0401-em-async-tailer.sh74
7 files changed, 381 insertions, 3 deletions
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
index 0d3c079..a9c5bfc 100644
--- a/lib/rainbows/ev_core.rb
+++ b/lib/rainbows/ev_core.rb
@@ -11,6 +11,8 @@ module Rainbows
     # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ]
     ASYNC_CALLBACK = "async.callback".freeze
 
+    ASYNC_CLOSE = "async.close".freeze
+
     def post_init
       @remote_addr = ::TCPSocket === @_io ? @_io.peeraddr.last : LOCALHOST
       @env = {}
diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb
index bcc0240..aacdfb5 100644
--- a/lib/rainbows/event_machine.rb
+++ b/lib/rainbows/event_machine.rb
@@ -37,6 +37,7 @@ module Rainbows
 
       def initialize(io)
         @_io = io
+        @body = nil
       end
 
       alias write send_data
@@ -54,6 +55,9 @@ module Rainbows
           @env[REMOTE_ADDR] = @remote_addr
           @env[ASYNC_CALLBACK] = method(:response_write)
 
+          # we're not sure if anybody uses this, but Thin sets it, too
+          @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+
           response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) }
 
           # too tricky to support pipelining with :async since the
@@ -77,9 +81,14 @@ module Rainbows
         end while true
       end
 
-      def response_write(response, out = [], alive = false)
-        body = response.last
-        unless body.respond_to?(:to_path)
+      def response_write(response, out = [ CONN_CLOSE ], alive = false)
+        @body = body = response.last
+        if body.respond_to?(:errback) && body.respond_to?(:callback)
+          body.callback { quit }
+          body.errback { quit }
+          HttpResponse.write(self, response, out)
+          return
+        elsif ! body.respond_to?(:to_path)
           HttpResponse.write(self, response, out)
           quit unless alive
           return
@@ -120,6 +129,8 @@ module Rainbows
       end
 
       def unbind
+        async_close = @env[ASYNC_CLOSE] and async_close.succeed
+        @body.respond_to?(:fail) and @body.fail
         @_io.close
       end
     end
diff --git a/t/async_examples/README b/t/async_examples/README
new file mode 100644
index 0000000..4023cce
--- /dev/null
+++ b/t/async_examples/README
@@ -0,0 +1,3 @@
+These examples in this directory are stolen from Thin 1.2.5 with only
+trivial changes.  All examples in this directory retain their original
+license (MIT) and copyrights.
diff --git a/t/async_examples/async_app.ru b/t/async_examples/async_app.ru
new file mode 100644
index 0000000..328effb
--- /dev/null
+++ b/t/async_examples/async_app.ru
@@ -0,0 +1,126 @@
+#!/usr/bin/env rackup -s thin
+#
+#  async_app.ru
+#  raggi/thin
+#
+#   A second demo app for async rack + thin app processing!
+#   Now using http status code 100 instead.
+#
+#  Created by James Tucker on 2008-06-17.
+#  Copyright 2008 James Tucker <raggi@rubyforge.org>.
+#
+#--
+# Benchmark Results:
+#
+# raggi@mbk:~$ ab -c 100 -n 500 http://127.0.0.1:3000/
+# This is ApacheBench, Version 2.0.40-dev <$Revision: 1.146 $> apache-2.0
+# Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
+# Copyright 2006 The Apache Software Foundation, http://www.apache.org/
+#
+# Benchmarking 127.0.0.1 (be patient)
+# Completed 100 requests
+# Completed 200 requests
+# Completed 300 requests
+# Completed 400 requests
+# Finished 500 requests
+#
+#
+# Server Software:        thin
+# Server Hostname:        127.0.0.1
+# Server Port:            3000
+#
+# Document Path:          /
+# Document Length:        12 bytes
+#
+# Concurrency Level:      100
+# Time taken for tests:   5.263089 seconds
+# Complete requests:      500
+# Failed requests:        0
+# Write errors:           0
+# Total transferred:      47000 bytes
+# HTML transferred:       6000 bytes
+# Requests per second:    95.00 [#/sec] (mean)
+# Time per request:       1052.618 [ms] (mean)
+# Time per request:       10.526 [ms] (mean, across all concurrent requests)
+# Transfer rate:          8.55 [Kbytes/sec] received
+#
+# Connection Times (ms)
+#               min  mean[+/-sd] median   max
+# Connect:        0    3   2.2      3       8
+# Processing:  1042 1046   3.1   1046    1053
+# Waiting:     1037 1042   3.6   1041    1050
+# Total:       1045 1049   3.1   1049    1057
+#
+# Percentage of the requests served within a certain time (ms)
+#   50%   1049
+#   66%   1051
+#   75%   1053
+#   80%   1053
+#   90%   1054
+#   95%   1054
+#   98%   1056
+#   99%   1057
+#  100%   1057 (longest request)
+
+class DeferrableBody
+  include EventMachine::Deferrable
+
+  def call(body)
+    body.each do |chunk|
+      @body_callback.call(chunk)
+    end
+  end
+
+  def each &blk
+    @body_callback = blk
+  end
+
+end
+
+class AsyncApp
+
+  # This is a template async response. N.B. Can't use string for body on 1.9
+  AsyncResponse = [-1, {}, []].freeze
+
+  def call(env)
+
+    body = DeferrableBody.new
+
+    # Get the headers out there asap, let the client know we're alive...
+    EventMachine::next_tick { env['async.callback'].call [200, {'Content-Type' => 'text/plain'}, body] }
+
+    # Semi-emulate a long db request, instead of a timer, in reality we'd be
+    # waiting for the response data. Whilst this happens, other connections
+    # can be serviced.
+    # This could be any callback based thing though, a deferrable waiting on
+    # IO data, a db request, an http request, an smtp send, whatever.
+    EventMachine::add_timer(1) {
+      body.call ["Woah, async!\n"]
+
+      EventMachine::next_tick {
+        # This could actually happen any time, you could spawn off to new
+        # threads, pause as a good looking lady walks by, whatever.
+        # Just shows off how we can defer chunks of data in the body, you can
+        # even call this many times.
+        body.call ["Cheers then!"]
+        body.succeed
+      }
+    }
+
+    # throw :async # Still works for supporting non-async frameworks...
+
+    AsyncResponse # May end up in Rack :-)
+  end
+
+end
+
+# The additions to env for async.connection and async.callback absolutely
+# destroy the speed of the request if Lint is doing it's checks on env.
+# It is also important to note that an async response will not pass through
+# any further middleware, as the async response notification has been passed
+# right up to the webserver, and the callback goes directly there too.
+# Middleware could possibly catch :async, and also provide a different
+# async.connection and async.callback.
+
+# use Rack::Lint
+run AsyncApp.new
diff --git a/t/async_examples/async_tailer.ru b/t/async_examples/async_tailer.ru
new file mode 100644
index 0000000..c144720
--- /dev/null
+++ b/t/async_examples/async_tailer.ru
@@ -0,0 +1,105 @@
+#!/usr/bin/env rackup -s thin
+#
+#  async_tailer.ru
+#  raggi/thin
+#
+#  Tested with 150 spawned tails on OS X
+#
+#  Created by James Tucker on 2008-06-18.
+#  Copyright 2008 James Tucker <raggi@rubyforge.org>.
+
+# Uncomment if appropriate for you..
+# EM.epoll
+# EM.kqueue
+
+tail_log_file = ENV["TAIL_LOG_FILE"] or abort "TAIL_LOG_FILE= env must be set"
+unless ::File.file?(tail_log_file) && ::File.readable?(tail_log_file)
+  abort "#{tail_log_file} must be a readable regular file"
+end
+
+class DeferrableBody
+  include EventMachine::Deferrable
+
+  def initialize
+    @queue = []
+    # make sure to flush out the queue before closing the connection
+    callback{
+      until @queue.empty?
+        @queue.shift.each{|chunk| @body_callback.call(chunk) }
+      end
+    }
+  end
+
+  def schedule_dequeue
+    return unless @body_callback
+    EventMachine::next_tick do
+      next unless body = @queue.shift
+      body.each do |chunk|
+        @body_callback.call(chunk)
+      end
+      schedule_dequeue unless @queue.empty?
+    end
+  end
+
+  def call(body)
+    @queue << body
+    schedule_dequeue
+  end
+
+  def each &blk
+    @body_callback = blk
+    schedule_dequeue
+  end
+
+end
+
+module TailRenderer
+  attr_accessor :callback
+
+  def receive_data(data)
+    @callback.call([data])
+  end
+
+  def unbind
+    @callback.succeed
+  end
+end
+
+class AsyncTailer
+
+  AsyncResponse = [-1, {}, []].freeze
+
+  def call(env)
+
+    body = DeferrableBody.new
+
+    EventMachine::next_tick do
+
+      env['async.callback'].call [200, {'Content-Type' => 'text/html'}, body]
+
+      body.call ["<h1>Async Tailer</h1><pre>"]
+
+    end
+
+    EventMachine::popen("tail -f #{ENV["TAIL_LOG_FILE"]}", TailRenderer) do |t|
+
+      t.callback = body
+
+      # If for some reason we 'complete' body, close the tail.
+      body.callback do
+        t.close_connection
+      end
+
+      # If for some reason the client disconnects, close the tail.
+      body.errback do
+        t.close_connection
+      end
+
+    end
+
+    AsyncResponse
+  end
+
+end
+
+run AsyncTailer.new
diff --git a/t/t0400-em-async-app.sh b/t/t0400-em-async-app.sh
new file mode 100755
index 0000000..34da2ad
--- /dev/null
+++ b/t/t0400-em-async-app.sh
@@ -0,0 +1,57 @@
+#!/bin/sh
+nr=${nr-5}
+. ./test-lib.sh
+case $model in
+NeverBlock|EventMachine) ;;
+*)
+        t_info "skipping $T since it's not compatible with $model"
+        exit 0
+        ;;
+esac
+
+t_plan 7 "async_app test for test for EM"
+
+CONFIG_RU=async_examples/async_app.ru
+
+t_begin "setup and start" && {
+        rainbows_setup
+        rtmpfiles a b c curl_err expect
+
+        # this does not does not support Rack::Lint
+        rainbows -E deployment -D $CONFIG_RU -c $unicorn_config
+        rainbows_wait_start
+}
+
+t_begin "send async requests off in parallel" && {
+        t0=$(date +%s)
+        curl --no-buffer -sSf http://$listen/ > $a 2>> $curl_err &
+        curl --no-buffer -sSf http://$listen/ > $b 2>> $curl_err &
+        curl --no-buffer -sSf http://$listen/ > $c 2>> $curl_err &
+}
+
+t_begin "wait for curl terminations" && {
+        wait
+        t1=$(date +%s)
+        elapsed=$(( $t1 - $t0 ))
+        t_info "elapsed=$elapsed"
+}
+
+t_begin "termination signal sent" && {
+        kill $rainbows_pid
+}
+
+t_begin "no errors from curl" && {
+        test ! -s $curl_err
+}
+
+t_begin "no errors in stderr" && check_stderr
+
+t_begin "responses match expected" && {
+        echo 'Woah, async!' > $expect
+        printf 'Cheers then!' >> $expect
+        cmp $expect $a
+        cmp $expect $b
+        cmp $expect $c
+}
+
+t_done
diff --git a/t/t0401-em-async-tailer.sh b/t/t0401-em-async-tailer.sh
new file mode 100755
index 0000000..cd3bd31
--- /dev/null
+++ b/t/t0401-em-async-tailer.sh
@@ -0,0 +1,74 @@
+#!/bin/sh
+nr=${nr-5}
+. ./test-lib.sh
+case $model in
+NeverBlock|EventMachine) ;;
+*)
+        t_info "skipping $T since it's not compatible with $model"
+        exit 0
+        ;;
+esac
+
+t_plan 8 "async_tailer test for test for EM"
+
+CONFIG_RU=async_examples/async_tailer.ru
+
+t_begin "setup and start" && {
+        rainbows_setup
+        rtmpfiles a b c curl_err TAIL_LOG_FILE expect
+
+        printf '<h1>Async Tailer</h1><pre>' >> $expect
+
+        export TAIL_LOG_FILE
+
+        # this does not does not support Rack::Lint
+        rainbows -E deployment -D $CONFIG_RU -c $unicorn_config
+        rainbows_wait_start
+}
+
+t_begin "send async requests off in parallel" && {
+        t0=$(date +%s)
+        curl --no-buffer -sSf http://$listen/ > $a 2>> $curl_err &
+        curl_a=$!
+        curl --no-buffer -sSf http://$listen/ > $b 2>> $curl_err &
+        curl_b=$!
+        curl --no-buffer -sSf http://$listen/ > $c 2>> $curl_err &
+        curl_c=$!
+}
+
+t_begin "generate log output" && {
+        for i in $(awk "BEGIN {for(i=0;i<$nr;i++) print i}" < /dev/null)
+        do
+                date >> $TAIL_LOG_FILE
+                sleep 1
+        done
+        # sometimes tail(1) can be slow
+        sleep 2
+}
+
+t_begin "kill curls and wait for termination" && {
+        kill $curl_a $curl_b $curl_c
+        wait
+        t1=$(date +%s)
+        elapsed=$(( $t1 - $t0 ))
+        t_info "elapsed=$elapsed"
+}
+
+t_begin "termination signal sent" && {
+        kill $rainbows_pid
+}
+
+t_begin "no errors from curl" && {
+        test ! -s $curl_err
+}
+
+t_begin "no errors in stderr" && check_stderr
+
+t_begin "responses match expected" && {
+        cat $TAIL_LOG_FILE >> $expect
+        cmp $expect $a
+        cmp $expect $b
+        cmp $expect $c
+}
+
+t_done