about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--ext/posix_mq/posix_mq.c78
-rw-r--r--lib/posix_mq.rb7
-rw-r--r--test/test_posix_mq.rb33
3 files changed, 61 insertions, 57 deletions
diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c
index 4bf4262..db50534 100644
--- a/ext/posix_mq/posix_mq.c
+++ b/ext/posix_mq/posix_mq.c
@@ -724,6 +724,10 @@ static int lookup_sig(VALUE sig)
         return NUM2INT(sig);
 }
 
+/*
+ * TODO: Under Linux, we could just use netlink directly
+ * the same way glibc does...
+ */
 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
 static void thread_notify_fd(union sigval sv)
 {
@@ -732,30 +736,49 @@ static void thread_notify_fd(union sigval sv)
         while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
 }
 
-/*
- * TODO: Under Linux, we could just use netlink directly
- * the same way glibc does...
- */
-static void setup_notify_io(struct sigevent *not, VALUE io)
+/* :nodoc: */
+static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
 {
         int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
+        struct posix_mq *mq = get(self, 1);
+        struct sigevent not;
         pthread_attr_t attr;
-        int e;
 
-        if ((e = pthread_attr_init(&attr)))
-                goto err;
-        if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)))
-                goto err;
+        errno = pthread_attr_init(&attr);
+        if (errno) rb_sys_fail("pthread_attr_init");
+
+        errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+        if (errno) rb_sys_fail("pthread_attr_setdetachstate");
+
 #ifdef PTHREAD_STACK_MIN
         (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
 #endif
-        not->sigev_notify = SIGEV_THREAD;
-        not->sigev_notify_function = thread_notify_fd;
-        not->sigev_notify_attributes = &attr;
-        not->sigev_value.sival_int = fd;
-        return;
-err:
-        rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e));
+
+        not.sigev_notify = SIGEV_THREAD;
+        not.sigev_notify_function = thread_notify_fd;
+        not.sigev_notify_attributes = &attr;
+        not.sigev_value.sival_int = fd;
+
+        if (!NIL_P(mq->thread))
+                rb_funcall(mq->thread, id_kill, 0, 0);
+        mq->thread = thr;
+
+        if (mq_notify(mq->des, &not) < 0)
+                rb_sys_fail("mq_notify");
+
+        return thr;
+}
+
+/* :nodoc: */
+static VALUE notify_cleanup(VALUE self)
+{
+        struct posix_mq *mq = get(self, 1);
+
+        if (!NIL_P(mq->thread)) {
+                rb_funcall(mq->thread, id_kill, 0, 0);
+                mq->thread = Qnil;
+        }
+        return Qnil;
 }
 
 /*
@@ -789,10 +812,7 @@ static VALUE setnotify(VALUE self, VALUE arg)
         struct sigevent * notification = &not;
         VALUE rv = arg;
 
-        if (!NIL_P(mq->thread)) {
-                rb_funcall(mq->thread, id_kill, 0, 0);
-                mq->thread = Qnil;
-        }
+        notify_cleanup(self);
         not.sigev_notify = SIGEV_SIGNAL;
 
         switch (TYPE(arg)) {
@@ -810,11 +830,7 @@ static VALUE setnotify(VALUE self, VALUE arg)
                 not.sigev_signo = lookup_sig(arg);
                 rv = INT2NUM(not.sigev_signo);
                 break;
-        case T_FILE:
-                setup_notify_io(&not, arg);
-                break;
         default:
-                /* maybe support Proc+thread via sigev_notify_function.. */
                 rb_raise(rb_eArgError, "must be a signal or nil");
         }
 
@@ -866,15 +882,6 @@ static VALUE setnonblock(VALUE self, VALUE nb)
         return nb;
 }
 
-/* :nodoc: */
-static VALUE setnotifythread(VALUE self, VALUE thread)
-{
-        struct posix_mq *mq = get(self, 1);
-
-        mq->thread = thread;
-        return thread;
-}
-
 void Init_posix_mq_ext(void)
 {
         cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
@@ -914,7 +921,8 @@ void Init_posix_mq_ext(void)
         rb_define_method(cPOSIX_MQ, "name", name, 0);
         rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
         rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
-        rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1);
+        rb_define_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
+        rb_define_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
         rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
 #ifdef MQD_TO_FD
         rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
diff --git a/lib/posix_mq.rb b/lib/posix_mq.rb
index b40a4c4..5125db5 100644
--- a/lib/posix_mq.rb
+++ b/lib/posix_mq.rb
@@ -47,7 +47,7 @@ class POSIX_MQ
     block.arity == 1 or
       raise ArgumentError, "arity of notify block must be 1"
     r, w = IO.pipe
