From ad821f70a2488a91f2be1ac53cb2e64f50743989 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 28 Sep 2010 17:40:01 -0700 Subject: start using kgio library It removes the burden of byte slicing and setting file descriptor flags. In some cases, we can remove unnecessary peeraddr calls, too. --- lib/rainbows.rb | 4 +--- lib/rainbows/acceptor.rb | 26 -------------------------- lib/rainbows/base.rb | 17 +++++++++++++---- lib/rainbows/byte_slice.rb | 17 ----------------- lib/rainbows/ev_core.rb | 1 - lib/rainbows/event_machine.rb | 6 ++---- lib/rainbows/fiber/io.rb | 22 ++++++++++++---------- lib/rainbows/fiber/rev.rb | 6 ++---- lib/rainbows/fiber_pool.rb | 3 +-- lib/rainbows/fiber_spawn.rb | 3 +-- lib/rainbows/rev/client.rb | 18 +++++++++--------- lib/rainbows/rev/core.rb | 3 +-- lib/rainbows/rev/thread.rb | 2 +- lib/rainbows/revactor.rb | 1 + lib/rainbows/thread_pool.rb | 6 ++---- lib/rainbows/thread_spawn.rb | 3 +-- rainbows.gemspec | 1 + t/test_isolate.rb | 1 + 18 files changed, 49 insertions(+), 91 deletions(-) delete mode 100644 lib/rainbows/acceptor.rb delete mode 100644 lib/rainbows/byte_slice.rb diff --git a/lib/rainbows.rb b/lib/rainbows.rb index f80d5fd..0914609 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -1,11 +1,11 @@ # -*- encoding: binary -*- +require 'kgio' require 'unicorn' # the value passed to TCP_DEFER_ACCEPT actually matters in Linux 2.6.32+ Unicorn::SocketHelper::DEFAULTS[:tcp_defer_accept] = 60 require 'rainbows/error' require 'rainbows/configurator' -require 'fcntl' module Rainbows @@ -118,9 +118,7 @@ module Rainbows end # :startdoc: autoload :Fiber, 'rainbows/fiber' # core class - autoload :ByteSlice, 'rainbows/byte_slice' autoload :StreamFile, 'rainbows/stream_file' autoload :HttpResponse, 'rainbows/http_response' # deprecated autoload :ThreadTimeout, 'rainbows/thread_timeout' end -require 'rainbows/acceptor' diff --git a/lib/rainbows/acceptor.rb b/lib/rainbows/acceptor.rb deleted file mode 100644 index c67bf20..0000000 --- a/lib/rainbows/acceptor.rb +++ /dev/null @@ -1,26 +0,0 @@ -# -*- encoding: binary -*- - -# :enddoc: -require 'fcntl' - -# this should make life easier for Zbatery if compatibility with -# fcntl-crippled platforms is required (or if FD_CLOEXEC is inherited) -# and we want to microptimize away fcntl(2) syscalls. -module Rainbows::Acceptor - - # returns nil if accept fails - def sync_accept(sock) - rv = sock.accept - rv.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) - rv - rescue Errno::EAGAIN, Errno::ECONNABORTED, Errno::EINTR - end - - # returns nil if accept fails - def accept(sock) - rv = sock.accept_nonblock - rv.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) - rv - rescue Errno::EAGAIN, Errno::ECONNABORTED - end -end diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 2f4d379..59747c7 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -24,10 +24,19 @@ module Rainbows::Base Rainbows::MaxBody.setup G.tmp = worker.tmp - # avoid spurious wakeups and blocking-accept() with 1.8 green threads - if ! defined?(RUBY_ENGINE) && RUBY_VERSION.to_f < 1.9 - require "io/nonblock" - Rainbows::HttpServer::LISTENERS.each { |l| l.nonblock = true } + listeners = Rainbows::HttpServer::LISTENERS + Rainbows::HttpServer::IO_PURGATORY.concat(listeners) + + # no need for this when Unicorn uses Kgio + listeners.map! do |io| + case io + when TCPServer + Kgio::TCPServer.for_fd(io.fileno) + when UNIXServer + Kgio::UNIXServer.for_fd(io.fileno) + else + io + end end # we're don't use the self-pipe mechanism in the Rainbows! worker diff --git a/lib/rainbows/byte_slice.rb b/lib/rainbows/byte_slice.rb deleted file mode 100644 index 3bb4dd7..0000000 --- a/lib/rainbows/byte_slice.rb +++ /dev/null @@ -1,17 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::ByteSlice - if String.method_defined?(:encoding) - def byte_slice(buf, range) - if buf.encoding != Encoding::BINARY - buf.dup.force_encoding(Encoding::BINARY)[range] - else - buf[range] - end - end - else - def byte_slice(buf, range) - buf[range] - end - end -end diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 3e64ff9..bf00eed 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -16,7 +16,6 @@ module Rainbows ASYNC_CLOSE = "async.close".freeze def post_init - @remote_addr = Rainbows.addr(@_io) @env = {} @hp = HttpParser.new @state = :headers # [ :body [ :trailers ] ] :app_call :close diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 5586d3e..96d9a9e 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -84,7 +84,7 @@ module Rainbows def app_call set_comm_inactivity_timeout 0 @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @remote_addr + @env[REMOTE_ADDR] = @_io.kgio_addr @env[ASYNC_CALLBACK] = method(:em_write_response) @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new @@ -170,8 +170,6 @@ module Rainbows end module Server # :nodoc: all - include Rainbows::Acceptor - def close detach @io.close @@ -179,7 +177,7 @@ module Rainbows def notify_readable return if CUR.size >= MAX - io = accept(@io) or return + io = @io.kgio_tryaccept or return sig = EM.attach_fd(io.fileno, false) CUR[sig] = CL.new(sig, io) end diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb index 4175eb0..f83b8b7 100644 --- a/lib/rainbows/fiber/io.rb +++ b/lib/rainbows/fiber/io.rb @@ -6,9 +6,9 @@ module Rainbows # #to_io method and gives users the illusion of a synchronous # interface that yields away from the current Fiber whenever # the underlying IO object cannot read or write + # + # TODO: subclass off IO and include Kgio::SocketMethods instead class IO < Struct.new(:to_io, :f) - include Rainbows::ByteSlice - # :stopdoc: LOCALHOST = Unicorn::HttpRequest::LOCALHOST @@ -17,9 +17,8 @@ module Rainbows to_io.write_nonblock(buf) end - # enough for Rainbows.addr - def peeraddr - to_io.respond_to?(:peeraddr) ? to_io.peeraddr : [ LOCALHOST ] + def kgio_addr + to_io.kgio_addr end # for wrapping output response bodies @@ -58,11 +57,14 @@ module Rainbows def write(buf) begin - (w = to_io.write_nonblock(buf)) == buf.bytesize and return - buf = byte_slice(buf, w..-1) - rescue Errno::EAGAIN - wait_writable - retry + case rv = to_io.kgio_trywrite(buf) + when nil + return + when String + buf = rv + when Kgio::WaitWritable + wait_writable + end end while true end diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index 632b562..2c1abb7 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -54,7 +54,6 @@ module Rainbows::Fiber include Rainbows include Rainbows::Const include Rainbows::Response - include Rainbows::Acceptor FIO = Rainbows::Fiber::IO def to_io @@ -73,7 +72,7 @@ module Rainbows::Fiber def on_readable return if G.cur >= MAX - c = accept(@io) and ::Fiber.new { process(c) }.resume + c = @io.kgio_tryaccept and ::Fiber.new { process(c) }.resume end def process(io) @@ -82,7 +81,6 @@ module Rainbows::Fiber buf = client.read_timeout or return hp = HttpParser.new env = {} - remote_addr = Rainbows.addr(io) begin # loop buf << (client.read_timeout or return) until hp.headers(env, buf) @@ -90,7 +88,7 @@ module Rainbows::Fiber env[CLIENT_IO] = client env[RACK_INPUT] = 0 == hp.content_length ? HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf) - env[REMOTE_ADDR] = remote_addr + env[REMOTE_ADDR] = io.kgio_addr status, headers, body = APP.call(env.update(RACK_DEFAULTS)) if 100 == status.to_i diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb index 63f1e2e..4f3ffd8 100644 --- a/lib/rainbows/fiber_pool.rb +++ b/lib/rainbows/fiber_pool.rb @@ -15,7 +15,6 @@ module Rainbows module FiberPool include Fiber::Base - include Rainbows::Acceptor def worker_loop(worker) # :nodoc: init_worker_process(worker) @@ -30,7 +29,7 @@ module Rainbows begin schedule do |l| fib = pool.shift or break # let another worker process take it - if io = accept(l) + if io = l.kgio_tryaccept fib.resume(Fiber::IO.new(io, fib)) else pool << fib diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb index ecf83d8..ec259ad 100644 --- a/lib/rainbows/fiber_spawn.rb +++ b/lib/rainbows/fiber_spawn.rb @@ -12,7 +12,6 @@ module Rainbows module FiberSpawn include Fiber::Base - include Rainbows::Acceptor def worker_loop(worker) # :nodoc: init_worker_process(worker) @@ -23,7 +22,7 @@ module Rainbows begin schedule do |l| break if G.cur >= limit - io = accept(l) or next + io = l.kgio_tryaccept or next ::Fiber.new { process_client(fio.new(io, ::Fiber.current)) }.resume end rescue => e diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index 8bfeb31..0c02525 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -5,7 +5,6 @@ module Rainbows module Rev class Client < ::Rev::IO - include Rainbows::ByteSlice include Rainbows::EvCore G = Rainbows::G F = Rainbows::StreamFile @@ -28,13 +27,14 @@ module Rainbows def write(buf) if @_write_buffer.empty? begin - w = @_io.write_nonblock(buf) - return enable_write_watcher if w == Rack::Utils.bytesize(buf) - # we never care for the return value, but yes, we may return - # a "fake" short write from super(buf) if anybody cares. - buf = byte_slice(buf, w..-1) - rescue Errno::EAGAIN - break # fall through to super(buf) + case rv = @_io.kgio_trywrite(buf) + when nil + return enable_write_watcher + when Kgio::WaitWritable + break # fall through to super(buf) + when String + buf = rv # retry, skb could grow or been drained + end rescue => e return handle_error(e) end while true @@ -104,7 +104,7 @@ module Rainbows def app_call KATO.delete(self) @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @remote_addr + @env[REMOTE_ADDR] = @_io.kgio_addr response = APP.call(@env.update(RACK_DEFAULTS)) rev_write_response(response, alive = @hp.keepalive? && G.alive) diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb index aecd5e8..2273b24 100644 --- a/lib/rainbows/rev/core.rb +++ b/lib/rainbows/rev/core.rb @@ -7,12 +7,11 @@ require 'rainbows/rev/heartbeat' module Rainbows module Rev class Server < ::Rev::IO - include Rainbows::Acceptor # CL and MAX will be defined in the corresponding worker loop def on_readable return if CONN.size >= MAX - io = accept(@_io) and CL.new(io).attach(LOOP) + io = @_io.kgio_tryaccept and CL.new(io).attach(LOOP) end end # class Server diff --git a/lib/rainbows/rev/thread.rb b/lib/rainbows/rev/thread.rb index cce3e92..7b7d455 100644 --- a/lib/rainbows/rev/thread.rb +++ b/lib/rainbows/rev/thread.rb @@ -34,7 +34,7 @@ module Rainbows # here because that could cause a deadlock and we'd leak FDs def app_response begin - @env[REMOTE_ADDR] = @remote_addr + @env[REMOTE_ADDR] = @_io.kgio_addr APP.call(@env.update(RACK_DEFAULTS)) rescue => e Error.app(e) # we guarantee this does not raise diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index 388efa6..eae7673 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -1,5 +1,6 @@ # -*- encoding: binary -*- require 'revactor' +require 'fcntl' Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required' # Enables use of the Actor model through diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index a643bd8..321d3e4 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -22,9 +22,7 @@ module Rainbows # others and thus a lower +worker_connections+ setting is recommended. module ThreadPool - include Base - include Rainbows::Acceptor def worker_loop(worker) # :nodoc: init_worker_process(worker) @@ -45,7 +43,7 @@ module Rainbows def sync_worker # :nodoc: s = LISTENERS[0] begin - c = sync_accept(s) and process_client(c) + c = s.kgio_accept and process_client(c) rescue => e Error.listen_loop(e) end while G.alive @@ -59,7 +57,7 @@ module Rainbows # problem. On the other hand, a thundering herd may not # even incur as much overhead as an extra Mutex#synchronize ret = IO.select(LISTENERS, nil, nil, 1) and ret[0].each do |s| - s = accept(s) and process_client(s) + s = s.kgio_tryaccept and process_client(s) end rescue Errno::EINTR rescue => e diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index 0d4973a..a0ccde6 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -18,7 +18,6 @@ module Rainbows module ThreadSpawn include Base - include Rainbows::Acceptor def accept_loop(klass) #:nodoc: lock = Mutex.new @@ -37,7 +36,7 @@ module Rainbows # CPU during I/O wait, CPU cycles that can be better used # by other worker _processes_. sleep(0.01) - elsif c = sync_accept(l) + elsif c = l.kgio_accept klass.new(c) do |c| begin lock.synchronize { G.cur += 1 } diff --git a/rainbows.gemspec b/rainbows.gemspec index 4b9553d..95442d2 100644 --- a/rainbows.gemspec +++ b/rainbows.gemspec @@ -47,6 +47,7 @@ Gem::Specification.new do |s| # Unicorn 0.991.0 handles config.ru when started outside of # the prespecified working_directory s.add_dependency(%q, [">= 1.1.3", "< 2.0.0"]) + s.add_dependency(%q, ["~> 1.0.1"]) s.add_development_dependency(%q, "~> 2.1.0") # optional runtime dependencies depending on configuration diff --git a/t/test_isolate.rb b/t/test_isolate.rb index 52ec8d4..d39d7be 100644 --- a/t/test_isolate.rb +++ b/t/test_isolate.rb @@ -15,6 +15,7 @@ $stdout.reopen($stderr) Isolate.now!(opts) do gem 'rack', '1.1.0' # Cramp currently requires ~> 1.1.0 + gem 'kgio', '1.0.1' gem 'unicorn', '1.1.3' gem 'kcar', '0.1.1' -- cgit v1.2.3-24-ge0c7