From 40edc84784864063a38ba38bf854a2119c243ce4 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 9 Jun 2011 00:34:00 +0000 Subject: stream_response_epoll: our most "special" concurrency option yet This doesn't use Rainbows::Base so we have no keepalive support at all. This could eventually be an option for streaming applications. --- lib/rainbows.rb | 1 + lib/rainbows/stream_response_epoll.rb | 72 ++++++++++++++++++++++++++++ lib/rainbows/stream_response_epoll/client.rb | 57 ++++++++++++++++++++++ t/GNUmakefile | 1 + t/t0000-simple-http.sh | 1 + t/t0001-unix-http.sh | 1 + t/t0005-large-file-response.sh | 1 + t/t0009-broken-app.sh | 1 + t/t0010-keepalive-timeout-effective.sh | 2 + t/t0011-close-on-exec-set.sh | 1 + t/t0019-keepalive-cpu-usage.sh | 2 + t/t0020-large-sendfile-response.sh | 1 + t/t0021-sendfile-wrap-to_path.sh | 1 + t/t0023-sendfile-byte-range.sh | 1 + t/t0024-pipelined-sendfile-response.sh | 1 + t/t0030-fast-pipe-response.sh | 1 + t/t0031-close-pipe-response.sh | 1 + t/t0032-close-pipe-to_path-response.sh | 1 + t/t0034-pipelined-pipe-response.sh | 1 + t/t0035-kgio-pipe-response.sh | 1 + t/t0040-keepalive_requests-setting.sh | 1 + t/t0044-autopush.sh | 1 + t/t0050-response-body-close-has-env.sh | 1 + t/t0103-rack-input-limit.sh | 1 + t/t0104-rack-input-limit-tiny.sh | 1 + t/t0105-rack-input-limit-bigger.sh | 1 + t/t0106-rack-input-keepalive.sh | 1 + t/t0107-rack-input-limit-zero.sh | 1 + t/t0200-async-response.sh | 1 + t/t0202-async-response-one-oh.sh | 1 + t/t9001-sendfile-to-path.sh | 1 + 31 files changed, 160 insertions(+) create mode 100644 lib/rainbows/stream_response_epoll.rb create mode 100644 lib/rainbows/stream_response_epoll/client.rb diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 001a76b..bd2c106 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -136,6 +136,7 @@ module Rainbows autoload :NeverBlock, "rainbows/never_block" autoload :XEpollThreadSpawn, "rainbows/xepoll_thread_spawn" autoload :XEpollThreadPool, "rainbows/xepoll_thread_pool" + autoload :StreamResponseEpoll, "rainbows/stream_response_epoll" autoload :Fiber, 'rainbows/fiber' # core class autoload :StreamFile, 'rainbows/stream_file' diff --git a/lib/rainbows/stream_response_epoll.rb b/lib/rainbows/stream_response_epoll.rb new file mode 100644 index 0000000..9ded810 --- /dev/null +++ b/lib/rainbows/stream_response_epoll.rb @@ -0,0 +1,72 @@ +# -*- encoding: binary -*- +require "sleepy_penguin" +require "raindrops" + +# Like \Unicorn itself, this concurrency model is only intended for use +# behind nginx and completely unsupported otherwise. +# +# It does NOT require a thread-safe Rack application at any point, but +# allows streaming data asynchronously via nginx (using the the +# "X-Accel-Buffering: no" header). +# +# Unlike Rainbows::Base, this does NOT support persistent +# connections or pipelining. All \Rainbows! specific configuration +# options are ignored (except Rainbows::Configurator#use). +# +# === RubyGem Requirements +# +# * raindrops 0.6.0 or later +# * sleepy_penguin 3.0.1 or later +module Rainbows::StreamResponseEpoll + # :stopdoc: + CODES = Unicorn::HttpResponse::CODES + HEADER_END = "X-Accel-Buffering: no\r\n\r\n" + autoload :Client, "rainbows/stream_response_epoll/client" + + def http_response_write(socket, status, headers, body) + status = CODES[status.to_i] || status + ep_client = false + + if headers + buf = "HTTP/1.0 #{status}\r\nStatus: #{status}\r\n" + headers.each do |key, value| + if value =~ /\n/ + # avoiding blank, key-only cookies with /\n+/ + buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join + else + buf << "#{key}: #{value}\r\n" + end + end + buf << HEADER_END + + case rv = socket.kgio_trywrite(buf) + when nil then break + when String # retry, socket buffer may grow + buf = rv + when :wait_writable + ep_client = Client.new(socket, buf) + body.each { |chunk| ep_client.write(chunk) } + return ep_client.close + end while true + end + + body.each do |chunk| + if ep_client + ep_client.write(chunk) + else + case rv = socket.kgio_trywrite(chunk) + when nil then break + when String # retry, socket buffer may grow + chunk = rv + when :wait_writable + ep_client = Client.new(socket, chunk) + break + end while true + end + end + ep_client.close if ep_client + ensure + body.respond_to?(:close) and body.close + end + # :startdoc: +end diff --git a/lib/rainbows/stream_response_epoll/client.rb b/lib/rainbows/stream_response_epoll/client.rb new file mode 100644 index 0000000..cf3056e --- /dev/null +++ b/lib/rainbows/stream_response_epoll/client.rb @@ -0,0 +1,57 @@ +# -*- encoding: binary -*- +# :enddoc: +class Rainbows::StreamResponseEpoll::Client + OUT = SleepyPenguin::Epoll::OUT + N = Raindrops.new(1) + EP = SleepyPenguin::Epoll.new + timeout = Rainbows.server.timeout + thr = Thread.new do + begin + EP.wait(nil, timeout) { |_,client| client.epoll_run } + rescue Errno::EINTR + rescue => e + Rainbows::Error.listen_loop(e) + end while Rainbows.alive || N[0] > 0 + end + Rainbows.at_quit { thr.join(timeout) } + + attr_reader :to_io + + def initialize(io, unwritten) + @closed = false + @to_io = io.dup + @wr_queue = [ unwritten.dup ] + EP.set(self, OUT) + end + + def write(str) + @wr_queue << str.dup + end + + def close + @closed = true + end + + def epoll_run + return if @to_io.closed? + buf = @wr_queue.shift or return on_write_complete + case rv = @to_io.kgio_trywrite(buf) + when nil + buf = @wr_queue.shift or return on_write_complete + when String # retry, socket buffer may grow + buf = rv + when :wait_writable + return @wr_queue.unshift(buf) + end while true + rescue => err + @to_io.close + N.decr(0, 1) + end + + def on_write_complete + if @closed + @to_io.close + N.decr(0, 1) + end + end +end diff --git a/t/GNUmakefile b/t/GNUmakefile index 3d05052..78a4e83 100644 --- a/t/GNUmakefile +++ b/t/GNUmakefile @@ -32,6 +32,7 @@ models += ThreadSpawn models += Coolio models += EventMachine models += NeverBlock +models += StreamResponseEpoll ifeq ($(RUBY_ENGINE),ruby) rp := ) diff --git a/t/t0000-simple-http.sh b/t/t0000-simple-http.sh index 6f4d738..57a7d59 100755 --- a/t/t0000-simple-http.sh +++ b/t/t0000-simple-http.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 25 "simple HTTP connection keepalive/pipelining tests for $model" t_begin "checking for config.ru for $model" && { diff --git a/t/t0001-unix-http.sh b/t/t0001-unix-http.sh index e9bc919..32d54c7 100755 --- a/t/t0001-unix-http.sh +++ b/t/t0001-unix-http.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 19 "simple HTTP connection keepalive/pipelining tests for $model" t_begin "checking for config.ru for $model" && { diff --git a/t/t0005-large-file-response.sh b/t/t0005-large-file-response.sh index d709d79..c21209d 100755 --- a/t/t0005-large-file-response.sh +++ b/t/t0005-large-file-response.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll test -r random_blob || die "random_blob required, run with 'make $0'" if ! grep -v ^VmRSS: /proc/self/status >/dev/null 2>&1 diff --git a/t/t0009-broken-app.sh b/t/t0009-broken-app.sh index 5ccb400..efa9ea1 100755 --- a/t/t0009-broken-app.sh +++ b/t/t0009-broken-app.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 9 "graceful handling of broken apps for $model" diff --git a/t/t0010-keepalive-timeout-effective.sh b/t/t0010-keepalive-timeout-effective.sh index 9d4d651..0a6236f 100755 --- a/t/t0010-keepalive-timeout-effective.sh +++ b/t/t0010-keepalive-timeout-effective.sh @@ -1,5 +1,7 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll + t_plan 6 "keepalive_timeout tests for $model" t_begin "setup and start" && { diff --git a/t/t0011-close-on-exec-set.sh b/t/t0011-close-on-exec-set.sh index 7ff0a6b..9c66575 100755 --- a/t/t0011-close-on-exec-set.sh +++ b/t/t0011-close-on-exec-set.sh @@ -1,6 +1,7 @@ #!/bin/sh nr=${nr-"5"} . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 7 "ensure close-on-exec flag is set for $model" diff --git a/t/t0019-keepalive-cpu-usage.sh b/t/t0019-keepalive-cpu-usage.sh index e368709..b63c3c9 100644 --- a/t/t0019-keepalive-cpu-usage.sh +++ b/t/t0019-keepalive-cpu-usage.sh @@ -5,6 +5,8 @@ then fi . ./test-lib.sh skip_models WriterThreadSpawn WriterThreadPool Base +skip_models StreamResponseEpoll + t_plan 6 "keepalive_timeout CPU usage tests for $model" t_begin "setup and start" && { diff --git a/t/t0020-large-sendfile-response.sh b/t/t0020-large-sendfile-response.sh index 18b0bf5..a8bc8e2 100755 --- a/t/t0020-large-sendfile-response.sh +++ b/t/t0020-large-sendfile-response.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll test -r random_blob || die "random_blob required, run with 'make $0'" case $RUBY_ENGINE in ruby) ;; diff --git a/t/t0021-sendfile-wrap-to_path.sh b/t/t0021-sendfile-wrap-to_path.sh index 7f3acaa..77437b6 100755 --- a/t/t0021-sendfile-wrap-to_path.sh +++ b/t/t0021-sendfile-wrap-to_path.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll test -r random_blob || die "random_blob required, run with 'make $0'" case $RUBY_ENGINE in ruby) ;; diff --git a/t/t0023-sendfile-byte-range.sh b/t/t0023-sendfile-byte-range.sh index 2cb8516..a5b6ab2 100755 --- a/t/t0023-sendfile-byte-range.sh +++ b/t/t0023-sendfile-byte-range.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll test -r random_blob || die "random_blob required, run with 'make $0'" case $RUBY_ENGINE in ruby) ;; diff --git a/t/t0024-pipelined-sendfile-response.sh b/t/t0024-pipelined-sendfile-response.sh index 9111ce9..d98be5f 100755 --- a/t/t0024-pipelined-sendfile-response.sh +++ b/t/t0024-pipelined-sendfile-response.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 6 "pipelined sendfile response for $model" diff --git a/t/t0030-fast-pipe-response.sh b/t/t0030-fast-pipe-response.sh index f81029a..aab4357 100755 --- a/t/t0030-fast-pipe-response.sh +++ b/t/t0030-fast-pipe-response.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll test -r random_blob || die "random_blob required, run with 'make $0'" t_plan 10 "fast pipe response for $model" diff --git a/t/t0031-close-pipe-response.sh b/t/t0031-close-pipe-response.sh index c863d4a..04ac12b 100755 --- a/t/t0031-close-pipe-response.sh +++ b/t/t0031-close-pipe-response.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 16 "close pipe response for $model" diff --git a/t/t0032-close-pipe-to_path-response.sh b/t/t0032-close-pipe-to_path-response.sh index e3d8f1b..8092930 100755 --- a/t/t0032-close-pipe-to_path-response.sh +++ b/t/t0032-close-pipe-to_path-response.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll if ! test -d /dev/fd then t_info "skipping $T since /dev/fd is required" diff --git a/t/t0034-pipelined-pipe-response.sh b/t/t0034-pipelined-pipe-response.sh index 6dff9ad..2d28f86 100755 --- a/t/t0034-pipelined-pipe-response.sh +++ b/t/t0034-pipelined-pipe-response.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 6 "pipelined pipe response for $model" diff --git a/t/t0035-kgio-pipe-response.sh b/t/t0035-kgio-pipe-response.sh index 90258eb..552270f 100755 --- a/t/t0035-kgio-pipe-response.sh +++ b/t/t0035-kgio-pipe-response.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll test -r random_blob || die "random_blob required, run with 'make $0'" t_plan 10 "fast Kgio pipe response for $model" diff --git a/t/t0040-keepalive_requests-setting.sh b/t/t0040-keepalive_requests-setting.sh index 81194f1..103b217 100755 --- a/t/t0040-keepalive_requests-setting.sh +++ b/t/t0040-keepalive_requests-setting.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 6 "keepalive_requests limit tests for $model" t_begin "setup and start" && { diff --git a/t/t0044-autopush.sh b/t/t0044-autopush.sh index bac97d7..5017067 100644 --- a/t/t0044-autopush.sh +++ b/t/t0044-autopush.sh @@ -15,6 +15,7 @@ fi # these buffer internally in external libraries, so we can't detect when # to use TCP_CORK skip_models EventMachine NeverBlock +skip_models StreamResponseEpoll skip_models Coolio CoolioThreadPool CoolioThreadSpawn skip_models Revactor Rev RevThreadPool RevThreadSpawn diff --git a/t/t0050-response-body-close-has-env.sh b/t/t0050-response-body-close-has-env.sh index be16a99..e7e6a68 100644 --- a/t/t0050-response-body-close-has-env.sh +++ b/t/t0050-response-body-close-has-env.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 29 "keepalive does not clear Rack env prematurely for $model" diff --git a/t/t0103-rack-input-limit.sh b/t/t0103-rack-input-limit.sh index 64d6dac..efb87fe 100755 --- a/t/t0103-rack-input-limit.sh +++ b/t/t0103-rack-input-limit.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll test -r random_blob || die "random_blob required, run with 'make $0'" req_curl_chunked_upload_err_check diff --git a/t/t0104-rack-input-limit-tiny.sh b/t/t0104-rack-input-limit-tiny.sh index 1104a97..7e806db 100755 --- a/t/t0104-rack-input-limit-tiny.sh +++ b/t/t0104-rack-input-limit-tiny.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll test -r random_blob || die "random_blob required, run with 'make $0'" req_curl_chunked_upload_err_check diff --git a/t/t0105-rack-input-limit-bigger.sh b/t/t0105-rack-input-limit-bigger.sh index ed13d4e..1ae0191 100755 --- a/t/t0105-rack-input-limit-bigger.sh +++ b/t/t0105-rack-input-limit-bigger.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll test -r random_blob || die "random_blob required, run with 'make $0'" req_curl_chunked_upload_err_check diff --git a/t/t0106-rack-input-keepalive.sh b/t/t0106-rack-input-keepalive.sh index f9c9758..e408701 100755 --- a/t/t0106-rack-input-keepalive.sh +++ b/t/t0106-rack-input-keepalive.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 11 "rack.input pipelining test" t_begin "setup and startup" && { diff --git a/t/t0107-rack-input-limit-zero.sh b/t/t0107-rack-input-limit-zero.sh index 94aa256..5da8667 100755 --- a/t/t0107-rack-input-limit-zero.sh +++ b/t/t0107-rack-input-limit-zero.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll req_curl_chunked_upload_err_check t_plan 6 "rack.input client_max_body_size zero" diff --git a/t/t0200-async-response.sh b/t/t0200-async-response.sh index 16e1f76..11917f0 100755 --- a/t/t0200-async-response.sh +++ b/t/t0200-async-response.sh @@ -3,6 +3,7 @@ CONFIG_RU=${CONFIG_RU-'async-response.ru'} . ./test-lib.sh skip_models Base WriterThreadPool WriterThreadSpawn +skip_models StreamResponseEpoll case $CONFIG_RU in *no-autochunk.ru) diff --git a/t/t0202-async-response-one-oh.sh b/t/t0202-async-response-one-oh.sh index 0d833ca..80ffc1f 100755 --- a/t/t0202-async-response-one-oh.sh +++ b/t/t0202-async-response-one-oh.sh @@ -3,6 +3,7 @@ CONFIG_RU=${CONFIG_RU-'async-response.ru'} . ./test-lib.sh skip_models Base WriterThreadPool WriterThreadSpawn +skip_models StreamResponseEpoll t_plan 6 "async HTTP/1.0 response for $model" diff --git a/t/t9001-sendfile-to-path.sh b/t/t9001-sendfile-to-path.sh index 88b9c34..5a9fdcd 100755 --- a/t/t9001-sendfile-to-path.sh +++ b/t/t9001-sendfile-to-path.sh @@ -1,5 +1,6 @@ #!/bin/sh . ./test-lib.sh +skip_models StreamResponseEpoll t_plan 7 "Sendfile middleware test for $model" -- cgit v1.2.3-24-ge0c7