From: Eric Wong <yahns-public@yhbt.net>
To: yahns-public@yhbt.net
Subject: [PATCH 1/2] remove kgio read and wait dependencies
Date: Mon, 6 Mar 2017 03:30:10 +0000 [thread overview]
Message-ID: <20170306033011.26938-2-yahns-public@yhbt.net> (raw)
In-Reply-To: <20170306033011.26938-1-yahns-public@yhbt.net>
Ruby 2.4+ should contain everything we need to replace kgio;
and we can provide reasonable (if not slower) facsimiles
for older Rubies.
The reading and wait_*able APIs of kgio are similar to APIs
in the IO core class, so lets deal with that, first.
Removing dependencies on writing will be slightly trickier...
---
| 42 ++++++++++++++++++++++++--------------
| 23 +++++++++++----------
lib/yahns.rb | 1 +
lib/yahns/acceptor.rb | 2 +-
lib/yahns/client_expire_generic.rb | 2 +-
lib/yahns/http_client.rb | 32 ++++++++++-------------------
lib/yahns/http_response.rb | 8 ++++----
lib/yahns/openssl_client.rb | 6 +++---
lib/yahns/proxy_http_response.rb | 6 +++---
lib/yahns/req_res.rb | 4 ++--
lib/yahns/server.rb | 2 +-
lib/yahns/server_mp.rb | 2 +-
lib/yahns/sigevent_efd.rb | 1 -
lib/yahns/sigevent_pipe.rb | 10 +++------
lib/yahns/worker.rb | 6 +++---
15 files changed, 73 insertions(+), 74 deletions(-)
--git a/extras/exec_cgi.rb b/extras/exec_cgi.rb
index b546e1f..15fe7de 100644
--- a/extras/exec_cgi.rb
+++ b/extras/exec_cgi.rb
@@ -21,18 +21,29 @@
# run ExecCgi.new('/path/to/cgit.cgi') # cgit: https://git.zx2c4.com/cgit/
#
class ExecCgi
- class MyIO < Kgio::Pipe
+ class MyIO
attr_writer :my_pid
attr_writer :body_tip
+ attr_reader :rd, :wr
+
+ def initialize
+ @rd, @wr = IO.pipe
+ end
def each
buf = @body_tip || ''.dup
if buf.size > 0
yield buf
end
- while tmp = kgio_read(8192, buf)
+
+ case tmp = @rd.read_nonblock(8192, buf, exception: false)
+ when String
yield tmp
- end
+ when :wait_readable
+ @rd.wait_readable
+ when nil
+ break
+ end while true
self
ensure
# do this sooner, since the response body may be buffered, we want
@@ -46,8 +57,8 @@ def close
# Note: this object (and any client-specific objects) will never
# be shared across different threads, so we do not need extra
# mutual exclusion here.
- return if closed?
- super
+ return if @rd.closed?
+ @rd.close
begin
Process.waitpid(@my_pid)
rescue Errno::ECHILD
@@ -90,20 +101,21 @@ def call(env)
cgi_env = { "GATEWAY_INTERFACE" => "CGI/1.1" }
PASS_VARS.each { |key| val = env[key] and cgi_env[key] = val }
env.each { |key,val| cgi_env[key] = val if key =~ /\AHTTP_/ }
- pipe = MyIO.pipe
- errbody = pipe[0]
+ io = MyIO.new
+ errbody = io
errbody.my_pid = Process.spawn(cgi_env.merge!(@env), *@args,
- out: pipe[1], close_others: true)
- pipe[1].close
- pipe = pipe[0]
+ out: io.wr, close_others: true)
+ io.wr.close
+ rd = io.rd
- if head = pipe.kgio_read(8192)
+ begin
+ head = rd.readpartial(8192)
until head =~ /\r?\n\r?\n/
- tmp = pipe.kgio_read(8192) or break
+ tmp = rd.readpartial(8192)
head << tmp
end
head, body = head.split(/\r?\n\r?\n/, 2)
- pipe.body_tip = body
+ io.body_tip = body
env["HTTP_VERSION"] ||= "HTTP/1.0" # stop Rack::Chunked for HTTP/0.9
@@ -117,8 +129,8 @@ def call(env)
end
status = headers.delete("Status") || 200
errbody = nil
- [ status, headers, pipe ]
- else
+ [ status, headers, io ]
+ rescue EOFError
[ 500, { "Content-Length" => "0", "Content-Type" => "text/plain" }, [] ]
end
ensure
--git a/extras/proxy_pass.rb b/extras/proxy_pass.rb
index 310da9e..9c03f42 100644
--- a/extras/proxy_pass.rb
+++ b/extras/proxy_pass.rb
@@ -9,6 +9,7 @@
require 'rack/request'
require 'thread'
require 'timeout'
+require 'io/wait'
# Totally synchronous and Rack 1.1-compatible, this will probably be rewritten.
# to take advantage of rack.hijack and use the non-blocking I/O facilities
@@ -35,11 +36,6 @@ def put(obj)
class UpstreamSocket < Kgio::Socket # :nodoc:
attr_writer :expiry
- # called automatically by kgio_read!
- def kgio_wait_readable(timeout = nil)
- super(timeout || wait_time)
- end
-
def wait_time
tout = @expiry ? @expiry - Time.now : @timeout
raise Timeout::Error, "request timed out", [] if tout < 0
@@ -47,11 +43,15 @@ def wait_time
end
def readpartial(bytes, buf = Thread.current[:proxy_pass_buf] ||= ''.dup)
- case rv = kgio_read!(bytes, buf)
- when String
+ case rv = read_nonblock(bytes, buf, exception: false)
+ when :wait_readable
+ wait_readable(wait_time)
+ when nil
+ return rv
+ else
@expiry += @timeout # bump expiry when we succeed
- end
- rv
+ return rv
+ end while true
end
def req_write(buf, timeout)
@@ -59,7 +59,7 @@ def req_write(buf, timeout)
@expiry = Time.now + timeout
case rv = kgio_trywrite(buf)
when :wait_writable
- kgio_wait_writable(wait_time)
+ wait_writable(wait_time)
when nil
return
when String
@@ -84,7 +84,8 @@ def req_write(req, timeout)
# returns true if the socket is still alive, nil if dead
def sock_alive?
- @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ?
+ @reused = (:wait_readable ==
+ (@sock.read_nonblock(1, ''.b, exception: false) rescue nil)) ?
true : @sock.close
end
diff --git a/lib/yahns.rb b/lib/yahns.rb
index a0abe49..a9a1c76 100644
--- a/lib/yahns.rb
+++ b/lib/yahns.rb
@@ -4,6 +4,7 @@
$stdout.sync = $stderr.sync = true
require 'unicorn' # pulls in raindrops, kgio, fcntl, etc, stringio, and logger
+require 'io/wait'
require 'sleepy_penguin'
# kill off some unicorn internals we don't need
diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb
index 7ab9f60..7c0f368 100644
--- a/lib/yahns/acceptor.rb
+++ b/lib/yahns/acceptor.rb
@@ -31,7 +31,7 @@ def ac_quit
begin
# try to connect to kick it out of the blocking accept() syscall
killer = Kgio::Socket.start(getsockname)
- killer.kgio_write("G") # first byte of "GET / HTTP/1.0\r\n\r\n"
+ killer.write("G") # first byte of "GET / HTTP/1.0\r\n\r\n"
ensure
killer.close if killer
end
diff --git a/lib/yahns/client_expire_generic.rb b/lib/yahns/client_expire_generic.rb
index 59c1b46..2165af9 100644
--- a/lib/yahns/client_expire_generic.rb
+++ b/lib/yahns/client_expire_generic.rb
@@ -29,7 +29,7 @@ def kgio_trywrite(*args)
super
end
- def kgio_tryread(*args)
+ def read_nonblock(*args)
@last_io_at = __timestamp
super
end
diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb
index d8154a4..f0b61fd 100644
--- a/lib/yahns/http_client.rb
+++ b/lib/yahns/http_client.rb
@@ -80,11 +80,11 @@ def input_ready
# returns true if we want to keep looping on this
# returns :wait_readable/wait_writable/nil to yield back to epoll
def fill_body(rsize, rbuf)
- case rv = kgio_tryread(rsize, rbuf)
+ case rv = read_nonblock(rsize, rbuf, exception: false)
when String
@hs.filter_body(rbuf, @hs.buf << rbuf)
@input.write(rbuf)
- true # keep looping on kgio_tryread (but check body_eof? first)
+ true # keep looping on read_nonblock (but check body_eof? first)
when :wait_readable, :wait_writable
rv # have epoll/kqueue wait for more
when nil # unexpected EOF
@@ -95,13 +95,13 @@ def fill_body(rsize, rbuf)
# returns true if we are ready to dispatch the app
# returns :wait_readable/wait_writable/nil to yield back to epoll
def read_trailers(rsize, rbuf)
- case rv = kgio_tryread(rsize, rbuf)
+ case rv = read_nonblock(rsize, rbuf, exception: false)
when String
if @hs.add_parse(rbuf)
@input.rewind
return true
end
- # keep looping on kgio_tryread...
+ # keep looping on read_nonblock...
when :wait_readable, :wait_writable
return rv # wait for more
when nil # unexpected EOF
@@ -133,7 +133,8 @@ def yahns_step
end
# continue to outer loop
when :headers
- case rv = kgio_tryread(k.client_header_buffer_size, rbuf)
+ case rv = read_nonblock(k.client_header_buffer_size, rbuf,
+ exception: false)
when String
if @hs.add_parse(rv)
case input = input_ready
@@ -143,7 +144,7 @@ def yahns_step
return app_call(input)
end
end
- # keep looping on kgio_tryread
+ # keep looping on read_nonblock
when :wait_readable, :wait_writable, nil
return rv
end while true
@@ -235,25 +236,14 @@ def app_call(input)
http_response_write(res, opt)
end
- # called automatically by kgio_write
- def kgio_wait_writable(timeout = self.class.client_timeout)
- super timeout
- end
-
- # called automatically by kgio_read
- def kgio_wait_readable(timeout = self.class.client_timeout)
- super timeout
- end
-
# used by StreamInput (and thus TeeInput) for input_buffering {false|:lazy}
def yahns_read(bytes, buf)
- case rv = kgio_tryread(bytes, buf)
+ case rv = read_nonblock(bytes, buf, exception: false)
when String, nil
return rv
- when :wait_readable
- kgio_wait_readable or raise Yahns::ClientTimeout, "waiting for read", []
- when :wait_writable
- kgio_wait_writable or raise Yahns::ClientTimeout, "waiting for write", []
+ when :wait_readable, :wait_writable
+ __send__(rv, self.class.client_timeout) or
+ raise Yahns::ClientTimeout, "on #{rv}", []
end while true
end
diff --git a/lib/yahns/http_response.rb b/lib/yahns/http_response.rb
index 865193d..86b7f56 100644
--- a/lib/yahns/http_response.rb
+++ b/lib/yahns/http_response.rb
@@ -47,11 +47,11 @@ def response_start
end
def response_wait_write(rv)
- # call the kgio_wait_readable or kgio_wait_writable method
- ok = __send__("kgio_#{rv}") and return ok
+ # call the wait_readable or wait_writable method
k = self.class
- k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\
- "#{k.client_timeout}s")
+ t = k.client_timeout
+ ok = __send__(rv, t) and return ok
+ k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after #{t}s")
false
end
diff --git a/lib/yahns/openssl_client.rb b/lib/yahns/openssl_client.rb
index 0d376bd..a86d701 100644
--- a/lib/yahns/openssl_client.rb
+++ b/lib/yahns/openssl_client.rb
@@ -13,7 +13,7 @@ def self.included(cls)
# This is a bit weird, since OpenSSL::SSL::SSLSocket wraps
# our actual socket, too, so we must take care to not blindly
# use method_missing and cause infinite recursion
- %w(sync= read write readpartial write_nonblock read_nonblock
+ %w(sync= read write readpartial write_nonblock
print printf puts gets readlines readline getc
readchar ungetc eof eof? << flush
sysread syswrite).map!(&:to_sym).each do |m|
@@ -59,7 +59,7 @@ def kgio_syssend(buf, flags)
kgio_trywrite(buf)
end
- def kgio_tryread(len, buf)
+ def read_nonblock(len, buf = nil, exception: true)
if @need_accept
# most protocols require read before write, so we start the negotiation
# process here:
@@ -69,7 +69,7 @@ def kgio_tryread(len, buf)
end
@need_accept = false
end
- @ssl.read_nonblock(len, buf, exception: false)
+ @ssl.read_nonblock(len, buf, exception: exception)
end
def trysendio(io, offset, count)
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 7df2834..d9c878e 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -153,13 +153,13 @@ def proxy_read_body(tip, kcar, req_res)
alive = req_res.alive
wbuf = req_res.resbuf
- case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
+ case tmp = tip.shift || req_res.read_nonblock(0x2000, rbuf, exception:false)
when String
if len
kcar.body_bytes_left -= tmp.size # progress for body_eof? => true
elsif chunk
kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true
- next if chunk.empty? # call req_res.kgio_tryread for more
+ next if chunk.empty? # call req_res.read_nonblock for more
tmp = chunk_out(chunk)
elsif alive # HTTP/1.0 upstream, HTTP/1.1 client
tmp = chunk_out(tmp)
@@ -200,7 +200,7 @@ def proxy_read_trailers(kcar, req_res)
wbuf = req_res.resbuf
until kcar.trailers(tlr, chunk)
- case rv = req_res.kgio_tryread(0x2000, rbuf)
+ case rv = req_res.read_nonblock(0x2000, rbuf, exception: false)
when String
chunk << rv
when :wait_readable
diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb
index 4ad8e5c..0fa4285 100644
--- a/lib/yahns/req_res.rb
+++ b/lib/yahns/req_res.rb
@@ -29,7 +29,7 @@ def yahns_step # yahns event loop entry point
case resbuf = @resbuf # where are we at the response?
when nil # common case, catch the response header in a single read
- case rv = kgio_tryread(0x2000, buf)
+ case rv = read_nonblock(0x2000, buf, exception: false)
when String
if res = req.headers(@hdr = [], rv)
return c.proxy_response_start(res, rv, req, self)
@@ -48,7 +48,7 @@ def yahns_step # yahns event loop entry point
when String # continue reading trickled response headers from upstream
- case rv = kgio_tryread(0x2000, buf)
+ case rv = read_nonblock(0x2000, buf, exception: false)
when String then res = req.headers(@hdr, resbuf << rv) and break
when :wait_readable then return rv
when nil
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index 00e5f15..2f2c57a 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -473,7 +473,7 @@ def reap_reexec
end
def sp_sig_handle(alive)
- @sev.kgio_wait_readable(alive ? nil : 0.01)
+ @sev.wait_readable(alive ? nil : 0.01)
@sev.yahns_step
case sig = @sig_queue.shift
when :QUIT, :TERM, :INT
diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb
index c9cd207..8982401 100644
--- a/lib/yahns/server_mp.rb
+++ b/lib/yahns/server_mp.rb
@@ -91,7 +91,7 @@ def join
@logger.info "master process ready"
daemon_ready
begin
- @sev.kgio_wait_readable
+ @sev.to_io.wait_readable
@sev.yahns_step
reap_all
case @sig_queue.shift
diff --git a/lib/yahns/sigevent_efd.rb b/lib/yahns/sigevent_efd.rb
index 1250cf4..264097d 100644
--- a/lib/yahns/sigevent_efd.rb
+++ b/lib/yahns/sigevent_efd.rb
@@ -3,7 +3,6 @@
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
class Yahns::Sigevent < SleepyPenguin::EventFD # :nodoc:
- include Kgio::DefaultWaiters
def self.new
super(0, :CLOEXEC)
end
diff --git a/lib/yahns/sigevent_pipe.rb b/lib/yahns/sigevent_pipe.rb
index a85fb2a..00cd9e4 100644
--- a/lib/yahns/sigevent_pipe.rb
+++ b/lib/yahns/sigevent_pipe.rb
@@ -5,21 +5,17 @@
class Yahns::Sigevent # :nodoc:
attr_reader :to_io
def initialize
- @to_io, @wr = Kgio::Pipe.new
+ @to_io, @wr = IO.pipe
@to_io.close_on_exec = @wr.close_on_exec = true
end
- def kgio_wait_readable(*args)
- @to_io.kgio_wait_readable(*args)
- end
-
def sev_signal
- @wr.kgio_trywrite(".")
+ @wr.write_nonblock(".", exception: false)
end
def yahns_step
# 11 byte strings -> no malloc on YARV
- while String === @to_io.kgio_tryread(11)
+ while String === @to_io.read_nonblock(11, exception: false)
end
:wait_readable
end
diff --git a/lib/yahns/worker.rb b/lib/yahns/worker.rb
index 9192803..8886936 100644
--- a/lib/yahns/worker.rb
+++ b/lib/yahns/worker.rb
@@ -8,7 +8,7 @@ class Yahns::Worker # :nodoc:
def initialize(nr)
@nr = nr
- @to_io, @wr = Kgio::Pipe.new
+ @to_io, @wr = IO.pipe
end
def atfork_child
@@ -24,7 +24,7 @@ def atfork_parent
# This causes the worker to gracefully exit if the master
# dies unexpectedly.
def yahns_step
- case buf = @to_io.kgio_tryread(4)
+ case buf = @to_io.read_nonblock(4, exception: false)
when String
# unpack the buffer and trigger the signal handler
signum = buf.unpack('l')
@@ -60,7 +60,7 @@ def fake_sig(sig) # :nodoc:
def soft_kill(signum) # :nodoc:
# writing and reading 4 bytes on a pipe is atomic on all POSIX platforms
# Do not care in the odd case the buffer is full, here.
- @wr.kgio_trywrite([signum].pack('l'))
+ @wr.write_nonblock([signum].pack('l'), exception: false)
rescue Errno::EPIPE
# worker will be reaped soon
end
--
EW
next prev parent reply other threads:[~2017-03-06 3:30 UTC|newest]
Thread overview: 3+ messages / expand[flat|nested] mbox.gz Atom feed top
2017-03-06 3:30 [PATCH 0/2] WIP remove-kgio branch pushed branch Eric Wong
2017-03-06 3:30 ` Eric Wong [this message]
2017-03-06 3:30 ` [PATCH 2/2] drop writev support Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://yhbt.net/yahns/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20170306033011.26938-2-yahns-public@yhbt.net \
--to=yahns-public@yhbt.net \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://yhbt.net/yahns.git/
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).