about summary refs log tree commit homepage
path: root/lib/yahns
diff options
context:
space:
mode:
Diffstat (limited to 'lib/yahns')
-rw-r--r--lib/yahns/queue_kqueue.rb82
-rw-r--r--lib/yahns/queue_quitter_pipe.rb4
-rw-r--r--lib/yahns/sendfile_compat.rb29
-rw-r--r--lib/yahns/sigevent_pipe.rb4
-rw-r--r--lib/yahns/wbuf_common.rb10
5 files changed, 123 insertions, 6 deletions
diff --git a/lib/yahns/queue_kqueue.rb b/lib/yahns/queue_kqueue.rb
new file mode 100644
index 0000000..c502de0
--- /dev/null
+++ b/lib/yahns/queue_kqueue.rb
@@ -0,0 +1,82 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2014, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+#
+# This is the dangerous, low-level kqueue interface for sleepy_penguin
+# It is safe as long as you're aware of all potential concurrency
+# issues given multithreading, GC, and kqueue itself.
+class Yahns::Queue < SleepyPenguin::Kqueue::IO # :nodoc:
+  include SleepyPenguin
+  attr_accessor :fdmap # Yahns::Fdmap
+
+  # public
+  QEV_QUIT = nil # Level Trigger for QueueQuitter
+  QEV_RD = EvFilt::READ
+  QEV_WR = EvFilt::WRITE
+
+  ADD_ONESHOT = Ev::ADD | Ev::ONESHOT # private
+
+  def self.new
+    rv = super
+    rv.close_on_exec = true
+    rv
+  end
+
+  # for HTTP and HTTPS servers, we rely on the io writing to us, first
+  # flags: QEV_RD/QEV_WR (usually QEV_RD)
+  def queue_add(io, flags)
+    # order is very important here, this thread cannot do anything with
+    # io once we've issued epoll_ctl() because another thread may use it
+    @fdmap.add(io)
+    fflags = ADD_ONESHOT
+    if flags == QEV_QUIT
+      fflags = Ev::ADD
+      flags = QEV_WR
+    end
+    kevent(Kevent[io.fileno, flags, fflags, 0, 0, io])
+  end
+
+  def thr_init
+    Thread.current[:yahns_rbuf] = ""
+    Thread.current[:yahns_fdmap] = @fdmap
+  end
+
+  def queue_del(io)
+    # do not bother with kevent EV_DELETE, it may be tricky to get right,
+    # we only did it in epoll since Eric knows the epoll internals well.
+    @fdmap.forget(io)
+  end
+
+  # returns an array of infinitely running threads
+  def worker_thread(logger, max_events)
+    Thread.new do
+      thr_init
+      begin
+        kevent(nil, max_events) do |_,_,_,_,_,io| # don't care for flags for now
+          # Note: we absolutely must not do anything with io after
+          # we've called epoll_ctl on it, io is exclusive to this
+          # thread only until epoll_ctl is called on it.
+          case rv = io.yahns_step
+          when :wait_readable
+            kevent(Kevent[io.fileno, QEV_RD, ADD_ONESHOT, 0, 0, io])
+          when :wait_writable
+            kevent(Kevent[io.fileno, QEV_WR, ADD_ONESHOT, 0, 0, io])
+          when :ignore # only used by rack.hijack
+            # we cannot EV_DELETE after hijacking, the hijacker
+            # may have already closed it  Likewise, io.fileno is not
+            # expected to work, so we had to erase it from fdmap before hijack
+          when nil, :close
+            # this must be the ONLY place where we call IO#close on
+            # things that got inside the queue
+            @fdmap.sync_close(io)
+          else
+            raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
+          end
+        end
+      rescue => e
+        break if closed? # can still happen due to shutdown_timeout
+        Yahns::Log.exception(logger, 'queue loop', e)
+      end while true
+    end
+  end
+end
diff --git a/lib/yahns/queue_quitter_pipe.rb b/lib/yahns/queue_quitter_pipe.rb
index 1aa0643..e18e249 100644
--- a/lib/yahns/queue_quitter_pipe.rb
+++ b/lib/yahns/queue_quitter_pipe.rb
@@ -5,9 +5,8 @@
 class Yahns::QueueQuitter # :nodoc:
   attr_reader :to_io
   def initialize
-    reader, @to_io = IO.pipe
+    @reader, @to_io = IO.pipe
     @to_io.close_on_exec = true
-    reader.close
   end
 
   def yahns_step
@@ -19,6 +18,7 @@ class Yahns::QueueQuitter # :nodoc:
   end
 
   def close
+    @reader.close
     @to_io.close
   end
 end
diff --git a/lib/yahns/sendfile_compat.rb b/lib/yahns/sendfile_compat.rb
new file mode 100644
index 0000000..e3f53d1
--- /dev/null
+++ b/lib/yahns/sendfile_compat.rb
@@ -0,0 +1,29 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2009-2014, Eric Wong <normalperson@yhbt.net> et. al.
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require 'io/extra' # gem install io-extra
+
+module Yahns::SendfileCompat
+  def trysendfile(io, offset, count)
+    return 0 if count == 0
+    count = 0x4000 if count > 0x4000
+    str = IO.pread(io.fileno, count, offset)
+    if count > str.bytesize
+      raise EOFError, "end of file reached"
+    end
+    n = 0
+    case rv = kgio_trywrite(str)
+    when String # partial write, keep trying
+      n += (str.bytesize - rv.bytesize)
+      str = rv
+    when :wait_writable, :wait_readable
+      return n > 0 ? n : rv
+    when nil
+      return n + str.bytesize # yay!
+    end while true
+  end
+end
+
+class IO
+  include Yahns::SendfileCompat
+end
diff --git a/lib/yahns/sigevent_pipe.rb b/lib/yahns/sigevent_pipe.rb
index 8bd083f..ee0f14e 100644
--- a/lib/yahns/sigevent_pipe.rb
+++ b/lib/yahns/sigevent_pipe.rb
@@ -8,8 +8,8 @@ class Yahns::Sigevent # :nodoc:
     @to_io.close_on_exec = @wr.close_on_exec = true
   end
 
-  def kgio_wait_readable
-    @to_io.kgio_wait_readable
+  def kgio_wait_readable(*args)
+    @to_io.kgio_wait_readable(*args)
   end
 
   def sev_signal
diff --git a/lib/yahns/wbuf_common.rb b/lib/yahns/wbuf_common.rb
index b3a4502..01e20bb 100644
--- a/lib/yahns/wbuf_common.rb
+++ b/lib/yahns/wbuf_common.rb
@@ -1,7 +1,12 @@
 # -*- encoding: binary -*-
 # Copyright (C) 2009-2013, Eric Wong <normalperson@yhbt.net> et. al.
 # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
-require 'sendfile'
+if ENV["SENDFILE_BROKEN"]
+  require_relative 'sendfile_compat'
+else
+  require 'sendfile'
+end
+
 module Yahns::WbufCommon # :nodoc:
   # returns nil on success, :wait_*able when blocked
   # currently, we rely on each thread having exclusive access to the
@@ -25,7 +30,8 @@ module Yahns::WbufCommon # :nodoc:
       raise "BUG: rv=#{rv.inspect} " \
             "on tmpio=#{@tmpio.inspect} " \
             "sf_offset=#@sf_offset sf_count=#@sf_count"
-    end while true
+    end while @sf_count > 0
+    wbuf_close(client)
   end
 
   def wbuf_close_common(client)