1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# -*- encoding: binary -*-
# :enddoc:
class Rainbows::EventMachine::Client < EM::Connection
  include Rainbows::EvCore
  Rainbows.config!(self, :keepalive_timeout)

  # Remembers the client socket object; no response is pending
  # (+@deferred+ is nil) on a freshly created connection.
  def initialize(io)
    @_io = io
    @deferred = nil
  end

  alias write send_data

  # Called with newly read +data+, or with nil (by this class itself)
  # to resume processing of whatever is already sitting in +@buf+.
  def receive_data(data)
    if @deferred
      # A response is still being streamed (often a static file).
      # To avoid clobbering it, only buffer the input here and do not
      # start another request on this connection until it completes.
      if data
        @buf << data
        # stop reading from the socket once more than 0x1c000 (114688)
        # is buffered
        @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000
      end
      return if @buf.empty?

      # buffered input remains, look at it again on the next tick
      EM.next_tick { receive_data(nil) }
    elsif (@buf.size > 0) || data
      on_read(data || Z)
    end
  end

  # Runs the inherited quit logic, then closes the connection after
  # pending writes, but only when no deferred response is outstanding.
  def quit
    super
    return unless @deferred.nil?

    close_connection_after_writing
  end

  # Fills in the per-request env entries and dispatches to the app.
  # A nil or -1 status (including the result of a +throw :async+ that
  # carries no value) marks the response as deferred; anything else
  # is written out immediately.
  def app_call(input)
    set_comm_inactivity_timeout(0)
    @env[RACK_INPUT] = input
    @env[REMOTE_ADDR] = @_io.kgio_addr
    @env[ASYNC_CALLBACK] = method(:write_async_response)
    @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new

    status, headers, body = catch(:async) do
      APP.call(@env.merge!(RACK_DEFAULTS))
    end

    if nil == status || -1 == status
      @deferred = true
    else
      ev_write_response(status, headers, body, @hp.next?)
    end
  end

  # Registers the failure handler on the current deferred response:
  # close the original body (when it supports that), forget the
  # deferred object and quit.
  def deferred_errback(orig_body)
    @deferred.errback do
      orig_body.close if orig_body.respond_to?(:close)
      @deferred = nil
      quit
    end
  end

  # Registers the success handler on the current deferred response:
  # close the original body (when it supports that), forget the
  # deferred object, then either resume reading (+alive+) or quit.
  def deferred_callback(orig_body, alive)
    @deferred.callback do
      orig_body.close if orig_body.respond_to?(:close)
      @deferred = nil
      if alive
        receive_data(nil)
      else
        quit
      end
    end
  end

  # Writes a response, choosing a strategy based on what +body+
  # responds to:
  #
  # * deferrable bodies (+errback+ and +callback+) are written via
  #   write_body_each and finished by the deferrable's own callbacks
  # * +to_path+ bodies naming a regular file go through
  #   stream_file_data
  # * +to_path+ bodies naming a socket or pipe are watched for
  #   readability with one of the response pipe handlers
  # * everything else is written with write_response
  #
  # +alive+ tells whether the connection is kept open afterwards.
  def ev_write_response(status, headers, body, alive)
    @state = :headers if alive

    if body.respond_to?(:errback) && body.respond_to?(:callback)
      @deferred = body
      write_headers(status, headers, alive)
      write_body_each(body)
      deferred_errback(body)
      deferred_callback(body, alive)
      return
    end

    if body.respond_to?(:to_path)
      path = body.to_path
      st = File.stat(path)
      if st.file?
        write_headers(status, headers, alive)
        @deferred = stream_file_data(path)
        deferred_errback(body)
        deferred_callback(body, alive)
        return
      elsif st.socket? || st.pipe?
        @deferred = body
        io = body_to_io(body)
        chunk = stream_response_headers(status, headers, alive)
        # only the handler that is actually needed gets resolved
        handler = if chunk
          Rainbows::EventMachine::ResponseChunkPipe
        else
          Rainbows::EventMachine::ResponsePipe
        end
        watcher = EM.watch(io, handler, self)
        return (watcher.notify_readable = true)
      end
      # character or block device: fall through and treat it like
      # any other body
    end

    write_response(status, headers, body, alive)

    if alive
      return unless @deferred.nil?

      if @buf.empty?
        # nothing pipelined yet, wait for the next request
        set_comm_inactivity_timeout(KEEPALIVE_TIMEOUT)
      else
        # more input is already buffered, process it on the next tick
        EM.next_tick { receive_data(nil) }
      end
    else
      quit unless @deferred
    end
  end

  # Finishes the current deferred response: closes it when possible,
  # clears it, then resumes reading if the parser reports keepalive,
  # otherwise quits.
  def next!
    @deferred.close if @deferred.respond_to?(:close)
    @deferred = nil
    if @hp.keepalive?
      receive_data(nil)
    else
      quit
    end
  end

  # Connection teardown: succeeds the async-close deferrable (if the
  # env holds one), fails a still-pending deferrable response and
  # closes the client socket.
  def unbind
    async_close = @env[ASYNC_CLOSE]
    async_close.succeed if async_close
    @deferred.fail if @deferred.respond_to?(:fail)
    begin
      @_io.close
    rescue Errno::EBADF
      # EventMachine's EventableDescriptor::Close() may close
      # the underlying file descriptor without invalidating the
      # associated IO object on errors, so @_io.closed? isn't
      # sufficient.
    end
  end
end
|