1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# -*- encoding: binary -*-
# :stopdoc:
# Client-side mixin for the XEpollThreadSpawn concurrency model.
#
# Overview of what the code below does:
# * one acceptor thread per listener accepts sockets, bumps the N counter
#   and performs the first read attempt (+epoll_once+);
# * a client whose read would block is recorded in KATO and registered
#   with the shared epoll object EP (+kato_set+);
# * +Client.loop+ waits on EP, dispatches readable clients to +epoll_run+
#   and periodically expires idle clients (+Client.expire+);
# * once a request header parses, a new thread runs +process_pipeline+.
module Rainbows::XEpollThreadSpawn::Client
  # NOTE(review): presumably this call is what makes KEEPALIVE_TIMEOUT and
  # CLIENT_HEADER_BUFFER_SIZE (used below) available -- confirm in
  # Rainbows.config!.
  Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)

  # Single-slot counter: incremented per accepted client, decremented in
  # #close.
  N = Raindrops.new(1)

  # Starts as a copy of the listener list; +self.included+ replaces each
  # entry with the thread accepting on that listener.
  ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup

  # NOTE(review): +worker_yield+ (used by the acceptor threads) presumably
  # comes from this extension -- confirm.
  extend Rainbows::WorkerYield

  # Hook run when this module is mixed into +klass+. Spawns one acceptor
  # thread per listener and stores the threads in ACCEPTORS.
  #
  # klass:: the class passed to +kgio_accept+ for accepted sockets
  def self.included(klass) # included in Rainbows::Client
    limit = Rainbows.server.worker_connections
    ACCEPTORS.map! do |listener|
      Thread.new do
        rbuf = ""
        # Plain +while true+ on purpose: a bare +loop+ in this scope would
        # resolve to Client.loop (defined below), not Kernel#loop.
        while true
          begin
            client = listener.kgio_accept(klass)
            if client
              N.incr(0, 1)
              client.epoll_once(rbuf)
            end
            # Hold off accepting more while the counter is at/over the limit.
            worker_yield until N[0] < limit
          rescue => err
            Rainbows::Error.listen_loop(err)
          end
          # Condition is checked after every pass, including the first.
          break unless Rainbows.alive
        end
      end
    end
  end

  epoll = SleepyPenguin::Epoll

  # Shared epoll object used by every client and by Client.loop.
  EP = epoll.new

  # Event mask handed to EP.set: IN combined with ONESHOT.
  IN = epoll::IN | epoll::ONESHOT

  # client => timestamp stored by #kato_set; keyed by object identity
  # where Hash supports it.
  KATO = {}
  KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)

  # Guards every access to KATO.
  LOCK = Mutex.new

  # At quit: empty KATO under the lock, then shut down every client that
  # was in it and is not already closed.
  Rainbows.at_quit do
    idle = LOCK.synchronize do
      snapshot = KATO.keys
      KATO.clear
      snapshot
    end
    idle.each { |client| client.shutdown unless client.closed? }
  end

  # Time of the most recent completed expiry pass; also the timestamp
  # recorded for clients in #kato_set.
  @@last_expire = Rainbows.now

  # Records this client in KATO (stamped with the last expiry time) and
  # registers it with EP using the IN mask. Returns the result of EP.set.
  def kato_set
    LOCK.synchronize do
      KATO[self] = @@last_expire
    end
    EP.set(self, IN)
  end

  # Removes this client from KATO, under the lock.
  def kato_delete
    LOCK.synchronize do
      KATO.delete(self)
    end
  end

  # Main event loop: waits on EP, runs +epoll_run+ on each reported
  # object, then attempts an expiry pass. Keeps going while Rainbows.tick
  # is truthy or the N counter is above zero, then hands the acceptor
  # threads to Rainbows::JoinThreads.acceptors.
  def self.loop
    rbuf = ""
    # +while true+ rather than +loop+: inside this method a bare +loop+
    # would call this very method.
    while true
      begin
        # NOTE(review): 1000 is presumably a timeout in milliseconds --
        # confirm against SleepyPenguin::Epoll#wait.
        EP.wait(nil, 1000) { |_, client| client.epoll_run(rbuf) }
        expire
      rescue Errno::EINTR
        # deliberately ignored; fall through to the loop condition
      rescue => err
        Rainbows::Error.listen_loop(err)
      end
      break unless Rainbows.tick || N[0] > 0
    end
    Rainbows::JoinThreads.acceptors(ACCEPTORS)
  end

  # Closes clients whose KATO timestamp is older than KEEPALIVE_TIMEOUT.
  # Does nothing if less than 1.0 has elapsed (per Rainbows.now) since the
  # previous pass; a negative KEEPALIVE_TIMEOUT skips the closing step but
  # still records the pass.
  def self.expire
    now = Rainbows.now
    return if (now - @@last_expire) < 1.0

    timeout = KEEPALIVE_TIMEOUT
    if timeout >= 0
      cutoff = now - timeout
      stale = []
      LOCK.synchronize do
        KATO.delete_if do |client, stamp|
          expired = stamp < cutoff
          stale << client if expired
          expired
        end
      end
      # Closed outside the lock: #close re-acquires LOCK via #kato_delete.
      stale.each { |client| client.close unless client.closed? }
    end
    @@last_expire = now
  end

  # First entry point for a freshly accepted client: creates its HTTP
  # parser and makes the initial read attempt.
  #
  # buf:: read buffer passed through to #epoll_run
  def epoll_once(buf)
    @hp = Rainbows::HttpParser.new
    epoll_run(buf)
  end

  # Closes the socket via +super+, drops the client from KATO and
  # decrements the N counter. Always returns nil.
  def close
    super
    kato_delete
    N.decr(0, 1)
    nil
  end

  # Delegates to +super+, then makes sure the client ends up closed even
  # if +super+ raises.
  def handle_error(e)
    super
  ensure
    close unless closed?
  end

  # Reads into +buf+ and feeds the parser until one of:
  # * the read would block -> re-arm through #kato_set and return;
  # * a request header parses -> hand off to #spawn and return;
  # * any other read result -> close the client.
  # Any StandardError is routed to #handle_error.
  def epoll_run(buf)
    while true
      case kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
      when :wait_readable
        return kato_set
      when String
        kato_delete
        env = @hp.add_parse(buf)
        return spawn(env, @hp) if env
        # header still incomplete: read again
      else
        return close
      end
    end
  rescue => e
    handle_error(e)
  end

  # Starts a new thread that runs +process_pipeline+ for this client.
  def spawn(env, hp)
    Thread.new { process_pipeline(env, hp) }
  end

  # Reports whether another request is ready to be processed.
  #
  # Returns true when +hp+ parses a request, either immediately or after
  # further reads; false when the read would block (the client is
  # re-armed via #kato_set first); nil (the result of #close) on any
  # other read result.
  def pipeline_ready(hp)
    return true if hp.parse

    while true
      case chunk = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE)
      when :wait_readable
        kato_set
        return false
      when String
        return true if hp.add_parse(chunk)
        # not enough data yet: keep reading
      else
        return close
      end
    end
  end
end
|