about summary refs log tree commit homepage
path: root/lib/yahns/proxy_http_response.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/yahns/proxy_http_response.rb')
-rw-r--r--lib/yahns/proxy_http_response.rb80
1 files changed, 43 insertions, 37 deletions
diff --git a/lib/yahns/proxy_http_response.rb b/lib/yahns/proxy_http_response.rb
index 765fe14..ea176d9 100644
--- a/lib/yahns/proxy_http_response.rb
+++ b/lib/yahns/proxy_http_response.rb
@@ -3,20 +3,30 @@
 # License: GPL-3.0+ (https://www.gnu.org/licenses/gpl-3.0.txt)
 # frozen_string_literal: true
 
+require_relative 'wbuf_lite'
+
 # loaded by yahns/proxy_pass, this relies on Yahns::HttpResponse for
 # constants.
 module Yahns::HttpResponse # :nodoc:
 
   # switch and yield
   def proxy_unbuffer(wbuf)
-    wbuf.body.resbuf = @state = wbuf
+    @state = wbuf
     tc = Thread.current
     tc[:yahns_fdmap].remember(self) # Yahns::HttpClient
-    tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_writable ?
-                               Yahns::Queue::QEV_WR : Yahns::Queue::QEV_RD)
+    tc[:yahns_queue].queue_mod(self, wbuf.busy == :wait_readable ?
+                               Yahns::Queue::QEV_RD : Yahns::Queue::QEV_WR)
     :ignore
   end
 
+  def wbuf_alloc(req_res, busy)
+    if req_res.proxy_pass.proxy_buffering
+      Yahns::Wbuf.new(nil, req_res.alive, self.class.output_buffer_tmpdir, busy)
+    else
+      Yahns::WbufLite.new(req_res)
+    end
+  end
+
   # write everything in buf to our client socket (or wbuf, if it exists)
   # it may return a newly-created wbuf or nil
   def proxy_write(wbuf, buf, req_res)
@@ -27,15 +37,7 @@ module Yahns::HttpResponse # :nodoc:
       when String, Array # partial write, hope the skb grows
         buf = rv
       when :wait_writable, :wait_readable
-        if req_res.proxy_pass.proxy_buffering
-          body = nil
-          alive = req_res.alive
-        else
-          req_res.paused = true
-          body = req_res
-          alive = :ignore
-        end
-        wbuf = Yahns::Wbuf.new(body, alive, self.class.output_buffer_tmpdir, rv)
+        wbuf = req_res.resbuf ||= wbuf_alloc(req_res, rv)
         break
       end while true
     end
@@ -44,7 +46,7 @@ module Yahns::HttpResponse # :nodoc:
     wbuf.busy ? wbuf : nil
   end
 
-  def proxy_err_response(code, req_res, exc, wbuf)
+  def proxy_err_response(code, req_res, exc)
     logger = @hs.env['rack.logger']
     case exc
     when nil
@@ -68,13 +70,12 @@ module Yahns::HttpResponse # :nodoc:
 
     nil # signal close of req_res from yahns_step in yahns/proxy_pass.rb
   ensure
-    wbuf.wbuf_abort if wbuf
+    wbuf = req_res.resbuf
+    wbuf.wbuf_abort if wbuf.respond_to?(:wbuf_abort)
   end
 
-  def wait_on_upstream(req_res, wbuf)
-    req_res.resbuf = wbuf || Yahns::Wbuf.new(nil, req_res.alive,
-                                             self.class.output_buffer_tmpdir,
-                                             false)
+  def wait_on_upstream(req_res)
+    req_res.resbuf ||= wbuf_alloc(req_res, false)
     :wait_readable # self remains in :ignore, wait on upstream
   end
 
@@ -135,18 +136,19 @@ module Yahns::HttpResponse # :nodoc:
       flags = MSG_DONTWAIT
       res = rv # hope the skb grows
     when :wait_writable, :wait_readable # highly unlikely in real apps
-      wbuf = proxy_write(nil, res, req_res)
-      break # keep buffering as much as possible
+      proxy_write(nil, res, req_res)
+      break # keep buffering body...
     end while true
     req_res.alive = alive
