1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# -*- encoding: binary -*-
require 'revactor'
require 'fcntl'
# Refuse to load with a revactor older than 0.1.5. The check compares
# Gem::Version objects instead of raw strings: String#>= is lexicographic,
# so a release such as '0.1.10' would sort below '0.1.5' and wrongly abort.
require 'rubygems'
Gem::Version.new(Revactor::VERSION) >= Gem::Version.new('0.1.5') or
  abort 'revactor 0.1.5 is required'
# Enables use of the Actor model through
# {Revactor}[http://revactor.org] under Ruby 1.9. It spawns one
# long-lived Actor for every listen socket in the process and spawns a
# new Actor for every client connection accept()-ed.
# +worker_connections+ will limit the number of client Actors we have
# running at any one time.
#
# Applications using this model are required to be reentrant, but do
# not have to worry about race conditions unless they use threads
# internally. \Rainbows! does not spawn threads under this model.
# Multiple instances of the same app may run in the same address space
# sequentially (but at interleaved points). Any network dependencies
# in the application using this model should be implemented using the
# \Revactor library as well, to take advantage of the networking
# concurrency features this model provides.
module Rainbows::Revactor
# :stopdoc:
RD_ARGS = {}
autoload :Proxy, 'rainbows/revactor/proxy'
include Rainbows::Base
LOCALHOST = Kgio::LOCALHOST
TCP = ::Revactor::TCP::Socket
# Handles one accepted client from start to finish. Each pass of the
# loop reads and parses a request, calls the Rack app and writes the
# response; the loop repeats for as long as the connection is kept alive.
# The client is always closed on the way out.
def process_client(client) # :nodoc:
  raw_io = client.instance_variable_get(:@_io)
  raw_io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)

  # TCP clients get the shared read options and report their peer
  # address; anything else is reported as LOCALHOST
  read_args = [ nil ]
  if TCP === client
    read_args << RD_ARGS
    peer_addr = client.remote_addr
  else
    peer_addr = LOCALHOST
  end

  parser = Unicorn::HttpParser.new
  rbuf = parser.buf

  begin
    # step 1: keep feeding the parser until it yields a Rack env
    rbuf << client.read(*read_args) until env = parser.parse

    env[CLIENT_IO] = client
    env[RACK_INPUT] = if 0 == parser.content_length
      NULL_IO
    else
      TeeInput.new(TeeSocket.new(client), parser)
    end
    env[REMOTE_ADDR] = peer_addr

    # step 2: call the app, a second time if it answered with 100
    status, headers, body = app.call(env.update(RACK_DEFAULTS))
    if status.to_i == 100
      client.write(EXPECT_100_RESPONSE)
      env.delete(HTTP_EXPECT)
      status, headers, body = app.call(env)
    end

    # step 3: write the response; the header is only written when the
    # parser reports that the request carried headers
    keep_going = true
    if parser.headers?
      headers = HH.new(headers)
      range = make_range!(env, status, headers)
      status = range.shift if range
      keep_going = parser.keepalive? && G.alive && G.kato > 0
      headers[CONNECTION] = keep_going ? KEEP_ALIVE : CLOSE
      client.write(response_header(status, headers))
    end
    write_body(client, body, range)
  end while keep_going && parser.reset.nil?
rescue ::Revactor::TCP::ReadError
  # read errors are deliberately ignored; the ensure clause closes the client
rescue => e
  Rainbows::Error.write(raw_io, e)
ensure
  client.close
