1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
| | # -*- encoding: binary -*-
require 'rainbows/ev_core'
module Rainbows
module Rev
class Client < ::Rev::IO
include Rainbows::EvCore
G = Rainbows::G
def initialize(io)
CONN[self] = false
super(io)
post_init
@deferred_bodies = [] # for (fast) regular files only
end
# queued, optional response bodies, it should only be unpollable "fast"
# devices where read(2) is uninterruptable. Unfortunately, NFS and ilk
# are also part of this. We'll also stick DeferredResponse bodies in
# here to prevent connections from being closed on us.
def defer_body(io, out_headers)
@deferred_bodies << io
schedule_write unless out_headers # triggers a write
end
def timeout?
@_write_buffer.empty? && @deferred_bodies.empty? and close.nil?
end
def app_call
begin
KATO.delete(self)
@env[RACK_INPUT] = @input
@env[REMOTE_ADDR] = @remote_addr
response = APP.call(@env.update(RACK_DEFAULTS))
alive = @hp.keepalive? && G.alive
out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers?
DeferredResponse.write(self, response, out)
if alive
@env.clear
@hp.reset
@state = :headers
# keepalive requests are always body-less, so @input is unchanged
@hp.headers(@env, @buf) and next
KATO[self] = Time.now
else
quit
end
return
end while true
end
def on_write_complete
if body = @deferred_bodies.first
return if DeferredResponse === body
begin
begin
write(body.sysread(CHUNK_SIZE))
rescue EOFError # expected at file EOF
@deferred_bodies.shift
body.close
close if :close == @state && @deferred_bodies.empty?
end
rescue => e
handle_error(e)
end
else
close if :close == @state
end
end
def on_close
CONN.delete(self)
end
end # module Client
end # module Rev
end # module Rainbows
|