about summary refs log tree commit homepage
path: root/lib/yahns/queue_kqueue.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/yahns/queue_kqueue.rb')
-rw-r--r--lib/yahns/queue_kqueue.rb82
1 files changed, 82 insertions, 0 deletions
diff --git a/lib/yahns/queue_kqueue.rb b/lib/yahns/queue_kqueue.rb
new file mode 100644
index 0000000..c502de0
--- /dev/null
+++ b/lib/yahns/queue_kqueue.rb
@@ -0,0 +1,82 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2014, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# This is the dangerous, low-level kqueue interface for sleepy_penguin
+# It is safe as long as you're aware of all potential concurrency
+# issues given multithreading, GC, and kqueue itself.
+class Yahns::Queue < SleepyPenguin::Kqueue::IO # :nodoc:
+  include SleepyPenguin
+  attr_accessor :fdmap # Yahns::Fdmap
+
+  # public
+  QEV_QUIT = nil # Level Trigger for QueueQuitter
+  QEV_RD = EvFilt::READ
+  QEV_WR = EvFilt::WRITE
+
+  ADD_ONESHOT = Ev::ADD | Ev::ONESHOT # private
+
+  def self.new
+    rv = super
+    rv.close_on_exec = true
+    rv
+  end
+
+  # for HTTP and HTTPS servers, we rely on the io writing to us, first
+  # flags: QEV_RD/QEV_WR (usually QEV_RD)
+  def queue_add(io, flags)
+    # order is very important here, this thread cannot do anything with
+    # io once we've issued epoll_ctl() because another thread may use it
+    @fdmap.add(io)
+    fflags = ADD_ONESHOT
+    if flags == QEV_QUIT
+      fflags = Ev::ADD
+      flags = QEV_WR
+    end
+    kevent(Kevent[io.fileno, flags, fflags, 0, 0, io])
+  end
+
+  def thr_init
+    Thread.current[:yahns_rbuf] = ""
+    Thread.current[:yahns_fdmap] = @fdmap
+  end
+
+  def queue_del(io)
+    # do not bother with kevent EV_DELETE, it may be tricky to get right,
+    # we only did it in epoll since Eric knows the epoll internals well.
+    @fdmap.forget(io)
+  end
+
+  # returns an array of infinitely running threads
+  def worker_thread(logger, max_events)
+    Thread.new do
+      thr_init
+      begin
+        kevent(nil, max_events) do |_,_,_,_,_,io| # don't care for flags for now
+          # Note: we absolutely must not do anything with io after
+          # we've called epoll_ctl on it, io is exclusive to this
+          # thread only until epoll_ctl is called on it.
+          case rv = io.yahns_step
+          when :wait_readable
+            kevent(Kevent[io.fileno, QEV_RD, ADD_ONESHOT, 0, 0, io])
+          when :wait_writable
+            kevent(Kevent[io.fileno, QEV_WR, ADD_ONESHOT, 0, 0, io])
+          when :ignore # only used by rack.hijack
+            # we cannot EV_DELETE after hijacking, the hijacker
+            # may have already closed it  Likewise, io.fileno is not
+            # expected to work, so we had to erase it from fdmap before hijack
+          when nil, :close
+            # this must be the ONLY place where we call IO#close on
+            # things that got inside the queue
+            @fdmap.sync_close(io)
+          else
+            raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
+          end
+        end
+      rescue => e
+        break if closed? # can still happen due to shutdown_timeout
+        Yahns::Log.exception(logger, 'queue loop', e)
+      end while true
+    end
+  end
+end