From a997f4822a99590c7a5175be4a694b4482a4b997 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Sun, 9 May 2010 00:05:03 -0700 Subject: fix POSIX_MQ#notify(&block) aka SIGEV_THREAD tests for them were stupidly broken and never executed :x --- ext/posix_mq/posix_mq.c | 78 +++++++++++++++++++++++++++---------------------- lib/posix_mq.rb | 7 ++--- test/test_posix_mq.rb | 33 ++++++++++----------- 3 files changed, 61 insertions(+), 57 deletions(-) diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c index 4bf4262..db50534 100644 --- a/ext/posix_mq/posix_mq.c +++ b/ext/posix_mq/posix_mq.c @@ -724,6 +724,10 @@ static int lookup_sig(VALUE sig) return NUM2INT(sig); } +/* + * TODO: Under Linux, we could just use netlink directly + * the same way glibc does... + */ /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */ static void thread_notify_fd(union sigval sv) { @@ -732,30 +736,49 @@ static void thread_notify_fd(union sigval sv) while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN)); } -/* - * TODO: Under Linux, we could just use netlink directly - * the same way glibc does... - */ -static void setup_notify_io(struct sigevent *not, VALUE io) +/* :nodoc: */ +static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr) { int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0)); + struct posix_mq *mq = get(self, 1); + struct sigevent not; pthread_attr_t attr; - int e; - if ((e = pthread_attr_init(&attr))) - goto err; - if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))) - goto err; + errno = pthread_attr_init(&attr); + if (errno) rb_sys_fail("pthread_attr_init"); + + errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); + if (errno) rb_sys_fail("pthread_attr_setdetachstate"); + #ifdef PTHREAD_STACK_MIN (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); #endif - not->sigev_notify = SIGEV_THREAD; - not->sigev_notify_function = thread_notify_fd; - not->sigev_notify_attributes = &attr; - not->sigev_value.sival_int = fd; - return; -err: - rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e)); + + not.sigev_notify = SIGEV_THREAD; + not.sigev_notify_function = thread_notify_fd; + not.sigev_notify_attributes = &attr; + not.sigev_value.sival_int = fd; + + if (!NIL_P(mq->thread)) + rb_funcall(mq->thread, id_kill, 0, 0); + mq->thread = thr; + + if (mq_notify(mq->des, ¬) < 0) + rb_sys_fail("mq_notify"); + + return thr; +} + +/* :nodoc: */ +static VALUE notify_cleanup(VALUE self) +{ + struct posix_mq *mq = get(self, 1); + + if (!NIL_P(mq->thread)) { + rb_funcall(mq->thread, id_kill, 0, 0); + mq->thread = Qnil; + } + return Qnil; } /* @@ -789,10 +812,7 @@ static VALUE setnotify(VALUE self, VALUE arg) struct sigevent * notification = ¬ VALUE rv = arg; - if (!NIL_P(mq->thread)) { - rb_funcall(mq->thread, id_kill, 0, 0); - mq->thread = Qnil; - } + notify_cleanup(self); not.sigev_notify = SIGEV_SIGNAL; switch (TYPE(arg)) { @@ -810,11 +830,7 @@ static VALUE setnotify(VALUE self, VALUE arg) not.sigev_signo = lookup_sig(arg); rv = INT2NUM(not.sigev_signo); break; - case T_FILE: - setup_notify_io(¬, arg); - break; default: - /* maybe support Proc+thread via sigev_notify_function.. */ rb_raise(rb_eArgError, "must be a signal or nil"); } @@ -866,15 +882,6 @@ static VALUE setnonblock(VALUE self, VALUE nb) return nb; } -/* :nodoc: */ -static VALUE setnotifythread(VALUE self, VALUE thread) -{ - struct posix_mq *mq = get(self, 1); - - mq->thread = thread; - return thread; -} - void Init_posix_mq_ext(void) { cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject); @@ -914,7 +921,8 @@ void Init_posix_mq_ext(void) rb_define_method(cPOSIX_MQ, "name", name, 0); rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1); rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1); - rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1); + rb_define_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2); + rb_define_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0); rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0); #ifdef MQD_TO_FD rb_define_method(cPOSIX_MQ, "to_io", to_io, 0); diff --git a/lib/posix_mq.rb b/lib/posix_mq.rb index b40a4c4..5125db5 100644 --- a/lib/posix_mq.rb +++ b/lib/posix_mq.rb @@ -47,7 +47,7 @@ class POSIX_MQ block.arity == 1 or raise ArgumentError, "arity of notify block must be 1" r, w = IO.pipe - self.notify_thread = Thread.new(r, w, self) do |r, w, mq| + self.notify_exec(w, Thread.new(r, w, self) do |r, w, mq| begin begin r.read(1) or raise Errno::EINTR @@ -56,12 +56,11 @@ class POSIX_MQ end block.call(mq) ensure - mq.notify_thread = nil + mq.notify_cleanup r.close rescue nil w.close rescue nil end - end - self.notify = w + end) nil end if RUBY_PLATFORM =~ /linux/ diff --git a/test/test_posix_mq.rb b/test/test_posix_mq.rb index 963d623..964fa94 100644 --- a/test/test_posix_mq.rb +++ b/test/test_posix_mq.rb @@ -3,16 +3,16 @@ require 'test/unit' require 'posix_mq' require 'thread' require 'fcntl' +require 'set' $stderr.sync = $stdout.sync = true class Test_POSIX_MQ < Test::Unit::TestCase + METHODS = Set.new(POSIX_MQ.instance_methods.map { |x| x.to_sym }) - HAVE_TO_IO = if POSIX_MQ.instance_methods.grep(/\Ato_io\z/).empty? + METHODS.include?(:to_io) or warn "POSIX_MQ#to_io not supported on this platform: #{RUBY_PLATFORM}" - false - else - true - end + METHODS.include?(:notify) or + warn "POSIX_MQ#notify not supported on this platform: #{RUBY_PLATFORM}" def setup @mq = nil @@ -155,7 +155,7 @@ class Test_POSIX_MQ < Test::Unit::TestCase @mq = POSIX_MQ.new @path, IO::CREAT|IO::RDWR, 0666 assert @mq.to_io.kind_of?(IO) assert_nothing_raised { IO.select([@mq], nil, nil, 0) } - end if HAVE_TO_IO + end if METHODS.include?(:to_io) def test_notify rd, wr = IO.pipe @@ -224,7 +224,7 @@ class Test_POSIX_MQ < Test::Unit::TestCase def test_new_sym_w @mq = POSIX_MQ.new @path, :w assert_equal IO::WRONLY, @mq.to_io.fcntl(Fcntl::F_GETFL) - end if HAVE_TO_IO + end if METHODS.include?(:to_io) def test_new_sym_r @mq = POSIX_MQ.new @path, :w @@ -232,7 +232,7 @@ class Test_POSIX_MQ < Test::Unit::TestCase assert_nothing_raised { mq = POSIX_MQ.new @path, :r } assert_equal IO::RDONLY, mq.to_io.fcntl(Fcntl::F_GETFL) assert_nil mq.close - end if HAVE_TO_IO + end if METHODS.include?(:to_io) def test_new_path_only @mq = POSIX_MQ.new @path, :w @@ -240,12 +240,12 @@ class Test_POSIX_MQ < Test::Unit::TestCase assert_nothing_raised { mq = POSIX_MQ.new @path } assert_equal IO::RDONLY, mq.to_io.fcntl(Fcntl::F_GETFL) assert_nil mq.close - end if HAVE_TO_IO + end if METHODS.include?(:to_io) def test_new_sym_wr @mq = POSIX_MQ.new @path, :rw assert_equal IO::RDWR, @mq.to_io.fcntl(Fcntl::F_GETFL) - end if HAVE_TO_IO + end if METHODS.include?(:to_io) def test_new_attr mq_attr = POSIX_MQ::Attr.new(IO::NONBLOCK, 1, 1, 0) @@ -274,24 +274,21 @@ class Test_POSIX_MQ < Test::Unit::TestCase q = Queue.new @mq = POSIX_MQ.new(@path, :rw) assert_nothing_raised { @mq.notify { |mq| q << mq } } - @mq << "hi" - assert_equal POSIX_MQ, q.pop.class + assert_nothing_raised { Process.waitpid2(fork { @mq << "hi" }) } + assert_equal @mq.object_id, q.pop.object_id assert_equal "hi", @mq.receive.first assert_nothing_raised { @mq.notify { |mq| q << "hi" } } - @mq << "bye" + assert_nothing_raised { Process.waitpid2(fork { @mq << "bye" }) } assert_equal "hi", q.pop - end if POSIX_MQ.respond_to?(:notify) + end if METHODS.include?(:notify) def test_notify_thread q = Queue.new @mq = POSIX_MQ.new(@path, :rw) - @mq.notify_thread = thr = Thread.new { sleep } - assert thr.alive? @mq.notify { |mq| q << Thread.current } @mq << "." x = q.pop assert x.instance_of?(Thread) assert Thread.current != x - assert ! thr.alive? - end if POSIX_MQ.respond_to?(:notify) + end if METHODS.include?(:notify) end -- cgit v1.2.3-24-ge0c7