diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-03-09 16:54:46 -0800 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-03-09 16:54:46 -0800 |
commit | fa64d77ac096612d6bc731407066df8aa1ff79a7 (patch) | |
tree | 2a6254b95e37a82d93eacfd86fe52a5921c6f714 | |
parent | b57881c86de4e7406ca537b5285f27d7130b0073 (diff) | |
download | raindrops-fa64d77ac096612d6bc731407066df8aa1ff79a7.tar.gz |
fcntl() locks are per-process, so we also need something to protect individual threads within a process from stepping over each other.
-rw-r--r-- | lib/raindrops/aggregate/pmq.rb | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 14f73be..0e7246d 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -4,9 +4,13 @@ require "aggregate" require "posix_mq" require "fcntl" require "io/extra" +require "thread" # Aggregate + POSIX message queues support class Raindrops::Aggregate::PMQ + + # These constants are for Linux. Tthis is designed for aggregating + # TCP_INFO. RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256") WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256") UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256") @@ -27,6 +31,7 @@ class Raindrops::Aggregate::PMQ @worker_interval = opts[:worker_interval] @aggregate = opts[:aggregate] @worker_queue = @worker_interval ? [] : nil + @mutex = Mutex.new @mq_name = opts[:queue] mq = POSIX_MQ.new @mq_name, :w, opts[:mq_umask], opts[:mq_attr] @@ -118,10 +123,14 @@ class Raindrops::Aggregate::PMQ end def synchronize io, type - lock! io, type - yield io - ensure - lock! io, UNLOCK + @mutex.synchronize do + begin + lock! io, type + yield io + ensure + lock! io, UNLOCK + end + end end def flush |