From 37a12997628fcab722512f8a6370b92d44e33529 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 2 Oct 2009 20:44:03 -0700 Subject: initial revision No tests yet, but the old "gossamer" and "rainbows" branches seem to be basically working. --- lib/rainbows/configurator.rb | 25 ++++++++ lib/rainbows/const.rb | 22 +++++++ lib/rainbows/http_response.rb | 35 +++++++++++ lib/rainbows/http_server.rb | 34 +++++++++++ lib/rainbows/revactor.rb | 115 +++++++++++++++++++++++++++++++++++++ lib/rainbows/revactor/tee_input.rb | 44 ++++++++++++++ lib/rainbows/thread_base.rb | 60 +++++++++++++++++++ lib/rainbows/thread_pool.rb | 84 +++++++++++++++++++++++++++ 8 files changed, 419 insertions(+) create mode 100644 lib/rainbows/configurator.rb create mode 100644 lib/rainbows/const.rb create mode 100644 lib/rainbows/http_response.rb create mode 100644 lib/rainbows/http_server.rb create mode 100644 lib/rainbows/revactor.rb create mode 100644 lib/rainbows/revactor/tee_input.rb create mode 100644 lib/rainbows/thread_base.rb create mode 100644 lib/rainbows/thread_pool.rb (limited to 'lib/rainbows') diff --git a/lib/rainbows/configurator.rb b/lib/rainbows/configurator.rb new file mode 100644 index 0000000..449cdd9 --- /dev/null +++ b/lib/rainbows/configurator.rb @@ -0,0 +1,25 @@ +require 'rainbows' +module Rainbows + + class Configurator < ::Unicorn::Configurator + + def use(model) + begin + model = Rainbows.const_get(model) + rescue NameError + raise ArgumentError, "concurrency model #{model.inspect} not supported" + end + + Module === model or + raise ArgumentError, "concurrency model #{model.inspect} not supported" + set[:use] = model + end + + def worker_connections(nr) + (Integer === nr && nr > 0) || nr.nil? or + raise ArgumentError, "worker_connections must be an Integer or nil" + end + + end + +end diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb new file mode 100644 index 0000000..9606b80 --- /dev/null +++ b/lib/rainbows/const.rb @@ -0,0 +1,22 @@ +module Rainbows + + module Const + RAINBOWS_VERSION = '0.93.0' + + include Unicorn::Const + + RACK_DEFAULTS = ::Unicorn::HttpRequest::DEFAULTS.merge({ + + # we need to observe many of the rules for thread-safety even + # with Revactor or Rev, so we're considered multithread-ed even + # when we're not technically... + "rack.multithread" => true, + "SERVER_SOFTWARE" => "Rainbows #{RAINBOWS_VERSION}", + }) + + CONN_CLOSE = "Connection: close\r\n" + CONN_ALIVE = "Connection: keep-alive\r\n" + LOCALHOST = "127.0.0.1" + + end +end diff --git a/lib/rainbows/http_response.rb b/lib/rainbows/http_response.rb new file mode 100644 index 0000000..ebaa4e7 --- /dev/null +++ b/lib/rainbows/http_response.rb @@ -0,0 +1,35 @@ +# -*- encoding: binary -*- +require 'time' +require 'rainbows' + +module Rainbows + + class HttpResponse < ::Unicorn::HttpResponse + + def self.write(socket, rack_response, out = []) + status, headers, body = rack_response + + if Array === out + status = CODES[status.to_i] || status + + headers.each do |key, value| + next if SKIP.include?(key.downcase) + if value =~ /\n/ + out.concat(value.split(/\n/).map! { |v| "#{key}: #{v}\r\n" }) + else + out << "#{key}: #{value}\r\n" + end + end + + socket.write("HTTP/1.1 #{status}\r\n" \ + "Date: #{Time.now.httpdate}\r\n" \ + "Status: #{status}\r\n" \ + "#{out.join('')}\r\n") + end + + body.each { |chunk| socket.write(chunk) } + ensure + body.respond_to?(:close) and body.close rescue nil + end + end +end diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb new file mode 100644 index 0000000..355f3c5 --- /dev/null +++ b/lib/rainbows/http_server.rb @@ -0,0 +1,34 @@ +# -*- encoding: binary -*- +require 'rainbows' +module Rainbows + + class HttpServer < ::Unicorn::HttpServer + include Rainbows + + attr_accessor :worker_connections + attr_reader :use + + def initialize(app, options) + self.app = app + self.reexec_pid = 0 + self.init_listeners = options[:listeners] ? options[:listeners].dup : [] + self.config = Configurator.new(options.merge(:use_defaults => true)) + self.listener_opts = {} + config.commit!(self, :skip => [:listeners, :pid]) + + defined?(@use) or + self.use = Rainbows.const_get(:ThreadPool) + defined?(@worker_connections) or + @worker_connections = 4 + + #self.orig_app = app + end + + def use=(model) + (class << self; self; end).instance_eval { include model } + @use = model + end + + end + +end diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb new file mode 100644 index 0000000..4c04079 --- /dev/null +++ b/lib/rainbows/revactor.rb @@ -0,0 +1,115 @@ +require 'rainbows' +require 'revactor' + +module Rainbows + + module Revactor + require 'rainbows/revactor/tee_input' + + include Unicorn + include Rainbows::Const + HttpServer.constants.each { |x| const_set(x, HttpServer.const_get(x)) } + + # once a client is accepted, it is processed in its entirety here + # in 3 easy steps: read request, call app, write app response + def process_client(client) + buf = client.read or return # this probably does not happen... + hp = HttpParser.new + env = {} + remote_addr = client.remote_addr + + begin + while ! hp.headers(env, buf) + buf << client.read + end + + env[Const::RACK_INPUT] = 0 == hp.content_length ? + HttpRequest::NULL_IO : + Rainbows::Revactor::TeeInput.new(client, env, hp, buf) + env[Const::REMOTE_ADDR] = remote_addr + response = app.call(env.update(RACK_DEFAULTS)) + + if 100 == response.first.to_i + client.write(Const::EXPECT_100_RESPONSE) + env.delete(Const::HTTP_EXPECT) + response = app.call(env) + end + + out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? + HttpResponse.write(client, response, out) + end while hp.keepalive? and hp.reset.nil? and env.clear + client.close + # if we get any error, try to write something back to the client + # assuming we haven't closed the socket, but don't get hung up + # if the socket is already closed or broken. We'll always ensure + # the socket is closed at the end of this function + rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF + emergency_response(client, Const::ERROR_500_RESPONSE) + rescue HttpParserError # try to tell the client they're bad + buf.empty? or emergency_response(client, Const::ERROR_400_RESPONSE) + rescue Object => e + emergency_response(client, Const::ERROR_500_RESPONSE) + logger.error "Read error: #{e.inspect}" + logger.error e.backtrace.join("\n") + end + + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) + ppid = master_pid + init_worker_process(worker) + alive = worker.tmp # tmp is our lifeline to the master process + + trap(:USR1) { reopen_worker_logs(worker.nr) } + trap(:QUIT) { alive = false; LISTENERS.each { |s| s.close rescue nil } } + [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown + + Actor.current.trap_exit = true + + listeners = LISTENERS.map do |s| + TCPServer === s ? ::Revactor::TCP.listen(s, nil) : nil + end.compact + + logger.info "worker=#{worker.nr} ready with Revactor" + clients = [] + + listeners.map! do |s| + Actor.spawn(s) do |l| + begin + clients << Actor.spawn(l.accept) { |c| process_client(c) } + rescue Errno::EAGAIN, Errno::ECONNABORTED + rescue Object => e + if alive + logger.error "Unhandled listen loop exception #{e.inspect}." + logger.error e.backtrace.join("\n") + end + end while alive + end + end + + nr = 0 + begin + Actor.sleep 1 + clients.delete_if { |c| c.dead? } + if alive + alive.chmod(nr = 0 == nr ? 1 : 0) + ppid == Process.ppid or alive = false + end + end while alive || ! clients.empty? + end + + private + + # write a response without caring if it went out or not + # This is in the case of untrappable errors + def emergency_response(client, response_str) + client.instance_eval do + # this is Revactor implementation dependent + @_io.write_nonblock(response_str) rescue nil + end + client.close rescue nil + end + + end +end diff --git a/lib/rainbows/revactor/tee_input.rb b/lib/rainbows/revactor/tee_input.rb new file mode 100644 index 0000000..92effb4 --- /dev/null +++ b/lib/rainbows/revactor/tee_input.rb @@ -0,0 +1,44 @@ +# -*- encoding: binary -*- +require 'rainbows/revactor' + +module Rainbows + module Revactor + + # acts like tee(1) on an input input to provide a input-like stream + # while providing rewindable semantics through a File/StringIO + # backing store. On the first pass, the input is only read on demand + # so your Rack application can use input notification (upload progress + # and like). This should fully conform to the Rack::InputWrapper + # specification on the public API. This class is intended to be a + # strict interpretation of Rack::InputWrapper functionality and will + # not support any deviations from it. + class TeeInput < ::Unicorn::TeeInput + + private + + # tees off a +length+ chunk of data from the input into the IO + # backing store as well as returning it. +dst+ must be specified. + # returns nil if reading from the input returns nil + def tee(length, dst) + unless parser.body_eof? + begin + if parser.filter_body(dst, buf << socket.read).nil? + @tmp.write(dst) + return dst + end + rescue EOFError + end + end + finalize_input + end + + def finalize_input + while parser.trailers(req, buf).nil? + buf << socket.read + end + self.socket = nil + end + + end + end +end diff --git a/lib/rainbows/thread_base.rb b/lib/rainbows/thread_base.rb new file mode 100644 index 0000000..e544772 --- /dev/null +++ b/lib/rainbows/thread_base.rb @@ -0,0 +1,60 @@ + +module Rainbows + + module ThreadBase + + include Unicorn + include Rainbows::Const + + # write a response without caring if it went out or not + # This is in the case of untrappable errors + def emergency_response(client, response_str) + client.write_nonblock(response_str) rescue nil + client.close rescue nil + end + + # once a client is accepted, it is processed in its entirety here + # in 3 easy steps: read request, call app, write app response + def process_client(client) + buf = client.readpartial(CHUNK_SIZE) + hp = HttpParser.new + env = {} + remote_addr = TCPSocket === client ? client.peeraddr.last : LOCALHOST + + begin + while ! hp.headers(env, buf) + buf << client.readpartial(CHUNK_SIZE) + end + + env[RACK_INPUT] = 0 == hp.content_length ? + HttpRequest::NULL_IO : + Unicorn::TeeInput.new(client, env, hp, buf) + env[REMOTE_ADDR] = remote_addr + response = app.call(env.update(RACK_DEFAULTS)) + + if 100 == response.first.to_i + client.write(EXPECT_100_RESPONSE) + env.delete(HTTP_EXPECT) + response = app.call(env) + end + + out = [ hp.keepalive? ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? + HttpResponse.write(client, response, out) + end while hp.keepalive? and hp.reset.nil? and env.clear + client.close + # if we get any error, try to write something back to the client + # assuming we haven't closed the socket, but don't get hung up + # if the socket is already closed or broken. We'll always ensure + # the socket is closed at the end of this function + rescue EOFError,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::EBADF + emergency_response(client, ERROR_500_RESPONSE) + rescue HttpParserError # try to tell the client they're bad + buf.empty? or emergency_response(client, ERROR_400_RESPONSE) + rescue Object => e + emergency_response(client, ERROR_500_RESPONSE) + logger.error "Read error: #{e.inspect}" + logger.error e.backtrace.join("\n") + end + end +end + diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb new file mode 100644 index 0000000..058205f --- /dev/null +++ b/lib/rainbows/thread_pool.rb @@ -0,0 +1,84 @@ +module Rainbows + + module ThreadPool + + include ThreadBase + + HttpServer.constants.each { |x| const_set(x, HttpServer.const_get(x)) } + + def worker_loop(worker) + init_worker_process(worker) + threads = ThreadGroup.new + alive = worker.tmp + nr = 0 + + # closing anything we IO.select on will raise EBADF + trap(:USR1) { reopen_worker_logs(worker.nr) rescue nil } + trap(:QUIT) { alive = false; LISTENERS.map! { |s| s.close rescue nil } } + [:TERM, :INT].each { |sig| trap(sig) { exit(0) } } # instant shutdown + logger.info "worker=#{worker.nr} ready with ThreadPool" + + while alive && master_pid == Process.ppid + maintain_thread_count(threads) + threads.list.each do |thr| + alive.chmod(nr += 1) + thr.join(timeout / 2.0) and break + end + end + join_worker_threads(threads) + end + + def join_worker_threads(threads) + logger.info "Joining worker threads..." + t0 = Time.now + timeleft = timeout + threads.list.each { |thr| + thr.join(timeleft) + timeleft -= (Time.now - t0) + } + logger.info "Done joining worker threads." + end + + def maintain_thread_count(threads) + threads.list.each do |thr| + next if (Time.now - (thr[:t] || next)) < timeout + thr.kill! # take no prisoners for timeout violations + logger.error "killed #{thr.inspect} for being too old" + end + + while threads.list.size < worker_connections + threads.add(new_worker_thread) + end + end + + def new_worker_thread + Thread.new { + alive = true + thr = Thread.current + begin + ret = begin + thr[:t] = Time.now + IO.select(LISTENERS, nil, nil, timeout/2.0) or next + rescue Errno::EINTR + retry + rescue Errno::EBADF + return + end + ret.first.each do |sock| + begin + process_client(sock.accept_nonblock) + thr[:t] = Time.now + rescue Errno::EAGAIN, Errno::ECONNABORTED + end + end + rescue Object => e + if alive + logger.error "Unhandled listen loop exception #{e.inspect}." + logger.error e.backtrace.join("\n") + end + end while alive = LISTENERS.first + } + end + + end +end -- cgit v1.2.3-24-ge0c7