ruby_posix_mq.git  about / heads / tags
POSIX message queues for Ruby
blob fa44b94e10e3c5021a4b1b57ffe774bd290e4792 3759 bytes (raw)
$ git show v0.1.0:bin/posix-mq.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
 
#!/usr/bin/ruby
# -*- encoding: binary -*-
Encoding.default_external = Encoding::BINARY if defined?(Encoding)
$stderr.sync = $stdout.sync = true

require 'posix_mq'
require 'optparse'

commands = %w(create attr send receive wait unlink)
usage = "Usage: MQUEUE=/name #{File.basename($0)} COMMAND " \
        "[options] [<arguments>]\n" \
        "COMMAND may be one of: #{commands.join(', ')}"

mqueue = ENV["MQUEUE"] or abort usage
command = ARGV.shift or abort usage
commands.include?(command) or abort usage

priority = nil
timeout = nil
mode = 0666
oflags = IO::RDONLY
mq_attr = nil
nonblock = false
command = command.to_sym

ARGV.options do |x|
  x.banner = usage.split(/\n/).first.gsub(/COMMAND/, command.to_s)
  x.separator ''

  case command
  when :create
    oflags |= IO::CREAT
    x.on('-x', '--exclusive', "exclusive create") {
      oflags |= IO::EXCL
    }
    x.on('-m', '--mode=MODE', "octal file mode") { |i|
      mode = i.to_i(8)
    }
    x.on('-c', '--maxmsg=COUNT', Integer, "maximum number of messages") { |i|
      mq_attr ||= POSIX_MQ::Attr.new
      mq_attr.maxmsg = i
    }
    x.on('-s', '--msgsize=BYTES', Integer, "maximum size of message") { |i|
      mq_attr ||= POSIX_MQ::Attr.new
      mq_attr.msgsize = i
    }
  when :wait
    x.on('-t', '--timeout=SECONDS', Float, "timeout in seconds") { |f|
      timeout = f
    }
  when :send, :receive
    conflict = "timeout and nonblock are exclusive"
    x.on('-t', '--timeout=SECONDS', Float, "timeout in seconds") { |f|
      abort conflict if nonblock
      timeout = f
    }
    x.on('-n', '--nonblock', "nonblocking operation") {
      abort conflict if timeout
      nonblock = true
      oflags |= IO::NONBLOCK
    }
    if command == :send
      oflags = IO::WRONLY
      x.on('-p', '--priority=PRIO', Integer, "priority of message") { |i|
        priority = i
      }
    else
      x.on('-p', '--priority', "output priority of message to stderr") {
        priority = $stderr
      }
    end
  end
  x.on('-q', "quiet warnings and errors") { $stderr.reopen("/dev/null", "w") }
  x.on('-h', '--help', 'Show this help message.') { puts x; exit }
  x.parse!
end

trap(:INT) { exit 130 }

unless command == :send || ARGV.empty?
  abort "#{command} accepts no arguments"
end

begin
  if command == :create && mq_attr
    mq_attr.flags = mq_attr.curmsgs = 0
    mq_attr.msgsize && ! mq_attr.maxmsg and
      abort "--maxmsg must be set with --msgsize"
    mq_attr.maxmsg && ! mq_attr.msgsize and
      abort "--msgsize must be set with --maxmsg"
  elsif command == :unlink
    POSIX_MQ.unlink(mqueue)
    exit
  end

  mq = POSIX_MQ.open(mqueue, oflags, mode, mq_attr)
  case command
  when :create
    exit
  when :receive
    buf, prio = mq.receive("", timeout)
    $stderr.syswrite("priority=#{prio}\n") if priority
    $stdout.syswrite(buf)
  when :send
    ARGV << $stdin.read if ARGV.empty?
    ARGV.each { |msg| mq.send(msg, priority, timeout) }
  when :attr
    mq_attr = mq.attr
    $stdout.syswrite(
      "flags=#{mq_attr.flags}\n" \
      "maxmsg=#{mq_attr.maxmsg}\n" \
      "msgsize=#{mq_attr.msgsize}\n" \
      "curmsgs=#{mq_attr.curmsgs}\n")
  when :wait
    trap(:USR1) { exit }

    # we wouldn't get a notification if there were already messages
    exit if mq.attr.curmsgs > 0
    mq.notify = :USR1
    exit if mq.attr.curmsgs > 0 # avoid race condition

    timeout.nil? ? sleep : sleep(timeout)
    exit 2 # timed out
  end
rescue Errno::EEXIST
  abort "Queue exists"
rescue Errno::ENOENT
  abort "Queue does not exist"
rescue Errno::EMSGSIZE
  abort "Message too long"
rescue Errno::EAGAIN
  abort(command == :send ? "Queue full" : "No messages available")
rescue Errno::ETIMEDOUT
  warn "Operation timed out"
  exit 2
rescue => e
  abort e.message
end

git clone https://yhbt.net/ruby_posix_mq.git