From 42747db815ad668b20849afb2a9dcdd1319713ae Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Tue, 2 Nov 2010 12:32:23 -0700 Subject: avoid Errno::EAGAIN, harder Errno::EAGAIN is still a problem under Ruby 1.9.2, so try harder to avoid it and use kgio methods. Even when 1.9.3 is available, kgio will still be faster as exceptions are slower than normal return values. --- lib/rainbows/client.rb | 4 ++-- lib/rainbows/fiber/base.rb | 13 ------------- lib/rainbows/fiber/io.rb | 29 +++++++++++++++++++++-------- lib/rainbows/fiber/rev/methods.rb | 19 ++++++------------- lib/rainbows/process_client.rb | 8 ++------ lib/rainbows/read_timeout.rb | 28 ---------------------------- lib/rainbows/rev/client.rb | 13 +++++++++++++ lib/rainbows/rev/master.rb | 15 +++++++++------ lib/rainbows/timed_read.rb | 24 ++++++++++++++++++++++++ lib/rainbows/writer_thread_pool.rb | 4 ++++ lib/rainbows/writer_thread_spawn.rb | 4 ++++ 11 files changed, 85 insertions(+), 76 deletions(-) delete mode 100644 lib/rainbows/read_timeout.rb create mode 100644 lib/rainbows/timed_read.rb diff --git a/lib/rainbows/client.rb b/lib/rainbows/client.rb index 8956509..dc6d95e 100644 --- a/lib/rainbows/client.rb +++ b/lib/rainbows/client.rb @@ -1,9 +1,9 @@ # -*- encoding: binary -*- # :enddoc: -require 'rainbows/read_timeout' +require 'rainbows/timed_read' class Rainbows::Client < Kgio::Socket - include Rainbows::ReadTimeout + include Rainbows::TimedRead end Kgio.accept_class = Rainbows::Client diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index b7c4ce5..69bf5d9 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -56,19 +56,6 @@ module Rainbows::Fiber::Base max.nil? || max > (now + 1) ? 1 : max - now end - def wait_headers_readable(client) - io = client.to_io - expire = nil - begin - return io.recv_nonblock(1, Socket::MSG_PEEK) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato - client.wait_readable - retry - end - end - def process(client) G.cur += 1 process_client(client) diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb index 3028eab..a9803ee 100644 --- a/lib/rainbows/fiber/io.rb +++ b/lib/rainbows/fiber/io.rb @@ -75,15 +75,28 @@ class Rainbows::Fiber::IO end # used for reading headers (respecting keepalive_timeout) - def read_timeout + def timed_read(buf) expire = nil - begin - return @to_io.read_nonblock(16384) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato - wait_readable - end while true + if @to_io.respond_to?(:kgio_tryread) + begin + case rv = @to_io.kgio_tryread(16384, buf) + when :wait_readable + return if expire && expire < Time.now + expire ||= Time.now + G.kato + wait_readable + else + return rv + end + end while true + else + begin + return @to_io.read_nonblock(16384, buf) + rescue Errno::EAGAIN + return if expire && expire < Time.now + expire ||= Time.now + G.kato + wait_readable + end while true + end end def readpartial(length, buf = "") diff --git a/lib/rainbows/fiber/rev/methods.rb b/lib/rainbows/fiber/rev/methods.rb index 64108a9..c09268f 100644 --- a/lib/rainbows/fiber/rev/methods.rb +++ b/lib/rainbows/fiber/rev/methods.rb @@ -3,7 +3,7 @@ module Rainbows::Fiber::Rev::Methods class Watcher < Rev::IOWatcher def initialize(fio, flag) - @f = fio.f || Fiber.current + @f = Fiber.current super(fio, flag) attach(Rev::Loop.default) end @@ -15,30 +15,23 @@ module Rainbows::Fiber::Rev::Methods alias on_writable on_readable end - def initialize(*args) - @f = Fiber.current - super(*args) - @r = @w = false - end - def close - @w.detach if @w - @r.detach if @r - @r = @w = false + @w.detach if defined?(@w) && @w.attached? + @r.detach if defined?(@r) && @r.attached? super end def wait_writable - @w ||= Watcher.new(self, :w) + @w = Watcher.new(self, :w) unless defined?(@w) @w.enable unless @w.enabled? Fiber.yield @w.disable end def wait_readable - @r ||= Watcher.new(self, :r) + @r = Watcher.new(self, :r) unless defined?(@r) @r.enable unless @r.enabled? - KATO << @f + KATO << Fiber.current Fiber.yield @r.disable end diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb index d2c9d0e..d66c1ae 100644 --- a/lib/rainbows/process_client.rb +++ b/lib/rainbows/process_client.rb @@ -9,10 +9,6 @@ module Rainbows::ProcessClient TeeInput = Rainbows::TeeInput include Rainbows::Const - def wait_headers_readable(client) - IO.select([client], nil, nil, G.kato) - end - # once a client is accepted, it is processed in its entirety here # in 3 easy steps: read request, call app, write app response # this is used by synchronous concurrency models @@ -25,8 +21,8 @@ module Rainbows::ProcessClient begin # loop until env = hp.parse - wait_headers_readable(client) or return - buf << client.kgio_read!(16384) + client.timed_read(buf2 ||= "") or return + buf << buf2 end env[CLIENT_IO] = client diff --git a/lib/rainbows/read_timeout.rb b/lib/rainbows/read_timeout.rb deleted file mode 100644 index d8245bd..0000000 --- a/lib/rainbows/read_timeout.rb +++ /dev/null @@ -1,28 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::ReadTimeout - G = Rainbows::G # :nodoc: - - def wait_readable - IO.select([self], nil, nil, G.kato) - end - - # used for reading headers (respecting keepalive_timeout) - def read_timeout(buf = "") - expire = nil - begin - case rv = kgio_tryread(16384, buf) - when :wait_readable - now = Time.now.to_f - if expire - now > expire and return - else - expire = now + G.kato - end - wait_readable - else - return rv - end - end while true - end -end diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index b7c1c78..bc85fbd 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -42,6 +42,19 @@ module Rainbows super(buf) end + def on_readable + buf = @_io.kgio_tryread(16384) + case buf + when :wait_readable + when nil # eof + close + else + on_read buf + end + rescue Errno::ECONNRESET + close + end + # queued, optional response bodies, it should only be unpollable "fast" # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk # are also part of this. We'll also stick DeferredResponse bodies in diff --git a/lib/rainbows/rev/master.rb b/lib/rainbows/rev/master.rb index 21b583a..8e5d4ef 100644 --- a/lib/rainbows/rev/master.rb +++ b/lib/rainbows/rev/master.rb @@ -2,20 +2,23 @@ # :enddoc: require 'rainbows/rev' -class Rainbows::Rev::Master < Rev::AsyncWatcher +class Rainbows::Rev::Master < Rev::IOWatcher def initialize(queue) - super() + @reader, @writer = Kgio::Pipe.new + super(@reader) @queue = queue end def <<(output) @queue << output - signal + @writer.kgio_trywrite("\0") end - def on_signal - client, response = @queue.pop - client.response_write(response) + def on_readable + if String === @reader.kgio_tryread(1) + client, response = @queue.pop + client.response_write(response) + end end end diff --git a/lib/rainbows/timed_read.rb b/lib/rainbows/timed_read.rb new file mode 100644 index 0000000..4a4e027 --- /dev/null +++ b/lib/rainbows/timed_read.rb @@ -0,0 +1,24 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::TimedRead + G = Rainbows::G # :nodoc: + + def wait_readable + IO.select([self], nil, nil, G.kato) + end + + # used for reading headers (respecting keepalive_timeout) + def timed_read(buf) + expire = nil + begin + case rv = kgio_tryread(16384, buf) + when :wait_readable + return if expire && expire < Time.now + expire ||= Time.now + G.kato + wait_readable + else + return rv + end + end while true + end +end diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index a81725a..e8cad91 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -42,6 +42,10 @@ module Rainbows to_io.kgio_trywrite(buf) end + def timed_read(buf) + to_io.timed_read(buf) + end + def write(buf) q << [ to_io, buf ] end diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 691e68c..02ae0d5 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -47,6 +47,10 @@ module Rainbows to_io.kgio_trywrite(buf) end + def timed_read(buf) + to_io.timed_read(buf) + end + def queue_writer # not using Thread.pass here because that spins the CPU during # I/O wait and will eat cycles from other worker processes. -- cgit v1.2.3-24-ge0c7