about summary refs log tree commit homepage
path: root/test/test_wbuf.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-10-18 10:28:18 +0000
committerEric Wong <normalperson@yhbt.net>2013-10-18 10:28:18 +0000
commitab067831e707b191d6dfdcd01de1f1d85fc90d05 (patch)
treeb02861eb1521fb325ee4e1d91e1a194ca73e7a9e /test/test_wbuf.rb
downloadyahns-ab067831e707b191d6dfdcd01de1f1d85fc90d05.tar.gz
Diffstat (limited to 'test/test_wbuf.rb')
-rw-r--r--test/test_wbuf.rb136
1 files changed, 136 insertions, 0 deletions
diff --git a/test/test_wbuf.rb b/test/test_wbuf.rb
new file mode 100644
index 0000000..dc6bc24
--- /dev/null
+++ b/test/test_wbuf.rb
@@ -0,0 +1,136 @@
+# Copyright (C) 2013, Eric Wong <normalperson@yhbt.net> and all contributors
+# License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt)
+require_relative 'helper'
+require 'timeout'
+
+class TestWbuf < Testcase
+  parallelize_me!
+
+  def test_wbuf
+    buf = "*" * (16384 * 2)
+    nr = 1000
+    [ true, false ].each do |persist|
+      wbuf = Yahns::Wbuf.new([], persist)
+      a, b = UNIXSocket.pair
+      assert_nil wbuf.wbuf_write(a, "HIHI")
+      assert_equal "HIHI", b.read(4)
+      nr.times { wbuf.wbuf_write(a, buf) }
+      assert_equal :wait_writable, wbuf.wbuf_flush(a)
+      done = IO.pipe
+      thr = Thread.new do
+        rv = []
+        until rv[-1] == persist
+          IO.select(nil, [a])
+          tmp = wbuf.wbuf_flush(a)
+          rv << tmp
+        end
+        done[1].syswrite '.'
+        rv
+      end
+
+      wait = true
+      begin
+        if wait
+          r = IO.select([b,done[0]], nil, nil, 5)
+        end
+        b.read_nonblock((rand * 1024) + 666, buf)
+        wait = (r[0] & done).empty?
+      rescue Errno::EAGAIN
+        break
+      end while true
+
+      assert_equal thr, thr.join(5)
+      rv = thr.value
+      assert_equal persist, rv.pop
+      assert(rv.all? { |x| x == :wait_writable })
+      a.close
+      b.close
+      done.each { |io| io.close }
+    end
+  end
+
+  def test_wbuf_blocked
+    a, b = UNIXSocket.pair
+    buf = "." * 4096
+    4.times do
+      begin
+        a.write_nonblock(buf)
+      rescue Errno::EAGAIN
+        break
+      end while true
+    end
+    wbuf = Yahns::Wbuf.new([], true)
+    assert_equal :wait_writable, wbuf.wbuf_write(a, buf)
+    assert_equal :wait_writable, wbuf.wbuf_flush(a)
+
+    # drain the buffer
+    Timeout.timeout(10) { b.read(b.nread) until b.nread == 0 }
+
+    # b.nread will increase after this
+    assert_nil wbuf.wbuf_write(a, "HI")
+    nr = b.nread
+    assert_operator nr, :>, 0
+    assert_equal b, IO.select([b], nil, nil, 5)[0][0]
+    b.read(nr - 2) if nr > 2
+    assert_equal b, IO.select([b], nil, nil, 5)[0][0]
+    assert_equal "HI", b.read(2)
+    begin
+      wbuf.wbuf_flush(a)
+      assert false
+    rescue => e
+    end
+    assert_match(%r{BUG: EOF on tmpio}, e.message)
+  ensure
+    a.close
+    b.close
+  end
+
+  def test_wbuf_flush_close
+    pipe = IO.pipe
+    persist = true
+    wbuf = Yahns::Wbuf.new(pipe[0], persist)
+    refute wbuf.respond_to?(:close) # we don't want this for HttpResponse body
+    sp = UNIXSocket.pair
+    rv = nil
+
+    buf = ("*" * 16384) << "\n"
+    thr = Thread.new do
+      1000.times { pipe[1].write(buf) }
+      pipe[1].close
+    end
+
+    pipe[0].each { |chunk| rv = wbuf.wbuf_write(sp[1], chunk) }
+    assert_equal thr, thr.join(5)
+    assert_equal :wait_writable, rv
+
+    done = IO.pipe
+    thr = Thread.new do
+      rv = []
+      until rv[-1] == persist
+        IO.select(nil, [sp[1]])
+        rv << wbuf.wbuf_flush(sp[1])
+      end
+      done[1].syswrite '.'
+      rv
+    end
+
+    wait = true
+    begin
+      if wait
+        r = IO.select([sp[0],done[0]], nil, nil, 5)
+      end
+      sp[0].read_nonblock(16384, buf)
+      wait = (r[0] & done).empty?
+    rescue Errno::EAGAIN
+      break
+    end while true
+
+    assert_equal thr, thr.join(5)
+    rv = thr.value
+    assert_equal true, rv.pop
+    assert rv.all? { |x| x == :wait_writable }
+    assert pipe[0].closed?
+    sp.each(&:close)
+    done.each(&:close)
+  end
+end