about summary refs log tree commit homepage
path: root/ext/posix_mq/posix_mq.c
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-02-27 10:54:42 +0000
committerEric Wong <normalperson@yhbt.net>2011-02-27 11:03:07 +0000
commitb78572aebcf764d314844caff43c438cc93257fc (patch)
tree9e497895bcf650e13a2ad20fa28060a43a9b9d5b /ext/posix_mq/posix_mq.c
parentfe77abb7dc4dbe6fb4054430ff7fceb98f5dc9b8 (diff)
downloadruby_posix_mq-b78572aebcf764d314844caff43c438cc93257fc.tar.gz
These are kinder and less exceptional than their
non-trying counterparts as generating exceptions
is expensive for common EAGAIN errors.
Diffstat (limited to 'ext/posix_mq/posix_mq.c')
-rw-r--r--ext/posix_mq/posix_mq.c89
1 files changed, 80 insertions, 9 deletions
diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c
index 3f40185..b4b1678 100644
--- a/ext/posix_mq/posix_mq.c
+++ b/ext/posix_mq/posix_mq.c
@@ -62,6 +62,9 @@ static int MQ_IO_CLOSE(struct posix_mq *mq)
 }
 #endif
 
+# define PMQ_WANTARRAY (1<<0)
+# define PMQ_TRY       (1<<1)
+
 static VALUE cAttr;
 static ID id_new, id_kill, id_fileno, id_mul, id_divmod;
 static ID id_flags, id_maxmsg, id_msgsize, id_curmsgs;
@@ -505,6 +508,7 @@ static void setup_send_buffer(struct rw_args *x, VALUE buffer)
         x->msg_len = (size_t)RSTRING_LEN(buffer);
 }
 
+static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self);
 /*
  * call-seq:
  *        mq.send(string [,priority[, timeout]])        => nil
@@ -518,7 +522,12 @@ static void setup_send_buffer(struct rw_args *x, VALUE buffer)
  * On some older systems, the +timeout+ argument is not currently
  * supported and may raise NotImplementedError if +timeout+ is used.
  */
