From 5479b15c766204e31495e87a64fa689141cc38a3 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 29 Jul 2010 07:05:48 +0000 Subject: revactor: Actor-aware dev_fd_response proxying Proxying regular Ruby IO objects while Revactor is in use is highly suboptimal, so proxy it with an Actor-aware wrapper for better scheduling. --- lib/rainbows/dev_fd_response.rb | 2 + lib/rainbows/revactor.rb | 2 + lib/rainbows/revactor/proxy.rb | 55 ++++++++++++++++++++++++ t/t0034-pipelined-pipe-response.sh | 87 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 146 insertions(+) create mode 100644 lib/rainbows/revactor/proxy.rb create mode 100755 t/t0034-pipelined-pipe-response.sh diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb index 691526c..d839803 100644 --- a/lib/rainbows/dev_fd_response.rb +++ b/lib/rainbows/dev_fd_response.rb @@ -53,6 +53,8 @@ class Rainbows::DevFdResponse < Struct.new(:app) case env["rainbows.model"] when :FiberSpawn, :FiberPool, :RevFiberSpawn io = Rainbows::Fiber::IO.new(io,::Fiber.current) + when :Revactor + io = Rainbows::Revactor::Proxy.new(io) end else # unlikely, char/block device file, directory, ... return response diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index 0120ebe..8ec791d 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -22,6 +22,8 @@ module Rainbows::Revactor # :stopdoc: RD_ARGS = {} + autoload :Proxy, 'rainbows/revactor/proxy' + include Rainbows::Base LOCALHOST = Unicorn::HttpRequest::LOCALHOST TCP = ::Revactor::TCP::Socket diff --git a/lib/rainbows/revactor/proxy.rb b/lib/rainbows/revactor/proxy.rb new file mode 100644 index 0000000..a7d3be1 --- /dev/null +++ b/lib/rainbows/revactor/proxy.rb @@ -0,0 +1,55 @@ +# -*- encoding: binary -*- +# :enddoc: +# Generic IO wrapper for proxying pipe and socket objects +# this behaves more like Rainbows::Fiber::IO than anything, +# making it highly suitable for proxying data from pipes/sockets +class Rainbows::Revactor::Proxy < Rev::IO + def initialize(io) + @receiver = Actor.current + super(io) + attach(Rev::Loop.default) + end + + def close + if @_io + super + @_io = nil + end + end + + def each(&block) + # when yield-ing, Revactor::TCP#write may raise EOFError + # (instead of Errno::EPIPE), so we need to limit the rescue + # to just readpartial and let EOFErrors during yield bubble up + begin + buf = readpartial(INPUT_SIZE) + rescue EOFError + break + end while yield(buf) || true + end + + # this may return more than the specified length, Rainbows! won't care... + def readpartial(length) + @receiver = Actor.current + enable if attached? && ! enabled? + + Actor.receive do |filter| + filter.when(T[:rainbows_io_input, self]) do |_, _, data| + return data + end + + filter.when(T[:rainbows_io_closed, self]) do + raise EOFError, "connection closed" + end + end + end + + def on_close + @receiver << T[:rainbows_io_closed, self] + end + + def on_read(data) + @receiver << T[:rainbows_io_input, self, data ] + disable + end +end diff --git a/t/t0034-pipelined-pipe-response.sh b/t/t0034-pipelined-pipe-response.sh new file mode 100755 index 0000000..8346af9 --- /dev/null +++ b/t/t0034-pipelined-pipe-response.sh @@ -0,0 +1,87 @@ +#!/bin/sh +. ./test-lib.sh + +t_plan 6 "pipelined pipe response for $model" + +t_begin "setup and startup" && { + rtmpfiles err out dd_fifo + rainbows_setup $model + + # can't load Rack::Lint here since it clobbers body#to_path + rainbows -E none -D fast-pipe-response.ru -c $unicorn_config + rainbows_wait_start +} + +t_begin "read random blob sha1" && { + random_blob_sha1=$(rsha1 < random_blob) +} + +script=' +require "digest/sha1" +require "kcar" +$stdin.binmode +expect = ENV["random_blob_sha1"] +kcar = Kcar::Response.new($stdin, {}) +3.times do + nr = 0 + status, headers, body = kcar.rack + dig = Digest::SHA1.new + body.each { |buf| dig << buf ; nr += buf.size } + sha1 = dig.hexdigest + sha1 == expect or abort "mismatch: sha1=#{sha1} != expect=#{expect}" + body.close +end +$stdout.syswrite("ok\n") +' + +t_begin "staggered pipeline of 3 HTTP requests" && { + req='GET /random_blob HTTP/1.1\r\nHost: example.com\r\n' + rm -f $ok + ( + export random_blob_sha1 + $RUBY -e "$script" < $fifo >> $ok & + printf "$req"'X-Req:0\r\n\r\n' + exec 6>&1 + ( + dd bs=16384 count=1 + printf "$req" >&6 + dd bs=16384 count=1 + printf 'X-Req:1\r\n\r\n' >&6 + dd bs=16384 count=1 + printf "$req" >&6 + dd bs=16384 count=1 + printf 'X-Req:2\r\n' >&6 + dd bs=16384 count=1 + printf 'Connection: close\r\n\r' >&6 + dd bs=16384 count=1 + printf '\n' >&6 + cat + ) < $dd_fifo > $fifo & + wait + echo ok >> $ok + ) | socat - TCP:$listen > $dd_fifo + test 2 -eq $(grep '^ok$' $ok |wc -l) +} + +t_begin "pipeline 3 HTTP requests" && { + rm -f $ok + req='GET /random_blob HTTP/1.1\r\nHost: example.com\r\n' + req="$req"'\r\n'"$req"'\r\n'"$req" + req="$req"'Connection: close\r\n\r\n' + ( + export random_blob_sha1 + $RUBY -e "$script" < $fifo >> $ok & + printf "$req" + wait + echo ok >> $ok + ) | socat - TCP:$listen > $fifo + test 2 -eq $(grep '^ok$' $ok |wc -l) +} + +t_begin "shutdown server" && { + kill -QUIT $rainbows_pid +} + +t_begin "check stderr" && check_stderr + +t_done -- cgit v1.2.3-24-ge0c7