1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
# -*- encoding: binary -*-
# :enddoc:
module Rainbows::Epoll::Client
# NOTE(review): helpers used below but not defined in this module (on_read,
# post_init, write_response, ...) presumably come from this mixin -- confirm.
include Rainbows::EvCore
# the Rack application invoked by app_call
APP = Rainbows.server.app
Server = Rainbows::Epoll::Server
# Event masks handed to EP.set.  Every mask includes ONESHOT
# (presumably EPOLLONESHOT, meaning a client has to be re-armed with EP.set
# after each event it receives -- confirm against sleepy_penguin).
IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ONESHOT
OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ONESHOT
EPINOUT = IN | OUT
# keepalive timeout table: client => timestamp, filled in by on_readable and
# swept by Client.expire
KATO = {}
# key the table by object identity when the running Ruby supports it
KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
# on quit, call timeout! on every tracked client and then empty the table
Rainbows.at_quit { KATO.each_key { |k| k.timeout! }.clear }
# NOTE(review): presumably what defines KEEPALIVE_TIMEOUT, which is read by
# Client.expire -- confirm against Rainbows.config!
Rainbows.config!(self, :keepalive_timeout)
EP = Rainbows::EP
# time of the most recent Client.expire sweep; on_readable also stores this
# value in KATO as the timestamp of an idle client
@@last_expire = Time.now
# Sweeps KATO for clients whose recorded timestamp is more than
# KEEPALIVE_TIMEOUT seconds old and calls timeout! on each of them.
# Returns immediately (nil) when the previous sweep was less than one
# second ago; a negative KEEPALIVE_TIMEOUT skips the sweep but still
# records the time of this call.
def self.expire
  now = Time.now
  return if (now - @@last_expire) < 1.0
  keepalive = KEEPALIVE_TIMEOUT
  if keepalive >= 0
    cutoff = now - keepalive
    # an entry is only dropped when timeout! returns a true value
    KATO.delete_if { |client, stamp| stamp < cutoff && client.timeout! }
  end
  @@last_expire = now
end
# Main event loop: waits on EP (with a timeout argument of 1000), runs
# epoll_run on every object that is yielded, then sweeps idle keepalive
# clients via expire.  The body always runs at least once and repeats for
# as long as Rainbows.tick is true or Server.nr is above zero.
def self.loop
  running = true
  # Kernel#loop is deliberately not used: inside this method a bare
  # `loop` would resolve to this very method.
  while running
    begin
      EP.wait(nil, 1000) { |_, obj| obj.epoll_run }
      expire
    rescue Errno::EINTR
      # interrupted wait: nothing to do, just re-check the loop condition
    rescue => err
      Rainbows::Error.listen_loop(err)
    end
    running = Rainbows.tick || Server.nr > 0
  end
end
# First-time entry point for a client: creates the write queue, runs
# post_init and then attempts the first read.  Any StandardError is handed
# to handle_error.  Only call this once per client.
def epoll_once
  begin
    # may contain String, ResponsePipe, and StreamFile objects
    @wr_queue = []
    post_init
    on_readable
  rescue => err
    handle_error(err)
  end
end
# Reads from the client until the state becomes :close, the read would
# block, or kgio_tryread returns something that is neither a String nor
# :wait_readable.
#
# * String: passed to on_read; stops early if a write got queued, the
#   client was closed, or the connection was hijacked
# * :wait_readable: re-arms the client for IN; while still in the
#   :headers state the client is also entered into KATO
# * anything else: falls through to close
#
# Errno::ECONNRESET closes the client, IOError is ignored.
def on_readable
  until :close == @state
    rv = kgio_tryread(CLIENT_HEADER_BUFFER_SIZE, RBUF)
    if String === rv
      on_read(rv)
      return if @wr_queue[0] || closed?
      return hijacked if @hp.hijacked?
    elsif :wait_readable == rv
      KATO[self] = @@last_expire if :headers == @state
      return EP.set(self, IN)
    else
      break
    end
  end
  close unless closed?
rescue Errno::ECONNRESET
  close
rescue IOError
end
# Dispatches one request to the Rack application (called by on_read()).
# +input+ is stored as rack.input.  If the connection was hijacked the
# result of +hijacked+ is returned and no response is written; otherwise
# the response goes to ev_write_response.
def app_call(input)
  @env['rack.input'] = input
  @env['REMOTE_ADDR'] = kgio_addr
  @hp.hijack_setup(self)
  env = @env.merge!(RACK_DEFAULTS)
  status, headers, body = APP.call(env)
  if @hp.hijacked?
    hijacked
  else
    ev_write_response(status, headers, body, @hp.next?)
  end
