about summary refs log tree commit homepage
path: root/lib/rainbows
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-11-26 00:41:26 -0800
committerEric Wong <normalperson@yhbt.net>2009-11-26 13:41:06 -0800
commit278d9d5a7f3d2dc3c6563af1584b5e773e08073d (patch)
treea3066da21425c49f3ca89bb8cf3af2ab03c285d0 /lib/rainbows
parent1079dfa30108466d413f30526eda468cdf0ae985 (diff)
downloadrainbows-278d9d5a7f3d2dc3c6563af1584b5e773e08073d.tar.gz
Both FiberSpawn and FiberPool share similar main loops, the
only difference being the handling of connection acceptance.
So move the scheduler into it's own function for consistency.

We'll also correctly implement keepalive timeout so clients
get disconnected at the right time.
Diffstat (limited to 'lib/rainbows')
-rw-r--r--lib/rainbows/fiber.rb35
-rw-r--r--lib/rainbows/fiber/io.rb4
-rw-r--r--lib/rainbows/fiber_pool.rb21
-rw-r--r--lib/rainbows/fiber_spawn.rb23
4 files changed, 38 insertions, 45 deletions
diff --git a/lib/rainbows/fiber.rb b/lib/rainbows/fiber.rb
index f0755aa..1927a78 100644
--- a/lib/rainbows/fiber.rb
+++ b/lib/rainbows/fiber.rb
@@ -10,6 +10,10 @@ module Rainbows
     WR = {}
     ZZ = {}
 
+    # puts the current Fiber into uninterruptible sleep for at least
+    # +seconds+.  Unlike Kernel#sleep, this it is not possible to sleep
+    # indefinitely to be woken up (nobody wants that in a web server,
+    # right?).
     def self.sleep(seconds)
       ZZ[::Fiber.current] = Time.now + seconds
       ::Fiber.yield
@@ -18,9 +22,34 @@ module Rainbows
     module Base
       include Rainbows::Base
 
+      # the scheduler method that powers both FiberSpawn and FiberPool
+      # concurrency models.  It times out idle clients and attempts to
+      # schedules ones that were blocked on I/O.  At most it'll sleep
+      # for one second (returned by the schedule_sleepers method) which
+      # will cause it.
+      def schedule(&block)
+        ret = begin
+          G.tick
+          RD.keys.each { |c| c.f.resume } # attempt to time out idle clients
+          t = schedule_sleepers
+          Kernel.select(RD.keys.concat(LISTENERS), WR.keys, nil, t) or return
+        rescue Errno::EINTR
+          retry
+        rescue Errno::EBADF, TypeError
+          LISTENERS.compact!
+          raise
+        end or return
+
+        # active writers first, then _all_ readers for keepalive timeout
+        ret[1].concat(RD.keys).each { |c| c.f.resume }
+
+        # accept is an expensive syscall, filter out listeners we don't want
+        (ret.first & LISTENERS).each(&block)
+      end
+
       # wakes up any sleepers that need to be woken and
       # returns an interval to IO.select on
-      def timer
+      def schedule_sleepers
         max = nil
         now = Time.now
         ZZ.delete_if { |fib, time|
@@ -46,7 +75,7 @@ module Rainbows
 
         begin # loop
           while ! hp.headers(env, buf)
-            buf << client.read_timeout or return
+            buf << (client.read_timeout or return)
           end
 
           env[RACK_INPUT] = 0 == hp.content_length ?
@@ -64,7 +93,6 @@ module Rainbows
           out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
           HttpResponse.write(client, response, out)
         end while alive and hp.reset.nil? and env.clear
-        io.close
       rescue => e
         handle_error(io, e)
       ensure
@@ -72,6 +100,7 @@ module Rainbows
         RD.delete(client)
         WR.delete(client)
         ZZ.delete(client.f)
+        io.close unless io.closed?
       end
 
     end
diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb
index bc6c0fe..5c51cb9 100644
--- a/lib/rainbows/fiber/io.rb
+++ b/lib/rainbows/fiber/io.rb
@@ -36,13 +36,13 @@ module Rainbows
 
       # used for reading headers (respecting keepalive_timeout)
       def read_timeout
-        expire = false
+        expire = nil
         begin
           to_io.read_nonblock(16384)
         rescue Errno::EAGAIN
           return if expire && expire < Time.now
           RD[self] = false
-          expire = Time.now + G.kato
+          expire ||= Time.now + G.kato
           ::Fiber.yield
           RD.delete(self)
           retry
diff --git a/lib/rainbows/fiber_pool.rb b/lib/rainbows/fiber_pool.rb
index 6cb2ca6..c647676 100644
--- a/lib/rainbows/fiber_pool.rb
+++ b/lib/rainbows/fiber_pool.rb
@@ -1,6 +1,5 @@
 # -*- encoding: binary -*-
 require 'rainbows/fiber'
-require 'pp'
 
 module Rainbows
 
@@ -26,26 +25,10 @@ module Rainbows
         }.resume # resume to hit ::Fiber.yield so it waits on a client
       }
       Fiber::Base.const_set(:APP, app)
-      rd = Fiber::RD
-      wr = Fiber::WR
 
       begin
-        ret = begin
-          G.tick
-          IO.select(rd.keys.concat(LISTENERS), wr.keys, nil, timer) or next
-        rescue Errno::EINTR
-          retry
-        rescue Errno::EBADF, TypeError
-          LISTENERS.compact!
-          G.cur > 0 ? retry : break
-        end
-
-        # active writers first, then _all_ readers for keepalive timeout
-        ret[1].concat(rd.keys).each { |c| c.f.resume }
-
-        # accept() is an expensive syscall
-        (ret.first & LISTENERS).each do |l|
-          fib = pool.shift or break
+        schedule do |l|
+          fib = pool.shift or break # let another worker process take it
           io = begin
             l.accept_nonblock
           rescue Errno::EAGAIN, Errno::ECONNABORTED
diff --git a/lib/rainbows/fiber_spawn.rb b/lib/rainbows/fiber_spawn.rb
index 004976a..83b64de 100644
--- a/lib/rainbows/fiber_spawn.rb
+++ b/lib/rainbows/fiber_spawn.rb
@@ -17,28 +17,10 @@ module Rainbows
       init_worker_process(worker)
       Fiber::Base.const_set(:APP, app)
       limit = worker_connections
-      rd = Rainbows::Fiber::RD
-      wr = Rainbows::Fiber::WR
       fio = Rainbows::Fiber::IO
 
       begin
-        ret = begin
-          IO.select(rd.keys.concat(LISTENERS), wr.keys, nil, timer) or next
-        rescue Errno::EINTR
-          G.tick
-          retry
-        rescue Errno::EBADF, TypeError
-          LISTENERS.compact!
-          G.cur > 0 ? retry : break
-        end
-        G.tick
-
-        # active writers first, then _all_ readers for keepalive timeout
-        ret[1].concat(rd.keys).each { |c| c.f.resume }
-        G.tick
-
-        # accept() is an expensive syscall
-        (ret.first & LISTENERS).each do |l|
+        schedule do |l|
           break if G.cur >= limit
           io = begin
             l.accept_nonblock
@@ -47,10 +29,9 @@ module Rainbows
           end
           ::Fiber.new { process_client(fio.new(io, ::Fiber.current)) }.resume
         end
-        G.tick
       rescue => e
         listen_loop_error(e)
-      end while G.tick || G.cur > 0
+      end while G.alive || G.cur > 0
     end
 
   end