1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
| | # -*- encoding: binary -*-
# :enddoc:
require 'socket'
require 'thread'
require 'uri'
require 'kcar' # http://bogomips.org/kcar/ -- gem install kcar
# This is lightly tested and has an unstable configuration interface.
# ***** Do not rely on anything under the ReverseProxy namespace! *****
#
# A reverse proxy implementation for \Rainbows! It is a Rack application
# compatible and optimized for most \Rainbows! concurrency models.
#
# It makes HTTP/1.0 connections without keepalive to backends, so
# it is only recommended for proxying to upstreams on the same LAN
# or machine. It can proxy to TCP hosts as well as UNIX domain sockets.
#
# Currently it only does simple round-robin balancing and does not
# know to retry connections from failed backends.
#
# Buffering-behavior is currently dependent on the concurrency model selected:
#
# Fully-buffered (uploads and response bodies):
# Coolio, EventMachine, NeverBlock, CoolioThreadSpawn, CoolioThreadPool
# If you're proxying to Unicorn, fully-buffered is the way to go.
#
# Buffered input only (uploads, but not response bodies):
# ThreadSpawn, ThreadPool, FiberSpawn, FiberPool, CoolioFiberSpawn
#
# It is not recommended to use Base, WriterThreadSpawn or WriterThreadPool
# to host this application. However, you may proxy to a backend running
# one of these concurrency models with a fully-buffering concurrency model.
#
# See the {example config}[link:examples/reverse_proxy.ru] for a sample
# configuration
#
# TODO: Revactor support
# TODO: Support HTTP trailers
# TODO: optional streaming input for synchronous
# TODO: error handling
#
# WARNING! this is only lightly tested and has no automated tests, yet!
class Rainbows::ReverseProxy
autoload :MultiThread, 'rainbows/reverse_proxy/multi_thread'
autoload :Synchronous, 'rainbows/reverse_proxy/synchronous'
autoload :Coolio, 'rainbows/reverse_proxy/coolio'
autoload :EventMachine, 'rainbows/reverse_proxy/event_machine'
autoload :EvClient, 'rainbows/reverse_proxy/ev_client'
HTTP_X_FORWARDED_FOR = "HTTP_X_FORWARDED_FOR"
REMOTE_ADDR = "REMOTE_ADDR"
REQUEST_METHOD = "REQUEST_METHOD"
REQUEST_URI = "REQUEST_URI"
CRLF = "\r\n"
TR = %w(_ -)
CONTENT_LENGTH = "CONTENT_LENGTH"
HTTP_TRANSFER_ENCODING = "HTTP_TRANSFER_ENCODING"
RackInput = "rack.input"
E502 = [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
def initialize(opts)
@lock = Mutex.new
upstreams = opts[:upstreams]
@upstreams = []
upstreams.each do |url|
url, cfg = *url if Array === url
if url =~ %r{\Ahttp://}
uri = URI.parse(url)
host = uri.host =~ %r{\A\[([a-fA-F0-9:]+)\]\z} ? $1 : uri.host
sockaddr = Socket.sockaddr_in(uri.port, host)
else
path = url.gsub(%r{\Aunix:}, "") # nginx compat
%r{\A~} =~ path and path = File.expand_path(path)
sockaddr = Socket.sockaddr_un(path)
end
((cfg && cfg[:weight]) || 1).times { @upstreams << sockaddr }
end
@nr = 0
end
# detects the concurrency model at first run and replaces itself
def call(env)
if @lock.try_lock
case model = env["rainbows.model"]
when :EventMachine, :NeverBlock
extend(EventMachine)
when :Coolio, :CoolioThreadPool, :CoolioThreadSpawn
extend(Coolio)
when :RevFiberSpawn, :Rev, :RevThreadPool, :RevThreadSpawn
warn "#{model} is not *well* supported with #{self.class}"
warn "Switch to #{model.to_s.gsub(/Rev/, 'Coolio')}!"
extend(Synchronous)
when :Revactor
warn "Revactor is not *well* supported with #{self.class} yet"
extend(Synchronous)
when :FiberSpawn, :FiberPool, :CoolioFiberSpawn
extend(Synchronous)
Synchronous::UpstreamSocket.
__send__(:include, Rainbows::Fiber::IO::Methods)
when :WriterThreadSpawn, :WriterThreadPool
warn "#{model} is not recommended for use with #{self.class}"
extend(Synchronous)
else
extend(Synchronous)
end
extend(MultiThread) if env["rack.multithread"]
@lock.unlock
else
@lock.synchronize {} # wait for the first locker to finish
end
call(env)
end
# returns request headers for sending to the upstream as a string
def build_headers(env, input)
remote_addr = env[REMOTE_ADDR]
xff = env[HTTP_X_FORWARDED_FOR]
xff = xff ? "#{xff},#{remote_addr}" : remote_addr
req = "#{env[REQUEST_METHOD]} #{env[REQUEST_URI]} HTTP/1.0\r\n" \
"Connection: close\r\n" \
"X-Forwarded-For: #{xff}\r\n"
uscore, dash = *TR
env.each do |key, value|
%r{\AHTTP_(\w+)\z} =~ key or next
key = $1
next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)\z}x =~ key
key.tr!(uscore, dash)
req << "#{key}: #{value}\r\n"
end
input and req << (input.respond_to?(:size) ?
"Content-Length: #{input.size}\r\n" :
"Transfer-Encoding: chunked\r\n")
req << CRLF
end
def pick_upstream(env) # +env+ is reserved for future expansion
@nr += 1
@upstreams[@nr %= @upstreams.size]
end
def prepare_input!(env)
if cl = env[CONTENT_LENGTH]
size = cl.to_i
size > 0 or return
elsif %r{\Achunked\z}i =~ env.delete(HTTP_TRANSFER_ENCODING)
# do people use multiple transfer-encodings?
else
return
end
input = env[RackInput]
if input.respond_to?(:rewind)
if input.respond_to?(:size)
input.size # TeeInput-specific behavior
return input
else
return SizedInput.new(input, size)
end
end
tmp = size && size < 0x4000 ? StringIO.new("") : Unicorn::TmpIO.new
each_block(input) { |x| tmp.syswrite(x) }
tmp.rewind
tmp
end
class SizedInput
attr_reader :size
def initialize(input, n)
buf = ""
if n == nil
n = 0
while input.read(16384, buf)
n += buf.size
end
input.rewind
end
@input, @size = input, n
end
def read(*args)
@input.read(*args)
end
end
class UpstreamSocket < Kgio::Socket
alias readpartial kgio_read!
end
end
|