From e21939d776673b2f8887adf7a5c64812b7d2e98e Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Thu, 30 Dec 2010 08:33:15 +0000 Subject: globally refactor Range handling for responses Rack::Utils::HeaderHash is still very expensive in Rack 1.2, especially for simple things that we want to run as fast as possible with minimal interference. HeaderHash is unnecessary for most requests that do not send Content-Range in responses. --- lib/rainbows.rb | 8 +- lib/rainbows/base.rb | 6 +- lib/rainbows/client.rb | 5 +- lib/rainbows/coolio.rb | 2 +- lib/rainbows/coolio/client.rb | 28 ++-- lib/rainbows/ev_core.rb | 5 +- lib/rainbows/event_machine.rb | 2 +- lib/rainbows/event_machine/client.rb | 17 +-- lib/rainbows/fiber/base.rb | 4 +- lib/rainbows/fiber/body.rb | 18 +-- lib/rainbows/fiber/coolio/server.rb | 3 +- lib/rainbows/process_client.rb | 43 +++---- lib/rainbows/rack_input.rb | 6 +- lib/rainbows/response.rb | 197 +++++++++++++++++++++++------ lib/rainbows/response/body.rb | 122 ------------------ lib/rainbows/response/range.rb | 34 ----- lib/rainbows/revactor.rb | 65 +--------- lib/rainbows/revactor/body.rb | 46 ------- lib/rainbows/revactor/client.rb | 59 +++++++++ lib/rainbows/revactor/client/methods.rb | 45 +++++++ lib/rainbows/revactor/client/tee_socket.rb | 44 +++++++ lib/rainbows/revactor/tee_socket.rb | 44 ------- lib/rainbows/thread_pool.rb | 4 +- lib/rainbows/thread_spawn.rb | 2 +- lib/rainbows/writer_thread_pool.rb | 36 ++---- lib/rainbows/writer_thread_pool/client.rb | 45 ++++++- lib/rainbows/writer_thread_spawn.rb | 13 +- lib/rainbows/writer_thread_spawn/client.rb | 63 +++++++-- 28 files changed, 479 insertions(+), 487 deletions(-) delete mode 100644 lib/rainbows/response/body.rb delete mode 100644 lib/rainbows/response/range.rb delete mode 100644 lib/rainbows/revactor/body.rb create mode 100644 lib/rainbows/revactor/client.rb create mode 100644 lib/rainbows/revactor/client/methods.rb create mode 100644 lib/rainbows/revactor/client/tee_socket.rb delete mode 100644 lib/rainbows/revactor/tee_socket.rb diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 643bdd2..909e97e 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -39,9 +39,11 @@ module Rainbows require 'rainbows/const' require 'rainbows/http_parser' require 'rainbows/http_server' - require 'rainbows/response' - require 'rainbows/client' - require 'rainbows/process_client' + autoload :RackInput, 'rainbows/rack_input' + autoload :Response, 'rainbows/response' + autoload :ProcessClient, 'rainbows/process_client' + autoload :TimedRead, 'rainbows/timed_read' + autoload :Client, 'rainbows/client' autoload :Base, 'rainbows/base' autoload :Sendfile, 'rainbows/sendfile' autoload :AppPool, 'rainbows/app_pool' diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index bf9ef87..5d56063 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -6,9 +6,7 @@ # not intended for production use, as keepalive with a pure prefork # concurrency model is extremely expensive. module Rainbows::Base - # :stopdoc: - include Rainbows::ProcessClient # shortcuts... G = Rainbows::G @@ -34,6 +32,10 @@ module Rainbows::Base logger.info "Rainbows! #@use worker_connections=#@worker_connections" end + def process_client(client) + client.process_loop + end + def self.included(klass) # :nodoc: klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS klass.const_set :G, Rainbows::G diff --git a/lib/rainbows/client.rb b/lib/rainbows/client.rb index dc6d95e..8425e9e 100644 --- a/lib/rainbows/client.rb +++ b/lib/rainbows/client.rb @@ -1,9 +1,8 @@ # -*- encoding: binary -*- # :enddoc: -require 'rainbows/timed_read' - +# this class is used for most synchronous concurrency models class Rainbows::Client < Kgio::Socket include Rainbows::TimedRead + include Rainbows::ProcessClient end -Kgio.accept_class = Rainbows::Client diff --git a/lib/rainbows/coolio.rb b/lib/rainbows/coolio.rb index 463bf0a..d0b8b2e 100644 --- a/lib/rainbows/coolio.rb +++ b/lib/rainbows/coolio.rb @@ -31,6 +31,7 @@ module Rainbows::Coolio KATO.compare_by_identity end + autoload :Client, 'rainbows/coolio/client' autoload :Master, 'rainbows/coolio/master' autoload :ThreadClient, 'rainbows/coolio/thread_client' autoload :ResponsePipe, 'rainbows/coolio/response_pipe' @@ -41,5 +42,4 @@ end require 'rainbows/coolio/heartbeat' require 'rainbows/coolio/server' require 'rainbows/coolio/core' -require 'rainbows/coolio/client' Rainbows::Coolio.__send__ :include, Rainbows::Coolio::Core diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb index 6360e2d..d0b17a9 100644 --- a/lib/rainbows/coolio/client.rb +++ b/lib/rainbows/coolio/client.rb @@ -84,39 +84,37 @@ class Rainbows::Coolio::Client < Coolio::IO end # used for streaming sockets and pipes - def stream_response(status, headers, io, body) - c = stream_response_headers(status, headers) if headers + def stream_response_body(body, io, chunk) # we only want to attach to the Coolio::Loop belonging to the # main thread in Ruby 1.9 - io = (c ? ResponseChunkPipe : ResponsePipe).new(io, self, body) + io = (chunk ? ResponseChunkPipe : ResponsePipe).new(io, self, body) defer_body(io.attach(LOOP)) end def coolio_write_response(response, alive) status, headers, body = response - headers = @hp.headers? ? HH.new(headers) : nil - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers if body.respond_to?(:to_path) io = body_to_io(body) st = io.stat if st.file? - offset, count = 0, st.size - if headers - if range = make_range!(@env, status, headers) - status, offset, count = range - end - write(response_header(status, headers)) + if respond_to?(:sendfile_range) && r = sendfile_range(status, headers) + status, headers, range = r + write_headers(status, headers, alive) + defer_body(SF.new(range[0], range[1], io, body)) if range + else + write_headers(status, headers, alive) + defer_body(SF.new(0, st.size, io, body)) end - return defer_body(SF.new(offset, count, io, body)) + return elsif st.socket? || st.pipe? - return stream_response(status, headers, io, body) + chunk = stream_response_headers(status, headers, alive) + return stream_response_body(body, io, chunk) end # char or block device... WTF? fall through to body.each end - write(response_header(status, headers)) if headers - write_body_each(self, body, nil) + write_response(status, headers, body, alive) end def app_call diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 60fbdca..471f6a3 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -36,14 +36,15 @@ module Rainbows::EvCore end # returns whether to enable response chunking for autochunk models - def stream_response_headers(status, headers) + def stream_response_headers(status, headers, alive) + headers = Rack::Utils::HeaderHash.new(headers) if headers['Content-Length'] rv = false else rv = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) rv = false if headers.delete('X-Rainbows-Autochunk') == 'no' end - write(response_header(status, headers)) + write_headers(status, headers, alive) rv end diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index b226cab..cb76669 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -44,6 +44,7 @@ module Rainbows::EventMachine autoload :ResponsePipe, 'rainbows/event_machine/response_pipe' autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe' autoload :TryDefer, 'rainbows/event_machine/try_defer' + autoload :Client, 'rainbows/event_machine/client' include Rainbows::Base @@ -89,5 +90,4 @@ module Rainbows::EventMachine end end # :enddoc: -require 'rainbows/event_machine/client' require 'rainbows/event_machine/server' diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb index 6863be0..2fc9d03 100644 --- a/lib/rainbows/event_machine/client.rb +++ b/lib/rainbows/event_machine/client.rb @@ -58,27 +58,20 @@ class Rainbows::EventMachine::Client < EM::Connection end end + # don't change this method signature, "async.callback" relies on it def em_write_response(response, alive = false) status, headers, body = response - if @hp.headers? - headers = HH.new(headers) - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - else - headers = nil - end if body.respond_to?(:errback) && body.respond_to?(:callback) @body = body body.callback { quit } body.errback { quit } - # async response, this could be a trickle as is in comet-style apps - headers[CONNECTION] = CLOSE if headers alive = true elsif body.respond_to?(:to_path) st = File.stat(path = body.to_path) if st.file? - write(response_header(status, headers)) if headers + write_headers(status, headers, alive) @body = stream_file_data(path) @body.errback do body.close if body.respond_to?(:close) @@ -92,16 +85,14 @@ class Rainbows::EventMachine::Client < EM::Connection return elsif st.socket? || st.pipe? io = body_to_io(@body = body) - chunk = stream_response_headers(status, headers) if headers + chunk = stream_response_headers(status, headers, alive) m = chunk ? Rainbows::EventMachine::ResponseChunkPipe : Rainbows::EventMachine::ResponsePipe return EM.watch(io, m, self).notify_readable = true end # char or block device... WTF? fall through to body.each end - - write(response_header(status, headers)) if headers - write_body_each(self, body) + write_response(status, headers, body, alive) quit unless alive end diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index b693451..ae885b6 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -57,7 +57,7 @@ module Rainbows::Fiber::Base def process(client) G.cur += 1 - process_client(client) + client.process_loop ensure G.cur -= 1 ZZ.delete(client.f) @@ -65,7 +65,7 @@ module Rainbows::Fiber::Base def self.setup(klass, app) require 'rainbows/fiber/body' - klass.__send__(:include, Rainbows::Fiber::Body) + Rainbows::Client.__send__(:include, Rainbows::Fiber::Body) self.const_set(:APP, app) end end diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb index 1d7d325..872b1df 100644 --- a/lib/rainbows/fiber/body.rb +++ b/lib/rainbows/fiber/body.rb @@ -5,20 +5,15 @@ # this is meant to be included _after_ Rainbows::Response::Body module Rainbows::Fiber::Body # :nodoc: - # TODO non-blocking splice(2) under Linux - ALIASES = { - :write_body_stream => :write_body_each - } - # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock if IO.method_defined?(:sendfile_nonblock) - def write_body_file_sendfile_fiber(client, body, range) - sock, n, body = client.to_io, nil, body_to_io(body) + def write_body_file(body, range) + sock, n, body = to_io, nil, body_to_io(body) offset, count = range ? range : [ 0, body.stat.size ] begin offset += (n = sock.sendfile_nonblock(body, offset, count)) rescue Errno::EAGAIN - client.kgio_wait_writable + kgio_wait_writable retry rescue EOFError break @@ -26,14 +21,9 @@ module Rainbows::Fiber::Body # :nodoc: ensure close_if_private(body) end - ALIASES[:write_body_file] = :write_body_file_sendfile_fiber - else - ALIASES[:write_body] = :write_body_each end def self.included(klass) - ALIASES.each do |new_method, orig_method| - klass.__send__(:alias_method, new_method, orig_method) - end + klass.__send__ :alias_method, :write_body_stream, :write_body_each end end diff --git a/lib/rainbows/fiber/coolio/server.rb b/lib/rainbows/fiber/coolio/server.rb index 0de1ab3..b064953 100644 --- a/lib/rainbows/fiber/coolio/server.rb +++ b/lib/rainbows/fiber/coolio/server.rb @@ -2,7 +2,6 @@ # :enddoc: class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher G = Rainbows::G - include Rainbows::ProcessClient def to_io @io @@ -25,7 +24,7 @@ class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher def process(io) G.cur += 1 - process_client(io) + io.process_loop ensure G.cur -= 1 end diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb index 54e59e8..d840778 100644 --- a/lib/rainbows/process_client.rb +++ b/lib/rainbows/process_client.rb @@ -1,54 +1,41 @@ # -*- encoding: binary -*- -# :enddoc: -require 'rainbows/rack_input' module Rainbows::ProcessClient - G = Rainbows::G include Rainbows::Response - HttpParser = Rainbows::HttpParser include Rainbows::RackInput include Rainbows::Const - # once a client is accepted, it is processed in its entirety here - # in 3 easy steps: read request, call app, write app response - # this is used by synchronous concurrency models - # Base, ThreadSpawn, ThreadPool - def process_client(client) # :nodoc: - hp = HttpParser.new - client.kgio_read!(16384, buf = hp.buf) - remote_addr = client.kgio_addr - alive = false + def process_loop + @hp = hp = Rainbows::HttpParser.new + kgio_read!(16384, buf = hp.buf) or return begin # loop until env = hp.parse - client.timed_read(buf2 ||= "") or return + timed_read(buf2 ||= "") or return buf << buf2 end - set_input(env, hp, client) - env[REMOTE_ADDR] = remote_addr - status, headers, body = APP.call(env.update(RACK_DEFAULTS)) + set_input(env, hp) + env[REMOTE_ADDR] = kgio_addr + status, headers, body = APP.call(env.merge!(RACK_DEFAULTS)) if 100 == status.to_i - client.write(EXPECT_100_RESPONSE) + write(EXPECT_100_RESPONSE) env.delete(HTTP_EXPECT) status, headers, body = APP.call(env) end - - if hp.headers? - headers = HH.new(headers) - range = make_range!(env, status, headers) and status = range.shift - headers[CONNECTION] = (alive = hp.next?) ? KEEP_ALIVE : CLOSE - client.write(response_header(status, headers)) - end - write_body(client, body, range) + write_response(status, headers, body, alive = @hp.next?) end while alive # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up # if the socket is already closed or broken. We'll always ensure # the socket is closed at the end of this function rescue => e - Rainbows::Error.write(client, e) + handle_error(e) ensure - client.close unless client.closed? + close unless closed? + end + + def handle_error(e) + Rainbows::Error.write(self, e) end end diff --git a/lib/rainbows/rack_input.rb b/lib/rainbows/rack_input.rb index df51ac1..bc68ed1 100644 --- a/lib/rainbows/rack_input.rb +++ b/lib/rainbows/rack_input.rb @@ -10,8 +10,8 @@ module Rainbows::RackInput const_set(:IC, Unicorn::HttpRequest.input_class) end - def set_input(env, hp, client) - env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(client, hp) - env[CLIENT_IO] = client + def set_input(env, hp) + env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp) + env[CLIENT_IO] = self end end diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb index ca381b8..c0d0740 100644 --- a/lib/rainbows/response.rb +++ b/lib/rainbows/response.rb @@ -3,60 +3,179 @@ require 'time' # for Time#httpdate module Rainbows::Response - autoload :Body, 'rainbows/response/body' - autoload :Range, 'rainbows/response/range' - + CRLF = Unicorn::HttpResponse::CRLF CODES = Unicorn::HttpResponse::CODES - CRLF = "\r\n" + Close = "close" + KeepAlive = "keep-alive" + + # private file class for IO objects opened by Rainbows! itself (and not + # the app or middleware) + class F < File; end - # freeze headers we may set as hash keys for a small speedup - CONNECTION = "Connection".freeze - CLOSE = "close" - KEEP_ALIVE = "keep-alive" - HH = Rack::Utils::HeaderHash + # called after forking + def self.setup(klass) + Kgio.accept_class = Rainbows::Client + 0 == Rainbows::G.kato and Rainbows::HttpParser.keepalive_requests = 0 + end - def response_header(status, headers) + def write_headers(status, headers, alive) + @hp.headers? or return status = CODES[status.to_i] || status - rv = "HTTP/1.1 #{status}\r\n" \ - "Date: #{Time.now.httpdate}\r\n" \ - "Status: #{status}\r\n" + buf = "HTTP/1.1 #{status}\r\n" \ + "Date: #{Time.now.httpdate}\r\n" \ + "Status: #{status}\r\n" \ + "Connection: #{alive ? KeepAlive : Close}\r\n" headers.each do |key, value| - next if %r{\A(?:X-Rainbows-|Date\z|Status\z)}i =~ key + next if %r{\A(?:X-Rainbows-|Date\z|Status\z\|Connection\z)}i =~ key if value =~ /\n/ # avoiding blank, key-only cookies with /\n+/ - rv << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join('') + buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join else - rv << "#{key}: #{value}\r\n" + buf << "#{key}: #{value}\r\n" end end - rv << CRLF + write(buf << CRLF) end - # called after forking - def self.setup(klass) - if 0 == Rainbows::G.kato - KEEP_ALIVE.replace(CLOSE) - Rainbows::HttpParser.keepalive_requests = 0 - end - range_class = body_class = klass - case Rainbows::Const::RACK_DEFAULTS['rainbows.model'] - when :WriterThreadSpawn - body_class = Rainbows::WriterThreadSpawn::Client - range_class = Rainbows::HttpServer - when :EventMachine, :NeverBlock - range_class = nil # :< - end - return if body_class.included_modules.include?(Body) - body_class.__send__(:include, Body) - sf = IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) - if range_class - range_class.__send__(:include, sf ? Range : NoRange) + def close_if_private(io) + io.close if F === io + end + + def io_for_fd(fd) + Rainbows::FD_MAP.delete(fd) || F.for_fd(fd) + end + + # to_io is not part of the Rack spec, but make an exception here + # since we can conserve path lookups and file descriptors. + # \Rainbows! will never get here without checking for the existence + # of body.to_path first. + def body_to_io(body) + if body.respond_to?(:to_io) + body.to_io + else + # try to take advantage of Rainbows::DevFdResponse, calling F.open + # is a last resort + path = body.to_path + %r{\A/dev/fd/(\d+)\z} =~ path ? io_for_fd($1.to_i) : F.open(path) end end - module NoRange - # dummy method if we can't send range responses - def make_range!(env, status, headers) + module Each + # generic body writer, used for most dynamically-generated responses + def write_body_each(body) + body.each { |chunk| write(chunk) } + end + + # generic response writer, used for most dynamically-generated responses + # and also when IO.copy_stream and/or IO#sendfile_nonblock is unavailable + def write_response(status, headers, body, alive) + write_headers(status, headers, alive) + write_body_each(body) + ensure + body.close if body.respond_to?(:close) end end + include Each + + if IO.method_defined?(:sendfile_nonblock) + module Sendfile + def write_body_file(body, range) + io = body_to_io(body) + range ? sendfile(io, range[0], range[1]) : sendfile(io, 0) + ensure + close_if_private(io) + end + end + include Sendfile + end + + if IO.respond_to?(:copy_stream) + unless IO.method_defined?(:sendfile_nonblock) + module CopyStream + def write_body_file(body, range) + range ? IO.copy_stream(body, self, range[1], range[0]) : + IO.copy_stream(body, self, nil, 0) + end + end + include CopyStream + end + + # write_body_stream is an alias for write_body_each if IO.copy_stream + # isn't used or available. + def write_body_stream(body) + IO.copy_stream(io = body_to_io(body), self) + ensure + close_if_private(io) + end + else # ! IO.respond_to?(:copy_stream) + alias write_body_stream write_body_each + end # ! IO.respond_to?(:copy_stream) + + if IO.method_defined?(:sendfile_nonblock) || IO.respond_to?(:copy_stream) + HTTP_RANGE = 'HTTP_RANGE' + Content_Range = 'Content-Range'.freeze + Content_Length = 'Content-Length'.freeze + + # This does not support multipart responses (does anybody actually + # use those?) + def sendfile_range(status, headers) + 200 == status.to_i && + /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ @hp.env[HTTP_RANGE] or + return + a, b = $1.split(/-/) + + headers = Rack::Utils::HeaderHash.new(headers) + clen = headers[Content_Length] or return + size = clen.to_i + + if b.nil? # bytes=M- + offset = a.to_i + count = size - offset + elsif a.empty? # bytes=-N + offset = size - b.to_i + count = size - offset + else # bytes=M-N + offset = a.to_i + count = b.to_i + 1 - offset + end + + if 0 > count || offset >= size + return 416, headers, nil + else + count = size if count > size + headers[Content_Length] = count.to_s + headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}" + return 206, headers, [ offset, count ] + end + end + + def write_response_path(status, headers, body, alive) + if File.file?(body.to_path) + if r = sendfile_range(status, headers) + status, headers, range = r + write_headers(status, headers, alive) + write_body_file(body, range) if range + else + write_headers(status, headers, alive) + write_body_file(body, nil) + end + else + write_headers(status, headers, alive) + write_body_stream(body) + end + ensure + body.close if body.respond_to?(:close) + end + + module ToPath + def write_response(status, headers, body, alive) + if body.respond_to?(:to_path) + write_response_path(status, headers, body, alive) + else + super + end + end + end + include ToPath + end # IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) end diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb deleted file mode 100644 index a5d04dd..0000000 --- a/lib/rainbows/response/body.rb +++ /dev/null @@ -1,122 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# non-portable body response stuff goes here -# -# The sendfile 1.0.0 RubyGem includes IO#sendfile and -# IO#sendfile_nonblock. Previous versions of "sendfile" didn't have -# IO#sendfile_nonblock, and IO#sendfile in previous versions could -# block other threads under 1.8 with large files -# -# IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with -# non-Linux support and large files on 32-bit. We still fall back to -# IO.copy_stream (if available) if we're dealing with DevFdResponse -# objects, though. -# -# Linux-only splice(2) support via the "io_splice" gem will eventually -# be added for streaming sockets/pipes, too. -# -# * write_body_file - regular files (sendfile or pread+write) -# * write_body_stream - socket/pipes (read+write, splice later) -# * write_body_each - generic fallback -# -# callgraph is as follows: -# -# write_body -# `- write_body_each -# `- write_body_path -# `- write_body_file -# `- write_body_stream -# -module Rainbows::Response::Body # :nodoc: - ALIASES = {} - - FD_MAP = Rainbows::FD_MAP - - class F < File; end - - def close_if_private(io) - io.close if F === io - end - - def io_for_fd(fd) - FD_MAP.delete(fd) || F.for_fd(fd) - end - - # to_io is not part of the Rack spec, but make an exception here - # since we can conserve path lookups and file descriptors. - # \Rainbows! will never get here without checking for the existence - # of body.to_path first. - def body_to_io(body) - if body.respond_to?(:to_io) - body.to_io - else - # try to take advantage of Rainbows::DevFdResponse, calling File.open - # is a last resort - path = body.to_path - path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : F.open(path) - end - end - - if IO.method_defined?(:sendfile_nonblock) - def write_body_file_sendfile(sock, body, range) - io = body_to_io(body) - range ? sock.sendfile(io, range[0], range[1]) : sock.sendfile(io, 0) - ensure - close_if_private(io) - end - ALIASES[:write_body_file] = :write_body_file_sendfile - end - - if IO.respond_to?(:copy_stream) - unless method_defined?(:write_body_file_sendfile) - # try to use sendfile() via IO.copy_stream, otherwise pread()+write() - def write_body_file_copy_stream(sock, body, range) - range ? IO.copy_stream(body, sock, range[1], range[0]) : - IO.copy_stream(body, sock, nil, 0) - end - ALIASES[:write_body_file] = :write_body_file_copy_stream - end - - # only used when body is a pipe or socket that can't handle - # pread() semantics - def write_body_stream(sock, body) - IO.copy_stream(body, sock) - end - else - # fall back to body#each, which is a Rack standard - ALIASES[:write_body_stream] = :write_body_each - end - - if ALIASES[:write_body_file] - # middlewares/apps may return with a body that responds to +to_path+ - def write_body_path(sock, body, range) - File.file?(body.to_path) ? write_body_file(sock, body, range) : - write_body_stream(sock, body) - ensure - body.respond_to?(:close) and body.close - end - end - - if method_defined?(:write_body_path) - def write_body(client, body, range) - body.respond_to?(:to_path) ? - write_body_path(client, body, range) : - write_body_each(client, body) - end - else - ALIASES[:write_body] = :write_body_each - end - - # generic body writer, used for most dynamically generated responses - def write_body_each(socket, body, range = nil) - body.each { |chunk| socket.write(chunk) } - ensure - body.respond_to?(:close) and body.close - end - - def self.included(klass) - ALIASES.each do |new_method, orig_method| - klass.__send__(:alias_method, new_method, orig_method) - end - end -end diff --git a/lib/rainbows/response/range.rb b/lib/rainbows/response/range.rb deleted file mode 100644 index b383587..0000000 --- a/lib/rainbows/response/range.rb +++ /dev/null @@ -1,34 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::Response::Range - HTTP_RANGE = 'HTTP_RANGE' - Content_Range = 'Content-Range'.freeze - Content_Length = 'Content-Length'.freeze - - # This does not support multipart responses (does anybody actually - # use those?) +headers+ is always a Rack::Utils::HeaderHash - def make_range!(env, status, headers) - if 200 == status.to_i && - (clen = headers[Content_Length]) && - /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ env[HTTP_RANGE] - a, b = $1.split(/-/) - clen = clen.to_i - if b.nil? # bytes=M- - offset = a.to_i - count = clen - offset - elsif a.empty? # bytes=-N - offset = clen - b.to_i - count = clen - offset - else # bytes=M-N - offset = a.to_i - count = b.to_i + 1 - offset - end - raise Rainbows::Response416 if count <= 0 || offset >= clen - count = clen if count > clen - headers[Content_Length] = count.to_s - headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}" - [ 206, offset, count ] - end - # nil if no status - end -end diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index f4e8fca..be4badf 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -19,76 +19,17 @@ Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required' # \Revactor library as well, to take advantage of the networking # concurrency features this model provides. module Rainbows::Revactor - - # :stopdoc: - RD_ARGS = {} - + autoload :Client, 'rainbows/revactor/client' autoload :Proxy, 'rainbows/revactor/proxy' - autoload :TeeSocket, 'rainbows/revactor/tee_socket' include Rainbows::Base - LOCALHOST = Kgio::LOCALHOST - TCP = Revactor::TCP::Socket - - # once a client is accepted, it is processed in its entirety here - # in 3 easy steps: read request, call app, write app response - def process_client(client) # :nodoc: - io = client.instance_variable_get(:@_io) - io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) - rd_args = [ nil ] - remote_addr = if TCP === client - rd_args << RD_ARGS - client.remote_addr - else - LOCALHOST - end - hp = Rainbows::HttpParser.new - buf = hp.buf - alive = false - - begin - ts = nil - until env = hp.parse - buf << client.read(*rd_args) - end - - env[CLIENT_IO] = client - env[RACK_INPUT] = 0 == hp.content_length ? - NULL_IO : IC.new(ts = TeeSocket.new(client), hp) - env[REMOTE_ADDR] = remote_addr - status, headers, body = app.call(env.update(RACK_DEFAULTS)) - - if 100 == status.to_i - client.write(EXPECT_100_RESPONSE) - env.delete(HTTP_EXPECT) - status, headers, body = app.call(env) - end - - if hp.headers? - headers = HH.new(headers) - range = make_range!(env, status, headers) and status = range.shift - headers[CONNECTION] = (alive = hp.next?) ? KEEP_ALIVE : CLOSE - client.write(response_header(status, headers)) - alive && ts and buf << ts.leftover - end - write_body(client, body, range) - end while alive - rescue Revactor::TCP::ReadError - rescue => e - Rainbows::Error.write(io, e) - ensure - client.close - end # runs inside each forked worker, this sits around and waits # for connections and doesn't die until the parent dies (or is # given a INT, QUIT, or TERM signal) def worker_loop(worker) #:nodoc: + Client.setup init_worker_process(worker) - require 'rainbows/revactor/body' - self.class.__send__(:include, Rainbows::Revactor::Body) - self.class.const_set(:IC, Unicorn::HttpRequest.input_class) - RD_ARGS[:timeout] = G.kato if G.kato > 0 nr = 0 limit = worker_connections actor_exit = Case[:exit, Actor, Object] @@ -114,7 +55,7 @@ module Rainbows::Revactor f.when(actor_exit) { nr -= 1 } f.when(accept) do |_, _, s| nr += 1 - Actor.spawn_link(s) { |c| process_client(c) } + Actor.spawn_link(s) { |c| Client.new(c).process_loop } end end rescue => e diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/body.rb deleted file mode 100644 index 9820df3..0000000 --- a/lib/rainbows/revactor/body.rb +++ /dev/null @@ -1,46 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::Revactor::Body - # TODO non-blocking splice(2) under Linux - ALIASES = { - :write_body_stream => :write_body_each - } - - if IO.method_defined?(:sendfile_nonblock) - def write_body_file_sendfile_revactor(client, body, range) - body = body_to_io(body) - sock = client.instance_variable_get(:@_io) - pfx = Revactor::TCP::Socket === client ? :tcp : :unix - write_complete = T[:"#{pfx}_write_complete", client] - closed = T[:"#{pfx}_closed", client] - offset, count = range ? range : [ 0, body.stat.size ] - begin - offset += (n = sock.sendfile_nonblock(body, offset, count)) - rescue Errno::EAGAIN - # The @_write_buffer is empty at this point, trigger the - # on_readable method which in turn triggers on_write_complete - # even though nothing was written - client.controller = Actor.current - client.__send__(:enable_write_watcher) - Actor.receive do |filter| - filter.when(write_complete) {} - filter.when(closed) { raise Errno::EPIPE } - end - retry - rescue EOFError - break - end while (count -= n) > 0 - ensure - close_if_private(body) - end - ALIASES[:write_body_file] = :write_body_file_sendfile_revactor - else - ALIASES[:write_body] = :write_body_each - end - - def self.included(klass) - ALIASES.each do |new_method, orig_method| - klass.__send__(:alias_method, new_method, orig_method) - end - end -end diff --git a/lib/rainbows/revactor/client.rb b/lib/rainbows/revactor/client.rb new file mode 100644 index 0000000..7c4b53d --- /dev/null +++ b/lib/rainbows/revactor/client.rb @@ -0,0 +1,59 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'fcntl' +class Rainbows::Revactor::Client + autoload :TeeSocket, 'rainbows/revactor/client/tee_socket' + RD_ARGS = {} + RD_ARGS[:timeout] = Rainbows::G.kato if Rainbows::G.kato > 0 + attr_reader :kgio_addr + + def initialize(client) + @client, @rd_args, @ts = client, [ nil ], nil + io = client.instance_variable_get(:@_io) + io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) + @kgio_addr = if Revactor::TCP::Socket === client + @rd_args << RD_ARGS + client.remote_addr + else + Kgio::LOCALHOST + end + end + + def kgio_read!(nr, buf) + buf.replace(@client.read) + end + + def write(buf) + @client.write(buf) + end + + def write_nonblock(buf) # only used for errors + @client.instance_variable_get(:@_io).write_nonblock(buf) + end + + def timed_read(buf2) + buf2.replace(@client.read(*@rd_args)) + end + + def set_input(env, hp) + env[RACK_INPUT] = 0 == hp.content_length ? + NULL_IO : IC.new(@ts = TeeSocket.new(@client), hp) + env[CLIENT_IO] = @client + end + + def close + @client.close + @client = nil + end + + def closed? + @client.nil? + end + + def self.setup + self.const_set(:IC, Unicorn::HttpRequest.input_class) + include Rainbows::ProcessClient + include Methods + end +end +require 'rainbows/revactor/client/methods' diff --git a/lib/rainbows/revactor/client/methods.rb b/lib/rainbows/revactor/client/methods.rb new file mode 100644 index 0000000..e9b39a3 --- /dev/null +++ b/lib/rainbows/revactor/client/methods.rb @@ -0,0 +1,45 @@ +# -*- encoding: binary -*- +# :enddoc: +module Rainbows::Revactor::Client::Methods + if IO.method_defined?(:sendfile_nonblock) + def write_body_file(body, range) + body, client = body_to_io(body), @client + sock = @client.instance_variable_get(:@_io) + pfx = Revactor::TCP::Socket === client ? :tcp : :unix + write_complete = T[:"#{pfx}_write_complete", client] + closed = T[:"#{pfx}_closed", client] + offset, count = range ? range : [ 0, body.stat.size ] + begin + offset += (n = sock.sendfile_nonblock(body, offset, count)) + rescue Errno::EAGAIN + # The @_write_buffer is empty at this point, trigger the + # on_readable method which in turn triggers on_write_complete + # even though nothing was written + client.controller = Actor.current + client.__send__(:enable_write_watcher) + Actor.receive do |filter| + filter.when(write_complete) {} + filter.when(closed) { raise Errno::EPIPE } + end + retry + rescue EOFError + break + end while (count -= n) > 0 + ensure + close_if_private(body) + end + end + + def handle_error(e) + Revactor::TCP::ReadError === e or super + end + + def write_response(status, headers, body, alive) + super(status, headers, body, alive) + alive && @ts and @hp.buf << @ts.leftover + end + + def self.included(klass) + klass.__send__ :alias_method, :write_body_stream, :write_body_each + end +end diff --git a/lib/rainbows/revactor/client/tee_socket.rb b/lib/rainbows/revactor/client/tee_socket.rb new file mode 100644 index 0000000..2f9f52e --- /dev/null +++ b/lib/rainbows/revactor/client/tee_socket.rb @@ -0,0 +1,44 @@ +# -*- encoding: binary -*- +# :enddoc: +# +# Revactor Sockets do not implement readpartial, so we emulate just +# enough to avoid mucking with TeeInput internals. Fortunately +# this code is not heavily used so we can usually avoid the overhead +# of adding a userspace buffer. +class Rainbows::Revactor::Client::TeeSocket + def initialize(socket) + # IO::Buffer is used internally by Rev which Revactor is based on + # so we'll always have it available + @socket, @rbuf = socket, IO::Buffer.new + end + + def leftover + @rbuf.read + end + + # Revactor socket reads always return an unspecified amount, + # sometimes too much + def kgio_read(length, dst = "") + return dst.replace("") if length == 0 + + # always check and return from the userspace buffer first + @rbuf.size > 0 and return dst.replace(@rbuf.read(length)) + + # read off the socket since there was nothing in rbuf + tmp = @socket.read + + # we didn't read too much, good, just return it straight back + # to avoid needlessly wasting memory bandwidth + tmp.size <= length and return dst.replace(tmp) + + # ugh, read returned too much + @rbuf << tmp[length, tmp.size] + dst.replace(tmp[0, length]) + rescue EOFError + end + + # just proxy any remaining methods TeeInput may use + def close + @socket.close + end +end diff --git a/lib/rainbows/revactor/tee_socket.rb b/lib/rainbows/revactor/tee_socket.rb deleted file mode 100644 index 71aeb88..0000000 --- a/lib/rainbows/revactor/tee_socket.rb +++ /dev/null @@ -1,44 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# -# Revactor Sockets do not implement readpartial, so we emulate just -# enough to avoid mucking with TeeInput internals. Fortunately -# this code is not heavily used so we can usually avoid the overhead -# of adding a userspace buffer. -class Rainbows::Revactor::TeeSocket - def initialize(socket) - # IO::Buffer is used internally by Rev which Revactor is based on - # so we'll always have it available - @socket, @rbuf = socket, IO::Buffer.new - end - - def leftover - @rbuf.read - end - - # Revactor socket reads always return an unspecified amount, - # sometimes too much - def kgio_read(length, dst = "") - return dst.replace("") if length == 0 - - # always check and return from the userspace buffer first - @rbuf.size > 0 and return dst.replace(@rbuf.read(length)) - - # read off the socket since there was nothing in rbuf - tmp = @socket.read - - # we didn't read too much, good, just return it straight back - # to avoid needlessly wasting memory bandwidth - tmp.size <= length and return dst.replace(tmp) - - # ugh, read returned too much - @rbuf << tmp[length, tmp.size] - dst.replace(tmp[0, length]) - rescue EOFError - end - - # just proxy any remaining methods TeeInput may use - def close - @socket.close - end -end diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index f243dc5..c82e22a 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -41,7 +41,7 @@ module Rainbows::ThreadPool def sync_worker # :nodoc: s = LISTENERS[0] begin - c = s.kgio_accept and process_client(c) + c = s.kgio_accept and c.process_loop rescue => e Rainbows::Error.listen_loop(e) end while G.alive @@ -55,7 +55,7 @@ module Rainbows::ThreadPool # problem. On the other hand, a thundering herd may not # even incur as much overhead as an extra Mutex#synchronize ret = select(LISTENERS) and ret[0].each do |s| - s = s.kgio_tryaccept and process_client(s) + s = s.kgio_tryaccept and s.process_loop end rescue Errno::EINTR rescue => e diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index acdaa69..d2d41e8 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -31,7 +31,7 @@ module Rainbows::ThreadSpawn klass.new(c) do |c| begin lock.synchronize { G.cur += 1 } - process_client(c) + c.process_loop ensure lock.synchronize { G.cur -= 1 } end diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index 67c8e83..558827f 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -19,30 +19,14 @@ module Rainbows::WriterThreadPool # :stopdoc: include Rainbows::Base + autoload :Client, 'rainbows/writer_thread_pool/client' @@nr = 0 @@q = nil - def async_write_body(qclient, body, range) - if body.respond_to?(:close) - Rainbows::SyncClose.new(body) do |body| - qclient.q << [ qclient.to_io, :body, body, range ] - end - else - qclient.q << [ qclient.to_io, :body, body, range ] - end - end - def process_client(client) # :nodoc: @@nr += 1 - super(Client.new(client, @@q[@@nr %= @@q.size])) - end - - def init_worker_process(worker) - super - self.class.__send__(:alias_method, :sync_write_body, :write_body) - Rainbows::WriterThreadPool.__send__( - :alias_method, :write_body, :async_write_body) + Client.new(client, @@q[@@nr %= @@q.size]).process_loop end def worker_loop(worker) # :nodoc: @@ -51,12 +35,16 @@ module Rainbows::WriterThreadPool qp = (1..worker_connections).map do |n| Rainbows::QueuePool.new(1) do |response| begin - io, arg1, arg2, arg3 = response - case arg1 - when :body then sync_write_body(io, arg2, arg3) - when :close then io.close unless io.closed? + io, arg, *rest = response + case arg + when String + io.kgio_write(arg) + when :close + warn "#{Thread.current} #{io} close" + io.close unless io.closed? else - io.write(arg1) + warn "#{Thread.current} #{io} #{arg}" + io.__send__(arg, *rest) end rescue => err Rainbows::Error.write(io, err) @@ -70,5 +58,3 @@ module Rainbows::WriterThreadPool end # :startdoc: end -# :enddoc: -require 'rainbows/writer_thread_pool/client' diff --git a/lib/rainbows/writer_thread_pool/client.rb b/lib/rainbows/writer_thread_pool/client.rb index 3cc3335..526a623 100644 --- a/lib/rainbows/writer_thread_pool/client.rb +++ b/lib/rainbows/writer_thread_pool/client.rb @@ -4,6 +4,49 @@ # this is compatible with IO.select class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q) include Rainbows::SocketProxy + include Rainbows::ProcessClient + + module Methods + def write_body_each(body) + q << [ to_io, :write_body_each, body ] + end + + def write_response_close(status, headers, body, alive) + to_io.instance_variable_set(:@hp, @hp) # XXX ugh + Rainbows::SyncClose.new(body) { |sync_body| + q << [ to_io, :write_response, status, headers, sync_body, alive ] + } + end + + if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) + def write_response(status, headers, body, alive) + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + elsif body.respond_to?(:to_path) + write_response_path(status, headers, body, alive) + else + super + end + end + + def write_body_file(body, range) + q << [ to_io, :write_body_file, body, range ] + end + + def write_body_stream(body) + q << [ to_io, :write_body_stream, body ] + end + else # each-only body response + def write_response(status, headers, body, alive) + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + else + super + end + end + end # each-only body response + end # module Methods + include Methods def write(buf) q << [ to_io, buf ] @@ -14,6 +57,6 @@ class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q) end def closed? - false + to_io.closed? end end diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 43e4f2c..2f264d9 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -19,19 +19,11 @@ require 'thread' # vulnerable to slow client denial-of-service attacks. module Rainbows::WriterThreadSpawn - # :stopdoc: include Rainbows::Base - - def write_body(my_sock, body, range) # :nodoc: - if body.respond_to?(:close) - Rainbows::SyncClose.new(body) { |body| my_sock.queue_body(body, range) } - else - my_sock.queue_body(body, range) - end - end + autoload :Client, 'rainbows/writer_thread_spawn/client' def process_client(client) # :nodoc: - super(Client.new(client)) + Client.new(client).process_loop end def worker_loop(worker) # :nodoc: @@ -42,4 +34,3 @@ module Rainbows::WriterThreadSpawn # :startdoc: end # :enddoc: -require 'rainbows/writer_thread_spawn/client' diff --git a/lib/rainbows/writer_thread_spawn/client.rb b/lib/rainbows/writer_thread_spawn/client.rb index 8f65c19..15264d0 100644 --- a/lib/rainbows/writer_thread_spawn/client.rb +++ b/lib/rainbows/writer_thread_spawn/client.rb @@ -3,12 +3,56 @@ # used to wrap a BasicSocket to use with +q+ for all writes # this is compatible with IO.select class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) - include Rainbows::Response include Rainbows::SocketProxy + include Rainbows::ProcessClient include Rainbows::WorkerYield CUR = {} # :nodoc: + module Methods + def write_body_each(body) + q << [ :write_body_each, body ] + end + + def write_response_close(status, headers, body, alive) + to_io.instance_variable_set(:@hp, @hp) # XXX ugh + Rainbows::SyncClose.new(body) { |sync_body| + q << [ :write_response, status, headers, sync_body, alive ] + } + end + + if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) + def write_response(status, headers, body, alive) + self.q ||= queue_writer + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + elsif body.respond_to?(:to_path) + write_response_path(status, headers, body, alive) + else + super + end + end + + def write_body_file(body, range) + q << [ :write_body_file, body, range ] + end + + def write_body_stream(body) + q << [ :write_body_stream, body ] + end + else # each-only body response + def write_response(status, headers, body, alive) + self.q ||= queue_writer + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + else + super + end + end + end # each-only body response + end # module Methods + include Methods + def self.quit g = Rainbows::G CUR.delete_if do |t,q| @@ -27,16 +71,17 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) q = Queue.new self.thr = Thread.new(to_io, q) do |io, q| - while response = q.shift + while op = q.shift begin - arg1, arg2, arg3 = response - case arg1 - when :body then write_body(io, arg2, arg3) + op, *rest = op + case op + when String + io.kgio_write(op) when :close io.close unless io.closed? break else - io.write(arg1) + io.__send__ op, *rest end rescue => e Rainbows::Error.write(io, e) @@ -51,10 +96,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) (self.q ||= queue_writer) << buf end - def queue_body(body, range) - (self.q ||= queue_writer) << [ :body, body, range ] - end - def close if q q << :close @@ -64,6 +105,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) end def closed? - false + to_io.closed? end end -- cgit v1.2.3-24-ge0c7