about summary refs log tree commit homepage
path: root/ext/posix_mq/posix_mq.c
diff options
context:
space:
mode:
Diffstat (limited to 'ext/posix_mq/posix_mq.c')
-rw-r--r--ext/posix_mq/posix_mq.c813
1 files changed, 813 insertions, 0 deletions
diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c
new file mode 100644
index 0000000..e64cac3
--- /dev/null
+++ b/ext/posix_mq/posix_mq.c
@@ -0,0 +1,813 @@
+#define _XOPEN_SOURCE 600
+#include <ruby.h>
+
+#include <time.h>
+#include <mqueue.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+#include <errno.h>
+#include <assert.h>
+#include <unistd.h>
+
+#if defined(__linux__)
+#  define MQD_IS_FD 1
+#  define MQ_IO_MARK(mq) rb_gc_mark((mq)->io)
+#  define MQ_IO_SET(mq,val) do { (mq)->io = (val); } while (0)
+#else
+#  warning mqd_t is not select()-able on your OS
+#  define MQD_IS_FD 0
+#  define MQ_IO_MARK(mq) ((void)(0))
+#  define MQ_IO_SET(mq,val) ((void)(0))
+#endif /* non-Linux */
+
+struct posix_mq {
+        mqd_t des;
+        long msgsize;
+        VALUE name;
+#if MQD_IS_FD
+        VALUE io;
+#endif
+};
+
+static VALUE cPOSIX_MQ, cAttr;
+static ID id_new;
+static ID sym_r, sym_w, sym_rw;
+static const mqd_t MQD_INVALID = (mqd_t)-1;
+
+/* Ruby 1.8.6+ macros (for compatibility with Ruby 1.9) */
+#ifndef RSTRING_PTR
+#  define RSTRING_PTR(s) (RSTRING(s)->ptr)
+#endif
+#ifndef RSTRING_LEN
+#  define RSTRING_LEN(s) (RSTRING(s)->len)
+#endif
+#ifndef RSTRUCT_PTR
+#  define RSTRUCT_PTR(s) (RSTRUCT(s)->ptr)
+#endif
+#ifndef RSTRUCT_LEN
+#  define RSTRUCT_LEN(s) (RSTRUCT(s)->len)
+#endif
+
+#ifndef HAVE_RB_STR_SET_LEN
+#  ifdef RUBINIUS
+#    define rb_str_set_len(str,len) rb_str_resize(str,len)
+#  else /* 1.8.6 optimized version */
+/* this is taken from Ruby 1.8.7, 1.8.6 may not have it */
+static void rb_18_str_set_len(VALUE str, long len)
+{
+        RSTRING(str)->len = len;
+        RSTRING(str)->ptr[len] = '\0';
+        rb_str_flush(str);
+}
+#    define rb_str_set_len(str,len) rb_18_str_set_len(str,len)
+#  endif /* ! RUBINIUS */
+#endif /* !defined(HAVE_RB_STR_SET_LEN) */
+
+#ifndef HAVE_RB_STRUCT_ALLOC_NOINIT
+static VALUE rb_struct_alloc_noinit(VALUE class)
+{
+        return rb_funcall(class, id_new, 0, 0);
+}
+#endif /* !defined(HAVE_RB_STRUCT_ALLOC_NOINIT) */
+
+/* partial emulation of the 1.9 rb_thread_blocking_region under 1.8 */
+#ifndef HAVE_RB_THREAD_BLOCKING_REGION
+#  include <rubysig.h>
+#  define RUBY_UBF_IO ((rb_unblock_function_t *)-1)
+typedef void rb_unblock_function_t(void *);
+typedef VALUE rb_blocking_function_t(void *);
+static VALUE
+rb_thread_blocking_region(
+        rb_blocking_function_t *func, void *data1,
+        rb_unblock_function_t *ubf, void *data2)
+{
+        VALUE rv;
+
+        assert(RUBY_UBF_IO == ubf && "RUBY_UBF_IO required for emulation");
+
+        TRAP_BEG;
+        rv = func(data1);
+        TRAP_END;
+
+        return rv;
+}
+#endif /* ! HAVE_RB_THREAD_BLOCKING_REGION */
+
+/* used to pass arguments to mq_open inside blocking region */
+struct open_args {
+        int argc;
+        const char *name;
+        int oflags;
+        mode_t mode;
+        struct mq_attr attr;
+};
+
+/* used to pass arguments to mq_send/mq_receive inside blocking region */
+struct rw_args {
+        mqd_t des;
+        char *msg_ptr;
+        size_t msg_len;
+        unsigned msg_prio;
+        struct timespec *timeout;
+};
+
+/* hope it's there..., TODO: a better version that works in rbx */
+struct timeval rb_time_interval(VALUE);
+
+static struct timespec *convert_timeout(struct timespec *dest, VALUE time)
+{
+        struct timeval tv, now;
+
+        if (NIL_P(time))
+                return NULL;
+
+        tv = rb_time_interval(time); /* aggregate return :( */
+        gettimeofday(&now, NULL);
+        dest->tv_sec = now.tv_sec + tv.tv_sec;
+        dest->tv_nsec = (now.tv_usec + tv.tv_usec) * 1000;
+
+        if (dest->tv_nsec > 1000000000) {
+                dest->tv_nsec -= 1000000000;
+                dest->tv_sec++;
+        }
+
+        return dest;
+}
+
+/* runs without GVL */
+static VALUE xopen(void *ptr)
+{
+        struct open_args *x = ptr;
+        mqd_t rv;
+
+        switch (x->argc) {
+        case 2: rv = mq_open(x->name, x->oflags); break;
+        case 3: rv = mq_open(x->name, x->oflags, x->mode, NULL); break;
+        case 4: rv = mq_open(x->name, x->oflags, x->mode, &x->attr); break;
+        default: rv = MQD_INVALID;
+        }
+
+        return (VALUE)rv;
+}
+
+/* runs without GVL */
+static VALUE xsend(void *ptr)
+{
+        struct rw_args *x = ptr;
+
+        if (x->timeout)
+                return (VALUE)mq_timedsend(x->des, x->msg_ptr, x->msg_len,
+                                           x->msg_prio, x->timeout);
+
+        return (VALUE)mq_send(x->des, x->msg_ptr, x->msg_len, x->msg_prio);
+}
+
+/* runs without GVL */
+static VALUE xrecv(void *ptr)
+{
+        struct rw_args *x = ptr;
+
+        if (x->timeout)
+                return (VALUE)mq_timedreceive(x->des, x->msg_ptr, x->msg_len,
+                                              &x->msg_prio, x->timeout);
+
+        return (VALUE)mq_receive(x->des, x->msg_ptr, x->msg_len, &x->msg_prio);
+}
+
+/* runs without GVL, path resolution may be slow */
+static VALUE xunlink(void *ptr)
+{
+        VALUE name = (VALUE)ptr;
+
+        return (VALUE)mq_unlink(RSTRING_PTR(name));
+}
+
+/* called by GC */
+static void mark(void *ptr)
+{
+        struct posix_mq *mq = ptr;
+
+        rb_gc_mark(mq->name);
+        MQ_IO_MARK(mq);
+}
+
+/* called by GC */
+static void _free(void *ptr)
+{
+        struct posix_mq *mq = ptr;
+
+        if (mq->des != MQD_INVALID) {
+                /* we ignore errors when gc-ing */
+                int saved_errno = errno;
+
+                mq_close(mq->des);
+                errno = saved_errno;
+                mq->des = MQD_INVALID;
+        }
+}
+
+/* automatically called at creation (before initialize) */
+static VALUE alloc(VALUE klass)
+{
+        struct posix_mq *mq;
+        VALUE rv = Data_Make_Struct(klass, struct posix_mq, mark, _free, mq);
+
+        mq->des = MQD_INVALID;
+        mq->msgsize = -1;
+        mq->name = Qnil;
+        MQ_IO_SET(mq, Qnil);
+
+        return rv;
+}
+
+/* unwraps the posix_mq struct from self */
+static struct posix_mq *get(VALUE self, int need_valid)
+{
+        struct posix_mq *mq;
+
+        Data_Get_Struct(self, struct posix_mq, mq);
+
+        if (need_valid && mq->des == MQD_INVALID)
+                rb_raise(rb_eIOError, "closed queue descriptor");
+
+        return mq;
+}
+
+/* converts the POSIX_MQ::Attr astruct into a struct mq_attr attr */
+static void attr_from_struct(struct mq_attr *attr, VALUE astruct, int all)
+{
+        VALUE *ptr;
+
+        if (CLASS_OF(astruct) != cAttr)
+                rb_raise(rb_eArgError, "not a POSIX_MQ::Attr: %s",
+                         RSTRING_PTR(rb_inspect(astruct)));
+
+        ptr = RSTRUCT_PTR(astruct);
+
+        attr->mq_flags = NUM2LONG(ptr[0]);
+
+        if (all || !NIL_P(ptr[1]))
+                attr->mq_maxmsg = NUM2LONG(ptr[1]);
+        if (all || !NIL_P(ptr[2]))
+                attr->mq_msgsize = NUM2LONG(ptr[2]);
+        if (!NIL_P(ptr[3]))
+                attr->mq_curmsgs = NUM2LONG(ptr[3]);
+}
+
+/*
+ * call-seq:
+ *        POSIX_MQ.new(name [, flags [, mode [, mq_attr]])        => mq
+ *
+ * Opens a POSIX message queue given by +name+.  +name+ should start
+ * with a slash ("/") for portable applications.
+ *
+ * If a Symbol is given in place of integer +flags+, then:
+ *
+ * * +:r+ is equivalent to IO::RDONLY
+ * * +:w+ is equivalent to IO::CREAT|IO::WRONLY
+ * * +:rw+ is equivalent to IO::CREAT|IO::RDWR
+ *
+ * +mode+ is an integer and only used when IO::CREAT is used.
+ * +mq_attr+ is a POSIX_MQ::Attr and only used if IO::CREAT is used.
+ * If +mq_attr+ is not specified when creating a queue, then the
+ * system defaults will be used.
+ *
+ * See the manpage for mq_open(3) for more details on this function.
+ */
+static VALUE init(int argc, VALUE *argv, VALUE self)
+{
+        struct posix_mq *mq = get(self, 0);
+        struct open_args x;
+        VALUE name, oflags, mode, attr;
+
+        rb_scan_args(argc, argv, "13", &name, &oflags, &mode, &attr);
+
+        if (TYPE(name) != T_STRING)
+                rb_raise(rb_eArgError, "name must be a string");
+
+        switch (TYPE(oflags)) {
+        case T_NIL:
+                x.oflags = O_RDONLY;
+                break;
+        case T_SYMBOL:
+                if (oflags == sym_r)
+                        x.oflags = O_RDONLY;
+                else if (oflags == sym_w)
+                        x.oflags = O_CREAT|O_WRONLY;
+                else if (oflags == sym_rw)
+                        x.oflags = O_CREAT|O_RDWR;
+                else
+                        rb_raise(rb_eArgError,
+                                 "symbol must be :r, :w, or :rw: %s",
+                                 RSTRING_PTR(rb_inspect(oflags)));
+                break;
+        case T_BIGNUM:
+        case T_FIXNUM:
+                x.oflags = NUM2INT(oflags);
+                break;
+        default:
+                rb_raise(rb_eArgError, "flags must be an int, :r, :w, or :wr");
+        }
+
+        x.name = RSTRING_PTR(name);
+        x.argc = 2;
+
+        switch (TYPE(mode)) {
+        case T_FIXNUM:
+                x.argc = 3;
+                x.mode = NUM2INT(mode);
+                break;
+        case T_NIL:
+                if (x.oflags & O_CREAT) {
+                        x.argc = 3;
+                        x.mode = 0666;
+                }
+                break;
+        default:
+                rb_raise(rb_eArgError, "mode not an integer");
+        }
+
+        switch (TYPE(attr)) {
+        case T_STRUCT:
+                x.argc = 4;
+                attr_from_struct(&x.attr, attr, 1);
+
+                /* principle of least surprise */
+                if (x.attr.mq_flags & O_NONBLOCK)
+                        x.oflags |= O_NONBLOCK;
+                break;
+        case T_NIL:
+                break;
+        default:
+                rb_raise(rb_eArgError, "attr must be a POSIX_MQ::Attr: %s",
+                         RSTRING_PTR(rb_inspect(attr)));
+        }
+
+        mq->des = (mqd_t)rb_thread_blocking_region(xopen, &x, RUBY_UBF_IO, 0);
+        if (mq->des == MQD_INVALID)
+                rb_sys_fail("mq_open");
+
+        mq->name = rb_str_dup(name);
+
+        return self;
+}
+
+/*
+ * call-seq:
+ *        POSIX_MQ.unlink(name) =>        1
+ *
+ * Unlinks the message queue given by +name+.  The queue will be destroyed
+ * when the last process with the queue open closes its queue descriptors.
+ */
+static VALUE s_unlink(VALUE self, VALUE name)
+{
+        mqd_t rv;
+        void *ptr = (void *)name;
+
+        if (TYPE(name) != T_STRING)
+                rb_raise(rb_eArgError, "argument must be a string");
+
+        rv = (mqd_t)rb_thread_blocking_region(xunlink, ptr, RUBY_UBF_IO, 0);
+        if (rv == MQD_INVALID)
+                rb_sys_fail("mq_unlink");
+
+        return INT2NUM(1);
+}
+
+/*
+ * call-seq:
+ *        mq.unlink =>        mq
+ *
+ * Unlinks the message queue to prevent other processes from accessing it.
+ * All existing queue descriptors to this queue including those opened by
+ * other processes are unaffected.  The queue will only be destroyed
+ * when the last process with open descriptors to this queue closes
+ * the descriptors.
+ */
+static VALUE _unlink(VALUE self)
+{
+        struct posix_mq *mq = get(self, 0);
+        mqd_t rv;
+        void *ptr = (void *)mq->name;
+
+        assert(TYPE(mq->name) == T_STRING && "mq->name is not a string");
+
+        rv = (mqd_t)rb_thread_blocking_region(xunlink, ptr, RUBY_UBF_IO, 0);
+        if (rv == MQD_INVALID)
+                rb_sys_fail("mq_unlink");
+
+        return self;
+}
+
+static void setup_send_buffer(struct rw_args *x, VALUE buffer)
+{
+        buffer = rb_obj_as_string(buffer);
+        x->msg_ptr = RSTRING_PTR(buffer);
+        x->msg_len = (size_t)RSTRING_LEN(buffer);
+}
+
+/*
+ * call-seq:
+ *        mq.send(string [,priority[, timeout]])        => nil
+ *
+ * Inserts the given +string+ into the message queue with an optional,
+ * unsigned integer +priority+.  If the optional +timeout+ is specified,
+ * then Errno::ETIMEDOUT will be raised if the operation cannot complete
+ * before +timeout+ seconds has elapsed.  Without +timeout+, this method
+ * may block until the queue is writable.
+ */
+static VALUE _send(int argc, VALUE *argv, VALUE self)
+{
+        struct posix_mq *mq = get(self, 1);
+        struct rw_args x;
+        VALUE buffer, prio, timeout;
+        mqd_t rv;
+        struct timespec expire;
+
+        rb_scan_args(argc, argv, "12", &buffer, &prio, &timeout);
+
+        setup_send_buffer(&x, buffer);
+        x.des = mq->des;
+        x.timeout = convert_timeout(&expire, timeout);
+        x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
+
+        rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0);
+        if (rv == MQD_INVALID)
+                rb_sys_fail("mq_send");
+
+        return Qnil;
+}
+
+/*
+ * call-seq:
+ *        mq << string        => mq
+ *
+ * Inserts the given +string+ into the message queue with a
+ * default priority of 0 and no timeout.
+ */
+static VALUE send0(VALUE self, VALUE buffer)
+{
+        struct posix_mq *mq = get(self, 1);
+        struct rw_args x;
+        mqd_t rv;
+
+        setup_send_buffer(&x, buffer);
+        x.des = mq->des;
+        x.timeout = NULL;
+        x.msg_prio = 0;
+
+        rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0);
+        if (rv == MQD_INVALID)
+                rb_sys_fail("mq_send");
+
+        return self;
+}
+
+#if MQD_IS_FD
+/*
+ * call-seq:
+ *        mq.to_io        => IO
+ *
+ * Returns an IO.select-able +IO+ object.  This method is only available
+ * under Linux and is not intended to be portable.
+ */
+static VALUE to_io(VALUE self)
+{
+        struct posix_mq *mq = get(self, 1);
+
+        if (NIL_P(mq->io))
+                mq->io = rb_funcall(rb_cIO, id_new, 1, INT2NUM(mq->des));
+
+        return mq->io;
+}
+#endif
+
+static void get_msgsize(struct posix_mq *mq)
+{
+        struct mq_attr attr;
+
+        if (mq_getattr(mq->des, &attr) == MQD_INVALID)
+                rb_sys_fail("mq_getattr");
+
+        mq->msgsize = attr.mq_msgsize;
+}
+
+/*
+ * call-seq:
+ *        mq.receive([buffer, [timeout]])                => [ message, priority ]
+ *
+ * Takes the highest priority message off the queue and returns
+ * an array containing the message as a String and the Integer
+ * priority of the message.
+ *
+ * If the optional +buffer+ is present, then it must be a String
+ * which will receive the data.
+ *
+ * If the optional +timeout+ is present, then it may be a Float
+ * or Integer specifying the timeout in seconds.  Errno::ETIMEDOUT
+ * will be raised if +timeout+ has elapsed and there are no messages
+ * in the queue.
+ */
+static VALUE receive(int argc, VALUE *argv, VALUE self)
+{
+        struct posix_mq *mq = get(self, 1);
+        struct rw_args x;
+        VALUE buffer, timeout;
+        ssize_t r;
+        struct timespec expire;
+
+        if (mq->msgsize < 0)
+                get_msgsize(mq);
+
+        rb_scan_args(argc, argv, "02", &buffer, &timeout);
+        x.timeout = convert_timeout(&expire, timeout);
+
+        if (NIL_P(buffer)) {
+                buffer = rb_str_new(0, mq->msgsize);
+        } else {
+                StringValue(buffer);
+                rb_str_modify(buffer);
+                rb_str_resize(buffer, mq->msgsize);
+        }
+        OBJ_TAINT(buffer);
+        x.msg_ptr = RSTRING_PTR(buffer);
+        x.msg_len = (size_t)mq->msgsize;
+        x.des = mq->des;
+
+        r = (ssize_t)rb_thread_blocking_region(xrecv, &x, RUBY_UBF_IO, 0);
+        if (r < 0)
+                rb_sys_fail("mq_receive");
+
+        rb_str_set_len(buffer, r);
+
+        return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
+}
+
+/*
+ * call-seq:
+ *        mq.attr        =>         mq_attr
+ *
+ * Returns a POSIX_MQ::Attr struct containing the attributes
+ * of the message queue.  See the mq_getattr(3) manpage for
+ * more details.
+ */
+static VALUE getattr(VALUE self)
+{
+        struct posix_mq *mq = get(self, 1);
+        struct mq_attr attr;
+        VALUE astruct;
+        VALUE *ptr;
+
+        if (mq_getattr(mq->des, &attr) == MQD_INVALID)
+                rb_sys_fail("mq_getattr");
+
+        astruct = rb_struct_alloc_noinit(cAttr);
+        ptr = RSTRUCT_PTR(astruct);
+        ptr[0] = LONG2NUM(attr.mq_flags);
+        ptr[1] = LONG2NUM(attr.mq_maxmsg);
+        ptr[2] = LONG2NUM(attr.mq_msgsize);
+        ptr[3] = LONG2NUM(attr.mq_curmsgs);
+
+        return astruct;
+}
+
+/*
+ * call-seq:
+ *        mq.attr = POSIX_MQ::Attr(IO::NONBLOCK)                => mq_attr
+ *
+ * Only the IO::NONBLOCK flag may be set or unset (zero) in this manner.
+ * See the mq_setattr(3) manpage for more details.
+ *
+ * Consider using the POSIX_MQ#nonblock= method as it is easier and
+ * more natural to use.
+ */
+static VALUE setattr(VALUE self, VALUE astruct)
+{
+        struct posix_mq *mq = get(self, 1);
+        struct mq_attr newattr;
+
+        attr_from_struct(&newattr, astruct, 0);
+
+        if (mq_setattr(mq->des, &newattr, NULL) == MQD_INVALID)
+                rb_sys_fail("mq_setattr");
+
+        return astruct;
+}
+
+/*
+ * call-seq:
+ *        mq.close        => nil
+ *
+ * Closes the underlying message queue descriptor.
+ * If this descriptor had a registered notification request, the request
+ * will be removed so another descriptor or process may register a
+ * notification request.  Message queue descriptors are automatically
+ * closed by garbage collection.
+ */
+static VALUE _close(VALUE self)
+{
+        struct posix_mq *mq = get(self, 1);
+
+        if (mq_close(mq->des) == MQD_INVALID)
+                rb_sys_fail("mq_close");
+
+        mq->des = MQD_INVALID;
+        MQ_IO_SET(mq, Qnil);
+
+        return Qnil;
+}
+
+/*
+ *  call-seq:
+ *        mq.closed?        => true or false
+ *
+ *  Returns +true+ if the message queue descriptor is closed and therefore
+ *  unusable, otherwise +false+
+ */
+static VALUE closed(VALUE self)
+{
+        struct posix_mq *mq = get(self, 0);
+
+        return mq->des == MQD_INVALID ? Qtrue : Qfalse;
+}
+
+/*
+ * call-seq:
+ *        mq.name                => string
+ *
+ * Returns the string name of message queue associated with +mq+
+ */
+static VALUE name(VALUE self)
+{
+        struct posix_mq *mq = get(self, 0);
+
+        return mq->name;
+}
+
+static int lookup_sig(VALUE sig)
+{
+        static VALUE list;
+        const char *ptr;
+        long len;
+
+        sig = rb_obj_as_string(sig);
+        len = RSTRING_LEN(sig);
+        ptr = RSTRING_PTR(sig);
+
+        if (len > 3 && !memcmp("SIG", ptr, 3))
+                sig = rb_str_new(ptr + 3, len - 3);
+
+        if (!list) {
+                VALUE mSignal = rb_define_module("Signal"""); /* avoid RDoc */
+
+                list = rb_funcall(mSignal, rb_intern("list"), 0, 0);
+                rb_global_variable(&list);
+        }
+
+        sig = rb_hash_aref(list, sig);
+        if (NIL_P(sig))
+                rb_raise(rb_eArgError, "invalid signal: %s\n",
+                         RSTRING_PTR(rb_inspect(sig)));
+
+        return NUM2INT(sig);
+}
+
+/*
+ * call-seq:
+ *        mq.notify = signal        => signal
+ *
+ * Registers the notification request to deliver a given +signal+
+ * to the current process when message is received.
+ * If +signal+ is +nil+, it will unregister and disable the notification
+ * request to allow other processes to register a request.
+ * Only one process may have a notification request for a queue
+ * at a time, Errno::EBUSY will be raised if there is already
+ * a notification request registration for the queue.
+ */
+static VALUE setnotify(VALUE self, VALUE arg)
+{
+        struct posix_mq *mq = get(self, 1);
+        struct sigevent not;
+        VALUE rv = arg;
+
+        not.sigev_notify = SIGEV_SIGNAL;
+
+        switch (TYPE(arg)) {
+        case T_FIXNUM:
+                not.sigev_signo = NUM2INT(arg);
+                break;
+        case T_SYMBOL:
+        case T_STRING:
+                not.sigev_signo = lookup_sig(arg);
+                rv = INT2NUM(not.sigev_signo);
+                break;
+        case T_NIL:
+                not.sigev_notify = SIGEV_NONE;
+                break;
+        default:
+                /* maybe support Proc+thread via sigev_notify_function.. */
+                rb_raise(rb_eArgError, "must be a signal or nil");
+        }
+
+        if (mq_notify(mq->des, &not) == MQD_INVALID)
+                rb_sys_fail("mq_notify");
+
+        return rv;
+}
+
+/*
+ * call-seq:
+ *        mq.nonblock?        => true or false
+ *
+ * Returns the current non-blocking state of the message queue descriptor.
+ */
+static VALUE getnonblock(VALUE self)
+{
+        struct mq_attr attr;
+        struct posix_mq *mq = get(self, 1);
+
+        if (mq_getattr(mq->des, &attr) == MQD_INVALID)
+                rb_sys_fail("mq_getattr");
+
+        mq->msgsize = attr.mq_msgsize; /* optimization */
+
+        return attr.mq_flags & O_NONBLOCK ? Qtrue : Qfalse;
+}
+
+/*
+ * call-seq:
+ *        mq.nonblock = boolean        => boolean
+ *
+ * Enables or disables non-blocking operation for the message queue
+ * descriptor.  Errno::EAGAIN will be raised in situations where
+ * the queue would block.  This is not compatible with +timeout+
+ * arguments to POSIX_MQ#send and POSIX_MQ#receive.
+ */
+static VALUE setnonblock(VALUE self, VALUE nb)
+{
+        struct mq_attr newattr, oldattr;
+        struct posix_mq *mq = get(self, 1);
+
+        if (nb == Qtrue)
+                newattr.mq_flags = O_NONBLOCK;
+        else if (nb == Qfalse)
+                newattr.mq_flags = 0;
+        else
+                rb_raise(rb_eArgError, "must be true or false");
+
+        if (mq_setattr(mq->des, &newattr, &oldattr) == MQD_INVALID)
+                rb_sys_fail("mq_setattr");
+
+        mq->msgsize = oldattr.mq_msgsize; /* optimization */
+
+        return nb;
+}
+
+void Init_posix_mq_ext(void)
+{
+        cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
+        rb_define_alloc_func(cPOSIX_MQ, alloc);
+        cAttr = rb_const_get(cPOSIX_MQ, rb_intern("Attr"));
+
+        /*
+         * The maximum number of open message descriptors supported
+         * by the system.  This may be -1, in which case it is dynamically
+         * set at runtime.  Consult your operating system documentation
+         * for system-specific information about this.
+         */
+        rb_define_const(cPOSIX_MQ, "OPEN_MAX",
+                        LONG2NUM(sysconf(_SC_MQ_OPEN_MAX)));
+
+        /*
+         * The maximum priority that may be specified for POSIX_MQ#send
+         * On POSIX-compliant systems, this is at least 31, but some
+         * systems allow higher limits.
+         * The minimum priority is always zero.
+         */
+        rb_define_const(cPOSIX_MQ, "PRIO_MAX",
+                        LONG2NUM(sysconf(_SC_MQ_PRIO_MAX)));
+
+        rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
+
+        rb_define_method(cPOSIX_MQ, "initialize", init, -1);
+        rb_define_method(cPOSIX_MQ, "send", _send, -1);
+        rb_define_method(cPOSIX_MQ, "<<", send0, 1);
+        rb_define_method(cPOSIX_MQ, "receive", receive, -1);
+        rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
+        rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
+        rb_define_method(cPOSIX_MQ, "close", _close, 0);
+        rb_define_method(cPOSIX_MQ, "closed?", closed, 0);
+        rb_define_method(cPOSIX_MQ, "unlink", _unlink, 0);
+        rb_define_method(cPOSIX_MQ, "name", name, 0);
+        rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1);
+        rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1);
+        rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0);
+#if MQD_IS_FD
+        rb_define_method(cPOSIX_MQ, "to_io", to_io, 0);
+#endif
+
+        id_new = rb_intern("new");
+        sym_r = ID2SYM(rb_intern("r"));
+        sym_w = ID2SYM(rb_intern("w"));
+        sym_rw = ID2SYM(rb_intern("rw"));
+}