#!/usr/bin/env ruby
# -*- encoding: binary -*-
# -*- frozen_string_literal: true -*-
# Make stdout and stderr unbuffered so output appears as soon as it is written.
[$stdout, $stderr].each { |io| io.sync = true }

# Put all three standard streams into binary mode so message data is passed
# through without newline or encoding conversion.
[$stdout, $stderr, $stdin].each(&:binmode)
require 'posix_mq'
require 'optparse'
# Sub-commands this tool understands; the first command-line argument must be
# one of them.
commands = %w[create attr send receive wait unlink]

# Two-line usage text. Its first line is reused later as the template for the
# option parser banner, so the "COMMAND" placeholder must stay on line one.
usage = [
  "Usage: MQUEUE=/name #{File.basename($0)} COMMAND [options] [<arguments>]",
  "COMMAND may be one of: #{commands.join(', ')}"
].join("\n")

# The queue name comes from the environment, the sub-command from ARGV.
# Missing or unknown values print the usage text and exit non-zero.
mqueue = ENV["MQUEUE"]
abort usage unless mqueue

command = ARGV.shift
abort usage unless command
abort usage unless commands.include?(command)

# Settings that the option callbacks may override.
priority = timeout = mq_attr = nil
mode = 0o666            # permission bits passed when opening/creating the queue
oflags = IO::RDONLY     # open flags; adjusted per sub-command
nonblock = false

# Already validated against the fixed list above, so converting is safe.
command = command.to_sym
# Declare the options valid for the chosen sub-command and parse ARGV in place.
#
# The option callbacks update the script-level settings (oflags, mode,
# mq_attr, timeout, nonblock, priority). Whatever remains in ARGV afterwards
# is left for the rest of the script as positional arguments.
#
# Any OptionParser::ParseError ends the script with status 1. Without the
# explicit rescue around parse!, ARGV.options rescues the error itself, prints
# a warning and returns nil, so the script would carry on with partially
# parsed options and unparsed switches still sitting in ARGV.
ARGV.options do |x|
  # First usage line with the placeholder replaced by the actual sub-command.
  x.banner = usage.split(/\n/).first.gsub(/COMMAND/, command.to_s)
  x.separator ''

  case command
  when :create
    oflags |= IO::CREAT
    x.on('-x', '--exclusive', "exclusive create") do
      oflags |= IO::EXCL
    end
    x.on('-m', '--mode=MODE', "octal file mode") do |i|
      # Integer() rejects input that is not valid octal, where String#to_i(8)
      # silently turned garbage into mode 0.
      begin
        mode = Integer(i, 8)
      rescue ArgumentError
        raise OptionParser::InvalidArgument, i
      end
    end
    x.on('-c', '--maxmsg=COUNT', Integer, "maximum number of messages") do |i|
      mq_attr ||= POSIX_MQ::Attr.new
      mq_attr.maxmsg = i
    end
    x.on('-s', '--msgsize=BYTES', Integer, "maximum size of message") do |i|
      mq_attr ||= POSIX_MQ::Attr.new
      mq_attr.msgsize = i
    end
  when :wait
    x.on('-t', '--timeout=SECONDS', Float, "timeout in seconds") do |f|
      timeout = f
    end
  when :send, :receive
    # -t and -n are mutually exclusive; whichever comes second aborts.
    conflict = "timeout and nonblock are exclusive"
    x.on('-t', '--timeout=SECONDS', Float, "timeout in seconds") do |f|
      abort conflict if nonblock
      timeout = f
    end
    x.on('-n', '--nonblock', "nonblocking operation") do
      abort conflict if timeout
      nonblock = true
      oflags |= IO::NONBLOCK
    end
    if command == :send
      # Assigned while the options are being declared, i.e. before parse!
      # runs, so a -n on the command line still ORs IO::NONBLOCK into it.
      oflags = IO::WRONLY
      x.on('-p', '--priority=PRIO', Integer, "priority of message") do |i|
        priority = i
      end
    else
      # For receive, -p is a flag: any truthy value makes the script report
      # the received priority on stderr.
      x.on('-p', '--priority', "output priority of message to stderr") do
        priority = $stderr
      end
    end
  end

  x.on('-q', "quiet warnings and errors") { $stderr.reopen("/dev/null", "wb") }
  x.on('-h', '--help', 'Show this help message.') { puts x; exit }

  begin
    x.parse!
  rescue OptionParser::ParseError => e
    # Same "program: message" text ARGV.options would have warned with, but
    # the script now stops instead of running with bad arguments.
    abort "#{x.program_name}: #{e.message}"
  end
end
# On Ctrl-C, leave with status 130 (the shell convention of 128 + signal
# number for SIGINT) instead of raising Interrupt.
Signal.trap(:INT) { exit(130) }

# Only `send` takes positional arguments; every other sub-command rejects
# anything left in ARGV after option parsing.
if command != :send && !ARGV.empty?
  abort "#{command} accepts no arguments"
end
# Execute the selected sub-command against the queue named by MQUEUE.
#
# Exit status: 0 on success, 2 when a send/receive/wait times out, and 1 (via
# abort, with a one-line message on stderr) for every other failure.
begin
  case command
  when :unlink
    # Removal works on the name alone; the queue is never opened.
    POSIX_MQ.unlink(mqueue)
    exit
  when :create
    if mq_attr
      mq_attr.curmsgs = 0
      mq_attr.flags = 0
      # --maxmsg and --msgsize are only accepted as a pair.
      if mq_attr.msgsize && !mq_attr.maxmsg
        abort "--maxmsg must be set with --msgsize"
      end
      if mq_attr.maxmsg && !mq_attr.msgsize
        abort "--msgsize must be set with --maxmsg"
      end
    end
  end

  mq = POSIX_MQ.open(mqueue, oflags, mode, mq_attr)

  case command
  when :create
    # Opening with IO::CREAT was the whole job.
    exit
  when :receive
    payload, prio = mq.receive(nil, timeout)
    $stderr.write("priority=#{prio}\n") if priority
    $stdout.write(payload)
  when :send
    # With no message arguments, the whole of stdin becomes a single message.
    ARGV.push($stdin.read) if ARGV.empty?
    ARGV.each { |msg| mq.send(msg, priority, timeout) }
  when :attr
    current = mq.attr
    $stdout.write(
      "flags=#{current.flags}\n" \
      "maxmsg=#{current.maxmsg}\n" \
      "msgsize=#{current.msgsize}\n" \
      "curmsgs=#{current.curmsgs}\n")
  when :wait
    # The notification arrives as SIGUSR1; receiving it means success.
    Signal.trap(:USR1) { exit }
    # we wouldn't get a notification if there were already messages
    exit if mq.attr.curmsgs > 0
    mq.notify = :USR1
    # re-check to close the window between the first check and registration
    exit if mq.attr.curmsgs > 0
    if timeout
      sleep(timeout)
    else
      sleep
    end
    exit 2 # timed out
  end
rescue Errno::EEXIST
  abort "Queue exists"
rescue Errno::ENOENT
  abort "Queue does not exist"
rescue Errno::EMSGSIZE
  abort "Message too long"
rescue Errno::EAGAIN
  # Non-blocking mode: the meaning depends on the direction of the operation.
  abort "Queue full" if command == :send
  abort "No messages available"
rescue Errno::ETIMEDOUT
  warn "Operation timed out"
  exit 2
rescue StandardError => err
  abort err.message
end