about summary refs log tree commit homepage
diff options
context:
space:
mode:
-rw-r--r--lib/rainbows.rb1
-rw-r--r--lib/rainbows/epoll.rb22
-rw-r--r--lib/rainbows/epoll/client.rb226
-rw-r--r--lib/rainbows/epoll/response_chunk_pipe.rb18
-rw-r--r--lib/rainbows/epoll/response_pipe.rb38
-rw-r--r--lib/rainbows/epoll/server.rb43
-rw-r--r--lib/rainbows/epoll/state.rb22
-rw-r--r--lib/rainbows/http_server.rb2
-rw-r--r--t/GNUmakefile1
-rw-r--r--t/kgio-pipe-response.ru10
-rw-r--r--t/simple-http_Epoll.ru9
-rwxr-xr-xt/t0034-pipelined-pipe-response.sh3
-rwxr-xr-xt/t0035-kgio-pipe-response.sh2
-rwxr-xr-xt/t0113-rewindable-input-false.sh1
-rwxr-xr-xt/t0114-rewindable-input-true.sh1
-rw-r--r--t/test_isolate.rb2
16 files changed, 397 insertions, 4 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 76cb728..5de8a80 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -121,6 +121,7 @@ module Rainbows
     :CoolioThreadSpawn => 50,
     :CoolioThreadPool => 50,
     :CoolioFiberSpawn => 50,
+    :Epoll => 50,
     :EventMachine => 50,
     :FiberSpawn => 50,
     :FiberPool => 50,