end
# Writes a response whose body responds to to_path, choosing a strategy
# from the stat of the underlying IO:
#
# * regular file: defer_file
# * socket or pipe: stream_response_headers followed by
#   stream_response_body (or +hijacked+ when the headers call returns nil)
# * anything else (char or block device): plain write_response
def write_response_path(status, headers, body, alive)
  io = body_to_io(body)
  st = io.stat
  return defer_file(status, headers, body, alive, io, st) if st.file?
  unless st.socket? || st.pipe?
    # char or block device... WTF?
    return write_response(status, headers, body, alive)
  end
  chunk = stream_response_headers(status, headers, alive, body)
  if nil == chunk
    hijacked
  else
    stream_response_body(body, io, chunk)
  end
end
# Used for streaming sockets and pipes.  Wraps +io+ in a ResponseChunkPipe
# (when +chunk+ is true) or a ResponsePipe.  If writes are already queued
# the pipe is simply appended to the queue; otherwise it is streamed right
# away with stream_pipe.
def stream_response_body(body, io, chunk)
  pipe_class = if chunk
    Rainbows::Epoll::ResponseChunkPipe
  else
    Rainbows::Epoll::ResponsePipe
  end
  pipe = pipe_class.new(io, self, body)
  if @wr_queue[0]
    @wr_queue << pipe
  elsif stream_pipe(pipe)
    # the pipe is still in flight: make sure the queue is non-empty so
    # ev_write_response does not move on to the next request yet
    @wr_queue[0] || (@wr_queue << ''.freeze)
  end
end
# Writes a Rack response.  The client state becomes :headers when the
# connection stays +alive+ and :close otherwise.  Bodies responding to
# to_path go through write_response_path, everything else through
# write_response.  Returns the result of +hijacked+ if the connection was
# hijacked while writing.
def ev_write_response(status, headers, body, alive)
  @state = if alive
    :headers
  else
    :close
  end
  if body.respond_to?(:to_path)
    write_response_path(status, headers, body, alive)
  else
    write_response(status, headers, body, alive)
  end
  return hijacked if @hp.hijacked?
  # try to read more if we didn't have to buffer writes
  next_request if alive && 0 == @wr_queue.size
end
# Detaches a hijacked client from this server: removes it from the
# keepalive table and from the epoll set, and decrements the Server
# counter.  Always returns nil.
def hijacked
  KATO.delete(self)
  # no other place to do this
  Server.decr
  EP.delete(self)
  nil
end
# Moves on to the next request of a persistent connection.  With an empty
# buffer the client just waits for more data (want_more).  Otherwise a
# pipelined request is already buffered and gets processed immediately by
# feeding on_read an empty string.
def next_request
  return want_more if 0 == @buf.size

  # pipelined request (already in buffer)
  on_read(''.freeze)
  return if @wr_queue[0] || closed?
  return hijacked if @hp.hijacked?
  close if :close == @state
end
# Called from the event loop for a ready client.  Pending writes take
# priority; otherwise the client leaves the keepalive table and is read
# from.
def epoll_run
  return on_writable if @wr_queue[0]

  KATO.delete(self)
  on_readable
end
# Re-arms this client in the epoll set with the combined IN and OUT mask.
def want_more
  EP.set(self, EPINOUT)
end
# Invoked once the write queue has been drained: closes the client when
# the state is :close, otherwise proceeds to the next request.
def on_deferred_write_complete
  if :close == @state
    close
  else
    next_request
  end
end
# Best-effort error reporting: if Rainbows::Error.response yields a
# message for +e+, one attempt is made to write it to the client.  Any
# StandardError raised while doing so is swallowed.  The client is always
# closed afterwards.
def handle_error(e)
  begin
    msg = Rainbows::Error.response(e)
    msg && kgio_trywrite(msg)
  rescue
    nil
  end
ensure
  close
end
# Continues a deferred write: StreamFile objects go to stream_file,
# anything else is treated as a pipe and goes to stream_pipe.
def write_deferred(obj)
  case obj
  when Rainbows::StreamFile
    stream_file(obj)
  else
    stream_pipe(obj)
  end
end
# Writes until our write buffer is empty or we block.
#
# Takes objects off the front of @wr_queue one at a time; Strings are
# written with kgio_trywrite, everything else through write_deferred.
# Returns the result of on_deferred_write_complete once the queue is
# drained, or nil when the write blocked or was deferred.  Any
# StandardError is handed to handle_error.
def on_writable
  obj = @wr_queue.shift
  while true
    rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
    case rv
    when nil
      # fully written, move on to whatever is queued next
      obj = @wr_queue.shift
      return on_deferred_write_complete unless obj
    when String
      obj = rv # partial write, retry with the remainder
    when :wait_writable # Strings and StreamFiles only
      @wr_queue.unshift(obj)
      EP.set(self, OUT)
      return
    when :deferred
      return
    end
  end
