yahns.git  about / heads / tags
sleepy, multi-threaded, non-blocking application server for Ruby
blob 441d3f95d51ebba57ce3ab4caea425ec6c4c9b2e 13583 bytes (raw)
$ git show HEAD:lib/yahns/config.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
 
# -*- encoding: binary -*-
# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
#
# Implements a DSL for configuring a yahns server.
# See https://yhbt.net/yahns.git/tree/examples/yahns_multi.conf.rb
# for a full example configuration file.
class Yahns::Config # :nodoc:
  # public within yahns itself, NOT a public interface for users outside
  # of yahns.  See yahns/rack for usage example
  APP_CLASS = {}

  CfgBlock = Struct.new(:type, :ctx) # :nodoc:
  attr_reader :config_file, :config_listeners, :set
  attr_reader :qeggs, :app_ctx

  def initialize(config_file = nil)
    @config_file = config_file
    @block = nil
    config_reload!

    # FIXME: we shouldn't have this at all when we go Unicorn 5-only
    Unicorn::HttpParser.respond_to?(:keepalive_requests=) and
      Unicorn::HttpParser.keepalive_requests = 0xffffffff
  end

  def _check_in_block(ctx, var)
    if ctx.nil?
      return var if @block.nil?
      msg = "#{var} must be called outside of #{@block.type}"
    else
      ctx = Array(ctx)
      return var if @block && ctx.include?(@block.type)
      msg = @block ? "may not be used inside a #{@block.type} block" :
                     "must be used with a #{ctx.join(' or ')} block"
    end
    raise ArgumentError, msg
  end

  def postfork_cleanup
    @app_ctx = @set = @qeggs = @app_instances = @config_file = nil
  end

  def config_reload! #:nodoc:
    # app_instance:app_ctx is a 1:N relationship
    @config_listeners = {} # name/address -> options
    @app_ctx = []
    @set = Hash.new(:unset)
    @qeggs = Hash.new { |h,k| h[k] = Yahns::QueueEgg.new }
    @app_instances = {}

    # set defaults:
    client_expire_threshold(0.5) # default is half of the open file limit

    instance_eval(File.read(@config_file), @config_file) if @config_file

    # working_directory binds immediately (easier error checking that way),
    # now ensure any paths we changed are correctly set.
    [ :pid, :stderr_path, :stdout_path ].each do |var|
      String === (path = @set[var]) or next
      path = File.expand_path(path)
      File.writable?(path) || File.writable?(File.dirname(path)) or \
            raise ArgumentError, "directory for #{var}=#{path} not writable"
    end
  end

  def logger(obj)
    var = :logger
    %w(debug info warn error fatal).each do |m|
      obj.respond_to?(m) and next
      raise ArgumentError, "#{var}=#{obj} does not respond to method=#{m}"
    end
    if @block
      if @block.ctx.respond_to?(:logger=)
        @block.ctx.logger = obj
      else
        raise ArgumentError, "#{var} not valid inside #{@block.type}"
      end
    else
      @set[var] = obj
    end
  end

  def shutdown_timeout(sec)
    var = _check_in_block(nil, :shutdown_timeout)
    @set[var] = _check_num(var, sec, 0)
  end

  def worker_processes(nr, &blk)
    var =_check_in_block(nil, :worker_processes)
    @set[var] = _check_int(var, nr, 1)
    if block_given?
      @block = CfgBlock.new(var, nil)
      instance_eval(&blk)
      @block = nil
    end
  end

  %w(atfork_prepare atfork_parent atfork_child).each do |fn|
    eval(
    "def #{fn}(*args, &blk);" \
    "  _check_in_block([:worker_processes,:app], :#{fn});" \
    "  _add_hook(:#{fn}, block_given? ? blk : args[0]);" \
    'end'
    )
  end

  def before_exec(&blk)
    var = _check_in_block(nil, :before_exec)
    @set[var] = (block_given? ? blk : args[0])
  end

  def _add_hook(var, my_proc)
    Proc === my_proc or
      raise ArgumentError, "invalid type: #{var}=#{my_proc.inspect}"

    # this sets:
    # :atfork_prepare, :atfork_parent, :atfork_child
    key = var.to_sym
    @set[key] = [] unless @set.include?(key)
    @set[key] << my_proc
  end

  # sets the +path+ for the PID file of the yahns master process
  def pid(path)
    _set_path(:pid, path)
  end

  def stderr_path(path)
    _set_path(:stderr_path, path)
  end

  def stdout_path(path)
    _set_path(:stdout_path, path)
  end

  def value(var)
    val = @set[var]
    val == :unset ? nil : val
  end

  # sets the working directory for yahns.  This ensures SIGUSR2 will
  # start a new instance of yahns in this directory.  This may be
  # a symlink, a common scenario for Capistrano users.  Unlike
  # all other yahns configuration directives, this binds immediately
  # for error checking and cannot be undone by unsetting it in the
  # configuration file and reloading.
  def working_directory(path)
    var = _check_in_block(nil, :working_directory)
    @app_ctx.empty? or
      raise ArgumentError, "#{var} must be declared before any apps"

    # just let chdir raise errors
    path = File.expand_path(path)
    if @config_file &&
       @config_file[0] != ?/ &&
       ! File.readable?("#{path}/#@config_file")
      raise ArgumentError,
            "config_file=#@config_file would not be accessible in" \
            " #{var}=#{path}"
    end
    Dir.chdir(path)
    @set[var] = ENV["PWD"] = path
  end

  # Runs worker processes as the specified +user+ and +group+.
  # The master process always stays running as the user who started it.
  # This switch will occur after calling the after_fork hooks, and only
  # if the Worker#user method is not called in the after_fork hooks
  # +group+ is optional and will not change if unspecified.
  def user(user, group = nil)
    var = _check_in_block(nil, :user)
    @block and raise "#{var} is not valid inside #{@block.type}"
    # raises ArgumentError on invalid user/group
    Etc.getpwnam(user)
    Etc.getgrnam(group) if group
    @set[var] = [ user, group ]
  end

  def _set_path(var, path) #:nodoc:
    _check_in_block(nil, var)
    case path
    when NilClass, String
      @set[var] = path
    else
      raise ArgumentError
    end
  end

  def listen(address, options = {})
    options = options.dup
    var = _check_in_block(:app, :listen)
    address = expand_addr(address)
    String === address or
      raise ArgumentError, "address=#{address.inspect} must be a string"
    [ :umask, :backlog ].each do |key|
      # :backlog may be negative on some OSes
      value = options[key] or next
      Integer === value or
        raise ArgumentError, "#{var}: not an integer: #{key}=#{value.inspect}"
    end
    [ :sndbuf, :rcvbuf, :threads ].each do |key|
       value = options[key] and _check_int(key, value, 1)
    end

    [ :ipv6only, :reuseport ].each do |key|
      (value = options[key]).nil? and next
      [ true, false ].include?(value) or
        raise ArgumentError, "#{var}: not boolean: #{key}=#{value.inspect}"
    end

    require_relative('openssl_server') if options[:ssl_ctx]

    options[:yahns_app_ctx] = @block.ctx
    @config_listeners.include?(address) and
      raise ArgumentError, "listen #{address} already in use"
    @config_listeners[address] = options
  end

  # expands "unix:path/to/foo" to a socket relative to the current path
  # expands pathnames of sockets if relative to "~" or "~username"
  # expands "*:port and ":port" to "0.0.0.0:port"
  def expand_addr(address) #:nodoc:
    return "0.0.0.0:#{address}" if Integer === address
    return address unless String === address

    case address
    when %r{\Aunix:(.*)\z}
      File.expand_path($1)
    when %r{\A~}
      File.expand_path(address)
    when %r{\A(?:\*:)?(\d+)\z}
      "0.0.0.0:#$1"
    when %r{\A\[([a-fA-F0-9:]+)\]\z}, %r/\A((?:\d+\.){3}\d+)\z/
      canonicalize_tcp($1, 80)
    when %r{\A\[([a-fA-F0-9:]+)\]:(\d+)\z}, %r{\A(.*):(\d+)\z}
      canonicalize_tcp($1, $2.to_i)
    else
      address
    end
  end

  def canonicalize_tcp(addr, port)
    packed = Socket.pack_sockaddr_in(port, addr)
    port, addr = Socket.unpack_sockaddr_in(packed)
    addr.include?(':') ? "[#{addr}]:#{port}" : "#{addr}:#{port}"
  end

  def queue(*args, &block)
    var = :queue
    prev_block = @block
    if prev_block
      _check_in_block(:app, var)
      if block_given?
        args.size == 0 or
          raise ArgumentError,
                "queues defined with a block inside app must not have names"
        name = @block
      else
        name = args[0] or
          raise ArgumentError, "queue must be given a name if no block given"
      end
    else
      name = args[0] || :default
    end
    args.size > 1 and
      raise ArgumentError, "queue only takes one name argument"
    qegg = @qeggs[name]
    if block_given?
      @block = CfgBlock.new(:queue, qegg)
      instance_eval(&block)
      @block = prev_block
    end

    # associate the queue if we're inside an app
    prev_block.ctx.qegg = qegg if prev_block
  end

  # queue parameters (Yahns::QueueEgg)
  %w(max_events worker_threads).each do |_v|
    eval(
    "def #{_v}(val);" \
    "  _check_in_block(:queue, :#{_v});" \
    "  @block.ctx.__send__(%Q(#{_v}=), _check_int(:#{_v}, val, 1));" \
    'end'
    )
  end

  def _check_int(var, n, min)
    Integer === n or raise ArgumentError, "not an integer: #{var}=#{n.inspect}"
    n >= min or raise ArgumentError, "too low (< #{min}): #{var}=#{n.inspect}"
    n
  end

  def _check_num(var, n, min)
    Numeric === n or raise ArgumentError, "not a number: #{var}=#{n.inspect}"
    n >= min or raise ArgumentError, "too low (< #{min}): #{var}=#{n.inspect}"
    n
  end

  # global
  def client_expire_threshold(val)
    var = _check_in_block(nil, :client_expire_threshold)
    case val
    when Float
      (val > 0 && val <= 1.0) or
        raise ArgumentError, "#{var} must be > 0 and <= 1.0 if a ratio"
    when Integer
    else
      raise ArgumentError, "#{var} must be a float or integer"
    end
    @set[var] = val
  end

  # type = :rack
  def app(type, *args, &block)
    var = _check_in_block(nil, :app)
    file = "yahns/#{type.to_s}"
    begin
      require file
    rescue LoadError => e
      raise ArgumentError, "#{type.inspect} is not a supported app type",
            e.backtrace
    end
    klass = APP_CLASS[type] or
      raise TypeError,
        "#{var}: #{file} did not register #{type} in #{self.class}::APP_CLASS"

    # apps may have multiple configurator contexts
    app_cfg = @app_instances[klass.instance_key(*args)] = klass.new(*args)
    ctx = app_cfg.config_context
    if block_given?
      @block = CfgBlock.new(:app, ctx)
      instance_eval(&block)
      @block = nil
    end
    @app_ctx << ctx
  end

  def _check_bool(var, val)
    return val if [ true, false ].include?(val)
    raise ArgumentError, "#{var} must be boolean"
  end

  # boolean config directives for app
  %w(check_client_connection persistent_connections).each do |_v|
    eval(
    "def #{_v}(bool);" \
    "  _check_in_block(:app, :#{_v});" \
    "  @block.ctx.__send__(%Q(#{_v}=), _check_bool(:#{_v}, bool));" \
    'end'
    )
  end

  def output_buffering(bool, opts = {})
    var = _check_in_block(:app, :output_buffering)
    @block.ctx.__send__("#{var}=", _check_bool(var, bool))
    tmpdir = opts[:tmpdir] and
      @block.ctx.output_buffer_tmpdir = _check_tmpdir(var, tmpdir)
  end

  def _check_tmpdir(var, path)
    File.directory?(path) or
      raise ArgumentError, "#{var} tmpdir: #{path} is not a directory"
    File.writable?(path) or
      raise ArgumentError, "#{var} tmpdir: #{path} is not writable"
    path
  end

  # integer config directives for app
  {
    # config name, minimum value
    client_body_buffer_size: 1,
    client_header_buffer_size: 1,
  }.each do |_v,minval|
    eval(
    "def #{_v}(val);" \
    "  _check_in_block(:app, :#{_v});" \
    "  @block.ctx.__send__(%Q(#{_v}=), _check_int(:#{_v}, val, #{minval}));" \
    'end'
    )
  end

  def client_timeout(val)
    var = _check_in_block(:app, :client_timeout)
    @block.ctx.__send__("#{var}=", _check_num(var, val, 0))
  end

  def client_max_body_size(val)
    var = _check_in_block(:app, :client_max_body_size)
    val = _check_int(var, val, 0) if val != nil
    @block.ctx.__send__("#{var}=", val)
  end

  def input_buffering(val, opts = {})
    var = _check_in_block(:app, :input_buffering)
    ok = [ :lazy, true, false ]
    ok.include?(val) or
      raise ArgumentError, "`#{var}' must be one of: #{ok.inspect}"
    @block.ctx.__send__("#{var}=", val)
    tmpdir = opts[:tmpdir] and
      @block.ctx.input_buffer_tmpdir = _check_tmpdir(var, tmpdir)
  end

  # used to configure rack.errors destination
  def errors(val)
    var = _check_in_block(:app, :errors)
    if String === val
      # we've already bound working_directory by the time we get here
      val = File.open(File.expand_path(val), "ab")
      val.sync = true
    else
      rt = [ :puts, :write, :flush ] # match Rack::Lint
      rt.all? { |m| val.respond_to?(m) } or raise ArgumentError,
                   "`#{var}' destination must respond to all of: #{rt.inspect}"
    end
    @block.ctx.__send__("#{var}=", val)
  end

  def commit!(server)
    # redirect IOs
    { stdout_path: $stdout, stderr_path: $stderr }.each do |key, io|
      path = @set[key]
      if path == :unset && server.daemon_pipe
        @set[key] = path = "/dev/null"
      end
      File.open(path, 'a') { |fp| io.reopen(fp) } if String === path
      io.sync = true
    end

    [ :logger, :pid, :worker_processes, :user, :shutdown_timeout, :before_exec,
      :atfork_prepare, :atfork_parent, :atfork_child
    ].each do |var|
      val = @set[var]
      server.__send__("#{var}=", val) if val != :unset
    end

    @app_ctx.each { |app| app.logger ||= server.logger }
  end

  def register_inherited(name)
    return unless @config_listeners.empty? && @app_ctx.size == 1
    @config_listeners[name] = { :yahns_app_ctx => @app_ctx[0] }
  end
end

git clone git://yhbt.net/yahns.git
git clone https://yhbt.net/yahns.git