about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/mogilefs/mogilefs.rb14
-rw-r--r--lib/mogilefs/util.rb23
-rw-r--r--test/test_util.rb61
3 files changed, 86 insertions, 12 deletions
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index c7c33ed..14fc0c2 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -71,17 +71,8 @@ class MogileFS::MogileFS < MogileFS::Client
       when /^http:\/\// then
         begin
           sock = http_get_sock(URI.parse(path))
-          return( if block_given?
-                    yield(sock)
-                  else
-                    begin
-                      sock.read
-                    rescue Errno::EAGAIN
-                      IO.select([sock])
-                      retry
-                    end
-                  end )
-          # return block_given? ? yield(sock) : sock.read
+          return yield(sock) if block_given?
+          return sysread_full(sock, sock.mogilefs_size, @get_file_data_timeout)
         rescue MogileFS::Timeout, Errno::ECONNREFUSED,
                EOFError, SystemCallError, MogileFS::InvalidResponseError
           next
@@ -288,6 +279,7 @@ class MogileFS::MogileFS < MogileFS::Client
       buf = sock.recv(4096, Socket::MSG_PEEK)
       head, body = buf.split(/\r\n\r\n/, 2)
       if head =~ HTTP_200_OK
+        sock.mogilefs_size = head[/^Content-Length:\s*(\d+)/i, 1].to_i
         sock.recv(head.size + 4, 0)
         return sock
       end
diff --git a/lib/mogilefs/util.rb b/lib/mogilefs/util.rb
index deaf6b0..9ccd4f0 100644
--- a/lib/mogilefs/util.rb
+++ b/lib/mogilefs/util.rb
@@ -67,6 +67,27 @@ module MogileFS::Util
     # should never get here
   end
 
+  def sysread_full(io_rd, size, timeout = nil, full_timeout = false)
+    tmp = [] # avoid expensive string concatenation with every loop iteration
+    reader = io_rd.method(timeout ? :read_nonblock : :sysread)
+    begin
+      while size > 0
+        tmp << reader.call(size)
+        size -= tmp.last.size
+      end
+    rescue Errno::EAGAIN, Errno::EINTR
+      t0 = Time.now
+      ready = IO.select([ io_rd ], nil, nil, timeout)
+      timeout -= (Time.now - t0) if full_timeout
+      if ready != [ io_rd ] || timeout < 0
+        raise MogileFS::Timeout, 'sysread_full timeout'
+      end
+      retry
+    rescue EOFError
+    end
+    tmp.join('')
+  end
+
   class StoreContent < Proc
     def initialize(total_size, &writer_proc)
       @total_size = total_size
@@ -87,7 +108,7 @@ require 'timeout'
 class MogileFS::Timeout < Timeout::Error; end
 
 class Socket
-  attr_accessor :mogilefs_addr, :mogilefs_connected
+  attr_accessor :mogilefs_addr, :mogilefs_connected, :mogilefs_size
 
   TCP_CORK = 3 if ! defined?(TCP_CORK) && RUBY_PLATFORM =~ /linux/
 
diff --git a/test/test_util.rb b/test/test_util.rb
index 49c1735..55cac66 100644
--- a/test/test_util.rb
+++ b/test/test_util.rb
@@ -56,4 +56,65 @@ class TestMogileFS__Util < Test::Unit::TestCase
       t.destroy!
   end
 
+  def test_sysread_slowly
+    nr = 10
+    str = 'abcde'
+    expect = str * nr
+    rd, wr = IO.pipe
+    pid = fork do
+      rd.close
+      nr.times do
+        syswrite_full(wr, str)
+        sleep(0.1)
+      end
+    end
+    wr.close
+    buf = sysread_full(rd, expect.size)
+    assert_equal expect, buf
+    rd.close
+    ensure
+      Process.kill('TERM', pid) rescue nil
+      Process.waitpid(pid) rescue nil
+  end
+
+  def test_sysread_timeout
+    nr = 10
+    str = 'abcde'
+    expect = str * nr
+    rd, wr = IO.pipe
+    pid = fork do
+      rd.close
+      nr.times do
+        syswrite_full(wr, str)
+        sleep 1
+      end
+    end
+    wr.close
+    assert_raises(MogileFS::Timeout) { sysread_full(rd, expect.size, 0.1) }
+    rd.close
+    ensure
+      Process.kill('TERM', pid) rescue nil
+      Process.waitpid(pid) rescue nil
+  end
+
+  def test_sysread_full_timeout
+    nr = 100
+    str = 'abcde'
+    expect = str * nr
+    rd, wr = IO.pipe
+    pid = fork do
+      rd.close
+      nr.times do
+        syswrite_full(wr, str)
+        sleep 0.01
+      end
+    end
+    wr.close
+    assert_raises(MogileFS::Timeout) { sysread_full(rd,expect.size,0.1,true) }
+    rd.close
+    ensure
+      Process.kill('TERM', pid) rescue nil
+      Process.waitpid(pid) rescue nil
+  end
+
 end