about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-03-09 16:54:46 -0800
committerEric Wong <normalperson@yhbt.net>2011-03-09 16:54:46 -0800
commitfa64d77ac096612d6bc731407066df8aa1ff79a7 (patch)
tree2a6254b95e37a82d93eacfd86fe52a5921c6f714 /lib
parentb57881c86de4e7406ca537b5285f27d7130b0073 (diff)
downloadraindrops-fa64d77ac096612d6bc731407066df8aa1ff79a7.tar.gz
fcntl() locks are per-process, so we also need something
to protect individual threads within a process from stepping
over each other.
Diffstat (limited to 'lib')
-rw-r--r--lib/raindrops/aggregate/pmq.rb17
1 files changed, 13 insertions, 4 deletions
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb
index 14f73be..0e7246d 100644
--- a/lib/raindrops/aggregate/pmq.rb
+++ b/lib/raindrops/aggregate/pmq.rb
@@ -4,9 +4,13 @@ require "aggregate"
 require "posix_mq"
 require "fcntl"
 require "io/extra"
+require "thread"
 
 # Aggregate + POSIX message queues support
 class Raindrops::Aggregate::PMQ
+
+  # These constants are for Linux.  Tthis is designed for aggregating
+  # TCP_INFO.
   RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256")
   WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256")
   UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256")
@@ -27,6 +31,7 @@ class Raindrops::Aggregate::PMQ
     @worker_interval = opts[:worker_interval]
     @aggregate = opts[:aggregate]
     @worker_queue = @worker_interval ? [] : nil
+    @mutex = Mutex.new
 
     @mq_name = opts[:queue]
     mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr]
@@ -118,10 +123,14 @@ class Raindrops::Aggregate::PMQ
   end
 
   def synchronize io, type
-    lock! io, type
-    yield io
-    ensure
-      lock! io, UNLOCK
+    @mutex.synchronize do
+      begin
+        lock! io, type
+        yield io
+      ensure
+        lock! io, UNLOCK
+      end
+    end
   end
 
   def flush