From fa64d77ac096612d6bc731407066df8aa1ff79a7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 9 Mar 2011 16:54:46 -0800 Subject: aggregate/pmq: we need a Mutex to protect fcntl() locks fcntl() locks are per-process, so we also need something to protect individual threads within a process from stepping over each other. --- lib/raindrops/aggregate/pmq.rb | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) (limited to 'lib/raindrops/aggregate/pmq.rb') diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 14f73be..0e7246d 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -4,9 +4,13 @@ require "aggregate" require "posix_mq" require "fcntl" require "io/extra" +require "thread" # Aggregate + POSIX message queues support class Raindrops::Aggregate::PMQ + + # These constants are for Linux. Tthis is designed for aggregating + # TCP_INFO. RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256") WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256") UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256") @@ -27,6 +31,7 @@ class Raindrops::Aggregate::PMQ @worker_interval = opts[:worker_interval] @aggregate = opts[:aggregate] @worker_queue = @worker_interval ? [] : nil + @mutex = Mutex.new @mq_name = opts[:queue] mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr] @@ -118,10 +123,14 @@ class Raindrops::Aggregate::PMQ end def synchronize io, type - lock! io, type - yield io - ensure - lock! io, UNLOCK + @mutex.synchronize do + begin + lock! io, type + yield io + ensure + lock! io, UNLOCK + end + end end def flush -- cgit v1.2.3-24-ge0c7