about summary refs log tree commit homepage
path: root/lib/rainbows
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2013-02-08 22:45:20 +0000
committerEric Wong <normalperson@yhbt.net>2013-02-11 01:57:05 +0000
commite166cfe5e8d648b544b1291ec157bd234a425e21 (patch)
tree8ac56aadc51d81d4d250cfec696446f19ffd1d64 /lib/rainbows
parente6faf9e26bcb172026a4984ecadbaa8b6789bcb7 (diff)
downloadrainbows-e166cfe5e8d648b544b1291ec157bd234a425e21.tar.gz
This requires Rack 1.5.x and unicorn 4.6.0 for hijacking
support.  Older versions of Rack continue to work fine,
but we must use unicorn 4.6.0 features to support this.
Diffstat (limited to 'lib/rainbows')
-rw-r--r--lib/rainbows/coolio/client.rb24
-rw-r--r--lib/rainbows/coolio/thread_client.rb2
-rw-r--r--lib/rainbows/epoll/client.rb20
-rw-r--r--lib/rainbows/ev_core.rb9
-rw-r--r--lib/rainbows/event_machine/client.rb13
-rw-r--r--lib/rainbows/process_client.rb10
-rw-r--r--lib/rainbows/response.rb72
-rw-r--r--lib/rainbows/revactor/client/methods.rb2
-rw-r--r--lib/rainbows/stream_response_epoll.rb49
-rw-r--r--lib/rainbows/stream_response_epoll/client.rb14
-rw-r--r--lib/rainbows/writer_thread_pool/client.rb2
11 files changed, 160 insertions, 57 deletions
diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb
index 8d48bbf..843f574 100644
--- a/lib/rainbows/coolio/client.rb
+++ b/lib/rainbows/coolio/client.rb
@@ -86,6 +86,12 @@ class Rainbows::Coolio::Client < Coolio::IO
     @deferred = true
   end
 
+  def hijacked
+    CONN.delete(self)
+    detach
+    nil
+  end
+
   def write_response_path(status, headers, body, alive)
     io = body_to_io(body)
     st = io.stat
@@ -93,7 +99,8 @@ class Rainbows::Coolio::Client < Coolio::IO
     if st.file?
       defer_file(status, headers, body, alive, io, st)
     elsif st.socket? || st.pipe?
-      chunk = stream_response_headers(status, headers, alive)
+      chunk = stream_response_headers(status, headers, alive, body)
+      return hijacked if nil == chunk
       stream_response_body(body, io, chunk)
     else
       # char or block device... WTF?
@@ -103,10 +110,11 @@ class Rainbows::Coolio::Client < Coolio::IO
 
   def ev_write_response(status, headers, body, alive)
     if body.respond_to?(:to_path)
-      write_response_path(status, headers, body, alive)
+      body = write_response_path(status, headers, body, alive)
     else
-      write_response(status, headers, body, alive)
+      body = write_response(status, headers, body, alive)
     end
+    return hijacked unless body
     return quit unless alive && :close != @state
     @state = :headers
   end
@@ -117,9 +125,11 @@ class Rainbows::Coolio::Client < Coolio::IO
     @env[RACK_INPUT] = input
     @env[REMOTE_ADDR] = @_io.kgio_addr
     @env[ASYNC_CALLBACK] = method(:write_async_response)
+    @hp.hijack_setup(@env, @_io)
     status, headers, body = catch(:async) {
       APP.call(@env.merge!(RACK_DEFAULTS))
     }
+    return hijacked if @hp.hijacked?
 
     (nil == status || -1 == status) ? @deferred = true :
         ev_write_response(status, headers, body, @hp.next?)
@@ -186,12 +196,13 @@ class Rainbows::Coolio::Client < Coolio::IO
     def defer_file(status, headers, body, alive, io, st)
       if r = sendfile_range(status, headers)
         status, headers, range = r
