1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
| | # -*- encoding: binary -*-
# :enddoc:
require 'rev'
require 'rainbows/fiber'
require 'rainbows/fiber/io'
module Rainbows::Fiber
module Rev
G = Rainbows::G
# keep-alive timeout class
class Kato < ::Rev::TimerWatcher
def initialize
@watch = []
super(1, true)
end
def <<(fiber)
@watch << fiber
enable unless enabled?
end
def on_timer
@watch.uniq!
while f = @watch.shift
f.resume if f.alive?
end
disable
end
end
class Heartbeat < ::Rev::TimerWatcher
def on_timer
exit if (! G.tick && G.cur <= 0)
end
end
class Sleeper < ::Rev::TimerWatcher
def initialize(seconds)
@f = ::Fiber.current
super(seconds, false)
attach(::Rev::Loop.default)
::Fiber.yield
end
def on_timer
@f.resume
end
end
class Server < ::Rev::IOWatcher
include Unicorn
include Rainbows
include Rainbows::Const
include Rainbows::Response
include Rainbows::Acceptor
FIO = Rainbows::Fiber::IO
def to_io
@io
end
def initialize(io)
@io = io
super(self, :r)
end
def close
detach if attached?
@io.close
end
def on_readable
return if G.cur >= MAX
c = accept(@io) and ::Fiber.new { process(c) }.resume
end
def process(io)
G.cur += 1
client = FIO.new(io, ::Fiber.current)
buf = client.read_timeout or return
hp = HttpParser.new
env = {}
remote_addr = Rainbows.addr(io)
begin # loop
buf << (client.read_timeout or return) until hp.headers(env, buf)
env[CLIENT_IO] = client
env[RACK_INPUT] = 0 == hp.content_length ?
HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
env[REMOTE_ADDR] = remote_addr
status, headers, body = APP.call(env.update(RACK_DEFAULTS))
if 100 == status.to_i
client.write(EXPECT_100_RESPONSE)
env.delete(HTTP_EXPECT)
status, headers, body = APP.call(env)
end
if hp.headers?
headers = HH.new(headers)
range = make_range!(env, status, headers) and status = range.shift
headers[CONNECTION] = if hp.keepalive? && G.alive
KEEP_ALIVE
else
env = false
CLOSE
end
client.write(response_header(status, headers))
end
write_body(client, body, range)
end while env && env.clear && hp.reset.nil?
rescue => e
Error.write(io, e)
ensure
G.cur -= 1
client.close
end
end
end
class IO # see rainbows/fiber/io for original definition
class Watcher < ::Rev::IOWatcher
def initialize(fio, flag)
@fiber = fio.f
super(fio, flag)
attach(::Rev::Loop.default)
end
def on_readable
@fiber.resume
end
alias on_writable on_readable
end
undef_method :wait_readable
undef_method :wait_writable
undef_method :close
def initialize(*args)
super
@r = @w = false
end
def close
@w.detach if @w
@r.detach if @r
@r = @w = false
to_io.close unless to_io.closed?
end
def wait_writable
@w ||= Watcher.new(self, :w)
@w.enable unless @w.enabled?
::Fiber.yield
@w.disable
end
def wait_readable
@r ||= Watcher.new(self, :r)
@r.enable unless @r.enabled?
KATO << f
::Fiber.yield
@r.disable
end
end
end
|