From e21939d776673b2f8887adf7a5c64812b7d2e98e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 30 Dec 2010 08:33:15 +0000 Subject: globally refactor Range handling for responses Rack::Utils::HeaderHash is still very expensive in Rack 1.2, especially for simple things that we want to run as fast as possible with minimal interference. HeaderHash is unnecessary for most requests that do not send Content-Range in responses. --- lib/rainbows/revactor/body.rb | 46 ----------------------- lib/rainbows/revactor/client.rb | 59 ++++++++++++++++++++++++++++++ lib/rainbows/revactor/client/methods.rb | 45 +++++++++++++++++++++++ lib/rainbows/revactor/client/tee_socket.rb | 44 ++++++++++++++++++++++ lib/rainbows/revactor/tee_socket.rb | 44 ---------------------- 5 files changed, 148 insertions(+), 90 deletions(-) delete mode 100644 lib/rainbows/revactor/body.rb create mode 100644 lib/rainbows/revactor/client.rb create mode 100644 lib/rainbows/revactor/client/methods.rb create mode 100644 lib/rainbows/revactor/client/tee_socket.rb delete mode 100644 lib/rainbows/revactor/tee_socket.rb (limited to 'lib/rainbows/revactor') diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/body.rb deleted file mode 100644 index 9820df3..0000000 --- a/lib/rainbows/revactor/body.rb +++ /dev/null @@ -1,46 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::Revactor::Body - # TODO non-blocking splice(2) under Linux - ALIASES = { - :write_body_stream => :write_body_each - } - - if IO.method_defined?(:sendfile_nonblock) - def write_body_file_sendfile_revactor(client, body, range) - body = body_to_io(body) - sock = client.instance_variable_get(:@_io) - pfx = Revactor::TCP::Socket === client ? :tcp : :unix - write_complete = T[:"#{pfx}_write_complete", client] - closed = T[:"#{pfx}_closed", client] - offset, count = range ? range : [ 0, body.stat.size ] - begin - offset += (n = sock.sendfile_nonblock(body, offset, count)) - rescue Errno::EAGAIN - # The @_write_buffer is empty at this point, trigger the - # on_readable method which in turn triggers on_write_complete - # even though nothing was written - client.controller = Actor.current - client.__send__(:enable_write_watcher) - Actor.receive do |filter| - filter.when(write_complete) {} - filter.when(closed) { raise Errno::EPIPE } - end - retry - rescue EOFError - break - end while (count -= n) > 0 - ensure - close_if_private(body) - end - ALIASES[:write_body_file] = :write_body_file_sendfile_revactor - else - ALIASES[:write_body] = :write_body_each - end - - def self.included(klass) - ALIASES.each do |new_method, orig_method| - klass.__send__(:alias_method, new_method, orig_method) - end - end -end diff --git a/lib/rainbows/revactor/client.rb b/lib/rainbows/revactor/client.rb new file mode 100644 index 0000000..7c4b53d --- /dev/null +++ b/lib/rainbows/revactor/client.rb @@ -0,0 +1,59 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'fcntl' +class Rainbows::Revactor::Client + autoload :TeeSocket, 'rainbows/revactor/client/tee_socket' + RD_ARGS = {} + RD_ARGS[:timeout] = Rainbows::G.kato if Rainbows::G.kato > 0 + attr_reader :kgio_addr + + def initialize(client) + @client, @rd_args, @ts = client, [ nil ], nil + io = client.instance_variable_get(:@_io) + io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) + @kgio_addr = if Revactor::TCP::Socket === client + @rd_args << RD_ARGS + client.remote_addr + else + Kgio::LOCALHOST + end + end + + def kgio_read!(nr, buf) + buf.replace(@client.read) + end + + def write(buf) + @client.write(buf) + end + + def write_nonblock(buf) # only used for errors + @client.instance_variable_get(:@_io).write_nonblock(buf) + end + + def timed_read(buf2) + buf2.replace(@client.read(*@rd_args)) + end + + def set_input(env, hp) + env[RACK_INPUT] = 0 == hp.content_length ? + NULL_IO : IC.new(@ts = TeeSocket.new(@client), hp) + env[CLIENT_IO] = @client + end + + def close + @client.close + @client = nil + end + + def closed? + @client.nil? + end + + def self.setup + self.const_set(:IC, Unicorn::HttpRequest.input_class) + include Rainbows::ProcessClient + include Methods + end +end +require 'rainbows/revactor/client/methods' diff --git a/lib/rainbows/revactor/client/methods.rb b/lib/rainbows/revactor/client/methods.rb new file mode 100644 index 0000000..e9b39a3 --- /dev/null +++ b/lib/rainbows/revactor/client/methods.rb @@ -0,0 +1,45 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Revactor::Client::Methods + if IO.method_defined?(:sendfile_nonblock) + def write_body_file(body, range) + body, client = body_to_io(body), @client + sock = @client.instance_variable_get(:@_io) + pfx = Revactor::TCP::Socket === client ? :tcp : :unix + write_complete = T[:"#{pfx}_write_complete", client] + closed = T[:"#{pfx}_closed", client] + offset, count = range ? range : [ 0, body.stat.size ] + begin + offset += (n = sock.sendfile_nonblock(body, offset, count)) + rescue Errno::EAGAIN + # The @_write_buffer is empty at this point, trigger the + # on_readable method which in turn triggers on_write_complete + # even though nothing was written + client.controller = Actor.current + client.__send__(:enable_write_watcher) + Actor.receive do |filter| + filter.when(write_complete) {} + filter.when(closed) { raise Errno::EPIPE } + end + retry + rescue EOFError + break + end while (count -= n) > 0 + ensure + close_if_private(body) + end + end + + def handle_error(e) + Revactor::TCP::ReadError === e or super + end + + def write_response(status, headers, body, alive) + super(status, headers, body, alive) + alive && @ts and @hp.buf << @ts.leftover + end + + def self.included(klass) + klass.__send__ :alias_method, :write_body_stream, :write_body_each + end +end diff --git a/lib/rainbows/revactor/client/tee_socket.rb b/lib/rainbows/revactor/client/tee_socket.rb new file mode 100644 index 0000000..2f9f52e --- /dev/null +++ b/lib/rainbows/revactor/client/tee_socket.rb @@ -0,0 +1,44 @@ +# -*- encoding: binary -*- +# :enddoc: +# +# Revactor Sockets do not implement readpartial, so we emulate just +# enough to avoid mucking with TeeInput internals. Fortunately +# this code is not heavily used so we can usually avoid the overhead +# of adding a userspace buffer. +class Rainbows::Revactor::Client::TeeSocket + def initialize(socket) + # IO::Buffer is used internally by Rev which Revactor is based on + # so we'll always have it available + @socket, @rbuf = socket, IO::Buffer.new + end + + def leftover + @rbuf.read + end + + # Revactor socket reads always return an unspecified amount, + # sometimes too much + def kgio_read(length, dst = "") + return dst.replace("") if length == 0 + + # always check and return from the userspace buffer first + @rbuf.size > 0 and return dst.replace(@rbuf.read(length)) + + # read off the socket since there was nothing in rbuf + tmp = @socket.read + + # we didn't read too much, good, just return it straight back + # to avoid needlessly wasting memory bandwidth + tmp.size <= length and return dst.replace(tmp) + + # ugh, read returned too much + @rbuf << tmp[length, tmp.size] + dst.replace(tmp[0, length]) + rescue EOFError + end + + # just proxy any remaining methods TeeInput may use + def close + @socket.close + end +end diff --git a/lib/rainbows/revactor/tee_socket.rb b/lib/rainbows/revactor/tee_socket.rb deleted file mode 100644 index 71aeb88..0000000 --- a/lib/rainbows/revactor/tee_socket.rb +++ /dev/null @@ -1,44 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# -# Revactor Sockets do not implement readpartial, so we emulate just -# enough to avoid mucking with TeeInput internals. Fortunately -# this code is not heavily used so we can usually avoid the overhead -# of adding a userspace buffer. -class Rainbows::Revactor::TeeSocket - def initialize(socket) - # IO::Buffer is used internally by Rev which Revactor is based on - # so we'll always have it available - @socket, @rbuf = socket, IO::Buffer.new - end - - def leftover - @rbuf.read - end - - # Revactor socket reads always return an unspecified amount, - # sometimes too much - def kgio_read(length, dst = "") - return dst.replace("") if length == 0 - - # always check and return from the userspace buffer first - @rbuf.size > 0 and return dst.replace(@rbuf.read(length)) - - # read off the socket since there was nothing in rbuf - tmp = @socket.read - - # we didn't read too much, good, just return it straight back - # to avoid needlessly wasting memory bandwidth - tmp.size <= length and return dst.replace(tmp) - - # ugh, read returned too much - @rbuf << tmp[length, tmp.size] - dst.replace(tmp[0, length]) - rescue EOFError - end - - # just proxy any remaining methods TeeInput may use - def close - @socket.close - end -end -- cgit v1.2.3-24-ge0c7