about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-11-02 12:32:23 -0700
committerEric Wong <normalperson@yhbt.net>2010-11-05 18:36:08 -0700
commit42747db815ad668b20849afb2a9dcdd1319713ae (patch)
tree6dcd7cb02f11bcfad40de6c72a9a6570df71c4d7 /lib
parent427ef4a2953a4b2d34f7dd89566a0cb5ee6e734d (diff)
downloadrainbows-42747db815ad668b20849afb2a9dcdd1319713ae.tar.gz
Errno::EAGAIN is still a problem under Ruby 1.9.2, so try harder
to avoid it and use kgio methods.  Even when 1.9.3 is available,
kgio will still be faster as exceptions are slower than normal
return values.
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows/client.rb4
-rw-r--r--lib/rainbows/fiber/base.rb13
-rw-r--r--lib/rainbows/fiber/io.rb29
-rw-r--r--lib/rainbows/fiber/rev/methods.rb19
-rw-r--r--lib/rainbows/process_client.rb8
-rw-r--r--lib/rainbows/rev/client.rb13
-rw-r--r--lib/rainbows/rev/master.rb15
-rw-r--r--lib/rainbows/timed_read.rb (renamed from lib/rainbows/read_timeout.rb)12
-rw-r--r--lib/rainbows/writer_thread_pool.rb4
-rw-r--r--lib/rainbows/writer_thread_spawn.rb4
10 files changed, 65 insertions, 56 deletions
diff --git a/lib/rainbows/client.rb b/lib/rainbows/client.rb
index 8956509..dc6d95e 100644
--- a/lib/rainbows/client.rb
+++ b/lib/rainbows/client.rb
@@ -1,9 +1,9 @@
 # -*- encoding: binary -*-
 # :enddoc:
 
-require 'rainbows/read_timeout'
+require 'rainbows/timed_read'
 
 class Rainbows::Client < Kgio::Socket
-  include Rainbows::ReadTimeout
+  include Rainbows::TimedRead
 end
 Kgio.accept_class = Rainbows::Client
diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb
index b7c4ce5..69bf5d9 100644
--- a/lib/rainbows/fiber/base.rb
+++ b/lib/rainbows/fiber/base.rb
@@ -56,19 +56,6 @@ module Rainbows::Fiber::Base
     max.nil? || max > (now + 1) ? 1 : max - now
   end
 
-  def wait_headers_readable(client)
-    io = client.to_io
-    expire = nil
-    begin
-      return io.recv_nonblock(1, Socket::MSG_PEEK)
-    rescue Errno::EAGAIN
-      return if expire && expire < Time.now
-      expire ||= Time.now + G.kato
-      client.wait_readable
-      retry
-    end
-  end
-
   def process(client)
     G.cur += 1
     process_client(client)
diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb
index 3028eab..a9803ee 100644
--- a/lib/rainbows/fiber/io.rb
+++ b/lib/rainbows/fiber/io.rb
@@ -75,15 +75,28 @@ class Rainbows::Fiber::IO
   end
 
   # used for reading headers (respecting keepalive_timeout)
-  def read_timeout
+  def timed_read(buf)
     expire = nil
-    begin
-      return @to_io.read_nonblock(16384)
-    rescue Errno::EAGAIN
-      return if expire && expire < Time.now
-      expire ||= Time.now + G.kato
-      wait_readable
-    end while true
+    if @to_io.respond_to?(:kgio_tryread)
+      begin
+        case rv = @to_io.kgio_tryread(16384, buf)
+        when :wait_readable
+          return if expire && expire < Time.now
+          expire ||= Time.now + G.kato
+          wait_readable
+        else
+          return rv
+        end
+      end while true
+    else
+      begin
+        return @to_io.read_nonblock(16384, buf)
+      rescue Errno::EAGAIN
+        return if expire && expire < Time.now
+        expire ||= Time.now + G.kato
+        wait_readable
+      end while true
+    end
   end
 
   def readpartial(length, buf = "")
diff --git a/lib/rainbows/fiber/rev/methods.rb b/lib/rainbows/fiber/rev/methods.rb
index 64108a9..c09268f 100644
--- a/lib/rainbows/fiber/rev/methods.rb
+++ b/lib/rainbows/fiber/rev/methods.rb
@@ -3,7 +3,7 @@
 module Rainbows::Fiber::Rev::Methods
   class Watcher < Rev::IOWatcher
     def initialize(fio, flag)
-      @f = fio.f || Fiber.current
+      @f = Fiber.current
       super(fio, flag)
       attach(Rev::Loop.default)
     end
@@ -15,30 +15,23 @@ module Rainbows::Fiber::Rev::Methods
     alias on_writable on_readable
   end
 
-  def initialize(*args)
-    @f = Fiber.current
-    super(*args)
-    @r = @w = false
-  end
-
   def close
