yahns Ruby server user/dev discussion
 help / color / mirror / code / Atom feed
From: Eric Wong <yahns-public@yhbt.net>
To: yahns-public@yhbt.net
Subject: [PATCH] rack_proxy: initial implementation
Date: Fri,  7 Apr 2017 03:42:15 +0000	[thread overview]
Message-ID: <20170407034215.25377-1-yahns-public@yhbt.net> (raw)

From: Eric Wong <e@80x24.org>

Needs more tests, docs, and such...
---
 lib/yahns.rb              |   3 +
 lib/yahns/config.rb       |  16 ++++-
 lib/yahns/http_context.rb |   6 ++
 lib/yahns/rack_proxy.rb   | 156 ++++++++++++++++++++++++++++++++++++++++++++++
 lib/yahns/server.rb       |   1 +
 lib/yahns/server_mp.rb    |  55 +++++++++++++---
 test/test_rack_proxy.rb   |  96 ++++++++++++++++++++++++++++
 7 files changed, 323 insertions(+), 10 deletions(-)
 create mode 100644 lib/yahns/rack_proxy.rb
 create mode 100644 test/test_rack_proxy.rb

diff --git a/lib/yahns.rb b/lib/yahns.rb
index a0abe49..8c66d14 100644
--- a/lib/yahns.rb
+++ b/lib/yahns.rb
@@ -59,6 +59,9 @@ module Yahns
 
   ClientTimeout = Class.new(RuntimeError) # :nodoc:
 
+  # for rack_proxy, maybe others...
+  Submaster = Struct.new(:key, :cmd, :pid) # :nodoc:
+
   # try to use the monotonic clock in Ruby >= 2.1, it is immune to clock
   # offset adjustments and generates less garbage (Float vs Time object)
   begin
diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb
index bcea0d4..4650665 100644
--- a/lib/yahns/config.rb
+++ b/lib/yahns/config.rb
@@ -313,7 +313,7 @@ def client_expire_threshold(val)
     @set[var] = val
   end
 
-  # type = :rack
+  # type = :rack, :rack_proxy
   def app(type, *args, &block)
     var = _check_in_block(nil, :app)
     file = "yahns/#{type.to_s}"
@@ -436,7 +436,19 @@ def commit!(server)
       server.__send__("#{var}=", val) if val != :unset
     end
 
-    @app_ctx.each { |app| app.logger ||= server.logger }
+    # count extra submasters for rack_proxy (and maybe others)
+    submasters = []
+    @app_ctx.each do |ctx|
+      ctx.logger ||= server.logger
+      next unless ctx.respond_to?(:submasters)
+      sm = ctx.submasters and submasters.concat(sm)
+    end
+    unless submasters.empty?
+      wp = @set[:worker_processes]
+      wp = 1 if wp == :unset # gotta have at least one worker for submasters
+      server.__send__(:worker_processes=, wp)
+      server.instance_variable_set(:@submasters, submasters)
+    end
   end
 
   def register_inherited(name)
diff --git a/lib/yahns/http_context.rb b/lib/yahns/http_context.rb
index 40f2c58..6c18fb8 100644
--- a/lib/yahns/http_context.rb
+++ b/lib/yahns/http_context.rb
@@ -91,4 +91,10 @@ def tmpio_for(len, env)
     end
     tmp
   end
+
+  # returns an array of Submaster structs or nil,
+  # only Yahns::RackProxy uses this at the moment
+  def submasters # :nodoc:
+    @yahns_rack.respond_to?(:submasters) ? @yahns_rack.submasters : nil
+  end
 end
