1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
| | # -*- encoding: binary -*-
module Rainbows
# Spawns a new thread for every client connection we accept(). This
# model is recommended for platforms where spawning threads is
# inexpensive.
#
# If you're connecting to external services and need to perform DNS
# lookups, consider using the "resolv-replace" library which replaces
# parts of the core Socket package with concurrent DNS lookup
# capabilities
module ThreadSpawn
include Base
def worker_loop(worker)
init_worker_process(worker)
threads = ThreadGroup.new
alive = worker.tmp
nr = 0
limit = worker_connections
# closing anything we IO.select on will raise EBADF
trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
trap(:QUIT) { LISTENERS.map! { |s| s.close rescue nil } }
[:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown
logger.info "worker=#{worker.nr} ready with ThreadSpawn"
while alive && master_pid == Process.ppid
ret = begin
alive.chmod(nr += 1)
IO.select(LISTENERS, nil, nil, timeout/2.0) or next
rescue Errno::EINTR
retry
rescue Errno::EBADF
alive = false
end
ret.first.each do |l|
while threads.list.size >= limit
nuke_old_thread(threads)
end
c = begin
l.accept_nonblock
rescue Errno::EINTR, Errno::ECONNABORTED
next
end
threads.add(Thread.new(c) { |c| process_client(c) })
end
end
join_spawned_threads(threads)
end
def nuke_old_thread(threads)
threads.list.each do |thr|
next if (Time.now - (thr[:t] || next)) < timeout
thr.kill
logger.error "killed #{thr.inspect} for being too old"
return
end
# nothing to kill, yield to another thread
Thread.pass
end
def join_spawned_threads(threads)
logger.info "Joining spawned threads..."
t0 = Time.now
timeleft = timeout
threads.list.each { |thr|
thr.join(timeleft)
timeleft -= (Time.now - t0)
}
logger.info "Done joining spawned threads."
end
end
end
|