-    @w.detach if @w
-    @r.detach if @r
-    @r = @w = false
+    @w.detach if defined?(@w) && @w.attached?
+    @r.detach if defined?(@r) && @r.attached?
     super
   end
 
   def wait_writable
-    @w ||= Watcher.new(self, :w)
+    @w = Watcher.new(self, :w) unless defined?(@w)
     @w.enable unless @w.enabled?
     Fiber.yield
     @w.disable
   end
 
   def wait_readable
-    @r ||= Watcher.new(self, :r)
+    @r = Watcher.new(self, :r) unless defined?(@r)
     @r.enable unless @r.enabled?
-    KATO << @f
+    KATO << Fiber.current
     Fiber.yield
     @r.disable
   end
diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb
index d2c9d0e..d66c1ae 100644
--- a/lib/rainbows/process_client.rb
+++ b/lib/rainbows/process_client.rb
@@ -9,10 +9,6 @@ module Rainbows::ProcessClient
   TeeInput = Rainbows::TeeInput
   include Rainbows::Const
 
-  def wait_headers_readable(client)
-    IO.select([client], nil, nil, G.kato)
-  end
-
   # once a client is accepted, it is processed in its entirety here
   # in 3 easy steps: read request, call app, write app response
   # this is used by synchronous concurrency models
@@ -25,8 +21,8 @@ module Rainbows::ProcessClient
 
     begin # loop
       until env = hp.parse
-        wait_headers_readable(client) or return
-        buf << client.kgio_read!(16384)
+        client.timed_read(buf2 ||= "") or return
+        buf << buf2
       end
 
       env[CLIENT_IO] = client
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index b7c1c78..bc85fbd 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -42,6 +42,19 @@ module Rainbows
         super(buf)
       end
 
+      def on_readable
+        buf = @_io.kgio_tryread(16384)
+        case buf
+        when :wait_readable
+        when nil # eof
+          close
+        else
+          on_read buf
+        end
+      rescue Errno::ECONNRESET
+        close
+      end
+
       # queued, optional response bodies, it should only be unpollable "fast"
       # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
       # are also part of this.  We'll also stick DeferredResponse bodies in
diff --git a/lib/rainbows/rev/master.rb b/lib/rainbows/rev/master.rb
index 21b583a..8e5d4ef 100644
--- a/lib/rainbows/rev/master.rb
+++ b/lib/rainbows/rev/master.rb
@@ -2,20 +2,23 @@
 # :enddoc:
 require 'rainbows/rev'
 
-class Rainbows::Rev::Master < Rev::AsyncWatcher
+class Rainbows::Rev::Master < Rev::IOWatcher
 
   def initialize(queue)
-    super()
+    @reader, @writer = Kgio::Pipe.new
+    super(@reader)
     @queue = queue
   end
 
   def <<(output)
     @queue << output
-    signal
+    @writer.kgio_trywrite("\0")
   end
 
-  def on_signal
-    client, response = @queue.pop
-    client.response_write(response)
+  def on_readable
+    if String === @reader.kgio_tryread(1)
+      client, response = @queue.pop
+      client.response_write(response)
+    end
   end
 end
diff --git a/lib/rainbows/read_timeout.rb b/lib/rainbows/timed_read.rb
index d8245bd..4a4e027 100644
--- a/lib/rainbows/read_timeout.rb
+++ b/lib/rainbows/timed_read.rb
@@ -1,6 +1,6 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows::ReadTimeout
+module Rainbows::TimedRead
   G = Rainbows::G # :nodoc:
 
   def wait_readable
@@ -8,17 +8,13 @@ module Rainbows::ReadTimeout
   end
 
   # used for reading headers (respecting keepalive_timeout)
-  def read_timeout(buf = "")
+  def timed_read(buf)
     expire = nil
     begin
       case rv = kgio_tryread(16384, buf)
       when :wait_readable
-        now = Time.now.to_f
-        if expire
-          now > expire and return
-        else
-          expire = now + G.kato
-        end
+        return if expire && expire < Time.now
+        expire ||= Time.now + G.kato
         wait_readable
       else
         return rv
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index a81725a..e8cad91 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -42,6 +42,10 @@ module Rainbows
         to_io.kgio_trywrite(buf)
       end
 
+      def timed_read(buf)
+        to_io.timed_read(buf)
+      end
+
       def write(buf)
         q << [ to_io, buf ]
       end
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 691e68c..02ae0d5 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -47,6 +47,10 @@ module Rainbows
         to_io.kgio_trywrite(buf)
       end
 
+      def timed_read(buf)
+        to_io.timed_read(buf)
+      end
+
       def queue_writer
         # not using Thread.pass here because that spins the CPU during
         # I/O wait and will eat cycles from other worker processes.