-    self.notify_thread = Thread.new(r, w, self) do |r, w, mq|
+    self.notify_exec(w, Thread.new(r, w, self) do |r, w, mq|
       begin
         begin
           r.read(1) or raise Errno::EINTR
@@ -56,12 +56,11 @@ class POSIX_MQ
         end
         block.call(mq)
       ensure
-        mq.notify_thread = nil
+        mq.notify_cleanup
         r.close rescue nil
         w.close rescue nil
       end
-    end
-    self.notify = w
+    end)
     nil
   end if RUBY_PLATFORM =~ /linux/
 
diff --git a/test/test_posix_mq.rb b/test/test_posix_mq.rb
index 963d623..964fa94 100644
--- a/test/test_posix_mq.rb
+++ b/test/test_posix_mq.rb
@@ -3,16 +3,16 @@ require 'test/unit'
 require 'posix_mq'
 require 'thread'
 require 'fcntl'
+require 'set'
 $stderr.sync = $stdout.sync = true
 
 class Test_POSIX_MQ < Test::Unit::TestCase
+  METHODS = Set.new(POSIX_MQ.instance_methods.map { |x| x.to_sym })
 
-  HAVE_TO_IO = if POSIX_MQ.instance_methods.grep(/\Ato_io\z/).empty?
+  METHODS.include?(:to_io) or
     warn "POSIX_MQ#to_io not supported on this platform: #{RUBY_PLATFORM}"
-    false
-  else
-    true
-  end
+  METHODS.include?(:notify) or
+    warn "POSIX_MQ#notify not supported on this platform: #{RUBY_PLATFORM}"
 
   def setup
     @mq = nil
@@ -155,7 +155,7 @@ class Test_POSIX_MQ < Test::Unit::TestCase
     @mq = POSIX_MQ.new @path, IO::CREAT|IO::RDWR, 0666
     assert @mq.to_io.kind_of?(IO)
     assert_nothing_raised { IO.select([@mq], nil, nil, 0) }
-  end if HAVE_TO_IO
+  end if METHODS.include?(:to_io)
 
   def test_notify
     rd, wr = IO.pipe
@@ -224,7 +224,7 @@ class Test_POSIX_MQ < Test::Unit::TestCase
   def test_new_sym_w
     @mq = POSIX_MQ.new @path, :w
     assert_equal IO::WRONLY, @mq.to_io.fcntl(Fcntl::F_GETFL)
-  end if HAVE_TO_IO
+  end if METHODS.include?(:to_io)
 
   def test_new_sym_r
     @mq = POSIX_MQ.new @path, :w
@@ -232,7 +232,7 @@ class Test_POSIX_MQ < Test::Unit::TestCase
     assert_nothing_raised { mq = POSIX_MQ.new @path, :r }
     assert_equal IO::RDONLY, mq.to_io.fcntl(Fcntl::F_GETFL)
     assert_nil mq.close
-  end if HAVE_TO_IO
+  end if METHODS.include?(:to_io)
 
   def test_new_path_only
     @mq = POSIX_MQ.new @path, :w
@@ -240,12 +240,12 @@ class Test_POSIX_MQ < Test::Unit::TestCase
     assert_nothing_raised { mq = POSIX_MQ.new @path }
     assert_equal IO::RDONLY, mq.to_io.fcntl(Fcntl::F_GETFL)
     assert_nil mq.close
-  end if HAVE_TO_IO
+  end if METHODS.include?(:to_io)
 
   def test_new_sym_wr
     @mq = POSIX_MQ.new @path, :rw
     assert_equal IO::RDWR, @mq.to_io.fcntl(Fcntl::F_GETFL)
-  end if HAVE_TO_IO
+  end if METHODS.include?(:to_io)
 
   def test_new_attr
     mq_attr = POSIX_MQ::Attr.new(IO::NONBLOCK, 1, 1, 0)
@@ -274,24 +274,21 @@ class Test_POSIX_MQ < Test::Unit::TestCase
     q = Queue.new
     @mq = POSIX_MQ.new(@path, :rw)
     assert_nothing_raised { @mq.notify { |mq| q << mq } }
-    @mq << "hi"
-    assert_equal POSIX_MQ, q.pop.class
+    assert_nothing_raised { Process.waitpid2(fork { @mq << "hi" }) }
+    assert_equal @mq.object_id, q.pop.object_id
     assert_equal "hi", @mq.receive.first
     assert_nothing_raised { @mq.notify { |mq| q << "hi" } }
-    @mq << "bye"
+    assert_nothing_raised { Process.waitpid2(fork { @mq << "bye" }) }
     assert_equal "hi", q.pop
-  end if POSIX_MQ.respond_to?(:notify)
+  end if METHODS.include?(:notify)
 
   def test_notify_thread
     q = Queue.new
     @mq = POSIX_MQ.new(@path, :rw)
-    @mq.notify_thread = thr = Thread.new { sleep }
-    assert thr.alive?
     @mq.notify { |mq| q << Thread.current }
     @mq << "."
     x = q.pop
     assert x.instance_of?(Thread)
     assert Thread.current != x
-    assert ! thr.alive?
-  end if POSIX_MQ.respond_to?(:notify)
+  end if METHODS.include?(:notify)
 end