-        write_headers(status, headers, alive)
+        body = write_headers(status, headers, alive, body) or return hijacked
         range and defer_file_stream(range[0], range[1], io, body)
       else
-        write_headers(status, headers, alive)
+        write_headers(status, headers, alive, body) or return hijacked
         defer_file_stream(0, st.size, io, body)
       end
+      body
     end
 
     def stream_file_chunk(sf) # +sf+ is a Rainbows::StreamFile object
@@ -207,8 +218,9 @@ class Rainbows::Coolio::Client < Coolio::IO
     end
   else
     def defer_file(status, headers, body, alive, io, st)
-      write_headers(status, headers, alive)
+      write_headers(status, headers, alive, body) or return hijacked
       defer_file_stream(0, st.size, io, body)
+      body
     end
 
     def stream_file_chunk(body)
diff --git a/lib/rainbows/coolio/thread_client.rb b/lib/rainbows/coolio/thread_client.rb
index abc11d2..ee9fa04 100644
--- a/lib/rainbows/coolio/thread_client.rb
+++ b/lib/rainbows/coolio/thread_client.rb
@@ -14,6 +14,7 @@ class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client
 
   # this is only called in the master thread
   def response_write(response)
+    return hijacked if @hp.hijacked?
     ev_write_response(*response, @hp.next?)
     rescue => e
       handle_error(e)
@@ -25,6 +26,7 @@ class Rainbows::Coolio::ThreadClient < Rainbows::Coolio::Client
   def app_response
     begin
       @env[REMOTE_ADDR] = @_io.kgio_addr
+      @hp.hijack_setup(@env, @_io)
       APP.call(@env.merge!(RACK_DEFAULTS))
     rescue => e
       Rainbows::Error.app(e) # we guarantee this does not raise
diff --git a/lib/rainbows/epoll/client.rb b/lib/rainbows/epoll/client.rb
index d72696b..f6af6fa 100644
--- a/lib/rainbows/epoll/client.rb
+++ b/lib/rainbows/epoll/client.rb
@@ -52,6 +52,7 @@ module Rainbows::Epoll::Client
     when String
       on_read(rv)
       return if @wr_queue[0] || closed?
+      return hijacked if @hp.hijacked?
     when :wait_readable
       KATO[self] = @@last_expire if :headers == @state
       return EP.set(self, IN)
@@ -67,7 +68,9 @@ module Rainbows::Epoll::Client
   def app_call input # called by on_read()
     @env[RACK_INPUT] = input
     @env[REMOTE_ADDR] = kgio_addr
+    @hp.hijack_setup(@env, self)
     status, headers, body = APP.call(@env.merge!(RACK_DEFAULTS))
+    return hijacked if @hp.hijacked?
     ev_write_response(status, headers, body, @hp.next?)
   end
 
@@ -78,7 +81,8 @@ module Rainbows::Epoll::Client
     if st.file?
       defer_file(status, headers, body, alive, io, st)
     elsif st.socket? || st.pipe?
-      chunk = stream_response_headers(status, headers, alive)
+      chunk = stream_response_headers(status, headers, alive, body)
+      return hijacked if nil == chunk
       stream_response_body(body, io, chunk)
     else
       # char or block device... WTF?
@@ -102,10 +106,18 @@ module Rainbows::Epoll::Client
     else
       write_response(status, headers, body, alive)
     end
+    return hijacked if @hp.hijacked?
     # try to read more if we didn't have to buffer writes
     next_request if alive && 0 == @wr_queue.size
   end
 
+  def hijacked
+    KATO.delete(self)
+    Server.decr # no other place to do this
+    EP.delete(self)
+    nil
+  end
+
   def next_request
     if 0 == @buf.size
       want_more
@@ -113,6 +125,7 @@ module Rainbows::Epoll::Client
       # pipelined request (already in buffer)
       on_read(Z)
       return if @wr_queue[0] || closed?
+      return hijacked if @hp.hijacked?
       close if :close == @state
     end
   end
