diff options
-rw-r--r-- | README | 6 | ||||
-rw-r--r-- | ext/posix_mq/extconf.rb | 2 | ||||
-rw-r--r-- | ext/posix_mq/posix_mq.c | 80 | ||||
-rw-r--r-- | lib/posix_mq.rb | 31 | ||||
-rw-r--r-- | test/test_posix_mq.rb | 25 |
5 files changed, 141 insertions, 3 deletions
@@ -11,7 +11,8 @@ network-aware message queue implementations. == Features -* Supports message notifications via signals. +* Supports message notifications via signals and block execution + in a separate thread. * Supports portable non-blocking operation. Under Linux 2.6.6+ and FreeBSD 7.2+, POSIX_MQ objects may even be used with event @@ -19,7 +20,8 @@ network-aware message queue implementations. * Optional timeouts may be applied to send and receive operations. -* Thread-safe under Ruby 1.9, releases GVL before blocking operations. +* Thread-safe blocking operations under Ruby 1.9, releases GVL + before blocking operations. * Documented library API diff --git a/ext/posix_mq/extconf.rb b/ext/posix_mq/extconf.rb index b9e8963..b473c13 100644 --- a/ext/posix_mq/extconf.rb +++ b/ext/posix_mq/extconf.rb @@ -4,9 +4,11 @@ have_header("sys/select.h") have_header("signal.h") have_header("mqueue.h") or abort "mqueue.h header missing" have_func("__mq_oshandle") +have_header("pthread.h") have_func("rb_str_set_len") have_func("rb_struct_alloc_noinit") have_func('rb_thread_blocking_region') have_library("rt") +have_library("pthread") dir_config("posix_mq") create_makefile("posix_mq_ext") diff --git a/ext/posix_mq/posix_mq.c b/ext/posix_mq/posix_mq.c index 883aa45..70d9125 100644 --- a/ext/posix_mq/posix_mq.c +++ b/ext/posix_mq/posix_mq.c @@ -5,6 +5,9 @@ #ifdef HAVE_SIGNAL_H # include <signal.h> #endif +#ifdef HAVE_PTHREAD_H +# include <pthread.h> +#endif #include <ruby.h> #include <time.h> @@ -34,13 +37,14 @@ struct posix_mq { mqd_t des; long msgsize; VALUE name; + VALUE thread; #ifdef MQD_TO_FD VALUE io; #endif }; static VALUE cPOSIX_MQ, cAttr; -static ID id_new; +static ID id_new, id_kill, id_fileno; static ID sym_r, sym_w, sym_rw; static const mqd_t MQD_INVALID = (mqd_t)-1; @@ -198,6 +202,7 @@ static void mark(void *ptr) struct posix_mq *mq = ptr; rb_gc_mark(mq->name); + rb_gc_mark(mq->thread); MQ_IO_MARK(mq); } @@ -225,6 +230,7 @@ static VALUE alloc(VALUE klass) mq->des = MQD_INVALID; mq->msgsize = -1; mq->name = Qnil; + mq->thread = Qnil; MQ_IO_SET(mq, Qnil); return rv; @@ -683,6 +689,53 @@ static int lookup_sig(VALUE sig) return NUM2INT(sig); } +/* we spawn a thread just to write ONE byte into an fd (usually a pipe) */ +static void thread_notify_fd(union sigval sv) +{ + int fd = sv.sival_int; + + while ((write(fd, "", 1) < 0) && (errno == EINTR || errno == EAGAIN)); +} + +static void setup_notify_io(struct sigevent *not, VALUE io) +{ + VALUE fileno = rb_funcall(io, id_fileno, 0, 0); + int fd = NUM2INT(fileno); + int flags; + pthread_attr_t attr; + int e; + + /* + * fd going to be written to inside a native thread, + * make it blocking for simplicity + */ + flags = fcntl(fd, F_GETFL); + if (flags < 0) { + rb_sys_fail("fcntl F_GETFL"); + } else if (flags & O_NONBLOCK) { + flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); + if (flags < 0) + rb_sys_fail("fcntl F_SETFL"); + } + + if ((e = pthread_attr_init(&attr))) + goto err; + if ((e = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED))) + goto err; +#ifdef PTHREAD_STACK_MIN + (void)pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN); +#else +# warning PTHREAD_STACK_MIN not available, +#endif + not->sigev_notify = SIGEV_THREAD; + not->sigev_notify_function = thread_notify_fd; + not->sigev_notify_attributes = &attr; + not->sigev_value.sival_int = fd; + return; +err: + rb_raise(rb_eRuntimeError, "pthread failure: %s\n", strerror(e)); +} + /* * call-seq: * mq.notify = signal => signal @@ -693,10 +746,16 @@ static int lookup_sig(VALUE sig) * request to allow other processes to register a request. * If +signal+ is +false+, it will register a no-op notification request * which will prevent other processes from registering a notification. + * If +signal+ is an +IO+ object, it will spawn a thread upon the + * arrival of the next message and write one "\\0" byte to the file + * descriptor belonging to that IO object. * Only one process may have a notification request for a queue * at a time, Errno::EBUSY will be raised if there is already * a notification request registration for the queue. * + * Notifications are only fired once and processes must reregister + * for subsequent notifications. + * * For readers of the mq_notify(3) manpage, passing +false+ * is equivalent to SIGEV_NONE, and passing +nil+ is equivalent * of passing a NULL notification pointer to mq_notify(3). @@ -708,6 +767,10 @@ static VALUE setnotify(VALUE self, VALUE arg) struct sigevent * notification = ¬ VALUE rv = arg; + if (!NIL_P(mq->thread)) { + rb_funcall(mq->thread, id_kill, 0, 0); + mq->thread = Qnil; + } not.sigev_notify = SIGEV_SIGNAL; switch (TYPE(arg)) { @@ -725,6 +788,9 @@ static VALUE setnotify(VALUE self, VALUE arg) not.sigev_signo = lookup_sig(arg); rv = INT2NUM(not.sigev_signo); break; + case T_FILE: + setup_notify_io(¬, arg); + break; default: /* maybe support Proc+thread via sigev_notify_function.. */ rb_raise(rb_eArgError, "must be a signal or nil"); @@ -784,6 +850,15 @@ static VALUE setnonblock(VALUE self, VALUE nb) return nb; } +/* :nodoc: */ +static VALUE setnotifythread(VALUE self, VALUE thread) +{ + struct posix_mq *mq = get(self, 1); + + mq->thread = thread; + return thread; +} + void Init_posix_mq_ext(void) { cPOSIX_MQ = rb_define_class("POSIX_MQ", rb_cObject); @@ -822,12 +897,15 @@ void Init_posix_mq_ext(void) rb_define_method(cPOSIX_MQ, "name", name, 0); rb_define_method(cPOSIX_MQ, "notify=", setnotify, 1); rb_define_method(cPOSIX_MQ, "nonblock=", setnonblock, 1); + rb_define_method(cPOSIX_MQ, "notify_thread=", setnotifythread, 1); rb_define_method(cPOSIX_MQ, "nonblock?", getnonblock, 0); #ifdef MQD_TO_FD rb_define_method(cPOSIX_MQ, "to_io", to_io, 0); #endif id_new = rb_intern("new"); + id_kill = rb_intern("kill"); + id_fileno = rb_intern("fileno"); sym_r = ID2SYM(rb_intern("r")); sym_w = ID2SYM(rb_intern("w")); sym_rw = ID2SYM(rb_intern("rw")); diff --git a/lib/posix_mq.rb b/lib/posix_mq.rb index 5e660ec..91f4140 100644 --- a/lib/posix_mq.rb +++ b/lib/posix_mq.rb @@ -26,7 +26,38 @@ class POSIX_MQ mq.close unless mq.closed? end end + end + # Executes the given block upon reception of the next message in an + # empty queue. If the message queue is not empty, then this block + # will only be fired after the queue is emptied and repopulated with + # one message. + # + # This block will only be executed upon the arrival of the + # first message and must be reset/reenabled for subsequent + # notifications. This block will execute in a separate Ruby + # Thread (and thus will safely have the GVL by default). + def notify(&block) + block.arity == 1 or + raise ArgumentError, "arity of notify block must be 1" + r, w = IO.pipe + thr = Thread.new(r, w, self) do |r, w, mq| + begin + begin + r.read(1) or raise Errno::EINTR + rescue Errno::EINTR, Errno::EAGAIN + retry + end + block.call(mq) + ensure + mq.notify_thread = nil + r.close rescue nil + w.close rescue nil + end + end + self.notify = w + self.notify_thread = thr + nil end end diff --git a/test/test_posix_mq.rb b/test/test_posix_mq.rb index dccc547..58ee6e5 100644 --- a/test/test_posix_mq.rb +++ b/test/test_posix_mq.rb @@ -1,6 +1,7 @@ # -*- encoding: binary -*- require 'test/unit' require 'posix_mq' +require 'thread' require 'fcntl' $stderr.sync = $stdout.sync = true @@ -238,4 +239,28 @@ class Test_POSIX_MQ < Test::Unit::TestCase assert POSIX_MQ::OPEN_MAX.kind_of?(Integer) end + def test_notify_block_replace + q = Queue.new + @mq = POSIX_MQ.new(@path, :rw) + assert_nothing_raised { @mq.notify { |mq| q << mq } } + @mq << "hi" + assert_equal POSIX_MQ, q.pop.class + assert_equal "hi", @mq.receive.first + assert_nothing_raised { @mq.notify { |mq| q << "hi" } } + @mq << "bye" + assert_equal "hi", q.pop + end + + def test_notify_thread + q = Queue.new + @mq = POSIX_MQ.new(@path, :rw) + @mq.notify_thread = thr = Thread.new { sleep } + assert thr.alive? + @mq.notify { |mq| q << Thread.current } + @mq << "." + x = q.pop + assert x.instance_of?(Thread) + assert Thread.current != x + assert ! thr.alive? + end end |