diff --git a/lib/yahns/rack_proxy.rb b/lib/yahns/rack_proxy.rb
new file mode 100644
index 0000000..7144903
--- /dev/null
+++ b/lib/yahns/rack_proxy.rb
@@ -0,0 +1,156 @@
+# -*- encoding: binary -*-
+# Copyright (C) 2017 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require_relative 'rack'
+require_relative 'proxy_pass'
+
+# Basically, a lazy way to setup ProxyPass to hand off some (or all)
+# requests to any HTTP server backend (e.g. varnish, etc)
+class Yahns::RackProxy < Yahns::Rack # :nodoc:
+
+  # the key is the destination returned by the top-level config.ru
+  # and the value is a splattable array for spawning another process
+  # via Process.exec
+  # {
+  #   # [ key, backend URL, ]  => %w(splattable array for Process.exec),
+  #   [:pass, 'http://127.0.0.1:9292/' ] => %w(rackup /path/to/config.ru)
+  #   [:lsock, 'unix:/path/to/sock' ] => %w(bleh -l /path/to/sock ...)
+  #
+  #   # Users of Ruby 2.3+ can shorten their config when
+  #   # running systemd-aware daemons which will bind to
+  #   # a random TCP port:
+  #   :pri => %w(blah -c conf.rb config.ru),
+  #   :alt => %w(blah -c /path/to/alt.conf.rb alt.ru),
+  #   :psgi => %w(blah foo.psgi),
+  #   ...
+  # }
+
+  # By default, proxy all requests by using the :pass return value
+  # Users can selectively process requests for non-buggy code in
+  # the core yahns processes.
+  PROXY_ALL = lambda { |env| :pass } # :nodoc:
+  attr_reader :submasters # :nodoc: see http_context.rb /submasters
+
+  # every declaration of this in yahns_config is unique:
+  def self.instance_key(*args)
+    args.object_id
+  end
+
+  def initialize(mapping = { :pass => %w(true) }, ru = PROXY_ALL, opts = {})
+    sd_env = {
+     'LISTEN_FDS' => '1',
+     'LISTEN_PID' => lambda { "#$$" }
+    }
+    @submasters = []
+    case mapping
+    when Hash # multiple HTTP backends running different commands
+      # nothing to do  { key: splattable array for Process.spawn }
+    when Array # only one backend
+      mapping = { :pass => mapping }
+    else
+      raise ArgumentError, "#{mapping.inspect} must be an Array or Hash"
+    end
+
+    @proxy_pass_map = {}
+    mapping.each do |key, cmd|
+      case key
+      when Array
+        key, addr, ppopts = key
+        ppopts ||= {}
+      when Symbol # OK
+        ppopts = {}
+      else
+        raise ArgumentError, "#{key.inspect} is not a symbol"
+      end
+      Array === cmd or raise ArgumentError,
+                "#{cmd.inspect} must be a splattable array for Process.exec"
+      @proxy_pass_map[key] and raise ArgumentError,
+                "#{key.inspect} may not be repeated in mapping"
+
+      cmd = cmd.dup
+      if addr
+        env = {}
+        rdr = {}
+      else
+        if RUBY_VERSION.to_f < 2.3 && @submasters.empty? # only warn once
+           warn "Ruby < 2.3 may crash when emulating systemd to pass FDs\n",
+" http://blade.nagaokaut.ac.jp/cgi-bin/scat.rb/ruby/ruby-core/69895\n"
+        end
+
+        # nope, no UNIXServer support, maybe not worth it to deal
+        # with FS perms in containers.
+        # TODO: try TCP Fast Open (Linux)
+        srv = random_tcp_listener(ppopts)
+        addr = srv.addr
+        addr = "http://#{addr[3]}:#{addr[1]}/"
+        env = sd_env
+        rdr = { 3 => srv }
+      end
+
+      # never pass YAHNS_FD to children, they do not inherit what we use
+      # for SIGUSR2 upgrades
+      env['YAHNS_FD'] = nil
+      case cmd[0]
+      when Hash
+        cmd[0] = cmd[0].merge(env)
+      else
+        cmd.unshift(env)
+      end
+
+      rdr[:close_others] = true
+      case cmd[-1]
+      when Hash
+        cmd[-1] = cmd[-1].merge(rdr)
+      else
+        cmd << rdr
+      end
+
+      @submasters << Yahns::Submaster.new(key, cmd)
+      @proxy_pass_map[key] = Yahns::ProxyPass.new(addr, ppopts)
+    end
+    super(ru, opts) # Yahns::Rack#initialize
+  end
+
+  def build_app!
+    super # Yahns::Rack#build_app!
+    proxy_app = @app
+
+    # wrap the (possibly-)user-supplied app
+    @app = lambda do |env|
+      res = proxy_app.call(env)
+
+      # standard Rack responses may be handled in yahns proper:
+      Array === res and return res
+
+      # the response is :pass or another Symbol, not a proper Rack response!
+      # shove the env over to the appropriate Yahns::ProxyPass which
+      # talks to a backend HTTP process:
+      ppass = @proxy_pass_map[res] and return ppass.call(env)
+
+      # oops, user screwed up :<
+      logger = env['rack.logger'] and
+        logger.error("bad response from user-supplied proxy: #{res.inspect}")
+
+      [ 500, [ %w(Content-Type text/plain) ], [] ]
+    end
+  end
+
+  def random_tcp_listener(opts) # TODO: should we support options?
+    srv = TCPServer.new('127.0.0.1', 0) # 0: bind random port
+    srv.close_on_exec = true
+    srv.setsockopt(:SOL_SOCKET, :SO_KEEPALIVE, 1)
+    srv.setsockopt(:IPPROTO_TCP, :TCP_NODELAY, 1)
+
+    # Deferring accepts slows down core yahns, but it's useful for
+    # less-sophisticated upstream (backend) servers:
+    Socket.const_defined?(:TCP_DEFER_ACCEPT) and
+      srv.setsockopt(:IPPROTO_TCP, :TCP_DEFER_ACCEPT, 1)
+
+    srv.listen(1024)
+    srv
+  end
+end
+
+# register ourselves
+Yahns::Config::APP_CLASS[:rack_proxy] = Yahns::RackProxy
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index efd6f05..4260a9d 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -37,6 +37,7 @@ def initialize(config)
     @user = nil
     @queues = []
     @wthr = []
