about summary refs log tree commit homepage
path: root/lib
diff options
context:
space:
mode:
authorEric Wong <normalperson@yhbt.net>2010-07-22 05:42:16 +0000
committerEric Wong <normalperson@yhbt.net>2010-07-22 09:09:37 +0000
commit416d3a0f868571319a2b29b0034d2dba68e4d5b3 (patch)
tree081bdbdcce23063667c707212ceda45bbc322675 /lib
parent015daa81f26afc59d1da857b8bbedfb80eb532b1 (diff)
downloadrainbows-416d3a0f868571319a2b29b0034d2dba68e4d5b3.tar.gz
The FileStreamer class of EventMachine (and by extension
NeverBlock) unfortunately doesn't handle this.  It's possible
to do with Revactor (since it uses Rev under the covers),
but we'll support what we can easily for now.
Diffstat (limited to 'lib')
-rw-r--r--lib/rainbows.rb2
-rw-r--r--lib/rainbows/base.rb3
-rw-r--r--lib/rainbows/const.rb1
-rw-r--r--lib/rainbows/error.rb2
-rw-r--r--lib/rainbows/fiber/body.rb10
-rw-r--r--lib/rainbows/fiber/rev.rb3
-rw-r--r--lib/rainbows/response.rb24
-rw-r--r--lib/rainbows/response/body.rb29
-rw-r--r--lib/rainbows/response/range.rb34
-rw-r--r--lib/rainbows/rev/client.rb12
-rw-r--r--lib/rainbows/rev/sendfile.rb5
-rw-r--r--lib/rainbows/stream_file.rb3
-rw-r--r--lib/rainbows/writer_thread_pool.rb8
-rw-r--r--lib/rainbows/writer_thread_spawn.rb13
14 files changed, 109 insertions, 40 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb
index 1a3d6ff..39d3ae1 100644
--- a/lib/rainbows.rb
+++ b/lib/rainbows.rb
@@ -130,4 +130,6 @@ module Rainbows
   autoload :ByteSlice, 'rainbows/byte_slice'
   autoload :StreamFile, 'rainbows/stream_file'
   autoload :HttpResponse, 'rainbows/http_response' # deprecated
+
+  class Response416 < RangeError; end
 end
diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb
index 180c80c..63abdd2 100644
--- a/lib/rainbows/base.rb
+++ b/lib/rainbows/base.rb
@@ -73,11 +73,12 @@ module Rainbows::Base
 
       if hp.headers?
         headers = HH.new(headers)
+        range = parse_range(env, status, headers) and status = range.shift
         env = false unless hp.keepalive? && G.alive
         headers[CONNECTION] = env ? KEEP_ALIVE : CLOSE
         client.write(response_header(status, headers))
       end
-      write_body(client, body)
+      write_body(client, body, range)
     end while env && env.clear && hp.reset.nil?
   # if we get any error, try to write something back to the client
   # assuming we haven't closed the socket, but don't get hung up
diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb
index 184dd86..c5c7d58 100644
--- a/lib/rainbows/const.rb
+++ b/lib/rainbows/const.rb
@@ -22,6 +22,7 @@ module Rainbows
     CLIENT_IO = "hack.io".freeze
 
     ERROR_413_RESPONSE = "HTTP/1.1 413 Request Entity Too Large\r\n\r\n"
+    ERROR_416_RESPONSE = "HTTP/1.1 416 Requested Range Not Satisfiable\r\n\r\n"
 
   end
 end
diff --git a/lib/rainbows/error.rb b/lib/rainbows/error.rb
index 8b4d9ff..7c91050 100644
--- a/lib/rainbows/error.rb
+++ b/lib/rainbows/error.rb
@@ -32,6 +32,8 @@ module Rainbows
         when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL,
              Errno::EBADF, Errno::ENOTCONN
           # swallow error if client shuts down one end or disconnects
+        when Rainbows::Response416
+          Const::ERROR_416_RESPONSE
         when Unicorn::HttpParserError
           Const::ERROR_400_RESPONSE # try to tell the client they're bad
         when IOError # HttpParserError is an IOError
diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb
index ab5cfc8..c6c4484 100644
--- a/lib/rainbows/fiber/body.rb
+++ b/lib/rainbows/fiber/body.rb
@@ -12,15 +12,17 @@ module Rainbows::Fiber::Body # :nodoc:
 
   # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock
   if ::IO.method_defined?(:sendfile_nonblock)
-    def write_body_file(client, body)
-      sock, off = client.to_io, 0
+    def write_body_file(client, body, range)
+      sock, n = client.to_io, nil
+      offset, count = range ? range : [ 0, body.stat.size ]
       begin
-        off += sock.sendfile_nonblock(body, off, 0x10000)
+        offset += (n = sock.sendfile_nonblock(body, offset, count))
       rescue Errno::EAGAIN
         client.wait_writable
+        retry
       rescue EOFError
         break
-      end while true
+      end while (count -= n) > 0
     end
   else
     ALIASES[:write_body] = :write_body_each
diff --git a/lib/rainbows/fiber/rev.rb b/lib/rainbows/fiber/rev.rb
index 5bf4fdd..d837c01 100644
--- a/lib/rainbows/fiber/rev.rb
+++ b/lib/rainbows/fiber/rev.rb
@@ -100,6 +100,7 @@ module Rainbows::Fiber
 
           if hp.headers?
             headers = HH.new(headers)
+            range = parse_range(env, status, headers) and status = range.shift
             headers[CONNECTION] = if hp.keepalive? && G.alive
               KEEP_ALIVE
             else
@@ -108,7 +109,7 @@ module Rainbows::Fiber
             end
             client.write(response_header(status, headers))
           end
-          write_body(client, body)
+          write_body(client, body, range)
         end while env && env.clear && hp.reset.nil?
       rescue => e
         Error.write(io, e)
diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb
index 13946ca..44be30f 100644
--- a/lib/rainbows/response.rb
+++ b/lib/rainbows/response.rb
@@ -3,6 +3,8 @@
 require 'time' # for Time#httpdate
 
 module Rainbows::Response
+  autoload :Body, 'rainbows/response/body'
+  autoload :Range, 'rainbows/response/range'
 
   CODES = Unicorn::HttpResponse::CODES
   CRLF = "\r\n"
@@ -32,7 +34,25 @@ module Rainbows::Response
 
   # called after forking
   def self.setup(klass)
-    require('rainbows/response/body') and
-      klass.__send__(:include, Rainbows::Response::Body)
+    range_class = body_class = klass
+    case Rainbows::Const::RACK_DEFAULTS['rainbows.model']
+    when :WriterThreadSpawn
+      body_class = Rainbows::WriterThreadSpawn::MySocket
+      range_class = Rainbows::HttpServer
+    when :EventMachine, :NeverBlock, :Revactor
+      range_class = nil # :<
+    end
+    return if body_class.included_modules.include?(Body)
+    body_class.__send__(:include, Body)
+    sf = IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock)
+    if range_class
+      range_class.__send__(:include, sf ? Range : NoRange)
+    end
+  end
+
+  module NoRange
+    # dummy method if we can't send range responses
+    def parse_range(env, status, headers)
+    end
   end
 end
diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb
index 9e36412..cf14f08 100644
--- a/lib/rainbows/response/body.rb
+++ b/lib/rainbows/response/body.rb
@@ -46,22 +46,23 @@ module Rainbows::Response::Body # :nodoc:
   end
 
   if IO.method_defined?(:sendfile_nonblock)
-    def write_body_file(sock, body)
-      sock.sendfile(body, 0)
+    def write_body_file(sock, body, range)
+      range ? sock.sendfile(body, range[0], range[1]) : sock.sendfile(body, 0)
     end
   end
 
   if IO.respond_to?(:copy_stream)
     unless method_defined?(:write_body_file)
       # try to use sendfile() via IO.copy_stream, otherwise pread()+write()
-      def write_body_file(sock, body)
-        IO.copy_stream(body, sock, nil, 0)
+      def write_body_file(sock, body, range)
+        range ? IO.copy_stream(body, sock, range[1], range[0]) :
+                IO.copy_stream(body, sock, nil, 0)
       end
     end
 
     # only used when body is a pipe or socket that can't handle
     # pread() semantics
