about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--README6
-rw-r--r--ext/posix_mq/extconf.rb2
-rw-r--r--ext/posix_mq/posix_mq.c80
-rw-r--r--lib/posix_mq.rb31
-rw-r--r--test/test_posix_mq.rb25
5 files changed, 141 insertions, 3 deletions
diff --git a/README b/README
index c7b65cb..5be5478 100644
--- a/README
+++ b/README
@@ -11,7 +11,8 @@ network-aware message queue implementations.
 
 == Features
 
-* Supports message notifications via signals.
+* Supports message notifications via signals and block execution
+  in a separate thread.
 
 * Supports portable non-blocking operation.  Under Linux 2.6.6+ and
   FreeBSD 7.2+, POSIX_MQ objects may even be used with event
@@ -19,7 +20,8 @@ network-aware message queue implementations.
 
 * Optional timeouts may be applied to send and receive operations.
 
-* Thread-safe under Ruby 1.9, releases GVL before blocking operations.
+* Thread-safe blocking operations under Ruby 1.9, releases GVL
+  before blocking operations.
 
 * Documented library API
 
diff --git a/ext/posix_mq/extconf.rb b/ext/posix_mq/extconf.rb
index b9e8963..b473c13 100644
--- a/ext/posix_mq/extconf.rb
+++ b/ext/posix_mq/extconf.rb
@@ -4,9 +4,11 @@ have_header("sys/select.h")
 have_header("signal.h")
 have_header("mqueue.h") or abort "mqueue.h header missing"
 have_func("__mq_oshandle")
+have_header("pthread.h")
 have_func("rb_str_set_len")
 have_func("rb_struct_alloc_noinit")
 have_func('rb_thread_blocking_region')
 have_library("rt")
+have_library("pthread")
 dir_config("posix_mq")
 create_makefile("posix_mq_ext")
diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c
index 883aa45..70d9125 100644
--- a/ext/posix_mq/posix_mq.c
+++ b/ext/posix_mq/posix_mq.c
@@ -5,6 +5,9 @@
 #ifdef HAVE_SIGNAL_H
 #  include <signal.h>
 #endif
+#ifdef HAVE_PTHREAD_H
+#  include <pthread.h>
+#endif
 #include <ruby.h>
 
 #include <time.h>
@@ -34,13 +37,14 @@ struct posix_mq {
         mqd_t des;
         long msgsize;
         VALUE name;
+        VALUE thread;
 #ifdef MQD_TO_FD
         VALUE io;
 #endif
 };
 
 static VALUE cPOSIX_MQ, cAttr;
-static ID id_new;
+static ID id_new, id_kill, id_fileno;
 static ID sym_r, sym_w, sym_rw;
 static const mqd_t MQD_INVALID = (mqd_t)-1;
 
@@ -198,6 +202,7 @@ static void mark(void *ptr)
         struct posix_mq *mq = ptr;
 
         rb_gc_mark(mq->name);
+        rb_gc_mark(mq->thread);
         MQ_IO_MARK(mq);
 }
 
@@ -225,6 +230,7 @@ static VALUE alloc(VALUE klass)
         mq->des = MQD_INVALID;
         mq->msgsize = -1;
         mq->name = Qnil;
+        mq->thread = Qnil;
         MQ_IO_SET(mq, Qnil);
 
         return rv;
@@ -683,6 +689,53 @@ static int lookup_sig(VALUE sig)
         return NUM2INT(sig);
 }
 
+/* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
+static void thread_notify_fd(union sigval sv)
+{
+        int fd = sv.sival_int;
+
+        while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
+}
+
+static void setup_notify_io(struct sigevent *not, VALUE io)
+{
+        VALUE fileno = rb_funcall(io, id_fileno, 0, 0);
+        int fd = NUM2INT(fileno);
+        int flags;
+        pthread_attr_t attr;
+        int e;
+
+        /*
+         * fd going to be written to inside a native thread,
+         * make it blocking for simplicity
+         */
+        flags = fcntl(fd, F_GETFL);
+        if (flags < 0) {
+                rb_sys_fail("fcntl F_GETFL");
+        } else if (flags & O_NONBLOCK) {
+                flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+                if (flags < 0)
+                        rb_sys_fail("fcntl F_SETFL");
+        }
+
+        if ((e = pthread_attr_init(&attr)))
+                goto err;
+        if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)))
+                goto err;
+#ifdef PTHREAD_STACK_MIN
+        (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
+#else
+#  warning PTHREAD_STACK_MIN not available,
+#endif
+        not->sigev_notify = SIGEV_THREAD;
+        not->sigev_notify_function = thread_notify_fd;
+        not->sigev_notify_attributes = &attr;
+        not->sigev_value.sival_int = fd;
+        return;
+err:
+        rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e));
+}
+
 /*
  * call-seq:
  *        mq.notify = signal        => signal
@@ -693,10 +746,16 @@ static int lookup_sig(VALUE sig)
  * request to allow other processes to register a request.
  * If +signal+ is +false+, it will register a no-op notification request
  * which will prevent other processes from registering a notification.
+ * If +signal+ is an +IO+ object, it will spawn a thread upon the
+ * arrival of the next message and write one "\\0" byte to the file
+ * descriptor belonging to that IO object.
  * Only one process may have a notification request for a queue
  * at a time, Errno::EBUSY will be raised if there is already
  * a notification request registration for the queue.
  *
+ * Notifications are only fired once and processes must reregister
+ * for subsequent notifications.
+ *
  * For readers of the mq_notify(3) manpage, passing +false+
  * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
  * of passing a NULL notification pointer to mq_notify(3).
@@ -708,6 +767,10 @@ static VALUE setnotify(VALUE self, VALUE arg)
         struct sigevent * notification = &not;
         VALUE rv = arg;
 
+        if (!NIL_P(mq->thread)) {
+                rb_funcall(mq->thread, id_kill, 0, 0);
+                mq->thread = Qnil;
+        }
         not.sigev_notify = SIGEV_SIGNAL;
 
         switch (TYPE(arg)) {
@@ -725,6 +788,9 @@ static VALUE setnotify(VALUE self, VALUE arg)
                 not.sigev_signo = lookup_sig(arg);
                 rv = INT2NUM(not.sigev_signo);
                 break;
+        case T_FILE:
+                setup_notify_io(&not, arg);
+                break;
         default:
                 /* maybe support Proc+thread via sigev_notify_function.. */
                 rb_raise(rb_eArgError, "must be a signal or nil");
