1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
| | # -*- encoding: binary -*-
# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc:
include SleepyPenguin
attr_accessor :fdmap # Yahns::Fdmap
# public
QEV_RD = Epoll::IN | Epoll::ONESHOT
QEV_WR = Epoll::OUT | Epoll::ONESHOT
QEV_RDWR = QEV_RD | QEV_WR
def self.new
super(SleepyPenguin::Epoll::CLOEXEC)
end
# for HTTP and HTTPS servers, we rely on the io writing to us, first
# flags: QEV_RD/QEV_WR (usually QEV_RD)
def queue_add(io, flags)
@fdmap.add(io)
epoll_ctl(Epoll::CTL_ADD, io, flags)
end
# returns an array of infinitely running threads
def worker_thread(logger, max_events)
Thread.new do
Thread.current[:yahns_rbuf] = ""
begin
epoll_wait(max_events) do |_, io| # don't care for flags for now
case rv = io.yahns_step
when :wait_readable
epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
when :wait_writable
epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
when :wait_readwrite
epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
when :ignore # only used by rack.hijack
@fdmap.decr
when nil
# this is be the ONLY place where we call IO#close on
# things inside the queue
io.close
@fdmap.decr
else
raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
end
end
rescue => e
# sleep since this check is racy (and uncommon)
break if closed? || (sleep(0.01) && closed?)
Yahns::Log.exception(logger, 'queue loop', e)
end while true
end
end
end
|