-    def write_body_stream(sock, body)
+    def write_body_stream(sock, body, range)
       IO.copy_stream(body, sock)
       ensure
         body.respond_to?(:close) and body.close
@@ -74,40 +75,40 @@ module Rainbows::Response::Body # :nodoc:
   if method_defined?(:write_body_file)
 
     # middlewares/apps may return with a body that responds to +to_path+
-    def write_body_path(sock, body)
+    def write_body_path(sock, body, range)
       inp = body_to_io(body)
       if inp.stat.file?
         begin
-          write_body_file(sock, inp)
+          write_body_file(sock, inp, range)
         ensure
           inp.close if inp != body
         end
       else
-        write_body_stream(sock, inp)
+        write_body_stream(sock, inp, range)
       end
       ensure
         body.respond_to?(:close) && inp != body and body.close
     end
   elsif method_defined?(:write_body_stream)
-    def write_body_path(sock, body)
-      write_body_stream(sock, inp = body_to_io(body))
+    def write_body_path(sock, body, range)
+      write_body_stream(sock, inp = body_to_io(body), range)
       ensure
         body.respond_to?(:close) && inp != body and body.close
     end
   end
 
   if method_defined?(:write_body_path)
-    def write_body(client, body)
+    def write_body(client, body, range)
       body.respond_to?(:to_path) ?
-        write_body_path(client, body) :
-        write_body_each(client, body)
+        write_body_path(client, body, range) :
+        write_body_each(client, body, range)
     end
   else
     ALIASES[:write_body] = :write_body_each
   end
 
   # generic body writer, used for most dynamically generated responses
-  def write_body_each(socket, body)
+  def write_body_each(socket, body, range = nil)
     body.each { |chunk| socket.write(chunk) }
     ensure
       body.respond_to?(:close) and body.close
diff --git a/lib/rainbows/response/range.rb b/lib/rainbows/response/range.rb
new file mode 100644
index 0000000..4c0d4a1
--- /dev/null
+++ b/lib/rainbows/response/range.rb
@@ -0,0 +1,34 @@
+# -*- encoding: binary -*-
+# :enddoc:
+module Rainbows::Response::Range
+  HTTP_RANGE = 'HTTP_RANGE'
+  Content_Range = 'Content-Range'.freeze
+  Content_Length = 'Content-Length'.freeze
+
+  # This does not support multipart responses (does anybody actually
+  # use those?) +headers+ is always a Rack::Utils::HeaderHash
+  def parse_range(env, status, headers)
+    if 200 == status.to_i &&
+        (clen = headers[Content_Length]) &&
+        /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ env[HTTP_RANGE]
+      a, b = $1.split(/-/)
+      clen = clen.to_i
+      if b.nil? # bytes=M-
+        offset = a.to_i
+        count = clen - offset
+      elsif a.empty? # bytes=-N
+        offset = clen - b.to_i
+        count = clen - offset
+      else  # bytes=M-N
+        offset = a.to_i
+        count = b.to_i + 1 - offset
+      end
+      raise Rainbows::Response416 if count <= 0 || offset >= clen
+      count = clen if count > clen
+      headers[Content_Length] = count.to_s
+      headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}"
+      [ status, offset, count ]
+    end
+    # nil if no status
+  end
+end
diff --git a/lib/rainbows/rev/client.rb b/lib/rainbows/rev/client.rb
index 502615e..2242c18 100644
--- a/lib/rainbows/rev/client.rb
+++ b/lib/rainbows/rev/client.rb
@@ -80,15 +80,21 @@ module Rainbows
           st = io.stat
 
           if st.file?
-            write(response_header(status, headers)) if headers
-            return defer_body(F.new(0, io, body))
+            offset, count = 0, st.size
+            if headers
+              if range = parse_range(@env, status, headers)
+                status, offset, count = range
+              end
+              write(response_header(status, headers))
+            end
+            return defer_body(F.new(offset, count, io, body))
           elsif st.socket? || st.pipe?
             return stream_response(status, headers, io, body)
           end
           # char or block device... WTF? fall through to body.each
         end
         write(response_header(status, headers)) if headers
-        write_body_each(self, body)
+        write_body_each(self, body, nil)
       end
 
       def app_call