@@ -197,13 +210,14 @@ module Rainbows::Epoll::Client
     true
   end
 
+  # Rack apps should not hijack here, but they may...
   def defer_file(status, headers, body, alive, io, st)
     if r = sendfile_range(status, headers)
       status, headers, range = r
-      write_headers(status, headers, alive)
+      write_headers(status, headers, alive, body) or return hijacked
       range and defer_file_stream(range[0], range[1], io, body)
     else
-      write_headers(status, headers, alive)
+      write_headers(status, headers, alive, body) or return hijacked
       defer_file_stream(0, st.size, io, body)
     end
   end
diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb
index 46feaff..5c3c5b8 100644
--- a/lib/rainbows/ev_core.rb
+++ b/lib/rainbows/ev_core.rb
@@ -52,16 +52,17 @@ module Rainbows::EvCore
   end
 
   # returns whether to enable response chunking for autochunk models
-  def stream_response_headers(status, headers, alive)
+  # returns nil if request was hijacked in response stage
+  def stream_response_headers(status, headers, alive, body)
     headers = Rack::Utils::HeaderHash.new(headers) unless Hash === headers
     if headers.include?(Content_Length)
-      write_headers(status, headers, alive)
+      write_headers(status, headers, alive, body) or return
       return false
     end
 
     case @env[HTTP_VERSION]
     when "HTTP/1.0" # disable HTTP/1.0 keepalive to stream
-      write_headers(status, headers, false)
+      write_headers(status, headers, false, body) or return
       @hp.clear
       false
     when nil # "HTTP/0.9"
@@ -69,7 +70,7 @@ module Rainbows::EvCore
     else
       rv = !!(headers[Transfer_Encoding] =~ %r{\Achunked\z}i)
       rv = false unless @env["rainbows.autochunk"]
-      write_headers(status, headers, alive)
+      write_headers(status, headers, alive, body) or return
       rv
     end
   end
diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb
index 26f0dbd..9871c09 100644
--- a/lib/rainbows/event_machine/client.rb
+++ b/lib/rainbows/event_machine/client.rb
@@ -10,6 +10,7 @@ class Rainbows::EventMachine::Client < EM::Connection
   end
 
   alias write send_data
+  alias hijacked detach
 
   def receive_data(data)
     # To avoid clobbering the current streaming response
@@ -37,9 +38,11 @@ class Rainbows::EventMachine::Client < EM::Connection
     @env[REMOTE_ADDR] = @_io.kgio_addr
     @env[ASYNC_CALLBACK] = method(:write_async_response)
     @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new
+    @hp.hijack_setup(@env, @_io)
     status, headers, body = catch(:async) {
       APP.call(@env.merge!(RACK_DEFAULTS))
     }
+    return hijacked if @hp.hijacked?
 
     if (nil == status || -1 == status)
       @deferred = true
@@ -67,8 +70,8 @@ class Rainbows::EventMachine::Client < EM::Connection
   def ev_write_response(status, headers, body, alive)
     @state = :headers if alive
     if body.respond_to?(:errback) && body.respond_to?(:callback)
+      write_headers(status, headers, alive, body) or return hijacked
       @deferred = body
-      write_headers(status, headers, alive)
       write_body_each(body)
       deferred_errback(body)
       deferred_callback(body, alive)
@@ -77,21 +80,22 @@ class Rainbows::EventMachine::Client < EM::Connection
       st = File.stat(path = body.to_path)
 
       if st.file?
-        write_headers(status, headers, alive)
+        write_headers(status, headers, alive, body) or return hijacked
         @deferred = stream_file_data(path)
         deferred_errback(body)
         deferred_callback(body, alive)
         return
       elsif st.socket? || st.pipe?
+        chunk = stream_response_headers(status, headers, alive, body)
+        return hijacked if nil == chunk
         io = body_to_io(@deferred = body)
