1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
# -*- encoding: binary -*-
require 'thread'
module Rainbows
# This concurrency model implements a single-threaded app dispatch and
# spawns a new thread for writing responses. This concurrency model
# should be ideal for apps that serve large responses or stream
# responses slowly.
#
# Unlike most \Rainbows! concurrency models, WriterThreadSpawn is
# designed to run behind nginx just like Unicorn is. This concurrency
# model may be useful for existing Unicorn users looking for more
# output concurrency than socket buffers can provide while still
# maintaining a single-threaded application dispatch (though if the
# response body is generated on-the-fly, it must be thread safe).
#
# For serving large or streaming responses, setting
# "proxy_buffering off" in nginx is recommended. If your application
# does not handle uploads, then using any HTTP-aware proxy like
# haproxy is fine. Using a non-HTTP-aware proxy will leave you
# vulnerable to slow client denial-of-service attacks.
module WriterThreadSpawn
include Base
CUR = {} # :nodoc:
# used to wrap a BasicSocket to use with +q+ for all writes
# this is compatible with IO.select
# Members: +to_io+ is the wrapped socket, +q+ is the lazily created
# write queue and +thr+ is the thread draining that queue.
#
# Items placed on the queue follow a small protocol understood by the
# writer thread started in #queue_writer:
#
# * <tt>[:body, body]</tt> - hand +body+ to write_body for this socket
# * <tt>:close</tt>        - close the socket (if still open) and stop
# * anything else truthy   - written to the socket with IO#write
# * +nil+ / +false+        - stop the thread without closing the socket
class MySocket < Struct.new(:to_io, :q, :thr) # :nodoc: all
  include Rainbows::Response

  # Reads go straight to the wrapped socket; only writes are queued.
  def readpartial(size, buf = "")
    to_io.readpartial(size, buf)
  end

  # Non-blocking writes bypass the queue and hit the socket directly.
  def write_nonblock(buf)
    to_io.write_nonblock(buf)
  end

  # Starts the writer thread for this socket, registers it in CUR
  # (thread => queue) and returns the new queue.
  #
  # Blocks the caller while CUR already tracks MAX or more threads.
  # MAX is not defined in this class body and must have been set on
  # this class before the first call.
  def queue_writer
    # not using Thread.pass here because that spins the CPU during
    # I/O wait and will eat cycles from other worker processes.
    while CUR.size >= MAX
      # drop threads that are dead or that can be joined immediately
      CUR.delete_if { |writer, _| !writer.alive? || writer.join(0) }
      sleep(0.01) if CUR.size >= MAX
    end

    queue = Queue.new
    self.thr = Thread.new(to_io, queue) do |io, jobs|
      while (job = jobs.shift)
        begin
          # a bare String or Symbol lands in +op+ with +payload+ nil
          op, payload = job
          case op
          when :body
            # write_body is resolved on this object; presumably it is
            # provided by Rainbows::Response -- confirm
            write_body(io, payload)
          when :close
            io.close unless io.closed?
            break
          else
            io.write(op)
          end
        rescue => e
          # report the failure and keep draining the queue
          Error.write(io, e)
        end
      end
      CUR.delete(Thread.current)
    end
    CUR[thr] = queue
  end

  # Queues +buf+ for the writer thread, starting that thread on first
  # use. Returns the queue.
  def write(buf)
    self.q ||= queue_writer
    q << buf
  end

  # Queues a response +body+ for the writer thread, starting that
  # thread on first use. Returns the queue.
  def queue_body(body)
    self.q ||= queue_writer
    q << [ :body, body ]
  end

  # Closes the socket directly when no writer thread was ever started;
  # otherwise asks the writer thread to close it once everything queued
  # ahead of the request has been processed.
  def close
    return to_io.close unless q

    q << :close
  end

  # Always reports open, regardless of the state of the wrapped socket.
  # NOTE(review): presumably so callers always invoke #close and the
  # close request reaches the writer thread -- confirm against callers.
  def closed?
    false
  end
end
# Defers writing of +body+ by handing it to +my_sock+'s queue_body
# instead of writing it from the calling thread. Returns whatever
# queue_body returns.
def write_body(my_sock, body) # :nodoc:
  my_sock.queue_body(body)
end
# Wraps the accepted +client+ in a MySocket (queue and thread members
# left unset) before delegating to the inherited implementation.
def process_client(client) # :nodoc:
  wrapped = MySocket[client]
  super(wrapped)
end
# Configures MySocket for this worker, runs the inherited accept loop,
# and afterwards waits for every writer thread still tracked in CUR to
# finish before returning.
def worker_loop(worker) # :nodoc:
  # cap on tracked writer threads, read by MySocket#queue_writer
  MySocket.const_set(:MAX, worker_connections)
  Rainbows::Response.setup(MySocket)
  super(worker) # accept loop from Unicorn

  # Drain phase: repeat until no writer threads remain registered.
  until CUR.empty?
    CUR.delete_if do |writer, queue|
      queue << nil # a nil item makes the writer thread leave its loop
      G.tick # NOTE(review): presumably a liveness heartbeat -- confirm
      # forget threads that are dead or finish within 10ms
      !writer.alive? || writer.join(0.01)
    end
  end
end
end
end
|