diff --git a/lib/rainbows/rev/sendfile.rb b/lib/rainbows/rev/sendfile.rb
index 9f421f1..42368a1 100644
--- a/lib/rainbows/rev/sendfile.rb
+++ b/lib/rainbows/rev/sendfile.rb
@@ -2,8 +2,9 @@
 # :enddoc:
 module Rainbows::Rev::Sendfile
   if IO.method_defined?(:sendfile_nonblock)
-    def rev_sendfile(body)
-      body.offset += @_io.sendfile_nonblock(body, body.offset, 0x10000)
+    def rev_sendfile(sf) # +sf+ is a Rainbows::StreamFile object
+      sf.offset += (n = @_io.sendfile_nonblock(sf, sf.offset, sf.count))
+      0 == (sf.count -= n) and raise EOFError
       enable_write_watcher
       rescue Errno::EAGAIN
         enable_write_watcher
diff --git a/lib/rainbows/stream_file.rb b/lib/rainbows/stream_file.rb
index dec58b0..11c84d4 100644
--- a/lib/rainbows/stream_file.rb
+++ b/lib/rainbows/stream_file.rb
@@ -5,8 +5,7 @@
 # models.  We always maintain our own file offsets in userspace because
 # because sendfile() implementations offer pread()-like idempotency for
 # concurrency (multiple clients can read the same underlying file handle).
-class Rainbows::StreamFile < Struct.new(:offset, :to_io, :body)
-
+class Rainbows::StreamFile < Struct.new(:offset, :count, :to_io, :body)
   def close
     body.close if body.respond_to?(:close)
     to_io.close unless to_io.closed?
diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb
index 4050af9..dd3dd7c 100644
--- a/lib/rainbows/writer_thread_pool.rb
+++ b/lib/rainbows/writer_thread_pool.rb
@@ -47,8 +47,8 @@ module Rainbows
     end
 
     module Response # :nodoc:
-      def write_body(qclient, body)
-        qclient.q << [ qclient.to_io, :body, body ]
+      def write_body(qclient, body, range)
+        qclient.q << [ qclient.to_io, :body, body, range ]
       end
     end
 
@@ -70,9 +70,9 @@ module Rainbows
       qp = (1..worker_connections).map do |n|
         QueuePool.new(1) do |response|
           begin
-            io, arg1, arg2 = response
+            io, arg1, arg2, arg3 = response
             case arg1
-            when :body then sync_write_body(io, arg2)
+            when :body then sync_write_body(io, arg2, arg3)
             when :close then io.close unless io.closed?
             else
               io.write(arg1)
diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb
index cbe7765..17aa835 100644
--- a/lib/rainbows/writer_thread_spawn.rb
+++ b/lib/rainbows/writer_thread_spawn.rb
@@ -51,9 +51,9 @@ module Rainbows
         self.thr = Thread.new(to_io, q) do |io, q|
           while response = q.shift
             begin
-              arg1, arg2 = response
+              arg1, arg2, arg3 = response
               case arg1
-              when :body then write_body(io, arg2)
+              when :body then write_body(io, arg2, arg3)
               when :close
                 io.close unless io.closed?
                 break
@@ -73,8 +73,8 @@ module Rainbows
         (self.q ||= queue_writer) << buf
       end
 
-      def queue_body(body)
-        (self.q ||= queue_writer) << [ :body, body ]
+      def queue_body(body, range)
+        (self.q ||= queue_writer) << [ :body, body, range ]
       end
 
       def close
@@ -90,8 +90,8 @@ module Rainbows
       end
     end
 
-    def write_body(my_sock, body) # :nodoc:
-      my_sock.queue_body(body)
+    def write_body(my_sock, body, range) # :nodoc:
+      my_sock.queue_body(body, range)
     end
 
     def process_client(client) # :nodoc:
@@ -100,7 +100,6 @@ module Rainbows
 
     def worker_loop(worker)  # :nodoc:
       MySocket.const_set(:MAX, worker_connections)
-      Rainbows::Response.setup(MySocket)
       super(worker) # accept loop from Unicorn
       CUR.delete_if do |t,q|
         q << nil