+    @submasters = [] # array of Yahns::Submaster structs
   end
 
   def sqwakeup(sig)
diff --git a/lib/yahns/server_mp.rb b/lib/yahns/server_mp.rb
index c9cd207..c61d583 100644
--- a/lib/yahns/server_mp.rb
+++ b/lib/yahns/server_mp.rb
@@ -52,13 +52,31 @@ def worker_atfork_internal(worker)
     switch_user(*@user) if @user
     @user = @workers = nil
     __call_hooks(@atfork_child, worker.nr)
-    @atfork_child = @atfork_parent = @atfork_prepare = nil
+    @atfork_child = @atfork_parent = @atfork_prepare = @submasters = nil
   end
 
   def __call_hooks(ary, worker_nr)
     ary.each { |x| x.call(worker_nr) } if ary
   end
 
+  def spawn_missing_submasters
+    @submasters.each do |sm|
+      next if sm.pid # we don't expect dozens of submasters
+      pid = sm.pid = fork
+      if pid
+        @logger.info("submaster=#{sm.key.inspect} pid=#{pid}")
+      else
+        # expand env variables like LISTEN_PID in the child
+        env = sm.cmd[0]
+        env.each { |k,v| env[k] = v.call if v.respond_to?(:call) }
+        exec(*(sm.cmd))
+      end
+    end
+  rescue => e
+    Yahns::Log.exception(@logger, 'spawning submaster', e)
+    exit!
+  end
+
   def spawn_missing_workers
     worker_nr = -1
     until (worker_nr += 1) == @worker_processes
@@ -85,6 +103,7 @@ def spawn_missing_workers
   # one-at-a-time time and we'll happily drop signals in case somebody
   # is signalling us too often.
   def join
+    spawn_missing_submasters
     spawn_missing_workers
     state = :respawn # :QUIT, :WINCH
     proc_name 'master'
@@ -94,14 +113,16 @@ def join
       @sev.kgio_wait_readable
       @sev.yahns_step
       reap_all
-      case @sig_queue.shift
+      case sig = @sig_queue.shift
       when *EXIT_SIGS # graceful shutdown (twice for non graceful)
         @listeners.each(&:close).clear
         soft_kill_each_worker("QUIT")
+        kill_each_submaster(sig)
         state = :QUIT
       when :USR1 # rotate logs
         usr1_reopen("master ")
         soft_kill_each_worker("USR1")
+        kill_each_submaster(sig)
       when :USR2 # exec binary, stay alive in case something went wrong
         reexec
       when :WINCH
@@ -127,7 +148,10 @@ def join
           reexec
         end
       end while @sig_queue[0]
-      maintain_worker_count if state == :respawn
+      if state == :respawn
+        spawn_missing_submasters
+        maintain_worker_count
+      end
     rescue => e
       Yahns::Log.exception(@logger, "master loop error", e)
     end while state != :QUIT || @workers.size > 0
@@ -181,14 +205,29 @@ def reap_all
         @reexec_pid = 0
         self.pid = @pid.chomp('.oldbin') if @pid
         proc_name('master')
-      else
-        worker = @workers.delete(wpid)
-        desc = worker ? "worker=#{worker.nr}" : "(unknown)"
-        m = "reaped #{status.inspect} #{desc}"
-        status.success? ? @logger.info(m) : @logger.error(m)
+        next
       end
+      if worker = @workers.delete(wpid)
+        desc = "worker=#{worker.nr}"
+      elsif sm = @submasters.find { |s| s.pid == wpid }
+        sm.pid = nil
+        desc = "submaster=#{sm.key.inspect}"
+      end
+      m = "reaped #{status.inspect} #{desc || '(unknown)'}"
+      status.success? ? @logger.info(m) : @logger.error(m)
     rescue Errno::ECHILD
       return
     end while true
   end