-        chunk = stream_response_headers(status, headers, alive)
         m = chunk ? Rainbows::EventMachine::ResponseChunkPipe :
                     Rainbows::EventMachine::ResponsePipe
         return EM.watch(io, m, self).notify_readable = true
       end
       # char or block device... WTF? fall through to body.each
     end
-    write_response(status, headers, body, alive)
+    write_response(status, headers, body, alive) or return hijacked
     if alive
       if @deferred.nil?
         if @buf.empty?
@@ -112,6 +116,7 @@ class Rainbows::EventMachine::Client < EM::Connection
   end
 
   def unbind
+    return if @hp.hijacked?
     async_close = @env[ASYNC_CLOSE] and async_close.succeed
     @deferred.respond_to?(:fail) and @deferred.fail
     begin
diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb
index b685001..f58770c 100644
--- a/lib/rainbows/process_client.rb
+++ b/lib/rainbows/process_client.rb
@@ -40,6 +40,7 @@ module Rainbows::ProcessClient
 
       set_input(env, hp)
       env[REMOTE_ADDR] = kgio_addr
+      hp.hijack_setup(env, to_io)
       status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
 
       if 100 == status.to_i
@@ -47,7 +48,8 @@ module Rainbows::ProcessClient
         env.delete(HTTP_EXPECT)
         status, headers, body = APP.call(env)
       end
-      write_response(status, headers, body, alive = @hp.next?)
+      return if hp.hijacked?
+      write_response(status, headers, body, alive = hp.next?) or return
     end while alive
   # if we get any error, try to write something back to the client
   # assuming we haven't closed the socket, but don't get hung up
@@ -56,7 +58,7 @@ module Rainbows::ProcessClient
   rescue => e
     handle_error(e)
   ensure
-    close unless closed?
+    close unless closed? || hp.hijacked?
   end
 
   def handle_error(e)
@@ -71,13 +73,15 @@ module Rainbows::ProcessClient
     begin
       set_input(env, hp)
       env[REMOTE_ADDR] = kgio_addr
+      hp.hijack_setup(env, to_io)
       status, headers, body = APP.call(env.merge!(RACK_DEFAULTS))
       if 100 == status.to_i
         write(EXPECT_100_RESPONSE)
         env.delete(HTTP_EXPECT)
         status, headers, body = APP.call(env)
       end
-      write_response(status, headers, body, alive = hp.next?)
+      return if hp.hijacked?
+      write_response(status, headers, body, alive = hp.next?) or return
     end while alive && pipeline_ready(hp)
     alive or close
     rescue => e
diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb
index f8b0831..8a0daf8 100644
--- a/lib/rainbows/response.rb
+++ b/lib/rainbows/response.rb
@@ -19,23 +19,56 @@ module Rainbows::Response
       Rainbows::HttpParser.keepalive_requests = 0
   end
 
-  def write_headers(status, headers, alive)
-    @hp.headers? or return
+  # Rack 1.5.0 (protocol version 1.2) adds response hijacking support
+  if ((Rack::VERSION[0] << 8) | Rack::VERSION[1]) >= 0x0102
+    RACK_HIJACK = "rack.hijack"
+
+    def hijack_prepare(value)
+      value
+    end
+
+    def hijack_socket
+      @hp.env[RACK_HIJACK].call
+    end
+  else
+    def hijack_prepare(_)
+    end
+  end
+
+  # returns the original body on success
+  # returns nil if the headers hijacked the response body
+  def write_headers(status, headers, alive, body)
+    @hp.headers? or return body
+    hijack = nil
     status = CODES[status.to_i] || status
     buf = "HTTP/1.1 #{status}\r\n" \
           "Date: #{httpdate}\r\n" \
-          "Status: #{status}\r\n" \
-          "Connection: #{alive ? KeepAlive : Close}\r\n"
+          "Status: #{status}\r\n"
     headers.each do |key, value|
