about summary refs log tree commit homepage
path: root/lib/yahns/queue_epoll.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/yahns/queue_epoll.rb')
-rw-r--r--lib/yahns/queue_epoll.rb57
1 files changed, 57 insertions, 0 deletions
diff --git a/lib/yahns/queue_epoll.rb b/lib/yahns/queue_epoll.rb
new file mode 100644
index 0000000..c9febc4
--- /dev/null
+++ b/lib/yahns/queue_epoll.rb
@@ -0,0 +1,57 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc:
+  include SleepyPenguin
+  attr_accessor :fdmap # Yahns::Fdmap
+
+  # public
+  QEV_RD = Epoll::IN | Epoll::ONESHOT
+  QEV_WR = Epoll::OUT | Epoll::ONESHOT
+  QEV_RDWR = QEV_RD | QEV_WR
+
+  def self.new
+    super(SleepyPenguin::Epoll::CLOEXEC)
+  end
+
+  # for HTTP and HTTPS servers, we rely on the io writing to us, first
+  # flags: QEV_RD/QEV_WR (usually QEV_RD)
+  def queue_add(io, flags)
+    @fdmap.add(io)
+    epoll_ctl(Epoll::CTL_ADD, io, flags)
+  end
+
+  # returns an array of infinitely running threads
+  def spawn_worker_threads(logger, worker_threads, max_events)
+    worker_threads.times do
+      Thread.new do
+        Thread.current[:yahns_rbuf] = ""
+        begin
+          epoll_wait(max_events) do |_, io| # don't care for flags for now
+            case rv = io.yahns_step
+            when :wait_readable
+              epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
+            when :wait_writable
+              epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
+            when :wait_readwrite
+              epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
+            when :delete # only used by rack.hijack
+              epoll_ctl(Epoll::CTL_DEL, io, 0)
+              @fdmap.delete(io)
+            when nil
+              # this is be the ONLY place where we call IO#close on
+              # things inside the queue
+              io.close
+              @fdmap.decr
+            else
+              raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
+            end
+          end
+        rescue => e
+          break if (IOError === e || Errno::EBADF === e) && closed?
+          Yahns::Log.exception(logger, 'queue loop', e)
+        end while true
+      end
+    end
+  end
+end