1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
# -*- encoding: binary -*-
# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
# License: GPL-3.0+ (see COPYING for details)
# frozen_string_literal: true
require_relative 'client_expire_tcpi'
require_relative 'client_expire_generic'
module Yahns::Acceptor # :nodoc:
# Drops finished acceptor threads from @thrs and reports whether the
# acceptor is fully shut down.
#
# Returns false while at least one thread remains in @thrs; otherwise
# calls +close+ and returns true.
def __ac_quit_done?
  @thrs.reject! do |thr|
    begin
      # join gives back the thread once it has finished, or nil when it is
      # still running after the 10ms wait (nil keeps it in the list)
      thr.join(0.01)
    rescue
      # join raised (e.g. the thread died with an error); only forget the
      # thread if it is really gone
      !thr.alive?
    end
  end
  if @thrs.empty?
    close
    true
  else
    false
  end
end
# Asks this acceptor to shut down. Callers are expected to keep invoking
# this on every acceptor until the associated threads have died.
#
# Returns true once the acceptor is closed (or was never started), and a
# falsy value while acceptor threads are still around.
def ac_quit
  if defined?(@thrs)
    # flag every acceptor thread so its loop stops after the current pass
    @thrs.each { |thr| thr[:yahns_quit] = true }
    return true if __ac_quit_done?

    # one wake-up connection per remaining thread
    @thrs.each do |_thr|
      begin
        # connect to ourselves to kick a thread out of the blocking
        # accept() syscall
        poke = Kgio::Socket.start(getsockname)
        poke.write("G") # first byte of "GET / HTTP/1.0\r\n\r\n"
      ensure
        poke.close if poke
      end
    end
    false # hopefully __ac_quit_done? is true on the next call
  else
    # acceptor has not started yet (freshly inherited): nothing to stop
    close
    true
  end
rescue SystemCallError
  # the wake-up connection failed; just report the current state
  __ac_quit_done?
end
# Starts +nr+ acceptor threads and remembers them in @thrs.
#
# nr           - number of threads to start
# logger       - receives error reports from the accept loop
# client_class - class handed to kgio_accept; must respond to +queue+ and
#                have a superclass defining QEV_FLAGS
#
# Each thread accepts clients and hands them to the queue until its
# :yahns_quit thread-local becomes truthy. Returns the array of threads.
def spawn_acceptor(nr, logger, client_class)
  @thrs = nr.times.map do
    Thread.new do
      queue = client_class.queue
      me = Thread.current
      accept_flags = Kgio::SOCK_NONBLOCK | Kgio::SOCK_CLOEXEC
      qev_flags = client_class.superclass::QEV_FLAGS
      loop do
        begin
          # We want the accept/accept4 syscall to be _blocking_
          # so it can distribute work evenly between processes
          client = kgio_accept(client_class, accept_flags)
          if client
            client.yahns_init
            # it is not safe to touch client in this thread after this,
            # a worker thread may grab client right away
            queue.queue_add(client, qev_flags)
          end
        rescue Errno::EMFILE, Errno::ENFILE => err
          logger.error("#{err.message}, consider raising open file limits")
          queue.fdmap.desperate_expire(5)
          sleep 1 # let other threads do some work
        rescue => err
          Yahns::Log.exception(logger, "accept loop", err)
        end
        # the flag is only checked after a pass, so every thread attempts
        # at least one accept
        break if me[:yahns_quit]
      end
    end
  end
end
def expire_mod
(TCPServer === self && Yahns.const_defined?(:ClientExpireTCPI)) ?
Yahns::ClientExpireTCPI : Yahns::ClientExpireGeneric
end
end
|