about summary refs log tree commit homepage
path: root/lib/rainbows/reverse_proxy.rb
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2011-02-01 15:05:52 -0800
committerEric Wong <normalperson@yhbt.net>2011-02-01 18:51:04 -0800
commit130d086666ccd825fcb13cf02fcf941b8fe661af (patch)
treea123aab9b8396d8f74ee72983cf92e125451ac8b /lib/rainbows/reverse_proxy.rb
parent9bf406655a84090426a62ab60677df529e408f17 (diff)
downloadrainbows-130d086666ccd825fcb13cf02fcf941b8fe661af.tar.gz
This can be a starting point for developing Cool.io or
EventMachine-based reverse proxy applications on Rainbows!

Eventually Rainbows! could replace nginx for Unicorn users!
Just don't consider this code production ready, yet, at all,
it doesn't handle any sort of failover and has no automated
tests, yet.
Diffstat (limited to 'lib/rainbows/reverse_proxy.rb')
-rw-r--r--lib/rainbows/reverse_proxy.rb185
1 files changed, 185 insertions, 0 deletions
diff --git a/lib/rainbows/reverse_proxy.rb b/lib/rainbows/reverse_proxy.rb
new file mode 100644
index 0000000..0a8fd2b
--- /dev/null
+++ b/lib/rainbows/reverse_proxy.rb
@@ -0,0 +1,185 @@
+# -*- encoding: binary -*-
+require 'socket'
+require 'thread'
+require 'uri'
+require 'kcar' # http://bogomips.org/kcar/ -- gem install kcar
+
+# A reverse proxy implementation for \Rainbows!  It is a Rack application
+# compatible and optimized for most \Rainbows! concurrency models.
+#
+# It makes HTTP/1.0 connections without keepalive to backends, so
+# it is only recommended for proxying to upstreams on the same LAN
+# or machine.  It can proxy to TCP hosts as well as UNIX domain sockets.
+#
+# Currently it only does simple round-robin balancing and does not
+# know to retry connections from failed backends.
+#
+# Buffering-behavior is currently dependent on the concurrency model selected:
+#
+# Fully-buffered (uploads and response bodies):
+#    Coolio, EventMachine, NeverBlock, CoolioThreadSpawn, CoolioThreadPool
+# If you're proxying to Unicorn, fully-buffered is the way to go.
+#
+# Buffered input only (uploads, but not response bodies):
+#    ThreadSpawn, ThreadPool, FiberSpawn, FiberPool, CoolioFiberSpawn
+#
+# It is not recommended to use Base, WriterThreadSpawn or WriterThreadPool
+# to host this application.  However, you may proxy to a backend running
+# one of these concurrency models with a fully-buffering concurrency model.
+#
+# See the {example config}[link:examples/reverse_proxy.ru] for a sample
+# configuration
+#
+# TODO: Revactor support
+# TODO: Support HTTP trailers
+# TODO: optional streaming input for synchronous
+# TODO: error handling
+#
+# WARNING! this is only lightly tested and has no automated tests, yet!
+class Rainbows::ReverseProxy
+  autoload :MultiThread, 'rainbows/reverse_proxy/multi_thread'
+  autoload :Synchronous, 'rainbows/reverse_proxy/synchronous'
+  autoload :Coolio, 'rainbows/reverse_proxy/coolio'
+  autoload :EventMachine, 'rainbows/reverse_proxy/event_machine'
+  autoload :EvClient, 'rainbows/reverse_proxy/ev_client'
+
+  HTTP_X_FORWARDED_FOR = "HTTP_X_FORWARDED_FOR"
+  REMOTE_ADDR = "REMOTE_ADDR"
+  REQUEST_METHOD = "REQUEST_METHOD"
+  REQUEST_URI = "REQUEST_URI"
+  CRLF = "\r\n"
+  TR = %w(_ -)
+  E502 = [ 502, [ %w(Content-Length 0), %w(Content-Type text/plain) ], [] ]
+
+  def initialize(opts)
+    @lock = Mutex.new
+    upstreams = opts[:upstreams]
+    @upstreams = []
+    upstreams.each do |url|
+      url, cfg = *url if Array === url
+      if url =~ %r{\Ahttp://}
+        uri = URI.parse(url)
+        sockaddr = Socket.sockaddr_in(uri.port, uri.host)
+      else
+        path = url.gsub(%r{\Aunix:}, "") # nginx compat
+        %r{\A~} =~ path and path = File.expand_path(path)
+        sockaddr = Socket.sockaddr_un(path)
+      end
+      ((cfg && cfg[:weight]) || 1).times { @upstreams << sockaddr }
+    end
+    @nr = 0
+  end
+
+  # detects the concurrency model at first run and replaces itself
+  def call(env)
+    if @lock.try_lock
+      case model = env["rainbows.model"]
+      when :EventMachine, :NeverBlock
+        extend(EventMachine)
+      when :Coolio, :CoolioThreadPool, :CoolioThreadSpawn
+        extend(Coolio)
+      when :RevFiberSpawn, :Rev, :RevThreadPool, :RevThreadSpawn
+        warn "#{model} is not *well* supported with #{self.class}"
+        warn "Switch to #{model.to_s.gsub(/Rev/, 'Coolio')}!"
+        extend(Synchronous)
+      when :Revactor
+        warn "Revactor is not *well* supported with #{self.class} yet"
+        extend(Synchronous)
+      when :FiberSpawn, :FiberPool, :CoolioFiberSpawn
+        extend(Synchronous)
+        Synchronous::UpstreamSocket.
+          __send__(:include, Rainbows::Fiber::IO::Methods)
+      when :WriterThreadSpawn, :WriterThreadPool
+        warn "#{model} is not recommended for use with #{self.class}"
+        extend(Synchronous)
+      else
+        extend(Synchronous)
+      end
+      extend(MultiThread) if env["rack.multithread"]
+      @lock.unlock
+    else
+      @lock.synchronize {} # wait for the first locker to finish
+    end
+    call(env)
+  end
+
+  # returns request headers for sending to the upstream as a string
+  def build_headers(env, input)
+    remote_addr = env[REMOTE_ADDR]
+    xff = env[HTTP_X_FORWARDED_FOR]
+    xff = xff ? "#{xff},#{remote_addr}" : remote_addr
+    req = "#{env[REQUEST_METHOD]} #{env[REQUEST_URI]} HTTP/1.0\r\n" \
+          "Connection: close\r\n" \
+          "X-Forwarded-For: #{xff}\r\n"
+    uscore, dash = *TR
+    env.each do |key, value|
+      %r{\AHTTP_(\w+)\z} =~ key or next
+      key = $1
+      next if %r{\A(?:VERSION|CONNECTION|KEEP_ALIVE|X_FORWARDED_FOR)\z}x =~ key
+      key.tr!(uscore, dash)
+      req << "#{key}: #{value}\r\n"
+    end
+    input and req << (input.respond_to?(:size) ?
+                     "Content-Length: #{input.size}\r\n" :
+                     "Transfer-Encoding: chunked\r\n")
+    req << CRLF
+  end
+
+  def pick_upstream(env) # +env+ is reserved for future expansion
+    @nr += 1
+    @upstreams[@nr %= @upstreams.size]
+  end
+
+  CONTENT_LENGTH = "CONTENT_LENGTH"
+  HTTP_TRANSFER_ENCODING = "HTTP_TRANSFER_ENCODING"
+  RackInput = "rack.input"
+
+  def prepare_input!(env)
+    if cl = env[CONTENT_LENGTH]
+      size = cl.to_i
+      size > 0 or return
+    elsif %r{\Achunked\z}i =~ env.delete(HTTP_TRANSFER_ENCODING)
+      # do people use multiple transfer-encodings?
+    else
+      return
+    end
+
+    input = env[RackInput]
+    if input.respond_to?(:rewind)
+      if input.respond_to?(:size)
+        input.size # TeeInput-specific behavior
+        return input
+      else
+        return SizedInput.new(input, size)
+      end
+    end
+    tmp = size && size < 0x4000 ? StringIO.new("") : Unicorn::TmpIO.new
+    each_block(input) { |x| tmp.syswrite(x) }
+    tmp.rewind
+    tmp
+  end
+
+  class SizedInput
+    attr_reader :size
+
+    def initialize(input, n)
+      buf = ""
+      if n == nil
+        n = 0
+        while input.read(16384, buf)
+          n += buf.size
+        end
+        input.rewind
+      end
+      @input, @size = input, n
+    end
+
+    def read(*args)
+      @input.read(*args)
+    end
+  end
+
+  class UpstreamSocket < Kgio::Socket
+    alias readpartial kgio_read!
+  end
+end