about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/process_client.rb21
-rw-r--r--lib/rainbows/xepoll_thread_spawn.rb16
-rw-r--r--lib/rainbows/xepoll_thread_spawn/client.rb120
-rw-r--r--t/GNUmakefile1
-rw-r--r--t/simple-http_XEpollThreadSpawn.ru10
-rwxr-xr-xt/t0022-copy_stream-byte-range.sh1
-rwxr-xr-xt/t9100-thread-timeout.sh1
-rwxr-xr-xt/t9101-thread-timeout-threshold.sh1
9 files changed, 172 insertions, 0 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 391c41d..fccfe8b 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -132,6 +132,7 @@ module Rainbows
     :FiberPool => 50,
     :ActorSpawn => 50,
     :NeverBlock => 50,
+    :XEpollThreadSpawn => 50,
   }.each do |model, _|
     u = model.to_s.gsub(/([a-z0-9])([A-Z0-9])/) { "#{$1}_#{$2.downcase!}" }
     autoload model, "rainbows/#{u.downcase!}"
diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb
index bf6d20b..24132f5 100644
--- a/lib/rainbows/process_client.rb
+++ b/lib/rainbows/process_client.rb
@@ -46,4 +46,25 @@ module Rainbows::ProcessClient
   def set_input(env, hp)
     env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp)
   end
+
+  def process_pipeline(env, hp)
+    begin
+      set_input(env, hp)
+      env[REMOTE_ADDR] = kgio_addr
+      status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
+      if 100 == status.to_i
+        write(EXPECT_100_RESPONSE)
+        env.delete(HTTP_EXPECT)
+        status, headers, body = APP.call(env)
+      end
+      write_response(status, headers, body, alive = hp.next?)
+    end while alive && env = pipeline_ready(hp)
+    alive or close
+    rescue => e
+      handle_error(e)
+  end
+
+  # override this in subclass/module
+  def pipeline_ready
+  end
 end
diff --git a/lib/rainbows/xepoll_thread_spawn.rb b/lib/rainbows/xepoll_thread_spawn.rb
new file mode 100644
index 0000000..6e6ec5b
--- /dev/null
+++ b/lib/rainbows/xepoll_thread_spawn.rb
@@ -0,0 +1,16 @@
+# -*- encoding: binary -*-
+
+module Rainbows::XEpollThreadSpawn
+  include Rainbows::Base
+
+  def init_worker_process(worker)
+    super
+    require "rainbows/xepoll_thread_spawn/client"
+    Rainbows::Client.__send__ :include, Client
+  end
+
+  def worker_loop(worker) # :nodoc:
+    init_worker_process(worker)
+    Client.loop
+  end
+end
diff --git a/lib/rainbows/xepoll_thread_spawn/client.rb b/lib/rainbows/xepoll_thread_spawn/client.rb
new file mode 100644
index 0000000..bb1f324
--- /dev/null
+++ b/lib/rainbows/xepoll_thread_spawn/client.rb
@@ -0,0 +1,120 @@
+# -*- encoding: binary -*-
+require "thread"
+require "sleepy_penguin"
+require "raindrops"
+
+module Rainbows::XEpollThreadSpawn::Client
+  N = Raindrops.new(1)
+  max = Rainbows.server.worker_connections
+  ACCEPTORS = Rainbows::HttpServer::LISTENERS.map do |sock|
+    Thread.new do
+      begin
+        if io = sock.kgio_accept(Rainbows::Client)
+          N.incr(0, 1)
+          io.epoll_once
+        end
+        sleep while N[0] >= max
+      rescue => e
+        Rainbows::Error.listen_loop(e)
+      end while Rainbows.alive
+    end
+  end
+
+  ep = SleepyPenguin::Epoll
+  EP = ep.new
+  IN = ep::IN | ep::ET | ep::ONESHOT
+  THRESH = max - 1
+  KATO = {}
+  KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
+  LOCK = Mutex.new
+  @@last_expire = Time.now
+
+  def kato_set
+    LOCK.synchronize { KATO[self] = @@last_expire }
+    EP.set(self, IN)
+  end
+
+  def kato_delete
+    LOCK.synchronize { KATO.delete self }
+  end
+
+  def self.loop
+    begin
+      EP.wait(nil, 1000) { |fl, obj| obj.epoll_run }
+      expire
+    rescue Errno::EINTR
+    rescue => e
+      Rainbows::Error.listen_loop(e)
+    end while Rainbows.tick || N[0] > 0
+    Rainbows::JoinThreads.acceptors(ACCEPTORS)
+  end
+
+  def self.expire
+    return if ((now = Time.now) - @@last_expire) < 1.0
+    if (ot = Rainbows.keepalive_timeout) >= 0
+      ot = now - ot
+      defer = []
+      LOCK.synchronize do
+        KATO.delete_if { |client, time| time < ot and client.timeout!(defer) }
+      end
+      defer.each { |io| io.closed? or io.close }
+    end
+    @@last_expire = now
+  end
+
+  def epoll_once
+    @hp = Rainbows::HttpParser.new
+    @buf2 = ""
+    epoll_run
+  end
+
+  def timeout!(defer)
+    defer << self
+  end
+
+  def close
+    super
+    kato_delete
+    N.decr(0, 1) == THRESH and ACCEPTORS.each { |t| t.run }
+  end
+
+  def handle_error(e)
+    super
+    ensure
+      closed? or close
+  end
+
+  def epoll_run
+    case kgio_tryread(0x4000, @buf2)
+    when :wait_readable
+      return kato_set
+    when String
+      kato_delete
+      @hp.buf << @buf2
+      env = @hp.parse and return spawn(env, @hp)
+    else
+      return close
+    end while true
+    rescue => e
+      handle_error(e)
+  end
+
+  def spawn(env, hp)
+    Thread.new { process_pipeline(env, hp) }
+  end
+
+  def pipeline_ready(hp)
+    env = hp.parse and return env
+    case kgio_tryread(0x4000, @buf2)
+    when :wait_readable
+      kato_set
+      return false
+    when String
+      hp.buf << @buf2
+      env = hp.parse and return env
+      # continue loop
+    else
+      return close
+    end while true
+  end
+end
diff --git a/t/GNUmakefile b/t/GNUmakefile
index 7cb7db4..408eabf 100644
--- a/t/GNUmakefile
+++ b/t/GNUmakefile
@@ -21,6 +21,7 @@ export RUBY_VERSION RUBY_ENGINE
 
 ifeq (Linux,$(shell uname -s))
   models += XEpoll
