about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-05-16 20:18:12 -0700
committerEric Wong <normalperson@yhbt.net>2011-05-17 14:14:26 -0700
commit86508cdc09ef36d8c5ae708ef9e3e19a82844e98 (patch)
tree242519a48afe5d324a6c5655fdd0ac902143dd6a
parentb9cf769a639d22e63d2c611541000b46e5d07d45 (diff)
downloadruby_io_splice-86508cdc09ef36d8c5ae708ef9e3e19a82844e98.tar.gz
The pipe may be full from small buffers due to how TCP
works, so we need to use non-blocking I/O on the pipe.

ref: http://lkml.org/lkml/2009/1/13/478
-rw-r--r--lib/io/splice.rb19
-rw-r--r--test/test_tcp_splice.rb49
2 files changed, 58 insertions, 10 deletions
diff --git a/lib/io/splice.rb b/lib/io/splice.rb
index 1ff2f6a..89fab92 100644
--- a/lib/io/splice.rb
+++ b/lib/io/splice.rb
@@ -94,16 +94,15 @@ module IO::Splice
   # Returns the number of bytes actually spliced.
   # Like IO#readpartial, this never returns Errno::EAGAIN
   def self.partial(src, dst, len, src_offset)
-    IO.splice(src, src_offset, dst, nil, len, F_MOVE)
-    rescue EOFError
-      nil
-    rescue Errno::EAGAIN
-      begin
-        src.to_io.wait
-        IO.select(nil, [dst])
-        rv = IO.trysplice(src, src_offset, dst, nil, len, F_MOVE)
-      end while rv == :EAGAIN
-      rv
+    case rv = IO.trysplice(src, src_offset, dst, nil, len, F_MOVE)
+    when :EAGAIN
+      src.to_io.wait
+      IO.select(nil, [dst])
+    when Integer
+      return rv
+    else
+      return nil
+    end while true
   end
 end
 if (! defined?(RUBY_ENGINE) || RUBY_ENGINE == "ruby") &&
diff --git a/test/test_tcp_splice.rb b/test/test_tcp_splice.rb
new file mode 100644
index 0000000..2ddbfcb
--- /dev/null
+++ b/test/test_tcp_splice.rb
@@ -0,0 +1,49 @@
+require 'socket'
+require 'io/wait'
+require 'io/splice'
+require 'io/nonblock'
+require "test/unit"
+
+class TestTCPCopyStream < Test::Unit::TestCase
+  def setup
+    host = ENV["TEST_HOST"] || "127.0.0.1"
+    @srv = TCPServer.new(host, 0)
+    @port = @srv.addr[1]
+    @client = TCPSocket.new(host, @port)
+    @client.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
+    @accept = @srv.accept
+    @accept.setsockopt Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1
+    @client.sync = @accept.sync = true
+    @r, @w = IO.pipe
+  end
+
+  def teardown
+    @srv.close
+    [ @client, @accept, @r, @w ].each { |io| io.close unless io.closed? }
+  end
+
+  def test_client_to_server_eof
+    nr = 2000
+    buf = '0123456789abcdef' * 1024
+    expect = buf.size * nr
+    thr = Thread.new do
+      nr.times { @client.write(buf) }
+      @client.close
+    end
+    sleep 1 # wait for rcvbuf to fill up
+    bytes = IO::Splice.copy_stream(@accept, "/dev/null")
+    assert_equal expect, bytes
+  end
+
+  def test_client_to_server_expect
+    nr = 2000
+    buf = '0123456789abcdef' * 1024
+    expect = buf.size * nr
+    thr = Thread.new do
+      nr.times { @client.write(buf) }
+    end
+    sleep 1 # wait for rcvbuf to fill up
+    bytes = IO::Splice.copy_stream(@accept, "/dev/null", expect)
+    assert_equal expect, bytes
+  end
+end