yahns Ruby server user/dev discussion
 help / color / mirror / code / Atom feed
blob b2c8b48b6f889f88dc4113d93a7c44e2bd0fd231 4151 bytes (raw)
name: test/test_proxy_pass_no_buffering.rb 	 # note: path name is non-authoritative(*)

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
 
# Copyright (C) 2015-2016 all contributors <yahns-public@yhbt.net>
# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
# frozen_string_literal: true
require_relative 'server_helper'
begin
  require 'kcar'
rescue LoadError
end
require 'digest/md5'
class TestProxyPassNoBuffering < Testcase
  ENV["N"].to_i > 1 and parallelize_me!
  include ServerHelper
  STR4 = 'abcd' * (256 * 1024)
  NCHUNK = 50
  class ProxiedApp
    def call(env)
      case env['REQUEST_METHOD']
      when 'GET'
        case env['PATH_INFO']
        when '/giant-body'
          h = [ %W(content-type text/pain),
                   %W(content-length #{NCHUNK * STR4.size}) ]
          body = Object.new
          def body.each
            NCHUNK.times { yield STR4 }
          end
          [ 200, h, body ]
        end
      end
    end
  end

  def setup
    @srv2 = TCPServer.new(ENV["TEST_HOST"] || "127.0.0.1", 0)
    server_helper_setup
    skip "kcar missing yahns/proxy_pass" unless defined?(Kcar)
    require 'yahns/proxy_pass'
  end

  def teardown
    @srv2.close if defined?(@srv2) && !@srv2.closed?
    server_helper_teardown
  end

  def check_headers(io)
    l = io.gets
    assert_match %r{\AHTTP/1\.[01] 200\b}, l
    begin
      l = io.gets
    end until l == "\r\n"
  end

  def test_proxy_pass_no_buffering
    err, cfg, host, port = @err, Yahns::Config.new, @srv.addr[3], @srv.addr[1]
    host2, port2 = @srv2.addr[3], @srv2.addr[1]
    pxp = Yahns::ProxyPass.new("http://#{host2}:#{port2}",
                               proxy_buffering: false)
    pid = mkserver(cfg) do
      ObjectSpace.each_object(Yahns::TmpIO) { |io| io.close unless io.closed? }
      @srv2.close
      cfg.instance_eval do
        app(:rack, pxp) { listen "#{host}:#{port}" }
        stderr_path err.path
      end
    end

    pid2 = mkserver(cfg, @srv2) do
      ObjectSpace.each_object(Yahns::TmpIO) { |io| io.close unless io.closed? }
      @srv.close
      cfg.instance_eval do
        app(:rack, ProxiedApp.new) do
          output_buffering false
          listen "#{host2}:#{port2}"
        end
        stderr_path err.path
      end
    end
    s = TCPSocket.new(host, port)
    req = "GET /giant-body HTTP/1.1\r\nHost: example.com\r\n" \
          "Connection: close\r\n\r\n"
    s.write(req)
    bufs = []
    sleep 1
    10.times do
      sleep 0.1
      # ensure no files get created
      if RUBY_PLATFORM =~ /\blinux\b/ && `which lsof 2>/dev/null`.size >= 4
        deleted1 = `lsof -p #{pid}`.split("\n")
        deleted1 = deleted1.grep(/\bREG\b.* \(deleted\)/)
        deleted2 = `lsof -p #{pid2}`.split("\n")
        deleted2 = deleted2.grep(/\bREG\b.* \(deleted\)/)
        [ deleted1, deleted2 ].each do |ary|
          ary.delete_if { |x| x =~ /\.(?:err|out) \(deleted\)/ }
        end
        assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}"
        assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}"
        bufs.push(deleted1[0])
      end
    end
    before = bufs.size
    bufs.uniq!
    assert bufs.size < before, 'unlinked buffer should not grow'
    buf = ''.dup
    slow = Digest::MD5.new
    ft = Thread.new do
      fast = Digest::MD5.new
      f = TCPSocket.new(host2, port2)
      f.write(req)
      b2 = ''.dup
      check_headers(f)
      nf = 0
      begin
        f.readpartial(1024 * 1024, b2)
        nf += b2.bytesize
        fast.update(b2)
      rescue EOFError
        f = f.close
      end while f
      b2.clear
      [ nf, fast.hexdigest ]
    end
    Thread.abort_on_exception = true
    check_headers(s)
    n = 0
    begin
      s.readpartial(1024 * 1024, buf)
      slow.update(buf)
      n += buf.bytesize
      sleep 0.01
    rescue EOFError
      s = s.close
    end while s
    ft.join(5)
    assert_equal [n, slow.hexdigest ], ft.value

    fast = Digest::MD5.new
    f = TCPSocket.new(host, port)
    f.write(req)
    check_headers(f)
    begin
      f.readpartial(1024 * 1024, buf)
      fast.update(buf)
    rescue EOFError
      f = f.close
    end while f
    buf.clear
    assert_equal slow.hexdigest, fast.hexdigest
  ensure
    s.close if s
    quit_wait(pid)
    quit_wait(pid2)
  end
end

debug log:

solving b2c8b48 ...
found b2c8b48 in https://yhbt.net/yahns.git/

(*) Git path names are given by the tree(s) the blob belongs to.
    Blobs themselves have no identifier aside from the hash of its contents.^

Code repositories for project(s) associated with this public inbox

	https://yhbt.net/yahns.git/

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for read-only IMAP folder(s) and NNTP newsgroup(s).