diff options
Diffstat (limited to 'lib/yahns')
-rw-r--r-- | lib/yahns/queue_kqueue.rb | 82 | ||||
-rw-r--r-- | lib/yahns/queue_quitter_pipe.rb | 4 | ||||
-rw-r--r-- | lib/yahns/sendfile_compat.rb | 29 | ||||
-rw-r--r-- | lib/yahns/sigevent_pipe.rb | 4 | ||||
-rw-r--r-- | lib/yahns/wbuf_common.rb | 10 |
5 files changed, 123 insertions, 6 deletions
diff --git a/lib/yahns/queue_kqueue.rb b/lib/yahns/queue_kqueue.rb new file mode 100644 index 0000000..c502de0 --- /dev/null +++ b/lib/yahns/queue_kqueue.rb @@ -0,0 +1,82 @@ +# -*- encoding: binary -*- +# Copyright (C) 2014, Eric Wong <normalperson@yhbt.net> and all contributors +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +# +# This is the dangerous, low-level kqueue interface for sleepy_penguin +# It is safe as long as you're aware of all potential concurrency +# issues given multithreading, GC, and kqueue itself. +class Yahns::Queue < SleepyPenguin::Kqueue::IO # :nodoc: + include SleepyPenguin + attr_accessor :fdmap # Yahns::Fdmap + + # public + QEV_QUIT = nil # Level Trigger for QueueQuitter + QEV_RD = EvFilt::READ + QEV_WR = EvFilt::WRITE + + ADD_ONESHOT = Ev::ADD | Ev::ONESHOT # private + + def self.new + rv = super + rv.close_on_exec = true + rv + end + + # for HTTP and HTTPS servers, we rely on the io writing to us, first + # flags: QEV_RD/QEV_WR (usually QEV_RD) + def queue_add(io, flags) + # order is very important here, this thread cannot do anything with + # io once we've issued epoll_ctl() because another thread may use it + @fdmap.add(io) + fflags = ADD_ONESHOT + if flags == QEV_QUIT + fflags = Ev::ADD + flags = QEV_WR + end + kevent(Kevent[io.fileno, flags, fflags, 0, 0, io]) + end + + def thr_init + Thread.current[:yahns_rbuf] = "" + Thread.current[:yahns_fdmap] = @fdmap + end + + def queue_del(io) + # do not bother with kevent EV_DELETE, it may be tricky to get right, + # we only did it in epoll since Eric knows the epoll internals well. + @fdmap.forget(io) + end + + # returns an array of infinitely running threads + def worker_thread(logger, max_events) + Thread.new do + thr_init + begin + kevent(nil, max_events) do |_,_,_,_,_,io| # don't care for flags for now + # Note: we absolutely must not do anything with io after + # we've called epoll_ctl on it, io is exclusive to this + # thread only until epoll_ctl is called on it. + case rv = io.yahns_step + when :wait_readable + kevent(Kevent[io.fileno, QEV_RD, ADD_ONESHOT, 0, 0, io]) + when :wait_writable + kevent(Kevent[io.fileno, QEV_WR, ADD_ONESHOT, 0, 0, io]) + when :ignore # only used by rack.hijack + # we cannot EV_DELETE after hijacking, the hijacker + # may have already closed it Likewise, io.fileno is not + # expected to work, so we had to erase it from fdmap before hijack + when nil, :close + # this must be the ONLY place where we call IO#close on + # things that got inside the queue + @fdmap.sync_close(io) + else + raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}" + end + end + rescue => e + break if closed? # can still happen due to shutdown_timeout + Yahns::Log.exception(logger, 'queue loop', e) + end while true + end + end +end diff --git a/lib/yahns/queue_quitter_pipe.rb b/lib/yahns/queue_quitter_pipe.rb index 1aa0643..e18e249 100644 --- a/lib/yahns/queue_quitter_pipe.rb +++ b/lib/yahns/queue_quitter_pipe.rb @@ -5,9 +5,8 @@ class Yahns::QueueQuitter # :nodoc: attr_reader :to_io def initialize - reader, @to_io = IO.pipe + @reader, @to_io = IO.pipe @to_io.close_on_exec = true - reader.close end def yahns_step @@ -19,6 +18,7 @@ class Yahns::QueueQuitter # :nodoc: end def close + @reader.close @to_io.close end end diff --git a/lib/yahns/sendfile_compat.rb b/lib/yahns/sendfile_compat.rb new file mode 100644 index 0000000..e3f53d1 --- /dev/null +++ b/lib/yahns/sendfile_compat.rb @@ -0,0 +1,29 @@ +# -*- encoding: binary -*- +# Copyright (C) 2009-2014, Eric Wong <normalperson@yhbt.net> et. al. +# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) +require 'io/extra' # gem install io-extra + +module Yahns::SendfileCompat + def trysendfile(io, offset, count) + return 0 if count == 0 + count = 0x4000 if count > 0x4000 + str = IO.pread(io.fileno, count, offset) + if count > str.bytesize + raise EOFError, "end of file reached" + end + n = 0 + case rv = kgio_trywrite(str) + when String # partial write, keep trying + n += (str.bytesize - rv.bytesize) + str = rv + when :wait_writable, :wait_readable + return n > 0 ? n : rv + when nil + return n + str.bytesize # yay! + end while true + end +end + +class IO + include Yahns::SendfileCompat +end diff --git a/lib/yahns/sigevent_pipe.rb b/lib/yahns/sigevent_pipe.rb index 8bd083f..ee0f14e 100644 --- a/lib/yahns/sigevent_pipe.rb +++ b/lib/yahns/sigevent_pipe.rb @@ -8,8 +8,8 @@ class Yahns::Sigevent # :nodoc: @to_io.close_on_exec = @wr.close_on_exec = true end - def kgio_wait_readable - @to_io.kgio_wait_readable + def kgio_wait_readable(*args) + @to_io.kgio_wait_readable(*args) end def sev_signal diff --git a/lib/yahns/wbuf_common.rb b/lib/yahns/wbuf_common.rb index b3a4502..01e20bb 100644 --- a/lib/yahns/wbuf_common.rb +++ b/lib/yahns/wbuf_common.rb @@ -1,7 +1,12 @@ # -*- encoding: binary -*- # Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al. # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) -require 'sendfile' +if ENV["SENDFILE_BROKEN"] + require_relative 'sendfile_compat' +else + require 'sendfile' +end + module Yahns::WbufCommon # :nodoc: # returns nil on success, :wait_*able when blocked # currently, we rely on each thread having exclusive access to the @@ -25,7 +30,8 @@ module Yahns::WbufCommon # :nodoc: raise "BUG: rv=#{rv.inspect} " \ "on tmpio=#{@tmpio.inspect} " \ "sf_offset=#@sf_offset sf_count=#@sf_count" - end while true + end while @sf_count > 0 + wbuf_close(client) end def wbuf_close_common(client) |