From d0412e7305d7b6b4abc64996119e5722709bb6b0 Mon Sep 17 00:00:00 2001 From: Jean Boussier Date: Fri, 8 Jul 2022 12:58:25 +0200 Subject: [PATCH] Get rid of Kgio As discussed kgio is no longer absolutely necessary. We can use Ruby 2+ non blocking IO capabilities instead. --- lib/unicorn.rb | 3 +-- lib/unicorn/http_request.rb | 15 +++++++++----- lib/unicorn/http_server.rb | 33 +++++++++++++++++++++-------- lib/unicorn/socket_helper.rb | 20 ++++-------------- lib/unicorn/stream_input.rb | 16 ++++++++------ lib/unicorn/worker.rb | 38 ++++++++++++++++++++++------------ t/oob_gc.ru | 2 +- t/oob_gc_path.ru | 2 +- test/unit/test_request.rb | 16 +++----------- test/unit/test_stream_input.rb | 7 ++----- test/unit/test_tee_input.rb | 2 +- unicorn.gemspec | 1 - 12 files changed, 82 insertions(+), 73 deletions(-) diff --git a/lib/unicorn.rb b/lib/unicorn.rb index 1a50631..170719c 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -1,7 +1,6 @@ # -*- encoding: binary -*- require 'etc' require 'stringio' -require 'kgio' require 'raindrops' require 'io/wait' @@ -113,7 +112,7 @@ def self.log_error(logger, prefix, exc) F_SETPIPE_SZ = 1031 if RUBY_PLATFORM =~ /linux/ def self.pipe # :nodoc: - Kgio::Pipe.new.each do |io| + IO.pipe.each do |io| # shrink pipes to minimize impact on /proc/sys/fs/pipe-user-pages-soft # limits. if defined?(F_SETPIPE_SZ) diff --git a/lib/unicorn/http_request.rb b/lib/unicorn/http_request.rb index e3ad592..7f7324b 100644 --- a/lib/unicorn/http_request.rb +++ b/lib/unicorn/http_request.rb @@ -71,14 +71,19 @@ def read(socket) # identify the client for the immediate request to the server; # that client may be a proxy, gateway, or other intermediary # acting on behalf of the actual source client." - e['REMOTE_ADDR'] = socket.kgio_addr + address = socket.remote_address + e['REMOTE_ADDR'] = if address.unix? + "127.0.0.1" + else + address.ip_address + end # short circuit the common case with small GET requests first - socket.kgio_read!(16384, buf) + socket.readpartial(16384, buf) if parse.nil? # Parser is not done, queue up more data to read and continue parsing # an Exception thrown from the parser will throw us out of the loop - false until add_parse(socket.kgio_read!(16384)) + false until add_parse(socket.readpartial(16384)) end check_client_connection(socket) if @@check_client_connection @@ -108,7 +113,7 @@ def hijacked? TCPI = Raindrops::TCP_Info.allocate def check_client_connection(socket) # :nodoc: - if Unicorn::TCPClient === socket + if TCPSocket === socket # Raindrops::TCP_Info#get!, #state (reads struct tcp_info#tcpi_state) raise Errno::EPIPE, "client closed connection".freeze, EMPTY_ARRAY if closed_state?(TCPI.get!(socket).state) @@ -153,7 +158,7 @@ def closed_state?(state) # :nodoc: # Not that efficient, but probably still better than doing unnecessary # work after a client gives up. def check_client_connection(socket) # :nodoc: - if Unicorn::TCPClient === socket && @@tcpi_inspect_ok + if TCPSocket === socket && @@tcpi_inspect_ok opt = socket.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO).inspect if opt =~ /\bstate=(\S+)/ raise Errno::EPIPE, "client closed connection".freeze, diff --git a/lib/unicorn/http_server.rb b/lib/unicorn/http_server.rb index 21f2a05..98cd119 100644 --- a/lib/unicorn/http_server.rb +++ b/lib/unicorn/http_server.rb @@ -111,7 +111,7 @@ def initialize(app, options = {}) @worker_data = if worker_data = ENV['UNICORN_WORKER'] worker_data = worker_data.split(',').map!(&:to_i) worker_data[1] = worker_data.slice!(1..2).map do |i| - Kgio::Pipe.for_fd(i) + IO.for_fd(i) end worker_data end @@ -240,7 +240,7 @@ def listen(address, opt = {}.merge(listener_opts[address] || {})) tries = opt[:tries] || 5 begin io = bind_listen(address, opt) - unless Kgio::TCPServer === io || Kgio::UNIXServer === io + unless TCPServer === io || UNIXServer === io io.autoclose = false io = server_cast(io) end @@ -386,12 +386,18 @@ def master_sleep(sec) # the Ruby itself and not require a separate malloc (on 32-bit MRI 1.9+). # Most reads are only one byte here and uncommon, so it's not worth a # persistent buffer, either: - @self_pipe[0].kgio_tryread(11) + begin + @self_pipe[0].read_nonblock(11) + rescue Errno::EAGAIN + end end def awaken_master return if $$ != @master_pid - @self_pipe[1].kgio_trywrite('.') # wakeup master process from select + begin + @self_pipe[1].write_nonblock('.') # wakeup master process from select + rescue Errno::EAGAIN + end end # reaps all unreaped workers @@ -581,7 +587,10 @@ def handle_error(client, e) 500 end if code - client.kgio_trywrite(err_response(code, @request.response_start_sent)) + begin + client.write_nonblock(err_response(code, @request.response_start_sent)) + rescue Errno::EAGAIN + end end client.close rescue @@ -733,9 +742,15 @@ def worker_loop(worker) reopen = reopen_worker_logs(worker.nr) if reopen worker.tick = time_now.to_i while sock = ready.shift - # Unicorn::Worker#kgio_tryaccept is not like accept(2) at all, + # Unicorn::Worker#accept_nonblock is not like accept(2) at all, # but that will return false - if client = sock.kgio_tryaccept + client = begin + sock.accept_nonblock + rescue Errno::EAGAIN + false + end + + if client process_client(client) worker.tick = time_now.to_i end @@ -834,7 +849,7 @@ def redirect_io(io, path) def inherit_listeners! # inherit sockets from parents, they need to be plain Socket objects - # before they become Kgio::UNIXServer or Kgio::TCPServer + # before they become UNIXServer or TCPServer inherited = ENV['UNICORN_FD'].to_s.split(',') # emulate sd_listen_fds() for systemd @@ -858,7 +873,7 @@ def inherit_listeners! LISTENERS.replace(inherited) # we start out with generic Socket objects that get cast to either - # Kgio::TCPServer or Kgio::UNIXServer objects; but since the Socket + # TCPServer or UNIXServer objects; but since the Socket # objects share the same OS-level file descriptor as the higher-level # *Server objects; we need to prevent Socket objects from being # garbage-collected diff --git a/lib/unicorn/socket_helper.rb b/lib/unicorn/socket_helper.rb index 8a6f6ee..9d2ba52 100644 --- a/lib/unicorn/socket_helper.rb +++ b/lib/unicorn/socket_helper.rb @@ -3,18 +3,6 @@ require 'socket' module Unicorn - - # Instead of using a generic Kgio::Socket for everything, - # tag TCP sockets so we can use TCP_INFO under Linux without - # incurring extra syscalls for Unix domain sockets. - # TODO: remove these when we remove kgio - TCPClient = Class.new(Kgio::Socket) # :nodoc: - class TCPSrv < Kgio::TCPServer # :nodoc: - def kgio_tryaccept # :nodoc: - super(TCPClient) - end - end - module SocketHelper # internal interface @@ -135,7 +123,7 @@ def bind_listen(address = '0.0.0.0:8080', opt = {}) end old_umask = File.umask(opt[:umask] || 0) begin - Kgio::UNIXServer.new(address) + UNIXServer.new(address) ensure File.umask(old_umask) end @@ -164,7 +152,7 @@ def new_tcp_server(addr, port, opt) end sock.bind(Socket.pack_sockaddr_in(port, addr)) sock.autoclose = false - TCPSrv.for_fd(sock.fileno) + TCPServer.for_fd(sock.fileno) end # returns rfc2732-style (e.g. "[::1]:666") addresses for IPv6 @@ -201,9 +189,9 @@ def sock_name(sock) def server_cast(sock) begin Socket.unpack_sockaddr_in(sock.getsockname) - TCPSrv.for_fd(sock.fileno) + TCPServer.for_fd(sock.fileno) rescue ArgumentError - Kgio::UNIXServer.for_fd(sock.fileno) + UNIXServer.for_fd(sock.fileno) end end diff --git a/lib/unicorn/stream_input.rb b/lib/unicorn/stream_input.rb index 41d28a0..3241ff4 100644 --- a/lib/unicorn/stream_input.rb +++ b/lib/unicorn/stream_input.rb @@ -1,3 +1,4 @@ + # -*- encoding: binary -*- # When processing uploads, unicorn may expose a StreamInput object under @@ -49,7 +50,11 @@ def read(length = nil, rv = '') to_read = length - @rbuf.size rv.replace(@rbuf.slice!(0, @rbuf.size)) until to_read == 0 || eof? || (rv.size > 0 && @chunked) - @socket.kgio_read(to_read, @buf) or eof! + begin + @socket.readpartial(to_read, @buf) + rescue EOFError + eof! + end filter_body(@rbuf, @buf) rv << @rbuf to_read -= @rbuf.size @@ -72,8 +77,7 @@ def read(length = nil, rv = '') # Returns nil if called at the end of file. # This takes zero arguments for strict Rack::Lint compatibility, # unlike IO#gets. - def gets - sep = $/ + def gets(sep = $/) if sep.nil? read_all(rv = '') return rv.empty? ? nil : rv @@ -83,7 +87,7 @@ def gets begin @rbuf.sub!(re, '') and return $1 return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size) if eof? - @socket.kgio_read(@@io_chunk_size, @buf) or eof! + @socket.readpartial(@@io_chunk_size, @buf) or eof! filter_body(once = '', @buf) @rbuf << once end while true @@ -107,7 +111,7 @@ def each def eof? if @parser.body_eof? while @chunked && ! @parser.parse - once = @socket.kgio_read(@@io_chunk_size) or eof! + once = @socket.readpartial(@@io_chunk_size) or eof! @buf << once end @socket = nil @@ -127,7 +131,7 @@ def read_all(dst) dst.replace(@rbuf) @socket or return until eof? - @socket.kgio_read(@@io_chunk_size, @buf) or eof! + @socket.readpartial(@@io_chunk_size, @buf) or eof! filter_body(@rbuf, @buf) dst << @rbuf end diff --git a/lib/unicorn/worker.rb b/lib/unicorn/worker.rb index 5ddf379..fe741c0 100644 --- a/lib/unicorn/worker.rb +++ b/lib/unicorn/worker.rb @@ -63,27 +63,39 @@ def soft_kill(sig) # :nodoc: signum = Signal.list[sig.to_s] or raise ArgumentError, "BUG: bad signal: #{sig.inspect}" end + # writing and reading 4 bytes on a pipe is atomic on all POSIX platforms # Do not care in the odd case the buffer is full, here. - @master.kgio_trywrite([signum].pack('l')) + begin + @master.write_nonblock([signum].pack('l')) + rescue Errno::EAGAIN + end rescue Errno::EPIPE # worker will be reaped soon end # this only runs when the Rack app.call is not running # act like a listener - def kgio_tryaccept # :nodoc: - case buf = @to_io.kgio_tryread(4) - when String - # unpack the buffer and trigger the signal handler - signum = buf.unpack('l') - fake_sig(signum[0]) - # keep looping, more signals may be queued - when nil # EOF: master died, but we are at a safe place to exit - fake_sig(:QUIT) - when :wait_readable # keep waiting - return false - end while true # loop, as multiple signals may be sent + def accept_nonblock # :nodoc: + loop do + buf = begin + @to_io.read_nonblock(4) + rescue Errno::EAGAIN # keep waiting + return false + rescue EOFError # master died, but we are at a safe place to exit + fake_sig(:QUIT) + end + + case buf + when String + # unpack the buffer and trigger the signal handler + signum = buf.unpack('l') + fake_sig(signum[0]) + # keep looping, more signals may be queued + else + raise TypeError, "Unexpected read_nonblock returns: #{buf.inspect}" + end + end # loop, as multiple signals may be sent end # worker objects may be compared to just plain Integers diff --git a/t/oob_gc.ru b/t/oob_gc.ru index c253540..43c0f68 100644 --- a/t/oob_gc.ru +++ b/t/oob_gc.ru @@ -7,7 +7,7 @@ # Mock GC.start def GC.start - ObjectSpace.each_object(Kgio::Socket) do |x| + ObjectSpace.each_object(Socket) do |x| x.closed? or abort "not closed #{x}" end $gc_started = true diff --git a/t/oob_gc_path.ru b/t/oob_gc_path.ru index af8e3b9..e772261 100644 --- a/t/oob_gc_path.ru +++ b/t/oob_gc_path.ru @@ -7,7 +7,7 @@ # Mock GC.start def GC.start - ObjectSpace.each_object(Kgio::Socket) do |x| + ObjectSpace.each_object(Socket) do |x| x.closed? or abort "not closed #{x}" end $gc_started = true diff --git a/test/unit/test_request.rb b/test/unit/test_request.rb index 6cb0268..a951057 100644 --- a/test/unit/test_request.rb +++ b/test/unit/test_request.rb @@ -11,11 +11,8 @@ class RequestTest < Test::Unit::TestCase class MockRequest < StringIO - alias_method :readpartial, :sysread - alias_method :kgio_read!, :sysread - alias_method :read_nonblock, :sysread - def kgio_addr - '127.0.0.1' + def remote_address + Addrinfo.tcp('127.0.0.1', 55608) end end @@ -152,14 +149,7 @@ def test_rack_lint_big_put buf = (' ' * bs).freeze length = bs * count client = Tempfile.new('big_put') - def client.kgio_addr; '127.0.0.1'; end - def client.kgio_read(*args) - readpartial(*args) - rescue EOFError - end - def client.kgio_read!(*args) - readpartial(*args) - end + def client.remote_address; Addrinfo.tcp('127.0.0.1', 55608); end client.syswrite( "PUT / HTTP/1.1\r\n" \ "Host: foo\r\n" \ diff --git a/test/unit/test_stream_input.rb b/test/unit/test_stream_input.rb index 1a07ec3..445b415 100644 --- a/test/unit/test_stream_input.rb +++ b/test/unit/test_stream_input.rb @@ -6,16 +6,14 @@ class TestStreamInput < Test::Unit::TestCase def setup - @rs = $/ @env = {} - @rd, @wr = Kgio::UNIXSocket.pair + @rd, @wr = UNIXSocket.pair @rd.sync = @wr.sync = true @start_pid = $$ end def teardown return if $$ != @start_pid - $/ = @rs @rd.close rescue nil @wr.close rescue nil Process.waitall @@ -54,10 +52,9 @@ def test_gets_multiline end def test_gets_empty_rs - $/ = nil r = init_request("a\nb\n\n") si = Unicorn::StreamInput.new(@rd, r) - assert_equal "a\nb\n\n", si.gets + assert_equal "a\nb\n\n", si.gets(nil) assert_nil si.gets end diff --git a/test/unit/test_tee_input.rb b/test/unit/test_tee_input.rb index 4647e66..21b90f1 100644 --- a/test/unit/test_tee_input.rb +++ b/test/unit/test_tee_input.rb @@ -12,7 +12,7 @@ class TestTeeInput < Test::Unit::TestCase def setup @rs = $/ - @rd, @wr = Kgio::UNIXSocket.pair + @rd, @wr = UNIXSocket.pair @rd.sync = @wr.sync = true @start_pid = $$ end diff --git a/unicorn.gemspec b/unicorn.gemspec index 7bb1154..85183d9 100644 --- a/unicorn.gemspec +++ b/unicorn.gemspec @@ -36,7 +36,6 @@ # won't have descriptive text, only the numeric status. s.add_development_dependency(%q) - s.add_dependency(%q, '~> 2.6') s.add_dependency(%q, '~> 0.7') s.add_development_dependency('test-unit', '~> 3.0') -- 2.35.1