@@ -784,6 +850,15 @@ static VALUE setnonblock(VALUE self, VALUE nb)
         return nb;
 }
 
+/* :nodoc: */
+static VALUE setnotifythread(VALUE self, VALUE thread)
+{
+        struct posix_mq *mq = get(self, 1);
+
+        mq->thread = thread;
+        return thread;
+}
+
 void Init_posix_mq_ext(void)
 {
         cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
@@ -822,12 +897,15 @@ void Init_posix_mq_ext(void)
         rb_define_method(cPOSIX_MQ, "name", name, 0);
         rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
         rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
+        rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1);
         rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
 #ifdef MQD_TO_FD
         rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
 #endif
 
         id_new = rb_intern("new");
+        id_kill = rb_intern("kill");
+        id_fileno = rb_intern("fileno");
         sym_r = ID2SYM(rb_intern("r"));
         sym_w = ID2SYM(rb_intern("w"));
         sym_rw = ID2SYM(rb_intern("rw"));
diff --git a/lib/posix_mq.rb b/lib/posix_mq.rb
index 5e660ec..91f4140 100644
--- a/lib/posix_mq.rb
+++ b/lib/posix_mq.rb
@@ -26,7 +26,38 @@ class POSIX_MQ
         mq.close unless mq.closed?
       end
     end
+  end
 
+  # Executes the given block upon reception of the next message in an
+  # empty queue.  If the message queue is not empty, then this block
+  # will only be fired after the queue is emptied and repopulated with
+  # one message.
+  #
+  # This block will only be executed upon the arrival of the
+  # first message and must be reset/reenabled for subsequent
+  # notifications.  This block will execute in a separate Ruby
+  # Thread (and thus will safely have the GVL by default).
+  def notify(&block)
+    block.arity == 1 or
+      raise ArgumentError, "arity of notify block must be 1"
+    r, w = IO.pipe
+    thr = Thread.new(r, w, self) do |r, w, mq|
+      begin
+        begin
+          r.read(1) or raise Errno::EINTR
+        rescue Errno::EINTR, Errno::EAGAIN
+          retry
+        end
+        block.call(mq)
+      ensure
+        mq.notify_thread = nil
+        r.close rescue nil
+        w.close rescue nil
+      end
+    end
+    self.notify = w
+    self.notify_thread = thr
+    nil
   end
 
 end
diff --git a/test/test_posix_mq.rb b/test/test_posix_mq.rb
index dccc547..58ee6e5 100644
--- a/test/test_posix_mq.rb
+++ b/test/test_posix_mq.rb
@@ -1,6 +1,7 @@
 # -*- encoding: binary -*-
 require 'test/unit'
 require 'posix_mq'
+require 'thread'
 require 'fcntl'
 $stderr.sync = $stdout.sync = true
 
@@ -238,4 +239,28 @@ class Test_POSIX_MQ < Test::Unit::TestCase
     assert POSIX_MQ::OPEN_MAX.kind_of?(Integer)
   end
 
+  def test_notify_block_replace
+    q = Queue.new
+    @mq = POSIX_MQ.new(@path, :rw)
+    assert_nothing_raised { @mq.notify { |mq| q << mq } }
+    @mq << "hi"
+    assert_equal POSIX_MQ, q.pop.class
+    assert_equal "hi", @mq.receive.first
+    assert_nothing_raised { @mq.notify { |mq| q << "hi" } }
+    @mq << "bye"
+    assert_equal "hi", q.pop
+  end
+
+  def test_notify_thread
+    q = Queue.new
+    @mq = POSIX_MQ.new(@path, :rw)
+    @mq.notify_thread = thr = Thread.new { sleep }
+    assert thr.alive?
+    @mq.notify { |mq| q << Thread.current }
+    @mq << "."
+    x = q.pop
+    assert x.instance_of?(Thread)
+    assert Thread.current != x
+    assert ! thr.alive?
+  end
 end