about summary refs log tree commit homepage
path: root/lib/rainbows
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2009-10-02 20:44:03 -0700
committerEric Wong <normalperson@yhbt.net>2009-10-02 21:21:28 -0700
commit37a12997628fcab722512f8a6370b92d44e33529 (patch)
tree9ced4ceaee3d4d6ce21dd1742f037d1d79a01e61 /lib/rainbows
downloadrainbows-37a12997628fcab722512f8a6370b92d44e33529.tar.gz
No tests yet, but the old "gossamer" and "rainbows" branches
seem to be basically working.
Diffstat (limited to 'lib/rainbows')
-rw-r--r--lib/rainbows/configurator.rb25
-rw-r--r--lib/rainbows/const.rb22
-rw-r--r--lib/rainbows/http_response.rb35
-rw-r--r--lib/rainbows/http_server.rb34
-rw-r--r--lib/rainbows/revactor.rb115
-rw-r--r--lib/rainbows/revactor/tee_input.rb44
-rw-r--r--lib/rainbows/thread_base.rb60
-rw-r--r--lib/rainbows/thread_pool.rb84
8 files changed, 419 insertions, 0 deletions
diff --git a/lib/rainbows/configurator.rb b/lib/rainbows/configurator.rb
new file mode 100644
index 0000000..449cdd9
--- /dev/null
+++ b/lib/rainbows/configurator.rb
@@ -0,0 +1,25 @@
+require 'rainbows'
+module Rainbows
+
+  class Configurator < ::Unicorn::Configurator
+
+    def use(model)
+      begin
+        model = Rainbows.const_get(model)
+      rescue NameError
+        raise ArgumentError, "concurrency model #{model.inspect} not supported"
+      end
+
+      Module === model or
+        raise ArgumentError, "concurrency model #{model.inspect} not supported"
+      set[:use] = model
+    end
+
+    def worker_connections(nr)
+      (Integer === nr && nr > 0) || nr.nil? or
+        raise ArgumentError, "worker_connections must be an Integer or nil"
+    end
+
+  end
+
+end
diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb
new file mode 100644
index 0000000..9606b80
--- /dev/null
+++ b/lib/rainbows/const.rb
@@ -0,0 +1,22 @@
+module Rainbows
+
+  module Const
+    RAINBOWS_VERSION = '0.93.0'
+
+    include Unicorn::Const
+
+    RACK_DEFAULTS = ::Unicorn::HttpRequest::DEFAULTS.merge({
+
+      # we need to observe many of the rules for thread-safety even
+      # with Revactor or Rev, so we're considered multithread-ed even
+      # when we're not technically...
+      "rack.multithread" => true,
+      "SERVER_SOFTWARE" => "Rainbows #{RAINBOWS_VERSION}",
+    })
+
+    CONN_CLOSE = "Connection: close\r\n"
+    CONN_ALIVE = "Connection: keep-alive\r\n"
+    LOCALHOST = "127.0.0.1"
+
+  end
+end
diff --git a/lib/rainbows/http_response.rb b/lib/rainbows/http_response.rb
new file mode 100644
index 0000000..ebaa4e7
--- /dev/null
+++ b/lib/rainbows/http_response.rb
@@ -0,0 +1,35 @@
+# -*- encoding: binary -*-
+require 'time'
+require 'rainbows'
+
+module Rainbows
+
+  class HttpResponse < ::Unicorn::HttpResponse
+
+    def self.write(socket, rack_response, out = [])
+      status, headers, body = rack_response
+
+      if Array === out
+        status = CODES[status.to_i] || status
+
+        headers.each do |key, value|
+          next if SKIP.include?(key.downcase)
+          if value =~ /\n/
+            out.concat(value.split(/\n/).map! { |v| "#{key}: #{v}\r\n" })
+          else
+            out << "#{key}: #{value}\r\n"
+          end
+        end
+
+        socket.write("HTTP/1.1 #{status}\r\n" \
+                     "Date: #{Time.now.httpdate}\r\n" \
+                     "Status: #{status}\r\n" \
+                     "#{out.join('')}\r\n")
+      end
+
+      body.each { |chunk| socket.write(chunk) }
+      ensure
+        body.respond_to?(:close) and body.close rescue nil
+    end
+  end
+end
diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb
new file mode 100644
index 0000000..355f3c5
--- /dev/null
+++ b/lib/rainbows/http_server.rb
@@ -0,0 +1,34 @@
+# -*- encoding: binary -*-
+require 'rainbows'
+module Rainbows
+
+  class HttpServer < ::Unicorn::HttpServer
+    include Rainbows
+
+    attr_accessor :worker_connections
+    attr_reader :use
+
+    def initialize(app, options)
+      self.app = app
+      self.reexec_pid = 0
+      self.init_listeners = options[:listeners] ? options[:listeners].dup : []
+      self.config = Configurator.new(options.merge(:use_defaults => true))
+      self.listener_opts = {}
+      config.commit!(self, :skip => [:listeners, :pid])
+
+      defined?(@use) or
+        self.use = Rainbows.const_get(:ThreadPool)
+      defined?(@worker_connections) or
+        @worker_connections = 4
+
+      #self.orig_app = app
+    end
+
+    def use=(model)
+      (class << self; self; end).instance_eval { include model }
+      @use = model
+    end
+
+  end
+
+end
diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb
new file mode 100644
index 0000000..4c04079
--- /dev/null
+++ b/lib/rainbows/revactor.rb
@@ -0,0 +1,115 @@
+require 'rainbows'
+require 'revactor'
+
+module Rainbows
+
+  module Revactor
+    require 'rainbows/revactor/tee_input'
+
+    include Unicorn
+    include Rainbows::Const
+    HttpServer.constants.each  { |x| const_set(x, HttpServer.const_get(x)) }
+
+    # once a client is accepted, it is processed in its entirety here
+    # in 3 easy steps: read request, call app, write app response
+    def process_client(client)
+      buf = client.read or return # this probably does not happen...
+      hp = HttpParser.new
+      env = {}
+      remote_addr = client.remote_addr
+
+      begin
+        while ! hp.headers(env, buf)
+          buf << client.read
+        end
+
+        env[Const::RACK_INPUT] = 0 == hp.content_length ?
+                 HttpRequest::NULL_IO :
+                 Rainbows::Revactor::TeeInput.new(client, env, hp, buf)
+        env[Const::REMOTE_ADDR] = remote_addr
+        response = app.call(env.update(RACK_DEFAULTS))
+
+        if 100 == response.first.to_i
+          client.write(Const::EXPECT_100_RESPONSE)
+          env.delete(Const::HTTP_EXPECT)
+          response = app.call(env)
+        end
+
+        out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
+        HttpResponse.write(client, response, out)
+      end while hp.keepalive? and hp.reset.nil? and env.clear
+      client.close
+    # if we get any error, try to write something back to the client
+    # assuming we haven't closed the socket, but don't get hung up
+    # if the socket is already closed or broken.  We'll always ensure
+    # the socket is closed at the end of this function
+    rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+      emergency_response(client, Const::ERROR_500_RESPONSE)
+    rescue HttpParserError # try to tell the client they're bad
+      buf.empty? or emergency_response(client, Const::ERROR_400_RESPONSE)
+    rescue Object => e
+      emergency_response(client, Const::ERROR_500_RESPONSE)
+      logger.error "Read error: #{e.inspect}"
+      logger.error e.backtrace.join("\n")
+    end
+
+    # runs inside each forked worker, this sits around and waits
+    # for connections and doesn't die until the parent dies (or is
+    # given a INT, QUIT, or TERM signal)
+    def worker_loop(worker)
+      ppid = master_pid
+      init_worker_process(worker)
+      alive = worker.tmp # tmp is our lifeline to the master process
+
+      trap(:USR1) { reopen_worker_logs(worker.nr) }
+      trap(:QUIT) { alive = false; LISTENERS.each { |s| s.close rescue nil } }
+      [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown
+
+      Actor.current.trap_exit = true
+
+      listeners = LISTENERS.map do |s|
+        TCPServer === s ? ::Revactor::TCP.listen(s, nil) : nil
+      end.compact
+
+      logger.info "worker=#{worker.nr} ready with Revactor"
+      clients = []
+
+      listeners.map! do |s|
+        Actor.spawn(s) do |l|
+          begin
+            clients << Actor.spawn(l.accept) { |c| process_client(c) }
+          rescue Errno::EAGAIN, Errno::ECONNABORTED
+          rescue Object => e
+            if alive
+              logger.error "Unhandled listen loop exception #{e.inspect}."
+              logger.error e.backtrace.join("\n")
+            end
+          end while alive
+        end
+      end
+
+      nr = 0
+      begin
+        Actor.sleep 1
+        clients.delete_if { |c| c.dead? }
+        if alive
+          alive.chmod(nr = 0 == nr ? 1 : 0)
+          ppid == Process.ppid or alive = false
+        end
+      end while alive || ! clients.empty?
+    end
+
+  private
+
+    # write a response without caring if it went out or not
+    # This is in the case of untrappable errors
+    def emergency_response(client, response_str)
+      client.instance_eval do
+        # this is Revactor implementation dependent
+        @_io.write_nonblock(response_str) rescue nil
+      end
+      client.close rescue nil
+    end
+
+  end
+end
diff --git a/lib/rainbows/revactor/tee_input.rb b/lib/rainbows/revactor/tee_input.rb
new file mode 100644
index 0000000..92effb4
--- /dev/null
+++ b/lib/rainbows/revactor/tee_input.rb
@@ -0,0 +1,44 @@
+# -*- encoding: binary -*-
+require 'rainbows/revactor'
+
+module Rainbows
+  module Revactor
+
+    # acts like tee(1) on an input input to provide a input-like stream
+    # while providing rewindable semantics through a File/StringIO
+    # backing store.  On the first pass, the input is only read on demand
+    # so your Rack application can use input notification (upload progress
+    # and like).  This should fully conform to the Rack::InputWrapper
+    # specification on the public API.  This class is intended to be a
+    # strict interpretation of Rack::InputWrapper functionality and will
+    # not support any deviations from it.
+    class TeeInput < ::Unicorn::TeeInput
+
+    private
+
+      # tees off a +length+ chunk of data from the input into the IO
+      # backing store as well as returning it.  +dst+ must be specified.
+      # returns nil if reading from the input returns nil
+      def tee(length, dst)
+        unless parser.body_eof?
+          begin
+            if parser.filter_body(dst, buf << socket.read).nil?
+              @tmp.write(dst)
+              return dst
+            end
+          rescue EOFError
+          end
+        end
+        finalize_input
+      end
+
+      def finalize_input
+        while parser.trailers(req, buf).nil?
+          buf << socket.read
+        end
+        self.socket = nil
+      end
+
+    end
+  end
+end
diff --git a/lib/rainbows/thread_base.rb b/lib/rainbows/thread_base.rb
new file mode 100644
index 0000000..e544772
--- /dev/null
+++ b/lib/rainbows/thread_base.rb
@@ -0,0 +1,60 @@
+
+module Rainbows
+
+  module ThreadBase
+
+    include Unicorn
+    include Rainbows::Const
+
+    # write a response without caring if it went out or not
+    # This is in the case of untrappable errors
+    def emergency_response(client, response_str)
+      client.write_nonblock(response_str) rescue nil
+      client.close rescue nil
+    end
+
+    # once a client is accepted, it is processed in its entirety here
+    # in 3 easy steps: read request, call app, write app response
+    def process_client(client)
+      buf = client.readpartial(CHUNK_SIZE)
+      hp = HttpParser.new
+      env = {}
+      remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST
+
+      begin
+        while ! hp.headers(env, buf)
+          buf << client.readpartial(CHUNK_SIZE)
+        end
+
+        env[RACK_INPUT] = 0 == hp.content_length ?
+                 HttpRequest::NULL_IO :
+                 Unicorn::TeeInput.new(client, env, hp, buf)
+        env[REMOTE_ADDR] = remote_addr
+        response = app.call(env.update(RACK_DEFAULTS))
+
+        if 100 == response.first.to_i
+          client.write(EXPECT_100_RESPONSE)
+          env.delete(HTTP_EXPECT)
+          response = app.call(env)
+        end
+
+        out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers?
+        HttpResponse.write(client, response, out)
+      end while hp.keepalive? and hp.reset.nil? and env.clear
+      client.close
+    # if we get any error, try to write something back to the client
+    # assuming we haven't closed the socket, but don't get hung up
+    # if the socket is already closed or broken.  We'll always ensure
+    # the socket is closed at the end of this function
+    rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF
+      emergency_response(client, ERROR_500_RESPONSE)
+    rescue HttpParserError # try to tell the client they're bad
+      buf.empty? or emergency_response(client, ERROR_400_RESPONSE)
+    rescue Object => e
+      emergency_response(client, ERROR_500_RESPONSE)
+      logger.error "Read error: #{e.inspect}"
+      logger.error e.backtrace.join("\n")
+    end
+  end
+end
+
diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb
new file mode 100644
index 0000000..058205f
--- /dev/null
+++ b/lib/rainbows/thread_pool.rb
@@ -0,0 +1,84 @@
+module Rainbows
+
+  module ThreadPool
+
+    include ThreadBase
+
+    HttpServer.constants.each  { |x| const_set(x, HttpServer.const_get(x)) }
+
+    def worker_loop(worker)
+      init_worker_process(worker)
+      threads = ThreadGroup.new
+      alive = worker.tmp
+      nr = 0
+
+      # closing anything we IO.select on will raise EBADF
+      trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil }
+      trap(:QUIT) { alive = false; LISTENERS.map! { |s| s.close rescue nil } }
+      [:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown
+      logger.info "worker=#{worker.nr} ready with ThreadPool"
+
+      while alive && master_pid == Process.ppid
+        maintain_thread_count(threads)
+        threads.list.each do |thr|
+          alive.chmod(nr += 1)
+          thr.join(timeout / 2.0) and break
+        end
+      end
+      join_worker_threads(threads)
+    end
+
+    def join_worker_threads(threads)
+      logger.info "Joining worker threads..."
+      t0 = Time.now
+      timeleft = timeout
+      threads.list.each { |thr|
+        thr.join(timeleft)
+        timeleft -= (Time.now - t0)
+      }
+      logger.info "Done joining worker threads."
+    end
+
+    def maintain_thread_count(threads)
+      threads.list.each do |thr|
+        next if (Time.now - (thr[:t] || next)) < timeout
+        thr.kill! # take no prisoners for timeout violations
+        logger.error "killed #{thr.inspect} for being too old"
+      end
+
+      while threads.list.size < worker_connections
+        threads.add(new_worker_thread)
+      end
+    end
+
+    def new_worker_thread
+      Thread.new {
+        alive = true
+        thr = Thread.current
+        begin
+          ret = begin
+            thr[:t] = Time.now
+            IO.select(LISTENERS, nil, nil, timeout/2.0) or next
+          rescue Errno::EINTR
+            retry
+          rescue Errno::EBADF
+            return
+          end
+          ret.first.each do |sock|
+            begin
+              process_client(sock.accept_nonblock)
+              thr[:t] = Time.now
+            rescue Errno::EAGAIN, Errno::ECONNABORTED
+            end
+          end
+        rescue Object => e
+          if alive
+            logger.error "Unhandled listen loop exception #{e.inspect}."
+            logger.error e.backtrace.join("\n")
+          end
+        end while alive = LISTENERS.first
+      }
+    end
+
+  end
+end