about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-11-12 12:28:49 +0000
committerEric Wong <normalperson@yhbt.net>2011-11-12 12:28:49 +0000
commit7bedbf3ed5920a922da89874a2bd134fb1a82c83 (patch)
treeb5e00c841dcc372f8ef7e48dbbe6f74dcf71a52b
parent435d43d6b85481e7b678c3092dec7fcb246e30e8 (diff)
downloadmogilefs-client-7bedbf3ed5920a922da89874a2bd134fb1a82c83.tar.gz
Sometimes a server will shut down on us in the
middle of a pipeline.  That is bad.
-rw-r--r--lib/mogilefs/mogilefs.rb34
-rw-r--r--test/test_mogilefs.rb77
2 files changed, 97 insertions, 14 deletions
diff --git a/lib/mogilefs/mogilefs.rb b/lib/mogilefs/mogilefs.rb
index cfbf082..73827c3 100644
--- a/lib/mogilefs/mogilefs.rb
+++ b/lib/mogilefs/mogilefs.rb
@@ -263,21 +263,27 @@ class MogileFS::MogileFS < MogileFS::Client
       end
     end
     opts = { :domain => @domain }
-    keys.each do |key|
-      opts[:key] = key
-      @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
-    end
-    @backend.pipeline_wait
-  rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
-    @backend.shutdown # reset the socket
-    args = { :pathcount => 0x7fffffff }
-    keys.each do |key|
-      paths = get_paths(key, args)
-      block.call(key, paths_size(paths), paths.size)
+    begin
+      keys.each do |key|
+        opts[:key] = key
+        @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
+      end
+      @backend.pipeline_wait
+    rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
+      @backend.shutdown # reset the socket
+      args = { :pathcount => 0x7fffffff }
+      keys.each do |key|
+        paths = get_paths(key, args)
+        block.call(key, paths_size(paths), paths.size)
+      end
+    rescue MogileFS::PipelineError
+      @backend.shutdown
+      keys = ordered - ready.keys
+      retry
+    rescue
+      @backend.shutdown
+      raise
     end
-  rescue
-    @backend.shutdown
-    raise
   end
 
   # Return metadata about a file as a hash.
diff --git a/test/test_mogilefs.rb b/test/test_mogilefs.rb
index 0ab5dab..92137be 100644
--- a/test/test_mogilefs.rb
+++ b/test/test_mogilefs.rb
@@ -625,6 +625,83 @@ class TestMogileFS__MogileFS < TestMogileFS
     assert_equal 1, received.size
   end
 
+  def test_list_keys_verbose_ordering # implementation detail
+    received = []
+    sock = TCPServer.new("127.0.0.1", 0)
+    nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
+    c = MogileFS::MogileFS.new(nargs)
+    th = Thread.new do
+      a = sock.accept
+      %w(a b c d e).each do |key|
+        line = a.gets
+        cmd, args = line.split(/\s+/, 2)
+        args = c.backend.url_decode(args.strip)
+        assert_equal "file_info", cmd
+        assert_equal key, args["key"]
+      end
+      out = { "length" => 3, "devcount" => 6 }
+      %w(a b c d e).shuffle.each do |key|
+        out["key"] = key
+        a.write "OK #{c.backend.url_encode(out)}\r\n"
+      end
+      a.close
+    end
+
+    blk = lambda do |key, length, devcount|
+      received << [ key, length, devcount ]
+    end
+    c.list_keys_verbose(%w(a b c d e), blk)
+    th.join
+    received.map! { |(key,_,_)| key }
+    assert_equal %w(a b c d e), received
+    ensure
+      sock.close
+  end
+
+  def test_list_keys_verbose_retry # implementation detail
+    received = []
+    sock = TCPServer.new("127.0.0.1", 0)
+    nargs = { :hosts => [ "127.0.0.1:#{sock.addr[1]}" ], :domain => "foo" }
+    c = MogileFS::MogileFS.new(nargs)
+    th = Thread.new do
+      a = sock.accept
+      %w(a b c d e).each do |key|
+        line = a.gets
+        cmd, args = line.split(/\s+/, 2)
+        args = c.backend.url_decode(args.strip)
+        assert_equal "file_info", cmd
+        assert_equal key, args["key"]
+      end
+      out = { "length" => 3, "devcount" => 6 }
+      %w(d e).each do |key|
+        out["key"] = key
+        a.write "OK #{c.backend.url_encode(out)}\r\n"
+      end
+      a.close # trigger EOF
+      a = sock.accept # client will retry
+      %w(a b c).each do |key|
+        line = a.gets
+        cmd, args = line.split(/\s+/, 2)
+        args = c.backend.url_decode(args.strip)
+        assert_equal "file_info", cmd
+        assert_equal key, args["key"]
+        out["key"] = key
+        a.write "OK #{c.backend.url_encode(out)}\r\n"
+      end
+      a.close
+    end
+
+    blk = lambda do |key, length, devcount|
+      received << [ key, length, devcount ]
+    end
+    c.list_keys_verbose(%w(a b c d e), blk)
+    th.join
+    received.map! { |(key,_,_)| key }
+    assert_equal %w(a b c d e), received
+    ensure
+      sock.close
+  end
+
   def test_sleep
     @backend.sleep = {}
     assert_nothing_raised do