rescue => err
  handle_error(err)
end
# Writes +buf+ to the client.  When earlier writes are still queued, a
# copy of +buf+ is appended to the queue without touching the socket.
# Otherwise kgio_trywrite is retried with the unwritten remainder until
# everything is out (returns nil) or the socket blocks, in which case a
# copy of the remainder is queued and the client is armed for OUT.
def write(buf)
  # >3-word 1.9 strings are copy-on-write
  return @wr_queue << buf.dup if @wr_queue[0]

  while true
    case rv = kgio_trywrite(buf)
    when nil
      return # all written
    when String
      buf = rv # retry
    when :wait_writable
      @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
      return EP.set(self, OUT)
    end
  end
end
# Closes the client.  Every queued object that responds to close is
# closed first, ignoring any StandardError this raises, then the inherited
# close runs, followed by the on_close bookkeeping.
def close
  @wr_queue.each do |pending|
    begin
      pending.close if pending.respond_to?(:close)
    rescue
      nil # best effort: keep going with the rest of the queue
    end
  end
  super
  on_close
end
# Bookkeeping after a close: removes the client from the keepalive table
# and decrements the Server counter.
def on_close
  KATO.delete(self)
  Server.decr
end
# Shuts the client down and always returns true.  Client.expire relies on
# the true return value to drop the entry from KATO.
def timeout!
  shutdown
  true
end
# Sends a regular file as the response body.  When sendfile_range returns
# a result, its status, headers and range replace the originals and only
# that range is streamed (nothing is streamed if the range is nil).
# Without one the whole file, 0..st.size, is streamed.
#
# Rack apps should not hijack here, but they may: a false/nil result from
# write_headers makes this return +hijacked+.
def defer_file(status, headers, body, alive, io, st)
  ranged = sendfile_range(status, headers)
  status, headers, range = ranged if ranged
  return hijacked unless write_headers(status, headers, alive, body)

  if ranged
    range && defer_file_stream(range[0], range[1], io, body)
  else
    defer_file_stream(0, st.size, io, body)
  end
end
# Pushes as much of +sf+ (a Rainbows::StreamFile object) to the client as
# possible with trysendfile, advancing sf.offset and shrinking sf.count by
# the number of bytes sent.  Once the count reaches zero the file is
# closed and the result of that close is returned.  A non-Integer result
# from trysendfile (:wait_writable if the client blocks, or nil) is
# returned as-is.  On any StandardError the file is closed and the error
# re-raised.
def stream_file(sf)
  while true
    sent = trysendfile(sf, sf.offset, sf.count)
    return sent unless Integer === sent # :wait_writable or nil

    sf.offset += sent
    remaining = (sf.count -= sent)
    return sf.close if 0 == remaining
  end
rescue
  sf.close
  raise
end
# Streams +count+ bytes of +io+ starting at +offset+.  With an empty write
# queue the transfer is attempted immediately, and a false/nil result from
# stream_file ends the method with nothing queued.  Otherwise the
# StreamFile is queued and the client is armed for OUT.
def defer_file_stream(offset, count, io, body)
  sf = Rainbows::StreamFile.new(offset, count, io, body)
  return if !@wr_queue[0] && !stream_file(sf)

  @wr_queue << sf
  EP.set(self, OUT)
end
# Copies data from +pipe+ to the client.  This alternates between a push
# and a pull model (pipe -> client) to avoid having too much data in
# userspace on either end.
#
# Returns :deferred when either side blocked, or the result of pipe.close
# once the pipe reports EOF.  On any StandardError the pipe is closed and
# the error re-raised.
def stream_pipe(pipe)
  while true
    buf = pipe.tryread
    case buf
    when String
      write(buf)
      # nothing got queued: keep pulling from the pipe
      next unless @wr_queue[0]

      # client is blocked on write, client will pull from pipe later
      EP.delete pipe
      @wr_queue << pipe
      EP.set(self, OUT)
      return :deferred
    when :wait_readable
      # pipe blocked on read, let the pipe push to the client in the future
      EP.delete self
      EP.set(pipe, IN)
      return :deferred
    else # nil => EOF
      return pipe.close # nil
    end
  end
rescue
  pipe.close
  raise
end
end
|