about summary refs log tree commit homepage
path: root/ext/posix_mq/posix_mq.c
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-05-09 00:05:03 -0700
committerEric Wong <normalperson@yhbt.net>2010-05-09 01:03:14 -0700
commita997f4822a99590c7a5175be4a694b4482a4b997 (patch)
treea4b0330862797e47247f57f1e40d92902c04f460 /ext/posix_mq/posix_mq.c
parentf3605c820fd73713e34950170bf759e1af204038 (diff)
downloadruby_posix_mq-a997f4822a99590c7a5175be4a694b4482a4b997.tar.gz
fix POSIX_MQ#notify(&block) aka SIGEV_THREAD
tests for them were stupidly broken and never executed :x
Diffstat (limited to 'ext/posix_mq/posix_mq.c')
-rw-r--r--ext/posix_mq/posix_mq.c78
1 files changed, 43 insertions, 35 deletions
diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c
index 4bf4262..db50534 100644
--- a/ext/posix_mq/posix_mq.c
+++ b/ext/posix_mq/posix_mq.c
@@ -724,6 +724,10 @@ static int lookup_sig(VALUE sig)
         return NUM2INT(sig);
 }
 
+/*
+ * TODO: Under Linux, we could just use netlink directly
+ * the same way glibc does...
+ */
 /* we spawn a thread just to write ONE byte into an fd (usually a pipe) */
 static void thread_notify_fd(union sigval sv)
 {
@@ -732,30 +736,49 @@ static void thread_notify_fd(union sigval sv)
         while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN));
 }
 
-/*
- * TODO: Under Linux, we could just use netlink directly
- * the same way glibc does...
- */
-static void setup_notify_io(struct sigevent *not, VALUE io)
+/* :nodoc: */
+static VALUE setnotify_exec(VALUE self, VALUE io, VALUE thr)
 {
         int fd = NUM2INT(rb_funcall(io, id_fileno, 0, 0));
+        struct posix_mq *mq = get(self, 1);
+        struct sigevent not;
         pthread_attr_t attr;
-        int e;
 
-        if ((e = pthread_attr_init(&attr)))
-                goto err;
-        if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED)))
-                goto err;
+        errno = pthread_attr_init(&attr);
+        if (errno) rb_sys_fail("pthread_attr_init");
+
+        errno = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+        if (errno) rb_sys_fail("pthread_attr_setdetachstate");
+
 #ifdef PTHREAD_STACK_MIN
         (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);
 #endif
-        not->sigev_notify = SIGEV_THREAD;
-        not->sigev_notify_function = thread_notify_fd;
-        not->sigev_notify_attributes = &attr;
-        not->sigev_value.sival_int = fd;
-        return;
-err:
-        rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e));
+
+        not.sigev_notify = SIGEV_THREAD;
+        not.sigev_notify_function = thread_notify_fd;
+        not.sigev_notify_attributes = &attr;
+        not.sigev_value.sival_int = fd;
+
+        if (!NIL_P(mq->thread))
+                rb_funcall(mq->thread, id_kill, 0, 0);
+        mq->thread = thr;
+
+        if (mq_notify(mq->des, &not) < 0)
+                rb_sys_fail("mq_notify");
+
+        return thr;
+}
+
+/* :nodoc: */
+static VALUE notify_cleanup(VALUE self)
+{
+        struct posix_mq *mq = get(self, 1);
+
+        if (!NIL_P(mq->thread)) {
+                rb_funcall(mq->thread, id_kill, 0, 0);
+                mq->thread = Qnil;
+        }
+        return Qnil;
 }
 
 /*
@@ -789,10 +812,7 @@ static VALUE setnotify(VALUE self, VALUE arg)
         struct sigevent * notification = &not;
         VALUE rv = arg;
 
-        if (!NIL_P(mq->thread)) {
-                rb_funcall(mq->thread, id_kill, 0, 0);
-                mq->thread = Qnil;
-        }
+        notify_cleanup(self);
         not.sigev_notify = SIGEV_SIGNAL;
 
         switch (TYPE(arg)) {
@@ -810,11 +830,7 @@ static VALUE setnotify(VALUE self, VALUE arg)
                 not.sigev_signo = lookup_sig(arg);
                 rv = INT2NUM(not.sigev_signo);
                 break;
-        case T_FILE:
-                setup_notify_io(&not, arg);
-                break;
         default:
-                /* maybe support Proc+thread via sigev_notify_function.. */
                 rb_raise(rb_eArgError, "must be a signal or nil");
         }
 
@@ -866,15 +882,6 @@ static VALUE setnonblock(VALUE self, VALUE nb)
         return nb;
 }
 
-/* :nodoc: */
-static VALUE setnotifythread(VALUE self, VALUE thread)
-{
-        struct posix_mq *mq = get(self, 1);
-
-        mq->thread = thread;
-        return thread;
-}
-
 void Init_posix_mq_ext(void)
 {
         cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
@@ -914,7 +921,8 @@ void Init_posix_mq_ext(void)
         rb_define_method(cPOSIX_MQ, "name", name, 0);
         rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
         rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
-        rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1);
+        rb_define_method(cPOSIX_MQ, "notify_exec", setnotify_exec, 2);
+        rb_define_method(cPOSIX_MQ, "notify_cleanup", notify_cleanup, 0);
         rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
 #ifdef MQD_TO_FD
         rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);