+  models += XEpollThreadSpawn
   models += Epoll
 endif
 models += WriterThreadPool
diff --git a/t/simple-http_XEpollThreadSpawn.ru b/t/simple-http_XEpollThreadSpawn.ru
new file mode 100644
index 0000000..e89fccc
--- /dev/null
+++ b/t/simple-http_XEpollThreadSpawn.ru
@@ -0,0 +1,10 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env|
+  if env['rack.multithread'] == true &&
+    env['rainbows.model'] == :XEpollThreadSpawn
+    [ 200, {}, [ Thread.current.inspect << "\n" ] ]
+  else
+    raise env.inspect
+  end
+}
diff --git a/t/t0022-copy_stream-byte-range.sh b/t/t0022-copy_stream-byte-range.sh
index 7539c02..3e0a66b 100755
--- a/t/t0022-copy_stream-byte-range.sh
+++ b/t/t0022-copy_stream-byte-range.sh
@@ -11,6 +11,7 @@ esac
 
 case $model in
 ThreadSpawn|WriterThreadSpawn|ThreadPool|WriterThreadPool|Base) ;;
+XEpollThreadSpawn) ;;
 *)
         t_info "skipping $T since it doesn't use IO.copy_stream"
         exit 0
diff --git a/t/t9100-thread-timeout.sh b/t/t9100-thread-timeout.sh
index 422052e..8d61cc5 100755
--- a/t/t9100-thread-timeout.sh
+++ b/t/t9100-thread-timeout.sh
@@ -4,6 +4,7 @@ case $model in
 ThreadSpawn|ThreadPool) ;;
 RevThreadSpawn|RevThreadPool) ;;
 CoolioThreadSpawn|CoolioThreadPool) ;;
+XEpollThreadSpawn) ;;
 *) t_info "$0 is only compatible with Thread*"; exit 0 ;;
 esac
 
diff --git a/t/t9101-thread-timeout-threshold.sh b/t/t9101-thread-timeout-threshold.sh
index 7309475..67e65f6 100755
--- a/t/t9101-thread-timeout-threshold.sh
+++ b/t/t9101-thread-timeout-threshold.sh
@@ -4,6 +4,7 @@ case $model in
 ThreadSpawn|ThreadPool) ;;
 RevThreadSpawn|RevThreadPool) ;;
 CoolioThreadSpawn|CoolioThreadPool) ;;
+XEpollThreadSpawn) ;;
 *) t_info "$0 is only compatible with Thread*"; exit 0 ;;
 esac