yahns Ruby server user/dev discussion
From: Eric Wong <yahns-public@yhbt.net>
To: yahns-public@yhbt.net
Subject: [PATCH 1/2] remove kgio read and wait dependencies
Date: Mon,  6 Mar 2017 03:30:10 +0000
Message-ID: <20170306033011.26938-2-yahns-public@yhbt.net>
In-Reply-To: <20170306033011.26938-1-yahns-public@yhbt.net>

Ruby 2.4+ should contain everything we need to replace kgio,
and we can provide reasonable (if slower) facsimiles
for older Rubies.

The reading and wait_*able APIs of kgio are similar to APIs
in the core IO class, so let's deal with those first.

Removing dependencies on writing will be slightly trickier...
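
For readers unfamiliar with the core APIs, the read-side replacement
pattern used throughout this patch can be sketched roughly as below.
This is only an illustration, not code from the patch: the
read_with_wait helper and its :timeout return value are hypothetical
names invented for the example. Only IO#read_nonblock (with
exception: false) and IO#wait_readable are real core/io-wait methods.

```ruby
require 'io/wait' # provides IO#wait_readable where it is not already core

# Hypothetical helper (not part of yahns): read up to +len+ bytes from +io+,
# waiting up to +timeout+ seconds whenever no data is available yet.
# Returns a String on success, nil on EOF, or :timeout if the wait expired.
def read_with_wait(io, len, timeout = nil)
  loop do
    case rv = io.read_nonblock(len, exception: false)
    when :wait_readable
      # roughly what kgio_wait_readable used to do for us
      io.wait_readable(timeout) or return :timeout
    else
      return rv # String (data) or nil (EOF)
    end
  end
end

rd, wr = IO.pipe
wr.write("hello")
first = read_with_wait(rd, 8192)        # data is already buffered in the pipe
empty = read_with_wait(rd, 8192, 0.01)  # nothing to read, so the wait expires
wr.close
eof = read_with_wait(rd, 8192)          # writer closed, so we see EOF
```

The explicit loop takes over from the implicit retry that kgio_read
performed internally, which is why several hunks below gain a
"case ... end while true" wrapper.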
---
 extras/exec_cgi.rb                 | 42 ++++++++++++++++++++++++--------------
 extras/proxy_pass.rb               | 23 +++++++++++----------
 lib/yahns.rb                       |  1 +
 lib/yahns/acceptor.rb              |  2 +-
 lib/yahns/client_expire_generic.rb |  2 +-
 lib/yahns/http_client.rb           | 32 ++++++++++-------------------
 lib/yahns/http_response.rb         |  8 ++++----
 lib/yahns/openssl_client.rb        |  6 +++---
 lib/yahns/proxy_http_response.rb   |  6 +++---
 lib/yahns/req_res.rb               |  4 ++--
 lib/yahns/server.rb                |  2 +-
 lib/yahns/server_mp.rb             |  2 +-
 lib/yahns/sigevent_efd.rb          |  1 -
 lib/yahns/sigevent_pipe.rb         | 10 +++------
 lib/yahns/worker.rb                |  6 +++---
 15 files changed, 73 insertions(+), 74 deletions(-)

diff --git a/extras/exec_cgi.rb b/extras/exec_cgi.rb
index b546e1f..15fe7de 100644
--- a/extras/exec_cgi.rb
+++ b/extras/exec_cgi.rb
@@ -21,18 +21,29 @@
 #   run ExecCgi.new('/path/to/cgit.cgi') # cgit: https://git.zx2c4.com/cgit/
 #
 class ExecCgi
-  class MyIO < Kgio::Pipe
+  class MyIO
     attr_writer :my_pid
     attr_writer :body_tip
+    attr_reader :rd, :wr
+
+    def initialize
+      @rd, @wr = IO.pipe
+    end
 
     def each
       buf = @body_tip || ''.dup
       if buf.size > 0
         yield buf
       end
-      while tmp = kgio_read(8192, buf)
+
+      case tmp = @rd.read_nonblock(8192, buf, exception: false)
+      when String
         yield tmp
-      end
+      when :wait_readable
+        @rd.wait_readable
+      when nil
+        break
+      end while true
       self
     ensure
       # do this sooner, since the response body may be buffered, we want
@@ -46,8 +57,8 @@ def close
       # Note: this object (and any client-specific objects) will never
       # be shared across different threads, so we do not need extra
       # mutual exclusion here.
-      return if closed?
-      super
+      return if @rd.closed?
+      @rd.close
       begin
         Process.waitpid(@my_pid)
       rescue Errno::ECHILD
@@ -90,20 +101,21 @@ def call(env)
     cgi_env = { "GATEWAY_INTERFACE" => "CGI/1.1" }
     PASS_VARS.each { |key| val = env[key] and cgi_env[key] = val }
     env.each { |key,val| cgi_env[key] = val if key =~ /\AHTTP_/ }
-    pipe = MyIO.pipe
-    errbody = pipe[0]
+    io = MyIO.new
+    errbody = io
     errbody.my_pid = Process.spawn(cgi_env.merge!(@env), *@args,
-                                   out: pipe[1], close_others: true)
-    pipe[1].close
-    pipe = pipe[0]
+                              out: io.wr, close_others: true)
+    io.wr.close
+    rd = io.rd
 
-    if head = pipe.kgio_read(8192)
+    begin
+      head = rd.readpartial(8192)
       until head =~ /\r?\n\r?\n/
-        tmp = pipe.kgio_read(8192) or break
+        tmp = rd.readpartial(8192)
         head << tmp
       end
       head, body = head.split(/\r?\n\r?\n/, 2)
-      pipe.body_tip = body
+      io.body_tip = body
 
       env["HTTP_VERSION"] ||= "HTTP/1.0" # stop Rack::Chunked for HTTP/0.9
 
@@ -117,8 +129,8 @@ def call(env)
       end
       status = headers.delete("Status") || 200
       errbody = nil
-      [ status, headers, pipe ]
-    else
+      [ status, headers, io ]
+    rescue EOFError
       [ 500, { "Content-Length" => "0", "Content-Type" => "text/plain" }, [] ]
     end
   ensure
diff --git a/extras/proxy_pass.rb b/extras/proxy_pass.rb
index 310da9e..9c03f42 100644
--- a/extras/proxy_pass.rb
+++ b/extras/proxy_pass.rb
@@ -9,6 +9,7 @@
 require 'rack/request'
 require 'thread'
 require 'timeout'
+require 'io/wait'
 
 # Totally synchronous and Rack 1.1-compatible, this will probably be rewritten.
 # to take advantage of rack.hijack and use the non-blocking I/O facilities
@@ -35,11 +36,6 @@ def put(obj)
   class UpstreamSocket < Kgio::Socket # :nodoc:
     attr_writer :expiry
 
-    # called automatically by kgio_read!
-    def kgio_wait_readable(timeout = nil)
-      super(timeout || wait_time)
-    end
-
     def wait_time
       tout = @expiry ? @expiry - Time.now : @timeout
       raise Timeout::Error, "request timed out", [] if tout < 0
@@ -47,11 +43,15 @@ def wait_time
     end
 
     def readpartial(bytes, buf = Thread.current[:proxy_pass_buf] ||= ''.dup)
-      case rv = kgio_read!(bytes, buf)
-      when String
+      case rv = read_nonblock(bytes, buf, exception: false)
+      when :wait_readable
+        wait_readable(wait_time)
+      when nil
+        return rv
+      else
         @expiry += @timeout # bump expiry when we succeed
-      end
-      rv
+        return rv
+      end while true
     end
 
     def req_write(buf, timeout)
@@ -59,7 +59,7 @@ def req_write(buf, timeout)
       @expiry = Time.now + timeout
       case rv = kgio_trywrite(buf)
       when :wait_writable
-        kgio_wait_writable(wait_time)
+        wait_writable(wait_time)
       when nil
         return
       when String
@@ -84,7 +84,8 @@ def req_write(req, timeout)
 
     # returns true if the socket is still alive, nil if dead
     def sock_alive?
-      @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
+      @reused = (:wait_readable ==
+                 (@sock.read_nonblock(1, ''.b, exception: false) rescue nil)) ?
                 true : @sock.close
     end
 
diff --git a/lib/yahns.rb b/lib/yahns.rb
index a0abe49..a9a1c76 100644
--- a/lib/yahns.rb
+++ b/lib/yahns.rb
@@ -4,6 +4,7 @@
 $stdout.sync = $stderr.sync = true
 
 require 'unicorn' # pulls in raindrops, kgio, fcntl, etc, stringio, and logger
+require 'io/wait'
 require 'sleepy_penguin'
 
 # kill off some unicorn internals we don't need
diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb
index 7ab9f60..7c0f368 100644
--- a/lib/yahns/acceptor.rb
+++ b/lib/yahns/acceptor.rb
@@ -31,7 +31,7 @@ def ac_quit
       begin
         # try to connect to kick it out of the blocking accept() syscall
         killer = Kgio::Socket.start(getsockname)
-        killer.kgio_write("G") # first byte of "GET / HTTP/1.0\r\n\r\n"
+        killer.write("G") # first byte of "GET / HTTP/1.0\r\n\r\n"
       ensure
         killer.close if killer
       end
diff --git a/lib/yahns/client_expire_generic.rb b/lib/yahns/client_expire_generic.rb
index 59c1b46..2165af9 100644
--- a/lib/yahns/client_expire_generic.rb
+++ b/lib/yahns/client_expire_generic.rb
@@ -29,7 +29,7 @@ def kgio_trywrite(*args)
     super
   end
 
-  def kgio_tryread(*args)
+  def read_nonblock(*args)
     @last_io_at = __timestamp
     super
   end
diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb
index d8154a4..f0b61fd 100644
--- a/lib/yahns/http_client.rb
+++ b/lib/yahns/http_client.rb
@@ -80,11 +80,11 @@ def input_ready
   # returns true if we want to keep looping on this
   # returns :wait_readable/wait_writable/nil to yield back to epoll
   def fill_body(rsize, rbuf)
-    case rv = kgio_tryread(rsize, rbuf)
+    case rv = read_nonblock(rsize, rbuf, exception: false)
     when String
       @hs.filter_body(rbuf, @hs.buf << rbuf)
       @input.write(rbuf)
-      true # keep looping on kgio_tryread (but check body_eof? first)
+      true # keep looping on read_nonblock (but check body_eof? first)
     when :wait_readable, :wait_writable
       rv # have epoll/kqueue wait for more
     when nil # unexpected EOF
@@ -95,13 +95,13 @@ def fill_body(rsize, rbuf)
   # returns true if we are ready to dispatch the app
   # returns :wait_readable/wait_writable/nil to yield back to epoll
   def read_trailers(rsize, rbuf)
-    case rv = kgio_tryread(rsize, rbuf)
+    case rv = read_nonblock(rsize, rbuf, exception: false)
     when String
       if @hs.add_parse(rbuf)
         @input.rewind
         return true
       end
-      # keep looping on kgio_tryread...
+      # keep looping on read_nonblock...
     when :wait_readable, :wait_writable
       return rv # wait for more
     when nil # unexpected EOF
@@ -133,7 +133,8 @@ def yahns_step
       end
       # continue to outer loop
     when :headers
-      case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
+      case rv = read_nonblock(k.client_header_buffer_size, rbuf,
+                              exception: false)
       when String
         if @hs.add_parse(rv)
           case input = input_ready
@@ -143,7 +144,7 @@ def yahns_step
             return app_call(input)
           end
         end
-        # keep looping on kgio_tryread
+        # keep looping on read_nonblock
       when :wait_readable, :wait_writable, nil
         return rv
       end while true
@@ -235,25 +236,14 @@ def app_call(input)
     http_response_write(res, opt)
   end
 
-  # called automatically by kgio_write
-  def kgio_wait_writable(timeout = self.class.client_timeout)
-    super timeout
-  end
-
-  # called automatically by kgio_read
-  def kgio_wait_readable(timeout = self.class.client_timeout)
-    super timeout
-  end
-
   # used by StreamInput (and thus TeeInput) for input_buffering {false|:lazy}
   def yahns_read(bytes, buf)
-    case rv = kgio_tryread(bytes, buf)
+    case rv = read_nonblock(bytes, buf, exception: false)
     when String, nil
       return rv
-    when :wait_readable
-      kgio_wait_readable or raise Yahns::ClientTimeout, "waiting for read", []
-    when :wait_writable
-      kgio_wait_writable or raise Yahns::ClientTimeout, "waiting for write", []
+    when :wait_readable, :wait_writable
+      __send__(rv, self.class.client_timeout) or
+            raise Yahns::ClientTimeout, "on #{rv}", []
     end while true
   end
 
diff --git a/lib/yahns/http_response.rb b/lib/yahns/http_response.rb
index 865193d..86b7f56 100644
--- a/lib/yahns/http_response.rb
+++ b/lib/yahns/http_response.rb
@@ -47,11 +47,11 @@ def response_start
   end
 
   def response_wait_write(rv)
-    # call the kgio_wait_readable or kgio_wait_writable method
-    ok = __send__("kgio_#{rv}") and return ok
+    # call the wait_readable or wait_writable method
     k = self.class
-    k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\
-                  "#{k.client_timeout}s")
+    t = k.client_timeout
+    ok = __send__(rv, t) and return ok
+    k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after #{t}s")
     false
   end
 
diff --git a/lib/yahns/openssl_client.rb b/lib/yahns/openssl_client.rb
index 0d376bd..a86d701 100644
--- a/lib/yahns/openssl_client.rb
+++ b/lib/yahns/openssl_client.rb
@@ -13,7 +13,7 @@ def self.included(cls)
     # This is a bit weird, since OpenSSL::SSL::SSLSocket wraps
     # our actual socket, too, so we must take care to not blindly
     # use method_missing and cause infinite recursion
-    %w(sync= read write readpartial write_nonblock read_nonblock
+    %w(sync= read write readpartial write_nonblock
        print printf puts gets readlines readline getc
        readchar ungetc eof eof? << flush
        sysread syswrite).map!(&:to_sym).each do |m|
@@ -59,7 +59,7 @@ def kgio_syssend(buf, flags)
     kgio_trywrite(buf)
   end
 
-  def kgio_tryread(len, buf)
+  def read_nonblock(len, buf = nil, exception: true)
     if @need_accept
       # most protocols require read before write, so we start the negotiation
       # process here:
@@ -69,7 +69,7 @@ def kgio_tryread(len, buf)
       end
       @need_accept = false
     end
-    @ssl.read_nonblock(len, buf, exception: false)
+    @ssl.read_nonblock(len, buf, exception: exception)
   end
 
   def trysendio(io, offset, count)
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 7df2834..d9c878e 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -153,13 +153,13 @@ def proxy_read_body(tip, kcar, req_res)
     alive = req_res.alive
     wbuf = req_res.resbuf
 
-    case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+    case tmp = tip.shift || req_res.read_nonblock(0x2000, rbuf, exception:false)
     when String
       if len
         kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
       elsif chunk
         kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
-        next if chunk.empty? # call req_res.kgio_tryread for more
+        next if chunk.empty? # call req_res.read_nonblock for more
         tmp = chunk_out(chunk)
       elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
         tmp = chunk_out(tmp)
@@ -200,7 +200,7 @@ def proxy_read_trailers(kcar, req_res)
     wbuf = req_res.resbuf
 
     until kcar.trailers(tlr, chunk)
-      case rv = req_res.kgio_tryread(0x2000, rbuf)
+      case rv = req_res.read_nonblock(0x2000, rbuf, exception: false)
       when String
         chunk << rv
       when :wait_readable
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
index 4ad8e5c..0fa4285 100644
--- a/lib/yahns/req_res.rb
+++ b/lib/yahns/req_res.rb
@@ -29,7 +29,7 @@ def yahns_step # yahns event loop entry point
       case resbuf = @resbuf # where are we at the response?
       when nil # common case, catch the response header in a single read
 
-        case rv = kgio_tryread(0x2000, buf)
+        case rv = read_nonblock(0x2000, buf, exception: false)
         when String
           if res = req.headers(@hdr = [], rv)
             return c.proxy_response_start(res, rv, req, self)
@@ -48,7 +48,7 @@ def yahns_step # yahns event loop entry point
 
       when String # continue reading trickled response headers from upstream
 
-        case rv = kgio_tryread(0x2000, buf)
+        case rv = read_nonblock(0x2000, buf, exception: false)
         when String then res = req.headers(@hdr, resbuf << rv) and break
         when :wait_readable then return rv
         when nil
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index 00e5f15..2f2c57a 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -473,7 +473,7 @@ def reap_reexec
   end
 
   def sp_sig_handle(alive)
-    @sev.kgio_wait_readable(alive ? nil : 0.01)
+    @sev.wait_readable(alive ? nil : 0.01)
     @sev.yahns_step
     case sig = @sig_queue.shift
     when :QUIT, :TERM, :INT
diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb
index c9cd207..8982401 100644
--- a/lib/yahns/server_mp.rb
+++ b/lib/yahns/server_mp.rb
@@ -91,7 +91,7 @@ def join
     @logger.info "master process ready"
     daemon_ready
     begin
-      @sev.kgio_wait_readable
+      @sev.to_io.wait_readable
       @sev.yahns_step
       reap_all
       case @sig_queue.shift
diff --git a/lib/yahns/sigevent_efd.rb b/lib/yahns/sigevent_efd.rb
index 1250cf4..264097d 100644
--- a/lib/yahns/sigevent_efd.rb
+++ b/lib/yahns/sigevent_efd.rb
@@ -3,7 +3,6 @@
 # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
 # frozen_string_literal: true
 class Yahns::Sigevent < SleepyPenguin::EventFD # :nodoc:
-  include Kgio::DefaultWaiters
   def self.new
     super(0, :CLOEXEC)
   end
diff --git a/lib/yahns/sigevent_pipe.rb b/lib/yahns/sigevent_pipe.rb
index a85fb2a..00cd9e4 100644
--- a/lib/yahns/sigevent_pipe.rb
+++ b/lib/yahns/sigevent_pipe.rb
@@ -5,21 +5,17 @@
 class Yahns::Sigevent # :nodoc:
   attr_reader :to_io
   def initialize
-    @to_io, @wr = Kgio::Pipe.new
+    @to_io, @wr = IO.pipe
     @to_io.close_on_exec = @wr.close_on_exec = true
   end
 
-  def kgio_wait_readable(*args)
-    @to_io.kgio_wait_readable(*args)
-  end
-
   def sev_signal
-    @wr.kgio_trywrite(".")
+    @wr.write_nonblock(".", exception: false)
   end
 
   def yahns_step
     # 11 byte strings -> no malloc on YARV
-    while String === @to_io.kgio_tryread(11)
+    while String === @to_io.read_nonblock(11, exception: false)
     end
     :wait_readable
   end
diff --git a/lib/yahns/worker.rb b/lib/yahns/worker.rb
index 9192803..8886936 100644
--- a/lib/yahns/worker.rb
+++ b/lib/yahns/worker.rb
@@ -8,7 +8,7 @@ class Yahns::Worker # :nodoc:
 
   def initialize(nr)
     @nr = nr
-    @to_io, @wr = Kgio::Pipe.new
+    @to_io, @wr = IO.pipe
   end
 
   def atfork_child
@@ -24,7 +24,7 @@ def atfork_parent
   # This causes the worker to gracefully exit if the master
   # dies unexpectedly.
   def yahns_step
-    case buf = @to_io.kgio_tryread(4)
+    case buf = @to_io.read_nonblock(4, exception: false)
     when String
       # unpack the buffer and trigger the signal handler
       signum = buf.unpack('l')
@@ -60,7 +60,7 @@ def fake_sig(sig) # :nodoc:
   def soft_kill(signum) # :nodoc:
     # writing and reading 4 bytes on a pipe is atomic on all POSIX platforms
     # Do not care in the odd case the buffer is full, here.
-    @wr.kgio_trywrite([signum].pack('l'))
+    @wr.write_nonblock([signum].pack('l'), exception: false)
   rescue Errno::EPIPE
     # worker will be reaped soon
   end
-- 
EW


Thread overview: 3+ messages
2017-03-06  3:30 [PATCH 0/2] WIP remove-kgio branch pushed branch Eric Wong
2017-03-06  3:30 ` Eric Wong [this message]
2017-03-06  3:30 ` [PATCH 2/2] drop writev support Eric Wong
