From 0cd65fa1e01be369b270c72053cf21a3d6bcb45f Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 19 Jul 2010 10:10:05 +0000 Subject: ensure stream response bodies get closed Some middlewares such as Clogger rely on wrapping the body having the close method called on it for logging. --- lib/rainbows/dev_fd_response.rb | 10 ++++++---- lib/rainbows/event_machine.rb | 2 +- lib/rainbows/event_machine/response_pipe.rb | 7 ++++--- lib/rainbows/response/body.rb | 6 ++++-- t/close-pipe-response.ru | 26 +++++++++++++++++++++++++ t/t0031-close-pipe-response.sh | 30 +++++++++++++++++++++++++++++ 6 files changed, 71 insertions(+), 10 deletions(-) create mode 100644 t/close-pipe-response.ru create mode 100644 t/t0031-close-pipe-response.sh diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb index 451cad7..691526c 100644 --- a/lib/rainbows/dev_fd_response.rb +++ b/lib/rainbows/dev_fd_response.rb @@ -36,6 +36,7 @@ class Rainbows::DevFdResponse < Struct.new(:app) headers = HeaderHash.new(headers) st = io.stat + fileno = io.fileno if st.file? headers['Content-Length'] ||= st.size.to_s headers.delete('Transfer-Encoding') @@ -51,15 +52,15 @@ class Rainbows::DevFdResponse < Struct.new(:app) # we need to make sure our pipe output is Fiber-compatible case env["rainbows.model"] when :FiberSpawn, :FiberPool, :RevFiberSpawn - return [ status, headers, Rainbows::Fiber::IO.new(io,::Fiber.current) ] + io = Rainbows::Fiber::IO.new(io,::Fiber.current) end else # unlikely, char/block device file, directory, ... return response end - [ status, headers, Body.new(io, "/dev/fd/#{io.fileno}") ] + [ status, headers, Body.new(io, "/dev/fd/#{fileno}", body) ] end - class Body < Struct.new(:to_io, :to_path) + class Body < Struct.new(:to_io, :to_path, :orig_body) # called by the webserver or other middlewares if they can't # handle #to_path def each(&block) @@ -74,7 +75,8 @@ class Rainbows::DevFdResponse < Struct.new(:app) # called by the web server after #each def close - to_io.close if to_io.respond_to?(:close) + to_io.close unless to_io.closed? + orig_body.close if orig_body.respond_to?(:close) # may not be an IO rescue IOError # could've been IO::new()'ed and closed end end diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 0876ac9..4faa7a6 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -126,7 +126,7 @@ module Rainbows elsif st.socket? || st.pipe? chunk = stream_response_headers(status, headers) if headers m = chunk ? ResponseChunkPipe : ResponsePipe - return EM.watch(io, m, self, alive).notify_readable = true + return EM.watch(io, m, self, alive, body).notify_readable = true end # char or block device... WTF? fall through to body.each end diff --git a/lib/rainbows/event_machine/response_pipe.rb b/lib/rainbows/event_machine/response_pipe.rb index 88d6e5a..7d4988a 100644 --- a/lib/rainbows/event_machine/response_pipe.rb +++ b/lib/rainbows/event_machine/response_pipe.rb @@ -5,8 +5,8 @@ module Rainbows::EventMachine::ResponsePipe # so a single buffer for all clients will work safely BUF = '' - def initialize(client, alive) - @client, @alive = client, alive + def initialize(client, alive, body) + @client, @alive, @body = client, alive, body end def notify_readable @@ -23,6 +23,7 @@ module Rainbows::EventMachine::ResponsePipe def unbind @client.quit unless @alive - @io.close + @body.close if @body.respond_to?(:close) + @io.close unless @io.closed? end end diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb index 0a2bb5d..9e36412 100644 --- a/lib/rainbows/response/body.rb +++ b/lib/rainbows/response/body.rb @@ -88,9 +88,11 @@ module Rainbows::Response::Body # :nodoc: ensure body.respond_to?(:close) && inp != body and body.close end - else + elsif method_defined?(:write_body_stream) def write_body_path(sock, body) - write_body_stream(sock, body_to_io(body)) + write_body_stream(sock, inp = body_to_io(body)) + ensure + body.respond_to?(:close) && inp != body and body.close end end diff --git a/t/close-pipe-response.ru b/t/close-pipe-response.ru new file mode 100644 index 0000000..96116d4 --- /dev/null +++ b/t/close-pipe-response.ru @@ -0,0 +1,26 @@ +# must be run without Rack::Lint since that clobbers to_path +class CloseWrapper < Struct.new(:to_io) + def each(&block) + to_io.each(&block) + end + + def close + ::File.open(ENV['fifo'], 'wb') do |fp| + fp.syswrite("CLOSING #{to_io}\n") + if to_io.respond_to?(:close) && ! to_io.closed? + to_io.close + end + end + end +end +use Rainbows::DevFdResponse +run(lambda { |env| + body = 'hello world' + io = IO.popen("echo '#{body}'", 'rb') + [ 200, + { + 'Content-Length' => (body.size + 1).to_s, + 'Content-Type' => 'application/octet-stream', + }, + CloseWrapper[io] ] +}) diff --git a/t/t0031-close-pipe-response.sh b/t/t0031-close-pipe-response.sh new file mode 100644 index 0000000..7439b5f --- /dev/null +++ b/t/t0031-close-pipe-response.sh @@ -0,0 +1,30 @@ +#!/bin/sh +. ./test-lib.sh + +t_plan 5 "close pipe response for $model" + +t_begin "setup and startup" && { + rtmpfiles err out + rainbows_setup $model + export fifo + rainbows -E none -D close-pipe-response.ru -c $unicorn_config + rainbows_wait_start +} + +t_begin "single request matches" && { + cat $fifo > $out & + test x'hello world' = x"$(curl -sSfv 2> $err http://$listen/)" +} + +t_begin "body.close called" && { + wait # for cat $fifo + grep CLOSING $out || die "body.close not logged" +} + +t_begin "shutdown server" && { + kill -QUIT $rainbows_pid +} + +t_begin "check stderr" && check_stderr + +t_done -- cgit v1.2.3-24-ge0c7