From d4a2b5dd2b85f4b2d3bb120ee1e1b0dde31bc25c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 20 Oct 2010 17:48:58 -0700 Subject: unicorn 2.x updates + kgio We get basic internal API changes from Unicorn, code simplifications coming next. --- bin/rainbows | 35 +++++++++++++++------------------- lib/rainbows.rb | 5 ++--- lib/rainbows/base.rb | 24 +++++------------------ lib/rainbows/const.rb | 2 ++ lib/rainbows/ev_core.rb | 6 +++--- lib/rainbows/fiber/io.rb | 34 +++++++++++++++++++++++++++------ lib/rainbows/fiber/rev.rb | 19 ++++++++----------- lib/rainbows/http_server.rb | 2 +- lib/rainbows/max_body.rb | 38 ++++++++++++++++++------------------- lib/rainbows/revactor.rb | 32 ++++++++++++++++--------------- lib/rainbows/writer_thread_pool.rb | 8 ++++++++ lib/rainbows/writer_thread_spawn.rb | 8 ++++++++ rainbows.gemspec | 7 ++----- t/test_isolate.rb | 4 ++-- 14 files changed, 120 insertions(+), 104 deletions(-) diff --git a/bin/rainbows b/bin/rainbows index 685dd09..b3606ac 100644 --- a/bin/rainbows +++ b/bin/rainbows @@ -5,16 +5,13 @@ require 'rainbows' require 'optparse' ENV["RACK_ENV"] ||= "development" -daemonize = false -listeners = [] -options = { :listeners => listeners } -host, port = Unicorn::Const::DEFAULT_HOST, Unicorn::Const::DEFAULT_PORT -set_listener = false +rackup_opts = Unicorn::Configurator::RACKUP +options = rackup_opts[:options] opts = OptionParser.new("", 24, ' ') do |opts| - opts.banner = "Usage: #{File.basename($0)} " \ - "[ruby options] [unicorn options] [rackup config file]" - + cmd = File.basename($0) + opts.banner = "Usage: #{cmd} " \ + "[ruby options] [#{cmd} options] [rackup config file]" opts.separator "Ruby options:" lineno = 1 @@ -41,20 +38,20 @@ opts = OptionParser.new("", 24, ' ') do |opts| require library end - opts.separator "Rainbows!/Unicorn options:" + opts.separator "#{cmd} options:" # some of these switches exist for rackup command-line compatibility, opts.on("-o", "--host HOST", "listen on HOST (default: #{Unicorn::Const::DEFAULT_HOST})") do |h| - host = h - set_listener = true + rackup_opts[:host] = h + rackup_opts[:set_listener] = true end opts.on("-p", "--port PORT", "use PORT (default: #{Unicorn::Const::DEFAULT_PORT})") do |p| - port = p.to_i - set_listener = true + rackup_opts[:port] = p.to_i + rackup_opts[:set_listener] = true end opts.on("-E", "--env RACK_ENV", @@ -63,7 +60,7 @@ opts = OptionParser.new("", 24, ' ') do |opts| end opts.on("-D", "--daemonize", "run daemonized in the background") do |d| - daemonize = d ? true : false + rackup_opts[:daemonize] = !!d end opts.on("-P", "--pid FILE", "DEPRECATED") do |f| @@ -82,11 +79,10 @@ opts = OptionParser.new("", 24, ' ') do |opts| "listen on HOST:PORT or PATH", "this may be specified multiple times", "(default: #{Unicorn::Const::DEFAULT_LISTEN})") do |address| - listeners << address + options[:listeners] << address end - opts.on("-c", "--config-file FILE", - "Rainbows!/Unicorn-specific config file") do |f| + opts.on("-c", "--config-file FILE", "Rainbows!-specific config file") do |f| options[:config_file] = f end @@ -111,16 +107,15 @@ opts = OptionParser.new("", 24, ' ') do |opts| end app = Unicorn.builder(ARGV[0] || 'config.ru', opts) -listeners << "#{host}:#{port}" if set_listener if $DEBUG require 'pp' pp({ :unicorn_options => options, :app => app, - :daemonize => daemonize, + :daemonize => rackup_opts[:daemonize], }) end -Unicorn::Launcher.daemonize!(options) if daemonize +Unicorn::Launcher.daemonize!(options) if rackup_opts[:daemonize] Rainbows.run(app, options) diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 0914609..58ba23f 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -77,10 +77,9 @@ module Rainbows # returns a string representing the address of the given client +io+ # For local UNIX domain sockets, this will return a string referred - # to by the (non-frozen) Unicorn::HttpRequest::LOCALHOST constant. + # to by the (non-frozen) Kgio::LOCALHOST constant. def addr(io) # :nodoc: - io.respond_to?(:peeraddr) ? - io.peeraddr[-1] : Unicorn::HttpRequest::LOCALHOST + io.respond_to?(:peeraddr) ? io.peeraddr[-1] : Kgio::LOCALHOST end # :stopdoc: diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 59747c7..fe2cf31 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -27,18 +27,6 @@ module Rainbows::Base listeners = Rainbows::HttpServer::LISTENERS Rainbows::HttpServer::IO_PURGATORY.concat(listeners) - # no need for this when Unicorn uses Kgio - listeners.map! do |io| - case io - when TCPServer - Kgio::TCPServer.for_fd(io.fileno) - when UNIXServer - Kgio::UNIXServer.for_fd(io.fileno) - else - io - end - end - # we're don't use the self-pipe mechanism in the Rainbows! worker # since we don't defer reopening logs Rainbows::HttpServer::SELF_PIPE.each { |x| x.close }.clear @@ -57,20 +45,18 @@ module Rainbows::Base # this is used by synchronous concurrency models # Base, ThreadSpawn, ThreadPool def process_client(client) # :nodoc: - buf = client.readpartial(CHUNK_SIZE) # accept filters protect us here hp = HttpParser.new - env = {} + client.readpartial(CHUNK_SIZE, buf = hp.buf) remote_addr = Rainbows.addr(client) begin # loop - until hp.headers(env, buf) + until env = hp.parse wait_headers_readable(client) or return buf << client.readpartial(CHUNK_SIZE) end env[CLIENT_IO] = client - env[RACK_INPUT] = 0 == hp.content_length ? - NULL_IO : TeeInput.new(client, env, hp, buf) + env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : TeeInput.new(client, hp) env[REMOTE_ADDR] = remote_addr status, headers, body = app.call(env.update(RACK_DEFAULTS)) @@ -83,12 +69,12 @@ module Rainbows::Base if hp.headers? headers = HH.new(headers) range = make_range!(env, status, headers) and status = range.shift - env = false unless hp.keepalive? && G.alive + env = hp.keepalive? && G.alive headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE client.write(response_header(status, headers)) end write_body(client, body, range) - end while env && env.clear && hp.reset.nil? + end while env && hp.reset.nil? # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up # if the socket is already closed or broken. We'll always ensure diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb index 91d4e44..1c77c76 100644 --- a/lib/rainbows/const.rb +++ b/lib/rainbows/const.rb @@ -24,5 +24,7 @@ module Rainbows ERROR_413_RESPONSE = "HTTP/1.1 413 Request Entity Too Large\r\n\r\n" ERROR_416_RESPONSE = "HTTP/1.1 416 Requested Range Not Satisfiable\r\n\r\n" + RACK_INPUT = Unicorn::HttpRequest::RACK_INPUT + REMOTE_ADDR = Unicorn::HttpRequest::REMOTE_ADDR end end diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index bf00eed..9761144 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -88,7 +88,7 @@ module Rainbows class CapInput < Struct.new(:io, :client, :bytes_left) MAX_BODY = Unicorn::Const::MAX_BODY - Util = Unicorn::Util + TmpIO = Unicorn::TmpIO def self.err(client, msg) client.write(Const::ERROR_413_RESPONSE) @@ -104,9 +104,9 @@ module Rainbows if max && (len > max) err(client, "Content-Length too big: #{len} > #{max}") end - len <= MAX_BODY ? StringIO.new("") : Util.tmpio + len <= MAX_BODY ? StringIO.new("") : TmpIO.new else - max ? super(Util.tmpio, client, max) : Util.tmpio + max ? super(TmpIO.new, client, max) : TmpIO.new end end diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb index f83b8b7..571f070 100644 --- a/lib/rainbows/fiber/io.rb +++ b/lib/rainbows/fiber/io.rb @@ -10,7 +10,7 @@ module Rainbows # TODO: subclass off IO and include Kgio::SocketMethods instead class IO < Struct.new(:to_io, :f) # :stopdoc: - LOCALHOST = Unicorn::HttpRequest::LOCALHOST + LOCALHOST = Kgio::LOCALHOST # needed to write errors with def write_nonblock(buf) @@ -82,14 +82,36 @@ module Rainbows end def readpartial(length, buf = "") - begin - to_io.read_nonblock(length, buf) - rescue Errno::EAGAIN - wait_readable - retry + if to_io.respond_to?(:kgio_tryread) + # TODO: use kgio_read! + begin + rv = to_io.kgio_tryread(length, buf) + case rv + when nil + raise EOFError, "end of file reached", [] + when Kgio::WaitReadable + wait_readable + else + return rv + end + end while true + else + begin + to_io.read_nonblock(length, buf) + rescue Errno::EAGAIN + wait_readable + retry + end end end + def kgio_read(*args) + to_io.kgio_read(*args) + end + + def kgio_read!(*args) + to_io.kgio_read!(*args) + end end end end diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb index 2c1abb7..6969f5b 100644 --- a/lib/rainbows/fiber/rev.rb +++ b/lib/rainbows/fiber/rev.rb @@ -78,16 +78,17 @@ module Rainbows::Fiber def process(io) G.cur += 1 client = FIO.new(io, ::Fiber.current) - buf = client.read_timeout or return hp = HttpParser.new - env = {} + client.readpartial(16384, buf = hp.buf) begin # loop - buf << (client.read_timeout or return) until hp.headers(env, buf) + until env = hp.parse + buf << (client.read_timeout or return) + end env[CLIENT_IO] = client env[RACK_INPUT] = 0 == hp.content_length ? - HttpRequest::NULL_IO : TeeInput.new(client, env, hp, buf) + HttpRequest::NULL_IO : TeeInput.new(client, hp) env[REMOTE_ADDR] = io.kgio_addr status, headers, body = APP.call(env.update(RACK_DEFAULTS)) @@ -100,16 +101,12 @@ module Rainbows::Fiber if hp.headers? headers = HH.new(headers) range = make_range!(env, status, headers) and status = range.shift - headers[CONNECTION] = if hp.keepalive? && G.alive - KEEP_ALIVE - else - env = false - CLOSE - end + env = hp.keepalive? && G.alive + headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE client.write(response_header(status, headers)) end write_body(client, body, range) - end while env && env.clear && hp.reset.nil? + end while env && hp.reset.nil? rescue => e Error.write(io, e) ensure diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb index a5cb054..0ed6717 100644 --- a/lib/rainbows/http_server.rb +++ b/lib/rainbows/http_server.rb @@ -36,7 +36,7 @@ module Rainbows # fact that we let clients keep idle connections open for long # periods of time means we have to chmod at a fixed interval. def timeout=(nr) - super(nr + 1) + @timeout = nr + 1 end #:startdoc: diff --git a/lib/rainbows/max_body.rb b/lib/rainbows/max_body.rb index 23e4fa6..d825d2f 100644 --- a/lib/rainbows/max_body.rb +++ b/lib/rainbows/max_body.rb @@ -11,32 +11,32 @@ class MaxBody < Struct.new(:app) # this is meant to be included in Rainbows::TeeInput (and derived # classes) to limit body sizes module Limit - Util = Unicorn::Util + TmpIO = Unicorn::TmpIO - def initialize(socket, req, parser, buf) - self.len = parser.content_length + def initialize(socket, request) + @parser = request + @buf = request.buf + @env = request.env + @len = request.content_length max = Rainbows.max_bytes # never nil, see MaxBody.setup - if len && len > max + if @len && @len > max socket.write(Const::ERROR_413_RESPONSE) socket.close - raise IOError, "Content-Length too big: #{len} > #{max}", [] + raise IOError, "Content-Length too big: #@len > #{max}", [] end - self.req = req - self.parser = parser - self.buf = buf - self.socket = socket - self.buf2 = "" - if buf.size > 0 - parser.filter_body(buf2, buf) and finalize_input - buf2.size > max and raise IOError, "chunked request body too big", [] + @socket = socket + @buf2 = "" + if @buf.size > 0 + parser.filter_body(@buf2, @buf) and finalize_input + @buf2.size > max and raise IOError, "chunked request body too big", [] end - self.tmp = len && len < Const::MAX_BODY ? StringIO.new("") : Util.tmpio - if buf2.size > 0 - tmp.write(buf2) - tmp.seek(0) - max -= buf2.size + @tmp = @len && @len < Const::MAX_BODY ? StringIO.new("") : TmpIO.new + if @buf2.size > 0 + @tmp.write(@buf2) + @tmp.rewind + max -= @buf2.size end @max_body = max end @@ -46,7 +46,7 @@ class MaxBody < Struct.new(:app) if rv && ((@max_body -= rv.size) < 0) # make HttpParser#keepalive? => false to force an immediate disconnect # after we write - parser.reset + @parser.reset throw :rainbows_EFBIG end rv diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index eae7673..a0b4bbf 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -26,7 +26,7 @@ module Rainbows::Revactor autoload :Proxy, 'rainbows/revactor/proxy' include Rainbows::Base - LOCALHOST = Unicorn::HttpRequest::LOCALHOST + LOCALHOST = Kgio::LOCALHOST TCP = ::Revactor::TCP::Socket # once a client is accepted, it is processed in its entirety here @@ -41,16 +41,17 @@ module Rainbows::Revactor else LOCALHOST end - buf = client.read(*rd_args) - hp = HttpParser.new - env = {} + hp = Unicorn::HttpParser.new + buf = hp.buf begin - buf << client.read(*rd_args) until hp.headers(env, buf) + until env = hp.parse + buf << client.read(*rd_args) + end env[CLIENT_IO] = client env[RACK_INPUT] = 0 == hp.content_length ? - NULL_IO : TeeInput.new(PartialSocket.new(client), env, hp, buf) + NULL_IO : TeeInput.new(TeeSocket.new(client), hp) env[REMOTE_ADDR] = remote_addr status, headers, body = app.call(env.update(RACK_DEFAULTS)) @@ -63,12 +64,12 @@ module Rainbows::Revactor if hp.headers? headers = HH.new(headers) range = make_range!(env, status, headers) and status = range.shift - env = false unless hp.keepalive? && G.alive && G.kato > 0 + env = hp.keepalive? && G.alive && G.kato > 0 headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE client.write(response_header(status, headers)) end write_body(client, body, range) - end while env && env.clear && hp.reset.nil? + end while env && hp.reset.nil? rescue ::Revactor::TCP::ReadError rescue => e Rainbows::Error.write(io, e) @@ -146,36 +147,37 @@ module Rainbows::Revactor # enough to avoid mucking with TeeInput internals. Fortunately # this code is not heavily used so we can usually avoid the overhead # of adding a userspace buffer. - class PartialSocket < Struct.new(:socket, :rbuf) + class TeeSocket def initialize(socket) # IO::Buffer is used internally by Rev which Revactor is based on # so we'll always have it available - super(socket, IO::Buffer.new) + @socket, @rbuf = socket, IO::Buffer.new end # Revactor socket reads always return an unspecified amount, # sometimes too much - def readpartial(length, dst = "") + def kgio_read(length, dst = "") return dst.replace("") if length == 0 # always check and return from the userspace buffer first - rbuf.size > 0 and return dst.replace(rbuf.read(length)) + @rbuf.size > 0 and return dst.replace(@rbuf.read(length)) # read off the socket since there was nothing in rbuf - tmp = socket.read + tmp = @socket.read # we didn't read too much, good, just return it straight back # to avoid needlessly wasting memory bandwidth tmp.size <= length and return dst.replace(tmp) # ugh, read returned too much - rbuf << tmp[length, tmp.size] + @rbuf << tmp[length, tmp.size] dst.replace(tmp[0, length]) + rescue EOFError end # just proxy any remaining methods TeeInput may use def close - socket.close + @socket.close end end diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index c4d8d9f..5c8e2a3 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -29,6 +29,14 @@ module Rainbows to_io.readpartial(size, buf) end + def kgio_read(size, buf = "") + to_io.kgio_read(size, buf) + end + + def kgio_read!(size, buf = "") + to_io.kgio_read!(size, buf) + end + def write_nonblock(buf) to_io.write_nonblock(buf) end diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 17aa835..dfd6c39 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -34,6 +34,14 @@ module Rainbows to_io.readpartial(size, buf) end + def kgio_read(size, buf = "") + to_io.kgio_read(size, buf) + end + + def kgio_read!(size, buf = "") + to_io.kgio_read!(size, buf) + end + def write_nonblock(buf) to_io.write_nonblock(buf) end diff --git a/rainbows.gemspec b/rainbows.gemspec index 95442d2..6d5f990 100644 --- a/rainbows.gemspec +++ b/rainbows.gemspec @@ -44,11 +44,8 @@ Gem::Specification.new do |s| s.add_dependency(%q, ['~> 1.1']) # we need Unicorn for the HTTP parser and process management - # Unicorn 0.991.0 handles config.ru when started outside of - # the prespecified working_directory - s.add_dependency(%q, [">= 1.1.3", "< 2.0.0"]) - s.add_dependency(%q, ["~> 1.0.1"]) - s.add_development_dependency(%q, "~> 2.1.0") + s.add_dependency(%q, ["~> 2.0.0pre3"]) + s.add_development_dependency(%q, "~> 2.1.2") # optional runtime dependencies depending on configuration # see t/test_isolate.rb for the exact versions we've tested with diff --git a/t/test_isolate.rb b/t/test_isolate.rb index d39d7be..3f7fdef 100644 --- a/t/test_isolate.rb +++ b/t/test_isolate.rb @@ -15,8 +15,8 @@ $stdout.reopen($stderr) Isolate.now!(opts) do gem 'rack', '1.1.0' # Cramp currently requires ~> 1.1.0 - gem 'kgio', '1.0.1' - gem 'unicorn', '1.1.3' + gem 'kgio', '1.3.1' + gem 'unicorn', '2.0.0pre3' gem 'kcar', '0.1.1' if engine == "ruby" -- cgit v1.2.3-24-ge0c7