diff --git a/lib/rainbows/epoll.rb b/lib/rainbows/epoll.rb
new file mode 100644
index 0000000..8698f78
--- /dev/null
+++ b/lib/rainbows/epoll.rb
@@ -0,0 +1,22 @@
+# -*- encoding: binary -*-
+# :enddoc:
+require 'sleepy_penguin'
+require 'sendfile'
+
+# Edge-triggered epoll concurrency model.  This is extremely unfair
+# and optimized for throughput at the expense of fairness
+module Rainbows::Epoll
+  include Rainbows::Base
+  autoload :State, 'rainbows/epoll/state'
+  autoload :Server, 'rainbows/epoll/server'
+  autoload :Client, 'rainbows/epoll/client'
+  autoload :ResponsePipe, 'rainbows/epoll/response_pipe'
+  autoload :ResponseChunkPipe, 'rainbows/epoll/response_chunk_pipe'
+
+  def worker_loop(worker) # :nodoc:
+    init_worker_process(worker)
+    Rainbows::EvCore.setup
+    Rainbows::Client.__send__ :include, Client
+    Server.run
+  end
+end
diff --git a/lib/rainbows/epoll/client.rb b/lib/rainbows/epoll/client.rb
new file mode 100644
index 0000000..a3ae6db
--- /dev/null
+++ b/lib/rainbows/epoll/client.rb
@@ -0,0 +1,226 @@
+# -*- encoding: binary -*-
+# :enddoc:
+
+module Rainbows::Epoll::Client
+  attr_reader :wr_queue, :state, :epoll_active
+
+  include Rainbows::Epoll::State
+  include Rainbows::EvCore
+  APP = Rainbows.server.app
+  Server = Rainbows::Epoll::Server
+  IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
+  INLT = SleepyPenguin::Epoll::IN
+  OUT = SleepyPenguin::Epoll::OUT | SleepyPenguin::Epoll::ET
+  KATO = {}
+  KATO.compare_by_identity if KATO.respond_to?(:compare_by_identity)
+  KEEPALIVE_TIMEOUT = Rainbows.keepalive_timeout
+
+  def self.expire
+    if (ot = KEEPALIVE_TIMEOUT) >= 0
+      ot = Time.now - ot
+      KATO.delete_if { |client, time| time < ot and client.timeout! }
+    end
+  end
+
+  # only call this once
+  def epoll_once
+    @wr_queue = [] # may contain String, ResponsePipe, and StreamFile objects
+    @epoll_active = false
+    post_init
+    epoll_run
+    rescue => e
+      handle_error(e)
+  end
+
+  def on_readable
+    case rv = kgio_tryread(16384, RBUF)
+    when String
+      on_read(rv)
+      return if @wr_queue[0] || closed?
+    when :wait_readable
+      KATO[self] = Time.now if :headers == @state
+      return epoll_enable(IN)
+    else
+      break
+    end until :close == @state
+    close unless closed?
+    rescue IOError
+  end
+
+  def app_call # called by on_read()
+    @env[RACK_INPUT] = @input
+    @env[REMOTE_ADDR] = kgio_addr
+    status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
+    ev_write_response(status, headers, body, @hp.next?)
+  end
+
+  def write_response_path(status, headers, body, alive)
+    io = body_to_io(body)
+    st = io.stat
+
+    if st.file?
+      defer_file(status, headers, body, alive, io, st)
+    elsif st.socket? || st.pipe?
+      chunk = stream_response_headers(status, headers, alive)
+      stream_response_body(body, io, chunk)
+    else
+      # char or block device... WTF?
+      write_response(status, headers, body, alive)
+    end
+  end
+
+  # used for streaming sockets and pipes
+  def stream_response_body(body, io, chunk)
+    pipe = (chunk ? Rainbows::Epoll::ResponseChunkPipe :
+                    Rainbows::Epoll::ResponsePipe).new(io, self, body)
+    return @wr_queue << pipe if @wr_queue[0]
+    stream_pipe(pipe) or return
+    @wr_queue[0] or @wr_queue << ""
+  end
+
+  def ev_write_response(status, headers, body, alive)
+    if body.respond_to?(:to_path)
+      write_response_path(status, headers, body, alive)
+    else
+      write_response(status, headers, body, alive)
+    end
+    @state = alive ? :headers : :close
+    on_read("") if alive && 0 == @wr_queue.size && 0 != @buf.size
+  end
+
+  def epoll_run
+    if @wr_queue[0]
+      on_writable
+    else
+      KATO.delete self
+      on_readable
+    end
+  end
+
+  def want_more
+    Server::ReRun << self
+  end
+
+  def on_deferred_write_complete
+    :close == @state and return close
+    0 == @buf.size ? on_readable : on_read("")
+  end
+
+  def handle_error(e)
+    msg = Rainbows::Error.response(e) and kgio_trywrite(msg) rescue nil
+    ensure
+      close
+  end
+
+  def write_deferred(obj)
+    Rainbows::StreamFile === obj ? stream_file(obj) : stream_pipe(obj)
+  end
+
+  # writes until our write buffer is empty or we block
+  # returns true if we're done writing everything
+  def on_writable
+    obj = @wr_queue.shift
+
+    case rv = String === obj ? kgio_trywrite(obj) : write_deferred(obj)
+    when nil
+      obj = @wr_queue.shift or return on_deferred_write_complete
+    when String
+      obj = rv # retry
+    when :wait_writable # Strings and StreamFiles only
+      @wr_queue.unshift(obj)
+      epoll_enable(OUT)
+      return
+    when :deferred
+      return
+    end while true
+    rescue => e
+      handle_error(e)
+  end
+
+  # this returns an +Array+ write buffer if blocked
+  def write(buf)
+    unless @wr_queue[0]
+      case rv = kgio_trywrite(buf)
+      when nil
+        return # all written
+      when String
+        buf = rv # retry
+      when :wait_writable
+        epoll_enable(OUT)
+        break # queue
+      end while true
+    end
+    @wr_queue << buf.dup # >3-word 1.9 strings are copy-on-write
+  end
+
+  def close
+    @wr_queue.each { |x| x.respond_to?(:close) and x.close rescue nil }
+    super
+    KATO.delete(self)
+    Server.decr
+  end
+
+  def timeout!
+    close
+    true
+  end
+
+  def defer_file(status, headers, body, alive, io, st)
+    if r = sendfile_range(status, headers)
+      status, headers, range = r
+      write_headers(status, headers, alive)
+      range and defer_file_stream(range[0], range[1], io, body)
+    else
+      write_headers(status, headers, alive)
+      defer_file_stream(0, st.size, io, body)
+    end
+  end
+
+  # returns +nil+ on EOF, :wait_writable if the client blocks
+  def stream_file(sf) # +sf+ is a Rainbows::StreamFile object
+    begin
+      sf.offset += (n = sendfile_nonblock(sf, sf.offset, sf.count))
+      0 == (sf.count -= n) and return sf.close
+    rescue Errno::EAGAIN
+      return :wait_writable
+    rescue
+      sf.close
+      raise
+    end while true
+  end
+
+  def defer_file_stream(offset, count, io, body)
+    sf = Rainbows::StreamFile.new(offset, count, io, body)
+    unless @wr_queue[0]
+      stream_file(sf) or return
+    end
+    @wr_queue << sf
+    epoll_enable(OUT)
+  end
+
+  # this alternates between a push and pull model from the pipe -> client
+  # to avoid having too much data in userspace on either end.
+  def stream_pipe(pipe)
+    case buf = pipe.tryread
+    when String
+      if Array === write(buf)
+        # client is blocked on write, client will pull from pipe later
+        pipe.epoll_disable
+        @wr_queue << pipe
+        epoll_enable(OUT)
+        return :deferred
+      end
+      # continue looping...
+    when :wait_readable
+      # pipe blocked on read, let the pipe push to the client in the future
+      epoll_disable
+      pipe.epoll_enable(IN)
+      return :deferred
+    else # nil => EOF
+      return pipe.close # nil
+    end while true
+    rescue => e
+      pipe.close
+      raise
+  end
+end
diff --git a/lib/rainbows/epoll/response_chunk_pipe.rb b/lib/rainbows/epoll/response_chunk_pipe.rb
new file mode 100644
index 0000000..3ad57a8
--- /dev/null
+++ b/lib/rainbows/epoll/response_chunk_pipe.rb
@@ -0,0 +1,18 @@
+# -*- encoding: binary -*-
+# :enddoc:
+#
+class Rainbows::Epoll::ResponseChunkPipe < Rainbows::Epoll::ResponsePipe
+  def tryread
+    @io or return
+
+    case rv = super
+    when String
+      "#{rv.size.to_s(16)}\r\n#{rv}\r\n"
+    when nil
+      close
+      "0\r\n\r\n"
+    else
+      rv
+    end
+  end
+end
diff --git a/lib/rainbows/epoll/response_pipe.rb b/lib/rainbows/epoll/response_pipe.rb
new file mode 100644
index 0000000..ce240f5
--- /dev/null
+++ b/lib/rainbows/epoll/response_pipe.rb
@@ -0,0 +1,38 @@
+# -*- encoding: binary -*-
+# :enddoc:
+#
+class Rainbows::Epoll::ResponsePipe
+  include Rainbows::Epoll::State
+  attr_reader :io
+  alias to_io io
+  IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
+  RBUF = Rainbows::EvCore::RBUF
+
+  def initialize(io, client, body)
+    @io, @client, @body = io, client, body
+    @epoll_active = false
+  end
+
+  def epoll_run
+    return close if @client.closed?
+    @client.stream_pipe(self) or @client.on_deferred_write_complete
+    rescue => e
+      close
+      @client.handle_error(e)
+  end
+
+  def close
+    epoll_disable
+    @body.respond_to?(:close) and @body.close
+    @io = @body = nil
+  end
+
+  def tryread
+    io = @io
+    io.respond_to?(:kgio_tryread) and return io.kgio_tryread(16384, RBUF)
+    io.read_nonblock(16384, RBUF)
+    rescue Errno::EAGAIN
+      :wait_readable
+    rescue EOFError
+  end
+end
diff --git a/lib/rainbows/epoll/server.rb b/lib/rainbows/epoll/server.rb
new file mode 100644
index 0000000..4586c95
--- /dev/null
+++ b/lib/rainbows/epoll/server.rb
@@ -0,0 +1,43 @@
+# -*- encoding: binary -*-
+# :nodoc:
+module Rainbows::Epoll::Server
+  IN = SleepyPenguin::Epoll::IN | SleepyPenguin::Epoll::ET
+  @@nr = 0
+  MAX = Rainbows.server.worker_connections
+  THRESH = MAX - 1
+  include Rainbows::Epoll::State
+  LISTENERS = Rainbows::HttpServer::LISTENERS
+  ReRun = []
+
+  def self.extended(obj)
+    obj.instance_variable_set(:@epoll_active, false)
+  end
+
+  def self.run
+    LISTENERS.each { |sock| sock.extend(self).epoll_enable(IN) }
+    begin
+      EP.wait(100, 1000) { |_, obj| obj.epoll_run }
+      while obj = ReRun.shift
+        obj.epoll_run
+      end
+      Rainbows::Epoll::Client.expire
+    rescue => e
+      Rainbows::Error.listen_loop(e)
+    end while Rainbows.tick || @@nr > 0
+  end
+
+  # rearms all listeners when there's a free slot
+  def self.decr
+    THRESH == (@@nr -= 1) and LISTENERS.each { |sock| sock.epoll_enable(IN) }
+  end
+
+  def epoll_run
+    return epoll_disable if @@nr >= MAX
+    while io = kgio_tryaccept
+      @@nr += 1
+      # there's a chance the client never even sees epoll for simple apps
+      io.epoll_once
+      return epoll_disable if @@nr >= MAX
+    end
+  end
+end
diff --git a/lib/rainbows/epoll/state.rb b/lib/rainbows/epoll/state.rb
new file mode 100644
index 0000000..6e554be
--- /dev/null
+++ b/lib/rainbows/epoll/state.rb
@@ -0,0 +1,22 @@
+# -*- encoding: binary -*-
+# :enddoc:
+# used to keep track of state for each descriptor and avoid
+# unneeded syscall or ENONENT overhead
+module Rainbows::Epoll::State
+  EP = SleepyPenguin::Epoll.new
+
+  def epoll_disable
+    @epoll_active or return
+    @epoll_active = false
+    EP.del(self)
+  end
+
+  def epoll_enable(flags)
+    if @epoll_active
+      flags == @epoll_active or
+        EP.mod(self, @epoll_active = flags)
+    else
+      EP.add(self, @epoll_active = flags)
+    end
+  end
+end
diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb
index 84d5a32..a9767bf 100644
--- a/lib/rainbows/http_server.rb
+++ b/lib/rainbows/http_server.rb
@@ -72,7 +72,7 @@ class Rainbows::HttpServer < Unicorn::HttpServer
     new_defaults = {
       'rainbows.model' => (@use = model.to_sym),
       'rack.multithread' => !!(model.to_s =~ /Thread/),
-      'rainbows.autochunk' => [:Coolio,:Rev,
+      'rainbows.autochunk' => [:Coolio,:Rev,:Epoll,
                                :EventMachine,:NeverBlock].include?(@use),
     }
     Rainbows::Const::RACK_DEFAULTS.update(new_defaults)
diff --git a/t/GNUmakefile b/t/GNUmakefile
index 91e05f5..7b50944 100644
--- a/t/GNUmakefile
+++ b/t/GNUmakefile
@@ -19,6 +19,7 @@ endif
 RUBY_ENGINE := $(shell $(RUBY) -e 'puts((RUBY_ENGINE rescue "ruby"))')
 export RUBY_VERSION RUBY_ENGINE
 
+models += Epoll
 models += WriterThreadPool
 models += WriterThreadSpawn
 models += ThreadPool
diff --git a/t/kgio-pipe-response.ru b/t/kgio-pipe-response.ru
index edd2aac..9c70d47 100644
--- a/t/kgio-pipe-response.ru
+++ b/t/kgio-pipe-response.ru
@@ -1,10 +1,18 @@
 # must be run without Rack::Lint since that clobbers to_path
 use Rainbows::DevFdResponse
 run(lambda { |env|
+  io = case env["rainbows.model"].to_s
+  when /Fiber/
+    Rainbows::Fiber::IO::Pipe
+  else
+    Kgio::Pipe
+  end.popen('cat random_blob', 'rb')
+
   [ 200,
     {
       'Content-Length' => ::File.stat('random_blob').size.to_s,
       'Content-Type' => 'application/octet-stream',
     },
-    Rainbows::Fiber::IO::Pipe.popen('cat random_blob', 'rb') ]
+    io
+  ]
 })
