about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-05-11 10:05:35 +0000
committerEric Wong <normalperson@yhbt.net>2013-05-11 10:05:35 +0000
commitabdd10fa31fa771602ba198440693ef43a78eb6c (patch)
tree0daa3545f63ddbee1f2f88982d4f5877906e3d16
parent903c58d583a1382f90063d3fcc3ded5b06d8004a (diff)
downloadomgf-abdd10fa31fa771602ba198440693ef43a78eb6c.tar.gz
regurgitator: fall back to tracker if regurgitator fails
regurgitator may be hooked up to a read-only slave DB, so we
should be able to fall back to the tracker if the slave DB
fails.

While we're at it, add long-overdue tests for regurgitator
integration.
-rw-r--r--lib/omgf/hysterical_raisins.rb8
-rw-r--r--lib/omgf/pool.rb6
-rw-r--r--lib/omgf/regurgitator.rb20
-rw-r--r--omgf.gemspec2
-rw-r--r--test/test_regurgitator.rb69
5 files changed, 95 insertions, 10 deletions
diff --git a/lib/omgf/hysterical_raisins.rb b/lib/omgf/hysterical_raisins.rb
index f48e9fe..e528977 100644
--- a/lib/omgf/hysterical_raisins.rb
+++ b/lib/omgf/hysterical_raisins.rb
@@ -114,7 +114,7 @@ class OMGF::HystericalRaisins
 
   # HEAD /$DOMAIN/$KEY
   def stat_key(env, domain, key)
-    size, uris = mg_size_and_uris(domain, key, @get_paths_opts)
+    size, uris = mg_size_and_uris(env, domain, key, @get_paths_opts)
     uris = @vp.verify(uris, 1, @verify_timeout).flatten!
 
     return r(503, "") unless uris && uris[0]
@@ -130,7 +130,7 @@ class OMGF::HystericalRaisins
 
   # GET /$DOMAIN/$KEY
   def redirect_key(env, domain, key)
-    uris = mg_get_uris(domain, key, @get_paths_opts)
+    uris = mg_get_uris(env, domain, key, @get_paths_opts)
     uris = @vp.verify(uris, 1, @verify_timeout).flatten!
 
     return r(503, "") unless uris && dest = uris.shift
@@ -159,12 +159,12 @@ class OMGF::HystericalRaisins
     when "application/json"
       tmp = []
       h = { "Content-Type" => "application/json" }
-      mg_list_keys(domain, prefix, after, limit) { |*x| tmp << x }
+      mg_list_keys(env, domain, prefix, after, limit) { |*x| tmp << x }
       tmp = tmp.to_json
     else
       tmp = ""
       h = { "Content-Type" => "text/plain" }
-      mg_list_keys(domain, prefix, after, limit) do |dkey,length,devcount|
+      mg_list_keys(env, domain, prefix, after, limit) do |dkey,length,devcount|
         tmp << "#{dkey}|#{length}|#{devcount}\n"
       end
     end
diff --git a/lib/omgf/pool.rb b/lib/omgf/pool.rb
index 6046287..4470e1a 100644
--- a/lib/omgf/pool.rb
+++ b/lib/omgf/pool.rb
@@ -32,15 +32,15 @@ module OMGF::Pool
     end
   end
 
-  def mg_list_keys(domain, prefix, after, limit, &block) # :nodoc:
+  def mg_list_keys(env, domain, prefix, after, limit, &block) # :nodoc:
     pool_use(domain) { |mg| mg.list_keys(prefix, after, limit, &block) }
   end
 
-  def mg_get_uris(domain, key, opts) # :nodoc:
+  def mg_get_uris(env, domain, key, opts) # :nodoc:
     pool_use(domain) { |mg| mg.get_uris(key, opts) }
   end
 
-  def mg_size_and_uris(domain, key, opts) # :nodoc:
+  def mg_size_and_uris(env, domain, key, opts) # :nodoc:
     pool_use(domain) do |mg|
       [ mg.size(key), mg.get_uris(key, opts) ]
     end
diff --git a/lib/omgf/regurgitator.rb b/lib/omgf/regurgitator.rb
index 094a01c..d9b2bfd 100644
--- a/lib/omgf/regurgitator.rb
+++ b/lib/omgf/regurgitator.rb
@@ -15,28 +15,42 @@ module OMGF::Regurgitator
     get_dmid(domain) or raise MogileFS::Backend::UnregDomainError
   end
 
