From: Eric Wong <yahns-public@yhbt.net>
To: yahns-public@yhbt.net
Subject: [PATCH] rack_proxy: initial implementation
Date: Fri, 7 Apr 2017 03:42:15 +0000 [thread overview]
Message-ID: <20170407034215.25377-1-yahns-public@yhbt.net> (raw)
From: Eric Wong <e@80x24.org>
Needs more tests, docs, and such...
---
lib/yahns.rb | 3 +
lib/yahns/config.rb | 16 ++++-
lib/yahns/http_context.rb | 6 ++
lib/yahns/rack_proxy.rb | 156 ++++++++++++++++++++++++++++++++++++++++++++++
lib/yahns/server.rb | 1 +
lib/yahns/server_mp.rb | 55 +++++++++++++---
test/test_rack_proxy.rb | 96 ++++++++++++++++++++++++++++
7 files changed, 323 insertions(+), 10 deletions(-)
create mode 100644 lib/yahns/rack_proxy.rb
create mode 100644 test/test_rack_proxy.rb
diff --git a/lib/yahns.rb b/lib/yahns.rb
index a0abe49..8c66d14 100644
--- a/lib/yahns.rb
+++ b/lib/yahns.rb
@@ -59,6 +59,9 @@ module Yahns
ClientTimeout = Class.new(RuntimeError) # :nodoc:
+ # a child process supervised for rack_proxy, maybe others: key, exec argv, pid
+ Submaster = Struct.new(:key, :cmd, :pid) # :nodoc:
+
# try to use the monotonic clock in Ruby >= 2.1, it is immune to clock
# offset adjustments and generates less garbage (Float vs Time object)
begin
diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb
index bcea0d4..4650665 100644
--- a/lib/yahns/config.rb
+++ b/lib/yahns/config.rb
@@ -313,7 +313,7 @@ def client_expire_threshold(val)
@set[var] = val
end
- # type = :rack
+ # type = :rack, :rack_proxy
def app(type, *args, &block)
var = _check_in_block(nil, :app)
file = "yahns/#{type.to_s}"
@@ -436,7 +436,19 @@ def commit!(server)
server.__send__("#{var}=", val) if val != :unset
end
- @app_ctx.each { |app| app.logger ||= server.logger }
+ # count extra submasters for rack_proxy (and maybe others)
+ submasters = []
+ @app_ctx.each do |ctx|
+ ctx.logger ||= server.logger
+ next unless ctx.respond_to?(:submasters)
+ sm = ctx.submasters and submasters.concat(sm)
+ end
+ unless submasters.empty?
+ wp = @set[:worker_processes]
+ wp = 1 if wp == :unset # gotta have at least one worker for submasters
+ server.__send__(:worker_processes=, wp)
+ server.instance_variable_set(:@submasters, submasters)
+ end
end
def register_inherited(name)
diff --git a/lib/yahns/http_context.rb b/lib/yahns/http_context.rb
index 40f2c58..6c18fb8 100644
--- a/lib/yahns/http_context.rb
+++ b/lib/yahns/http_context.rb
@@ -91,4 +91,10 @@ def tmpio_for(len, env)
end
tmp
end
+
+ # returns an array of Submaster structs or nil,
+ # only Yahns::RackProxy uses this at the moment
+ def submasters # :nodoc:
+ @yahns_rack.respond_to?(:submasters) ? @yahns_rack.submasters : nil
+ end
end
diff --git a/lib/yahns/rack_proxy.rb b/lib/yahns/rack_proxy.rb
new file mode 100644
index 0000000..7144903
--- /dev/null
+++ b/lib/yahns/rack_proxy.rb
@@ -0,0 +1,156 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2017 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require_relative 'rack'
+require_relative 'proxy_pass'
+
+# Basically, a lazy way to setup ProxyPass to hand off some (or all)
+# requests to any HTTP server backend (e.g. varnish, etc)
+class Yahns::RackProxy < Yahns::Rack # :nodoc:
+
+ # the key is the destination returned by the top-level config.ru
+ # and the value is a splattable array for spawning another process
+ # via Process.exec
+ # {
+ # # [ key, backend URL, ] => %w(splattable array for Process.exec),
+ # [:pass, 'http://127.0.0.1:9292/' ] => %w(rackup /path/to/config.ru)
+ # [:lsock, 'unix:/path/to/sock' ] => %w(bleh -l /path/to/sock ...)
+ #
+ # # Users of Ruby 2.3+ can shorten their config when
+ # # running systemd-aware daemons which will bind to
+ # # a random TCP port:
+ # :pri => %w(blah -c conf.rb config.ru),
+ # :alt => %w(blah -c /path/to/alt.conf.rb alt.ru),
+ # :psgi => %w(blah foo.psgi),
+ # ...
+ # }
+
+ # By default, proxy all requests by using the :pass return value
+ # Users can selectively process requests for non-buggy code in
+ # the core yahns processes.
+ PROXY_ALL = lambda { |env| :pass } # :nodoc:
+ attr_reader :submasters # :nodoc: see http_context.rb /submasters
+
+ # every declaration of this in yahns_config is unique:
+ def self.instance_key(*args)
+ args.object_id
+ end
+
+ def initialize(mapping = { :pass => %w(true) }, ru = PROXY_ALL, opts = {})
+ sd_env = {
+ 'LISTEN_FDS' => '1',
+ 'LISTEN_PID' => lambda { "#$$" }
+ }
+ @submasters = []
+ case mapping
+ when Hash # multiple HTTP backends running different commands
+ # nothing to do { key: splattable array for Process.spawn }
+ when Array # only one backend
+ mapping = { :pass => mapping }
+ else
+ raise ArgumentError, "#{mapping.inspect} must be an Array or Hash"
+ end
+
+ @proxy_pass_map = {}
+ mapping.each do |key, cmd|
+ case key
+ when Array
+ key, addr, ppopts = key
+ ppopts ||= {}
+ when Symbol # OK
+ ppopts = {}
+ else
+ raise ArgumentError, "#{key.inspect} is not a symbol"
+ end
+ Array === cmd or raise ArgumentError,
+ "#{cmd.inspect} must be a splattable array for Process.exec"
+ @proxy_pass_map[key] and raise ArgumentError,
+ "#{key.inspect} may not be repeated in mapping"
+
+ cmd = cmd.dup
+ if addr
+ env = {}
+ rdr = {}
+ else
+ if RUBY_VERSION.to_f < 2.3 && @submasters.empty? # only warn once
+ warn "Ruby < 2.3 may crash when emulating systemd to pass FDs\n",
+" http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-core/69895\n"
+ end
+
+ # nope, no UNIXServer support, maybe not worth it to deal
+ # with FS perms in containers.
+ # TODO: try TCP Fast Open (Linux)
+ srv = random_tcp_listener(ppopts)
+ addr = srv.addr
+ addr = "http://#{addr[3]}:#{addr[1]}/"
+ env = sd_env
+ rdr = { 3 => srv }
+ end
+
+ # never pass YAHNS_FD to children, they do not inherit what we use
+ # for SIGUSR2 upgrades
+ env['YAHNS_FD'] = nil
+ case cmd[0]
+ when Hash
+ cmd[0] = cmd[0].merge(env)
+ else
+ cmd.unshift(env)
+ end
+
+ rdr[:close_others] = true
+ case cmd[-1]
+ when Hash
+ cmd[-1] = cmd[-1].merge(rdr)
+ else
+ cmd << rdr
+ end
+
+ @submasters << Yahns::Submaster.new(key, cmd)
+ @proxy_pass_map[key] = Yahns::ProxyPass.new(addr, ppopts)
+ end
+ super(ru, opts) # Yahns::Rack#initialize
+ end
+
+ def build_app!
+ super # Yahns::Rack#build_app!
+ proxy_app = @app
+
+ # wrap the (possibly-)user-supplied app
+ @app = lambda do |env|
+ res = proxy_app.call(env)
+
+ # standard Rack responses may be handled in yahns proper:
+ Array === res and return res
+
+ # the response is :pass or another Symbol, not a proper Rack response!
+ # shove the env over to the appropriate Yahns::ProxyPass which
+ # talks to a backend HTTP process:
+ ppass = @proxy_pass_map[res] and return ppass.call(env)
+
+ # oops, user screwed up :<
+ logger = env['rack.logger'] and
+ logger.error("bad response from user-supplied proxy: #{res.inspect}")
+
+ [ 500, [ %w(Content-Type text/plain) ], [] ]
+ end
+ end
+
+ def random_tcp_listener(opts) # TODO: should we support options?
+ srv = TCPServer.new('127.0.0.1', 0) # 0: bind random port
+ srv.close_on_exec = true
+ srv.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1)
+ srv.setsockopt(:IPPROTO_TCP, :TCP_NODELAY, 1)
+
+ # Deferring accepts slows down core yahns, but it's useful for
+ # less-sophisticated upstream (backend) servers:
+ Socket.const_defined?(:TCP_DEFER_ACCEPT) and
+ srv.setsockopt(:IPPROTO_TCP, :TCP_DEFER_ACCEPT, 1)
+
+ srv.listen(1024)
+ srv
+ end
+end
+
+# register ourselves so the :rack_proxy app type maps to this class
+Yahns::Config::APP_CLASS[:rack_proxy] = Yahns::RackProxy
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index efd6f05..4260a9d 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -37,6 +37,7 @@ def initialize(config)
@user = nil
@queues = []
@wthr = []
+ @submasters = [] # array of Yahns::Submaster structs
end
def sqwakeup(sig)
diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb
index c9cd207..c61d583 100644
--- a/lib/yahns/server_mp.rb
+++ b/lib/yahns/server_mp.rb
@@ -52,13 +52,31 @@ def worker_atfork_internal(worker)
switch_user(*@user) if @user
@user = @workers = nil
__call_hooks(@atfork_child, worker.nr)
- @atfork_child = @atfork_parent = @atfork_prepare = nil
+ @atfork_child = @atfork_parent = @atfork_prepare = @submasters = nil
end
def __call_hooks(ary, worker_nr)
ary.each { |x| x.call(worker_nr) } if ary
end
+ def spawn_missing_submasters
+ @submasters.each do |sm|
+ next if sm.pid # we don't expect dozens of submasters
+ pid = sm.pid = fork
+ if pid
+ @logger.info("submaster=#{sm.key.inspect} pid=#{pid}")
+ else
+ # expand env variables like LISTEN_PID in the child
+ env = sm.cmd[0]
+ env.each { |k,v| env[k] = v.call if v.respond_to?(:call) }
+ exec(*(sm.cmd))
+ end
+ end
+ rescue => e
+ Yahns::Log.exception(@logger, 'spawning submaster', e)
+ exit!
+ end
+
def spawn_missing_workers
worker_nr = -1
until (worker_nr += 1) == @worker_processes
@@ -85,6 +103,7 @@ def spawn_missing_workers
# one-at-a-time time and we'll happily drop signals in case somebody
# is signalling us too often.
def join
+ spawn_missing_submasters
spawn_missing_workers
state = :respawn # :QUIT, :WINCH
proc_name 'master'
@@ -94,14 +113,16 @@ def join
@sev.kgio_wait_readable
@sev.yahns_step
reap_all
- case @sig_queue.shift
+ case sig = @sig_queue.shift
when *EXIT_SIGS # graceful shutdown (twice for non graceful)
@listeners.each(&:close).clear
soft_kill_each_worker("QUIT")
+ kill_each_submaster(sig)
state = :QUIT
when :USR1 # rotate logs
usr1_reopen("master ")
soft_kill_each_worker("USR1")
+ kill_each_submaster(sig)
when :USR2 # exec binary, stay alive in case something went wrong
reexec
when :WINCH
@@ -127,7 +148,10 @@ def join
reexec
end
end while @sig_queue[0]
- maintain_worker_count if state == :respawn
+ if state == :respawn
+ spawn_missing_submasters
+ maintain_worker_count
+ end
rescue => e
Yahns::Log.exception(@logger, "master loop error", e)
end while state != :QUIT || @workers.size > 0
@@ -181,14 +205,29 @@ def reap_all
@reexec_pid = 0
self.pid = @pid.chomp('.oldbin') if @pid
proc_name('master')
- else
- worker = @workers.delete(wpid)
- desc = worker ? "worker=#{worker.nr}" : "(unknown)"
- m = "reaped #{status.inspect} #{desc}"
- status.success? ? @logger.info(m) : @logger.error(m)
+ next
end
+ if worker = @workers.delete(wpid)
+ desc = "worker=#{worker.nr}"
+ elsif sm = @submasters.find { |s| s.pid == wpid }
+ sm.pid = nil
+ desc = "submaster=#{sm.key.inspect}"
+ end
+ m = "reaped #{status.inspect} #{desc || '(unknown)'}"
+ status.success? ? @logger.info(m) : @logger.error(m)
rescue Errno::ECHILD
return
end while true
end
+
+ def kill_each_submaster(sig)
+ @submasters.each do |sm|
+ pid = sm.pid or next
+ begin
+ Process.kill(sig, pid)
+ rescue => e
+ @logger.error("#{e.message} killing submaster=#{sm.key.inspect}")
+ end
+ end
+ end
end
diff --git a/test/test_rack_proxy.rb b/test/test_rack_proxy.rb
new file mode 100644
index 0000000..c202289
--- /dev/null
+++ b/test/test_rack_proxy.rb
@@ -0,0 +1,96 @@
+# Copyright (C) 2017 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require_relative 'server_helper'
+require 'rbconfig'
+begin
+ require 'kcar'
+rescue LoadError
+end
+
+class TestRackProxy < Testcase
+ ENV["N"].to_i > 1 and parallelize_me!
+ include ServerHelper
+ alias teardown server_helper_teardown
+
+ def setup
+ server_helper_setup
+ skip "kcar missing yahns/proxy_pass" unless defined?(Kcar)
+ end
+
+ def test_shorthand_systemd_emulation
+ RUBY_VERSION.to_f > 2.3 or skip 'Ruby 2.3+ needed to test passing fd=3'
+ ru = tmpfile(%w(test_rack_proxy .ru))
+ ru.write("require 'rack/lobster'; run Rack::Lobster.new\n")
+ cmd = %W(#{RbConfig.ruby} -I lib bin/yahns-rackup -E none
+ -O listen=inherit #{ru.path})
+ err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+ pid = mkserver(cfg) do
+ cfg.instance_eval do
+ app(:rack_proxy, { :pass => cmd }) { listen "#{host}:#{port}" }
+ stderr_path err.path
+ end
+ end
+
+ Net::HTTP.start(host, port) do |http|
+ req = Net::HTTP::Get.new("/")
+ res = http.request(req)
+ assert_equal 200, res.code.to_i
+ end
+ ensure
+ return unless pid
+ Process.kill(:QUIT, pid)
+ _, status = Process.waitpid2(pid)
+ assert status.success?, status.inspect
+ end
+
+ def test_respawn
+ ru = tmpfile(%w(test_rack_proxy .ru))
+ ru.write('run(lambda { |env| ' \
+ '[ 200, [ %w(Content-Type text/plain)], [ "#$$" ]]})')
+ _, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+ Dir.mktmpdir do |dir|
+ upath = "#{dir}/u.sock"
+ cmd = %W(#{RbConfig.ruby} -I lib bin/yahns-rackup -E none
+ -O listen=#{upath} #{ru.path})
+ mapping = { [ :pass, "unix:#{upath}" ] => cmd }
+ begin
+ r, w = IO.pipe
+ pid = mkserver(cfg) do
+ cfg.instance_eval do
+ app(:rack_proxy, mapping) { listen "#{host}:#{port}" }
+ end
+ $stderr.reopen(w)
+ r.close
+ end
+ Timeout.timeout(10) do
+ Thread.pass until r.gets =~ %r{listening on addr=.*/u\.sock}
+ end
+ u = UNIXSocket.new(upath)
+ u.write("GET /\r\n\r\n")
+ res = u.read
+ backend = res.to_i
+ assert_operator backend, :>, 0
+ u.close
+ assert_equal 1, Process.kill(0, backend)
+ Process.kill(:QUIT, backend)
+
+ Timeout.timeout(10) do
+ Thread.pass until r.gets =~ %r{listening on addr=.*/u\.sock}
+ end
+ u = UNIXSocket.new(upath)
+ u.write("GET /\r\n\r\n")
+ res = u.read
+ respawned = res.to_i
+ assert_operator respawned, :>, 0
+ assert_operator respawned, :!=, backend
+ ensure
+ [ r, w ].compact.map(&:close)
+ return unless pid
+ Process.kill(:QUIT, pid)
+ _, status = Process.waitpid2(pid)
+ assert status.success?, status.inspect
+ end
+ end
+ end
+end
--
EW
next reply other threads:[~2017-04-07 3:42 UTC|newest]
Thread overview: 2+ messages / expand[flat|nested] mbox.gz Atom feed top
2017-04-07 3:42 Eric Wong [this message]
2017-04-07 20:35 ` [PATCH] rack_proxy: initial implementation Eric Wong
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
List information: https://yhbt.net/yahns/README
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20170407034215.25377-1-yahns-public@yhbt.net \
--to=yahns-public@yhbt.net \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
Code repositories for project(s) associated with this public inbox
https://yhbt.net/yahns.git/
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).