diff options
Diffstat (limited to 'ext/posix_mq/posix_mq.c')
-rw-r--r-- | ext/posix_mq/posix_mq.c | 80 |
1 files changed, 79 insertions, 1 deletions
diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c index 883aa45..70d9125 100644 --- a/ext/posix_mq/posix_mq.c +++ b/ext/posix_mq/posix_mq.c @@ -5,6 +5,9 @@ #ifdef HAVE_SIGNAL_H # include <signal.h> #endif +#ifdef HAVE_PTHREAD_H +# include <pthread.h> +#endif #include <ruby.h> #include <time.h> @@ -34,13 +37,14 @@ struct posix_mq { mqd_t des; long msgsize; VALUE name; + VALUE thread; #ifdef MQD_TO_FD VALUE io; #endif }; static VALUE cPOSIX_MQ, cAttr; -static ID id_new; +static ID id_new, id_kill, id_fileno; static ID sym_r, sym_w, sym_rw; static const mqd_t MQD_INVALID = (mqd_t)-1; @@ -198,6 +202,7 @@ static void mark(void *ptr) struct posix_mq *mq = ptr; rb_gc_mark(mq->name); + rb_gc_mark(mq->thread); MQ_IO_MARK(mq); } @@ -225,6 +230,7 @@ static VALUE alloc(VALUE klass) mq->des = MQD_INVALID; mq->msgsize = -1; mq->name = Qnil; + mq->thread = Qnil; MQ_IO_SET(mq, Qnil); return rv; @@ -683,6 +689,53 @@ static int lookup_sig(VALUE sig) return NUM2INT(sig); } +/* we spawn a thread just to write ONE byte into an fd (usually a pipe) */ +static void thread_notify_fd(union sigval sv) +{ + int fd = sv.sival_int; + + while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN)); +} + +static void setup_notify_io(struct sigevent *not, VALUE io) +{ + VALUE fileno = rb_funcall(io, id_fileno, 0, 0); + int fd = NUM2INT(fileno); + int flags; + pthread_attr_t attr; + int e; + + /* + * fd going to be written to inside a native thread, + * make it blocking for simplicity + */ + flags = fcntl(fd, F_GETFL); + if (flags < 0) { + rb_sys_fail("fcntl F_GETFL"); + } else if (flags & O_NONBLOCK) { + flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); + if (flags < 0) + rb_sys_fail("fcntl F_SETFL"); + } + + if ((e = pthread_attr_init(&attr))) + goto err; + if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))) + goto err; +#ifdef PTHREAD_STACK_MIN + (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); +#else +# warning PTHREAD_STACK_MIN not available, +#endif + not->sigev_notify = SIGEV_THREAD; + not->sigev_notify_function = thread_notify_fd; + not->sigev_notify_attributes = &attr; + not->sigev_value.sival_int = fd; + return; +err: + rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e)); +} + /* * call-seq: * mq.notify = signal => signal @@ -693,10 +746,16 @@ static int lookup_sig(VALUE sig) * request to allow other processes to register a request. * If +signal+ is +false+, it will register a no-op notification request * which will prevent other processes from registering a notification. + * If +signal+ is an +IO+ object, it will spawn a thread upon the + * arrival of the next message and write one "\\0" byte to the file + * descriptor belonging to that IO object. * Only one process may have a notification request for a queue * at a time, Errno::EBUSY will be raised if there is already * a notification request registration for the queue. * + * Notifications are only fired once and processes must reregister + * for subsequent notifications. + * * For readers of the mq_notify(3) manpage, passing +false+ * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent * of passing a NULL notification pointer to mq_notify(3). @@ -708,6 +767,10 @@ static VALUE setnotify(VALUE self, VALUE arg) struct sigevent * notification = ¬ VALUE rv = arg; + if (!NIL_P(mq->thread)) { + rb_funcall(mq->thread, id_kill, 0, 0); + mq->thread = Qnil; + } not.sigev_notify = SIGEV_SIGNAL; switch (TYPE(arg)) { @@ -725,6 +788,9 @@ static VALUE setnotify(VALUE self, VALUE arg) not.sigev_signo = lookup_sig(arg); rv = INT2NUM(not.sigev_signo); break; + case T_FILE: + setup_notify_io(¬, arg); + break; default: /* maybe support Proc+thread via sigev_notify_function.. */ rb_raise(rb_eArgError, "must be a signal or nil"); @@ -784,6 +850,15 @@ static VALUE setnonblock(VALUE self, VALUE nb) return nb; } +/* :nodoc: */ +static VALUE setnotifythread(VALUE self, VALUE thread) +{ + struct posix_mq *mq = get(self, 1); + + mq->thread = thread; + return thread; +} + void Init_posix_mq_ext(void) { cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject); @@ -822,12 +897,15 @@ void Init_posix_mq_ext(void) rb_define_method(cPOSIX_MQ, "name", name, 0); rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1); rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1); + rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1); rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0); #ifdef MQD_TO_FD rb_define_method(cPOSIX_MQ, "to_io", to_io, 0); #endif id_new = rb_intern("new"); + id_kill = rb_intern("kill"); + id_fileno = rb_intern("fileno"); sym_r = ID2SYM(rb_intern("r")); sym_w = ID2SYM(rb_intern("w")); sym_rw = ID2SYM(rb_intern("rw")); |