-  def mg_list_keys(domain, prefix, after, limit)
+  def _log_err(env, m, e)
+    l = env["rack.logger"] or return
+    l.error("retrying #{m} against tracker: #{e.message} (#{e.class})")
+  end
+
+  def mg_list_keys(env, domain, prefix, after, limit)
     check_domain!(domain)
     list_keys(domain, prefix: prefix, after: after, limit: limit) do |x|
       yield(x[:dkey], x[:length], x[:devcount])
     end
+  rescue Sequel::Error => e
+    _log_err(env, "mg_list_keys", e)
+    super
   end
 
   def _uris!(info)
     info[:uris].values.flatten!
   end
 
-  def mg_get_uris(domain, key, get_path_opts = {})
+  def mg_get_uris(env, domain, key, get_path_opts = {})
     check_domain!(domain)
     info = file_info({}, domain, key) or
       raise MogileFS::Backend::UnknownKeyError
     _uris!(info)
+  rescue Sequel::Error => e
+    _log_err(env, "mg_get_uris", e)
+    super
   end
 
-  def mg_size_and_uris(domain, key, get_path_opts = {})
+  def mg_size_and_uris(env, domain, key, get_path_opts = {})
     check_domain!(domain)
     info = file_info({}, domain, key) or
       raise MogileFS::Backend::UnknownKeyError
     [ info[:length], _uris!(info) ]
+  rescue Sequel::Error => e
+    _log_err(env, "mg_size_and_uris", e)
+    super
   end
 end
diff --git a/omgf.gemspec b/omgf.gemspec
index 9c3c649..d9a2c6e 100644
--- a/omgf.gemspec
+++ b/omgf.gemspec
@@ -23,6 +23,8 @@ Gem::Specification.new do |s|
   s.add_dependency('kgio', ['~> 2.7'])
   s.add_dependency('kcar', ['~> 0.3'])
   s.add_dependency('mogilefs-client', ['~> 3.1'])
+  s.add_development_dependency('regurgitator')
+  s.add_development_dependency('sqlite3')
 
   s.licenses = %w(AGPLv3+)
 end
diff --git a/test/test_regurgitator.rb b/test/test_regurgitator.rb
new file mode 100644
index 0000000..950f354
--- /dev/null
+++ b/test/test_regurgitator.rb
@@ -0,0 +1,69 @@
+# Copyright (C) 2008-2013, Eric Wong <normalperson@yhbt.net>
+# License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt)
+require './test/integration'
+require 'rack/mock'
+require 'open-uri'
+require 'omgf/hysterical_raisins'
+require 'digest/md5'
+require 'sequel' # regurgitator uses sequel
+Sequel.extension :migration, :schema_dumper
+
+class TestRegurgitator < Test::Unit::TestCase
+  include TestMogileFSIntegration
+  def setup
+    setup_mogilefs
+    @admin.create_domain("testdom")
+    @err = StringIO.new
+    @mirror = Tempfile.new("db.mirror")
+    logger = Logger.new(@err)
+    @opts = { "rack.logger" => logger }
+    @orig_db = Sequel.connect("sqlite://#{@dbname.path}")
+    @mirror_db = Sequel.connect("sqlite://#{@mirror.path}")
+    mig = @orig_db.dump_schema_migration(:indexes => false, :same_db => true)
+    mig = eval(mig)
+    mig.apply(@mirror_db, :up)
+    omgf_opts = {
+      :hosts => @hosts,
+      :logger => logger,
+      :db => @mirror_db,
+    }
+    @app = OMGF::HystericalRaisins.new(omgf_opts)
+    @req = Rack::MockRequest.new(@app)
+  end
+
+  def get_recent
+    s = TCPSocket.new(@test_host, @tracker_port)
+    s.write("!recent\r\n")
+    lines = []
+    while line = s.gets
+      break if line =~ /\A\.\r?\n/
+      lines << line
+    end
+    lines
+  ensure
+    s.close if s
+  end
+
+  def test_listing
+    # missing domain
+    resp = @req.get("/non-existent", @opts)
+    assert_equal 0, @err.string.size
+    assert_equal 404, resp.status, "non-existent domain listing gives 404"
+    lines = get_recent
+    assert(/non-existent/ !~ lines.join, lines.inspect)
+
+    # corrupt the mirror
+    @mirror_db.disconnect
+    @mirror.syswrite("\0" * (4096 * 1024))
+
+    resp = @req.get("/non-existant", @opts)
+    assert_match(/retrying mg_list_keys against tracker/, @err.string)
+    assert_equal 404, resp.status, "non-existant domain listing gives 404"
+    lines = get_recent
+    assert_match(/non-existant/, lines.join, lines.inspect)
+  end
+
+  def teardown
+    teardown_mogilefs
+  end
+end