yahns.git  about / heads / tags
sleepy, multi-threaded, non-blocking application server for Ruby
blob 0135958cd02ce42bfa8eecbf38e8862cd7db698c 3999 bytes (raw)
$ git show HEAD:test/test_wbuf.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
 
# Copyright (C) 2013-2016 all contributors <yahns-public@yhbt.net>
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
require_relative 'helper'
require 'timeout'

# Tests for Yahns::Wbuf: data is pushed through a connected socket pair until
# the socket would block, then the remainder is flushed while the peer drains.
class TestWbuf < Testcase
  parallelize_me! if ENV["N"].to_i > 1

  # Every test in this class is skipped when IO has no #sendfile method.
  def setup
    skip 'sendfile missing' unless IO.instance_methods.include?(:sendfile)
  end

  # UNIXSocket with Kgio::SocketMethods mixed in; output_buffer_tmpdir
  # returns Dir.tmpdir.
  class KgioUS < UNIXSocket
    include Kgio::SocketMethods
    def self.output_buffer_tmpdir
      Dir.tmpdir
    end
  end

  # Returns a connected pair of KgioUS sockets.
  def socketpair
    KgioUS.pair
  end

  # For persist = true and persist = false:
  # * a small write passes straight through (wbuf_write returns nil and the
  #   peer can read it back),
  # * after +nr+ large writes, wbuf_flush reports :wait_writable,
  # * a background thread then flushes while this thread drains the peer;
  #   every flush result but the last must be :wait_writable and the last
  #   one must equal +persist+.
  def test_wbuf
    skip "sendfile not Linux-compatible" if RUBY_PLATFORM !~ /linux/
    buf = "*" * (16384 * 2)
    # Reads go into a dedicated scratch buffer.  Reusing +buf+ as the read
    # buffer would shrink the payload to the size of the last read, so the
    # second pass could write too little to ever reach :wait_writable.
    rbuf = String.new
    nr = 1000
    [ true, false ].each do |persist|
      begin
        wbuf = Yahns::Wbuf.new([], persist)
        assert_equal false, wbuf.busy
        a, b = socketpair
        assert_nil wbuf.wbuf_write(a, "HIHI")
        assert_equal "HIHI", b.read(4)
        nr.times { wbuf.wbuf_write(a, buf) }
        assert_equal :wait_writable, wbuf.wbuf_flush(a)

        done = cloexec_pipe
        thr = flush_in_thread(wbuf, a, done[1], persist)
        # random read sizes, always at least 666 bytes
        drain_until_flushed(b, done[0], rbuf) { (rand * 1024).to_i + 666 }

        assert_equal thr, thr.join(5)
        rv = thr.value
        assert_equal persist, rv.pop
        assert(rv.all? { |x| x == :wait_writable })
      ensure
        # also runs when an assertion fails, so nothing leaks between passes
        thr.kill if thr
        close_ios(a, b, *done)
      end
    end
  end

  # Fills the socket with write_nonblock until EAGAIN (four rounds), then
  # checks wbuf_write/wbuf_flush against the two accepted outcomes.  After
  # the peer is drained, a final "HI" must arrive at the end of the stream
  # and wbuf_flush must return true.
  def test_wbuf_blocked
    skip "sendfile not Linux-compatible" if RUBY_PLATFORM !~ /linux/
    a, b = socketpair
    buf = "." * 4096
    4.times do
      loop do
        begin
          a.write_nonblock(buf)
        rescue Errno::EAGAIN
          break
        end
      end
    end
    wbuf = Yahns::Wbuf.new([], true)

    rv1 = wbuf.wbuf_write(a, buf)
    rv2 = wbuf.wbuf_flush(a)
    case rv1
    when nil
      assert_equal true, rv2, 'some kernels succeed with real sendfile'
    when :wait_writable
      assert_equal :wait_writable, rv2, 'some block on sendfile'
    else
      flunk "unexpected from wbuf_write/flush: #{rv1.inspect} / #{rv2.inspect}"
    end

    # drain the buffer
    Timeout.timeout(10) { b.read(b.nread) until b.nread == 0 }

    # b.nread will increase after this
    assert_nil wbuf.wbuf_write(a, "HI")
    nr = b.nread
    assert_operator nr, :>, 0
    assert_equal b, IO.select([b], nil, nil, 5)[0][0]
    b.read(nr - 2) if nr > 2
    assert_equal b, IO.select([b], nil, nil, 5)[0][0]
    assert_equal "HI", b.read(2), "read the end of the response"
    assert_equal true, wbuf.wbuf_flush(a)
  ensure
    # a and b are nil here if the test was skipped or socketpair raised
    close_ios(a, b)
  end

  # Builds a Wbuf around the read end of a pipe, feeds every line of the
  # pipe through wbuf_write, then flushes in a background thread while the
  # peer socket is drained.  Once flushing completes, the pipe's read end
  # must be closed even though the Wbuf itself does not respond to #close.
  def test_wbuf_flush_close
    pipe = cloexec_pipe
    persist = true
    wbuf = Yahns::Wbuf.new(pipe[0], persist)
    refute wbuf.respond_to?(:close) # we don't want this for HttpResponse body
    sp = socketpair
    rv = nil

    buf = ("*" * 16384) << "\n"
    writer = Thread.new do
      1000.times { pipe[1].write(buf) }
      pipe[1].close
    end

    pipe[0].each { |chunk| rv = wbuf.wbuf_write(sp[1], chunk) }
    assert_equal writer, writer.join(5)
    assert_equal :wait_writable, rv

    done = cloexec_pipe
    flusher = flush_in_thread(wbuf, sp[1], done[1], persist)
    drain_until_flushed(sp[0], done[0], String.new) { 16384 }

    assert_equal flusher, flusher.join(5)
    rv = flusher.value
    assert_equal true, rv.pop
    assert rv.all? { |x| x == :wait_writable }
    assert pipe[0].closed?
  ensure
    [ writer, flusher ].each { |t| t.kill if t }
    close_ios(*pipe, *sp, *done)
  end

  private

  # Starts a thread that calls wbuf.wbuf_flush(io) each time +io+ is
  # writable, until a flush returns +persist+.  One byte is then written to
  # +done_wr+ to wake the draining side.  The thread's value is the list of
  # all wbuf_flush return values, in order.
  def flush_in_thread(wbuf, io, done_wr, persist)
    Thread.new do
      results = []
      until results[-1] == persist
        IO.select(nil, [io])
        results << wbuf.wbuf_flush(io)
      end
      done_wr.syswrite '.'
      results
    end
  end

  # Reads from +rd+ into +rbuf+ until a read would block.  While +done_rd+
  # has not become readable, each read is preceded by a wait (up to five
  # seconds) for +rd+ or +done_rd+; afterwards reads are issued back to back.
  # The block supplies the maximum length of each read.
  def drain_until_flushed(rd, done_rd, rbuf)
    flushed = false
    loop do
      unless flushed
        ready = IO.select([rd, done_rd], nil, nil, 5)
        # IO.select returns nil on timeout.  Nothing is readable then, so
        # the read below raises EAGAIN and ends the drain; the caller's
        # join assertion reports the stall.
        flushed = !ready.nil? && ready[0].include?(done_rd)
      end
      begin
        rd.read_nonblock(yield, rbuf)
      rescue Errno::EAGAIN
        break
      end
    end
  end

  # Closes every given IO that is non-nil and still open.
  def close_ios(*ios)
    ios.each { |io| io.close if io && !io.closed? }
  end
end

git clone git://yhbt.net/yahns.git
git clone https://yhbt.net/yahns.git