diff options
author | Eric Wong <normalperson@yhbt.net> | 2009-12-30 01:27:15 -0800 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2009-12-30 01:53:46 -0800 |
commit | 96fbc5e91017c4912169629abc7dbb56cda9082c (patch) | |
tree | 763cc51001ef8459e8e65b315740701b15a2b48d | |
parent | 44a80df0c4c0d47fd2ac503396cb9accfe770f0a (diff) | |
download | rainbows-96fbc5e91017c4912169629abc7dbb56cda9082c.tar.gz |
Some async apps rely on more than just "async.callback" and make full use of Deferrables provided by the EM::Deferrable module. Thanks to James Tucker for bringing this to our attention.
-rw-r--r-- | lib/rainbows/ev_core.rb | 2 | ||||
-rw-r--r-- | lib/rainbows/event_machine.rb | 17 | ||||
-rw-r--r-- | t/async_examples/README | 3 | ||||
-rw-r--r-- | t/async_examples/async_app.ru | 126 | ||||
-rw-r--r-- | t/async_examples/async_tailer.ru | 105 | ||||
-rwxr-xr-x | t/t0400-em-async-app.sh | 57 | ||||
-rwxr-xr-x | t/t0401-em-async-tailer.sh | 74 |
7 files changed, 381 insertions, 3 deletions
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 0d3c079..a9c5bfc 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -11,6 +11,8 @@ module Rainbows # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ] ASYNC_CALLBACK = "async.callback".freeze + ASYNC_CLOSE = "async.close".freeze + def post_init @remote_addr = ::TCPSocket === @_io ? @_io.peeraddr.last : LOCALHOST @env = {} diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index bcc0240..aacdfb5 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -37,6 +37,7 @@ module Rainbows def initialize(io) @_io = io + @body = nil end alias write send_data @@ -54,6 +55,9 @@ module Rainbows @env[REMOTE_ADDR] = @remote_addr @env[ASYNC_CALLBACK] = method(:response_write) + # we're not sure if anybody uses this, but Thin sets it, too + @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new + response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } # too tricky to support pipelining with :async since the @@ -77,9 +81,14 @@ module Rainbows end while true end - def response_write(response, out = [], alive = false) - body = response.last - unless body.respond_to?(:to_path) + def response_write(response, out = [ CONN_CLOSE ], alive = false) + @body = body = response.last + if body.respond_to?(:errback) && body.respond_to?(:callback) + body.callback { quit } + body.errback { quit } + HttpResponse.write(self, response, out) + return + elsif ! body.respond_to?(:to_path) HttpResponse.write(self, response, out) quit unless alive return @@ -120,6 +129,8 @@ module Rainbows end def unbind + async_close = @env[ASYNC_CLOSE] and async_close.succeed + @body.respond_to?(:fail) and @body.fail @_io.close end end diff --git a/t/async_examples/README b/t/async_examples/README new file mode 100644 index 0000000..4023cce --- /dev/null +++ b/t/async_examples/README @@ -0,0 +1,3 @@ +These examples in this directory are stolen from Thin 1.2.5 with only +trivial changes. All examples in this directory retain their original +license (MIT) and copyrights. diff --git a/t/async_examples/async_app.ru b/t/async_examples/async_app.ru new file mode 100644 index 0000000..328effb --- /dev/null +++ b/t/async_examples/async_app.ru @@ -0,0 +1,126 @@ +#!/usr/bin/env rackup -s thin +# +# async_app.ru +# raggi/thin +# +# A second demo app for async rack + thin app processing! +# Now using http status code 100 instead. +# +# Created by James Tucker on 2008-06-17. +# Copyright 2008 James Tucker <raggi@rubyforge.org>. +# +#-- +# Benchmark Results: +# +# raggi@mbk:~$ ab -c 100 -n 500 http://127.0.0.1:3000/ +# This is ApacheBench, Version 2.0.40-dev <$Revision: 1.146 $> apache-2.0 +# Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/ +# Copyright 2006 The Apache Software Foundation, http://www.apache.org/ +# +# Benchmarking 127.0.0.1 (be patient) +# Completed 100 requests +# Completed 200 requests +# Completed 300 requests +# Completed 400 requests +# Finished 500 requests +# +# +# Server Software: thin +# Server Hostname: 127.0.0.1 +# Server Port: 3000 +# +# Document Path: / +# Document Length: 12 bytes +# +# Concurrency Level: 100 +# Time taken for tests: 5.263089 seconds +# Complete requests: 500 +# Failed requests: 0 +# Write errors: 0 +# Total transferred: 47000 bytes +# HTML transferred: 6000 bytes +# Requests per second: 95.00 [#/sec] (mean) +# Time per request: 1052.618 [ms] (mean) +# Time per request: 10.526 [ms] (mean, across all concurrent requests) +# Transfer rate: 8.55 [Kbytes/sec] received +# +# Connection Times (ms) +# min mean[+/-sd] median max +# Connect: 0 3 2.2 3 8 +# Processing: 1042 1046 3.1 1046 1053 +# Waiting: 1037 1042 3.6 1041 1050 +# Total: 1045 1049 3.1 1049 1057 +# +# Percentage of the requests served within a certain time (ms) +# 50% 1049 +# 66% 1051 +# 75% 1053 +# 80% 1053 +# 90% 1054 +# 95% 1054 +# 98% 1056 +# 99% 1057 +# 100% 1057 (longest request) + +class DeferrableBody + include EventMachine::Deferrable + + def call(body) + body.each do |chunk| + @body_callback.call(chunk) + end + end + + def each &blk + @body_callback = blk + end + +end + +class AsyncApp + + # This is a template async response. N.B. Can't use string for body on 1.9 + AsyncResponse = [-1, {}, []].freeze + + def call(env) + + body = DeferrableBody.new + + # Get the headers out there asap, let the client know we're alive... + EventMachine::next_tick { env['async.callback'].call [200, {'Content-Type' => 'text/plain'}, body] } + + # Semi-emulate a long db request, instead of a timer, in reality we'd be + # waiting for the response data. Whilst this happens, other connections + # can be serviced. + # This could be any callback based thing though, a deferrable waiting on + # IO data, a db request, an http request, an smtp send, whatever. + EventMachine::add_timer(1) { + body.call ["Woah, async!\n"] + + EventMachine::next_tick { + # This could actually happen any time, you could spawn off to new + # threads, pause as a good looking lady walks by, whatever. + # Just shows off how we can defer chunks of data in the body, you can + # even call this many times. + body.call ["Cheers then!"] + body.succeed + } + } + + # throw :async # Still works for supporting non-async frameworks... + + AsyncResponse # May end up in Rack :-) + end + +end + +# The additions to env for async.connection and async.callback absolutely +# destroy the speed of the request if Lint is doing it's checks on env. +# It is also important to note that an async response will not pass through +# any further middleware, as the async response notification has been passed +# right up to the webserver, and the callback goes directly there too. +# Middleware could possibly catch :async, and also provide a different +# async.connection and async.callback. + +# use Rack::Lint +run AsyncApp.new diff --git a/t/async_examples/async_tailer.ru b/t/async_examples/async_tailer.ru new file mode 100644 index 0000000..c144720 --- /dev/null +++ b/t/async_examples/async_tailer.ru @@ -0,0 +1,105 @@ +#!/usr/bin/env rackup -s thin +# +# async_tailer.ru +# raggi/thin +# +# Tested with 150 spawned tails on OS X +# +# Created by James Tucker on 2008-06-18. +# Copyright 2008 James Tucker <raggi@rubyforge.org>. + +# Uncomment if appropriate for you.. +# EM.epoll +# EM.kqueue + +tail_log_file = ENV["TAIL_LOG_FILE"] or abort "TAIL_LOG_FILE= env must be set" +unless ::File.file?(tail_log_file) && ::File.readable?(tail_log_file) + abort "#{tail_log_file} must be a readable regular file" +end + +class DeferrableBody + include EventMachine::Deferrable + + def initialize + @queue = [] + # make sure to flush out the queue before closing the connection + callback{ + until @queue.empty? + @queue.shift.each{|chunk| @body_callback.call(chunk) } + end + } + end + + def schedule_dequeue + return unless @body_callback + EventMachine::next_tick do + next unless body = @queue.shift + body.each do |chunk| + @body_callback.call(chunk) + end + schedule_dequeue unless @queue.empty? + end + end + + def call(body) + @queue << body + schedule_dequeue + end + + def each &blk + @body_callback = blk + schedule_dequeue + end + +end + +module TailRenderer + attr_accessor :callback + + def receive_data(data) + @callback.call([data]) + end + + def unbind + @callback.succeed + end +end + +class AsyncTailer + + AsyncResponse = [-1, {}, []].freeze + + def call(env) + + body = DeferrableBody.new + + EventMachine::next_tick do + + env['async.callback'].call [200, {'Content-Type' => 'text/html'}, body] + + body.call ["<h1>Async Tailer</h1><pre>"] + + end + + EventMachine::popen("tail -f #{ENV["TAIL_LOG_FILE"]}", TailRenderer) do |t| + + t.callback = body + + # If for some reason we 'complete' body, close the tail. + body.callback do + t.close_connection + end + + # If for some reason the client disconnects, close the tail. + body.errback do + t.close_connection + end + + end + + AsyncResponse + end + +end + +run AsyncTailer.new diff --git a/t/t0400-em-async-app.sh b/t/t0400-em-async-app.sh new file mode 100755 index 0000000..34da2ad --- /dev/null +++ b/t/t0400-em-async-app.sh @@ -0,0 +1,57 @@ +#!/bin/sh +nr=${nr-5} +. ./test-lib.sh +case $model in +NeverBlock|EventMachine) ;; +*) + t_info "skipping $T since it's not compatible with $model" + exit 0 + ;; +esac + +t_plan 7 "async_app test for test for EM" + +CONFIG_RU=async_examples/async_app.ru + +t_begin "setup and start" && { + rainbows_setup + rtmpfiles a b c curl_err expect + + # this does not does not support Rack::Lint + rainbows -E deployment -D $CONFIG_RU -c $unicorn_config + rainbows_wait_start +} + +t_begin "send async requests off in parallel" && { + t0=$(date +%s) + curl --no-buffer -sSf http://$listen/ > $a 2>> $curl_err & + curl --no-buffer -sSf http://$listen/ > $b 2>> $curl_err & + curl --no-buffer -sSf http://$listen/ > $c 2>> $curl_err & +} + +t_begin "wait for curl terminations" && { + wait + t1=$(date +%s) + elapsed=$(( $t1 - $t0 )) + t_info "elapsed=$elapsed" +} + +t_begin "termination signal sent" && { + kill $rainbows_pid +} + +t_begin "no errors from curl" && { + test ! -s $curl_err +} + +t_begin "no errors in stderr" && check_stderr + +t_begin "responses match expected" && { + echo 'Woah, async!' > $expect + printf 'Cheers then!' >> $expect + cmp $expect $a + cmp $expect $b + cmp $expect $c +} + +t_done diff --git a/t/t0401-em-async-tailer.sh b/t/t0401-em-async-tailer.sh new file mode 100755 index 0000000..cd3bd31 --- /dev/null +++ b/t/t0401-em-async-tailer.sh @@ -0,0 +1,74 @@ +#!/bin/sh +nr=${nr-5} +. ./test-lib.sh +case $model in +NeverBlock|EventMachine) ;; +*) + t_info "skipping $T since it's not compatible with $model" + exit 0 + ;; +esac + +t_plan 8 "async_tailer test for test for EM" + +CONFIG_RU=async_examples/async_tailer.ru + +t_begin "setup and start" && { + rainbows_setup + rtmpfiles a b c curl_err TAIL_LOG_FILE expect + + printf '<h1>Async Tailer</h1><pre>' >> $expect + + export TAIL_LOG_FILE + + # this does not does not support Rack::Lint + rainbows -E deployment -D $CONFIG_RU -c $unicorn_config + rainbows_wait_start +} + +t_begin "send async requests off in parallel" && { + t0=$(date +%s) + curl --no-buffer -sSf http://$listen/ > $a 2>> $curl_err & + curl_a=$! + curl --no-buffer -sSf http://$listen/ > $b 2>> $curl_err & + curl_b=$! + curl --no-buffer -sSf http://$listen/ > $c 2>> $curl_err & + curl_c=$! +} + +t_begin "generate log output" && { + for i in $(awk "BEGIN {for(i=0;i<$nr;i++) print i}" < /dev/null) + do + date >> $TAIL_LOG_FILE + sleep 1 + done + # sometimes tail(1) can be slow + sleep 2 +} + +t_begin "kill curls and wait for termination" && { + kill $curl_a $curl_b $curl_c + wait + t1=$(date +%s) + elapsed=$(( $t1 - $t0 )) + t_info "elapsed=$elapsed" +} + +t_begin "termination signal sent" && { + kill $rainbows_pid +} + +t_begin "no errors from curl" && { + test ! -s $curl_err +} + +t_begin "no errors in stderr" && check_stderr + +t_begin "responses match expected" && { + cat $TAIL_LOG_FILE >> $expect + cmp $expect $a + cmp $expect $b + cmp $expect $c +} + +t_done |