diff options
26 files changed, 405 insertions, 413 deletions
diff --git a/lib/rainbows.rb b/lib/rainbows.rb index 643bdd2..909e97e 100644 --- a/lib/rainbows.rb +++ b/lib/rainbows.rb @@ -39,9 +39,11 @@ module Rainbows require 'rainbows/const' require 'rainbows/http_parser' require 'rainbows/http_server' - require 'rainbows/response' - require 'rainbows/client' - require 'rainbows/process_client' + autoload :RackInput, 'rainbows/rack_input' + autoload :Response, 'rainbows/response' + autoload :ProcessClient, 'rainbows/process_client' + autoload :TimedRead, 'rainbows/timed_read' + autoload :Client, 'rainbows/client' autoload :Base, 'rainbows/base' autoload :Sendfile, 'rainbows/sendfile' autoload :AppPool, 'rainbows/app_pool' diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index bf9ef87..5d56063 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -6,9 +6,7 @@ # not intended for production use, as keepalive with a pure prefork # concurrency model is extremely expensive. module Rainbows::Base - # :stopdoc: - include Rainbows::ProcessClient # shortcuts... G = Rainbows::G @@ -34,6 +32,10 @@ module Rainbows::Base logger.info "Rainbows! #@use worker_connections=#@worker_connections" end + def process_client(client) + client.process_loop + end + def self.included(klass) # :nodoc: klass.const_set :LISTENERS, Rainbows::HttpServer::LISTENERS klass.const_set :G, Rainbows::G diff --git a/lib/rainbows/client.rb b/lib/rainbows/client.rb index dc6d95e..8425e9e 100644 --- a/lib/rainbows/client.rb +++ b/lib/rainbows/client.rb @@ -1,9 +1,8 @@ # -*- encoding: binary -*- # :enddoc: -require 'rainbows/timed_read' - +# this class is used for most synchronous concurrency models class Rainbows::Client < Kgio::Socket include Rainbows::TimedRead + include Rainbows::ProcessClient end -Kgio.accept_class = Rainbows::Client diff --git a/lib/rainbows/coolio.rb b/lib/rainbows/coolio.rb index 463bf0a..d0b8b2e 100644 --- a/lib/rainbows/coolio.rb +++ b/lib/rainbows/coolio.rb @@ -31,6 +31,7 @@ module Rainbows::Coolio KATO.compare_by_identity end + autoload :Client, 'rainbows/coolio/client' autoload :Master, 'rainbows/coolio/master' autoload :ThreadClient, 'rainbows/coolio/thread_client' autoload :ResponsePipe, 'rainbows/coolio/response_pipe' @@ -41,5 +42,4 @@ end require 'rainbows/coolio/heartbeat' require 'rainbows/coolio/server' require 'rainbows/coolio/core' -require 'rainbows/coolio/client' Rainbows::Coolio.__send__ :include, Rainbows::Coolio::Core diff --git a/lib/rainbows/coolio/client.rb b/lib/rainbows/coolio/client.rb index 6360e2d..d0b17a9 100644 --- a/lib/rainbows/coolio/client.rb +++ b/lib/rainbows/coolio/client.rb @@ -84,39 +84,37 @@ class Rainbows::Coolio::Client < Coolio::IO end # used for streaming sockets and pipes - def stream_response(status, headers, io, body) - c = stream_response_headers(status, headers) if headers + def stream_response_body(body, io, chunk) # we only want to attach to the Coolio::Loop belonging to the # main thread in Ruby 1.9 - io = (c ? ResponseChunkPipe : ResponsePipe).new(io, self, body) + io = (chunk ? ResponseChunkPipe : ResponsePipe).new(io, self, body) defer_body(io.attach(LOOP)) end def coolio_write_response(response, alive) status, headers, body = response - headers = @hp.headers? ? HH.new(headers) : nil - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE if headers if body.respond_to?(:to_path) io = body_to_io(body) st = io.stat if st.file? - offset, count = 0, st.size - if headers - if range = make_range!(@env, status, headers) - status, offset, count = range - end - write(response_header(status, headers)) + if respond_to?(:sendfile_range) && r = sendfile_range(status, headers) + status, headers, range = r + write_headers(status, headers, alive) + defer_body(SF.new(range[0], range[1], io, body)) if range + else + write_headers(status, headers, alive) + defer_body(SF.new(0, st.size, io, body)) end - return defer_body(SF.new(offset, count, io, body)) + return elsif st.socket? || st.pipe? - return stream_response(status, headers, io, body) + chunk = stream_response_headers(status, headers, alive) + return stream_response_body(body, io, chunk) end # char or block device... WTF? fall through to body.each end - write(response_header(status, headers)) if headers - write_body_each(self, body, nil) + write_response(status, headers, body, alive) end def app_call diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 60fbdca..471f6a3 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -36,14 +36,15 @@ module Rainbows::EvCore end # returns whether to enable response chunking for autochunk models - def stream_response_headers(status, headers) + def stream_response_headers(status, headers, alive) + headers = Rack::Utils::HeaderHash.new(headers) if headers['Content-Length'] rv = false else rv = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) rv = false if headers.delete('X-Rainbows-Autochunk') == 'no' end - write(response_header(status, headers)) + write_headers(status, headers, alive) rv end diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index b226cab..cb76669 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -44,6 +44,7 @@ module Rainbows::EventMachine autoload :ResponsePipe, 'rainbows/event_machine/response_pipe' autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe' autoload :TryDefer, 'rainbows/event_machine/try_defer' + autoload :Client, 'rainbows/event_machine/client' include Rainbows::Base @@ -89,5 +90,4 @@ module Rainbows::EventMachine end end # :enddoc: -require 'rainbows/event_machine/client' require 'rainbows/event_machine/server' diff --git a/lib/rainbows/event_machine/client.rb b/lib/rainbows/event_machine/client.rb index 6863be0..2fc9d03 100644 --- a/lib/rainbows/event_machine/client.rb +++ b/lib/rainbows/event_machine/client.rb @@ -58,27 +58,20 @@ class Rainbows::EventMachine::Client < EM::Connection end end + # don't change this method signature, "async.callback" relies on it def em_write_response(response, alive = false) status, headers, body = response - if @hp.headers? - headers = HH.new(headers) - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - else - headers = nil - end if body.respond_to?(:errback) && body.respond_to?(:callback) @body = body body.callback { quit } body.errback { quit } - # async response, this could be a trickle as is in comet-style apps - headers[CONNECTION] = CLOSE if headers alive = true elsif body.respond_to?(:to_path) st = File.stat(path = body.to_path) if st.file? - write(response_header(status, headers)) if headers + write_headers(status, headers, alive) @body = stream_file_data(path) @body.errback do body.close if body.respond_to?(:close) @@ -92,16 +85,14 @@ class Rainbows::EventMachine::Client < EM::Connection return elsif st.socket? || st.pipe? io = body_to_io(@body = body) - chunk = stream_response_headers(status, headers) if headers + chunk = stream_response_headers(status, headers, alive) m = chunk ? Rainbows::EventMachine::ResponseChunkPipe : Rainbows::EventMachine::ResponsePipe return EM.watch(io, m, self).notify_readable = true end # char or block device... WTF? fall through to body.each end - - write(response_header(status, headers)) if headers - write_body_each(self, body) + write_response(status, headers, body, alive) quit unless alive end diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index b693451..ae885b6 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -57,7 +57,7 @@ module Rainbows::Fiber::Base def process(client) G.cur += 1 - process_client(client) + client.process_loop ensure G.cur -= 1 ZZ.delete(client.f) @@ -65,7 +65,7 @@ module Rainbows::Fiber::Base def self.setup(klass, app) require 'rainbows/fiber/body' - klass.__send__(:include, Rainbows::Fiber::Body) + Rainbows::Client.__send__(:include, Rainbows::Fiber::Body) self.const_set(:APP, app) end end diff --git a/lib/rainbows/fiber/body.rb b/lib/rainbows/fiber/body.rb index 1d7d325..872b1df 100644 --- a/lib/rainbows/fiber/body.rb +++ b/lib/rainbows/fiber/body.rb @@ -5,20 +5,15 @@ # this is meant to be included _after_ Rainbows::Response::Body module Rainbows::Fiber::Body # :nodoc: - # TODO non-blocking splice(2) under Linux - ALIASES = { - :write_body_stream => :write_body_each - } - # the sendfile 1.0.0+ gem includes IO#sendfile_nonblock if IO.method_defined?(:sendfile_nonblock) - def write_body_file_sendfile_fiber(client, body, range) - sock, n, body = client.to_io, nil, body_to_io(body) + def write_body_file(body, range) + sock, n, body = to_io, nil, body_to_io(body) offset, count = range ? range : [ 0, body.stat.size ] begin offset += (n = sock.sendfile_nonblock(body, offset, count)) rescue Errno::EAGAIN - client.kgio_wait_writable + kgio_wait_writable retry rescue EOFError break @@ -26,14 +21,9 @@ module Rainbows::Fiber::Body # :nodoc: ensure close_if_private(body) end - ALIASES[:write_body_file] = :write_body_file_sendfile_fiber - else - ALIASES[:write_body] = :write_body_each end def self.included(klass) - ALIASES.each do |new_method, orig_method| - klass.__send__(:alias_method, new_method, orig_method) - end + klass.__send__ :alias_method, :write_body_stream, :write_body_each end end diff --git a/lib/rainbows/fiber/coolio/server.rb b/lib/rainbows/fiber/coolio/server.rb index 0de1ab3..b064953 100644 --- a/lib/rainbows/fiber/coolio/server.rb +++ b/lib/rainbows/fiber/coolio/server.rb @@ -2,7 +2,6 @@ # :enddoc: class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher G = Rainbows::G - include Rainbows::ProcessClient def to_io @io @@ -25,7 +24,7 @@ class Rainbows::Fiber::Coolio::Server < Coolio::IOWatcher def process(io) G.cur += 1 - process_client(io) + io.process_loop ensure G.cur -= 1 end diff --git a/lib/rainbows/process_client.rb b/lib/rainbows/process_client.rb index 54e59e8..d840778 100644 --- a/lib/rainbows/process_client.rb +++ b/lib/rainbows/process_client.rb @@ -1,54 +1,41 @@ # -*- encoding: binary -*- -# :enddoc: -require 'rainbows/rack_input' module Rainbows::ProcessClient - G = Rainbows::G include Rainbows::Response - HttpParser = Rainbows::HttpParser include Rainbows::RackInput include Rainbows::Const - # once a client is accepted, it is processed in its entirety here - # in 3 easy steps: read request, call app, write app response - # this is used by synchronous concurrency models - # Base, ThreadSpawn, ThreadPool - def process_client(client) # :nodoc: - hp = HttpParser.new - client.kgio_read!(16384, buf = hp.buf) - remote_addr = client.kgio_addr - alive = false + def process_loop + @hp = hp = Rainbows::HttpParser.new + kgio_read!(16384, buf = hp.buf) or return begin # loop until env = hp.parse - client.timed_read(buf2 ||= "") or return + timed_read(buf2 ||= "") or return buf << buf2 end - set_input(env, hp, client) - env[REMOTE_ADDR] = remote_addr - status, headers, body = APP.call(env.update(RACK_DEFAULTS)) + set_input(env, hp) + env[REMOTE_ADDR] = kgio_addr + status, headers, body = APP.call(env.merge!(RACK_DEFAULTS)) if 100 == status.to_i - client.write(EXPECT_100_RESPONSE) + write(EXPECT_100_RESPONSE) env.delete(HTTP_EXPECT) status, headers, body = APP.call(env) end - - if hp.headers? - headers = HH.new(headers) - range = make_range!(env, status, headers) and status = range.shift - headers[CONNECTION] = (alive = hp.next?) ? KEEP_ALIVE : CLOSE - client.write(response_header(status, headers)) - end - write_body(client, body, range) + write_response(status, headers, body, alive = @hp.next?) end while alive # if we get any error, try to write something back to the client # assuming we haven't closed the socket, but don't get hung up # if the socket is already closed or broken. We'll always ensure # the socket is closed at the end of this function rescue => e - Rainbows::Error.write(client, e) + handle_error(e) ensure - client.close unless client.closed? + close unless closed? + end + + def handle_error(e) + Rainbows::Error.write(self, e) end end diff --git a/lib/rainbows/rack_input.rb b/lib/rainbows/rack_input.rb index df51ac1..bc68ed1 100644 --- a/lib/rainbows/rack_input.rb +++ b/lib/rainbows/rack_input.rb @@ -10,8 +10,8 @@ module Rainbows::RackInput const_set(:IC, Unicorn::HttpRequest.input_class) end - def set_input(env, hp, client) - env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(client, hp) - env[CLIENT_IO] = client + def set_input(env, hp) + env[RACK_INPUT] = 0 == hp.content_length ? NULL_IO : IC.new(self, hp) + env[CLIENT_IO] = self end end diff --git a/lib/rainbows/response.rb b/lib/rainbows/response.rb index ca381b8..c0d0740 100644 --- a/lib/rainbows/response.rb +++ b/lib/rainbows/response.rb @@ -3,60 +3,179 @@ require 'time' # for Time#httpdate module Rainbows::Response - autoload :Body, 'rainbows/response/body' - autoload :Range, 'rainbows/response/range' - + CRLF = Unicorn::HttpResponse::CRLF CODES = Unicorn::HttpResponse::CODES - CRLF = "\r\n" + Close = "close" + KeepAlive = "keep-alive" + + # private file class for IO objects opened by Rainbows! itself (and not + # the app or middleware) + class F < File; end - # freeze headers we may set as hash keys for a small speedup - CONNECTION = "Connection".freeze - CLOSE = "close" - KEEP_ALIVE = "keep-alive" - HH = Rack::Utils::HeaderHash + # called after forking + def self.setup(klass) + Kgio.accept_class = Rainbows::Client + 0 == Rainbows::G.kato and Rainbows::HttpParser.keepalive_requests = 0 + end - def response_header(status, headers) + def write_headers(status, headers, alive) + @hp.headers? or return status = CODES[status.to_i] || status - rv = "HTTP/1.1 #{status}\r\n" \ - "Date: #{Time.now.httpdate}\r\n" \ - "Status: #{status}\r\n" + buf = "HTTP/1.1 #{status}\r\n" \ + "Date: #{Time.now.httpdate}\r\n" \ + "Status: #{status}\r\n" \ + "Connection: #{alive ? KeepAlive : Close}\r\n" headers.each do |key, value| - next if %r{\A(?:X-Rainbows-|Date\z|Status\z)}i =~ key + next if %r{\A(?:X-Rainbows-|Date\z|Status\z\|Connection\z)}i =~ key if value =~ /\n/ # avoiding blank, key-only cookies with /\n+/ - rv << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join('') + buf << value.split(/\n+/).map! { |v| "#{key}: #{v}\r\n" }.join else - rv << "#{key}: #{value}\r\n" + buf << "#{key}: #{value}\r\n" end end - rv << CRLF + write(buf << CRLF) end - # called after forking - def self.setup(klass) - if 0 == Rainbows::G.kato - KEEP_ALIVE.replace(CLOSE) - Rainbows::HttpParser.keepalive_requests = 0 - end - range_class = body_class = klass - case Rainbows::Const::RACK_DEFAULTS['rainbows.model'] - when :WriterThreadSpawn - body_class = Rainbows::WriterThreadSpawn::Client - range_class = Rainbows::HttpServer - when :EventMachine, :NeverBlock - range_class = nil # :< - end - return if body_class.included_modules.include?(Body) - body_class.__send__(:include, Body) - sf = IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) - if range_class - range_class.__send__(:include, sf ? Range : NoRange) + def close_if_private(io) + io.close if F === io + end + + def io_for_fd(fd) + Rainbows::FD_MAP.delete(fd) || F.for_fd(fd) + end + + # to_io is not part of the Rack spec, but make an exception here + # since we can conserve path lookups and file descriptors. + # \Rainbows! will never get here without checking for the existence + # of body.to_path first. + def body_to_io(body) + if body.respond_to?(:to_io) + body.to_io + else + # try to take advantage of Rainbows::DevFdResponse, calling F.open + # is a last resort + path = body.to_path + %r{\A/dev/fd/(\d+)\z} =~ path ? io_for_fd($1.to_i) : F.open(path) end end - module NoRange - # dummy method if we can't send range responses - def make_range!(env, status, headers) + module Each + # generic body writer, used for most dynamically-generated responses + def write_body_each(body) + body.each { |chunk| write(chunk) } + end + + # generic response writer, used for most dynamically-generated responses + # and also when IO.copy_stream and/or IO#sendfile_nonblock is unavailable + def write_response(status, headers, body, alive) + write_headers(status, headers, alive) + write_body_each(body) + ensure + body.close if body.respond_to?(:close) end end + include Each + + if IO.method_defined?(:sendfile_nonblock) + module Sendfile + def write_body_file(body, range) + io = body_to_io(body) + range ? sendfile(io, range[0], range[1]) : sendfile(io, 0) + ensure + close_if_private(io) + end + end + include Sendfile + end + + if IO.respond_to?(:copy_stream) + unless IO.method_defined?(:sendfile_nonblock) + module CopyStream + def write_body_file(body, range) + range ? IO.copy_stream(body, self, range[1], range[0]) : + IO.copy_stream(body, self, nil, 0) + end + end + include CopyStream + end + + # write_body_stream is an alias for write_body_each if IO.copy_stream + # isn't used or available. + def write_body_stream(body) + IO.copy_stream(io = body_to_io(body), self) + ensure + close_if_private(io) + end + else # ! IO.respond_to?(:copy_stream) + alias write_body_stream write_body_each + end # ! IO.respond_to?(:copy_stream) + + if IO.method_defined?(:sendfile_nonblock) || IO.respond_to?(:copy_stream) + HTTP_RANGE = 'HTTP_RANGE' + Content_Range = 'Content-Range'.freeze + Content_Length = 'Content-Length'.freeze + + # This does not support multipart responses (does anybody actually + # use those?) + def sendfile_range(status, headers) + 200 == status.to_i && + /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ @hp.env[HTTP_RANGE] or + return + a, b = $1.split(/-/) + + headers = Rack::Utils::HeaderHash.new(headers) + clen = headers[Content_Length] or return + size = clen.to_i + + if b.nil? # bytes=M- + offset = a.to_i + count = size - offset + elsif a.empty? # bytes=-N + offset = size - b.to_i + count = size - offset + else # bytes=M-N + offset = a.to_i + count = b.to_i + 1 - offset + end + + if 0 > count || offset >= size + return 416, headers, nil + else + count = size if count > size + headers[Content_Length] = count.to_s + headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}" + return 206, headers, [ offset, count ] + end + end + + def write_response_path(status, headers, body, alive) + if File.file?(body.to_path) + if r = sendfile_range(status, headers) + status, headers, range = r + write_headers(status, headers, alive) + write_body_file(body, range) if range + else + write_headers(status, headers, alive) + write_body_file(body, nil) + end + else + write_headers(status, headers, alive) + write_body_stream(body) + end + ensure + body.close if body.respond_to?(:close) + end + + module ToPath + def write_response(status, headers, body, alive) + if body.respond_to?(:to_path) + write_response_path(status, headers, body, alive) + else + super + end + end + end + include ToPath + end # IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) end diff --git a/lib/rainbows/response/body.rb b/lib/rainbows/response/body.rb deleted file mode 100644 index a5d04dd..0000000 --- a/lib/rainbows/response/body.rb +++ /dev/null @@ -1,122 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -# non-portable body response stuff goes here -# -# The sendfile 1.0.0 RubyGem includes IO#sendfile and -# IO#sendfile_nonblock. Previous versions of "sendfile" didn't have -# IO#sendfile_nonblock, and IO#sendfile in previous versions could -# block other threads under 1.8 with large files -# -# IO#sendfile currently (June 2010) beats 1.9 IO.copy_stream with -# non-Linux support and large files on 32-bit. We still fall back to -# IO.copy_stream (if available) if we're dealing with DevFdResponse -# objects, though. -# -# Linux-only splice(2) support via the "io_splice" gem will eventually -# be added for streaming sockets/pipes, too. -# -# * write_body_file - regular files (sendfile or pread+write) -# * write_body_stream - socket/pipes (read+write, splice later) -# * write_body_each - generic fallback -# -# callgraph is as follows: -# -# write_body -# `- write_body_each -# `- write_body_path -# `- write_body_file -# `- write_body_stream -# -module Rainbows::Response::Body # :nodoc: - ALIASES = {} - - FD_MAP = Rainbows::FD_MAP - - class F < File; end - - def close_if_private(io) - io.close if F === io - end - - def io_for_fd(fd) - FD_MAP.delete(fd) || F.for_fd(fd) - end - - # to_io is not part of the Rack spec, but make an exception here - # since we can conserve path lookups and file descriptors. - # \Rainbows! will never get here without checking for the existence - # of body.to_path first. - def body_to_io(body) - if body.respond_to?(:to_io) - body.to_io - else - # try to take advantage of Rainbows::DevFdResponse, calling File.open - # is a last resort - path = body.to_path - path =~ %r{\A/dev/fd/(\d+)\z} ? io_for_fd($1.to_i) : F.open(path) - end - end - - if IO.method_defined?(:sendfile_nonblock) - def write_body_file_sendfile(sock, body, range) - io = body_to_io(body) - range ? sock.sendfile(io, range[0], range[1]) : sock.sendfile(io, 0) - ensure - close_if_private(io) - end - ALIASES[:write_body_file] = :write_body_file_sendfile - end - - if IO.respond_to?(:copy_stream) - unless method_defined?(:write_body_file_sendfile) - # try to use sendfile() via IO.copy_stream, otherwise pread()+write() - def write_body_file_copy_stream(sock, body, range) - range ? IO.copy_stream(body, sock, range[1], range[0]) : - IO.copy_stream(body, sock, nil, 0) - end - ALIASES[:write_body_file] = :write_body_file_copy_stream - end - - # only used when body is a pipe or socket that can't handle - # pread() semantics - def write_body_stream(sock, body) - IO.copy_stream(body, sock) - end - else - # fall back to body#each, which is a Rack standard - ALIASES[:write_body_stream] = :write_body_each - end - - if ALIASES[:write_body_file] - # middlewares/apps may return with a body that responds to +to_path+ - def write_body_path(sock, body, range) - File.file?(body.to_path) ? write_body_file(sock, body, range) : - write_body_stream(sock, body) - ensure - body.respond_to?(:close) and body.close - end - end - - if method_defined?(:write_body_path) - def write_body(client, body, range) - body.respond_to?(:to_path) ? - write_body_path(client, body, range) : - write_body_each(client, body) - end - else - ALIASES[:write_body] = :write_body_each - end - - # generic body writer, used for most dynamically generated responses - def write_body_each(socket, body, range = nil) - body.each { |chunk| socket.write(chunk) } - ensure - body.respond_to?(:close) and body.close - end - - def self.included(klass) - ALIASES.each do |new_method, orig_method| - klass.__send__(:alias_method, new_method, orig_method) - end - end -end diff --git a/lib/rainbows/response/range.rb b/lib/rainbows/response/range.rb deleted file mode 100644 index b383587..0000000 --- a/lib/rainbows/response/range.rb +++ /dev/null @@ -1,34 +0,0 @@ -# -*- encoding: binary -*- -# :enddoc: -module Rainbows::Response::Range - HTTP_RANGE = 'HTTP_RANGE' - Content_Range = 'Content-Range'.freeze - Content_Length = 'Content-Length'.freeze - - # This does not support multipart responses (does anybody actually - # use those?) +headers+ is always a Rack::Utils::HeaderHash - def make_range!(env, status, headers) - if 200 == status.to_i && - (clen = headers[Content_Length]) && - /\Abytes=(\d+-\d*|\d*-\d+)\z/ =~ env[HTTP_RANGE] - a, b = $1.split(/-/) - clen = clen.to_i - if b.nil? # bytes=M- - offset = a.to_i - count = clen - offset - elsif a.empty? # bytes=-N - offset = clen - b.to_i - count = clen - offset - else # bytes=M-N - offset = a.to_i - count = b.to_i + 1 - offset - end - raise Rainbows::Response416 if count <= 0 || offset >= clen - count = clen if count > clen - headers[Content_Length] = count.to_s - headers[Content_Range] = "bytes #{offset}-#{offset+count-1}/#{clen}" - [ 206, offset, count ] - end - # nil if no status - end -end diff --git a/lib/rainbows/revactor.rb b/lib/rainbows/revactor.rb index f4e8fca..be4badf 100644 --- a/lib/rainbows/revactor.rb +++ b/lib/rainbows/revactor.rb @@ -19,76 +19,17 @@ Revactor::VERSION >= '0.1.5' or abort 'revactor 0.1.5 is required' # \Revactor library as well, to take advantage of the networking # concurrency features this model provides. module Rainbows::Revactor - - # :stopdoc: - RD_ARGS = {} - + autoload :Client, 'rainbows/revactor/client' autoload :Proxy, 'rainbows/revactor/proxy' - autoload :TeeSocket, 'rainbows/revactor/tee_socket' include Rainbows::Base - LOCALHOST = Kgio::LOCALHOST - TCP = Revactor::TCP::Socket - - # once a client is accepted, it is processed in its entirety here - # in 3 easy steps: read request, call app, write app response - def process_client(client) # :nodoc: - io = client.instance_variable_get(:@_io) - io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) - rd_args = [ nil ] - remote_addr = if TCP === client - rd_args << RD_ARGS - client.remote_addr - else - LOCALHOST - end - hp = Rainbows::HttpParser.new - buf = hp.buf - alive = false - - begin - ts = nil - until env = hp.parse - buf << client.read(*rd_args) - end - - env[CLIENT_IO] = client - env[RACK_INPUT] = 0 == hp.content_length ? - NULL_IO : IC.new(ts = TeeSocket.new(client), hp) - env[REMOTE_ADDR] = remote_addr - status, headers, body = app.call(env.update(RACK_DEFAULTS)) - - if 100 == status.to_i - client.write(EXPECT_100_RESPONSE) - env.delete(HTTP_EXPECT) - status, headers, body = app.call(env) - end - - if hp.headers? - headers = HH.new(headers) - range = make_range!(env, status, headers) and status = range.shift - headers[CONNECTION] = (alive = hp.next?) ? KEEP_ALIVE : CLOSE - client.write(response_header(status, headers)) - alive && ts and buf << ts.leftover - end - write_body(client, body, range) - end while alive - rescue Revactor::TCP::ReadError - rescue => e - Rainbows::Error.write(io, e) - ensure - client.close - end # runs inside each forked worker, this sits around and waits # for connections and doesn't die until the parent dies (or is # given a INT, QUIT, or TERM signal) def worker_loop(worker) #:nodoc: + Client.setup init_worker_process(worker) - require 'rainbows/revactor/body' - self.class.__send__(:include, Rainbows::Revactor::Body) - self.class.const_set(:IC, Unicorn::HttpRequest.input_class) - RD_ARGS[:timeout] = G.kato if G.kato > 0 nr = 0 limit = worker_connections actor_exit = Case[:exit, Actor, Object] @@ -114,7 +55,7 @@ module Rainbows::Revactor f.when(actor_exit) { nr -= 1 } f.when(accept) do |_, _, s| nr += 1 - Actor.spawn_link(s) { |c| process_client(c) } + Actor.spawn_link(s) { |c| Client.new(c).process_loop } end end rescue => e diff --git a/lib/rainbows/revactor/client.rb b/lib/rainbows/revactor/client.rb new file mode 100644 index 0000000..7c4b53d --- /dev/null +++ b/lib/rainbows/revactor/client.rb @@ -0,0 +1,59 @@ +# -*- encoding: binary -*- +# :enddoc: +require 'fcntl' +class Rainbows::Revactor::Client + autoload :TeeSocket, 'rainbows/revactor/client/tee_socket' + RD_ARGS = {} + RD_ARGS[:timeout] = Rainbows::G.kato if Rainbows::G.kato > 0 + attr_reader :kgio_addr + + def initialize(client) + @client, @rd_args, @ts = client, [ nil ], nil + io = client.instance_variable_get(:@_io) + io.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) + @kgio_addr = if Revactor::TCP::Socket === client + @rd_args << RD_ARGS + client.remote_addr + else + Kgio::LOCALHOST + end + end + + def kgio_read!(nr, buf) + buf.replace(@client.read) + end + + def write(buf) + @client.write(buf) + end + + def write_nonblock(buf) # only used for errors + @client.instance_variable_get(:@_io).write_nonblock(buf) + end + + def timed_read(buf2) + buf2.replace(@client.read(*@rd_args)) + end + + def set_input(env, hp) + env[RACK_INPUT] = 0 == hp.content_length ? + NULL_IO : IC.new(@ts = TeeSocket.new(@client), hp) + env[CLIENT_IO] = @client + end + + def close + @client.close + @client = nil + end + + def closed? + @client.nil? + end + + def self.setup + self.const_set(:IC, Unicorn::HttpRequest.input_class) + include Rainbows::ProcessClient + include Methods + end +end +require 'rainbows/revactor/client/methods' diff --git a/lib/rainbows/revactor/body.rb b/lib/rainbows/revactor/client/methods.rb index 9820df3..e9b39a3 100644 --- a/lib/rainbows/revactor/body.rb +++ b/lib/rainbows/revactor/client/methods.rb @@ -1,15 +1,10 @@ # -*- encoding: binary -*- # :enddoc: -module Rainbows::Revactor::Body - # TODO non-blocking splice(2) under Linux - ALIASES = { - :write_body_stream => :write_body_each - } - +module Rainbows::Revactor::Client::Methods if IO.method_defined?(:sendfile_nonblock) - def write_body_file_sendfile_revactor(client, body, range) - body = body_to_io(body) - sock = client.instance_variable_get(:@_io) + def write_body_file(body, range) + body, client = body_to_io(body), @client + sock = @client.instance_variable_get(:@_io) pfx = Revactor::TCP::Socket === client ? :tcp : :unix write_complete = T[:"#{pfx}_write_complete", client] closed = T[:"#{pfx}_closed", client] @@ -33,14 +28,18 @@ module Rainbows::Revactor::Body ensure close_if_private(body) end - ALIASES[:write_body_file] = :write_body_file_sendfile_revactor - else - ALIASES[:write_body] = :write_body_each + end + + def handle_error(e) + Revactor::TCP::ReadError === e or super + end + + def write_response(status, headers, body, alive) + super(status, headers, body, alive) + alive && @ts and @hp.buf << @ts.leftover end def self.included(klass) - ALIASES.each do |new_method, orig_method| - klass.__send__(:alias_method, new_method, orig_method) - end + klass.__send__ :alias_method, :write_body_stream, :write_body_each end end diff --git a/lib/rainbows/revactor/tee_socket.rb b/lib/rainbows/revactor/client/tee_socket.rb index 71aeb88..2f9f52e 100644 --- a/lib/rainbows/revactor/tee_socket.rb +++ b/lib/rainbows/revactor/client/tee_socket.rb @@ -5,7 +5,7 @@ # enough to avoid mucking with TeeInput internals. Fortunately # this code is not heavily used so we can usually avoid the overhead # of adding a userspace buffer. -class Rainbows::Revactor::TeeSocket +class Rainbows::Revactor::Client::TeeSocket def initialize(socket) # IO::Buffer is used internally by Rev which Revactor is based on # so we'll always have it available diff --git a/lib/rainbows/thread_pool.rb b/lib/rainbows/thread_pool.rb index f243dc5..c82e22a 100644 --- a/lib/rainbows/thread_pool.rb +++ b/lib/rainbows/thread_pool.rb @@ -41,7 +41,7 @@ module Rainbows::ThreadPool def sync_worker # :nodoc: s = LISTENERS[0] begin - c = s.kgio_accept and process_client(c) + c = s.kgio_accept and c.process_loop rescue => e Rainbows::Error.listen_loop(e) end while G.alive @@ -55,7 +55,7 @@ module Rainbows::ThreadPool # problem. On the other hand, a thundering herd may not # even incur as much overhead as an extra Mutex#synchronize ret = select(LISTENERS) and ret[0].each do |s| - s = s.kgio_tryaccept and process_client(s) + s = s.kgio_tryaccept and s.process_loop end rescue Errno::EINTR rescue => e diff --git a/lib/rainbows/thread_spawn.rb b/lib/rainbows/thread_spawn.rb index acdaa69..d2d41e8 100644 --- a/lib/rainbows/thread_spawn.rb +++ b/lib/rainbows/thread_spawn.rb @@ -31,7 +31,7 @@ module Rainbows::ThreadSpawn klass.new(c) do |c| begin lock.synchronize { G.cur += 1 } - process_client(c) + c.process_loop ensure lock.synchronize { G.cur -= 1 } end diff --git a/lib/rainbows/writer_thread_pool.rb b/lib/rainbows/writer_thread_pool.rb index 67c8e83..558827f 100644 --- a/lib/rainbows/writer_thread_pool.rb +++ b/lib/rainbows/writer_thread_pool.rb @@ -19,30 +19,14 @@ module Rainbows::WriterThreadPool # :stopdoc: include Rainbows::Base + autoload :Client, 'rainbows/writer_thread_pool/client' @@nr = 0 @@q = nil - def async_write_body(qclient, body, range) - if body.respond_to?(:close) - Rainbows::SyncClose.new(body) do |body| - qclient.q << [ qclient.to_io, :body, body, range ] - end - else - qclient.q << [ qclient.to_io, :body, body, range ] - end - end - def process_client(client) # :nodoc: @@nr += 1 - super(Client.new(client, @@q[@@nr %= @@q.size])) - end - - def init_worker_process(worker) - super - self.class.__send__(:alias_method, :sync_write_body, :write_body) - Rainbows::WriterThreadPool.__send__( - :alias_method, :write_body, :async_write_body) + Client.new(client, @@q[@@nr %= @@q.size]).process_loop end def worker_loop(worker) # :nodoc: @@ -51,12 +35,16 @@ module Rainbows::WriterThreadPool qp = (1..worker_connections).map do |n| Rainbows::QueuePool.new(1) do |response| begin - io, arg1, arg2, arg3 = response - case arg1 - when :body then sync_write_body(io, arg2, arg3) - when :close then io.close unless io.closed? + io, arg, *rest = response + case arg + when String + io.kgio_write(arg) + when :close + warn "#{Thread.current} #{io} close" + io.close unless io.closed? else - io.write(arg1) + warn "#{Thread.current} #{io} #{arg}" + io.__send__(arg, *rest) end rescue => err Rainbows::Error.write(io, err) @@ -70,5 +58,3 @@ module Rainbows::WriterThreadPool end # :startdoc: end -# :enddoc: -require 'rainbows/writer_thread_pool/client' diff --git a/lib/rainbows/writer_thread_pool/client.rb b/lib/rainbows/writer_thread_pool/client.rb index 3cc3335..526a623 100644 --- a/lib/rainbows/writer_thread_pool/client.rb +++ b/lib/rainbows/writer_thread_pool/client.rb @@ -4,6 +4,49 @@ # this is compatible with IO.select class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q) include Rainbows::SocketProxy + include Rainbows::ProcessClient + + module Methods + def write_body_each(body) + q << [ to_io, :write_body_each, body ] + end + + def write_response_close(status, headers, body, alive) + to_io.instance_variable_set(:@hp, @hp) # XXX ugh + Rainbows::SyncClose.new(body) { |sync_body| + q << [ to_io, :write_response, status, headers, sync_body, alive ] + } + end + + if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) + def write_response(status, headers, body, alive) + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + elsif body.respond_to?(:to_path) + write_response_path(status, headers, body, alive) + else + super + end + end + + def write_body_file(body, range) + q << [ to_io, :write_body_file, body, range ] + end + + def write_body_stream(body) + q << [ to_io, :write_body_stream, body ] + end + else # each-only body response + def write_response(status, headers, body, alive) + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + else + super + end + end + end # each-only body response + end # module Methods + include Methods def write(buf) q << [ to_io, buf ] @@ -14,6 +57,6 @@ class Rainbows::WriterThreadPool::Client < Struct.new(:to_io, :q) end def closed? - false + to_io.closed? end end diff --git a/lib/rainbows/writer_thread_spawn.rb b/lib/rainbows/writer_thread_spawn.rb index 43e4f2c..2f264d9 100644 --- a/lib/rainbows/writer_thread_spawn.rb +++ b/lib/rainbows/writer_thread_spawn.rb @@ -19,19 +19,11 @@ require 'thread' # vulnerable to slow client denial-of-service attacks. module Rainbows::WriterThreadSpawn - # :stopdoc: include Rainbows::Base - - def write_body(my_sock, body, range) # :nodoc: - if body.respond_to?(:close) - Rainbows::SyncClose.new(body) { |body| my_sock.queue_body(body, range) } - else - my_sock.queue_body(body, range) - end - end + autoload :Client, 'rainbows/writer_thread_spawn/client' def process_client(client) # :nodoc: - super(Client.new(client)) + Client.new(client).process_loop end def worker_loop(worker) # :nodoc: @@ -42,4 +34,3 @@ module Rainbows::WriterThreadSpawn # :startdoc: end # :enddoc: -require 'rainbows/writer_thread_spawn/client' diff --git a/lib/rainbows/writer_thread_spawn/client.rb b/lib/rainbows/writer_thread_spawn/client.rb index 8f65c19..15264d0 100644 --- a/lib/rainbows/writer_thread_spawn/client.rb +++ b/lib/rainbows/writer_thread_spawn/client.rb @@ -3,12 +3,56 @@ # used to wrap a BasicSocket to use with +q+ for all writes # this is compatible with IO.select class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) - include Rainbows::Response include Rainbows::SocketProxy + include Rainbows::ProcessClient include Rainbows::WorkerYield CUR = {} # :nodoc: + module Methods + def write_body_each(body) + q << [ :write_body_each, body ] + end + + def write_response_close(status, headers, body, alive) + to_io.instance_variable_set(:@hp, @hp) # XXX ugh + Rainbows::SyncClose.new(body) { |sync_body| + q << [ :write_response, status, headers, sync_body, alive ] + } + end + + if IO.respond_to?(:copy_stream) || IO.method_defined?(:sendfile_nonblock) + def write_response(status, headers, body, alive) + self.q ||= queue_writer + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + elsif body.respond_to?(:to_path) + write_response_path(status, headers, body, alive) + else + super + end + end + + def write_body_file(body, range) + q << [ :write_body_file, body, range ] + end + + def write_body_stream(body) + q << [ :write_body_stream, body ] + end + else # each-only body response + def write_response(status, headers, body, alive) + self.q ||= queue_writer + if body.respond_to?(:close) + write_response_close(status, headers, body, alive) + else + super + end + end + end # each-only body response + end # module Methods + include Methods + def self.quit g = Rainbows::G CUR.delete_if do |t,q| @@ -27,16 +71,17 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) q = Queue.new self.thr = Thread.new(to_io, q) do |io, q| - while response = q.shift + while op = q.shift begin - arg1, arg2, arg3 = response - case arg1 - when :body then write_body(io, arg2, arg3) + op, *rest = op + case op + when String + io.kgio_write(op) when :close io.close unless io.closed? break else - io.write(arg1) + io.__send__ op, *rest end rescue => e Rainbows::Error.write(io, e) @@ -51,10 +96,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) (self.q ||= queue_writer) << buf end - def queue_body(body, range) - (self.q ||= queue_writer) << [ :body, body, range ] - end - def close if q q << :close @@ -64,6 +105,6 @@ class Rainbows::WriterThreadSpawn::Client < Struct.new(:to_io, :q, :thr) end def closed? - false + to_io.closed? end end |