1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
| | # -*- encoding: binary -*-
require 'revactor'
Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required'
module Rainbows
# Enables use of the Actor model through
# {Revactor}[http://revactor.org] under Ruby 1.9. It spawns one
# long-lived Actor for every listen socket in the process and spawns a
# new Actor for every client connection accept()-ed.
# +worker_connections+ will limit the number of client Actors we have
# running at any one time.
#
# Applications using this model are required to be reentrant, but do
# not have to worry about race conditions unless they use threads
# internally. \Rainbows! does not spawn threads under this model.
# Multiple instances of the same app may run in the same address space
# sequentially (but at interleaved points). Any network dependencies
# in the application using this model should be implemented using the
# \Revactor library as well, to take advantage of the networking
# concurrency features this model provides.
module Revactor
require 'rainbows/revactor/tee_input'
include Base
# once a client is accepted, it is processed in its entirety here
# in 3 easy steps: read request, call app, write app response
def process_client(client)
buf = client.read or return # this probably does not happen...
hp = HttpParser.new
env = {}
alive = true
remote_addr = ::Revactor::TCP::Socket === client ?
client.remote_addr : LOCALHOST
begin
while ! hp.headers(env, buf)
buf << client.read
end
env[Const::RACK_INPUT] = 0 == hp.content_length ?
HttpRequest::NULL_IO :
Rainbows::Revactor::TeeInput.new(client, env, hp, buf)
env[Const::REMOTE_ADDR] = remote_addr
response = app.call(env.update(RACK_DEFAULTS))
if 100 == response.first.to_i
client.write(Const::EXPECT_100_RESPONSE)
env.delete(Const::HTTP_EXPECT)
response = app.call(env)
end
alive = hp.keepalive? && G.alive
out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
HttpResponse.write(client, response, out)
end while alive and hp.reset.nil? and env.clear
client.close
rescue => e
handle_error(client, e)
end
# runs inside each forked worker, this sits around and waits
# for connections and doesn't die until the parent dies (or is
# given a INT, QUIT, or TERM signal)
def worker_loop(worker)
init_worker_process(worker)
root = Actor.current
root.trap_exit = true
limit = worker_connections
revactorize_listeners!
clients = {}
listeners = LISTENERS.map do |s|
Actor.spawn(s) do |l|
begin
while clients.size >= limit
logger.info "busy: clients=#{clients.size} >= limit=#{limit}"
Actor.receive { |filter| filter.when(:resume) {} }
end
actor = Actor.spawn(l.accept) { |c| process_client(c) }
clients[actor.object_id] = actor
root.link(actor)
rescue Errno::EAGAIN, Errno::ECONNABORTED
rescue Object => e
listen_loop_error(e)
end while G.alive
end
end
begin
Actor.receive do |filter|
filter.after(1) { G.tick }
filter.when(Case[:exit, Actor, Object]) do |_,actor,_|
orig = clients.size
clients.delete(actor.object_id)
orig >= limit and listeners.each { |l| l << :resume }
G.tick
end
end
end while G.alive || clients.size > 0
end
# if we get any error, try to write something back to the client
# assuming we haven't closed the socket, but don't get hung up
# if the socket is already closed or broken. We'll always ensure
# the socket is closed at the end of this function
def handle_error(client, e)
msg = case e
when EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
Const::ERROR_500_RESPONSE
when HttpParserError # try to tell the client they're bad
Const::ERROR_400_RESPONSE
else
logger.error "Read error: #{e.inspect}"
logger.error e.backtrace.join("\n")
Const::ERROR_500_RESPONSE
end
client.instance_eval do
# this is Revactor implementation dependent
@_io.write_nonblock(msg)
close
end
rescue
nil
end
def revactorize_listeners!
LISTENERS.map! do |s|
case s
when TCPServer
::Revactor::TCP.listen(s, nil)
when UNIXServer
::Revactor::UNIX.listen(s)
end
end
end
end
end
|