about summary refs log tree commit homepage
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-10-28 06:52:36 +0000
committerEric Wong <normalperson@yhbt.net>2013-10-28 06:52:36 +0000
commita667bc3787eaaf5b7df3b70624985d5fb5b8fc61 (patch)
tree7b364453627e2bdc4b737c5310abb8755f69222c
parent64dc231c557426f5729ad3b8b3d5a6693ca85ad2 (diff)
downloadyahns-a667bc3787eaaf5b7df3b70624985d5fb5b8fc61.tar.gz
We do not want users to use the default queue unless an app
context requires it.  We also do not want to spin up the default
queue unless we are sure we have app contexts using it (and
not private/anonymous queues).
-rw-r--r--lib/yahns/config.rb17
-rw-r--r--lib/yahns/server.rb21
-rw-r--r--test/test_server.rb2
3 files changed, 24 insertions, 16 deletions
diff --git a/lib/yahns/config.rb b/lib/yahns/config.rb
index c955e0f..e571cb7 100644
--- a/lib/yahns/config.rb
+++ b/lib/yahns/config.rb
@@ -38,7 +38,7 @@ class Yahns::Config # :nodoc:
     @config_listeners = {} # name/address -> options
     @app_ctx = []
     @set = Hash.new(:unset)
-    @qeggs = {}
+    @qeggs = Hash.new { |h,k| h[k] = Yahns::QueueEgg.new }
     @app_instances = {}
 
     # set defaults:
@@ -227,16 +227,23 @@ class Yahns::Config # :nodoc:
     /:/ =~ addr ? "[#{addr}]:#{port}" : "#{addr}:#{port}"
   end
 
-  def queue(name = :default, &block)
+  def queue(name = nil, &block)
     var = :queue
-    qegg = @qeggs[name] ||= Yahns::QueueEgg.new
     prev_block = @block
-    _check_in_block(:app, var) if prev_block
+    if prev_block
+      _check_in_block(:app, var)
+      name ||= @block
+    else
+      name ||= :default
+    end
+    qegg = @qeggs[name]
     if block_given?
       @block = CfgBlock.new(:queue, qegg)
       instance_eval(&block)
       @block = prev_block
     end
+
+    # associate the queue if we're inside an app
     prev_block.ctx.qegg = qegg if prev_block
   end
 
@@ -374,7 +381,7 @@ class Yahns::Config # :nodoc:
       val = @set[var]
       server.__send__("#{var}=", val) if val != :unset
     end
-    queue(:default) if @qeggs.empty?
+
     @app_ctx.each { |app| app.logger ||= server.logger }
   end
 end
diff --git a/lib/yahns/server.rb b/lib/yahns/server.rb
index 9a72ef5..fea310c 100644
--- a/lib/yahns/server.rb
+++ b/lib/yahns/server.rb
@@ -307,6 +307,15 @@ class Yahns::Server # :nodoc:
     $0 = ([ File.basename(s[0]), tag ]).concat(s[:argv]).join(' ')
   end
 
+  def qegg_vivify(qegg, fdmap)
+    queue = qegg.vivify(fdmap)
+    qegg.worker_threads.times do
+      @wthr << queue.worker_thread(@logger, qegg.max_events)
+    end
+    @queues << queue
+    queue
+  end
+
   # spins up processing threads of the server
   def fdmap_init
     thresh = @config.value(:client_expire_threshold)
@@ -314,16 +323,8 @@ class Yahns::Server # :nodoc:
     # keeps track of all connections, like ObjectSpace, but only for IOs
     fdmap = Yahns::Fdmap.new(@logger, thresh)
 
-    # initialize queues (epoll/kqueue) and associated worker threads
+    # once initialize queues (epoll/kqueue) and associated worker threads
     queues = {}
-    @config.qeggs.each do |name, qe|
-      queue = qe.vivify(fdmap)
-      qe.worker_threads.times do
-        @wthr << queue.worker_thread(@logger, qe.max_events)
-      end
-      @queues << queue
-      queues[qe] = queue
-    end
 
     # spin up applications (which are preload: false)
     @config.app_ctx.each(&:after_fork_init)
@@ -334,7 +335,7 @@ class Yahns::Server # :nodoc:
       qegg = ctx.qegg || @config.qeggs[:default]
 
       # acceptors feed the the queues
-      l.spawn_acceptor(@logger, ctx, queues[qegg])
+      l.spawn_acceptor(@logger, ctx, queues[qegg] ||= qegg_vivify(qegg, fdmap))
     end
     fdmap
   end
diff --git a/test/test_server.rb b/test/test_server.rb
index d34ed2a..e49c8ea 100644
--- a/test/test_server.rb
+++ b/test/test_server.rb
@@ -219,7 +219,7 @@ class TestServer < Testcase
     # ensure we set worker_threads correctly
     eggs = srv.instance_variable_get(:@config).qeggs
     assert_equal 1, eggs.size
-    assert_equal 1, eggs[:default].instance_variable_get(:@worker_threads)
+    assert_equal 1, eggs.first[1].instance_variable_get(:@worker_threads)
 
     pid = fork do
       bpipe[1].close