# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
require_relative 'helper'
require 'timeout'
# Tests for Yahns::Wbuf: data is written and flushed through one end of a
# UNIX socketpair while the test reads it back from the other end.
class TestWbuf < Testcase
  parallelize_me! if ENV["N"].to_i > 1

  # UNIXSocket subclass with Kgio::SocketMethods mixed in.  It answers
  # output_buffer_tmpdir at class level with Dir.tmpdir.
  # NOTE(review): presumably Yahns::Wbuf consults this hook when it creates
  # its temporary buffer -- confirm against the Wbuf implementation.
  class KgioUS < UNIXSocket
    include Kgio::SocketMethods

    def self.output_buffer_tmpdir
      Dir.tmpdir
    end
  end

  # Returns a connected pair of KgioUS sockets, both marked close-on-exec.
  def socketpair
    KgioUS.pair.each { |io| io.close_on_exec = true }
  end

  # Runs a full buffer-then-flush cycle once with persist=true and once
  # with persist=false.
  def test_wbuf
    skip "sendfile not Linux-compatible" if RUBY_PLATFORM !~ /linux/
    payload = "*" * (16384 * 2)
    [ true, false ].each { |persist| check_flush_cycle(persist, payload, 1000) }
  end

  # Fills the socket before handing it to Wbuf, then checks that
  # wbuf_write and wbuf_flush agree with each other and that buffered
  # data is delivered once the peer has drained the socket.
  def test_wbuf_blocked
    a, b = socketpair
    skip "sendfile not Linux-compatible" if RUBY_PLATFORM !~ /linux/
    buf = "." * 4096
    # write until the socket reports EAGAIN; the attempt is repeated 4 times
    4.times do
      loop do
        begin
          a.write_nonblock(buf)
        rescue Errno::EAGAIN
          break
        end
      end
    end
    wbuf = Yahns::Wbuf.new([], true)
    rv1 = wbuf.wbuf_write(a, buf)
    rv2 = wbuf.wbuf_flush(a)
    case rv1
    when nil
      assert_equal true, rv2, 'some kernels succeed with real sendfile'
    when :wait_writable
      assert_equal :wait_writable, rv2, 'some block on sendfile'
    else
      flunk "unexpected from wbuf_write/flush: #{rv1.inspect} / #{rv2.inspect}"
    end

    # drain the buffer
    Timeout.timeout(10) { b.read(b.nread) until b.nread == 0 }

    # b.nread will increase after this
    assert_nil wbuf.wbuf_write(a, "HI")
    nr = b.nread
    assert_operator nr, :>, 0
    assert_equal b, IO.select([b], nil, nil, 5)[0][0]
    b.read(nr - 2) if nr > 2
    assert_equal b, IO.select([b], nil, nil, 5)[0][0]
    assert_equal "HI", b.read(2), "read the end of the response"
    assert_equal true, wbuf.wbuf_flush(a)
  ensure
    close_ios(a, b)
  end

  # Uses the read end of a pipe as the Wbuf body and checks that the pipe
  # end is closed once the final flush has returned true.
  def test_wbuf_flush_close
    pipe = cloexec_pipe
    persist = true
    wbuf = Yahns::Wbuf.new(pipe[0], persist)
    refute wbuf.respond_to?(:close) # we don't want this for HttpResponse body
    sp = socketpair
    rv = nil
    buf = ("*" * 16384) << "\n"
    feeder = Thread.new do
      1000.times { pipe[1].write(buf) }
      pipe[1].close
    end
    pipe[0].each { |chunk| rv = wbuf.wbuf_write(sp[1], chunk) }
    assert_equal feeder, feeder.join(5)
    assert_equal :wait_writable, rv

    done = cloexec_pipe
    flusher = spawn_flusher(wbuf, sp[1], persist, done[1])
    drain_until_blocked(sp[0], done[0], String.new) { 16384 }
    assert_equal flusher, flusher.join(5)
    rv = flusher.value
    assert_equal true, rv.pop
    assert rv.all? { |x| x == :wait_writable }
    assert pipe[0].closed?
  ensure
    # stop helper threads before their descriptors are closed under them
    feeder.kill if feeder
    flusher.kill if flusher
    close_ios(*pipe, *sp, *done)
  end

  private

  # One pass of test_wbuf: writes +payload+ +nr+ times through a new Wbuf
  # created with +persist+, then flushes from a background thread while
  # this thread reads everything back.  The last wbuf_flush result must
  # equal +persist+ and every earlier one must be :wait_writable.
  def check_flush_cycle(persist, payload, nr)
    wbuf = Yahns::Wbuf.new([], persist)
    assert_equal false, wbuf.busy
    a, b = socketpair
    assert_nil wbuf.wbuf_write(a, "HIHI")
    assert_equal "HIHI", b.read(4)
    nr.times { wbuf.wbuf_write(a, payload) }
    assert_equal :wait_writable, wbuf.wbuf_flush(a)

    done = cloexec_pipe
    flusher = spawn_flusher(wbuf, a, persist, done[1])
    # Read into a dedicated buffer: read_nonblock overwrites its buffer
    # argument, and +payload+ must stay intact for the next pass.
    drain_until_blocked(b, done[0], String.new) { (rand * 1024).to_i + 666 }
    assert_equal flusher, flusher.join(5)
    rv = flusher.value
    assert_equal persist, rv.pop
    assert(rv.all? { |x| x == :wait_writable })
  ensure
    flusher.kill if flusher
    close_ios(a, b, *done)
  end

  # Starts a thread that waits for +sock+ to become writable and calls
  # wbuf.wbuf_flush(sock) until the result equals +final+.  It then writes
  # one byte to +done_wr+ as a completion signal.  The thread's value is
  # the list of every wbuf_flush result, in order.
  def spawn_flusher(wbuf, sock, final, done_wr)
    Thread.new do
      results = []
      until results[-1] == final
        IO.select(nil, [sock])
        results << wbuf.wbuf_flush(sock)
      end
      done_wr.syswrite '.'
      results
    end
  end

  # Reads from +sock+ into +rbuf+ until a read would block.  The block
  # supplies the maximum length of each read.  Before each read this waits
  # in IO.select (up to 5 seconds) for +sock+ or +done_rd+; once +done_rd+
  # has been reported readable it stops waiting, so the loop ends at the
  # first EAGAIN after that point.
  def drain_until_blocked(sock, done_rd, rbuf)
    wait = true
    ready = nil # assigned outside the block so it survives across iterations
    loop do
      begin
        ready = IO.select([sock, done_rd], nil, nil, 5) if wait
        sock.read_nonblock(yield, rbuf)
        # IO.select returns nil on timeout; keep waiting in that case
        wait = ready.nil? || !ready[0].include?(done_rd)
      rescue Errno::EAGAIN
        break
      end
    end
  end

  # Closes every given IO that is neither nil nor already closed.
  def close_ios(*ios)
    ios.each { |io| io.close if io && !io.closed? }
  end
end