From c6c2f7782e2270ee4684d2405376a186aa806fcd Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 3 Dec 2010 17:27:53 -0800 Subject: add optional thread-safety on a per-object basis Thread-safety is useful sometimes and needless overhead otherwise. Default to whatever TDB upstream defaults to. --- ext/tdb/tdb.c | 8 +++-- lib/tdb.rb | 14 +++++++++ lib/tdb/mt.rb | 38 ++++++++++++++++++++++++ test/test_tdb_mt.rb | 85 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 143 insertions(+), 2 deletions(-) create mode 100644 lib/tdb/mt.rb create mode 100644 test/test_tdb_mt.rb diff --git a/ext/tdb/tdb.c b/ext/tdb/tdb.c index a68c360..373da17 100644 --- a/ext/tdb/tdb.c +++ b/ext/tdb/tdb.c @@ -158,7 +158,7 @@ static VALUE nogvl_open(void *ptr) return (VALUE)tdb; } -static void set_args(struct open_args *o, VALUE opts) +static void set_args(VALUE self, struct open_args *o, VALUE opts) { VALUE tmp; @@ -203,6 +203,10 @@ static void set_args(struct open_args *o, VALUE opts) o->hash_fn = (tdb_hash_func)NUM2ULONG(num); } + + tmp = rb_hash_aref(opts, ID2SYM(rb_intern("threadsafe"))); + if (RTEST(tmp)) + rb_funcall(self, rb_intern("threadsafe!"), 0); } /* @@ -242,7 +246,7 @@ static VALUE init(int argc, VALUE *argv, VALUE self) if (tdb) rb_raise(rb_eRuntimeError, "TDB already initialized"); rb_scan_args(argc, argv, "11", &path, &opts); - set_args(&o, opts); + set_args(self, &o, opts); if (NIL_P(path)) o.tdb_flags |= TDB_INTERNAL; diff --git a/lib/tdb.rb b/lib/tdb.rb index 2a75193..67114c9 100644 --- a/lib/tdb.rb +++ b/lib/tdb.rb @@ -1,2 +1,16 @@ # -*- encoding: binary -*- require 'tdb_ext' +class TDB + autoload :MT, 'tdb/mt' + + # makes the current TDB object thread-safe + def threadsafe! + extend MT + end + + # will return true when TDB::MT is included in TDB or the TDB + # object is extended by TDB + def threadsafe? + false + end +end diff --git a/lib/tdb/mt.rb b/lib/tdb/mt.rb new file mode 100644 index 0000000..6742cc4 --- /dev/null +++ b/lib/tdb/mt.rb @@ -0,0 +1,38 @@ +# -*- encoding: binary -*- +module TDB::MT + def initialize + super + @lock = Mutex.new + end + + %w( + close closed? fetch [] store []= insert! modify! insert modify + key? has_key? include? member? + nuke! delete + lockall trylockall unlockall + lockall_read trylockall_read unlockall_read + lockall_mark lockall_unmark + ).each do |meth| + eval "def #{meth}(*args); @lock.synchronize { super }; end" + end + + def each(&block) + @lock.synchronize do + super { |k,v| @lock.exclusive_unlock { yield(k,v) } } + end + end + + def threadsafe? + true + end + + def self.extended(obj) + obj.instance_eval { @lock = Mutex.new unless defined?(@lock) } + end + + def self.included(klass) + ObjectSpace.each_object(klass) { |obj| + obj.instance_eval { @lock = Mutex.new unless defined?(@lock) } + } + end +end diff --git a/test/test_tdb_mt.rb b/test/test_tdb_mt.rb new file mode 100644 index 0000000..0fbe09f --- /dev/null +++ b/test/test_tdb_mt.rb @@ -0,0 +1,85 @@ +# -*- encoding: binary -*- +$stdout.sync = $stderr.sync = true +require 'test/unit' +require 'tempfile' +$-w = true +require 'tdb' + +class Test_TDB_MT < Test::Unit::TestCase + def setup + @tdb = @tmp = nil + @start_pid = $$ + end + + def teardown + return if @start_pid != $$ + @tmp.close! if @tmp.respond_to?(:close!) + @tdb.close if @tdb && ! @tdb.closed? + end + + def test_make_threadsafe + @tdb = TDB.new(nil) + assert_kind_of TDB, @tdb + assert ! @tdb.threadsafe? + assert_nothing_raised { @tdb.threadsafe! } + assert @tdb.threadsafe? + @tdb.each { |k,v| assert_equal v, @tdb[k] } + end + + def test_init_threadsafe + @tdb = TDB.new(nil, :threadsafe => true) + assert @tdb.threadsafe? + @tdb.close + @tdb = TDB.new(nil, :threadsafe => false) + assert ! @tdb.threadsafe? + @tdb.close + @tdb = TDB.new(nil) + assert ! @tdb.threadsafe? + @tdb.close + end + + def test_thread_safe_torture_test + @tdb = TDB.new(nil) + assert_nothing_raised { @tdb.threadsafe! } + pid = fork do + Thread.abort_on_exception = true + threads = [] + blob = 'foo' * 1000 + nr = 10000 + t = Thread.new do + while true + Thread.pass + @tdb.to_a + end + end + threads << Thread.new { nr.times { |i| @tdb[i.to_s] = blob } } + threads << Thread.new { nr.times { |i| @tdb[i.to_s] = blob } } + threads << Thread.new { nr.times { |i| @tdb[i.to_s] = blob } } + threads << Thread.new { nr.times { |i| @tdb[i.to_s] = blob } } + threads << t + + t.kill + threads.each { |t| t.join } + end + _, status = Process.waitpid2(pid) + assert status.success?, status.inspect + end + + def test_check_methods + m = TDB.instance_methods.sort + m -= Object.instance_methods + m -= Enumerable.instance_methods + m.map! { |x| x.to_sym } + mt = TDB::MT.instance_methods.sort + m -= [ :threadsafe! ] + m += [ :include?, :member? ] + m.sort! + unwrapped = ( (m - mt) | (mt - m)).uniq + assert unwrapped.empty?, "unwrapped methods: #{unwrapped.inspect}" + @tdb = TDB.new(nil) + respond_to?(:refute_match) and + m.each { |meth| refute_match(/\bTDB::MT\b/, @tdb.method(meth).to_s) } + @tdb.threadsafe! + m.each { |meth| assert_match(/\bTDB::MT\b/, @tdb.method(meth).to_s) } + end +end -- cgit v1.2.3-24-ge0c7