From 96fbc5e91017c4912169629abc7dbb56cda9082c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 30 Dec 2009 01:27:15 -0800 Subject: EventMachine: support deferrables in responses Some async apps rely on more than just "async.callback" and make full use of Deferrables provided by the EM::Deferrable module. Thanks to James Tucker for bringing this to our attention. --- lib/rainbows/ev_core.rb | 2 + lib/rainbows/event_machine.rb | 17 +++++- t/async_examples/README | 3 + t/async_examples/async_app.ru | 126 +++++++++++++++++++++++++++++++++++++++ t/async_examples/async_tailer.ru | 105 ++++++++++++++++++++++++++++++++ t/t0400-em-async-app.sh | 57 ++++++++++++++++++ t/t0401-em-async-tailer.sh | 74 +++++++++++++++++++++++ 7 files changed, 381 insertions(+), 3 deletions(-) create mode 100644 t/async_examples/README create mode 100644 t/async_examples/async_app.ru create mode 100644 t/async_examples/async_tailer.ru create mode 100755 t/t0400-em-async-app.sh create mode 100755 t/t0401-em-async-tailer.sh diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 0d3c079..a9c5bfc 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -11,6 +11,8 @@ module Rainbows # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ] ASYNC_CALLBACK = "async.callback".freeze + ASYNC_CLOSE = "async.close".freeze + def post_init @remote_addr = ::TCPSocket === @_io ? @_io.peeraddr.last : LOCALHOST @env = {} diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index bcc0240..aacdfb5 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -37,6 +37,7 @@ module Rainbows def initialize(io) @_io = io + @body = nil end alias write send_data @@ -54,6 +55,9 @@ module Rainbows @env[REMOTE_ADDR] = @remote_addr @env[ASYNC_CALLBACK] = method(:response_write) + # we're not sure if anybody uses this, but Thin sets it, too + @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new + response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } # too tricky to support pipelining with :async since the @@ -77,9 +81,14 @@ module Rainbows end while true end - def response_write(response, out = [], alive = false) - body = response.last - unless body.respond_to?(:to_path) + def response_write(response, out = [ CONN_CLOSE ], alive = false) + @body = body = response.last + if body.respond_to?(:errback) && body.respond_to?(:callback) + body.callback { quit } + body.errback { quit } + HttpResponse.write(self, response, out) + return + elsif ! body.respond_to?(:to_path) HttpResponse.write(self, response, out) quit unless alive return @@ -120,6 +129,8 @@ module Rainbows end def unbind + async_close = @env[ASYNC_CLOSE] and async_close.succeed + @body.respond_to?(:fail) and @body.fail @_io.close end end diff --git a/t/async_examples/README b/t/async_examples/README new file mode 100644 index 0000000..4023cce --- /dev/null +++ b/t/async_examples/README @@ -0,0 +1,3 @@ +These examples in this directory are stolen from Thin 1.2.5 with only +trivial changes. All examples in this directory retain their original +license (MIT) and copyrights. diff --git a/t/async_examples/async_app.ru b/t/async_examples/async_app.ru new file mode 100644 index 0000000..328effb --- /dev/null +++ b/t/async_examples/async_app.ru @@ -0,0 +1,126 @@ +#!/usr/bin/env rackup -s thin +# +# async_app.ru +# raggi/thin +# +# A second demo app for async rack + thin app processing! +# Now using http status code 100 instead. +# +# Created by James Tucker on 2008-06-17. +# Copyright 2008 James Tucker . +# +#-- +# Benchmark Results: +# +# raggi@mbk:~$ ab -c 100 -n 500 http://127.0.0.1:3000/ +# This is ApacheBench, Version 2.0.40-dev <$Revision: 1.146 $> apache-2.0 +# Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ +# Copyright 2006 The Apache Software Foundation, http://www.apache.org/ +# +# Benchmarking 127.0.0.1 (be patient) +# Completed 100 requests +# Completed 200 requests +# Completed 300 requests +# Completed 400 requests +# Finished 500 requests +# +# +# Server Software: thin +# Server Hostname: 127.0.0.1 +# Server Port: 3000 +# +# Document Path: / +# Document Length: 12 bytes +# +# Concurrency Level: 100 +# Time taken for tests: 5.263089 seconds +# Complete requests: 500 +# Failed requests: 0 +# Write errors: 0 +# Total transferred: 47000 bytes +# HTML transferred: 6000 bytes +# Requests per second: 95.00 [#/sec] (mean) +# Time per request: 1052.618 [ms] (mean) +# Time per request: 10.526 [ms] (mean, across all concurrent requests) +# Transfer rate: 8.55 [Kbytes/sec] received +# +# Connection Times (ms) +# min mean[+/-sd] median max +# Connect: 0 3 2.2 3 8 +# Processing: 1042 1046 3.1 1046 1053 +# Waiting: 1037 1042 3.6 1041 1050 +# Total: 1045 1049 3.1 1049 1057 +# +# Percentage of the requests served within a certain time (ms) +# 50% 1049 +# 66% 1051 +# 75% 1053 +# 80% 1053 +# 90% 1054 +# 95% 1054 +# 98% 1056 +# 99% 1057 +# 100% 1057 (longest request) + +class DeferrableBody + include EventMachine::Deferrable + + def call(body) + body.each do |chunk| + @body_callback.call(chunk) + end + end + + def each &blk + @body_callback = blk + end + +end + +class AsyncApp + + # This is a template async response. N.B. Can't use string for body on 1.9 + AsyncResponse = [-1, {}, []].freeze + + def call(env) + + body = DeferrableBody.new + + # Get the headers out there asap, let the client know we're alive... + EventMachine::next_tick { env['async.callback'].call [200, {'Content-Type' => 'text/plain'}, body] } + + # Semi-emulate a long db request, instead of a timer, in reality we'd be + # waiting for the response data. Whilst this happens, other connections + # can be serviced. + # This could be any callback based thing though, a deferrable waiting on + # IO data, a db request, an http request, an smtp send, whatever. + EventMachine::add_timer(1) { + body.call ["Woah, async!\n"] + + EventMachine::next_tick { + # This could actually happen any time, you could spawn off to new + # threads, pause as a good looking lady walks by, whatever. + # Just shows off how we can defer chunks of data in the body, you can + # even call this many times. + body.call ["Cheers then!"] + body.succeed + } + } + + # throw :async # Still works for supporting non-async frameworks... + + AsyncResponse # May end up in Rack :-) + end + +end + +# The additions to env for async.connection and async.callback absolutely +# destroy the speed of the request if Lint is doing it's checks on env. +# It is also important to note that an async response will not pass through +# any further middleware, as the async response notification has been passed +# right up to the webserver, and the callback goes directly there too. +# Middleware could possibly catch :async, and also provide a different +# async.connection and async.callback. + +# use Rack::Lint +run AsyncApp.new diff --git a/t/async_examples/async_tailer.ru b/t/async_examples/async_tailer.ru new file mode 100644 index 0000000..c144720 --- /dev/null +++ b/t/async_examples/async_tailer.ru @@ -0,0 +1,105 @@ +#!/usr/bin/env rackup -s thin +# +# async_tailer.ru +# raggi/thin +# +# Tested with 150 spawned tails on OS X +# +# Created by James Tucker on 2008-06-18. +# Copyright 2008 James Tucker . + +# Uncomment if appropriate for you.. +# EM.epoll +# EM.kqueue + +tail_log_file = ENV["TAIL_LOG_FILE"] or abort "TAIL_LOG_FILE= env must be set" +unless ::File.file?(tail_log_file) && ::File.readable?(tail_log_file) + abort "#{tail_log_file} must be a readable regular file" +end + +class DeferrableBody + include EventMachine::Deferrable + + def initialize + @queue = [] + # make sure to flush out the queue before closing the connection + callback{ + until @queue.empty? + @queue.shift.each{|chunk| @body_callback.call(chunk) } + end + } + end + + def schedule_dequeue + return unless @body_callback + EventMachine::next_tick do + next unless body = @queue.shift + body.each do |chunk| + @body_callback.call(chunk) + end + schedule_dequeue unless @queue.empty? + end + end + + def call(body) + @queue << body + schedule_dequeue + end + + def each &blk + @body_callback = blk + schedule_dequeue + end + +end + +module TailRenderer + attr_accessor :callback + + def receive_data(data) + @callback.call([data]) + end + + def unbind + @callback.succeed + end +end + +class AsyncTailer + + AsyncResponse = [-1, {}, []].freeze + + def call(env) + + body = DeferrableBody.new + + EventMachine::next_tick do + + env['async.callback'].call [200, {'Content-Type' => 'text/html'}, body] + + body.call ["