-    [ wbuf, have_body ]
+    have_body
   end
 
-  def proxy_read_body(tip, kcar, req_res, wbuf)
+  def proxy_read_body(tip, kcar, req_res)
     chunk = ''.dup if kcar.chunked?
     len = kcar.body_bytes_left
     rbuf = Thread.current[:yahns_rbuf]
     alive = req_res.alive
+    wbuf = req_res.resbuf
 
     case tmp = tip.shift || req_res.kgio_tryread(0x2000, rbuf)
     when String
@@ -161,45 +163,46 @@ module Yahns::HttpResponse # :nodoc:
       # else # HTTP/1.0 upstream, HTTP/1.0 client, do nothing
       end
       wbuf = proxy_write(wbuf, tmp, req_res)
-      return proxy_unbuffer(wbuf) if wbuf && wbuf.body
       chunk.clear if chunk
+      return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
     when nil # EOF
       # HTTP/1.1 upstream, unexpected premature EOF:
-      return proxy_err_response(nil, req_res, nil, wbuf) if len || chunk
+      return proxy_err_response(nil, req_res, nil) if len || chunk
 
       # HTTP/1.0 upstream:
       wbuf = proxy_write(wbuf, "0\r\n\r\n".freeze, req_res) if alive
-      return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+      return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
       req_res.shutdown
       break
     when :wait_readable
-      return wait_on_upstream(req_res, wbuf)
+      return wait_on_upstream(req_res)
     end until kcar.body_eof?
 
     if chunk
       # tip is an empty array and becomes trailer storage
       req_res.proxy_trailers = [ rbuf.dup, tip ]
-      return proxy_read_trailers(kcar, req_res, wbuf)
+      return proxy_read_trailers(kcar, req_res)
     end
     proxy_busy_mod(wbuf, req_res)
   end
 
-  def proxy_read_trailers(kcar, req_res, wbuf)
+  def proxy_read_trailers(kcar, req_res)
     chunk, tlr = req_res.proxy_trailers
     rbuf = Thread.current[:yahns_rbuf]
+    wbuf = req_res.resbuf
 
     until kcar.trailers(tlr, chunk)
       case rv = req_res.kgio_tryread(0x2000, rbuf)
       when String
         chunk << rv
       when :wait_readable
-        return wait_on_upstream(req_res, wbuf)
+        return wait_on_upstream(req_res)
       when nil # premature EOF
-        return proxy_err_response(nil, req_res, nil, wbuf)
+        return proxy_err_response(nil, req_res, nil)
       end # no loop here
     end
     wbuf = proxy_write(wbuf, trailer_out(tlr), req_res)
-    return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+    return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
     proxy_busy_mod(wbuf, req_res)
   end
 
@@ -208,23 +211,26 @@ module Yahns::HttpResponse # :nodoc:
   # returns :ignore if we yield control to the client(self)
   # returns nil if completely done
   def proxy_response_start(res, tip, kcar, req_res)
-    wbuf, have_body = proxy_res_headers(res, req_res)
+    have_body = proxy_res_headers(res, req_res)
     tip = tip.empty? ? [] : [ tip ]
 
     if have_body
       req_res.proxy_trailers = nil # define to avoid uninitialized warnings
-      return proxy_read_body(tip, kcar, req_res, wbuf)
+      return proxy_read_body(tip, kcar, req_res)
     end
-    return proxy_unbuffer(wbuf) if wbuf && wbuf.body
+
+    # unlikely
+    wbuf = req_res.resbuf
+    return proxy_unbuffer(wbuf) if Yahns::WbufLite === wbuf
 
     # all done reading response from upstream, req_res will be discarded
     # when we return nil:
     proxy_busy_mod(wbuf, req_res)
   end
 
-  def proxy_response_finish(kcar, wbuf, req_res)
-    req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res, wbuf)
-                           : proxy_read_body([], kcar, req_res, wbuf)
+  def proxy_response_finish(kcar, req_res)
+    req_res.proxy_trailers ? proxy_read_trailers(kcar, req_res)
+                           : proxy_read_body([], kcar, req_res)
   end
 
   def proxy_wait_next(qflags)