about summary refs log tree commit homepage
path: root/test/test_server.rb
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_server.rb')
-rw-r--r--test/test_server.rb382
1 files changed, 382 insertions, 0 deletions
diff --git a/test/test_server.rb b/test/test_server.rb
new file mode 100644
index 0000000..5c86268
--- /dev/null
+++ b/test/test_server.rb
@@ -0,0 +1,382 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'server_helper'
+
+class TestServer < Testcase
+  parallelize_me!
+  include ServerHelper
+
+  alias setup server_helper_setup
+  alias teardown server_helper_teardown
+
+  def test_single_process
+    err = @err
+    cfg = Yahns::Config.new
+    host, port = @srv.addr[3], @srv.addr[1]
+    cfg.instance_eval do
+      ru = lambda { |_| [ 200, {'Content-Length'=>'2'}, ['HI'] ] }
+      GTL.synchronize { app(:rack, ru) { listen "#{host}:#{port}" } }
+      logger(Logger.new(err.path))
+    end
+    srv = Yahns::Server.new(cfg)
+    pid = fork do
+      ENV["YAHNS_FD"] = @srv.fileno.to_s
+      srv.start.join
+    end
+    run_client(host, port) { |res| assert_equal "HI", res.body }
+    c = get_tcp_client(host, port)
+
+    # test pipelining
+    r = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n"
+    c.write(r + r)
+    buf = ""
+    Timeout.timeout(10) do
+      until buf =~ /HI.+HI/m
+        buf << c.readpartial(4096)
+      end
+    end
+
+    # trickle pipelining
+    c.write(r + "GET ")
+    buf = ""
+    Timeout.timeout(10) do
+      until buf =~ /HI\z/
+        buf << c.readpartial(4096)
+      end
+    end
+    c.write("/ HTTP/1.1\r\nHost: example.com\r\n\r\n")
+    Timeout.timeout(10) do
+      until buf =~ /HI.+HI/m
+        buf << c.readpartial(4096)
+      end
+    end
+  rescue => e
+    Yahns::Log.exception(Logger.new($stderr), "test", e)
+    raise
+  ensure
+    c.close if c
+    quit_wait(pid)
+  end
+
+  def test_input_body_true; input_body(true); end
+  def test_input_body_false; input_body(false); end
+  def test_input_body_lazy; input_body(:lazy); end
+
+  def input_body(btype)
+    err = @err
+    cfg = Yahns::Config.new
+    host, port = @srv.addr[3], @srv.addr[1]
+    cfg.instance_eval do
+      ru = lambda {|e|[ 200, {'Content-Length'=>'2'},[e["rack.input"].read]]}
+      GTL.synchronize do
+        app(:rack, ru) do
+          listen "#{host}:#{port}"
+          input_buffering btype
+        end
+      end
+      logger(Logger.new(err.path))
+    end
+    srv = Yahns::Server.new(cfg)
+    pid = fork do
+      ENV["YAHNS_FD"] = @srv.fileno.to_s
+      srv.start.join
+    end
+    c = get_tcp_client(host, port)
+    buf = "PUT / HTTP/1.0\r\nContent-Length: 2\r\n\r\nHI"
+    c.write(buf)
+    IO.select([c], nil, nil, 5)
+    rv = c.read(666)
+    head, body = rv.split(/\r\n\r\n/)
+    assert_match(%r{^Content-Length: 2\r\n}, head)
+    assert_equal "HI", body, "#{rv.inspect} - #{btype.inspect}"
+    c.close
+
+    # pipelined oneshot
+    buf = "PUT / HTTP/1.1\r\nContent-Length: 2\r\n\r\nHI"
+    c = get_tcp_client(host, port)
+    c.write(buf + buf)
+    buf = ""
+    Timeout.timeout(10) do
+      until buf =~ /HI.+HI/m
+        buf << c.readpartial(4096)
+      end
+    end
+    assert buf.gsub!(/Date:[^\r\n]+\r\n/, ""), "kill differing Date"
+    rv = buf.sub!(/\A(HTTP.+?\r\n\r\nHI)/m, "")
+    first = $1
+    assert rv
+    assert_equal first, buf
+
+    # pipelined trickle
+    buf = "PUT / HTTP/1.1\r\nContent-Length: 5\r\n\r\nHIBYE"
+    (buf + buf).each_byte do |b|
+      c.write(b.chr)
+      sleep(0.01) if b.chr == ":"
+      Thread.pass
+    end
+    buf = ""
+    Timeout.timeout(10) do
+      until buf =~ /HIBYE.+HIBYE/m
+        buf << c.readpartial(4096)
+      end
+    end
+    assert buf.gsub!(/Date:[^\r\n]+\r\n/, ""), "kill differing Date"
+    rv = buf.sub!(/\A(HTTP.+?\r\n\r\nHIBYE)/m, "")
+    first = $1
+    assert rv
+    assert_equal first, buf
+  rescue => e
+    Yahns::Log.exception(Logger.new($stderr), "test", e)
+    raise
+  ensure
+    c.close if c
+    quit_wait(pid)
+  end
+
+  def test_trailer_true; trailer(true); end
+  def test_trailer_false; trailer(false); end
+  def test_trailer_lazy; trailer(:lazy); end
+  def test_slow_trailer_true; trailer(true, 0.02); end
+  def test_slow_trailer_false; trailer(false, 0.02); end
+  def test_slow_trailer_lazy; trailer(:lazy, 0.02); end
+
+  def trailer(btype, delay = false)
+    err = @err
+    cfg = Yahns::Config.new
+    host, port = @srv.addr[3], @srv.addr[1]
+    cfg.instance_eval do
+      ru = lambda do |e|
+        body = e["rack.input"].read
+        s = e["HTTP_XBT"] + "\n" + body
+        [ 200, {'Content-Length'=>s.size.to_s}, [ s ] ]
+      end
+      GTL.synchronize do
+        app(:rack, ru) do
+          listen "#{host}:#{port}"
+          input_buffering btype
+        end
+      end
+      logger(Logger.new(err.path))
+    end
+    srv = Yahns::Server.new(cfg)
+    pid = fork do
+      ENV["YAHNS_FD"] = @srv.fileno.to_s
+      srv.start.join
+    end
+    c = get_tcp_client(host, port)
+    buf = "PUT / HTTP/1.0\r\nTrailer:xbt\r\nTransfer-Encoding: chunked\r\n\r\n"
+    c.write(buf)
+    xbt = btype.to_s
+    sleep(delay) if delay
+    c.write(sprintf("%x\r\n", xbt.size))
+    sleep(delay) if delay
+    c.write(xbt)
+    sleep(delay) if delay
+    c.write("\r\n")
+    sleep(delay) if delay
+    c.write("0\r\nXBT: ")
+    sleep(delay) if delay
+    c.write("#{xbt}\r\n\r\n")
+    IO.select([c], nil, nil, 5000) or raise "timed out"
+    rv = c.read(666)
+    _, body = rv.split(/\r\n\r\n/)
+    a, b = body.split(/\n/)
+    assert_equal xbt, a
+    assert_equal xbt, b
+  ensure
+    c.close if c
+    quit_wait(pid)
+  end
+
+  def test_check_client_connection
+    msgs = %w(ZZ zz)
+    err = @err
+    cfg = Yahns::Config.new
+    bpipe = IO.pipe
+    host, port = @srv.addr[3], @srv.addr[1]
+    cfg.instance_eval do
+      ru = lambda { |e|
+        case e['PATH_INFO']
+        when '/sleep'
+          a = Object.new
+          a.instance_variable_set(:@bpipe, bpipe)
+          a.instance_variable_set(:@msgs, msgs)
+          def a.each
+            @msgs.each do |msg|
+              yield @bpipe[0].read(msg.size)
+            end
+          end
+        when '/cccfail'
+          # we should not get here if check_client_connection worked
+          abort "CCCFAIL"
+        else
+          a = %w(HI)
+        end
+        [ 200, {'Content-Length'=>'2'}, a ]
+      }
+      GTL.synchronize {
+        app(:rack, ru) {
+          listen "#{host}:#{port}"
+          check_client_connection true
+          # needed to avoid concurrency with check_client_connection
+          queue { worker_threads 1 }
+          output_buffering false
+        }
+      }
+      logger(Logger.new(err.path))
+    end
+    srv = Yahns::Server.new(cfg)
+
+    # ensure we set worker_threads correctly
+    eggs = srv.instance_variable_get(:@config).qeggs
+    assert_equal 1, eggs.size
+    assert_equal 1, eggs[:default].instance_variable_get(:@worker_threads)
+
+    pid = fork do
+      bpipe[1].close
+      ENV["YAHNS_FD"] = @srv.fileno.to_s
+      srv.start.join
+    end
+    bpipe[0].close
+    a = get_tcp_client(host, port)
+    b = get_tcp_client(host, port)
+    a.write("GET /sleep HTTP/1.0\r\n\r\n")
+    r = IO.select([a], nil, nil, 4)
+    assert r, "nothing ready"
+    assert_equal a, r[0][0]
+    buf = a.read(8)
+    assert_equal "HTTP/1.1", buf
+
+    # hope the kernel sees this before it sees the bpipe ping-ponging below
+    b.write("GET /cccfail HTTP/1.0\r\n\r\n")
+    b.shutdown
+    b.close
+
+    # ping-pong a bit to stall the server
+    msgs.each do |msg|
+      bpipe[1].write(msg)
+      Timeout.timeout(10) { buf << a.readpartial(10) until buf =~ /#{msg}/ }
+    end
+    bpipe[1].close
+    assert_equal msgs.join, buf.split(/\r\n\r\n/)[1]
+
+    # do things still work?
+    run_client(host, port) { |res| assert_equal "HI", res.body }
+  ensure
+    quit_wait(pid)
+  end
+
+  def test_mp
+    pid, host, port = new_mp_server
+    wpid = nil
+    run_client(host, port) do |res|
+      wpid ||= res.body.to_i
+    end
+  ensure
+    quit_wait(pid)
+    if wpid
+      assert_raises(Errno::ESRCH) { Process.kill(:KILL, wpid) }
+      assert_raises(Errno::ECHILD) { Process.waitpid2(wpid) }
+    end
+  end
+
+  # Linux blocking accept() has fair behavior between multiple tasks
+  def test_mp_balance
+    skip("linux-only test") unless RUBY_PLATFORM =~ /linux/
+    pid, host, port = new_mp_server(2)
+    seen = {}
+
+    # wait for both processes to spin up
+    Timeout.timeout(10) do
+      run_client(host, port) { |res| seen[res.body] = 1 } until seen.size == 2
+    end
+
+    prev = nil
+    req = Net::HTTP::Get.new("/")
+    # we should bounce new connections between 2 processes
+    4.times do
+      Net::HTTP.start(host, port) do |http|
+        res = http.request(req)
+        assert_equal 200, res.code.to_i
+        assert_equal "keep-alive", res["Connection"]
+        refute_equal prev, res.body, "same PID accepted twice"
+        prev = res.body.dup
+        seen[prev] += 1
+        666.times { Thread.pass } # have the other acceptor to wake up
+      end
+    end
+    assert_equal 2, seen.size
+  ensure
+    quit_wait(pid)
+  end
+
+  def test_mp_worker_die
+    pid, host, port = new_mp_server
+    wpid1 = wpid2 = nil
+    run_client(host, port) do |res|
+      wpid1 ||= res.body.to_i
+    end
+    Process.kill(:QUIT, wpid1)
+    poke_until_dead(wpid1)
+    run_client(host, port) do |res|
+      wpid2 ||= res.body.to_i
+    end
+    refute_equal wpid2, wpid1
+  ensure
+    quit_wait(pid)
+    assert_raises(Errno::ESRCH) { Process.kill(:KILL, wpid2) } if wpid2
+  end
+
+  def test_mp_dead_parent
+    pid, host, port = new_mp_server
+    wpid = nil
+    run_client(host, port) do |res|
+      wpid ||= res.body.to_i
+    end
+    Process.kill(:KILL, pid)
+    _, status = Process.waitpid2(pid)
+    assert status.signaled?, status.inspect
+    poke_until_dead(wpid)
+  end
+
+  def run_client(host, port)
+    c = get_tcp_client(host, port)
+    Net::HTTP.start(host, port) do |http|
+      res = http.request(Net::HTTP::Get.new("/"))
+      assert_equal 200, res.code.to_i
+      assert_equal "keep-alive", res["Connection"]
+      yield res
+      res = http.request(Net::HTTP::Get.new("/"))
+      assert_equal 200, res.code.to_i
+      assert_equal "keep-alive", res["Connection"]
+      yield res
+    end
+    c.write "GET / HTTP/1.0\r\n\r\n"
+    res = Timeout.timeout(10) { c.read }
+    head, _ = res.split(/\r\n\r\n/)
+    head = head.split(/\r\n/)
+    assert_equal "HTTP/1.1 200 OK", head[0]
+    assert_equal "Connection: close", head[-1]
+    c.close
+  end
+
+  def new_mp_server(nr = 1)
+    ru = @ru = tmpfile(%w(config .ru))
+    @ru.puts('a = $$.to_s')
+    @ru.puts('run lambda { |_| [ 200, {"Content-Length"=>a.size.to_s},[a]]}')
+    err = @err
+    cfg = Yahns::Config.new
+    host, port = @srv.addr[3], @srv.addr[1]
+    cfg.instance_eval do
+      worker_processes 2
+      GTL.synchronize { app(:rack, ru.path) { listen "#{host}:#{port}" } }
+      logger(Logger.new(File.open(err.path, "a")))
+    end
+    srv = Yahns::Server.new(cfg)
+    pid = fork do
+      ENV["YAHNS_FD"] = @srv.fileno.to_s
+      srv.start.join
+    end
+    [ pid, host, port ]
+  end
+end