From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: AS62744 199.249.223.0/24 X-Spam-Status: No, score=-1.2 required=3.0 tests=BAYES_00,RCVD_IN_MSPIKE_BL, RCVD_IN_MSPIKE_ZBI,RCVD_IN_XBL,RDNS_NONE,SPF_FAIL,SPF_HELO_FAIL, TO_EQ_FM_DIRECT_MX,TO_EQ_FM_DOM_SPF_FAIL,TO_EQ_FM_SPF_FAIL,WEIRD_PORT shortcircuit=no autolearn=no autolearn_force=no version=3.4.0 Received: from 80x24.org (unknown [199.249.223.69]) by dcvr.yhbt.net (Postfix) with ESMTP id E5F301FAFB for ; Fri, 7 Apr 2017 03:42:16 +0000 (UTC) From: Eric Wong To: yahns-public@yhbt.net Subject: [PATCH] rack_proxy: initial implementation Date: Fri, 7 Apr 2017 03:42:15 +0000 Message-Id: <20170407034215.25377-1-yahns-public@yhbt.net> List-Id: From: Eric Wong Needs more tests, docs, and such... --- lib/yahns.rb | 3 + lib/yahns/config.rb | 16 ++++- lib/yahns/http_context.rb | 6 ++ lib/yahns/rack_proxy.rb | 156 ++++++++++++++++++++++++++++++++++++++++++++++ lib/yahns/server.rb | 1 + lib/yahns/server_mp.rb | 55 +++++++++++++--- test/test_rack_proxy.rb | 96 ++++++++++++++++++++++++++++ 7 files changed, 323 insertions(+), 10 deletions(-) create mode 100644 lib/yahns/rack_proxy.rb create mode 100644 test/test_rack_proxy.rb diff --git a/lib/yahns.rb b/lib/yahns.rb index a0abe49..8c66d14 100644 --- a/lib/yahns.rb +++ b/lib/yahns.rb @@ -59,6 +59,9 @@ module Yahns ClientTimeout = Class.new(RuntimeError) # :nodoc: + # for rack_proxy, maybe others... + Submaster = Struct.new(:key, :cmd, :pid) # :nodoc: + # try to use the monotonic clock in Ruby >= 2.1, it is immune to clock # offset adjustments and generates less garbage (Float vs Time object) begin diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb index bcea0d4..4650665 100644 --- a/lib/yahns/config.rb +++ b/lib/yahns/config.rb @@ -313,7 +313,7 @@ def client_expire_threshold(val) @set[var] = val end - # type = :rack + # type = :rack, :rack_proxy def app(type, *args, &block) var = _check_in_block(nil, :app) file = "yahns/#{type.to_s}" @@ -436,7 +436,19 @@ def commit!(server) server.__send__("#{var}=", val) if val != :unset end - @app_ctx.each { |app| app.logger ||= server.logger } + # count extra submasters for rack_proxy (and maybe others) + submasters = [] + @app_ctx.each do |ctx| + ctx.logger ||= server.logger + next unless ctx.respond_to?(:submasters) + sm = ctx.submasters and submasters.concat(sm) + end + unless submasters.empty? + wp = @set[:worker_processes] + wp = 1 if wp == :unset # gotta have at least one worker for submasters + server.__send__(:worker_processes=, wp) + server.instance_variable_set(:@submasters, submasters) + end end def register_inherited(name) diff --git a/lib/yahns/http_context.rb b/lib/yahns/http_context.rb index 40f2c58..6c18fb8 100644 --- a/lib/yahns/http_context.rb +++ b/lib/yahns/http_context.rb @@ -91,4 +91,10 @@ def tmpio_for(len, env) end tmp end + + # returns an array of Submaster structs or nil, + # only Yahns::RackProxy uses this at the moment + def submasters # :nodoc: + @yahns_rack.respond_to?(:submasters) ? @yahns_rack.submasters : nil + end end diff --git a/lib/yahns/rack_proxy.rb b/lib/yahns/rack_proxy.rb new file mode 100644 index 0000000..7144903 --- /dev/null +++ b/lib/yahns/rack_proxy.rb @@ -0,0 +1,156 @@ +# -*- encoding: binary -*- +# Copyright (C) 2017 all contributors +# License: GPL-3.0+ +# frozen_string_literal: true +require_relative 'rack' +require_relative 'proxy_pass' + +# Basically, a lazy way to setup ProxyPass to hand off some (or all) +# requests to any HTTP server backend (e.g. varnish, etc) +class Yahns::RackProxy < Yahns::Rack # :nodoc: + + # the key is the destination returned by the top-level config.ru + # and the value is a splattable array for spawning another process + # via Process.exec + # { + # # [ key, backend URL, ] => %w(splattable array for Process.exec), + # [:pass, 'http://127.0.0.1:9292/' ] => %w(rackup /path/to/config.ru) + # [:lsock, 'unix:/path/to/sock' ] => %w(bleh -l /path/to/sock ...) + # + # # Users of Ruby 2.3+ can shorten their config when + # # running systemd-aware daemons which will bind to + # # a random TCP port: + # :pri => %w(blah -c conf.rb config.ru), + # :alt => %w(blah -c /path/to/alt.conf.rb alt.ru), + # :psgi => %w(blah foo.psgi), + # ... + # } + + # By default, proxy all requests by using the :pass return value + # Users can selectively process requests for non-buggy code in + # the core yahns processes. + PROXY_ALL = lambda { |env| :pass } # :nodoc: + attr_reader :submasters # :nodoc: see http_context.rb /submasters + + # every declaration of this in yahns_config is unique: + def self.instance_key(*args) + args.object_id + end + + def initialize(mapping = { :pass => %w(true) }, ru = PROXY_ALL, opts = {}) + sd_env = { + 'LISTEN_FDS' => '1', + 'LISTEN_PID' => lambda { "#$$" } + } + @submasters = [] + case mapping + when Hash # multiple HTTP backends running different commands + # nothing to do { key: splattable array for Process.spawn } + when Array # only one backend + mapping = { :pass => mapping } + else + raise ArgumentError, "#{mapping.inspect} must be an Array or Hash" + end + + @proxy_pass_map = {} + mapping.each do |key, cmd| + case key + when Array + key, addr, ppopts = key + ppopts ||= {} + when Symbol # OK + ppopts = {} + else + raise ArgumentError, "#{key.inspect} is not a symbol" + end + Array === cmd or raise ArgumentError, + "#{cmd.inspect} must be a splattable array for Process.exec" + @proxy_pass_map[key] and raise ArgumentError, + "#{key.inspect} may not be repeated in mapping" + + cmd = cmd.dup + if addr + env = {} + rdr = {} + else + if RUBY_VERSION.to_f < 2.3 && @submasters.empty? # only warn once + warn "Ruby < 2.3 may crash when emulating systemd to pass FDs\n", +" http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-core/69895\n" + end + + # nope, no UNIXServer support, maybe not worth it to deal + # with FS perms in containers. + # TODO: try TCP Fast Open (Linux) + srv = random_tcp_listener(ppopts) + addr = srv.addr + addr = "http://#{addr[3]}:#{addr[1]}/" + env = sd_env + rdr = { 3 => srv } + end + + # never pass YAHNS_FD to children, they do not inherit what we use + # for SIGUSR2 upgrades + env['YAHNS_FD'] = nil + case cmd[0] + when Hash + cmd[0] = cmd[0].merge(env) + else + cmd.unshift(env) + end + + rdr[:close_others] = true + case cmd[-1] + when Hash + cmd[-1] = cmd[-1].merge(rdr) + else + cmd << rdr + end + + @submasters << Yahns::Submaster.new(key, cmd) + @proxy_pass_map[key] = Yahns::ProxyPass.new(addr, ppopts) + end + super(ru, opts) # Yahns::Rack#initialize + end + + def build_app! + super # Yahns::Rack#build_app! + proxy_app = @app + + # wrap the (possibly-)user-supplied app + @app = lambda do |env| + res = proxy_app.call(env) + + # standard Rack responses may be handled in yahns proper: + Array === res and return res + + # the response is :pass or another Symbol, not a proper Rack response! + # shove the env over to the appropriate Yahns::ProxyPass which + # talks to a backend HTTP process: + ppass = @proxy_pass_map[res] and return ppass.call(env) + + # oops, user screwed up :< + logger = env['rack.logger'] and + logger.error("bad response from user-supplied proxy: #{res.inspect}") + + [ 500, [ %w(Content-Type text/plain) ], [] ] + end + end + + def random_tcp_listener(opts) # TODO: should we support options? + srv = TCPServer.new('127.0.0.1', 0) # 0: bind random port + srv.close_on_exec = true + srv.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1) + srv.setsockopt(:IPPROTO_TCP, :TCP_NODELAY, 1) + + # Deferring accepts slows down core yahns, but it's useful for + # less-sophisticated upstream (backend) servers: + Socket.const_defined?(:TCP_DEFER_ACCEPT) and + srv.setsockopt(:IPPROTO_TCP, :TCP_DEFER_ACCEPT, 1) + + srv.listen(1024) + srv + end +end + +# register ourselves +Yahns::Config::APP_CLASS[:rack_proxy] = Yahns::RackProxy diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index efd6f05..4260a9d 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -37,6 +37,7 @@ def initialize(config) @user = nil @queues = [] @wthr = [] + @submasters = [] # array of Yahns::Submaster structs end def sqwakeup(sig) diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb index c9cd207..c61d583 100644 --- a/lib/yahns/server_mp.rb +++ b/lib/yahns/server_mp.rb @@ -52,13 +52,31 @@ def worker_atfork_internal(worker) switch_user(*@user) if @user @user = @workers = nil __call_hooks(@atfork_child, worker.nr) - @atfork_child = @atfork_parent = @atfork_prepare = nil + @atfork_child = @atfork_parent = @atfork_prepare = @submasters = nil end def __call_hooks(ary, worker_nr) ary.each { |x| x.call(worker_nr) } if ary end + def spawn_missing_submasters + @submasters.each do |sm| + next if sm.pid # we don't expect dozens of submasters + pid = sm.pid = fork + if pid + @logger.info("submaster=#{sm.key.inspect} pid=#{pid}") + else + # expand env variables like LISTEN_PID in the child + env = sm.cmd[0] + env.each { |k,v| env[k] = v.call if v.respond_to?(:call) } + exec(*(sm.cmd)) + end + end + rescue => e + Yahns::Log.exception(@logger, 'spawning submaster', e) + exit! + end + def spawn_missing_workers worker_nr = -1 until (worker_nr += 1) == @worker_processes @@ -85,6 +103,7 @@ def spawn_missing_workers # one-at-a-time time and we'll happily drop signals in case somebody # is signalling us too often. def join + spawn_missing_submasters spawn_missing_workers state = :respawn # :QUIT, :WINCH proc_name 'master' @@ -94,14 +113,16 @@ def join @sev.kgio_wait_readable @sev.yahns_step reap_all - case @sig_queue.shift + case sig = @sig_queue.shift when *EXIT_SIGS # graceful shutdown (twice for non graceful) @listeners.each(&:close).clear soft_kill_each_worker("QUIT") + kill_each_submaster(sig) state = :QUIT when :USR1 # rotate logs usr1_reopen("master ") soft_kill_each_worker("USR1") + kill_each_submaster(sig) when :USR2 # exec binary, stay alive in case something went wrong reexec when :WINCH @@ -127,7 +148,10 @@ def join reexec end end while @sig_queue[0] - maintain_worker_count if state == :respawn + if state == :respawn + spawn_missing_submasters + maintain_worker_count + end rescue => e Yahns::Log.exception(@logger, "master loop error", e) end while state != :QUIT || @workers.size > 0 @@ -181,14 +205,29 @@ def reap_all @reexec_pid = 0 self.pid = @pid.chomp('.oldbin') if @pid proc_name('master') - else - worker = @workers.delete(wpid) - desc = worker ? "worker=#{worker.nr}" : "(unknown)" - m = "reaped #{status.inspect} #{desc}" - status.success? ? @logger.info(m) : @logger.error(m) + next end + if worker = @workers.delete(wpid) + desc = "worker=#{worker.nr}" + elsif sm = @submasters.find { |s| s.pid == wpid } + sm.pid = nil + desc = "submaster=#{sm.key.inspect}" + end + m = "reaped #{status.inspect} #{desc || '(unknown)'}" + status.success? ? @logger.info(m) : @logger.error(m) rescue Errno::ECHILD return end while true end + + def kill_each_submaster(sig) + @submasters.each do |sm| + pid = sm.pid or next + begin + Process.kill(sig, pid) + rescue => e + @logger.error("#{e.message} killing submaster=#{sm.key.inspect}") + end + end + end end diff --git a/test/test_rack_proxy.rb b/test/test_rack_proxy.rb new file mode 100644 index 0000000..c202289 --- /dev/null +++ b/test/test_rack_proxy.rb @@ -0,0 +1,96 @@ +# Copyright (C) 2017 all contributors +# License: GPL-3.0+ +# frozen_string_literal: true +require_relative 'server_helper' +require 'rbconfig' +begin + require 'kcar' +rescue LoadError +end + +class TestRackProxy < Testcase + ENV["N"].to_i > 1 and parallelize_me! + include ServerHelper + alias teardown server_helper_teardown + + def setup + server_helper_setup + skip "kcar missing yahns/proxy_pass" unless defined?(Kcar) + end + + def test_shorthand_systemd_emulation + RUBY_VERSION.to_f > 2.3 or skip 'Ruby 2.3+ needed to test passing fd=3' + ru = tmpfile(%w(test_rack_proxy .ru)) + ru.write("require 'rack/lobster'; run Rack::Lobster.new\n") + cmd = %W(#{RbConfig.ruby} -I lib bin/yahns-rackup -E none + -O listen=inherit #{ru.path}) + err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1] + pid = mkserver(cfg) do + cfg.instance_eval do + app(:rack_proxy, { :pass => cmd }) { listen "#{host}:#{port}" } + stderr_path err.path + end + end + + Net::HTTP.start(host, port) do |http| + req = Net::HTTP::Get.new("/") + res = http.request(req) + assert_equal 200, res.code.to_i + end + ensure + return unless pid + Process.kill(:QUIT, pid) + _, status = Process.waitpid2(pid) + assert status.success?, status.inspect + end + + def test_respawn + ru = tmpfile(%w(test_rack_proxy .ru)) + ru.write('run(lambda { |env| ' \ + '[ 200, [ %w(Content-Type text/plain)], [ "#$$" ]]})') + _, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1] + Dir.mktmpdir do |dir| + upath = "#{dir}/u.sock" + cmd = %W(#{RbConfig.ruby} -I lib bin/yahns-rackup -E none + -O listen=#{upath} #{ru.path}) + mapping = { [ :pass, "unix:#{upath}" ] => cmd } + begin + r, w = IO.pipe + pid = mkserver(cfg) do + cfg.instance_eval do + app(:rack_proxy, mapping) { listen "#{host}:#{port}" } + end + $stderr.reopen(w) + r.close + end + Timeout.timeout(10) do + Thread.pass until r.gets =~ %r{listening on addr=.*/u\.sock} + end + u = UNIXSocket.new(upath) + u.write("GET /\r\n\r\n") + res = u.read + backend = res.to_i + assert_operator backend, :>, 0 + u.close + assert_equal 1, Process.kill(0, backend) + Process.kill(:QUIT, backend) + + Timeout.timeout(10) do + Thread.pass until r.gets =~ %r{listening on addr=.*/u\.sock} + end + u = UNIXSocket.new(upath) + u.write("GET /\r\n\r\n") + res = u.read + respawned = res.to_i + assert_operator respawned, :>, 0 + assert_operator respawned, :!=, backend + ensure + [ r, w ].compact.map(&:close) + return unless pid + Process.kill(:QUIT, pid) + _, status = Process.waitpid2(pid) + assert status.success?, status.inspect + end + end + end +end -- EW