-static VALUE _send(int argc, VALUE *argv, VALUE self)
+static VALUE my_send(int argc, VALUE *argv, VALUE self)
+{
+        _send(0, argc, argv, self);
+}
+
+static VALUE _send(int sflags, int argc, VALUE *argv, VALUE self)
 {
         struct posix_mq *mq = get(self, 1);
         struct rw_args x;
@@ -534,10 +543,13 @@ static VALUE _send(int argc, VALUE *argv, VALUE self)
         x.msg_prio = NIL_P(prio) ? 0 : NUM2UINT(prio);
 
         rv = (mqd_t)rb_thread_blocking_region(xsend, &x, RUBY_UBF_IO, 0);
-        if (rv == MQD_INVALID)
+        if (rv == MQD_INVALID) {
+                if (errno == EAGAIN && (sflags & PMQ_TRY))
+                        return Qfalse;
                 rb_sys_fail("mq_send");
+        }
 
-        return Qnil;
+        return (sflags & PMQ_TRY) ? Qtrue : Qnil;
 }
 
 /*
@@ -585,7 +597,7 @@ static VALUE to_io(VALUE self)
 }
 #endif
 
-static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self);
+static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self);
 
 /*
  * call-seq:
@@ -608,7 +620,7 @@ static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self);
  */
 static VALUE receive(int argc, VALUE *argv, VALUE self)
 {
-        return _receive(1, argc, argv, self);
+        return _receive(PMQ_WANTARRAY, argc, argv, self);
 }
 
 /*
@@ -634,7 +646,7 @@ static VALUE shift(int argc, VALUE *argv, VALUE self)
         return _receive(0, argc, argv, self);
 }
 
-static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self)
+static VALUE _receive(int rflags, int argc, VALUE *argv, VALUE self)
 {
         struct posix_mq *mq = get(self, 1);
         struct rw_args x;
@@ -663,12 +675,15 @@ static VALUE _receive(int wantarray, int argc, VALUE *argv, VALUE self)
         x.des = mq->des;
 
         r = (ssize_t)rb_thread_blocking_region(xrecv, &x, RUBY_UBF_IO, 0);
-        if (r < 0)
+        if (r < 0) {
+                if (errno == EAGAIN && (rflags & PMQ_TRY))
+                        return Qnil;
                 rb_sys_fail("mq_receive");
+        }
 
         rb_str_set_len(buffer, r);
 
-        if (wantarray)
+        if (rflags & PMQ_WANTARRAY)
                 return rb_ary_new3(2, buffer, UINT2NUM(x.msg_prio));
         return buffer;
 }
@@ -968,6 +983,59 @@ static VALUE setnonblock(VALUE self, VALUE nb)
         return nb;
 }
 
+static VALUE tryinit(int argc, VALUE *argv, VALUE self)
+{
+        init(argc, argv, self);
+        setnonblock(self, Qtrue);
+
+        return self;
+}
+
+/*
+ * call-seq:
+ *        mq.trysend(string [,priority[, timeout]]) => +true+ or +false+
+ *
+ * Exactly like POSIX_MQ#send, except it returns +false+ instead of raising
+ * +Errno::EAGAIN+ when non-blocking operation is desired and returns +true+
+ * on success instead of +nil+.
+ * This does not guarantee non-blocking behavior, the message queue must
+ * be made non-blocking before calling this method.
+ */
+static VALUE trysend(int argc, VALUE *argv, VALUE self)
+{
+        _send(PMQ_TRY, argc, argv, self);
+}
+
+/*
+ * call-seq:
+ *        mq.tryshift([buffer [, timeout]])        => message or nil
+ *
+ * Exactly like POSIX_MQ#shift, except it returns +nil+ instead of raising
+ * +Errno::EAGAIN+ when non-blocking operation is desired.
+ *
+ * This does not guarantee non-blocking behavior, the message queue must
+ * be made non-blocking before calling this method.
+ */
+static VALUE tryshift(int argc, VALUE *argv, VALUE self)
+{
+        return _receive(PMQ_TRY, argc, argv, self);
+}
+
+/*
+ * call-seq:
+ *        mq.tryreceive([buffer [, timeout]])        => [ message, priority ] or nil
+ *
+ * Exactly like POSIX_MQ#receive, except it returns +nil+ instead of raising
+ * +Errno::EAGAIN+ when non-blocking operation is desired.
+ *
+ * This does not guarantee non-blocking behavior, the message queue must
+ * be made non-blocking before calling this method.
+ */
+static VALUE tryreceive(int argc, VALUE *argv, VALUE self)
+{
+        return _receive(PMQ_WANTARRAY|PMQ_TRY, argc, argv, self);
+}
+
 void Init_posix_mq_ext(void)
 {
         VALUE cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject);
@@ -995,10 +1063,13 @@ void Init_posix_mq_ext(void)
         rb_define_singleton_method(cPOSIX_MQ, "unlink", s_unlink, 1);
 
         rb_define_method(cPOSIX_MQ, "initialize", init, -1);
-        rb_define_method(cPOSIX_MQ, "send", _send, -1);
+        rb_define_method(cPOSIX_MQ, "send", my_send, -1);
         rb_define_method(cPOSIX_MQ, "<<", send0, 1);
+        rb_define_method(cPOSIX_MQ, "trysend", trysend, -1);
         rb_define_method(cPOSIX_MQ, "receive", receive, -1);
+        rb_define_method(cPOSIX_MQ, "tryreceive", tryreceive, -1);
         rb_define_method(cPOSIX_MQ, "shift", shift, -1);
+        rb_define_method(cPOSIX_MQ, "tryshift", tryshift, -1);
         rb_define_method(cPOSIX_MQ, "attr", getattr, 0);
         rb_define_method(cPOSIX_MQ, "attr=", setattr, 1);
         rb_define_method(cPOSIX_MQ, "close", _close, 0);