-      next if %r{\A(?:Date\z|Connection\z)}i =~ key
-      if value =~ /\n/
-        # avoiding blank, key-only cookies with /\n+/
-        buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join
+      case key
+      when %r{\A(?:Date\z|Connection\z)}i
+        next
+      when "rack.hijack"
+        # this was an illegal key in Rack < 1.5, so it should be
+        # OK to silently discard it for those older versions
+        hijack = hijack_prepare(value)
+        alive = false # No persistent connections for hijacking
       else
-        buf << "#{key}: #{value}\r\n"
+        if /\n/ =~ value
+          # avoiding blank, key-only cookies with /\n+/
+          buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join
+        else
+          buf << "#{key}: #{value}\r\n"
+        end
       end
     end
-    write(buf << CRLF)
+    write(buf << "Connection: #{alive ? KeepAlive : Close}\r\n\r\n")
+
+    if hijack
+      body = nil # ensure caller does not close body
+      hijack.call(hijack_socket)
+    end
+    body
   end
 
   def close_if_private(io)
@@ -70,8 +103,9 @@ module Rainbows::Response
     # generic response writer, used for most dynamically-generated responses
     # and also when copy_stream and/or IO#trysendfile is unavailable
     def write_response(status, headers, body, alive)
-      write_headers(status, headers, alive)
-      write_body_each(body)
+      body = write_headers(status, headers, alive, body)
+      write_body_each(body) if body
+      body
       ensure
         body.close if body.respond_to?(:close)
     end
@@ -166,21 +200,23 @@ module Rainbows::Response
       if File.file?(body.to_path)
         if r = sendfile_range(status, headers)
           status, headers, range = r
-          write_headers(status, headers, alive)
-          write_body_file(body, range) if range
+          body = write_headers(status, headers, alive, body)
+          write_body_file(body, range) if body && range
         else
-          write_headers(status, headers, alive)
-          write_body_file(body, nil)
+          body = write_headers(status, headers, alive, body)
+          write_body_file(body, nil) if body
         end
       else
-        write_headers(status, headers, alive)
-        write_body_stream(body)
+        body = write_headers(status, headers, alive, body)
+        write_body_stream(body) if body
       end
+      body
       ensure
         body.close if body.respond_to?(:close)
     end
 
     module ToPath
+      # returns nil if hijacked
       def write_response(status, headers, body, alive)
         if body.respond_to?(:to_path)
           write_response_path(status, headers, body, alive)
diff --git a/lib/rainbows/revactor/client/methods.rb b/lib/rainbows/revactor/client/methods.rb
index b2e1847..592c996 100644
--- a/lib/rainbows/revactor/client/methods.rb
+++ b/lib/rainbows/revactor/client/methods.rb
@@ -36,7 +36,7 @@ module Rainbows::Revactor::Client::Methods
   end
 
   def write_response(status, headers, body, alive)
-    super(status, headers, body, alive)
+    super(status, headers, body, alive) or return
     alive && @ts and @hp.buf << @ts.leftover
   end
 
diff --git a/lib/rainbows/stream_response_epoll.rb b/lib/rainbows/stream_response_epoll.rb
index 3bb3540..33d7386 100644
--- a/lib/rainbows/stream_response_epoll.rb
+++ b/lib/rainbows/stream_response_epoll.rb
@@ -26,18 +26,24 @@ module Rainbows::StreamResponseEpoll
 
   def http_response_write(socket, status, headers, body)
     status = CODES[status.to_i] || status
-    ep_client = false
+    hijack = ep_client = false
 
     if headers
       # don't set extra headers here, this is only intended for
       # consuming by nginx.
       buf = "HTTP/1.0 #{status}\r\nStatus: #{status}\r\n"
       headers.each do |key, value|
-        if value =~ /\n/
-          # avoiding blank, key-only cookies with /\n+/
-          buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join
+        case key
+        when "rack.hijack"
+          hijack = hijack_prepare(value)
+          body = nil # ensure we do not close body
         else
