1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
| | # Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
require 'thread'
# only initialize this after forking, this is highly volatile and won't
# be able to share data across processes at all.
# This is really a singleton
class Yahns::Fdmap # :nodoc:
def initialize(logger, client_expire_threshold)
@logger = logger
if Float === client_expire_threshold
client_expire_threshold *= Process.getrlimit(:NOFILE)[0]
elsif client_expire_threshold < 0
client_expire_threshold = Process.getrlimit(:NOFILE)[0] +
client_expire_threshold
end
@client_expire_threshold = client_expire_threshold.to_i
# This is an array because any sane OS will frequently reuse FDs
# to keep this tightly-packed and favor lower FD numbers
# (consider select(2) performance (not that we use select))
# An (unpacked) Hash (in MRI) uses 5 more words per entry than an Array,
# and we should expect this array to have around 60K elements
@fdmap_ary = []
@fdmap_mtx = Mutex.new
@last_expire = 0.0
@count = 0
end
# Yes, we call IO#close inside the lock(!)
#
# We don't want to race with __expire. Theoretically, a Ruby
# implementation w/o GVL may end up issuing shutdown(2) on the same fd
# as one which got accept-ed (a brand new IO object) so we must prevent
# IO#close in worker threads from racing with any threads which may run
# __expire
#
# We must never, ever call this while it is capable of being on the
# epoll ready list and returnable by epoll_wait. So we can only call
# this on objects which were epoll_ctl-ed with EPOLLONESHOT (and now
# retrieved).
def sync_close(io)
@fdmap_mtx.synchronize do
@count -= 1
io.close
end
end
# called immediately after accept()
def add(io)
fd = io.fileno
@fdmap_mtx.synchronize do
if (@count += 1) > @client_expire_threshold
__expire(nil)
end
@fdmap_ary[fd] = io
end
end
# this is only called in Errno::EMFILE/Errno::ENFILE situations
# and graceful shutdown
def desperate_expire(timeout)
@fdmap_mtx.synchronize { __expire(timeout) }
end
# only called on hijack
def forget(io)
fd = io.fileno
@fdmap_mtx.synchronize do
# prevent rack.hijacked IOs from being expired by us
@fdmap_ary[fd] = nil
@count -= 1
end
end
# expire a bunch of idle clients and register the current one
# We should not be calling this too frequently, it is expensive
# This is called while @fdmap_mtx is held
def __expire(timeout)
nr = 0
now = Time.now.to_f
(now - @last_expire) >= 1.0 or return # don't expire too frequently
# @fdmap_ary may be huge, so always expire a bunch at once to
# avoid getting to this method too frequently
@fdmap_ary.each do |c|
c.respond_to?(:yahns_expire) or next
ctimeout = c.class.client_timeout
tout = (timeout && timeout < ctimeout) ? timeout : ctimeout
nr += c.yahns_expire(tout)
end
@last_expire = Time.now.to_f
msg = timeout ? "timeout=#{timeout}" : "client_timeout"
@logger.info("dropping #{nr} of #@count clients for #{msg}")
end
# used for graceful shutdown
def size
@fdmap_mtx.synchronize { @count }
end
end
|