about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows/dev_fd_response.rb2
-rw-r--r--lib/rainbows/revactor.rb2
-rw-r--r--lib/rainbows/revactor/proxy.rb55
-rwxr-xr-xt/t0034-pipelined-pipe-response.sh87
4 files changed, 146 insertions, 0 deletions
diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb
index 691526c..d839803 100644
--- a/lib/rainbows/dev_fd_response.rb
+++ b/lib/rainbows/dev_fd_response.rb
@@ -53,6 +53,8 @@ class Rainbows::DevFdResponse < Struct.new(:app)
       case env["rainbows.model"]
       when :FiberSpawn, :FiberPool, :RevFiberSpawn
         io = Rainbows::Fiber::IO.new(io,::Fiber.current)
+      when :Revactor
+        io = Rainbows::Revactor::Proxy.new(io)
       end
     else # unlikely, char/block device file, directory, ...
       return response
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
index 0120ebe..8ec791d 100644
--- a/lib/rainbows/revactor.rb
+++ b/lib/rainbows/revactor.rb
@@ -22,6 +22,8 @@ module Rainbows::Revactor
   # :stopdoc:
   RD_ARGS = {}
 
+  autoload :Proxy, 'rainbows/revactor/proxy'
+
   include Rainbows::Base
   LOCALHOST = Unicorn::HttpRequest::LOCALHOST
   TCP = ::Revactor::TCP::Socket
diff --git a/lib/rainbows/revactor/proxy.rb b/lib/rainbows/revactor/proxy.rb
new file mode 100644
index 0000000..a7d3be1
--- /dev/null
+++ b/lib/rainbows/revactor/proxy.rb
@@ -0,0 +1,55 @@
+# -*- encoding: binary -*-
+# :enddoc:
+# Generic IO wrapper for proxying pipe and socket objects
+# this behaves more like Rainbows::Fiber::IO than anything,
+# making it highly suitable for proxying data from pipes/sockets
+class Rainbows::Revactor::Proxy < Rev::IO
+  def initialize(io)
+    @receiver = Actor.current
+    super(io)
+    attach(Rev::Loop.default)
+  end
+
+  def close
+    if @_io
+      super
+      @_io = nil
+    end
+  end
+
+  def each(&block)
+    # when yield-ing, Revactor::TCP#write may raise EOFError
+    # (instead of Errno::EPIPE), so we need to limit the rescue
+    # to just readpartial and let EOFErrors during yield bubble up
+    begin
+      buf = readpartial(INPUT_SIZE)
+    rescue EOFError
+      break
+    end while yield(buf) || true
+  end
+
+  # this may return more than the specified length, Rainbows! won't care...
+  def readpartial(length)
+    @receiver = Actor.current
+    enable if attached? && ! enabled?
+
+    Actor.receive do |filter|
+      filter.when(T[:rainbows_io_input, self]) do |_, _, data|
+        return data
+      end
+
+      filter.when(T[:rainbows_io_closed, self]) do
+        raise EOFError, "connection closed"
+      end
+    end
+  end
+
+  def on_close
+    @receiver << T[:rainbows_io_closed, self]
+  end
+
+  def on_read(data)
+    @receiver << T[:rainbows_io_input, self, data ]
+    disable
+  end
+end
diff --git a/t/t0034-pipelined-pipe-response.sh b/t/t0034-pipelined-pipe-response.sh
new file mode 100755
index 0000000..8346af9
--- /dev/null
+++ b/t/t0034-pipelined-pipe-response.sh
@@ -0,0 +1,87 @@
+#!/bin/sh
+. ./test-lib.sh
+
+t_plan 6 "pipelined pipe response for $model"
+
+t_begin "setup and startup" && {
+        rtmpfiles err out dd_fifo
+        rainbows_setup $model
+
+        # can't load Rack::Lint here since it clobbers body#to_path
+        rainbows -E none -D fast-pipe-response.ru -c $unicorn_config
+        rainbows_wait_start
+}
+
+t_begin "read random blob sha1" && {
+        random_blob_sha1=$(rsha1 < random_blob)
+}
+
+script='
+require "digest/sha1"
+require "kcar"
+$stdin.binmode
+expect = ENV["random_blob_sha1"]
+kcar = Kcar::Response.new($stdin, {})
+3.times do
+        nr = 0
+        status, headers, body = kcar.rack
+        dig = Digest::SHA1.new
+        body.each { |buf| dig << buf ; nr += buf.size }
+        sha1 = dig.hexdigest
+        sha1 == expect or abort "mismatch: sha1=#{sha1} != expect=#{expect}"
+        body.close
+end
+$stdout.syswrite("ok\n")
+'
+
+t_begin "staggered pipeline of 3 HTTP requests" && {
+        req='GET /random_blob HTTP/1.1\r\nHost: example.com\r\n'
+        rm -f $ok
+        (
+                export random_blob_sha1
+                $RUBY -e "$script" < $fifo >> $ok &
+                printf "$req"'X-Req:0\r\n\r\n'
+                exec 6>&1
+                (
+                        dd bs=16384 count=1
+                        printf "$req" >&6
+                        dd bs=16384 count=1
+                        printf 'X-Req:1\r\n\r\n' >&6
+                        dd bs=16384 count=1
+                        printf "$req" >&6
+                        dd bs=16384 count=1
+                        printf 'X-Req:2\r\n' >&6
+                        dd bs=16384 count=1
+                        printf 'Connection: close\r\n\r' >&6
+                        dd bs=16384 count=1
+                        printf '\n' >&6
+                        cat
+                ) < $dd_fifo > $fifo &
+                wait
+                echo ok >> $ok
+        ) | socat - TCP:$listen > $dd_fifo
+        test 2 -eq $(grep '^ok$' $ok |wc -l)
+}
+
+t_begin "pipeline 3 HTTP requests" && {
+        rm -f $ok
+        req='GET /random_blob HTTP/1.1\r\nHost: example.com\r\n'
+        req="$req"'\r\n'"$req"'\r\n'"$req"
+        req="$req"'Connection: close\r\n\r\n'
+        (
+                export random_blob_sha1
+                $RUBY -e "$script" < $fifo >> $ok &
+                printf "$req"
+                wait
+                echo ok >> $ok
+        ) | socat - TCP:$listen > $fifo
+        test 2 -eq $(grep '^ok$' $ok |wc -l)
+}
+
+t_begin "shutdown server" && {
+        kill -QUIT $rainbows_pid
+}
+
+t_begin "check stderr" && check_stderr
+
+t_done