diff --git a/t/simple-http_Epoll.ru b/t/simple-http_Epoll.ru
new file mode 100644
index 0000000..6513343
--- /dev/null
+++ b/t/simple-http_Epoll.ru
@@ -0,0 +1,9 @@
+use Rack::ContentLength
+use Rack::ContentType
+run lambda { |env|
+  if env['rack.multithread'] == false && env['rainbows.model'] == :Epoll
+    [ 200, {}, [ Thread.current.inspect << "\n" ] ]
+  else
+    raise env.inspect
+  end
+}
diff --git a/t/t0034-pipelined-pipe-response.sh b/t/t0034-pipelined-pipe-response.sh
index 8346af9..6dff9ad 100755
--- a/t/t0034-pipelined-pipe-response.sh
+++ b/t/t0034-pipelined-pipe-response.sh
@@ -22,12 +22,13 @@ require "kcar"
 $stdin.binmode
 expect = ENV["random_blob_sha1"]
 kcar = Kcar::Response.new($stdin, {})
-3.times do
+3.times do |i|
         nr = 0
         status, headers, body = kcar.rack
         dig = Digest::SHA1.new
         body.each { |buf| dig << buf ; nr += buf.size }
         sha1 = dig.hexdigest
+        warn "[#{i}] nr: #{nr}"
         sha1 == expect or abort "mismatch: sha1=#{sha1} != expect=#{expect}"
         body.close
 end
