about summary refs log tree commit homepage
path: root/ext/posix_mq/posix_mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/posix_mq/posix_mq.c')
-rw-r--r--ext/posix_mq/posix_mq.c80
1 files changed, 79 insertions, 1 deletions
diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c
index 883aa45..70d9125 100644
--- a/ext/posix_mq/posix_mq.c
+++ b/ext/posix_mq/posix_mq.c
@@ -5,6 +5,9 @@
 #ifdef HAVE_SIGNAL_H
 #  include <signal.h>
 #endif
+#ifdef HAVE_PTHREAD_H
+#  include <pthread.h>
+#endif
 #include <ruby.h>
 
 #include <time.h>
@@ -34,13 +37,14 @@ struct posix_mq {
         mqd_t des;
         long msgsize;
         VALUE name;
+        VALUE thread;
 #ifdef MQD_TO_FD
         VALUE io;
 #endif
 };
 
 static VALUE cPOSIX_MQ, cAttr;
-static ID id_new;
+static ID id_new, id_kill, id_fileno;
 static ID sym_r, sym_w, sym_rw;
 static const mqd_t MQD_INVALID = (mqd_t)-1;
 
@@ -198,6 +202,7 @@ static void mark(void *ptr)
         struct posix_mq *mq = ptr;
 
         rb_gc_mark(mq->name);
+        rb_gc_mark(mq->thread);
         MQ_IO_MARK(mq);
 }
 
@@ -225,6 +230,7 @@ static VALUE alloc(VALUE klass)
         mq->des = MQD_INVALID;
         mq->msgsize = -1;
         mq->name = Qnil;
+        mq->thread = Qnil;
         MQ_IO_SET(mq, Qnil);
 
         return rv;
@@ -683,6 +689,53 @@ static int lookup_sig(VALUE sig)
         return NUM2INT(sig);
 }
 
+/* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
+static void thread_notify_fd(union sigval sv)
+{
+        int fd = sv.sival_int;
+
+        while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
+}
+
+static void setup_notify_io(struct sigevent *not, VALUE io)
+{
+        VALUE fileno = rb_funcall(io, id_fileno, 0, 0);
+        int fd = NUM2INT(fileno);
+        int flags;
+        pthread_attr_t attr;
+        int e;
+
+        /*
+         * fd going to be written to inside a native thread,
+         * make it blocking for simplicity
+         */
+        flags = fcntl(fd, F_GETFL);
+        if (flags < 0) {
+                rb_sys_fail("fcntl F_GETFL");
+        } else if (flags & O_NONBLOCK) {
+                flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
+                if (flags < 0)
+                        rb_sys_fail("fcntl F_SETFL");
+        }
+
+        if ((e = pthread_attr_init(&attr)))
+                goto err;
+        if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)))
+                goto err;
+#ifdef PTHREAD_STACK_MIN
+        (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
+#else
+#  warning PTHREAD_STACK_MIN not available,
+#endif
+        not->sigev_notify = SIGEV_THREAD;
+        not->sigev_notify_function = thread_notify_fd;
+        not->sigev_notify_attributes = &attr;
+        not->sigev_value.sival_int = fd;
+        return;
+err:
+        rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e));
+}
+
 /*
  * call-seq:
  *        mq.notify = signal        => signal
@@ -693,10 +746,16 @@ static int lookup_sig(VALUE sig)
  * request to allow other processes to register a request.
  * If +signal+ is +false+, it will register a no-op notification request
  * which will prevent other processes from registering a notification.
+ * If +signal+ is an +IO+ object, it will spawn a thread upon the
+ * arrival of the next message and write one "\\0" byte to the file
+ * descriptor belonging to that IO object.
  * Only one process may have a notification request for a queue
  * at a time, Errno::EBUSY will be raised if there is already
  * a notification request registration for the queue.
  *
+ * Notifications are only fired once and processes must reregister
+ * for subsequent notifications.
+ *
  * For readers of the mq_notify(3) manpage, passing +false+
  * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent
  * of passing a NULL notification pointer to mq_notify(3).
@@ -708,6 +767,10 @@ static VALUE setnotify(VALUE self, VALUE arg)
         struct sigevent * notification = &not;
         VALUE rv = arg;
 
+        if (!NIL_P(mq->thread)) {
+                rb_funcall(mq->thread, id_kill, 0, 0);
+                mq->thread = Qnil;
+        }
         not.sigev_notify = SIGEV_SIGNAL;
 
         switch (TYPE(arg)) {
@@ -725,6 +788,9 @@ static VALUE setnotify(VALUE self, VALUE arg)
                 not.sigev_signo = lookup_sig(arg);
                 rv = INT2NUM(not.sigev_signo);
                 break;
+        case T_FILE:
+                setup_notify_io(&not, arg);
+                break;
         default:
                 /* maybe support Proc+thread via sigev_notify_function.. */
                 rb_raise(rb_eArgError, "must be a signal or nil");
@@ -784,6 +850,15 @@ static VALUE setnonblock(VALUE self, VALUE nb)
         return nb;
 }
 
+/* :nodoc: */
+static VALUE setnotifythread(VALUE self, VALUE thread)
+{
+        struct posix_mq *mq = get(self, 1);
+
+        mq->thread = thread;
+        return thread;
+}
+
 void Init_posix_mq_ext(void)
 {
         cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
@@ -822,12 +897,15 @@ void Init_posix_mq_ext(void)
         rb_define_method(cPOSIX_MQ, "name", name, 0);
         rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
         rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
+        rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1);
         rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
 #ifdef MQD_TO_FD
         rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
 #endif
 
         id_new = rb_intern("new");
+        id_kill = rb_intern("kill");
+        id_fileno = rb_intern("fileno");
         sym_r = ID2SYM(rb_intern("r"));
         sym_w = ID2SYM(rb_intern("w"));
         sym_rw = ID2SYM(rb_intern("rw"));