1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# -*- encoding: binary -*-
# :enddoc:
# FIXME: lots of duplication from xepoll_thread_spawn/client
module Rainbows::XEpollThreadPool::Client
# Pull the two named settings into this module.  The unqualified
# KEEPALIVE_TIMEOUT and CLIENT_HEADER_BUFFER_SIZE constants read further
# down are presumably defined by this call -- confirm against
# Rainbows.config!.
Rainbows.config!(self, :keepalive_timeout, :client_header_buffer_size)
# One-slot counter: slot 0 is incremented for every accepted client in
# self.included and decremented again in #close.
N = Raindrops.new(1)
# Starts out as a copy of the server's listener list; self.included later
# replaces each entry, in place, with the thread accepting on it.
ACCEPTORS = Rainbows::HttpServer::LISTENERS.dup
# Lets the module itself call worker_yield, as the acceptor threads do
# (assumes Rainbows::WorkerYield supplies that method -- TODO confirm).
extend Rainbows::WorkerYield
# Mixin hook; +klass+ is the class including this module (Rainbows::Client,
# per the original note).  Every listener held in ACCEPTORS is replaced, in
# place, by a thread that accepts clients from it as instances of +klass+.
# Returns ACCEPTORS.
def self.included(klass)
  limit = Rainbows.server.worker_connections
  ACCEPTORS.map! do |listener|
    Thread.new do
      rbuf = ""
      begin
        client = listener.kgio_accept(klass)
        if client
          N.incr(0, 1) # one more open connection
          client.epoll_once(rbuf)
        end
        # hold off accepting while the connection count is at the limit
        worker_yield until N[0] < limit
      rescue => err
        Rainbows::Error.listen_loop(err)
      end while Rainbows.alive
    end
  end
end
# Worker loop for one pool thread: keeps popping items off +queue+ and
# calling #run on each, stopping as soon as a nil/false item is popped.
# Returns nil.
def self.app_run(queue)
  job = queue.pop
  while job
    job.run
    job = queue.pop
  end
end
# Hand-off queue between the epoll side and the application threads:
# #queue! pushes clients onto it, app_run pops them.
QUEUE = Queue.new
# Spawn :pool_size threads, each running app_run on QUEUE.
Rainbows::O[:pool_size].times { Thread.new { app_run(QUEUE) } }
ep = SleepyPenguin::Epoll
# Epoll instance used by #kato_set (registration) and self.loop (waiting).
EP = ep.new
# Event flags passed to EP.set: the Epoll IN and ONESHOT flags combined.
IN = ep::IN | ep::ONESHOT
# Clients mapped to the timestamp stored by #kato_set; keys are compared by
# object identity.
KATO = {}.compare_by_identity
# Every access to KATO in this file happens while holding this mutex.
LOCK = Mutex.new
# Handler registered through Rainbows.at_quit (presumably run when the
# worker is quitting -- confirm): takes every client still held in KATO,
# empties the table, and closes the ones that are not already closed.
Rainbows.at_quit do
clients = nil
# Snapshot and clear under the lock, but close outside of it: #close calls
# #kato_delete, which takes LOCK itself.
LOCK.synchronize { clients = KATO.keys; KATO.clear }
clients.each { |io| io.closed? or io.close }
end
# Time of the most recent completed self.expire pass; also the value
# #kato_set stores for each client.
@@last_expire = Rainbows.now
# Records this client in KATO, stamped with the time of the last expiry
# pass, and then registers it with EP using the IN flags.
# Returns whatever EP.set returns.
def kato_set
  LOCK.synchronize do
    KATO[self] = @@last_expire
  end
  EP.set(self, IN)
end
# Removes this client from KATO while holding LOCK.
# Returns the value that was stored for it, or nil if it was not present.
def kato_delete
  LOCK.synchronize do
    KATO.delete(self)
  end
end
# Main event loop.  Each pass waits on EP, hands every object the wait
# yields to its #epoll_run (sharing one read buffer), and then runs the
# expiry pass.  It keeps going while Rainbows.tick is truthy or N[0] is
# still above zero; afterwards ACCEPTORS is handed to
# Rainbows::JoinThreads.acceptors.
def self.loop
buf = ""
begin
# NOTE(review): 1000 is presumably a timeout in milliseconds -- confirm
# against SleepyPenguin::Epoll#wait.
EP.wait(nil, 1000) { |_, obj| obj.epoll_run(buf) }
expire
rescue Errno::EINTR
# deliberately ignored: fall through to the loop condition
rescue => e
Rainbows::Error.listen_loop(e)
end while Rainbows.tick || N[0] > 0
Rainbows::JoinThreads.acceptors(ACCEPTORS)
end
# Expiry pass over KATO.  Does nothing (returns nil) when less than one
# second has passed since the previous pass.  Otherwise, provided
# KEEPALIVE_TIMEOUT is not negative, every client whose stored stamp is
# older than now - KEEPALIVE_TIMEOUT is removed from KATO and, unless
# already closed, shut down.  Records and returns the new pass time.
def self.expire
  now = Rainbows.now
  return if (now - @@last_expire) < 1.0

  timeout = KEEPALIVE_TIMEOUT
  if timeout >= 0
    cutoff = now - timeout
    stale = []
    # only collect under the lock; the sockets are shut down afterwards
    LOCK.synchronize do
      KATO.delete_if do |client, stamp|
        stamp < cutoff && (stale << client)
      end
    end
    stale.each { |io| io.shutdown unless io.closed? }
  end
  @@last_expire = now
end
# First-time entry point for a client, called by the acceptor threads right
# after a successful accept: gives the client a new Rainbows::HttpParser
# and then runs #epoll_run.
# buf:: String passed straight through to #epoll_run
# Returns whatever #epoll_run returns.
def epoll_once(buf)
@hp = Rainbows::HttpParser.new
epoll_run(buf)
end
# Closes the client via super, then removes it from KATO and decrements
# slot 0 of N (the counter incremented on accept).  Always returns nil.
def close
super
kato_delete
N.decr(0, 1)
nil
end
# Delegates error handling for +e+ to super and returns its result; on the
# way out (even if super raises) the client is closed unless it already is.
def handle_error(e)
  super
ensure
  close unless closed?
end
# Pushes this client onto QUEUE for the application threads.
# Always returns false.
def queue!
  QUEUE.push(self)
  false
end
# Reads from the client into +buf+ in a loop until one of these happens:
# * the read reports :wait_readable -> result of #kato_set is returned
# * @hp.add_parse(buf) is truthy after a read -> result of #queue! is
#   returned (the client is dropped from KATO after every String read)
# * the read yields anything else -> result of #close is returned
# Any StandardError raised along the way is passed to #handle_error.
def epoll_run(buf)
  while true
    case kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, buf)
    when String
      kato_delete
      return queue! if @hp.add_parse(buf)
    when :wait_readable
      return kato_set
    else
      return close
    end
  end
rescue => e
  handle_error(e)
end
# Called from an application pool thread (see app_run): hands the parser's
# env and the parser itself to #process_pipeline and returns its result.
def run
  parser = @hp
  process_pipeline(parser.env, parser)
end
# Decides what to do with the client once a response has been handled.
# If hp.parse is truthy the client goes to the back of QUEUE via #queue!,
# so that other clients get to run first; otherwise a read is attempted
# through #epoll_run with a fresh buffer.  Returns false in the latter
# case, and the result of #queue! in the former.
def pipeline_ready(hp)
  # fairness: requeue instead of running straight away
  return queue! if hp.parse

  epoll_run("")
  false
end
end
|