about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-01-07 00:37:57 -0800
committerEric Wong <normalperson@yhbt.net>2010-01-07 01:15:05 -0800
commit40d61f55ac53e3cd2f229d0b032da03032e3d53d (patch)
tree4c57be5ba0dd0a9f1dff33bada0635188848e50b
parentd8c8fb4155c1feea454abc3ed3f0a4b26e90be68 (diff)
downloadruby_posix_mq-40d61f55ac53e3cd2f229d0b032da03032e3d53d.tar.gz
This is implementation uses both a short-lived POSIX thread and
a pre-spawned Ruby Thread in a manner that works properly under
both Ruby 1.8 (green threads) and 1.9 (where Ruby Threads are
POSIX threads).

The short-lived POSIX thread will write a single "\0" byte to
a pipe the Ruby Thread waits on.  This operation is atomic
on all platforms.  Once the Ruby Thread is woken up from the
pipe, it will execute th block given to it.

This dual-thread implementation is inspired by the way glibc
implements mq_notify(3) + SIGEV_THREAD under Linux where the
kernel itself cannot directly spawn POSIX threads.
-rw-r--r--README6
-rw-r--r--ext/posix_mq/extconf.rb2
-rw-r--r--ext/posix_mq/posix_mq.c80
-rw-r--r--lib/posix_mq.rb31
-rw-r--r--test/test_posix_mq.rb25
5 files changed, 141 insertions, 3 deletions
diff --git a/README b/README
index c7b65cb..5be5478 100644
--- a/README
+++ b/README
@@ -11,7 +11,8 @@ network-aware message queue implementations.
 
 == Features
 
-* Supports message notifications via signals.
+* Supports message notifications via signals and block execution
+  in a separate thread.
 
 * Supports portable non-blocking operation.  Under Linux 2.6.6+ and
   FreeBSD 7.2+, POSIX_MQ objects may even be used with event
@@ -19,7 +20,8 @@ network-aware message queue implementations.
 
 * Optional timeouts may be applied to send and receive operations.
 
-* Thread-safe under Ruby 1.9, releases GVL before blocking operations.
+* Thread-safe blocking operations under Ruby 1.9, releases GVL
+  before blocking operations.
 
 * Documented library API
 
diff --git a/ext/posix_mq/extconf.rb b/ext/posix_mq/extconf.rb
index b9e8963..b473c13 100644
--- a/ext/posix_mq/extconf.rb
+++ b/ext/posix_mq/extconf.rb
@@ -4,9 +4,11 @@ have_header("sys/select.h")
 have_header("signal.h")
 have_header("mqueue.h") or abort "mqueue.h header missing"
 have_func("__mq_oshandle")
+have_header("pthread.h")
 have_func("rb_str_set_len")
 have_func("rb_struct_alloc_noinit")
 have_func('rb_thread_blocking_region')
 have_library("rt")
+have_library("pthread")
 dir_config("posix_mq")
 create_makefile("posix_mq_ext")
diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c
index 883aa45..70d9125 100644
--- a/ext/posix_mq/posix_mq.c
+++ b/ext/posix_mq/posix_mq.c
@@ -5,6 +5,9 @@
 #ifdef HAVE_SIGNAL_H
 #  include <signal.h>
 #endif
+#ifdef HAVE_PTHREAD_H
+#  include <pthread.h>
+#endif
 #include <ruby.h>
 
 #include <time.h>
@@ -34,13 +37,14 @@ struct posix_mq {
         mqd_t des;
         long msgsize;
         VALUE name;
+        VALUE thread;
 #ifdef MQD_TO_FD
         VALUE io;
 #endif
 };
 
 static VALUE cPOSIX_MQ, cAttr;
-static ID id_new;
+static ID id_new, id_kill, id_fileno;
 static ID sym_r, sym_w, sym_rw;
 static const mqd_t MQD_INVALID = (mqd_t)-1;
 
@@ -198,6 +202,7 @@ static void mark(void *ptr)
         struct posix_mq *mq = ptr;
 
         rb_gc_mark(mq->name);
+        rb_gc_mark(mq->thread);
         MQ_IO_MARK(mq);
 }
 
@@ -225,6 +230,7 @@ static VALUE alloc(VALUE klass)
         mq->des = MQD_INVALID;
         mq->msgsize = -1;
         mq->name = Qnil;
+        mq->thread = Qnil;
         MQ_IO_SET(mq, Qnil);
 
         return rv;
@@ -683,6 +689,53 @@ static int lookup_sig(VALUE sig)
         return NUM2INT(sig);
 }
 
