diff options
-rw-r--r-- | lib/rainbows/client.rb | 4 | ||||
-rw-r--r-- | lib/rainbows/fiber/base.rb | 13 | ||||
-rw-r--r-- | lib/rainbows/fiber/io.rb | 29 | ||||
-rw-r--r-- | lib/rainbows/fiber/rev/methods.rb | 19 | ||||
-rw-r--r-- | lib/rainbows/process_client.rb | 8 | ||||
-rw-r--r-- | lib/rainbows/rev/client.rb | 13 | ||||
-rw-r--r-- | lib/rainbows/rev/master.rb | 15 | ||||
-rw-r--r-- | lib/rainbows/timed_read.rb (renamed from lib/rainbows/read_timeout.rb) | 12 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_pool.rb | 4 | ||||
-rw-r--r-- | lib/rainbows/writer_thread_spawn.rb | 4 |
10 files changed, 65 insertions, 56 deletions
diff --git a/lib/rainbows/client.rb b/lib/rainbows/client.rb index 8956509..dc6d95e 100644 --- a/lib/rainbows/client.rb +++ b/lib/rainbows/client.rb @@ -1,9 +1,9 @@ # -*- encoding: binary -*- # :enddoc: -require 'rainbows/read_timeout' +require 'rainbows/timed_read' class Rainbows::Client < Kgio::Socket - include Rainbows::ReadTimeout + include Rainbows::TimedRead end Kgio.accept_class = Rainbows::Client diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index b7c4ce5..69bf5d9 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -56,19 +56,6 @@ module Rainbows::Fiber::Base max.nil? || max > (now + 1) ? 1 : max - now end - def wait_headers_readable(client) - io = client.to_io - expire = nil - begin - return io.recv_nonblock(1, Socket::MSG_PEEK) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato - client.wait_readable - retry - end - end - def process(client) G.cur += 1 process_client(client) diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb index 3028eab..a9803ee 100644 --- a/lib/rainbows/fiber/io.rb +++ b/lib/rainbows/fiber/io.rb @@ -75,15 +75,28 @@ class Rainbows::Fiber::IO end # used for reading headers (respecting keepalive_timeout) - def read_timeout + def timed_read(buf) expire = nil - begin - return @to_io.read_nonblock(16384) - rescue Errno::EAGAIN - return if expire && expire < Time.now - expire ||= Time.now + G.kato - wait_readable - end while true + if @to_io.respond_to?(:kgio_tryread) + begin + case rv = @to_io.kgio_tryread(16384, buf) + when :wait_readable + return if expire && expire < Time.now + expire ||= Time.now + G.kato + wait_readable + else + return rv + end + end while true + else + begin + return @to_io.read_nonblock(16384, buf) + rescue Errno::EAGAIN + return if expire && expire < Time.now + expire ||= Time.now + G.kato + wait_readable + end while true + end end def readpartial(length, buf = "") diff --git a/lib/rainbows/fiber/rev/methods.rb b/lib/rainbows/fiber/rev/methods.rb index 64108a9..c09268f 100644 --- a/lib/rainbows/fiber/rev/methods.rb +++ b/lib/rainbows/fiber/rev/methods.rb @@ -3,7 +3,7 @@ module Rainbows::Fiber::Rev::Methods class Watcher < Rev::IOWatcher def initialize(fio, flag) - @f = fio.f || Fiber.current + @f = Fiber.current super(fio, flag) attach(Rev::Loop.default) end @@ -15,30 +15,23 @@ module Rainbows::Fiber::Rev::Methods alias on_writable on_readable end - def initialize(*args) - @f = Fiber.current - super(*args) - @r = @w = false - end - def close - @w.detach if @w - @r.detach if @r - @r = @w = false + @w.detach if defined?(@w) && @w.attached? + @r.detach if defined?(@r) && @r.attached? super end def wait_writable - @w ||= Watcher.new(self, :w) + @w = Watcher.new(self, :w) unless defined?(@w) @w.enable unless @w.enabled? Fiber.yield @w.disable end def wait_readable - @r ||= Watcher.new(self, :r) + @r = Watcher.new(self, :r) unless defined?(@r) @r.enable unless @r.enabled? - KATO << @f + KATO << Fiber.current Fiber.yield @r.disable end diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb index d2c9d0e..d66c1ae 100644 --- a/lib/rainbows/process_client.rb +++ b/lib/rainbows/process_client.rb @@ -9,10 +9,6 @@ module Rainbows::ProcessClient TeeInput = Rainbows::TeeInput include Rainbows::Const - def wait_headers_readable(client) - IO.select([client], nil, nil, G.kato) - end - # once a client is accepted, it is processed in its entirety here # in 3 easy steps: read request, call app, write app response # this is used by synchronous concurrency models @@ -25,8 +21,8 @@ module Rainbows::ProcessClient begin # loop until env = hp.parse - wait_headers_readable(client) or return - buf << client.kgio_read!(16384) + client.timed_read(buf2 ||= "") or return + buf << buf2 end env[CLIENT_IO] = client diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb index b7c1c78..bc85fbd 100644 --- a/lib/rainbows/rev/client.rb +++ b/lib/rainbows/rev/client.rb @@ -42,6 +42,19 @@ module Rainbows super(buf) end + def on_readable + buf = @_io.kgio_tryread(16384) + case buf + when :wait_readable + when nil # eof + close + else + on_read buf + end + rescue Errno::ECONNRESET + close + end + # queued, optional response bodies, it should only be unpollable "fast" # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk # are also part of this. We'll also stick DeferredResponse bodies in diff --git a/lib/rainbows/rev/master.rb b/lib/rainbows/rev/master.rb index 21b583a..8e5d4ef 100644 --- a/lib/rainbows/rev/master.rb +++ b/lib/rainbows/rev/master.rb @@ -2,20 +2,23 @@ # :enddoc: require 'rainbows/rev' -class Rainbows::Rev::Master < Rev::AsyncWatcher +class Rainbows::Rev::Master < Rev::IOWatcher def initialize(queue) - super() + @reader, @writer = Kgio::Pipe.new + super(@reader) @queue = queue end def <<(output) @queue << output - signal + @writer.kgio_trywrite("\0") end - def on_signal - client, response = @queue.pop - client.response_write(response) + def on_readable + if String === @reader.kgio_tryread(1) + client, response = @queue.pop + client.response_write(response) + end end end diff --git a/lib/rainbows/read_timeout.rb b/lib/rainbows/timed_read.rb index d8245bd..4a4e027 100644 --- a/lib/rainbows/read_timeout.rb +++ b/lib/rainbows/timed_read.rb @@ -1,6 +1,6 @@ # -*- encoding: binary -*- # :enddoc: -module Rainbows::ReadTimeout +module Rainbows::TimedRead G = Rainbows::G # :nodoc: def wait_readable @@ -8,17 +8,13 @@ module Rainbows::ReadTimeout end # used for reading headers (respecting keepalive_timeout) - def read_timeout(buf = "") + def timed_read(buf) expire = nil begin case rv = kgio_tryread(16384, buf) when :wait_readable - now = Time.now.to_f - if expire - now > expire and return - else - expire = now + G.kato - end + return if expire && expire < Time.now + expire ||= Time.now + G.kato wait_readable else return rv diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index a81725a..e8cad91 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -42,6 +42,10 @@ module Rainbows to_io.kgio_trywrite(buf) end + def timed_read(buf) + to_io.timed_read(buf) + end + def write(buf) q << [ to_io, buf ] end diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 691e68c..02ae0d5 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -47,6 +47,10 @@ module Rainbows to_io.kgio_trywrite(buf) end + def timed_read(buf) + to_io.timed_read(buf) + end + def queue_writer # not using Thread.pass here because that spins the CPU during # I/O wait and will eat cycles from other worker processes. |