-          buf << "#{key}: #{value}\r\n"
+          if /\n/ =~ value
+            # avoiding blank, key-only cookies with /\n+/
+            buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join
+          else
+            buf << "#{key}: #{value}\r\n"
+          end
         end
       end
       buf << HEADER_END
@@ -48,11 +54,22 @@ module Rainbows::StreamResponseEpoll
         buf = rv
       when :wait_writable
         ep_client = Client.new(socket, buf)
-        body.each { |chunk| ep_client.write(chunk) }
-        return ep_client.close
+        if hijack
+          ep_client.hijack(hijack)
+        else
+          body.each { |chunk| ep_client.write(chunk) }
+          ep_client.close
+        end
+        # body is nil on hijack, in which case ep_client is never closed by us
+        return
       end while true
     end
 
+    if hijack
+      hijack.call(socket)
+      return
+    end
+
     body.each do |chunk|
       if ep_client
         ep_client.write(chunk)
@@ -67,14 +84,15 @@ module Rainbows::StreamResponseEpoll
         end while true
       end
     end
-    ensure
-      body.respond_to?(:close) and body.close
-      if ep_client
-        ep_client.close
-      else
-        socket.shutdown
-        socket.close
-      end
+  ensure
+    return if hijack
+    body.respond_to?(:close) and body.close
+    if ep_client
+      ep_client.close
+    else
+      socket.shutdown
+      socket.close
+    end
   end
 
   # once a client is accepted, it is processed in its entirety here
@@ -88,6 +106,7 @@ module Rainbows::StreamResponseEpoll
       status, headers, body = @app.call(env)
     end
     @request.headers? or headers = nil
+    return if @request.hijacked?
     http_response_write(client, status, headers, body)
   rescue => e
     handle_error(client, e)
diff --git a/lib/rainbows/stream_response_epoll/client.rb b/lib/rainbows/stream_response_epoll/client.rb
index db303b0..dc226d6 100644
--- a/lib/rainbows/stream_response_epoll/client.rb
+++ b/lib/rainbows/stream_response_epoll/client.rb
@@ -18,7 +18,7 @@ class Rainbows::StreamResponseEpoll::Client
   attr_reader :to_io
 
   def initialize(io, unwritten)
-    @closed = false
+    @finish = false
     @to_io = io
     @wr_queue = [ unwritten.dup ]
     EP.set(self, OUT)
@@ -29,7 +29,11 @@ class Rainbows::StreamResponseEpoll::Client
   end
 
   def close
-    @closed = true
+    @finish = true
+  end
+
+  def hijack(hijack)
+    @finish = hijack
   end
 
   def epoll_run
@@ -49,10 +53,14 @@ class Rainbows::StreamResponseEpoll::Client
   end
 
   def on_write_complete
-    if @closed
+    if true == @finish
       @to_io.shutdown
       @to_io.close
       N.decr(0, 1)
+    elsif @finish.respond_to?(:call) # hijacked
+      EP.delete(self)
+      N.decr(0, 1)
+      @finish.call(@to_io)
     end
   end
 end
diff --git a/lib/rainbows/writer_thread_pool/client.rb b/lib/rainbows/writer_thread_pool/client.rb
index 4df7f49..e02d6a8 100644
--- a/lib/rainbows/writer_thread_pool/client.rb
+++ b/lib/rainbows/writer_thread_pool/client.rb
@@ -8,11 +8,13 @@ class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q)
 
   module Methods
     def write_body_each(body)
+      return if @hp.hijacked?
       q << [ to_io, :write_body_each, body ]
     end
 
     def write_response_close(status, headers, body, alive)
       to_io.instance_variable_set(:@hp, @hp) # XXX ugh
+      return if @hp.hijacked?
       Rainbows::SyncClose.new(body) { |sync_body|
         q << [ to_io, :write_response, status, headers, sync_body, alive ]
       }