about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows/client.rb4
-rw-r--r--lib/rainbows/fiber/base.rb13
-rw-r--r--lib/rainbows/fiber/io.rb29
-rw-r--r--lib/rainbows/fiber/rev/methods.rb19
-rw-r--r--lib/rainbows/process_client.rb8
-rw-r--r--lib/rainbows/rev/client.rb13
-rw-r--r--lib/rainbows/rev/master.rb15
-rw-r--r--lib/rainbows/timed_read.rb (renamed from lib/rainbows/read_timeout.rb)12
-rw-r--r--lib/rainbows/writer_thread_pool.rb4
-rw-r--r--lib/rainbows/writer_thread_spawn.rb4
10 files changed, 65 insertions, 56 deletions
diff --git a/lib/rainbows/client.rb b/lib/rainbows/client.rb
index 8956509..dc6d95e 100644
--- a/lib/rainbows/client.rb
+++ b/lib/rainbows/client.rb
@@ -1,9 +1,9 @@
 # -*- encoding: binary -*-
 # :enddoc:
 
-require 'rainbows/read_timeout'
+require 'rainbows/timed_read'
 
 class Rainbows::Client < Kgio::Socket
-  include Rainbows::ReadTimeout
+  include Rainbows::TimedRead
 end
 Kgio.accept_class = Rainbows::Client
diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb
index b7c4ce5..69bf5d9 100644
--- a/lib/rainbows/fiber/base.rb
+++ b/lib/rainbows/fiber/base.rb
@@ -56,19 +56,6 @@ module Rainbows::Fiber::Base
     max.nil? || max > (now + 1) ? 1 : max - now
   end
 
-  def wait_headers_readable(client)
-    io = client.to_io
-    expire = nil
-    begin
-      return io.recv_nonblock(1, Socket::MSG_PEEK)
-    rescue Errno::EAGAIN
-      return if expire && expire < Time.now
-      expire ||= Time.now + G.kato
-      client.wait_readable
-      retry
-    end
-  end
-
   def process(client)
     G.cur += 1
     process_client(client)
diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb
index 3028eab..a9803ee 100644
--- a/lib/rainbows/fiber/io.rb
+++ b/lib/rainbows/fiber/io.rb
@@ -75,15 +75,28 @@ class Rainbows::Fiber::IO
   end
 
   # used for reading headers (respecting keepalive_timeout)
-  def read_timeout
+  def timed_read(buf)
     expire = nil
-    begin
-      return @to_io.read_nonblock(16384)
-    rescue Errno::EAGAIN
-      return if expire && expire < Time.now
-      expire ||= Time.now + G.kato
-      wait_readable
-    end while true
+    if @to_io.respond_to?(:kgio_tryread)
+      begin
+        case rv = @to_io.kgio_tryread(16384, buf)
+        when :wait_readable
+          return if expire && expire < Time.now
+          expire ||= Time.now + G.kato
+          wait_readable
+        else
+          return rv
+        end
+      end while true
+    else
+      begin
+        return @to_io.read_nonblock(16384, buf)
+      rescue Errno::EAGAIN
+        return if expire && expire < Time.now
+        expire ||= Time.now + G.kato
+        wait_readable
+      end while true
+    end
   end
 
   def readpartial(length, buf = "")
diff --git a/lib/rainbows/fiber/rev/methods.rb b/lib/rainbows/fiber/rev/methods.rb
index 64108a9..c09268f 100644
--- a/lib/rainbows/fiber/rev/methods.rb
+++ b/lib/rainbows/fiber/rev/methods.rb
@@ -3,7 +3,7 @@
 module Rainbows::Fiber::Rev::Methods
   class Watcher < Rev::IOWatcher
     def initialize(fio, flag)
-      @f = fio.f || Fiber.current
+      @f = Fiber.current
       super(fio, flag)
       attach(Rev::Loop.default)
     end
@@ -15,30 +15,23 @@ module Rainbows::Fiber::Rev::Methods
     alias on_writable on_readable
   end
 
-  def initialize(*args)
-    @f = Fiber.current
-    super(*args)
-    @r = @w = false
-  end
-
   def close
-    @w.detach if @w
-    @r.detach if @r
-    @r = @w = false
+    @w.detach if defined?(@w) && @w.attached?
+    @r.detach if defined?(@r) && @r.attached?
     super
   end
 
   def wait_writable
-    @w ||= Watcher.new(self, :w)
+    @w = Watcher.new(self, :w) unless defined?(@w)
     @w.enable unless @w.enabled?
     Fiber.yield
     @w.disable
   end
 
   def wait_readable