Async Tailer

"]
+
+    end
+
+    EventMachine::popen("tail -f #{ENV["TAIL_LOG_FILE"]}", TailRenderer) do |t|
+
+      t.callback = body
+
+      # If for some reason we 'complete' body, close the tail.
+      body.callback do
+        t.close_connection
+      end
+
+      # If for some reason the client disconnects, close the tail.
+      body.errback do
+        t.close_connection
+      end
+
+    end
+
+    AsyncResponse
+  end
+
+end
+
+run AsyncTailer.new
diff --git a/t/t0400-em-async-app.sh b/t/t0400-em-async-app.sh
new file mode 100755
index 0000000..34da2ad
--- /dev/null
+++ b/t/t0400-em-async-app.sh
@@ -0,0 +1,57 @@
+#!/bin/sh
+nr=${nr-5}
+. ./test-lib.sh
+case $model in
+NeverBlock|EventMachine) ;;
+*)
+	t_info "skipping $T since it's not compatible with $model"
+	exit 0
+	;;
+esac
+
+t_plan 7 "async_app test for test for EM"
+
+CONFIG_RU=async_examples/async_app.ru
+
+t_begin "setup and start" && {
+	rainbows_setup
+	rtmpfiles a b c curl_err expect
+
+	# this does not does not support Rack::Lint
+	rainbows -E deployment -D $CONFIG_RU -c $unicorn_config
+	rainbows_wait_start
+}
+
+t_begin "send async requests off in parallel" && {
+	t0=$(date +%s)
+	curl --no-buffer -sSf http://$listen/ > $a 2>> $curl_err &
+	curl --no-buffer -sSf http://$listen/ > $b 2>> $curl_err &
+	curl --no-buffer -sSf http://$listen/ > $c 2>> $curl_err &
+}
+
+t_begin "wait for curl terminations" && {
+	wait
+	t1=$(date +%s)
+	elapsed=$(( $t1 - $t0 ))
+	t_info "elapsed=$elapsed"
+}
+
+t_begin "termination signal sent" && {
+	kill $rainbows_pid
+}
+
+t_begin "no errors from curl" && {
+	test ! -s $curl_err
+}
+
+t_begin "no errors in stderr" && check_stderr
+
+t_begin "responses match expected" && {
+	echo 'Woah, async!' > $expect
+	printf 'Cheers then!' >> $expect
+	cmp $expect $a
+	cmp $expect $b
+	cmp $expect $c
+}
+
+t_done
diff --git a/t/t0401-em-async-tailer.sh b/t/t0401-em-async-tailer.sh
new file mode 100755
index 0000000..cd3bd31
--- /dev/null
+++ b/t/t0401-em-async-tailer.sh
@@ -0,0 +1,74 @@
+#!/bin/sh
+nr=${nr-5}
+. ./test-lib.sh
+case $model in
+NeverBlock|EventMachine) ;;
+*)
+	t_info "skipping $T since it's not compatible with $model"
+	exit 0
+	;;
+esac
+
+t_plan 8 "async_tailer test for test for EM"
+
+CONFIG_RU=async_examples/async_tailer.ru
+
+t_begin "setup and start" && {
+	rainbows_setup
+	rtmpfiles a b c curl_err TAIL_LOG_FILE expect
+
+	printf '

Async Tailer

' >> $expect
+
+	export TAIL_LOG_FILE
+
+	# this does not does not support Rack::Lint
+	rainbows -E deployment -D $CONFIG_RU -c $unicorn_config
+	rainbows_wait_start
+}
+
+t_begin "send async requests off in parallel" && {
+	t0=$(date +%s)
+	curl --no-buffer -sSf http://$listen/ > $a 2>> $curl_err &
+	curl_a=$!
+	curl --no-buffer -sSf http://$listen/ > $b 2>> $curl_err &
+	curl_b=$!
+	curl --no-buffer -sSf http://$listen/ > $c 2>> $curl_err &
+	curl_c=$!
+}
+
+t_begin "generate log output" && {
+	for i in $(awk "BEGIN {for(i=0;i<$nr;i++) print i}" < /dev/null)
+	do
+		date >> $TAIL_LOG_FILE
+		sleep 1
+	done
+	# sometimes tail(1) can be slow
+	sleep 2
+}
+
+t_begin "kill curls and wait for termination" && {
+	kill $curl_a $curl_b $curl_c
+	wait
+	t1=$(date +%s)
+	elapsed=$(( $t1 - $t0 ))
+	t_info "elapsed=$elapsed"
+}
+
+t_begin "termination signal sent" && {
+	kill $rainbows_pid
+}
+
+t_begin "no errors from curl" && {
+	test ! -s $curl_err
+}
+
+t_begin "no errors in stderr" && check_stderr
+
+t_begin "responses match expected" && {
+	cat $TAIL_LOG_FILE >> $expect
+	cmp $expect $a
+	cmp $expect $b
+	cmp $expect $c
+}
+
+t_done
-- 
cgit v1.2.3-24-ge0c7