about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-12-22 00:32:04 -0800
committerEric Wong <normalperson@yhbt.net>2009-12-22 00:38:04 -0800
commit19960488441651d689259071fa1be4f6957d681b (patch)
tree80c5752ad4484361c74a081d06fb6e43632d860a
parentee7fe220ccbc991e1e7cbe982caf48e3303274c7 (diff)
downloadrainbows-19960488441651d689259071fa1be4f6957d681b.tar.gz
This is like the traditional FiberSpawn, but more scalable (but
not necessarily faster) as it can use epoll or kqueue.
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/app_pool.rb2
-rw-r--r--lib/rainbows/dev_fd_response.rb2
-rw-r--r--lib/rainbows/fiber/rev.rb166
-rw-r--r--lib/rainbows/rev_fiber_spawn.rb29
-rw-r--r--t/simple-http_RevFiberSpawn.ru10
-rw-r--r--t/sleep.ru2
-rw-r--r--t/t9000.ru2
8 files changed, 212 insertions, 2 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 4b80056..64c4d9e 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -87,6 +87,7 @@ module Rainbows
     :FiberPool => 50,
     :ActorSpawn => 50,
     :NeverBlock => 50,
+    :RevFiberSpawn => 50,
   }.each do |model, _|
     u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" }
     autoload model, "rainbows/#{u.downcase!}"
diff --git a/lib/rainbows/app_pool.rb b/lib/rainbows/app_pool.rb
index 036fe9c..a1a3119 100644
--- a/lib/rainbows/app_pool.rb
+++ b/lib/rainbows/app_pool.rb
@@ -91,7 +91,7 @@ module Rainbows
       # concurrency models
       self.re ||= begin
         case env["rainbows.model"]
-        when :FiberSpawn, :FiberPool, :Revactor, :NeverBlock
+        when :FiberSpawn, :FiberPool, :Revactor, :NeverBlock, :RevFiberSpawn
           self.pool = Rainbows::Fiber::Queue.new(pool)
         end
         true
diff --git a/lib/rainbows/dev_fd_response.rb b/lib/rainbows/dev_fd_response.rb
index 9ad326c..bab35bc 100644
--- a/lib/rainbows/dev_fd_response.rb
+++ b/lib/rainbows/dev_fd_response.rb
@@ -39,7 +39,7 @@ module Rainbows
 
         # we need to make sure our pipe output is Fiber-compatible
         case env["rainbows.model"]
-        when :FiberSpawn, :FiberPool
+        when :FiberSpawn, :FiberPool, :RevFiberSpawn
           return [ status, headers.to_hash, Fiber::IO.new(io,::Fiber.current) ]
         end
       else # unlikely, char/block device file, directory, ...
diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb
new file mode 100644
index 0000000..36a46d4
--- /dev/null
+++ b/lib/rainbows/fiber/rev.rb
@@ -0,0 +1,166 @@
+# -*- encoding: binary -*-
+require 'rev'
+require 'rainbows/fiber'
+require 'rainbows/fiber/io'
+
+module Rainbows::Fiber
+  module Rev
+    G = Rainbows::G
+
+    # keep-alive timeout class
+    class Kato < ::Rev::TimerWatcher
+      def initialize
+        @watch = []
+        super(1, true)
+      end
+
+      def <<(fiber)
+        @watch << fiber
+        enable unless enabled?
+      end
+
+      def on_timer
+        @watch.uniq!
+        while f = @watch.shift
+          f.resume if f.alive?
+        end
+        disable
+      end
+    end
+
+    class Heartbeat < ::Rev::TimerWatcher
+      def on_timer
+        exit if (! G.tick && G.cur <= 0)
+      end
+    end
+
+    class Sleeper < ::Rev::TimerWatcher
+
+      def initialize(seconds)
+        @f = ::Fiber.current
+        super(seconds, false)
+        attach(::Rev::Loop.default)
+        ::Fiber.yield
+      end
+
+      def on_timer
+        @f.resume
+      end
+    end
+
+    class Server < ::Rev::IOWatcher
+      include Unicorn
+      include Rainbows
+      include Rainbows::Const
+      FIO = Rainbows::Fiber::IO
+
+      def to_io
+        @io
+      end
+
+      def initialize(io)
+        @io = io
+        super(self, :r)
+      end
+
+      def close
+        detach if attached?
+        @io.close
+      end
+
+      def on_readable
+        return if G.cur >= MAX
+        c = Rainbows.accept(@io) and ::Fiber.new { process(c) }.resume
+      end
+
+      def process(io)
+        G.cur += 1
+        client = FIO.new(io, ::Fiber.current)
+        buf = client.read_timeout or return
+        hp = HttpParser.new
+        env = {}
+        alive = true
+        remote_addr = TCPSocket === io ? io.peeraddr.last : LOCALHOST
+
+        begin # loop
+          buf << (client.read_timeout or return) until hp.headers(env, buf)
+
+          env[CLIENT_IO] = client
+          env[RACK_INPUT] = 0 == hp.content_length ?
+                    HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf)
+          env[REMOTE_ADDR] = remote_addr
+          response = APP.call(env.update(RACK_DEFAULTS))
+
+          if 100 == response.first.to_i
+            client.write(EXPECT_100_RESPONSE)
+            env.delete(HTTP_EXPECT)
+            response = APP.call(env)
+          end
+
+          alive = hp.keepalive? && G.alive
+          out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
+          HttpResponse.write(client, response, out)
+        end while alive and hp.reset.nil? and env.clear
+      rescue => e
+        Error.write(io, e)
+      ensure
+        G.cur -= 1
+        client.close
+      end
+    end
+
+    # TODO: env["rainbows.sleep"]
+    def self.sleep(seconds)
+      Sleeper.new(seconds)
+    end
+
+  end
+
+  class IO # see rainbows/fiber/io for original definition
+
+    class Watcher < ::Rev::IOWatcher
+      def initialize(fio, flag)
+        @fiber = fio.f
+        super(fio, flag)
+        attach(::Rev::Loop.default)
+      end
+
+      def on_readable
+        @fiber.resume
+      end
+
+      alias on_writable on_readable
+    end
+
+    undef_method :wait_readable
+    undef_method :wait_writable
+    undef_method :close
+
+    def initialize(*args)
+      super
+      @r = @w = false
+    end
+
+    def close
+      @w.detach if @w
+      @r.detach if @r
+      @r = @w = false
+      to_io.close unless to_io.closed?
+    end
+
+    def wait_writable
+      @w ||= Watcher.new(self, :w)
+      @w.enable unless @w.enabled?
+      ::Fiber.yield
+      @w.disable
+    end
+
+    def wait_readable
+      @r ||= Watcher.new(self, :r)
+      @r.enable unless @r.enabled?
+      KATO << f
+      ::Fiber.yield
+      @r.disable
+    end
+  end
+end
diff --git a/lib/rainbows/rev_fiber_spawn.rb b/lib/rainbows/rev_fiber_spawn.rb
new file mode 100644
index 0000000..4ce2995
--- /dev/null
+++ b/lib/rainbows/rev_fiber_spawn.rb
@@ -0,0 +1,29 @@
+# -*- encoding: binary -*-
+require 'rainbows/fiber/rev'
+
+module Rainbows
+
+  # A combination of the Rev and FiberSpawn models.  This allows Ruby
+  # 1.9 Fiber-based concurrency for application processing while
+  # exposing a synchronous execution model and using scalable network
+  # concurrency provided by Rev.  A "rack.input" is exposed as well
+  # being Sunshowers-compatible.  Applications are strongly advised to
+  # wrap all slow IO objects (sockets, pipes) using the
+  # Rainbows::Fiber::IO or similar class whenever possible.
+  module RevFiberSpawn
+
+    include Base
+    include Fiber::Rev
+
+    def worker_loop(worker)
+      init_worker_process(worker)
+      Server.const_set(:MAX, @worker_connections)
+      Server.const_set(:APP, G.server.app)
+      Heartbeat.new(1, true).attach(::Rev::Loop.default)
+      kato = Kato.new.attach(::Rev::Loop.default)
+      Rainbows::Fiber::IO.const_set(:KATO, kato)
+      LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) }
+      ::Rev::Loop.default.run
+    end
+  end
+end
diff --git a/t/simple-http_RevFiberSpawn.ru b/t/simple-http_RevFiberSpawn.ru
new file mode 100644
index 0000000..ea9b0e3
--- /dev/null
+++ b/t/simple-http_RevFiberSpawn.ru
@@ -0,0 +1,10 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env|
+  if env['rack.multithread'] == false &&
+    env['rainbows.model'] == :RevFiberSpawn
+    [ 200, {}, [ Thread.current.inspect << "\n" ] ]
+  else
+    raise env.inspect
+  end
+}
diff --git a/t/sleep.ru b/t/sleep.ru
index 2df22ce..d0fd832 100644
--- a/t/sleep.ru
+++ b/t/sleep.ru
@@ -12,6 +12,8 @@ run lambda { |env|
     Rainbows::Fiber
   when :Revactor
     Actor
+  when :RevFiberSpawn
+    Rainbows::Fiber::Rev
   else
     Kernel
   end).sleep(nr)
diff --git a/t/t9000.ru b/t/t9000.ru
index 4ca36c1..abf36b2 100644
--- a/t/t9000.ru
+++ b/t/t9000.ru
@@ -8,6 +8,8 @@ class Sleeper
       Rainbows::Fiber
     when :Revactor
       Actor
+    when :RevFiberSpawn
+      Rainbows::Fiber::Rev
     else
       Kernel
     end).sleep(1)