+
+  def kill_each_submaster(sig)
+    @submasters.each do |sm|
+      pid = sm.pid or next
+      begin
+        Process.kill(sig, pid)
+      rescue => e
+        @logger.error("#{e.message} killing submaster=#{sm.key.inspect}")
+      end
+    end
+  end
 end
diff --git a/test/test_rack_proxy.rb b/test/test_rack_proxy.rb
new file mode 100644
index 0000000..c202289
--- /dev/null
+++ b/test/test_rack_proxy.rb
@@ -0,0 +1,96 @@
+# Copyright (C) 2017 all contributors <yahns-public@yhbt.net>
+# License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
+# frozen_string_literal: true
+require_relative 'server_helper'
+require 'rbconfig'
+begin
+  require 'kcar'
+rescue LoadError
+end
+
+class TestRackProxy < Testcase
+  ENV["N"].to_i > 1 and parallelize_me!
+  include ServerHelper
+  alias teardown server_helper_teardown
+
+  def setup
+    server_helper_setup
+    skip "kcar missing yahns/proxy_pass" unless defined?(Kcar)
+  end
+
+  def test_shorthand_systemd_emulation
+    RUBY_VERSION.to_f > 2.3 or skip 'Ruby 2.3+ needed to test passing fd=3'
+    ru = tmpfile(%w(test_rack_proxy .ru))
+    ru.write("require 'rack/lobster'; run Rack::Lobster.new\n")
+    cmd = %W(#{RbConfig.ruby} -I lib bin/yahns-rackup -E none
+            -O listen=inherit #{ru.path})
+    err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+    pid = mkserver(cfg) do
+      cfg.instance_eval do
+        app(:rack_proxy, { :pass => cmd }) { listen "#{host}:#{port}" }
+        stderr_path err.path
+      end
+    end
+
+    Net::HTTP.start(host, port) do |http|
+      req = Net::HTTP::Get.new("/")
+      res = http.request(req)
+      assert_equal 200, res.code.to_i
+    end
+  ensure
+    return unless pid
+    Process.kill(:QUIT, pid)
+    _, status = Process.waitpid2(pid)
+    assert status.success?, status.inspect
+  end
+
+  def test_respawn
+    ru = tmpfile(%w(test_rack_proxy .ru))
+    ru.write('run(lambda { |env| ' \
+        '[ 200, [ %w(Content-Type text/plain)], [ "#$$" ]]})')
+    _, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
+    Dir.mktmpdir do |dir|
+      upath = "#{dir}/u.sock"
+      cmd = %W(#{RbConfig.ruby} -I lib bin/yahns-rackup -E none
+            -O listen=#{upath} #{ru.path})
+      mapping = { [ :pass, "unix:#{upath}" ] => cmd }
+      begin
+        r, w = IO.pipe
+        pid = mkserver(cfg) do
+          cfg.instance_eval do
+            app(:rack_proxy, mapping) { listen "#{host}:#{port}" }
+          end
+          $stderr.reopen(w)
+          r.close
+        end
+        Timeout.timeout(10) do
+          Thread.pass until r.gets =~ %r{listening on addr=.*/u\.sock}
+        end
+        u = UNIXSocket.new(upath)
+        u.write("GET /\r\n\r\n")
+        res = u.read
+        backend = res.to_i
+        assert_operator backend, :>, 0
+        u.close
+        assert_equal 1, Process.kill(0, backend)
+        Process.kill(:QUIT, backend)
+
+        Timeout.timeout(10) do
+          Thread.pass until r.gets =~ %r{listening on addr=.*/u\.sock}
+        end
+        u = UNIXSocket.new(upath)
+        u.write("GET /\r\n\r\n")
+        res = u.read
+        respawned = res.to_i
+        assert_operator respawned, :>, 0
+        assert_operator respawned, :!=, backend
+      ensure
+        [ r, w ].compact.map(&:close)
+        return unless pid
+        Process.kill(:QUIT, pid)
+        _, status = Process.waitpid2(pid)
+        assert status.success?, status.inspect
+      end
+    end
+  end
+end
-- 
EW


             reply	other threads:[~2017-04-07  3:42 UTC|newest]

Thread overview: 2+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2017-04-07  3:42 Eric Wong [this message]
2017-04-07 20:35 ` [PATCH] rack_proxy: initial implementation Eric Wong

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

  List information: https://yhbt.net/yahns/README

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20170407034215.25377-1-yahns-public@yhbt.net \
    --to=yahns-public@yhbt.net \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
Code repositories for project(s) associated with this public inbox

	https://yhbt.net/yahns.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).