diff --git a/t/t0035-kgio-pipe-response.sh b/t/t0035-kgio-pipe-response.sh
index 97c3f2a..c4b1096 100755
--- a/t/t0035-kgio-pipe-response.sh
+++ b/t/t0035-kgio-pipe-response.sh
@@ -2,7 +2,7 @@
 . ./test-lib.sh
 test -r random_blob || die "random_blob required, run with 'make $0'"
 case $model in
-*Fiber* ) ;;
+*Fiber*|Epoll) ;;
 *)
         t_info "skipping $T since it's not compatible with $model"
         exit 0
diff --git a/t/t0113-rewindable-input-false.sh b/t/t0113-rewindable-input-false.sh
index 1ab79bf..82b0fb7 100755
--- a/t/t0113-rewindable-input-false.sh
+++ b/t/t0113-rewindable-input-false.sh
@@ -3,6 +3,7 @@
 skip_models EventMachine NeverBlock
 skip_models Rev RevThreadSpawn RevThreadPool
 skip_models Coolio CoolioThreadSpawn CoolioThreadPool
+skip_models Epoll
 
 t_plan 4 "rewindable_input toggled to false"
 
diff --git a/t/t0114-rewindable-input-true.sh b/t/t0114-rewindable-input-true.sh
index 7e337ea..fd8561c 100755
--- a/t/t0114-rewindable-input-true.sh
+++ b/t/t0114-rewindable-input-true.sh
@@ -3,6 +3,7 @@
 skip_models EventMachine NeverBlock
 skip_models Rev RevThreadSpawn RevThreadPool
 skip_models Coolio CoolioThreadSpawn CoolioThreadPool
+skip_models Epoll
 
 t_plan 4 "rewindable_input toggled to true"
 
diff --git a/t/test_isolate.rb b/t/test_isolate.rb
index 9b0c026..f0f16f1 100644
--- a/t/test_isolate.rb
+++ b/t/test_isolate.rb
@@ -33,6 +33,8 @@ Isolate.now!(opts) do
     gem 'revactor', '0.1.5'
     gem 'rack-fiber_pool', '0.9.1'
   end
+
+  gem 'sleepy_penguin', '1.2.0'
 end
 
 $stdout.reopen(old_out)