-    @r ||= Watcher.new(self, :r)
+    @r = Watcher.new(self, :r) unless defined?(@r)
     @r.enable unless @r.enabled?
-    KATO << @f
+    KATO << Fiber.current
     Fiber.yield
     @r.disable
   end
diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb
index d2c9d0e..d66c1ae 100644
--- a/lib/rainbows/process_client.rb
+++ b/lib/rainbows/process_client.rb
@@ -9,10 +9,6 @@ module Rainbows::ProcessClient
   TeeInput = Rainbows::TeeInput
   include Rainbows::Const
 
-  def wait_headers_readable(client)
-    IO.select([client], nil, nil, G.kato)
-  end
-
   # once a client is accepted, it is processed in its entirety here
   # in 3 easy steps: read request, call app, write app response
   # this is used by synchronous concurrency models
@@ -25,8 +21,8 @@ module Rainbows::ProcessClient
 
     begin # loop
       until env = hp.parse
-        wait_headers_readable(client) or return
-        buf << client.kgio_read!(16384)
+        client.timed_read(buf2 ||= "") or return
+        buf << buf2
       end
 
       env[CLIENT_IO] = client
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index b7c1c78..bc85fbd 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -42,6 +42,19 @@ module Rainbows
         super(buf)
       end
 
+      def on_readable
+        buf = @_io.kgio_tryread(16384)
+        case buf
+        when :wait_readable
+        when nil # eof
+          close
+        else
+          on_read buf
+        end
+      rescue Errno::ECONNRESET
+        close
+      end
+
       # queued, optional response bodies, it should only be unpollable "fast"
       # devices where read(2) is uninterruptable.  Unfortunately, NFS and ilk
       # are also part of this.  We'll also stick DeferredResponse bodies in
diff --git a/lib/rainbows/rev/master.rb b/lib/rainbows/rev/master.rb
index 21b583a..8e5d4ef 100644
--- a/lib/rainbows/rev/master.rb
+++ b/lib/rainbows/rev/master.rb
@@ -2,20 +2,23 @@
 # :enddoc:
 require 'rainbows/rev'
 
-class Rainbows::Rev::Master < Rev::AsyncWatcher
+class Rainbows::Rev::Master < Rev::IOWatcher
 
   def initialize(queue)
-    super()
+    @reader, @writer = Kgio::Pipe.new
+    super(@reader)
     @queue = queue
   end
 
   def <<(output)
     @queue << output
-    signal
+    @writer.kgio_trywrite("\0")
   end
 
-  def on_signal
-    client, response = @queue.pop
-    client.response_write(response)
+  def on_readable
+    if String === @reader.kgio_tryread(1)
+      client, response = @queue.pop
+      client.response_write(response)
+    end
   end
 end
diff --git a/lib/rainbows/read_timeout.rb b/lib/rainbows/timed_read.rb
index d8245bd..4a4e027 100644
--- a/lib/rainbows/read_timeout.rb
+++ b/lib/rainbows/timed_read.rb
@@ -1,6 +1,6 @@
 # -*- encoding: binary -*-
 # :enddoc:
-module Rainbows::ReadTimeout
+module Rainbows::TimedRead
   G = Rainbows::G # :nodoc:
 
   def wait_readable
@@ -8,17 +8,13 @@ module Rainbows::ReadTimeout
   end
 
   # used for reading headers (respecting keepalive_timeout)
-  def read_timeout(buf = "")
+  def timed_read(buf)
     expire = nil
     begin
       case rv = kgio_tryread(16384, buf)
       when :wait_readable
-        now = Time.now.to_f
-        if expire
-          now > expire and return
-        else
-          expire = now + G.kato
-        end
+        return if expire && expire < Time.now
+        expire ||= Time.now + G.kato
         wait_readable
       else
         return rv
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index a81725a..e8cad91 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -42,6 +42,10 @@ module Rainbows
         to_io.kgio_trywrite(buf)
       end
 
+      def timed_read(buf)
+        to_io.timed_read(buf)
+      end
+
       def write(buf)
         q << [ to_io, buf ]
       end
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index 691e68c..02ae0d5 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -47,6 +47,10 @@ module Rainbows
         to_io.kgio_trywrite(buf)
       end
 
+      def timed_read(buf)
+        to_io.timed_read(buf)
+      end
+
       def queue_writer
         # not using Thread.pass here because that spins the CPU during
         # I/O wait and will eat cycles from other worker processes.