From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on dcvr.yhbt.net X-Spam-Level: X-Spam-ASN: X-Spam-Status: No, score=-4.0 required=3.0 tests=ALL_TRUSTED,BAYES_00 shortcircuit=no autolearn=ham autolearn_force=no version=3.4.0 Received: from localhost (dcvr.yhbt.net [127.0.0.1]) by dcvr.yhbt.net (Postfix) with ESMTP id 6B4BE2019E for ; Tue, 12 Jul 2016 21:41:05 +0000 (UTC) From: Eric Wong To: yahns-public@yhbt.net Subject: [PATCH 2/3] wbuf_lite: use StringIO instead of TmpIO Date: Tue, 12 Jul 2016 21:41:02 +0000 Message-Id: <20160712214103.12282-3-e@80x24.org> In-Reply-To: <20160712214103.12282-1-e@80x24.org> References: <20160712214103.12282-1-e@80x24.org> List-Id: This allows us to work transparently with our OpenSSL workaround[*] while allowing us to reuse our non-sendfile compatibility code. Unfortunately, this means we duplicate a lot of code from the normal wbuf code for now; but that should be fairly stable at this point. [*] https://bugs.ruby-lang.org/issues/12085 --- lib/yahns/http_client.rb | 26 ++++++++++++ lib/yahns/openssl_client.rb | 6 +-- lib/yahns/sendfile_compat.rb | 24 ------------ lib/yahns/wbuf_common.rb | 8 ---- lib/yahns/wbuf_lite.rb | 76 ++++++++++++++++++++++++++++++++---- test/test_proxy_pass_no_buffering.rb | 2 +- test/test_tmpio.rb | 4 ++ test/test_wbuf.rb | 4 ++ 8 files changed, 107 insertions(+), 43 deletions(-) delete mode 100644 lib/yahns/sendfile_compat.rb diff --git a/lib/yahns/http_client.rb b/lib/yahns/http_client.rb index 1d64e08..1cdaa0f 100644 --- a/lib/yahns/http_client.rb +++ b/lib/yahns/http_client.rb @@ -2,6 +2,12 @@ # Copyright (C) 2013-2016 all contributors # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) # frozen_string_literal: true +begin + raise LoadError, 'SENDFILE_BROKEN env set' if ENV['SENDFILE_BROKEN'] + require 'sendfile' +rescue LoadError +end + class Yahns::HttpClient < Kgio::Socket # :nodoc: NULL_IO = StringIO.new(''.dup) # :nodoc: @@ -304,4 +310,24 @@ def app_hijacked?(env, body) body.close if body.respond_to?(:close) true end + + def trysendio(io, offset, count) + return 0 if count == 0 + count = 0x4000 if count > 0x4000 + buf = Thread.current[:yahns_sfbuf] ||= ''.dup + io.pos = offset + str = io.read(count, buf) or return # nil for EOF + n = 0 + case rv = kgio_trywrite(str) + when String # partial write, keep trying + n += (str.size - rv.size) + str = rv + when :wait_writable, :wait_readable + return n > 0 ? n : rv + when nil + return n + str.size # yay! + end while true + end + + alias trysendfile trysendio unless IO.instance_methods.include?(:trysendfile) end diff --git a/lib/yahns/openssl_client.rb b/lib/yahns/openssl_client.rb index f896acd..439bc75 100644 --- a/lib/yahns/openssl_client.rb +++ b/lib/yahns/openssl_client.rb @@ -3,8 +3,6 @@ # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) # frozen_string_literal: true -require_relative 'sendfile_compat' - # this is to be included into a Kgio::Socket-derived class # this requires Ruby 2.1 and later for "exception: false" module Yahns::OpenSSLClient # :nodoc: @@ -72,7 +70,7 @@ def kgio_tryread(len, buf) @ssl.read_nonblock(len, buf, exception: false) end - def trysendfile(io, offset, count) + def trysendio(io, offset, count) return 0 if count == 0 unless buf = @ssl_blocked @@ -97,6 +95,8 @@ def shutdown # we never call this with a how=SHUT_* arg @ssl.sysclose end + alias trysendfile trysendio + def close @ssl.close # flushes SSLSocket super # IO#close diff --git a/lib/yahns/sendfile_compat.rb b/lib/yahns/sendfile_compat.rb deleted file mode 100644 index 8bd4622..0000000 --- a/lib/yahns/sendfile_compat.rb +++ /dev/null @@ -1,24 +0,0 @@ -# -*- encoding: binary -*- -# Copyright (C) 2013-2016 all contributors -# License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) -# frozen_string_literal: true - -module Yahns::SendfileCompat # :nodoc: - def trysendfile(io, offset, count) - return 0 if count == 0 - count = 0x4000 if count > 0x4000 - buf = Thread.current[:yahns_sfbuf] ||= ''.dup - io.pos = offset - str = io.read(count, buf) or return # nil for EOF - n = 0 - case rv = kgio_trywrite(str) - when String # partial write, keep trying - n += (str.size - rv.size) - str = rv - when :wait_writable, :wait_readable - return n > 0 ? n : rv - when nil - return n + str.size # yay! - end while true - end -end diff --git a/lib/yahns/wbuf_common.rb b/lib/yahns/wbuf_common.rb index 2799baf..7cd19f7 100644 --- a/lib/yahns/wbuf_common.rb +++ b/lib/yahns/wbuf_common.rb @@ -2,14 +2,6 @@ # Copyright (C) 2013-2016 all contributors # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt) # frozen_string_literal: true -begin - raise LoadError, "SENDFILE_BROKEN env set" if ENV["SENDFILE_BROKEN"] - require 'sendfile' -rescue LoadError - require_relative 'sendfile_compat' - IO.__send__ :include, Yahns::SendfileCompat -end - module Yahns::WbufCommon # :nodoc: # returns true / false for persistent/non-persistent connections # returns :wait_*able when blocked diff --git a/lib/yahns/wbuf_lite.rb b/lib/yahns/wbuf_lite.rb index 25daf21..2da5349 100644 --- a/lib/yahns/wbuf_lite.rb +++ b/lib/yahns/wbuf_lite.rb @@ -2,30 +2,88 @@ # Copyright (C) 2016 all contributors # License: GPL-3.0+ # frozen_string_literal: true -require_relative 'wbuf' +require 'stringio' +require_relative 'wbuf_common' # This is only used for "proxy_buffering: false" -class Yahns::WbufLite < Yahns::Wbuf # :nodoc: +class Yahns::WbufLite # :nodoc: + include Yahns::WbufCommon attr_reader :busy attr_writer :req_res def initialize(req_res) - alive = req_res.alive - super(nil, alive ? :ignore : false) + @tmpio = nil + @sf_offset = @sf_count = 0 + @wbuf_persist = :ignore + @busy = false @req_res = req_res end - def wbuf_write(client, buf) - super + def wbuf_write(c, buf) + buf = buf.join if Array === buf + # try to bypass the VFS layer and write directly to the socket + # if we're all caught up + case rv = c.kgio_trywrite(buf) + when String + buf = rv # retry in loop + when nil + return # yay! hopefully we don't have to buffer again + when :wait_writable, :wait_readable + @busy = rv + end until @busy + + @tmpio ||= StringIO.new(''.dup) # relies on encoding: binary above + @sf_count += @tmpio.write(buf) + + # we spent some time copying to the FS, try to write to + # the socket again in case some space opened up... + case rv = c.trysendio(@tmpio, @sf_offset, @sf_count) + when Integer + @sf_count -= rv + @sf_offset += rv + when :wait_writable, :wait_readable + @busy = rv + return rv + else + raise "BUG: #{rv.nil? ? "EOF" : rv.inspect} on tmpio " \ + "sf_offset=#@sf_offset sf_count=#@sf_count" + end while @sf_count > 0 + + # we're all caught up, try to prevent dirty data from getting flushed + # to disk if we can help it. + wbuf_abort + @sf_offset = 0 + @busy = false + nil rescue @req_res = @req_res.close if @req_res raise end def wbuf_flush(client) - super + case rv = client.trysendio(@tmpio, @sf_offset, @sf_count) + when Integer + return wbuf_close(client) if (@sf_count -= rv) == 0 # all sent! + @sf_offset += rv # keep going otherwise + when :wait_writable, :wait_readable + return rv + when nil + # response got truncated, drop the connection + # this may happens when using Rack::File or similar, we can't + # keep the connection alive because we already sent our Content-Length + # header the client would be confused. + @wbuf_persist = false + return wbuf_close(client) + else + raise "BUG: rv=#{rv.inspect} " \ + "on tmpio=#{@tmpio.inspect} " \ + "sf_offset=#@sf_offset sf_count=#@sf_count" + end while @sf_count > 0 + wbuf_close(client) rescue + @wbuf_persist = false # ensure a hijack response is not called @req_res = @req_res.close if @req_res + wbuf_close(client) raise end @@ -48,4 +106,8 @@ def wbuf_close(client) @req_res = @req_res.close if @req_res raise end + + def wbuf_abort + @tmpio = @tmpio.close if @tmpio + end end diff --git a/test/test_proxy_pass_no_buffering.rb b/test/test_proxy_pass_no_buffering.rb index 2dc3b0b..356623f 100644 --- a/test/test_proxy_pass_no_buffering.rb +++ b/test/test_proxy_pass_no_buffering.rb @@ -108,7 +108,7 @@ def test_proxy_pass_no_buffering [ deleted1, deleted2 ].each do |ary| ary.delete_if { |x| x =~ /\.(?:err|out|rb|ru) \(deleted\)/ } end - assert_equal 1, deleted1.size, "pid1=#{deleted1.inspect}" + assert_equal 0, deleted1.size, "pid1=#{deleted1.inspect}" assert_equal 0, deleted2.size, "pid2=#{deleted2.inspect}" bufs.push(deleted1[0]) end diff --git a/test/test_tmpio.rb b/test/test_tmpio.rb index 7d25d3f..3bcf3ca 100644 --- a/test/test_tmpio.rb +++ b/test/test_tmpio.rb @@ -5,6 +5,10 @@ require_relative 'helper' class TestTmpIO < Testcase + def setup + skip 'sendfile missing' unless IO.instance_methods.include?(:sendfile) + end + def test_writev a, b = UNIXSocket.pair a.extend Kgio::PipeMethods diff --git a/test/test_wbuf.rb b/test/test_wbuf.rb index 1382086..89825db 100644 --- a/test/test_wbuf.rb +++ b/test/test_wbuf.rb @@ -7,6 +7,10 @@ class TestWbuf < Testcase ENV["N"].to_i > 1 and parallelize_me! + def setup + skip 'sendfile missing' unless IO.instance_methods.include?(:sendfile) + end + class KgioUS < UNIXSocket include Kgio::SocketMethods def self.output_buffer_tmpdir -- EW