1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# -*- encoding: binary -*-
require "socket"
#
#
# This module is used to extend TCPServer and Kgio::TCPServer objects
# and aggregate +last_data_recv+ times for all accepted clients. It
# is designed to be used with Raindrops::LastDataRecv Rack application
# but can be easily changed to work with other stats collection devices.
#
# Methods wrapped include:
# - TCPServer#accept
# - TCPServer#accept_nonblock
# - Socket#accept
# - Socket#accept_nonblock
# - Kgio::TCPServer#kgio_accept
# - Kgio::TCPServer#kgio_tryaccept
module Raindrops::Aggregate::LastDataRecv
  # Destination for every sampled +last_data_recv+ integer.  Typically a
  # duck type of the \Aggregate class, though any object that implements
  # *<<* is acceptable.
  attr_accessor :raindrops_aggregate

  @@default_aggregate = nil

  # Returns the aggregate handed to newly extended objects.  Unless one
  # was assigned beforehand, a Raindrops::Aggregate::PMQ object is created
  # on first use and reused afterwards.
  def self.default_aggregate
    @@default_aggregate = Raindrops::Aggregate::PMQ.new unless @@default_aggregate
    @@default_aggregate
  end

  # Replaces the default aggregate; +agg+ may be any object that
  # responds to *<<*
  def self.default_aggregate=(agg)
    @@default_aggregate = agg
  end

  # Extends every TCP listener found in Unicorn::HttpServer::LISTENERS
  # with this module.  A listener qualifies when it is a TCPServer, or
  # when it is exactly a Socket whose local address is an IP address.
  def self.cornify!
    Unicorn::HttpServer::LISTENERS.each do |listener|
      tcp_listener =
        case listener
        when TCPServer then true
        else listener.instance_of?(Socket) && listener.local_address.ip?
        end
      listener.extend(self) if tcp_listener
    end
  end

  # Hook run whenever an object is extended with this module.  It wires
  # the default aggregate into +obj+ and turns on TCP_DEFER_ACCEPT, which
  # every extended socket needs for accurate measurements.
  def self.extended(obj)
    obj.raindrops_aggregate = default_aggregate
    tcp_defer_accept = 9 # numeric value of the TCP_DEFER_ACCEPT option
    seconds = 60
    obj.setsockopt(Socket::SOL_TCP, tcp_defer_accept, seconds)
  end

  # :stopdoc:
  def kgio_tryaccept(*args)
    accepted = super
    count!(accepted)
  end

  def kgio_accept(*args)
    accepted = super
    count!(accepted)
  end

  def accept
    accepted = super
    count!(accepted)
  end

  def accept_nonblock(exception: true)
    accepted = super(exception: exception)
    count!(accepted)
  end
  # :startdoc:

  # Records +last_data_recv+ for a freshly accepted client and hands the
  # accept result +ret+ back to the caller untouched.
  #
  # The +last_data_recv+ member of Raindrops::TCP_Info can be used to
  # infer how long a client waited in the listen queue before being
  # accepted.  TCP_DEFER_ACCEPT must be enabled on the listen socket for
  # that value to be accurate.
  def count!(ret)
    client =
      case ret
      when :wait_readable then nil # nothing was accepted
      when Array then ret.first    # Socket#accept_nonblock
      else ret                     # TCPSocket#accept_nonblock
      end
    return ret unless client

    info = Raindrops::TCP_Info.new(client)
    @raindrops_aggregate << info.last_data_recv
    ret
  end
end
|