As discussed, kgio is no longer absolutely necessary. We can use Ruby 2+ non-blocking IO capabilities instead. Also available at https://github.com/casperisfine/unicorn.git (remove-kgio) ---  lib/unicorn.rb                 |  3 +--  lib/unicorn/http_request.rb    | 15 +++++++++-----  lib/unicorn/http_server.rb     | 33 +++++++++++++++++++++--------  lib/unicorn/socket_helper.rb   | 20 ++++--------------  lib/unicorn/stream_input.rb    | 16 ++++++++------  lib/unicorn/worker.rb          | 38 ++++++++++++++++++++++------------  t/oob_gc.ru                    |  2 +-  t/oob_gc_path.ru               |  2 +-  test/unit/test_request.rb      | 16 +++-----------  test/unit/test_stream_input.rb |  7 ++-----  test/unit/test_tee_input.rb    |  2 +-  unicorn.gemspec                |  1 -  12 files changed, 82 insertions(+), 73 deletions(-) diff --git a/lib/unicorn.rb b/lib/unicorn.rb index 1a50631..170719c 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -1,7 +1,6 @@  # -*- encoding: binary -*-  require 'etc'  require 'stringio' -require 'kgio'  require 'raindrops'  require 'io/wait' @@ -113,7 +112,7 @@ def self.log_error(logger, prefix, exc)    F_SETPIPE_SZ = 1031 if RUBY_PLATFORM =~ /linux/    def self.pipe # :nodoc: -    Kgio::Pipe.new.each do |io| +    IO.pipe.each do |io|        # shrink pipes to minimize impact on /proc/sys/fs/pipe-user-pages-soft        # limits.        if defined?(F_SETPIPE_SZ) diff --git a/lib/unicorn/http_request.rb b/lib/unicorn/http_request.rb index e3ad592..7f7324b 100644 --- a/lib/unicorn/http_request.rb +++ b/lib/unicorn/http_request.rb @@ -71,14 +71,19 @@ def read(socket)      #  identify the client for the immediate request to the server;      #  that client may be a proxy, gateway, or other intermediary      #  acting on behalf of the actual source client." -    e['REMOTE_ADDR'] = socket.kgio_addr +    address = socket.remote_address +    e['REMOTE_ADDR'] = if address.unix? 
+      "127.0.0.1" +    else +      address.ip_address +    end      # short circuit the common case with small GET requests first -    socket.kgio_read!(16384, buf) +    socket.readpartial(16384, buf)      if parse.nil?        # Parser is not done, queue up more data to read and continue parsing        # an Exception thrown from the parser will throw us out of the loop -      false until add_parse(socket.kgio_read!(16384)) +      false until add_parse(socket.readpartial(16384))      end      check_client_connection(socket) if @@check_client_connection @@ -108,7 +113,7 @@ def hijacked?      TCPI = Raindrops::TCP_Info.allocate      def check_client_connection(socket) # :nodoc: -      if Unicorn::TCPClient === socket +      if TCPSocket === socket          # Raindrops::TCP_Info#get!, #state (reads struct tcp_info#tcpi_state)          raise Errno::EPIPE, "client closed connection".freeze,                EMPTY_ARRAY if closed_state?(TCPI.get!(socket).state) @@ -153,7 +158,7 @@ def closed_state?(state) # :nodoc:      # Not that efficient, but probably still better than doing unnecessary      # work after a client gives up.      
def check_client_connection(socket) # :nodoc: -      if Unicorn::TCPClient === socket && @@tcpi_inspect_ok +      if TCPSocket === socket && @@tcpi_inspect_ok          opt = socket.getsockopt(Socket::IPPROTO_TCP, Socket::TCP_INFO).inspect          if opt =~ /\bstate=(\S+)/            raise Errno::EPIPE, "client closed connection".freeze, diff --git a/lib/unicorn/http_server.rb b/lib/unicorn/http_server.rb index 21f2a05..98cd119 100644 --- a/lib/unicorn/http_server.rb +++ b/lib/unicorn/http_server.rb @@ -111,7 +111,7 @@ def initialize(app, options = {})      @worker_data = if worker_data = ENV['UNICORN_WORKER']        worker_data = worker_data.split(',').map!(&:to_i)        worker_data[1] = worker_data.slice!(1..2).map do |i| -        Kgio::Pipe.for_fd(i) +        IO.for_fd(i)        end        worker_data      end @@ -240,7 +240,7 @@ def listen(address, opt = {}.merge(listener_opts[address] || {}))      tries = opt[:tries] || 5      begin        io = bind_listen(address, opt) -      unless Kgio::TCPServer === io || Kgio::UNIXServer === io +      unless TCPServer === io || UNIXServer === io          io.autoclose = false          io = server_cast(io)        end @@ -386,12 +386,18 @@ def master_sleep(sec)      # the Ruby itself and not require a separate malloc (on 32-bit MRI 1.9+).      
# Most reads are only one byte here and uncommon, so it's not worth a      # persistent buffer, either: -    @self_pipe[0].kgio_tryread(11) +    begin +      @self_pipe[0].read_nonblock(11) +    rescue Errno::EAGAIN +    end    end    def awaken_master      return if $$ != @master_pid -    @self_pipe[1].kgio_trywrite('.') # wakeup master process from select +    begin +      @self_pipe[1].write_nonblock('.') # wakeup master process from select +    rescue Errno::EAGAIN +    end    end    # reaps all unreaped workers @@ -581,7 +587,10 @@ def handle_error(client, e)        500      end      if code -      client.kgio_trywrite(err_response(code, @request.response_start_sent)) +      begin +        client.write_nonblock(err_response(code, @request.response_start_sent)) +      rescue Errno::EAGAIN +      end      end      client.close    rescue @@ -733,9 +742,15 @@ def worker_loop(worker)        reopen = reopen_worker_logs(worker.nr) if reopen        worker.tick = time_now.to_i        while sock = ready.shift -        # Unicorn::Worker#kgio_tryaccept is not like accept(2) at all, +        # Unicorn::Worker#accept_nonblock is not like accept(2) at all,          # but that will return false -        if client = sock.kgio_tryaccept +        client = begin +          sock.accept_nonblock +        rescue Errno::EAGAIN +          false +        end + +        if client            process_client(client)            worker.tick = time_now.to_i          end @@ -834,7 +849,7 @@ def redirect_io(io, path)    def inherit_listeners!      # inherit sockets from parents, they need to be plain Socket objects -    # before they become Kgio::UNIXServer or Kgio::TCPServer +    # before they become UNIXServer or TCPServer      inherited = ENV['UNICORN_FD'].to_s.split(',')      # emulate sd_listen_fds() for systemd @@ -858,7 +873,7 @@ def inherit_listeners!      
LISTENERS.replace(inherited)      # we start out with generic Socket objects that get cast to either -    # Kgio::TCPServer or Kgio::UNIXServer objects; but since the Socket +    # TCPServer or UNIXServer objects; but since the Socket      # objects share the same OS-level file descriptor as the higher-level      # *Server objects; we need to prevent Socket objects from being      # garbage-collected diff --git a/lib/unicorn/socket_helper.rb b/lib/unicorn/socket_helper.rb index 8a6f6ee..9d2ba52 100644 --- a/lib/unicorn/socket_helper.rb +++ b/lib/unicorn/socket_helper.rb @@ -3,18 +3,6 @@  require 'socket'  module Unicorn - -  # Instead of using a generic Kgio::Socket for everything, -  # tag TCP sockets so we can use TCP_INFO under Linux without -  # incurring extra syscalls for Unix domain sockets. -  # TODO: remove these when we remove kgio -  TCPClient = Class.new(Kgio::Socket) # :nodoc: -  class TCPSrv < Kgio::TCPServer # :nodoc: -    def kgio_tryaccept # :nodoc: -      super(TCPClient) -    end -  end -    module SocketHelper      # internal interface @@ -135,7 +123,7 @@ def bind_listen(address = '0.0.0.0:8080', opt = {})          end          old_umask = File.umask(opt[:umask] || 0)          begin -          Kgio::UNIXServer.new(address) +          UNIXServer.new(address)          ensure            File.umask(old_umask)          end @@ -164,7 +152,7 @@ def new_tcp_server(addr, port, opt)        end        sock.bind(Socket.pack_sockaddr_in(port, addr))        sock.autoclose = false -      TCPSrv.for_fd(sock.fileno) +      TCPServer.for_fd(sock.fileno)      end      # returns rfc2732-style (e.g. 
"[::1]:666") addresses for IPv6 @@ -201,9 +189,9 @@ def sock_name(sock)      def server_cast(sock)        begin          Socket.unpack_sockaddr_in(sock.getsockname) -        TCPSrv.for_fd(sock.fileno) +        TCPServer.for_fd(sock.fileno)        rescue ArgumentError -        Kgio::UNIXServer.for_fd(sock.fileno) +        UNIXServer.for_fd(sock.fileno)        end      end diff --git a/lib/unicorn/stream_input.rb b/lib/unicorn/stream_input.rb index 41d28a0..3241ff4 100644 --- a/lib/unicorn/stream_input.rb +++ b/lib/unicorn/stream_input.rb @@ -1,3 +1,4 @@ +  # -*- encoding: binary -*-  # When processing uploads, unicorn may expose a StreamInput object under @@ -49,7 +50,11 @@ def read(length = nil, rv = '')          to_read = length - @rbuf.size          rv.replace(@rbuf.slice!(0, @rbuf.size))          until to_read == 0 || eof? || (rv.size > 0 && @chunked) -          @socket.kgio_read(to_read, @buf) or eof! +          begin +            @socket.readpartial(to_read, @buf) +          rescue EOFError +            eof! +          end            filter_body(@rbuf, @buf)            rv << @rbuf            to_read -= @rbuf.size @@ -72,8 +77,7 @@ def read(length = nil, rv = '')    # Returns nil if called at the end of file.    # This takes zero arguments for strict Rack::Lint compatibility,    # unlike IO#gets. -  def gets -    sep = $/ +  def gets(sep = $/)      if sep.nil?        read_all(rv = '')        return rv.empty? ? nil : rv @@ -83,7 +87,7 @@ def gets      begin        @rbuf.sub!(re, '') and return $1        return @rbuf.empty? ? nil : @rbuf.slice!(0, @rbuf.size) if eof? -      @socket.kgio_read(@@io_chunk_size, @buf) or eof! +      @socket.readpartial(@@io_chunk_size, @buf) or eof!        filter_body(once = '', @buf)        @rbuf << once      end while true @@ -107,7 +111,7 @@ def each    def eof?      if @parser.body_eof?        while @chunked && ! @parser.parse -        once = @socket.kgio_read(@@io_chunk_size) or eof! 
+        once = @socket.readpartial(@@io_chunk_size) or eof!          @buf << once        end        @socket = nil @@ -127,7 +131,7 @@ def read_all(dst)      dst.replace(@rbuf)      @socket or return      until eof? -      @socket.kgio_read(@@io_chunk_size, @buf) or eof! +      @socket.readpartial(@@io_chunk_size, @buf) or eof!        filter_body(@rbuf, @buf)        dst << @rbuf      end diff --git a/lib/unicorn/worker.rb b/lib/unicorn/worker.rb index 5ddf379..fe741c0 100644 --- a/lib/unicorn/worker.rb +++ b/lib/unicorn/worker.rb @@ -63,27 +63,39 @@ def soft_kill(sig) # :nodoc:        signum = Signal.list[sig.to_s] or            raise ArgumentError, "BUG: bad signal: #{sig.inspect}"      end +      # writing and reading 4 bytes on a pipe is atomic on all POSIX platforms      # Do not care in the odd case the buffer is full, here. -    @master.kgio_trywrite([signum].pack('l')) +    begin +      @master.write_nonblock([signum].pack('l')) +    rescue Errno::EAGAIN +    end    rescue Errno::EPIPE      # worker will be reaped soon    end    # this only runs when the Rack app.call is not running    # act like a listener -  def kgio_tryaccept # :nodoc: -    case buf = @to_io.kgio_tryread(4) -    when String -      # unpack the buffer and trigger the signal handler -      signum = buf.unpack('l') -      fake_sig(signum[0]) -      # keep looping, more signals may be queued -    when nil # EOF: master died, but we are at a safe place to exit -      fake_sig(:QUIT) -    when :wait_readable # keep waiting -      return false -    end while true # loop, as multiple signals may be sent +  def accept_nonblock # :nodoc: +    loop do +      buf = begin +        @to_io.read_nonblock(4) +      rescue Errno::EAGAIN # keep waiting +        return false +      rescue EOFError # master died, but we are at a safe place to exit +        fake_sig(:QUIT) +      end + +      case buf +      when String +        # unpack the buffer and trigger the signal handler +        signum = 
buf.unpack('l') +        fake_sig(signum[0]) +        # keep looping, more signals may be queued +      else +        raise TypeError, "Unexpected read_nonblock returns: #{buf.inspect}" +      end +    end # loop, as multiple signals may be sent    end    # worker objects may be compared to just plain Integers diff --git a/t/oob_gc.ru b/t/oob_gc.ru index c253540..43c0f68 100644 --- a/t/oob_gc.ru +++ b/t/oob_gc.ru @@ -7,7 +7,7 @@  # Mock GC.start  def GC.start -  ObjectSpace.each_object(Kgio::Socket) do |x| +  ObjectSpace.each_object(Socket) do |x|      x.closed? or abort "not closed #{x}"    end    $gc_started = true diff --git a/t/oob_gc_path.ru b/t/oob_gc_path.ru index af8e3b9..e772261 100644 --- a/t/oob_gc_path.ru +++ b/t/oob_gc_path.ru @@ -7,7 +7,7 @@  # Mock GC.start  def GC.start -  ObjectSpace.each_object(Kgio::Socket) do |x| +  ObjectSpace.each_object(Socket) do |x|      x.closed? or abort "not closed #{x}"    end    $gc_started = true diff --git a/test/unit/test_request.rb b/test/unit/test_request.rb index 6cb0268..a951057 100644 --- a/test/unit/test_request.rb +++ b/test/unit/test_request.rb @@ -11,11 +11,8 @@  class RequestTest < Test::Unit::TestCase    class MockRequest < StringIO -    alias_method :readpartial, :sysread -    alias_method :kgio_read!, :sysread -    alias_method :read_nonblock, :sysread -    def kgio_addr -      '127.0.0.1' +    def remote_address +      Addrinfo.tcp('127.0.0.1', 55608)      end    end @@ -152,14 +149,7 @@ def test_rack_lint_big_put      buf = (' ' * bs).freeze      length = bs * count      client = Tempfile.new('big_put') -    def client.kgio_addr; '127.0.0.1'; end -    def client.kgio_read(*args) -      readpartial(*args) -    rescue EOFError -    end -    def client.kgio_read!(*args) -      readpartial(*args) -    end +    def client.remote_address; Addrinfo.tcp('127.0.0.1', 55608); end      client.syswrite(        "PUT / HTTP/1.1\r\n" \        "Host: foo\r\n" \ diff --git a/test/unit/test_stream_input.rb 
b/test/unit/test_stream_input.rb index 1a07ec3..445b415 100644 --- a/test/unit/test_stream_input.rb +++ b/test/unit/test_stream_input.rb @@ -6,16 +6,14 @@  class TestStreamInput < Test::Unit::TestCase    def setup -    @rs = $/      @env = {} -    @rd, @wr = Kgio::UNIXSocket.pair +    @rd, @wr = UNIXSocket.pair      @rd.sync = @wr.sync = true      @start_pid = $$    end    def teardown      return if $$ != @start_pid -    $/ = @rs      @rd.close rescue nil      @wr.close rescue nil      Process.waitall @@ -54,10 +52,9 @@ def test_gets_multiline    end    def test_gets_empty_rs -    $/ = nil      r = init_request("a\nb\n\n")      si = Unicorn::StreamInput.new(@rd, r) -    assert_equal "a\nb\n\n", si.gets +    assert_equal "a\nb\n\n", si.gets(nil)      assert_nil si.gets    end diff --git a/test/unit/test_tee_input.rb b/test/unit/test_tee_input.rb index 4647e66..21b90f1 100644 --- a/test/unit/test_tee_input.rb +++ b/test/unit/test_tee_input.rb @@ -12,7 +12,7 @@ class TestTeeInput < Test::Unit::TestCase    def setup      @rs = $/ -    @rd, @wr = Kgio::UNIXSocket.pair +    @rd, @wr = UNIXSocket.pair      @rd.sync = @wr.sync = true      @start_pid = $$    end diff --git a/unicorn.gemspec b/unicorn.gemspec index 7bb1154..85183d9 100644 --- a/unicorn.gemspec +++ b/unicorn.gemspec @@ -36,7 +36,6 @@    # won't have descriptive text, only the numeric status.    s.add_development_dependency(%q) -  s.add_dependency(%q, '~> 2.6')    s.add_dependency(%q, '~> 0.7')    s.add_development_dependency('test-unit', '~> 3.0') -- 2.35.1