From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,AWL,BAYES_00, RP_MATCHES_RCVD shortcircuit=no autolearn=ham autolearn_force=no version=3.4.0 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 3ED97202FB for ; Mon, 6 Mar 2017 03:30:18 +0000 (UTC) From: Eric Wong To: yahns-public@yhbt.net Subject: [PATCH 1/2] remove kgio read and wait dependencies Date: Mon, 6 Mar 2017 03:30:10 +0000 Message-Id: <20170306033011.26938-2-yahns-public@yhbt.net> In-Reply-To: <20170306033011.26938-1-yahns-public@yhbt.net> References: <20170306033011.26938-1-yahns-public@yhbt.net> List-Id: Ruby 2.4+ should contain everything we need to replace kgio; and we can provide reasonable (if not slower) facsimiles for older Rubies. The reading and wait_*able APIs of kgio are similar to APIs in the IO core class, so lets deal with that, first. Removing dependencies on writing will be slightly trickier... --- extras/exec_cgi.rb | 42 ++++++++++++++++++++++++-------------- extras/proxy_pass.rb | 23 +++++++++++---------- lib/yahns.rb | 1 + lib/yahns/acceptor.rb | 2 +- lib/yahns/client_expire_generic.rb | 2 +- lib/yahns/http_client.rb | 32 ++++++++++------------------- lib/yahns/http_response.rb | 8 ++++---- lib/yahns/openssl_client.rb | 6 +++--- lib/yahns/proxy_http_response.rb | 6 +++--- lib/yahns/req_res.rb | 4 ++-- lib/yahns/server.rb | 2 +- lib/yahns/server_mp.rb | 2 +- lib/yahns/sigevent_efd.rb | 1 - lib/yahns/sigevent_pipe.rb | 10 +++------ lib/yahns/worker.rb | 6 +++--- 15 files changed, 73 insertions(+), 74 deletions(-) diff --git a/extras/exec_cgi.rb b/extras/exec_cgi.rb index b546e1f..15fe7de 100644 --- a/extras/exec_cgi.rb +++ b/extras/exec_cgi.rb @@ -21,18 +21,29 @@ # run ExecCgi.new('/path/to/cgit.cgi') # cgit: https://git.zx2c4.com/cgit/ # class ExecCgi - class MyIO < Kgio::Pipe + class MyIO attr_writer :my_pid attr_writer :body_tip + attr_reader :rd, :wr + + def initialize + @rd, @wr = IO.pipe + end def each buf = @body_tip || ''.dup if buf.size > 0 yield buf end - while tmp = kgio_read(8192, buf) + + case tmp = @rd.read_nonblock(8192, buf, exception: false) + when String yield tmp - end + when :wait_readable + @rd.wait_readable + when nil + break + end while true self ensure # do this sooner, since the response body may be buffered, we want @@ -46,8 +57,8 @@ def close # Note: this object (and any client-specific objects) will never # be shared across different threads, so we do not need extra # mutual exclusion here. - return if closed? - super + return if @rd.closed? + @rd.close begin Process.waitpid(@my_pid) rescue Errno::ECHILD @@ -90,20 +101,21 @@ def call(env) cgi_env = { "GATEWAY_INTERFACE" => "CGI/1.1" } PASS_VARS.each { |key| val = env[key] and cgi_env[key] = val } env.each { |key,val| cgi_env[key] = val if key =~ /\AHTTP_/ } - pipe = MyIO.pipe - errbody = pipe[0] + io = MyIO.new + errbody = io errbody.my_pid = Process.spawn(cgi_env.merge!(@env), *@args, - out: pipe[1], close_others: true) - pipe[1].close - pipe = pipe[0] + out: io.wr, close_others: true) + io.wr.close + rd = io.rd - if head = pipe.kgio_read(8192) + begin + head = rd.readpartial(8192) until head =~ /\r?\n\r?\n/ - tmp = pipe.kgio_read(8192) or break + tmp = rd.readpartial(8192) head << tmp end head, body = head.split(/\r?\n\r?\n/, 2) - pipe.body_tip = body + io.body_tip = body env["HTTP_VERSION"] ||= "HTTP/1.0" # stop Rack::Chunked for HTTP/0.9 @@ -117,8 +129,8 @@ def call(env) end status = headers.delete("Status") || 200 errbody = nil - [ status, headers, pipe ] - else + [ status, headers, io ] + rescue EOFError [ 500, { "Content-Length" => "0", "Content-Type" => "text/plain" }, [] ] end ensure diff --git a/extras/proxy_pass.rb b/extras/proxy_pass.rb index 310da9e..9c03f42 100644 --- a/extras/proxy_pass.rb +++ b/extras/proxy_pass.rb @@ -9,6 +9,7 @@ require 'rack/request' require 'thread' require 'timeout' +require 'io/wait' # Totally synchronous and Rack 1.1-compatible, this will probably be rewritten. # to take advantage of rack.hijack and use the non-blocking I/O facilities @@ -35,11 +36,6 @@ def put(obj) class UpstreamSocket < Kgio::Socket # :nodoc: attr_writer :expiry - # called automatically by kgio_read! - def kgio_wait_readable(timeout = nil) - super(timeout || wait_time) - end - def wait_time tout = @expiry ? @expiry - Time.now : @timeout raise Timeout::Error, "request timed out", [] if tout < 0 @@ -47,11 +43,15 @@ def wait_time end def readpartial(bytes, buf = Thread.current[:proxy_pass_buf] ||= ''.dup) - case rv = kgio_read!(bytes, buf) - when String + case rv = read_nonblock(bytes, buf, exception: false) + when :wait_readable + wait_readable(wait_time) + when nil + return rv + else @expiry += @timeout # bump expiry when we succeed - end - rv + return rv + end while true end def req_write(buf, timeout) @@ -59,7 +59,7 @@ def req_write(buf, timeout) @expiry = Time.now + timeout case rv = kgio_trywrite(buf) when :wait_writable - kgio_wait_writable(wait_time) + wait_writable(wait_time) when nil return when String @@ -84,7 +84,8 @@ def req_write(req, timeout) # returns true if the socket is still alive, nil if dead def sock_alive? - @reused = (:wait_readable == (@sock.kgio_tryread(1) rescue nil)) ? + @reused = (:wait_readable == + (@sock.read_nonblock(1, ''.b, exception: false) rescue nil)) ? true : @sock.close end diff --git a/lib/yahns.rb b/lib/yahns.rb index a0abe49..a9a1c76 100644 --- a/lib/yahns.rb +++ b/lib/yahns.rb @@ -4,6 +4,7 @@ $stdout.sync = $stderr.sync = true require 'unicorn' # pulls in raindrops, kgio, fcntl, etc, stringio, and logger +require 'io/wait' require 'sleepy_penguin' # kill off some unicorn internals we don't need diff --git a/lib/yahns/acceptor.rb b/lib/yahns/acceptor.rb index 7ab9f60..7c0f368 100644 --- a/lib/yahns/acceptor.rb +++ b/lib/yahns/acceptor.rb @@ -31,7 +31,7 @@ def ac_quit begin # try to connect to kick it out of the blocking accept() syscall killer = Kgio::Socket.start(getsockname) - killer.kgio_write("G") # first byte of "GET / HTTP/1.0\r\n\r\n" + killer.write("G") # first byte of "GET / HTTP/1.0\r\n\r\n" ensure killer.close if killer end diff --git a/lib/yahns/client_expire_generic.rb b/lib/yahns/client_expire_generic.rb index 59c1b46..2165af9 100644 --- a/lib/yahns/client_expire_generic.rb +++ b/lib/yahns/client_expire_generic.rb @@ -29,7 +29,7 @@ def kgio_trywrite(*args) super end - def kgio_tryread(*args) + def read_nonblock(*args) @last_io_at = __timestamp super end diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb index d8154a4..f0b61fd 100644 --- a/lib/yahns/http_client.rb +++ b/lib/yahns/http_client.rb @@ -80,11 +80,11 @@ def input_ready # returns true if we want to keep looping on this # returns :wait_readable/wait_writable/nil to yield back to epoll def fill_body(rsize, rbuf) - case rv = kgio_tryread(rsize, rbuf) + case rv = read_nonblock(rsize, rbuf, exception: false) when String @hs.filter_body(rbuf, @hs.buf << rbuf) @input.write(rbuf) - true # keep looping on kgio_tryread (but check body_eof? first) + true # keep looping on read_nonblock (but check body_eof? first) when :wait_readable, :wait_writable rv # have epoll/kqueue wait for more when nil # unexpected EOF @@ -95,13 +95,13 @@ def fill_body(rsize, rbuf) # returns true if we are ready to dispatch the app # returns :wait_readable/wait_writable/nil to yield back to epoll def read_trailers(rsize, rbuf) - case rv = kgio_tryread(rsize, rbuf) + case rv = read_nonblock(rsize, rbuf, exception: false) when String if @hs.add_parse(rbuf) @input.rewind return true end - # keep looping on kgio_tryread... + # keep looping on read_nonblock... when :wait_readable, :wait_writable return rv # wait for more when nil # unexpected EOF @@ -133,7 +133,8 @@ def yahns_step end # continue to outer loop when :headers - case rv = kgio_tryread(k.client_header_buffer_size, rbuf) + case rv = read_nonblock(k.client_header_buffer_size, rbuf, + exception: false) when String if @hs.add_parse(rv) case input = input_ready @@ -143,7 +144,7 @@ def yahns_step return app_call(input) end end - # keep looping on kgio_tryread + # keep looping on read_nonblock when :wait_readable, :wait_writable, nil return rv end while true @@ -235,25 +236,14 @@ def app_call(input) http_response_write(res, opt) end - # called automatically by kgio_write - def kgio_wait_writable(timeout = self.class.client_timeout) - super timeout - end - - # called automatically by kgio_read - def kgio_wait_readable(timeout = self.class.client_timeout) - super timeout - end - # used by StreamInput (and thus TeeInput) for input_buffering {false|:lazy} def yahns_read(bytes, buf) - case rv = kgio_tryread(bytes, buf) + case rv = read_nonblock(bytes, buf, exception: false) when String, nil return rv - when :wait_readable - kgio_wait_readable or raise Yahns::ClientTimeout, "waiting for read", [] - when :wait_writable - kgio_wait_writable or raise Yahns::ClientTimeout, "waiting for write", [] + when :wait_readable, :wait_writable + __send__(rv, self.class.client_timeout) or + raise Yahns::ClientTimeout, "on #{rv}", [] end while true end diff --git a/lib/yahns/http_response.rb b/lib/yahns/http_response.rb index 865193d..86b7f56 100644 --- a/lib/yahns/http_response.rb +++ b/lib/yahns/http_response.rb @@ -47,11 +47,11 @@ def response_start end def response_wait_write(rv) - # call the kgio_wait_readable or kgio_wait_writable method - ok = __send__("kgio_#{rv}") and return ok + # call the wait_readable or wait_writable method k = self.class - k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after "\ - "#{k.client_timeout}s") + t = k.client_timeout + ok = __send__(rv, t) and return ok + k.logger.info("fd=#{fileno} ip=#@kgio_addr timeout on :#{rv} after #{t}s") false end diff --git a/lib/yahns/openssl_client.rb b/lib/yahns/openssl_client.rb index 0d376bd..a86d701 100644 --- a/lib/yahns/openssl_client.rb +++ b/lib/yahns/openssl_client.rb @@ -13,7 +13,7 @@ def self.included(cls) # This is a bit weird, since OpenSSL::SSL::SSLSocket wraps # our actual socket, too, so we must take care to not blindly # use method_missing and cause infinite recursion - %w(sync= read write readpartial write_nonblock read_nonblock + %w(sync= read write readpartial write_nonblock print printf puts gets readlines readline getc readchar ungetc eof eof? << flush sysread syswrite).map!(&:to_sym).each do |m| @@ -59,7 +59,7 @@ def kgio_syssend(buf, flags) kgio_trywrite(buf) end - def kgio_tryread(len, buf) + def read_nonblock(len, buf = nil, exception: true) if @need_accept # most protocols require read before write, so we start the negotiation # process here: @@ -69,7 +69,7 @@ def kgio_tryread(len, buf) end @need_accept = false end - @ssl.read_nonblock(len, buf, exception: false) + @ssl.read_nonblock(len, buf, exception: exception) end def trysendio(io, offset, count) diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb index 7df2834..d9c878e 100644 --- a/lib/yahns/proxy_http_response.rb +++ b/lib/yahns/proxy_http_response.rb @@ -153,13 +153,13 @@ def proxy_read_body(tip, kcar, req_res) alive = req_res.alive wbuf = req_res.resbuf - case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf) + case tmp = tip.shift || req_res.read_nonblock(0x2000, rbuf, exception:false) when String if len kcar.body_bytes_left -= tmp.size # progress for body_eof? => true elsif chunk kcar.filter_body(chunk, rbuf = tmp) # progress for body_eof? => true - next if chunk.empty? # call req_res.kgio_tryread for more + next if chunk.empty? # call req_res.read_nonblock for more tmp = chunk_out(chunk) elsif alive # HTTP/1.0 upstream, HTTP/1.1 client tmp = chunk_out(tmp) @@ -200,7 +200,7 @@ def proxy_read_trailers(kcar, req_res) wbuf = req_res.resbuf until kcar.trailers(tlr, chunk) - case rv = req_res.kgio_tryread(0x2000, rbuf) + case rv = req_res.read_nonblock(0x2000, rbuf, exception: false) when String chunk << rv when :wait_readable diff --git a/lib/yahns/req_res.rb b/lib/yahns/req_res.rb index 4ad8e5c..0fa4285 100644 --- a/lib/yahns/req_res.rb +++ b/lib/yahns/req_res.rb @@ -29,7 +29,7 @@ def yahns_step # yahns event loop entry point case resbuf = @resbuf # where are we at the response? when nil # common case, catch the response header in a single read - case rv = kgio_tryread(0x2000, buf) + case rv = read_nonblock(0x2000, buf, exception: false) when String if res = req.headers(@hdr = [], rv) return c.proxy_response_start(res, rv, req, self) @@ -48,7 +48,7 @@ def yahns_step # yahns event loop entry point when String # continue reading trickled response headers from upstream - case rv = kgio_tryread(0x2000, buf) + case rv = read_nonblock(0x2000, buf, exception: false) when String then res = req.headers(@hdr, resbuf << rv) and break when :wait_readable then return rv when nil diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index 00e5f15..2f2c57a 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -473,7 +473,7 @@ def reap_reexec end def sp_sig_handle(alive) - @sev.kgio_wait_readable(alive ? nil : 0.01) + @sev.wait_readable(alive ? nil : 0.01) @sev.yahns_step case sig = @sig_queue.shift when :QUIT, :TERM, :INT diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb index c9cd207..8982401 100644 --- a/lib/yahns/server_mp.rb +++ b/lib/yahns/server_mp.rb @@ -91,7 +91,7 @@ def join @logger.info "master process ready" daemon_ready begin - @sev.kgio_wait_readable + @sev.to_io.wait_readable @sev.yahns_step reap_all case @sig_queue.shift diff --git a/lib/yahns/sigevent_efd.rb b/lib/yahns/sigevent_efd.rb index 1250cf4..264097d 100644 --- a/lib/yahns/sigevent_efd.rb +++ b/lib/yahns/sigevent_efd.rb @@ -3,7 +3,6 @@ # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) # frozen_string_literal: true class Yahns::Sigevent < SleepyPenguin::EventFD # :nodoc: - include Kgio::DefaultWaiters def self.new super(0, :CLOEXEC) end diff --git a/lib/yahns/sigevent_pipe.rb b/lib/yahns/sigevent_pipe.rb index a85fb2a..00cd9e4 100644 --- a/lib/yahns/sigevent_pipe.rb +++ b/lib/yahns/sigevent_pipe.rb @@ -5,21 +5,17 @@ class Yahns::Sigevent # :nodoc: attr_reader :to_io def initialize - @to_io, @wr = Kgio::Pipe.new + @to_io, @wr = IO.pipe @to_io.close_on_exec = @wr.close_on_exec = true end - def kgio_wait_readable(*args) - @to_io.kgio_wait_readable(*args) - end - def sev_signal - @wr.kgio_trywrite(".") + @wr.write_nonblock(".", exception: false) end def yahns_step # 11 byte strings -> no malloc on YARV - while String === @to_io.kgio_tryread(11) + while String === @to_io.read_nonblock(11, exception: false) end :wait_readable end diff --git a/lib/yahns/worker.rb b/lib/yahns/worker.rb index 9192803..8886936 100644 --- a/lib/yahns/worker.rb +++ b/lib/yahns/worker.rb @@ -8,7 +8,7 @@ class Yahns::Worker # :nodoc: def initialize(nr) @nr = nr - @to_io, @wr = Kgio::Pipe.new + @to_io, @wr = IO.pipe end def atfork_child @@ -24,7 +24,7 @@ def atfork_parent # This causes the worker to gracefully exit if the master # dies unexpectedly. def yahns_step - case buf = @to_io.kgio_tryread(4) + case buf = @to_io.read_nonblock(4, exception: false) when String # unpack the buffer and trigger the signal handler signum = buf.unpack('l') @@ -60,7 +60,7 @@ def fake_sig(sig) # :nodoc: def soft_kill(signum) # :nodoc: # writing and reading 4 bytes on a pipe is atomic on all POSIX platforms # Do not care in the odd case the buffer is full, here. - @wr.kgio_trywrite([signum].pack('l')) + @wr.write_nonblock([signum].pack('l'), exception: false) rescue Errno::EPIPE # worker will be reaped soon end -- EW