+/* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
+static void thread_notify_fd(union sigval sv)
+{
+        int fd = sv.sival_int;
+
+        while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
+}
+
+static void setup_notify_io(struct sigevent *not, VALUE io)
+{
+        VALUE fileno = rb_funcall(io, id_fileno, 0, 0);
+        int fd = NUM2INT(fileno);
+        int flags;
+        pthread_attr_t attr;
+        int e;
+
+        /*
+         * fd going to be written to inside a native thread,
+         * make it blocking for simplicity
+         */
+        flags = fcntl(fd, F_GETFL);
+        if (flags < 0) {
+                rb_sys_fail("fcntl F_GETFL");
+        } else if (flags & O_NONBLOCK) {
+                flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+                if (flags < 0)
+                        rb_sys_fail("fcntl F_SETFL");
+        }
+
+        if ((e = pthread_attr_init(&attr)))
+                goto err;
+        if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)))
+                goto err;
+#ifdef PTHREAD_STACK_MIN
+        (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
+#else
+#  warning PTHREAD_STACK_MIN not available,
+#endif
+        not->sigev_notify = SIGEV_THREAD;
+        not->sigev_notify_function = thread_notify_fd;
+        not->sigev_notify_attributes = &attr;
+        not->sigev_value.sival_int = fd;
+        return;
+err:
+        rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e));
+}
+
 /*
  * call-seq:
  *        mq.notify = signal        => signal
@@ -693,10 +746,16 @@ static int lookup_sig(VALUE sig)
  * request to allow other processes to register a request.
  * If +signal+ is +false+, it will register a no-op notification request
  * which will prevent other processes from registering a notification.
+ * If +signal+ is an +IO+ object, it will spawn a thread upon the
+ * arrival of the next message and write one "\\0" byte to the file
+ * descriptor belonging to that IO object.
  * Only one process may have a notification request for a queue
  * at a time, Errno::EBUSY will be raised if there is already
  * a notification request registration for the queue.
  *
+ * Notifications are only fired once and processes must reregister
+ * for subsequent notifications.
+ *
  * For readers of the mq_notify(3) manpage, passing +false+
  * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
  * of passing a NULL notification pointer to mq_notify(3).
@@ -708,6 +767,10 @@ static VALUE setnotify(VALUE self, VALUE arg)
         struct sigevent * notification = &not;
         VALUE rv = arg;
 
+        if (!NIL_P(mq->thread)) {
+                rb_funcall(mq->thread, id_kill, 0, 0);
+                mq->thread = Qnil;
+        }
         not.sigev_notify = SIGEV_SIGNAL;
 
         switch (TYPE(arg)) {
@@ -725,6 +788,9 @@ static VALUE setnotify(VALUE self, VALUE arg)
                 not.sigev_signo = lookup_sig(arg);
                 rv = INT2NUM(not.sigev_signo);
                 break;
+        case T_FILE:
+                setup_notify_io(&not, arg);
+                break;
         default:
                 /* maybe support Proc+thread via sigev_notify_function.. */
                 rb_raise(rb_eArgError, "must be a signal or nil");
@@ -784,6 +850,15 @@ static VALUE setnonblock(VALUE self, VALUE nb)
         return nb;
 }
 
+/* :nodoc: */
+static VALUE setnotifythread(VALUE self, VALUE thread)
+{
+        struct posix_mq *mq = get(self, 1);
+
+        mq->thread = thread;
+        return thread;
+}
+
 void Init_posix_mq_ext(void)
 {
         cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
@@ -822,12 +897,15 @@ void Init_posix_mq_ext(void)
         rb_define_method(cPOSIX_MQ, "name", name, 0);
         rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
         rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
+        rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1);
         rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
 #ifdef MQD_TO_FD
         rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
 #endif
 
         id_new = rb_intern("new");
+        id_kill = rb_intern("kill");
+        id_fileno = rb_intern("fileno");
         sym_r = ID2SYM(rb_intern("r"));
         sym_w = ID2SYM(rb_intern("w"));
         sym_rw = ID2SYM(rb_intern("rw"));
diff --git a/lib/posix_mq.rb b/lib/posix_mq.rb
index 5e660ec..91f4140 100644
--- a/lib/posix_mq.rb
+++ b/lib/posix_mq.rb
@@ -26,7 +26,38 @@ class POSIX_MQ
         mq.close unless mq.closed?
       end
     end
+  end
 
+  # Executes the given block upon reception of the next message in an
+  # empty queue.  If the message queue is not empty, then this block
+  # will only be fired after the queue is emptied and repopulated with
+  # one message.
+  #
+  # This block will only be executed upon the arrival of the
+  # first message and must be reset/reenabled for subsequent
+  # notifications.  This block will execute in a separate Ruby
+  # Thread (and thus will safely have the GVL by default).
+  def notify(&block)
+    block.arity == 1 or
+      raise ArgumentError, "arity of notify block must be 1"
+    r, w = IO.pipe
+    thr = Thread.new(r, w, self) do |r, w, mq|
+      begin
+        begin
+          r.read(1) or raise Errno::EINTR
+        rescue Errno::EINTR, Errno::EAGAIN
+          retry
+        end
+        block.call(mq)
+      ensure
+        mq.notify_thread = nil
+        r.close rescue nil
+        w.close rescue nil
+      end
+    end
+    self.notify = w
+    self.notify_thread = thr
+    nil
   end
 
 end
diff --git a/test/test_posix_mq.rb b/test/test_posix_mq.rb
index dccc547..58ee6e5 100644
--- a/test/test_posix_mq.rb
+++ b/test/test_posix_mq.rb
@@ -1,6 +1,7 @@
 # -*- encoding: binary -*-
 require 'test/unit'
 require 'posix_mq'
+require 'thread'
 require 'fcntl'
 $stderr.sync = $stdout.sync = true
 
@@ -238,4 +239,28 @@ class Test_POSIX_MQ < Test::Unit::TestCase
     assert POSIX_MQ::OPEN_MAX.kind_of?(Integer)
   end
 
+  def test_notify_block_replace
+    q = Queue.new
+    @mq = POSIX_MQ.new(@path, :rw)
+    assert_nothing_raised { @mq.notify { |mq| q << mq } }
+    @mq << "hi"
+    assert_equal POSIX_MQ, q.pop.class
+    assert_equal "hi", @mq.receive.first
+    assert_nothing_raised { @mq.notify { |mq| q << "hi" } }
+    @mq << "bye"
+    assert_equal "hi", q.pop
+  end
+
+  def test_notify_thread
+    q = Queue.new
+    @mq = POSIX_MQ.new(@path, :rw)
+    @mq.notify_thread = thr = Thread.new { sleep }
+    assert thr.alive?
+    @mq.notify { |mq| q << Thread.current }
+    @mq << "."
+    x = q.pop
+    assert x.instance_of?(Thread)
+    assert Thread.current != x
+    assert ! thr.alive?
+  end
 end