diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/unicorn.rb | 205 | ||||
-rw-r--r-- | lib/unicorn/app/exec_cgi.rb | 63 | ||||
-rw-r--r-- | lib/unicorn/app/inetd.rb | 106 | ||||
-rw-r--r-- | lib/unicorn/app/old_rails/static.rb | 20 | ||||
-rw-r--r-- | lib/unicorn/chunked_reader.rb | 94 | ||||
-rw-r--r-- | lib/unicorn/configurator.rb | 55 | ||||
-rw-r--r-- | lib/unicorn/const.rb | 6 | ||||
-rw-r--r-- | lib/unicorn/http_request.rb | 92 | ||||
-rw-r--r-- | lib/unicorn/http_response.rb | 3 | ||||
-rw-r--r-- | lib/unicorn/tee_input.rb | 135 | ||||
-rw-r--r-- | lib/unicorn/trailer_parser.rb | 52 |
11 files changed, 585 insertions, 246 deletions
diff --git a/lib/unicorn.rb b/lib/unicorn.rb index aac530b..eb11f4d 100644 --- a/lib/unicorn.rb +++ b/lib/unicorn.rb @@ -1,4 +1,5 @@ require 'fcntl' +require 'tempfile' require 'unicorn/socket_helper' autoload :Rack, 'rack' @@ -10,8 +11,15 @@ module Unicorn autoload :HttpRequest, 'unicorn/http_request' autoload :HttpResponse, 'unicorn/http_response' autoload :Configurator, 'unicorn/configurator' + autoload :TeeInput, 'unicorn/tee_input' + autoload :ChunkedReader, 'unicorn/chunked_reader' + autoload :TrailerParser, 'unicorn/trailer_parser' autoload :Util, 'unicorn/util' + Z = '' # the stock empty string we use everywhere... + Z.force_encoding(Encoding::BINARY) if Z.respond_to?(:force_encoding) + Z.freeze + class << self def run(app, options = {}) HttpServer.new(app, options).start.join @@ -22,8 +30,12 @@ module Unicorn # processes which in turn handle the I/O and application process. # Listener sockets are started in the master process and shared with # forked worker children. - class HttpServer - attr_reader :logger + + class HttpServer < Struct.new(:listener_opts, :timeout, :worker_processes, + :before_fork, :after_fork, :before_exec, + :logger, :pid, :app, :preload_app, + :reexec_pid, :orig_app, :init_listeners, + :master_pid, :config) include ::Unicorn::SocketHelper # prevents IO objects in here from being GC-ed @@ -54,8 +66,7 @@ module Unicorn 0 => $0.dup, } - Worker = Struct.new(:nr, :tempfile) unless defined?(Worker) - class Worker + class Worker < Struct.new(:nr, :tempfile) # worker objects may be compared to just plain numbers def ==(other_nr) self.nr == other_nr @@ -67,14 +78,13 @@ module Unicorn # HttpServer.run.join to join the thread that's processing # incoming requests on the socket. def initialize(app, options = {}) - @app = app - @pid = nil - @reexec_pid = 0 - @init_listeners = options[:listeners] ? options[:listeners].dup : [] - @config = Configurator.new(options.merge(:use_defaults => true)) - @listener_opts = {} - @config.commit!(self, :skip => [:listeners, :pid]) - @orig_app = app + self.app = app + self.reexec_pid = 0 + self.init_listeners = options[:listeners] ? options[:listeners].dup : [] + self.config = Configurator.new(options.merge(:use_defaults => true)) + self.listener_opts = {} + config.commit!(self, :skip => [:listeners, :pid]) + self.orig_app = app end # Runs the thing. Returns self so you can run join on it @@ -85,13 +95,13 @@ module Unicorn # before they become UNIXServer or TCPServer inherited = ENV['UNICORN_FD'].to_s.split(/,/).map do |fd| io = Socket.for_fd(fd.to_i) - set_server_sockopt(io, @listener_opts[sock_name(io)]) + set_server_sockopt(io, listener_opts[sock_name(io)]) IO_PURGATORY << io logger.info "inherited addr=#{sock_name(io)} fd=#{fd}" server_cast(io) end - config_listeners = @config[:listeners].dup + config_listeners = config[:listeners].dup LISTENERS.replace(inherited) # we start out with generic Socket objects that get cast to either @@ -104,8 +114,9 @@ module Unicorn end config_listeners.each { |addr| listen(addr) } raise ArgumentError, "no listeners" if LISTENERS.empty? - self.pid = @config[:pid] - build_app! if @preload_app + self.pid = config[:pid] + self.master_pid = $$ + build_app! if preload_app maintain_worker_count self end @@ -123,8 +134,7 @@ module Unicorn end end set_names = listener_names(listeners) - dead_names += cur_names - set_names - dead_names.uniq! + dead_names.concat(cur_names - set_names).uniq! LISTENERS.delete_if do |io| if dead_names.include?(sock_name(io)) @@ -133,7 +143,7 @@ module Unicorn end (io.close rescue nil).nil? # true else - set_server_sockopt(io, @listener_opts[sock_name(io)]) + set_server_sockopt(io, listener_opts[sock_name(io)]) false end end @@ -144,28 +154,27 @@ module Unicorn def stdout_path=(path); redirect_io($stdout, path); end def stderr_path=(path); redirect_io($stderr, path); end - def logger=(obj) - REQUEST.logger = @logger = obj - end + alias_method :set_pid, :pid= + undef_method :pid= # sets the path for the PID file of the master process def pid=(path) if path if x = valid_pid?(path) - return path if @pid && path == @pid && x == $$ + return path if pid && path == pid && x == $$ raise ArgumentError, "Already running on PID:#{x} " \ "(or pid=#{path} is stale)" end end - unlink_pid_safe(@pid) if @pid + unlink_pid_safe(pid) if pid File.open(path, 'wb') { |fp| fp.syswrite("#$$\n") } if path - @pid = path + self.set_pid(path) end # add a given address to the +listeners+ set, idempotently # Allows workers to add a private, per-process listener via the - # @after_fork hook. Very useful for debugging and testing. - def listen(address, opt = {}.merge(@listener_opts[address] || {})) + # after_fork hook. Very useful for debugging and testing. + def listen(address, opt = {}.merge(listener_opts[address] || {})) return if String === address && listener_names.include?(address) delay, tries = 0.5, 5 @@ -231,12 +240,12 @@ module Unicorn logger.info "SIGWINCH ignored because we're not daemonized" end when :TTIN - @worker_processes += 1 + self.worker_processes += 1 when :TTOU - @worker_processes -= 1 if @worker_processes > 0 + self.worker_processes -= 1 if self.worker_processes > 0 when :HUP respawn = true - if @config.config_file + if config.config_file load_config! redo # immediate reaping since we may have QUIT workers else # exec binary and exit if there's no config file @@ -255,14 +264,14 @@ module Unicorn end stop # gracefully shutdown all workers on our way out logger.info "master complete" - unlink_pid_safe(@pid) if @pid + unlink_pid_safe(pid) if pid end # Terminates all workers, but does not exit master process def stop(graceful = true) self.listeners = [] kill_each_worker(graceful ? :QUIT : :TERM) - timeleft = @timeout + timeleft = timeout step = 0.2 reap_all_workers until WORKERS.empty? @@ -315,15 +324,15 @@ module Unicorn def reap_all_workers begin loop do - pid, status = Process.waitpid2(-1, Process::WNOHANG) - pid or break - if @reexec_pid == pid + wpid, status = Process.waitpid2(-1, Process::WNOHANG) + wpid or break + if reexec_pid == wpid logger.error "reaped #{status.inspect} exec()-ed" - @reexec_pid = 0 - self.pid = @pid.chomp('.oldbin') if @pid + self.reexec_pid = 0 + self.pid = pid.chomp('.oldbin') if pid proc_name 'master' else - worker = WORKERS.delete(pid) and worker.tempfile.close rescue nil + worker = WORKERS.delete(wpid) and worker.tempfile.close rescue nil logger.info "reaped #{status.inspect} " \ "worker=#{worker.nr rescue 'unknown'}" end @@ -334,19 +343,19 @@ module Unicorn # reexecutes the START_CTX with a new binary def reexec - if @reexec_pid > 0 + if reexec_pid > 0 begin - Process.kill(0, @reexec_pid) - logger.error "reexec-ed child already running PID:#{@reexec_pid}" + Process.kill(0, reexec_pid) + logger.error "reexec-ed child already running PID:#{reexec_pid}" return rescue Errno::ESRCH - @reexec_pid = 0 + reexec_pid = 0 end end - if @pid - old_pid = "#{@pid}.oldbin" - prev_pid = @pid.dup + if pid + old_pid = "#{pid}.oldbin" + prev_pid = pid.dup begin self.pid = old_pid # clear the path for a new pid file rescue ArgumentError @@ -359,7 +368,7 @@ module Unicorn end end - @reexec_pid = fork do + self.reexec_pid = fork do listener_fds = LISTENERS.map { |sock| sock.fileno } ENV['UNICORN_FD'] = listener_fds.join(',') Dir.chdir(START_CTX[:cwd]) @@ -376,38 +385,38 @@ module Unicorn io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) end logger.info "executing #{cmd.inspect} (in #{Dir.pwd})" - @before_exec.call(self) + before_exec.call(self) exec(*cmd) end proc_name 'master (old)' end - # forcibly terminate all workers that haven't checked in in @timeout + # forcibly terminate all workers that haven't checked in in timeout # seconds. The timeout is implemented using an unlinked tempfile # shared between the parent process and each worker. The worker # runs File#chmod to modify the ctime of the tempfile. If the ctime - # is stale for >@timeout seconds, then we'll kill the corresponding + # is stale for >timeout seconds, then we'll kill the corresponding # worker. def murder_lazy_workers diff = stat = nil - WORKERS.dup.each_pair do |pid, worker| + WORKERS.dup.each_pair do |wpid, worker| stat = begin worker.tempfile.stat rescue => e - logger.warn "worker=#{worker.nr} PID:#{pid} stat error: #{e.inspect}" - kill_worker(:QUIT, pid) + logger.warn "worker=#{worker.nr} PID:#{wpid} stat error: #{e.inspect}" + kill_worker(:QUIT, wpid) next end stat.mode == 0100000 and next - (diff = (Time.now - stat.ctime)) <= @timeout and next - logger.error "worker=#{worker.nr} PID:#{pid} timeout " \ - "(#{diff}s > #{@timeout}s), killing" - kill_worker(:KILL, pid) # take no prisoners for @timeout violations + (diff = (Time.now - stat.ctime)) <= timeout and next + logger.error "worker=#{worker.nr} PID:#{wpid} timeout " \ + "(#{diff}s > #{timeout}s), killing" + kill_worker(:KILL, wpid) # take no prisoners for timeout violations end end def spawn_missing_workers - (0...@worker_processes).each do |worker_nr| + (0...worker_processes).each do |worker_nr| WORKERS.values.include?(worker_nr) and next begin Dir.chdir(START_CTX[:cwd]) @@ -419,25 +428,32 @@ module Unicorn tempfile = Tempfile.new(nil) # as short as possible to save dir space tempfile.unlink # don't allow other processes to find or see it worker = Worker.new(worker_nr, tempfile) - @before_fork.call(self, worker) - pid = fork { worker_loop(worker) } - WORKERS[pid] = worker + before_fork.call(self, worker) + WORKERS[fork { worker_loop(worker) }] = worker end end def maintain_worker_count - (off = WORKERS.size - @worker_processes) == 0 and return + (off = WORKERS.size - worker_processes) == 0 and return off < 0 and return spawn_missing_workers - WORKERS.dup.each_pair { |pid,w| - w.nr >= @worker_processes and kill_worker(:QUIT, pid) rescue nil + WORKERS.dup.each_pair { |wpid,w| + w.nr >= worker_processes and kill_worker(:QUIT, wpid) rescue nil } end # once a client is accepted, it is processed in its entirety here # in 3 easy steps: read request, call app, write app response - def process_client(app, client) + def process_client(client) client.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) - HttpResponse.write(client, app.call(REQUEST.read(client))) + response = app.call(env = REQUEST.read(client)) + + if 100 == response.first.to_i + client.write(Const::EXPECT_100_RESPONSE) + env.delete(Const::HTTP_EXPECT) + response = app.call(env) + end + + HttpResponse.write(client, response) # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up # if the socket is already closed or broken. We'll always ensure @@ -457,7 +473,7 @@ module Unicorn # gets rid of stuff the worker has no business keeping track of # to free some resources and drops all sig handlers. - # traps for USR1, USR2, and HUP may be set in the @after_fork Proc + # traps for USR1, USR2, and HUP may be set in the after_fork Proc # by the user. def init_worker_process(worker) QUEUE_SIGS.each { |sig| trap(sig, nil) } @@ -470,15 +486,15 @@ module Unicorn WORKERS.clear LISTENERS.each { |sock| sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) } worker.tempfile.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) - @after_fork.call(self, worker) # can drop perms - @timeout /= 2.0 # halve it for select() - build_app! unless @preload_app + after_fork.call(self, worker) # can drop perms + self.timeout /= 2.0 # halve it for select() + build_app! unless preload_app end def reopen_worker_logs(worker_nr) - @logger.info "worker=#{worker_nr} reopening logs..." + logger.info "worker=#{worker_nr} reopening logs..." Unicorn::Util.reopen_logs - @logger.info "worker=#{worker_nr} done reopening logs" + logger.info "worker=#{worker_nr} done reopening logs" init_self_pipe! end @@ -486,7 +502,7 @@ module Unicorn # for connections and doesn't die until the parent dies (or is # given a INT, QUIT, or TERM signal) def worker_loop(worker) - master_pid = Process.ppid # slightly racy, but less memory usage + ppid = master_pid init_worker_process(worker) nr = 0 # this becomes negative if we need to reopen logs alive = worker.tempfile # tempfile is our lifeline to the master process @@ -497,14 +513,13 @@ module Unicorn trap(:USR1) { nr = -65536; SELF_PIPE.first.close rescue nil } trap(:QUIT) { alive = nil; LISTENERS.each { |s| s.close rescue nil } } [:TERM, :INT].each { |sig| trap(sig) { exit!(0) } } # instant shutdown - @logger.info "worker=#{worker.nr} ready" - app = @app + logger.info "worker=#{worker.nr} ready" begin nr < 0 and reopen_worker_logs(worker.nr) nr = 0 - # we're a goner in @timeout seconds anyways if alive.chmod + # we're a goner in timeout seconds anyways if alive.chmod # breaks, so don't trap the exception. Using fchmod() since # futimes() is not available in base Ruby and I very strongly # prefer temporary files to be unlinked for security, @@ -516,7 +531,7 @@ module Unicorn ready.each do |sock| begin - process_client(app, sock.accept_nonblock) + process_client(sock.accept_nonblock) nr += 1 t == (ti = Time.now.to_i) or alive.chmod(t = ti) rescue Errno::EAGAIN, Errno::ECONNABORTED @@ -530,11 +545,11 @@ module Unicorn # before we sleep again in select(). redo unless nr == 0 # (nr < 0) => reopen logs - master_pid == Process.ppid or return + ppid == Process.ppid or return alive.chmod(t = 0) begin # timeout used so we can detect parent death: - ret = IO.select(LISTENERS, nil, SELF_PIPE, @timeout) or redo + ret = IO.select(LISTENERS, nil, SELF_PIPE, timeout) or redo ready = ret.first rescue Errno::EINTR ready = LISTENERS @@ -551,17 +566,17 @@ module Unicorn # delivers a signal to a worker and fails gracefully if the worker # is no longer running. - def kill_worker(signal, pid) + def kill_worker(signal, wpid) begin - Process.kill(signal, pid) + Process.kill(signal, wpid) rescue Errno::ESRCH - worker = WORKERS.delete(pid) and worker.tempfile.close rescue nil + worker = WORKERS.delete(wpid) and worker.tempfile.close rescue nil end end # delivers a signal to each worker def kill_each_worker(signal) - WORKERS.keys.each { |pid| kill_worker(signal, pid) } + WORKERS.keys.each { |wpid| kill_worker(signal, wpid) } end # unlinks a PID file at given +path+ if it contains the current PID @@ -573,10 +588,10 @@ module Unicorn # returns a PID if a given path contains a non-stale PID file, # nil otherwise. def valid_pid?(path) - if File.exist?(path) && (pid = File.read(path).to_i) > 1 + if File.exist?(path) && (wpid = File.read(path).to_i) > 1 begin - Process.kill(0, pid) - return pid + Process.kill(0, wpid) + return wpid rescue Errno::ESRCH end end @@ -585,17 +600,17 @@ module Unicorn def load_config! begin - logger.info "reloading config_file=#{@config.config_file}" - @config[:listeners].replace(@init_listeners) - @config.reload - @config.commit!(self) + logger.info "reloading config_file=#{config.config_file}" + config[:listeners].replace(init_listeners) + config.reload + config.commit!(self) kill_each_worker(:QUIT) Unicorn::Util.reopen_logs - @app = @orig_app - build_app! if @preload_app - logger.info "done reloading config_file=#{@config.config_file}" + self.app = orig_app + build_app! if preload_app + logger.info "done reloading config_file=#{config.config_file}" rescue Object => e - logger.error "error reloading config_file=#{@config.config_file}: " \ + logger.error "error reloading config_file=#{config.config_file}: " \ "#{e.class} #{e.message}" end end @@ -606,12 +621,12 @@ module Unicorn end def build_app! - if @app.respond_to?(:arity) && @app.arity == 0 + if app.respond_to?(:arity) && app.arity == 0 if defined?(Gem) && Gem.respond_to?(:refresh) logger.info "Refreshing Gem list" Gem.refresh end - @app = @app.call + self.app = app.call end end diff --git a/lib/unicorn/app/exec_cgi.rb b/lib/unicorn/app/exec_cgi.rb index 8f81d78..147b279 100644 --- a/lib/unicorn/app/exec_cgi.rb +++ b/lib/unicorn/app/exec_cgi.rb @@ -5,7 +5,7 @@ module Unicorn::App # This class is highly experimental (even more so than the rest of Unicorn) # and has never run anything other than cgit. - class ExecCgi + class ExecCgi < Struct.new(:args) CHUNK_SIZE = 16384 PASS_VARS = %w( @@ -32,21 +32,21 @@ module Unicorn::App # run Unicorn::App::ExecCgi.new("/path/to/cgit.cgi") # end def initialize(*args) - @args = args.dup - first = @args[0] or + self.args = args + first = args[0] or raise ArgumentError, "need path to executable" - first[0..0] == "/" or @args[0] = ::File.expand_path(first) - File.executable?(@args[0]) or - raise ArgumentError, "#{@args[0]} is not executable" + first[0..0] == "/" or args[0] = ::File.expand_path(first) + File.executable?(args[0]) or + raise ArgumentError, "#{args[0]} is not executable" end # Calls the app def call(env) - out, err = Tempfile.new(''), Tempfile.new('') + out, err = Tempfile.new(nil), Tempfile.new(nil) out.unlink err.unlink inp = force_file_input(env) - inp.sync = out.sync = err.sync = true + out.sync = err.sync = true pid = fork { run_child(inp, out, err, env) } inp.close pid, status = Process.waitpid2(pid) @@ -65,14 +65,14 @@ module Unicorn::App val = env[key] or next ENV[key] = val end - ENV['SCRIPT_NAME'] = @args[0] + ENV['SCRIPT_NAME'] = args[0] ENV['GATEWAY_INTERFACE'] = 'CGI/1.1' env.keys.grep(/^HTTP_/) { |key| ENV[key] = env[key] } a = IO.new(0).reopen(inp) b = IO.new(1).reopen(out) c = IO.new(2).reopen(err) - exec(*@args) + exec(*args) end # Extracts headers from CGI out, will change the offset of out. @@ -89,23 +89,24 @@ module Unicorn::App offset = 4 end offset += head.length - out.instance_variable_set('@unicorn_app_exec_cgi_offset', offset) - size -= offset # Allows +out+ to be used as a Rack body. - def out.each - sysseek(@unicorn_app_exec_cgi_offset) - - # don't use a preallocated buffer for sysread since we can't - # guarantee an actual socket is consuming the yielded string - # (or if somebody is pushing to an array for eventual concatenation - begin - yield(sysread(CHUNK_SIZE)) - rescue EOFError - return - end while true - end + out.instance_eval { class << self; self; end }.instance_eval { + define_method(:each) { |&blk| + sysseek(offset) + + # don't use a preallocated buffer for sysread since we can't + # guarantee an actual socket is consuming the yielded string + # (or if somebody is pushing to an array for eventual concatenation + begin + blk.call(sysread(CHUNK_SIZE)) + rescue EOFError + break + end while true + } + } + size -= offset prev = nil headers = Rack::Utils::HeaderHash.new head.split(/\r?\n/).each do |line| @@ -121,17 +122,15 @@ module Unicorn::App # ensures rack.input is a file handle that we can redirect stdin to def force_file_input(env) inp = env['rack.input'] - if inp.respond_to?(:fileno) && Integer === inp.fileno - inp - elsif inp.size == 0 # inp could be a StringIO or StringIO-like object - ::File.open('/dev/null') + if inp.size == 0 # inp could be a StringIO or StringIO-like object + ::File.open('/dev/null', 'rb') else - tmp = Tempfile.new('') + tmp = Tempfile.new(nil) tmp.unlink tmp.binmode + tmp.sync = true - # Rack::Lint::InputWrapper doesn't allow sysread :( - buf = '' + buf = Z.dup while inp.read(CHUNK_SIZE, buf) tmp.syswrite(buf) end @@ -146,7 +145,7 @@ module Unicorn::App err.seek(0) dst = env['rack.errors'] pid = status.pid - dst.write("#{pid}: #{@args.inspect} status=#{status} stderr:\n") + dst.write("#{pid}: #{args.inspect} status=#{status} stderr:\n") err.each_line { |line| dst.write("#{pid}: #{line}") } dst.flush end diff --git a/lib/unicorn/app/inetd.rb b/lib/unicorn/app/inetd.rb new file mode 100644 index 0000000..580b456 --- /dev/null +++ b/lib/unicorn/app/inetd.rb @@ -0,0 +1,106 @@ +# Copyright (c) 2009 Eric Wong +# You can redistribute it and/or modify it under the same terms as Ruby. + +# this class *must* be used with Rack::Chunked + +module Unicorn::App + class Inetd < Struct.new(:cmd) + + class CatBody < Struct.new(:errors, :err_rd, :out_rd, :pid_map) + def initialize(env, cmd) + self.errors = env['rack.errors'] + in_rd, in_wr = IO.pipe + self.err_rd, err_wr = IO.pipe + self.out_rd, out_wr = IO.pipe + + cmd_pid = fork { + inp, out, err = (0..2).map { |i| IO.new(i) } + inp.reopen(in_rd) + out.reopen(out_wr) + err.reopen(err_wr) + [ in_rd, in_wr, err_rd, err_wr, out_rd, out_wr ].each { |i| i.close } + exec(*cmd) + } + [ in_rd, err_wr, out_wr ].each { |io| io.close } + [ in_wr, err_rd, out_rd ].each { |io| io.binmode } + in_wr.sync = true + + # Unfortunately, input here must be processed inside a seperate + # thread/process using blocking I/O since env['rack.input'] is not + # IO.select-able and attempting to make it so would trip Rack::Lint + inp_pid = fork { + input = env['rack.input'] + [ err_rd, out_rd ].each { |io| io.close } + buf = Unicorn::Z.dup + + # this is dependent on input.read having readpartial semantics: + while input.read(16384, buf) + in_wr.write(buf) + end + in_wr.close + } + in_wr.close + self.pid_map = { + inp_pid => 'input streamer', + cmd_pid => cmd.inspect, + } + end + + def each(&block) + begin + rd, = IO.select([err_rd, out_rd]) + rd && rd.first or next + + if rd.include?(err_rd) + begin + errors.write(err_rd.read_nonblock(16384)) + rescue Errno::EINTR + rescue Errno::EAGAIN + break + end while true + end + + rd.include?(out_rd) or next + + begin + yield out_rd.read_nonblock(16384) + rescue Errno::EINTR + rescue Errno::EAGAIN + break + end while true + rescue EOFError,Errno::EPIPE,Errno::EBADF,Errno::EINVAL + break + end while true + + self + end + + def close + pid_map.each { |pid, str| + begin + pid, status = Process.waitpid2(pid) + status.success? or + errors.write("#{str}: #{status.inspect} (PID:#{pid})\n") + rescue Errno::ECHILD + errors.write("Failed to reap #{str} (PID:#{pid})\n") + end + } + end + + end + + def initialize(*cmd) + self.cmd = cmd + end + + def call(env) + /\A100-continue\z/i =~ env[Unicorn::Const::HTTP_EXPECT] and + return [ 100, {} , [] ] + + [ 200, { 'Content-Type' => 'application/octet-stream' }, + CatBody.new(env, cmd) ] + end + + end + +end diff --git a/lib/unicorn/app/old_rails/static.rb b/lib/unicorn/app/old_rails/static.rb index 17c007c..51a0017 100644 --- a/lib/unicorn/app/old_rails/static.rb +++ b/lib/unicorn/app/old_rails/static.rb @@ -19,28 +19,28 @@ require 'rack/file' # This means that if you are using page caching it will actually work # with Unicorn and you should see a decent speed boost (but not as # fast as if you use a static server like nginx). -class Unicorn::App::OldRails::Static +class Unicorn::App::OldRails::Static < Struct.new(:app, :root, :file_server) FILE_METHODS = { 'GET' => true, 'HEAD' => true }.freeze REQUEST_METHOD = 'REQUEST_METHOD'.freeze REQUEST_URI = 'REQUEST_URI'.freeze PATH_INFO = 'PATH_INFO'.freeze def initialize(app) - @app = app - @root = "#{::RAILS_ROOT}/public" - @file_server = ::Rack::File.new(@root) + self.app = app + self.root = "#{::RAILS_ROOT}/public" + self.file_server = ::Rack::File.new(root) end def call(env) # short circuit this ASAP if serving non-file methods - FILE_METHODS.include?(env[REQUEST_METHOD]) or return @app.call(env) + FILE_METHODS.include?(env[REQUEST_METHOD]) or return app.call(env) # first try the path as-is path_info = env[PATH_INFO].chomp("/") - if File.file?("#@root/#{::Rack::Utils.unescape(path_info)}") + if File.file?("#{root}/#{::Rack::Utils.unescape(path_info)}") # File exists as-is so serve it up env[PATH_INFO] = path_info - return @file_server.call(env) + return file_server.call(env) end # then try the cached version: @@ -50,11 +50,11 @@ class Unicorn::App::OldRails::Static env[REQUEST_URI] =~ /^#{Regexp.escape(path_info)}(;[^\?]+)/ path_info << "#$1#{ActionController::Base.page_cache_extension}" - if File.file?("#@root/#{::Rack::Utils.unescape(path_info)}") + if File.file?("#{root}/#{::Rack::Utils.unescape(path_info)}") env[PATH_INFO] = path_info - return @file_server.call(env) + return file_server.call(env) end - @app.call(env) # call OldRails + app.call(env) # call OldRails end end if defined?(Unicorn::App::OldRails) diff --git a/lib/unicorn/chunked_reader.rb b/lib/unicorn/chunked_reader.rb new file mode 100644 index 0000000..606e4a6 --- /dev/null +++ b/lib/unicorn/chunked_reader.rb @@ -0,0 +1,94 @@ +# Copyright (c) 2009 Eric Wong +# You can redistribute it and/or modify it under the same terms as Ruby. + +require 'unicorn' +require 'unicorn/http11' + +module Unicorn + class ChunkedReader + + def initialize(env, input, buf) + @env, @input, @buf = env, input, buf + @chunk_left = 0 + parse_chunk_header + end + + def readpartial(max, buf = Z.dup) + while @input && @chunk_left <= 0 && ! parse_chunk_header + @buf << @input.readpartial(Const::CHUNK_SIZE, buf) + end + + if @input + begin + @buf << @input.read_nonblock(Const::CHUNK_SIZE, buf) + rescue Errno::EAGAIN, Errno::EINTR + end + end + + max = @chunk_left if max > @chunk_left + buf.replace(last_block(max) || Z) + @chunk_left -= buf.size + (0 == buf.size && @input.nil?) and raise EOFError + buf + end + + def gets + line = nil + begin + line = readpartial(Const::CHUNK_SIZE) + begin + if line.sub!(%r{\A(.*?#{$/})}, Z) + @chunk_left += line.size + @buf = @buf ? (line << @buf) : line + return $1.dup + end + line << readpartial(Const::CHUNK_SIZE) + end while true + rescue EOFError + return line + end + end + + private + + def last_block(max = nil) + rv = @buf + if max && rv && max < rv.size + @buf = rv[max - rv.size, rv.size - max] + return rv[0, max] + end + @buf = Z.dup + rv + end + + def parse_chunk_header + buf = @buf + # ignoring chunk-extension info for now, I haven't seen any use for it + # (or any users, and TE:chunked sent by clients is rare already) + # if there was not enough data in buffer to parse length of the chunk + # then just return + if buf.sub!(/\A(?:\r\n)?([a-fA-F0-9]{1,8})[^\r]*?\r\n/, Z) + @chunk_left = $1.to_i(16) + if 0 == @chunk_left # EOF + buf.sub!(/\A\r\n(?:\r\n)?/, Z) # cleanup for future requests + if trailer = @env[Const::HTTP_TRAILER] + tp = TrailerParser.new(trailer) + while ! tp.execute!(@env, buf) + buf << @input.readpartial(Const::CHUNK_SIZE) + end + end + @input = nil + end + return @chunk_left + end + + buf.size > 256 and + raise HttpParserError, + "malformed chunk, chunk-length not found in buffer: " \ + "#{buf.inspect}" + nil + end + + end + +end diff --git a/lib/unicorn/configurator.rb b/lib/unicorn/configurator.rb index 860962a..0ecd0d5 100644 --- a/lib/unicorn/configurator.rb +++ b/lib/unicorn/configurator.rb @@ -14,14 +14,13 @@ module Unicorn # after_fork do |server,worker| # server.listen("127.0.0.1:#{9293 + worker.nr}") rescue nil # end - class Configurator + class Configurator < Struct.new(:set, :config_file) # The default logger writes its output to $stderr - DEFAULT_LOGGER = Logger.new($stderr) unless defined?(DEFAULT_LOGGER) + DEFAULT_LOGGER = Logger.new($stderr) # Default settings for Unicorn DEFAULTS = { :timeout => 60, - :listeners => [], :logger => DEFAULT_LOGGER, :worker_processes => 1, :after_fork => lambda { |server, worker| @@ -42,42 +41,32 @@ module Unicorn }, :pid => nil, :preload_app => false, - :stderr_path => nil, - :stdout_path => nil, } - attr_reader :config_file #:nodoc: - def initialize(defaults = {}) #:nodoc: - @set = Hash.new(:unset) + self.set = Hash.new(:unset) use_defaults = defaults.delete(:use_defaults) - @config_file = defaults.delete(:config_file) - @config_file.freeze - @set.merge!(DEFAULTS) if use_defaults + self.config_file = defaults.delete(:config_file) + set.merge!(DEFAULTS) if use_defaults defaults.each { |key, value| self.send(key, value) } reload end def reload #:nodoc: - instance_eval(File.read(@config_file)) if @config_file + instance_eval(File.read(config_file)) if config_file end def commit!(server, options = {}) #:nodoc: skip = options[:skip] || [] - @set.each do |key, value| - (Symbol === value && value == :unset) and next + set.each do |key, value| + value == :unset and next skip.include?(key) and next - setter = "#{key}=" - if server.respond_to?(setter) - server.send(setter, value) - else - server.instance_variable_set("@#{key}", value) - end + server.__send__("#{key}=", value) end end def [](key) # :nodoc: - @set[key] + set[key] end # sets object to the +new+ Logger-like object. The new logger-like @@ -89,7 +78,7 @@ module Unicorn raise ArgumentError, "logger=#{new} does not respond to method=#{m}" end - @set[:logger] = new + set[:logger] = new end # sets after_fork hook to a given block. This block will be called by @@ -151,7 +140,7 @@ module Unicorn "not numeric: timeout=#{seconds.inspect}" seconds >= 3 or raise ArgumentError, "too low: timeout=#{seconds.inspect}" - @set[:timeout] = seconds + set[:timeout] = seconds end # sets the current number of worker_processes to +nr+. Each worker @@ -161,7 +150,7 @@ module Unicorn "not an integer: worker_processes=#{nr.inspect}" nr >= 0 or raise ArgumentError, "not non-negative: worker_processes=#{nr.inspect}" - @set[:worker_processes] = nr + set[:worker_processes] = nr end # sets listeners to the given +addresses+, replacing or augmenting the @@ -172,7 +161,7 @@ module Unicorn def listeners(addresses) # :nodoc: Array === addresses or addresses = Array(addresses) addresses.map! { |addr| expand_addr(addr) } - @set[:listeners] = addresses + set[:listeners] = addresses end # adds an +address+ to the existing listener set. @@ -227,8 +216,8 @@ module Unicorn def listen(address, opt = {}) address = expand_addr(address) if String === address - Hash === @set[:listener_opts] or - @set[:listener_opts] = Hash.new { |hash,key| hash[key] = {} } + Hash === set[:listener_opts] or + set[:listener_opts] = Hash.new { |hash,key| hash[key] = {} } [ :backlog, :sndbuf, :rcvbuf ].each do |key| value = opt[key] or next Integer === value or @@ -239,11 +228,11 @@ module Unicorn TrueClass === value || FalseClass === value or raise ArgumentError, "not boolean: #{key}=#{value.inspect}" end - @set[:listener_opts][address].merge!(opt) + set[:listener_opts][address].merge!(opt) end - @set[:listeners] = [] unless Array === @set[:listeners] - @set[:listeners] << address + set[:listeners] = [] unless Array === set[:listeners] + set[:listeners] << address end # sets the +path+ for the PID file of the unicorn master process @@ -265,7 +254,7 @@ module Unicorn def preload_app(bool) case bool when TrueClass, FalseClass - @set[:preload_app] = bool + set[:preload_app] = bool else raise ArgumentError, "preload_app=#{bool.inspect} not a boolean" end @@ -298,7 +287,7 @@ module Unicorn else raise ArgumentError end - @set[var] = path + set[var] = path end def set_hook(var, my_proc, req_arity = 2) #:nodoc: @@ -314,7 +303,7 @@ module Unicorn else raise ArgumentError, "invalid type: #{var}=#{my_proc.inspect}" end - @set[var] = my_proc + set[var] = my_proc end # expands "unix:path/to/foo" to a socket relative to the current path diff --git a/lib/unicorn/const.rb b/lib/unicorn/const.rb index 72a4d61..ef58984 100644 --- a/lib/unicorn/const.rb +++ b/lib/unicorn/const.rb @@ -5,7 +5,7 @@ module Unicorn # gave about a 3% to 10% performance improvement over using the strings directly. # Symbols did not really improve things much compared to constants. module Const - UNICORN_VERSION="0.8.2".freeze + UNICORN_VERSION="0.9.1".freeze DEFAULT_HOST = "0.0.0.0".freeze # default TCP listen host address DEFAULT_PORT = "8080".freeze # default TCP listen port @@ -24,11 +24,15 @@ module Unicorn # common errors we'll send back ERROR_400_RESPONSE = "HTTP/1.1 400 Bad Request\r\n\r\n".freeze ERROR_500_RESPONSE = "HTTP/1.1 500 Internal Server Error\r\n\r\n".freeze + EXPECT_100_RESPONSE = "HTTP/1.1 100 Continue\r\n\r\n" # A frozen format for this is about 15% faster + HTTP_TRANSFER_ENCODING = 'HTTP_TRANSFER_ENCODING'.freeze CONTENT_LENGTH="CONTENT_LENGTH".freeze REMOTE_ADDR="REMOTE_ADDR".freeze HTTP_X_FORWARDED_FOR="HTTP_X_FORWARDED_FOR".freeze + HTTP_EXPECT="HTTP_EXPECT".freeze + HTTP_TRAILER="HTTP_TRAILER".freeze RACK_INPUT="rack.input".freeze end diff --git a/lib/unicorn/http_request.rb b/lib/unicorn/http_request.rb index d7078a3..b8df403 100644 --- a/lib/unicorn/http_request.rb +++ b/lib/unicorn/http_request.rb @@ -1,19 +1,11 @@ -require 'tempfile' require 'stringio' # compiled extension require 'unicorn/http11' module Unicorn - # - # The HttpRequest.initialize method will convert any request that is larger than - # Const::MAX_BODY into a Tempfile and use that as the body. Otherwise it uses - # a StringIO object. To be safe, you should assume it works like a file. - # class HttpRequest - attr_accessor :logger - # default parameters we merge into the request env for Rack handlers DEFAULTS = { "rack.errors" => $stderr, @@ -27,21 +19,19 @@ module Unicorn "SERVER_SOFTWARE" => "Unicorn #{Const::UNICORN_VERSION}".freeze } - # Optimize for the common case where there's no request body - # (GET/HEAD) requests. - NULL_IO = StringIO.new + NULL_IO = StringIO.new(Z) LOCALHOST = '127.0.0.1'.freeze + def initialize + end + # Being explicitly single-threaded, we have certain advantages in # not having to worry about variables being clobbered :) BUFFER = ' ' * Const::CHUNK_SIZE # initial size, may grow + BUFFER.force_encoding(Encoding::BINARY) if Z.respond_to?(:force_encoding) PARSER = HttpParser.new PARAMS = Hash.new - def initialize(logger = Configurator::DEFAULT_LOGGER) - @logger = logger - end - # Does the majority of the IO processing. It has been written in # Ruby using about 8 different IO processing strategies. # @@ -56,11 +46,6 @@ module Unicorn # This does minimal exception trapping and it is up to the caller # to handle any socket errors (e.g. user aborted upload). def read(socket) - # reset the parser - unless NULL_IO == (input = PARAMS[Const::RACK_INPUT]) # unlikely - input.close rescue nil - input.close! rescue nil - end PARAMS.clear PARSER.reset @@ -86,69 +71,30 @@ module Unicorn data << socket.readpartial(Const::CHUNK_SIZE, BUFFER) PARSER.execute(PARAMS, data) and return handle_body(socket) end while true - rescue HttpParserError => e - @logger.error "HTTP parse error, malformed request " \ - "(#{PARAMS[Const::HTTP_X_FORWARDED_FOR] || - PARAMS[Const::REMOTE_ADDR]}): #{e.inspect}" - @logger.error "REQUEST DATA: #{data.inspect}\n---\n" \ - "PARAMS: #{PARAMS.inspect}\n---\n" - raise e end private # Handles dealing with the rest of the request - # returns a Rack environment if successful, raises an exception if not + # returns a Rack environment if successful def handle_body(socket) - http_body = PARAMS.delete(:http_body) - content_length = PARAMS[Const::CONTENT_LENGTH].to_i - - if content_length == 0 # short circuit the common case - PARAMS[Const::RACK_INPUT] = NULL_IO.closed? ? NULL_IO.reopen : NULL_IO - return PARAMS.update(DEFAULTS) + PARAMS[Const::RACK_INPUT] = if (body = PARAMS.delete(:http_body)) + length = PARAMS[Const::CONTENT_LENGTH].to_i + + if te = PARAMS[Const::HTTP_TRANSFER_ENCODING] + if /\Achunked\z/i =~ te + socket = ChunkedReader.new(PARAMS, socket, body) + length = body = nil + end + end + + TeeInput.new(socket, length, body) + else + NULL_IO.closed? ? NULL_IO.reopen(Z) : NULL_IO end - # must read more data to complete body - remain = content_length - http_body.length - - body = PARAMS[Const::RACK_INPUT] = (remain < Const::MAX_BODY) ? - StringIO.new : Tempfile.new('unicorn') - - body.binmode - body.write(http_body) - - # Some clients (like FF1.0) report 0 for body and then send a body. - # This will probably truncate them but at least the request goes through - # usually. - read_body(socket, remain, body) if remain > 0 - body.rewind - - # in case read_body overread because the client tried to pipeline - # another request, we'll truncate it. Again, we don't do pipelining - # or keepalive - body.truncate(content_length) PARAMS.update(DEFAULTS) end - # Does the heavy lifting of properly reading the larger body - # requests in small chunks. It expects PARAMS['rack.input'] to be - # an IO object, socket to be valid, It also expects any initial part - # of the body that has been read to be in the PARAMS['rack.input'] - # already. It will return true if successful and false if not. - def read_body(socket, remain, body) - begin - # write always writes the requested amount on a POSIX filesystem - remain -= body.write(socket.readpartial(Const::CHUNK_SIZE, BUFFER)) - end while remain > 0 - rescue Object => e - @logger.error "Error reading HTTP body: #{e.inspect}" - - # Any errors means we should delete the file, including if the file - # is dumped. Truncate it ASAP to help avoid page flushes to disk. - body.truncate(0) rescue nil - reset - raise e - end - end end diff --git a/lib/unicorn/http_response.rb b/lib/unicorn/http_response.rb index 15df3f6..bfaa33d 100644 --- a/lib/unicorn/http_response.rb +++ b/lib/unicorn/http_response.rb @@ -31,7 +31,6 @@ module Unicorn # Connection: and Date: headers no matter what (if anything) our # Rack application sent us. SKIP = { 'connection' => true, 'date' => true, 'status' => true }.freeze - EMPTY = ''.freeze # :nodoc OUT = [] # :nodoc # writes the rack_response to socket as an HTTP response @@ -59,7 +58,7 @@ module Unicorn "Date: #{Time.now.httpdate}\r\n" \ "Status: #{status}\r\n" \ "Connection: close\r\n" \ - "#{OUT.join(EMPTY)}\r\n") + "#{OUT.join(Z)}\r\n") body.each { |chunk| socket.write(chunk) } socket.close # flushes and uncorks the socket immediately ensure diff --git a/lib/unicorn/tee_input.rb b/lib/unicorn/tee_input.rb new file mode 100644 index 0000000..06028a6 --- /dev/null +++ b/lib/unicorn/tee_input.rb @@ -0,0 +1,135 @@ +# Copyright (c) 2009 Eric Wong +# You can redistribute it and/or modify it under the same terms as Ruby. + +require 'tempfile' + +# acts like tee(1) on an input input to provide a input-like stream +# while providing rewindable semantics through a Tempfile/StringIO +# backing store. On the first pass, the input is only read on demand +# so your Rack application can use input notification (upload progress +# and like). This should fully conform to the Rack::InputWrapper +# specification on the public API. This class is intended to be a +# strict interpretation of Rack::InputWrapper functionality and will +# not support any deviations from it. + +module Unicorn + class TeeInput + + def initialize(input, size, body) + @tmp = Tempfile.new(nil) + @tmp.unlink + @tmp.binmode + @tmp.sync = true + + if body + @tmp.write(body) + @tmp.seek(0) + end + @input = input + @size = size # nil if chunked + end + + # returns the size of the input. This is what the Content-Length + # header value should be, and how large our input is expected to be. + # For TE:chunked, this requires consuming all of the input stream + # before returning since there's no other way + def size + @size and return @size + + if @input + buf = Z.dup + while tee(Const::CHUNK_SIZE, buf) + end + @tmp.rewind + end + + @size = @tmp.stat.size + end + + def read(*args) + @input or return @tmp.read(*args) + + length = args.shift + if nil == length + rv = @tmp.read || Z.dup + tmp = Z.dup + while tee(Const::CHUNK_SIZE, tmp) + rv << tmp + end + rv + else + buf = args.shift || Z.dup + diff = @tmp.stat.size - @tmp.pos + if 0 == diff + tee(length, buf) + else + @tmp.read(diff > length ? length : diff, buf) + end + end + end + + # takes zero arguments for strict Rack::Lint compatibility, unlike IO#gets + def gets + @input or return @tmp.gets + nil == $/ and return read + + line = nil + if @tmp.pos < @tmp.stat.size + line = @tmp.gets # cannot be nil here + $/ == line[-$/.size, $/.size] and return line + + # half the line was already read, and the rest of has not been read + if buf = @input.gets + @tmp.write(buf) + line << buf + else + @input = nil + end + elsif line = @input.gets + @tmp.write(line) + end + + line + end + + def each(&block) + while line = gets + yield line + end + + self # Rack does not specify what the return value here + end + + def rewind + @tmp.rewind # Rack does not specify what the return value here + end + + private + + # tees off a +length+ chunk of data from the input into the IO + # backing store as well as returning it. +buf+ must be specified. + # returns nil if reading from the input returns nil + def tee(length, buf) + begin + if @size + left = @size - @tmp.stat.size + 0 == left and return nil + if length >= left + @input.readpartial(left, buf) == left and @input = nil + elsif @input.nil? + return nil + else + @input.readpartial(length, buf) + end + else # ChunkedReader#readpartial just raises EOFError when done + @input.readpartial(length, buf) + end + rescue EOFError + return @input = nil + end + @tmp.write(buf) + buf + end + + end +end diff --git a/lib/unicorn/trailer_parser.rb b/lib/unicorn/trailer_parser.rb new file mode 100644 index 0000000..9431331 --- /dev/null +++ b/lib/unicorn/trailer_parser.rb @@ -0,0 +1,52 @@ +# Copyright (c) 2009 Eric Wong +# You can redistribute it and/or modify it under the same terms as Ruby. +require 'unicorn' +require 'unicorn/http11' + +# Eventually I should integrate this into HttpParser... +module Unicorn + class TrailerParser + + TR_FR = 'a-z-'.freeze + TR_TO = 'A-Z_'.freeze + + # initializes HTTP trailer parser with acceptable +trailer+ + def initialize(http_trailer) + @trailers = http_trailer.split(/\s*,\s*/).inject({}) { |hash, key| + hash[key.tr(TR_FR, TR_TO)] = true + hash + } + end + + # Executes our TrailerParser on +data+ and modifies +env+ This will + # shrink +data+ as it is being consumed. Returns true if it has + # parsed all trailers, false if not. It raises HttpParserError on + # parse failure or unknown headers. It has slightly smaller limits + # than the C-based HTTP parser but should not be an issue in practice + # since Content-MD5 is probably the only legitimate use for it. + def execute!(env, data) + data.size > 0xffff and + raise HttpParserError, "trailer buffer too large: #{data.size} bytes" + + begin + data.sub!(/\A([^\r]+)\r\n/, Z) or return false # need more data + + key, val = $1.split(/:\s*/, 2) + + key.size > 256 and + raise HttpParserError, "trailer key #{key.inspect} is too long" + val.size > 8192 and + raise HttpParserError, "trailer value #{val.inspect} is too long" + + key.tr!(TR_FR, TR_TO) + + @trailers.delete(key) or + raise HttpParserError, "unknown trailer: #{key.inspect}" + env["HTTP_#{key}"] = val + + @trailers.empty? and return true + end while true + end + + end +end |