end
# runs inside each forked worker, this sits around and waits
# for connections and doesn't die until the parent dies (or is
# given an INT, QUIT, or TERM signal)
#
# One listener Actor is spawned per entry returned by
# revactorize_listeners, and each listener spawns one linked Actor
# per accepted client. The method returns once G.tick is false and no
# client Actors remain; Errno::EMFILE raised here is swallowed.
#
# worker:: handed straight to init_worker_process
def worker_loop(worker) #:nodoc:
init_worker_process(worker)
# the Body mixin is required here and mixed into our own class
require 'rainbows/revactor/body'
self.class.__send__(:include, Rainbows::Revactor::Body)
# a positive G.kato is stored as the :timeout entry of RD_ARGS, the
# hash process_client passes to reads on TCP clients
RD_ARGS[:timeout] = G.kato if G.kato > 0
# nr counts live client Actors; the blocks below capture it, so
# every listener Actor increments and decrements the same counter
nr = 0
limit = worker_connections
# pattern for the exit message of a client Actor (presumably delivered
# to the listener because it sets trap_exit and uses spawn_link)
actor_exit = Case[:exit, Actor, Object]
revactorize_listeners.each do |l, close, accept|
Actor.spawn(l, close, accept) do |l, close, accept|
Actor.current.trap_exit = true
# make this Actor both the controller and the @receiver of the listener
# (presumably so its connection/close events land in this mailbox)
l.controller = l.instance_variable_set(:@receiver, Actor.current)
# do-while: the body runs at least once, then repeats while G.alive
begin
# at the connection limit: stop accepting and wait for clients to exit
# NOTE(review): the "busy" line is logged on every pass, i.e. at least
# once per 0.01s timeout while saturated -- confirm this is intended
while nr >= limit
l.disable if l.enabled?
logger.info "busy: clients=#{nr} >= limit=#{limit}"
Actor.receive do |f|
f.when(close) {}
f.when(actor_exit) { nr -= 1 }
f.after(0.01) {} # another listener could've gotten an exit
end
end
# below the limit: accept again and handle exactly one message
l.enable unless l.enabled?
Actor.receive do |f|
f.when(close) {}
f.when(actor_exit) { nr -= 1 }
f.when(accept) do |_, _, s|
# count the client and run it in its own linked Actor
nr += 1
Actor.spawn_link(s) { |c| process_client(c) }
end
end
rescue => e
# errors in the accept loop are reported and the loop carries on
Rainbows::Error.listen_loop(e)
end while G.alive
# no longer alive: keep receiving until every client Actor has exited
Actor.receive do |f|
f.when(close) {}
f.when(actor_exit) { nr -= 1 }
end while nr > 0
end
end
# the calling Actor just sleeps while G.tick is true or clients remain
Actor.sleep 1 while G.tick || nr > 0
rescue Errno::EMFILE
# ignore, let another worker process take it
end
# Wraps every socket in LISTENERS in its Revactor counterpart. Each
# TCPServer or UNIXServer yields a triple of
# [ listener, closed-message pattern, connection-message pattern ];
# any other object yields nil.
def revactorize_listeners
  LISTENERS.map do |server|
    if TCPServer === server
      listener = ::Revactor::TCP.listen(server, nil)
      closed = T[:tcp_closed, ::Revactor::TCP::Socket]
      connected = T[:tcp_connection, listener, ::Revactor::TCP::Socket]
      [ listener, closed, connected ]
    elsif UNIXServer === server
      listener = ::Revactor::UNIX.listen(server)
      closed = T[:unix_closed, ::Revactor::UNIX::Socket]
      connected = T[:unix_connection, listener, ::Revactor::UNIX::Socket]
      [ listener, closed, connected ]
    end
  end
end
# Revactor sockets have no readpartial, so this wrapper emulates just
# enough of a length-bounded read for TeeInput to work without touching
# TeeInput internals. This path is rarely exercised, so the cost of
# the extra userspace buffer is usually avoided.
class TeeSocket
  def initialize(socket)
    # Rev, which Revactor is built on, uses IO::Buffer internally,
    # so it is always available here
    @socket = socket
    @rbuf = IO::Buffer.new
  end

  # Reads at most +length+ bytes into +dst+ and returns +dst+, or nil
  # when the socket raises EOFError. A Revactor socket read returns an
  # unspecified amount of data, sometimes more than was asked for; the
  # surplus is kept in @rbuf and served first by later calls.
  def kgio_read(length, dst = "")
    if length == 0
      dst.replace("")
    elsif @rbuf.size > 0
      # leftovers from an earlier oversized read always go out first
      dst.replace(@rbuf.read(length))
    else
      chunk = @socket.read
      if chunk.size > length
        # read returned too much: stash the tail, hand back the head
        @rbuf << chunk[length, chunk.size]
        chunk = chunk[0, length]
      end
      # when the read fit, the string goes straight into dst unsliced
      dst.replace(chunk)
    end
  rescue EOFError
    nil
  end

  # just proxy any remaining methods TeeInput may use
  def close
    @socket.close
  end
end
# :startdoc:
end
|