From 61962b27a51031965cef70451d369b115868fb11 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 10 Mar 2011 10:51:38 +0000 Subject: rdoc: 100% documentation coverage! Of course, RDoc doesn't know quantity vs quality :) --- lib/raindrops/aggregate/pmq.rb | 94 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 89 insertions(+), 5 deletions(-) (limited to 'lib/raindrops/aggregate/pmq.rb') diff --git a/lib/raindrops/aggregate/pmq.rb b/lib/raindrops/aggregate/pmq.rb index 0e7246d..6497ce1 100644 --- a/lib/raindrops/aggregate/pmq.rb +++ b/lib/raindrops/aggregate/pmq.rb @@ -6,17 +6,62 @@ require "fcntl" require "io/extra" require "thread" -# Aggregate + POSIX message queues support +# \Aggregate + POSIX message queues support for Ruby 1.9 and \Linux +# +# This class is duck-type compatible with \Aggregate and allows us to +# aggregate and share statistics from multiple processes/threads aided +# POSIX message queues. This is designed to be used with the +# Raindrops::LastDataRecv Rack application, but can be used independently +# on compatible Runtimes. +# +# Unlike the core of raindrops, this is only supported on Ruby 1.9 and +# Linux 2.6. Using this class requires the following additional RubyGems +# or libraries: +# +# * aggregate (tested with 0.2.2) +# * io-extra (tested with 1.2.3) +# * posix_mq (tested with 1.0.0) +# +# == Design +# +# There is one master thread which aggregates statistics. Individual +# worker processes or threads will write to a shared POSIX message +# queue (default: "/raindrops") that the master reads from. At a +# predefined interval, the master thread will write out to a shared, +# anonymous temporary file that workers may read from +# +# Setting +:worker_interval+ and +:master_interval+ to +1+ will result +# in perfect accuracy but at the cost of a high synchronization +# overhead. Larger intervals mean less frequent messaging for higher +# performance but lower accuracy. class Raindrops::Aggregate::PMQ - # These constants are for Linux. Tthis is designed for aggregating + # :stopdoc: + # These constants are for Linux. This is designed for aggregating # TCP_INFO. RDLOCK = [ Fcntl::F_RDLCK ].pack("s @256") WRLOCK = [ Fcntl::F_WRLCK ].pack("s @256") UNLOCK = [ Fcntl::F_UNLCK ].pack("s @256") + # :startdoc: + # returns the number of dropped messages sent to a POSIX message + # queue if non-blocking operation was desired with :lossy attr_reader :nr_dropped + # + # Creates a new Raindrops::Aggregate::PMQ object + # + # Raindrops::Aggregate::PMQ.new(options = {}) -> aggregate + # + # +options+ is a hash that accepts the following keys: + # + # * :queue - name of the POSIX message queue (default: "/raindrops") + # * :worker_interval - interval to send to the master (default: 10) + # * :master_interval - interval to for the master to write out (default: 5) + # * :lossy - workers drop packets if master cannot keep up (default: false) + # * :aggregate - \Aggregate object (default: \Aggregate.new) + # * :mq_umask - umask for creatingthe POSIX message queue (default: 0666) + # def initialize(params = {}) opts = { :queue => ENV["RAINDROPS_MQUEUE"] || "/raindrops", @@ -50,6 +95,7 @@ class Raindrops::Aggregate::PMQ end end + # adds a sample to the underlying \Aggregate object def << val if q = @worker_queue q << val @@ -62,11 +108,18 @@ class Raindrops::Aggregate::PMQ end end - def mq_send(val) + def mq_send(val) # :nodoc: @cached_aggregate = nil @mq_send.call Marshal.dump(val) end + # + # Starts running a master loop, usually in a dedicated thread or process: + # + # Thread.new { agg.master_loop } + # + # Any worker can call +agg.stop_master_loop+ to stop the master loop + # (possibly causing the thread or process to exit) def master_loop buf = "" a = @aggregate @@ -93,6 +146,7 @@ class Raindrops::Aggregate::PMQ flush_master end + # Loads the last shared \Aggregate from the master thread/process def aggregate @cached_aggregate ||= begin flush @@ -102,6 +156,9 @@ class Raindrops::Aggregate::PMQ end end + # Flushes the currently aggregate statistics to a temporary file. + # There is no need to call this explicitly as +:worker_interval+ defines + # how frequently your data will be flushed for workers to read. def flush_master dump = Marshal.dump @aggregate synchronize(@wr, WRLOCK) do |wr| @@ -110,19 +167,22 @@ class Raindrops::Aggregate::PMQ end end + # stops the currently running master loop, may be called from any + # worker thread or process def stop_master_loop sleep 0.1 until mq_send(false) rescue Errno::EINTR retry end - def lock! io, type + def lock! io, type # :nodoc: io.fcntl Fcntl::F_SETLKW, type rescue Errno::EINTR retry end - def synchronize io, type + # we use both a mutex for thread-safety and fcntl lock for process-safety + def synchronize io, type # :nodoc: @mutex.synchronize do begin lock! io, type @@ -133,6 +193,9 @@ class Raindrops::Aggregate::PMQ end end + # flushes the local queue of the worker process, sending all pending + # data to the master. There is no need to call this explicitly as + # +:worker_interval+ defines how frequently your queue will be flushed def flush if q = @local_queue && ! q.empty? mq_send q @@ -141,15 +204,36 @@ class Raindrops::Aggregate::PMQ nil end + # proxy for \Aggregate#count def count; aggregate.count; end + + # proxy for \Aggregate#max def max; aggregate.max; end + + # proxy for \Aggregate#min def min; aggregate.min; end + + # proxy for \Aggregate#sum def sum; aggregate.sum; end + + # proxy for \Aggregate#mean def mean; aggregate.mean; end + + # proxy for \Aggregate#std_dev def std_dev; aggregate.std_dev; end + + # proxy for \Aggregate#outliers_low def outliers_low; aggregate.outliers_low; end + + # proxy for \Aggregate#outliers_high def outliers_high; aggregate.outliers_high; end + + # proxy for \Aggregate#to_s def to_s(*args); aggregate.to_s *args; end + + # proxy for \Aggregate#each def each; aggregate.each { |*args| yield *args }; end + + # proxy for \Aggregate#each_nonzero def each_nonzero; aggregate.each_nonzero { |*args| yield *args }; end end -- cgit v1.2.3-24-ge0c7