From a667bc3787eaaf5b7df3b70624985d5fb5b8fc61 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 28 Oct 2013 06:52:36 +0000 Subject: associate private/anonymous queues correctly We do not want users to use the default queue unless an app context requires it. We also do not want to spin up the default queue unless we are sure we have app contexts using it (and not private/anonymous queues). --- lib/yahns/config.rb | 17 ++++++++++++----- lib/yahns/server.rb | 21 +++++++++++---------- test/test_server.rb | 2 +- 3 files changed, 24 insertions(+), 16 deletions(-) diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb index c955e0f..e571cb7 100644 --- a/lib/yahns/config.rb +++ b/lib/yahns/config.rb @@ -38,7 +38,7 @@ class Yahns::Config # :nodoc: @config_listeners = {} # name/address -> options @app_ctx = [] @set = Hash.new(:unset) - @qeggs = {} + @qeggs = Hash.new { |h,k| h[k] = Yahns::QueueEgg.new } @app_instances = {} # set defaults: @@ -227,16 +227,23 @@ class Yahns::Config # :nodoc: /:/ =~ addr ? "[#{addr}]:#{port}" : "#{addr}:#{port}" end - def queue(name = :default, &block) + def queue(name = nil, &block) var = :queue - qegg = @qeggs[name] ||= Yahns::QueueEgg.new prev_block = @block - _check_in_block(:app, var) if prev_block + if prev_block + _check_in_block(:app, var) + name ||= @block + else + name ||= :default + end + qegg = @qeggs[name] if block_given? @block = CfgBlock.new(:queue, qegg) instance_eval(&block) @block = prev_block end + + # associate the queue if we're inside an app prev_block.ctx.qegg = qegg if prev_block end @@ -374,7 +381,7 @@ class Yahns::Config # :nodoc: val = @set[var] server.__send__("#{var}=", val) if val != :unset end - queue(:default) if @qeggs.empty? + @app_ctx.each { |app| app.logger ||= server.logger } end end diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb index 9a72ef5..fea310c 100644 --- a/lib/yahns/server.rb +++ b/lib/yahns/server.rb @@ -307,6 +307,15 @@ class Yahns::Server # :nodoc: $0 = ([ File.basename(s[0]), tag ]).concat(s[:argv]).join(' ') end + def qegg_vivify(qegg, fdmap) + queue = qegg.vivify(fdmap) + qegg.worker_threads.times do + @wthr << queue.worker_thread(@logger, qegg.max_events) + end + @queues << queue + queue + end + # spins up processing threads of the server def fdmap_init thresh = @config.value(:client_expire_threshold) @@ -314,16 +323,8 @@ class Yahns::Server # :nodoc: # keeps track of all connections, like ObjectSpace, but only for IOs fdmap = Yahns::Fdmap.new(@logger, thresh) - # initialize queues (epoll/kqueue) and associated worker threads + # once initialize queues (epoll/kqueue) and associated worker threads queues = {} - @config.qeggs.each do |name, qe| - queue = qe.vivify(fdmap) - qe.worker_threads.times do - @wthr << queue.worker_thread(@logger, qe.max_events) - end - @queues << queue - queues[qe] = queue - end # spin up applications (which are preload: false) @config.app_ctx.each(&:after_fork_init) @@ -334,7 +335,7 @@ class Yahns::Server # :nodoc: qegg = ctx.qegg || @config.qeggs[:default] # acceptors feed the the queues - l.spawn_acceptor(@logger, ctx, queues[qegg]) + l.spawn_acceptor(@logger, ctx, queues[qegg] ||= qegg_vivify(qegg, fdmap)) end fdmap end diff --git a/test/test_server.rb b/test/test_server.rb index d34ed2a..e49c8ea 100644 --- a/test/test_server.rb +++ b/test/test_server.rb @@ -219,7 +219,7 @@ class TestServer < Testcase # ensure we set worker_threads correctly eggs = srv.instance_variable_get(:@config).qeggs assert_equal 1, eggs.size - assert_equal 1, eggs[:default].instance_variable_get(:@worker_threads) + assert_equal 1, eggs.first[1].instance_variable_get(:@worker_threads) pid = fork do bpipe[1].close -- cgit v1.2.3-24-ge0c7