From 180485d49ea858f83ef2a28a9e07224aa514edc7 Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Fri, 22 Oct 2010 16:21:03 -0700 Subject: unindent most files This simplifies and disambiguates most constant resolution issues as well as lowering our identation level. Hopefully this makes code easier to understand. --- lib/rainbows/actor_spawn.rb | 42 +++-- lib/rainbows/app_pool.rb | 182 ++++++++++--------- lib/rainbows/configurator.rb | 76 ++++---- lib/rainbows/const.rb | 38 ++-- lib/rainbows/error.rb | 72 ++++---- lib/rainbows/ev_core.rb | 211 +++++++++++----------- lib/rainbows/event_machine.rb | 382 ++++++++++++++++++++-------------------- lib/rainbows/fiber.rb | 1 + lib/rainbows/http_server.rb | 163 +++++++++-------- lib/rainbows/max_body.rb | 18 +- lib/rainbows/queue_pool.rb | 41 ++--- lib/rainbows/rev/core.rb | 2 +- lib/rainbows/rev_fiber_spawn.rb | 45 +++-- 13 files changed, 619 insertions(+), 654 deletions(-) (limited to 'lib/rainbows') diff --git a/lib/rainbows/actor_spawn.rb b/lib/rainbows/actor_spawn.rb index 8cb839d..2b42b68 100644 --- a/lib/rainbows/actor_spawn.rb +++ b/lib/rainbows/actor_spawn.rb @@ -1,29 +1,27 @@ # -*- encoding: binary -*- require 'actor' -module Rainbows - # Actor concurrency model for Rubinius. We can't seem to get message - # passing working right, so we're throwing a Mutex into the mix for - # now. Hopefully somebody can fix things for us. Currently, this is - # exactly the same as the ThreadSpawn model since we don't use the - # message passing capabilities of the Actor model (and even then - # it wouldn't really make sense since Actors in Rubinius are just - # Threads underneath and our ThreadSpawn model is one layer of - # complexity less. - # - # This is different from the Revactor one which is not prone to race - # conditions within the same process at all (since it uses Fibers). - module ActorSpawn - include ThreadSpawn +# Actor concurrency model for Rubinius. We can't seem to get message +# passing working right, so we're throwing a Mutex into the mix for +# now. Hopefully somebody can fix things for us. Currently, this is +# exactly the same as the ThreadSpawn model since we don't use the +# message passing capabilities of the Actor model (and even then +# it wouldn't really make sense since Actors in Rubinius are just +# Threads underneath and our ThreadSpawn model is one layer of +# complexity less. +# +# This is different from the Revactor one which is not prone to race +# conditions within the same process at all (since it uses Fibers). +module Rainbows::ActorSpawn + include Rainbows::ThreadSpawn - # runs inside each forked worker, this sits around and waits - # for connections and doesn't die until the parent dies (or is - # given a INT, QUIT, or TERM signal) - def worker_loop(worker) # :nodoc: - Const::RACK_DEFAULTS["rack.multithread"] = true # :( - init_worker_process(worker) - accept_loop(Actor) - end + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) # :nodoc: + Rainbows::Const::RACK_DEFAULTS["rack.multithread"] = true # :( + init_worker_process(worker) + accept_loop(Actor) end end diff --git a/lib/rainbows/app_pool.rb b/lib/rainbows/app_pool.rb index 7996e2b..b406b32 100644 --- a/lib/rainbows/app_pool.rb +++ b/lib/rainbows/app_pool.rb @@ -2,105 +2,101 @@ require 'thread' -module Rainbows +# Rack middleware to limit application-level concurrency independently +# of network conncurrency in \Rainbows! Since the +worker_connections+ +# option in \Rainbows! is only intended to limit the number of +# simultaneous clients, this middleware may be used to limit the +# number of concurrent application dispatches independently of +# concurrent clients. +# +# Instead of using M:N concurrency in \Rainbows!, this middleware +# allows M:N:P concurrency where +P+ is the AppPool +:size+ while +# +M+ remains the number of +worker_processes+ and +N+ remains the +# number of +worker_connections+. +# +# rainbows master +# \_ rainbows worker[0] +# | \_ client[0,0]------\ ___app[0] +# | \_ client[0,1]-------\ /___app[1] +# | \_ client[0,2]-------->--< ... +# | ... __/ `---app[P] +# | \_ client[0,N]----/ +# \_ rainbows worker[1] +# | \_ client[1,0]------\ ___app[0] +# | \_ client[1,1]-------\ /___app[1] +# | \_ client[1,2]-------->--< ... +# | ... __/ `---app[P] +# | \_ client[1,N]----/ +# \_ rainbows worker[M] +# \_ client[M,0]------\ ___app[0] +# \_ client[M,1]-------\ /___app[1] +# \_ client[M,2]-------->--< ... +# ... __/ `---app[P] +# \_ client[M,N]----/ +# +# AppPool should be used if you want to enforce a lower value of +P+ +# than +N+. +# +# AppPool has no effect on the Rev or EventMachine concurrency models +# as those are single-threaded/single-instance as far as application +# concurrency goes. In other words, +P+ is always +one+ when using +# Rev or EventMachine. As of \Rainbows! 0.7.0, it is safe to use with +# Revactor and the new FiberSpawn and FiberPool concurrency models. +# +# Since this is Rack middleware, you may load this in your Rack +# config.ru file and even use it in threaded servers other than +# \Rainbows! +# +# use Rainbows::AppPool, :size => 30 +# map "/lobster" do +# run Rack::Lobster.new +# end +# +# You may to load this earlier or later in your middleware chain +# depending on the concurrency/copy-friendliness of your middleware(s). +class Rainbows::AppPool < Struct.new(:pool, :re) - # Rack middleware to limit application-level concurrency independently - # of network conncurrency in \Rainbows! Since the +worker_connections+ - # option in \Rainbows! is only intended to limit the number of - # simultaneous clients, this middleware may be used to limit the - # number of concurrent application dispatches independently of - # concurrent clients. - # - # Instead of using M:N concurrency in \Rainbows!, this middleware - # allows M:N:P concurrency where +P+ is the AppPool +:size+ while - # +M+ remains the number of +worker_processes+ and +N+ remains the - # number of +worker_connections+. - # - # rainbows master - # \_ rainbows worker[0] - # | \_ client[0,0]------\ ___app[0] - # | \_ client[0,1]-------\ /___app[1] - # | \_ client[0,2]-------->--< ... - # | ... __/ `---app[P] - # | \_ client[0,N]----/ - # \_ rainbows worker[1] - # | \_ client[1,0]------\ ___app[0] - # | \_ client[1,1]-------\ /___app[1] - # | \_ client[1,2]-------->--< ... - # | ... __/ `---app[P] - # | \_ client[1,N]----/ - # \_ rainbows worker[M] - # \_ client[M,0]------\ ___app[0] - # \_ client[M,1]-------\ /___app[1] - # \_ client[M,2]-------->--< ... - # ... __/ `---app[P] - # \_ client[M,N]----/ - # - # AppPool should be used if you want to enforce a lower value of +P+ - # than +N+. - # - # AppPool has no effect on the Rev or EventMachine concurrency models - # as those are single-threaded/single-instance as far as application - # concurrency goes. In other words, +P+ is always +one+ when using - # Rev or EventMachine. As of \Rainbows! 0.7.0, it is safe to use with - # Revactor and the new FiberSpawn and FiberPool concurrency models. - # - # Since this is Rack middleware, you may load this in your Rack - # config.ru file and even use it in threaded servers other than - # \Rainbows! - # - # use Rainbows::AppPool, :size => 30 - # map "/lobster" do - # run Rack::Lobster.new - # end - # - # You may to load this earlier or later in your middleware chain - # depending on the concurrency/copy-friendliness of your middleware(s). - - class AppPool < Struct.new(:pool, :re) - - # +opt+ is a hash, +:size+ is the size of the pool (default: 6) - # meaning you can have up to 6 concurrent instances of +app+ - # within one \Rainbows! worker process. We support various - # methods of the +:copy+ option: +dup+, +clone+, +deep+ and +none+. - # Depending on your +app+, one of these options should be set. - # The default +:copy+ is +:dup+ as is commonly seen in existing - # Rack middleware. - def initialize(app, opt = {}) - self.pool = Queue.new - (1...(opt[:size] || 6)).each do - pool << case (opt[:copy] || :dup) - when :none then app - when :dup then app.dup - when :clone then app.clone - when :deep then Marshal.load(Marshal.dump(app)) # unlikely... - else - raise ArgumentError, "unsupported copy method: #{opt[:copy].inspect}" - end + # +opt+ is a hash, +:size+ is the size of the pool (default: 6) + # meaning you can have up to 6 concurrent instances of +app+ + # within one \Rainbows! worker process. We support various + # methods of the +:copy+ option: +dup+, +clone+, +deep+ and +none+. + # Depending on your +app+, one of these options should be set. + # The default +:copy+ is +:dup+ as is commonly seen in existing + # Rack middleware. + def initialize(app, opt = {}) + self.pool = Queue.new + (1...(opt[:size] || 6)).each do + pool << case (opt[:copy] || :dup) + when :none then app + when :dup then app.dup + when :clone then app.clone + when :deep then Marshal.load(Marshal.dump(app)) # unlikely... + else + raise ArgumentError, "unsupported copy method: #{opt[:copy].inspect}" end - pool << app # the original end + pool << app # the original + end - # Rack application endpoint, +env+ is the Rack environment - def call(env) # :nodoc: + # Rack application endpoint, +env+ is the Rack environment + def call(env) # :nodoc: - # we have to do this check at call time (and not initialize) - # because of preload_app=true and models being changeable with SIGHUP - # fortunately this is safe for all the reentrant (but not multithreaded) - # classes that depend on it and a safe no-op for multithreaded - # concurrency models - self.re ||= begin - case env["rainbows.model"] - when :FiberSpawn, :FiberPool, :Revactor, :NeverBlock, :RevFiberSpawn - self.pool = Rainbows::Fiber::Queue.new(pool) - end - true + # we have to do this check at call time (and not initialize) + # because of preload_app=true and models being changeable with SIGHUP + # fortunately this is safe for all the reentrant (but not multithreaded) + # classes that depend on it and a safe no-op for multithreaded + # concurrency models + self.re ||= begin + case env["rainbows.model"] + when :FiberSpawn, :FiberPool, :Revactor, :NeverBlock, :RevFiberSpawn + self.pool = Rainbows::Fiber::Queue.new(pool) end - - app = pool.shift - app.call(env) - ensure - pool << app + true end + + app = pool.shift + app.call(env) + ensure + pool << app end end diff --git a/lib/rainbows/configurator.rb b/lib/rainbows/configurator.rb index 7add1f8..e69a3fb 100644 --- a/lib/rainbows/configurator.rb +++ b/lib/rainbows/configurator.rb @@ -1,46 +1,44 @@ # -*- encoding: binary -*- -module Rainbows - # This module adds \Rainbows! to the - # {Unicorn::Configurator}[http://unicorn.bogomips.org/Unicorn/Configurator.html] - module Configurator - - # configures \Rainbows! with a given concurrency model to +use+ and - # a +worker_connections+ upper-bound. This method may be called - # inside a Unicorn/\Rainbows! configuration file: - # - # Rainbows! do - # use :ThreadSpawn # concurrency model to use - # worker_connections 400 - # keepalive_timeout 0 # zero disables keepalives entirely - # client_max_body_size 5*1024*1024 # 5 megabytes - # end - # - # # the rest of the Unicorn configuration - # worker_processes 8 - # - # See the documentation for the respective Revactor, ThreadSpawn, - # and ThreadPool classes for descriptions and recommendations for - # each of them. The total number of clients we're able to serve is - # +worker_processes+ * +worker_connections+, so in the above example - # we can serve 8 * 400 = 3200 clients concurrently. - # - # The default is +keepalive_timeout+ is 5 seconds, which should be - # enough under most conditions for browsers to render the page and - # start retrieving extra elements for. Increasing this beyond 5 - # seconds is not recommended. Zero disables keepalive entirely - # (but pipelining fully-formed requests is still works). - # - # The default +client_max_body_size+ is 1 megabyte (1024 * 1024 bytes), - # setting this to +nil+ will disable body size checks and allow any - # size to be specified. - def Rainbows!(&block) - block_given? or raise ArgumentError, "Rainbows! requires a block" - HttpServer.setup(block) - end +# This module adds \Rainbows! to the +# {Unicorn::Configurator}[http://unicorn.bogomips.org/Unicorn/Configurator.html] +module Rainbows::Configurator + # configures \Rainbows! with a given concurrency model to +use+ and + # a +worker_connections+ upper-bound. This method may be called + # inside a Unicorn/\Rainbows! configuration file: + # + # Rainbows! do + # use :ThreadSpawn # concurrency model to use + # worker_connections 400 + # keepalive_timeout 0 # zero disables keepalives entirely + # client_max_body_size 5*1024*1024 # 5 megabytes + # end + # + # # the rest of the Unicorn configuration + # worker_processes 8 + # + # See the documentation for the respective Revactor, ThreadSpawn, + # and ThreadPool classes for descriptions and recommendations for + # each of them. The total number of clients we're able to serve is + # +worker_processes+ * +worker_connections+, so in the above example + # we can serve 8 * 400 = 3200 clients concurrently. + # + # The default is +keepalive_timeout+ is 5 seconds, which should be + # enough under most conditions for browsers to render the page and + # start retrieving extra elements for. Increasing this beyond 5 + # seconds is not recommended. Zero disables keepalive entirely + # (but pipelining fully-formed requests is still works). + # + # The default +client_max_body_size+ is 1 megabyte (1024 * 1024 bytes), + # setting this to +nil+ will disable body size checks and allow any + # size to be specified. + def Rainbows!(&block) + block_given? or raise ArgumentError, "Rainbows! requires a block" + Rainbows::HttpServer.setup(block) end end +# :enddoc: # inject the Rainbows! method into Unicorn::Configurator -Unicorn::Configurator.class_eval { include Rainbows::Configurator } +Unicorn::Configurator.__send__(:include, Rainbows::Configurator) diff --git a/lib/rainbows/const.rb b/lib/rainbows/const.rb index 1c77c76..ecdafab 100644 --- a/lib/rainbows/const.rb +++ b/lib/rainbows/const.rb @@ -1,30 +1,28 @@ # -*- encoding: binary -*- # :enddoc: -module Rainbows +module Rainbows::Const - module Const - RAINBOWS_VERSION = '0.97.0' + RAINBOWS_VERSION = '0.97.0' - include Unicorn::Const + include Unicorn::Const - RACK_DEFAULTS = Unicorn::HttpRequest::DEFAULTS.update({ - "SERVER_SOFTWARE" => "Rainbows! #{RAINBOWS_VERSION}", + RACK_DEFAULTS = Unicorn::HttpRequest::DEFAULTS.update({ + "SERVER_SOFTWARE" => "Rainbows! #{RAINBOWS_VERSION}", - # using the Rev model, we'll automatically chunk pipe and socket objects - # if they're the response body. Unset by default. - # "rainbows.autochunk" => false, - }) + # using the Rev model, we'll automatically chunk pipe and socket objects + # if they're the response body. Unset by default. + # "rainbows.autochunk" => false, + }) - # client IO object that supports reading and writing directly - # without filtering it through the HTTP chunk parser. - # Maybe we can get this renamed to "rack.io" if it becomes part - # of the official spec, but for now it is "hack.io" - CLIENT_IO = "hack.io".freeze + # client IO object that supports reading and writing directly + # without filtering it through the HTTP chunk parser. + # Maybe we can get this renamed to "rack.io" if it becomes part + # of the official spec, but for now it is "hack.io" + CLIENT_IO = "hack.io".freeze - ERROR_413_RESPONSE = "HTTP/1.1 413 Request Entity Too Large\r\n\r\n" - ERROR_416_RESPONSE = "HTTP/1.1 416 Requested Range Not Satisfiable\r\n\r\n" + ERROR_413_RESPONSE = "HTTP/1.1 413 Request Entity Too Large\r\n\r\n" + ERROR_416_RESPONSE = "HTTP/1.1 416 Requested Range Not Satisfiable\r\n\r\n" - RACK_INPUT = Unicorn::HttpRequest::RACK_INPUT - REMOTE_ADDR = Unicorn::HttpRequest::REMOTE_ADDR - end + RACK_INPUT = Unicorn::HttpRequest::RACK_INPUT + REMOTE_ADDR = Unicorn::HttpRequest::REMOTE_ADDR end diff --git a/lib/rainbows/error.rb b/lib/rainbows/error.rb index 7c91050..bdbfdc5 100644 --- a/lib/rainbows/error.rb +++ b/lib/rainbows/error.rb @@ -1,48 +1,44 @@ # -*- encoding: binary -*- # :enddoc: -module Rainbows +module Rainbows::Error - class Error - class << self + G = Rainbows::G - # if we get any error, try to write something back to the client - # assuming we haven't closed the socket, but don't get hung up - # if the socket is already closed or broken. We'll always ensure - # the socket is closed at the end of this function - def write(io, e) - msg = Error.response(e) and io.write_nonblock(msg) - rescue - end - - def app(e) - G.server.logger.error "app error: #{e.inspect}" - G.server.logger.error e.backtrace.join("\n") - rescue - end + # if we get any error, try to write something back to the client + # assuming we haven't closed the socket, but don't get hung up + # if the socket is already closed or broken. We'll always ensure + # the socket is closed at the end of this function + def self.write(io, e) + msg = response(e) and io.write_nonblock(msg) + rescue + end - def listen_loop(e) - G.alive or return - G.server.logger.error "listen loop error: #{e.inspect}." - G.server.logger.error e.backtrace.join("\n") - rescue - end + def self.app(e) + G.server.logger.error "app error: #{e.inspect}" + G.server.logger.error e.backtrace.join("\n") + rescue + end - def response(e) - case e - when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL, - Errno::EBADF, Errno::ENOTCONN - # swallow error if client shuts down one end or disconnects - when Rainbows::Response416 - Const::ERROR_416_RESPONSE - when Unicorn::HttpParserError - Const::ERROR_400_RESPONSE # try to tell the client they're bad - when IOError # HttpParserError is an IOError - else - app(e) - Const::ERROR_500_RESPONSE - end - end + def self.listen_loop(e) + G.alive or return + G.server.logger.error "listen loop error: #{e.inspect}." + G.server.logger.error e.backtrace.join("\n") + rescue + end + def self.response(e) + case e + when EOFError, Errno::ECONNRESET, Errno::EPIPE, Errno::EINVAL, + Errno::EBADF, Errno::ENOTCONN + # swallow error if client shuts down one end or disconnects + when Rainbows::Response416 + Rainbows::Const::ERROR_416_RESPONSE + when Unicorn::HttpParserError + Rainbows::Const::ERROR_400_RESPONSE # try to tell the client they're bad + when IOError # HttpParserError is an IOError + else + app(e) + Rainbows::Const::ERROR_500_RESPONSE end end end diff --git a/lib/rainbows/ev_core.rb b/lib/rainbows/ev_core.rb index 9761144..200bf79 100644 --- a/lib/rainbows/ev_core.rb +++ b/lib/rainbows/ev_core.rb @@ -1,130 +1,125 @@ # -*- encoding: binary -*- # :enddoc: -module Rainbows - - # base module for evented models like Rev and EventMachine - module EvCore - include Unicorn - include Rainbows::Const - include Rainbows::Response - G = Rainbows::G - NULL_IO = Unicorn::HttpRequest::NULL_IO - - # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ] - ASYNC_CALLBACK = "async.callback".freeze - - ASYNC_CLOSE = "async.close".freeze - - def post_init - @env = {} - @hp = HttpParser.new - @state = :headers # [ :body [ :trailers ] ] :app_call :close - @buf = "" - end +# base module for evented models like Rev and EventMachine +module Rainbows::EvCore + include Rainbows::Const + include Rainbows::Response + G = Rainbows::G + NULL_IO = Unicorn::HttpRequest::NULL_IO + HttpParser = Unicorn::HttpParser + + # Apps may return this Rack response: AsyncResponse = [ -1, {}, [] ] + ASYNC_CALLBACK = "async.callback".freeze + + ASYNC_CLOSE = "async.close".freeze + + def post_init + @env = {} + @hp = HttpParser.new + @state = :headers # [ :body [ :trailers ] ] :app_call :close + @buf = "" + end - # graceful exit, like SIGQUIT - def quit - @state = :close - end + # graceful exit, like SIGQUIT + def quit + @state = :close + end - def handle_error(e) - msg = Error.response(e) and write(msg) - ensure - quit - end + def handle_error(e) + msg = Rainbows::Error.response(e) and write(msg) + ensure + quit + end - # returns whether to enable response chunking for autochunk models - def stream_response_headers(status, headers) - if headers['Content-Length'] - rv = false - else - rv = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) - rv = false if headers.delete('X-Rainbows-Autochunk') == 'no' - end - write(response_header(status, headers)) - rv + # returns whether to enable response chunking for autochunk models + def stream_response_headers(status, headers) + if headers['Content-Length'] + rv = false + else + rv = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) + rv = false if headers.delete('X-Rainbows-Autochunk') == 'no' end + write(response_header(status, headers)) + rv + end - # TeeInput doesn't map too well to this right now... - def on_read(data) - case @state - when :headers - @hp.headers(@env, @buf << data) or return - @state = :body - len = @hp.content_length - if len == 0 - @input = NULL_IO - app_call # common case - else # nil or len > 0 - # since we don't do streaming input, we have no choice but - # to take over 100-continue handling from the Rack application - if @env[HTTP_EXPECT] =~ /\A100-continue\z/i - write(EXPECT_100_RESPONSE) - @env.delete(HTTP_EXPECT) - end - @input = CapInput.new(len, self) - @hp.filter_body(@buf2 = "", @buf) - @input << @buf2 - on_read("") - end - when :body - if @hp.body_eof? - @state = :trailers - on_read(data) - elsif data.size > 0 - @hp.filter_body(@buf2, @buf << data) - @input << @buf2 - on_read("") - end - when :trailers - if @hp.trailers(@env, @buf << data) - @input.rewind - app_call + # TeeInput doesn't map too well to this right now... + def on_read(data) + case @state + when :headers + @hp.headers(@env, @buf << data) or return + @state = :body + len = @hp.content_length + if len == 0 + @input = NULL_IO + app_call # common case + else # nil or len > 0 + # since we don't do streaming input, we have no choice but + # to take over 100-continue handling from the Rack application + if @env[HTTP_EXPECT] =~ /\A100-continue\z/i + write(EXPECT_100_RESPONSE) + @env.delete(HTTP_EXPECT) end + @input = CapInput.new(len, self) + @hp.filter_body(@buf2 = "", @buf) + @input << @buf2 + on_read("") + end + when :body + if @hp.body_eof? + @state = :trailers + on_read(data) + elsif data.size > 0 + @hp.filter_body(@buf2, @buf << data) + @input << @buf2 + on_read("") + end + when :trailers + if @hp.trailers(@env, @buf << data) + @input.rewind + app_call end - rescue => e - handle_error(e) end + rescue => e + handle_error(e) + end - class CapInput < Struct.new(:io, :client, :bytes_left) - MAX_BODY = Unicorn::Const::MAX_BODY - TmpIO = Unicorn::TmpIO + class CapInput < Struct.new(:io, :client, :bytes_left) + MAX_BODY = Unicorn::Const::MAX_BODY + TmpIO = Unicorn::TmpIO - def self.err(client, msg) - client.write(Const::ERROR_413_RESPONSE) - client.quit + def self.err(client, msg) + client.write(Rainbows::Const::ERROR_413_RESPONSE) + client.quit - # zip back up the stack - raise IOError, msg, [] - end + # zip back up the stack + raise IOError, msg, [] + end - def self.new(len, client) - max = Rainbows.max_bytes - if len - if max && (len > max) - err(client, "Content-Length too big: #{len} > #{max}") - end - len <= MAX_BODY ? StringIO.new("") : TmpIO.new - else - max ? super(TmpIO.new, client, max) : TmpIO.new + def self.new(len, client) + max = Rainbows.max_bytes + if len + if max && (len > max) + err(client, "Content-Length too big: #{len} > #{max}") end + len <= MAX_BODY ? StringIO.new("") : TmpIO.new + else + max ? super(TmpIO.new, client, max) : TmpIO.new end + end - def <<(buf) - if (self.bytes_left -= buf.size) < 0 - io.close - CapInput.err(client, "chunked request body too big") - end - io << buf + def <<(buf) + if (self.bytes_left -= buf.size) < 0 + io.close + CapInput.err(client, "chunked request body too big") end - - def gets; io.gets; end - def each(&block); io.each(&block); end - def size; io.size; end - def rewind; io.rewind; end - def read(*args); io.read(*args); end - + io << buf end + def gets; io.gets; end + def each(&block); io.each(&block); end + def size; io.size; end + def rewind; io.rewind; end + def read(*args); io.read(*args); end end end diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 96d9a9e..2f363a1 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -3,222 +3,216 @@ require 'eventmachine' EM::VERSION >= '0.12.10' or abort 'eventmachine 0.12.10 is required' require 'rainbows/ev_core' -module Rainbows - - # Implements a basic single-threaded event model with - # {EventMachine}[http://rubyeventmachine.com/]. It is capable of - # handling thousands of simultaneous client connections, but with only - # a single-threaded app dispatch. It is suited for slow clients, - # and can work with slow applications via asynchronous libraries such as - # {async_sinatra}[http://github.com/raggi/async_sinatra], - # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp], - # and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]. - # - # It does not require your Rack application to be thread-safe, - # reentrancy is only required for the DevFdResponse body - # generator. - # - # Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both - # support, currently Ruby 1.8/1.9. - # - # This model is compatible with users of "async.callback" in the Rack - # environment such as - # {async_sinatra}[http://github.com/raggi/async_sinatra]. - # - # For a complete asynchronous framework, - # {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully - # supported when using this concurrency model. - # - # This model is fully-compatible with - # {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool] - # which allows each request to run inside its own \Fiber after - # all request processing is complete. - # - # Merb (and other frameworks/apps) supporting +deferred?+ execution as - # documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin - # will also get the ability to conditionally defer request processing - # to a separate thread. - # - # This model does not implement as streaming "rack.input" which allows - # the Rack application to process data as it arrives. This means - # "rack.input" will be fully buffered in memory or to a temporary file - # before the application is entered. - - module EventMachine - - include Base - autoload :ResponsePipe, 'rainbows/event_machine/response_pipe' - autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe' - autoload :TryDefer, 'rainbows/event_machine/try_defer' - - class Client < EM::Connection # :nodoc: all - attr_writer :body - include Rainbows::EvCore - G = Rainbows::G - - def initialize(io) - @_io = io - @body = nil +# Implements a basic single-threaded event model with +# {EventMachine}[http://rubyeventmachine.com/]. It is capable of +# handling thousands of simultaneous client connections, but with only +# a single-threaded app dispatch. It is suited for slow clients, +# and can work with slow applications via asynchronous libraries such as +# {async_sinatra}[http://github.com/raggi/async_sinatra], +# {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp], +# and {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool]. +# +# It does not require your Rack application to be thread-safe, +# reentrancy is only required for the DevFdResponse body +# generator. +# +# Compatibility: Whatever \EventMachine ~> 0.12.10 and Unicorn both +# support, currently Ruby 1.8/1.9. +# +# This model is compatible with users of "async.callback" in the Rack +# environment such as +# {async_sinatra}[http://github.com/raggi/async_sinatra]. +# +# For a complete asynchronous framework, +# {Cramp}[http://m.onkey.org/2010/1/7/introducing-cramp] is fully +# supported when using this concurrency model. +# +# This model is fully-compatible with +# {rack-fiber_pool}[http://github.com/mperham/rack-fiber_pool] +# which allows each request to run inside its own \Fiber after +# all request processing is complete. +# +# Merb (and other frameworks/apps) supporting +deferred?+ execution as +# documented at http://brainspl.at/articles/2008/04/18/deferred-requests-with-merb-ebb-and-thin +# will also get the ability to conditionally defer request processing +# to a separate thread. +# +# This model does not implement as streaming "rack.input" which allows +# the Rack application to process data as it arrives. This means +# "rack.input" will be fully buffered in memory or to a temporary file +# before the application is entered. +module Rainbows::EventMachine + + include Rainbows::Base + autoload :ResponsePipe, 'rainbows/event_machine/response_pipe' + autoload :ResponseChunkPipe, 'rainbows/event_machine/response_chunk_pipe' + autoload :TryDefer, 'rainbows/event_machine/try_defer' + + class Client < EM::Connection # :nodoc: all + attr_writer :body + include Rainbows::EvCore + + def initialize(io) + @_io = io + @body = nil + end + + alias write send_data + + def receive_data(data) + # To avoid clobbering the current streaming response + # (often a static file), we do not attempt to process another + # request on the same connection until the first is complete + if @body + @buf << data + @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 + EM.next_tick { receive_data('') } + else + on_read(data) end + end - alias write send_data + def quit + super + close_connection_after_writing + end - def receive_data(data) - # To avoid clobbering the current streaming response - # (often a static file), we do not attempt to process another - # request on the same connection until the first is complete - if @body - @buf << data - @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 - EM.next_tick { receive_data('') } + def app_call + set_comm_inactivity_timeout 0 + @env[RACK_INPUT] = @input + @env[REMOTE_ADDR] = @_io.kgio_addr + @env[ASYNC_CALLBACK] = method(:em_write_response) + @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new + + response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } + + # too tricky to support pipelining with :async since the + # second (pipelined) request could be a stuck behind a + # long-running async response + (response.nil? || -1 == response[0]) and return @state = :close + + alive = @hp.keepalive? && G.alive && G.kato > 0 + em_write_response(response, alive) + if alive + @env.clear + @hp.reset + @state = :headers + if @buf.empty? + set_comm_inactivity_timeout(G.kato) else - on_read(data) + EM.next_tick { receive_data('') } end end + end - def quit - super - close_connection_after_writing + def em_write_response(response, alive = false) + status, headers, body = response + if @hp.headers? + headers = HH.new(headers) + headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE + else + headers = nil end - def app_call - set_comm_inactivity_timeout 0 - @env[RACK_INPUT] = @input - @env[REMOTE_ADDR] = @_io.kgio_addr - @env[ASYNC_CALLBACK] = method(:em_write_response) - @env[ASYNC_CLOSE] = EM::DefaultDeferrable.new - - response = catch(:async) { APP.call(@env.update(RACK_DEFAULTS)) } - - # too tricky to support pipelining with :async since the - # second (pipelined) request could be a stuck behind a - # long-running async response - (response.nil? || -1 == response[0]) and return @state = :close - - alive = @hp.keepalive? && G.alive && G.kato > 0 - em_write_response(response, alive) - if alive - @env.clear - @hp.reset - @state = :headers - if @buf.empty? - set_comm_inactivity_timeout(G.kato) - else - EM.next_tick { receive_data('') } + if body.respond_to?(:errback) && body.respond_to?(:callback) + @body = body + body.callback { quit } + body.errback { quit } + # async response, this could be a trickle as is in comet-style apps + headers[CONNECTION] = CLOSE if headers + alive = true + elsif body.respond_to?(:to_path) + st = File.stat(path = body.to_path) + + if st.file? + write(response_header(status, headers)) if headers + @body = stream_file_data(path) + @body.errback do + body.close if body.respond_to?(:close) + quit end - end - end - - def em_write_response(response, alive = false) - status, headers, body = response - if @hp.headers? - headers = HH.new(headers) - headers[CONNECTION] = alive ? KEEP_ALIVE : CLOSE - else - headers = nil - end - - if body.respond_to?(:errback) && body.respond_to?(:callback) - @body = body - body.callback { quit } - body.errback { quit } - # async response, this could be a trickle as is in comet-style apps - headers[CONNECTION] = CLOSE if headers - alive = true - elsif body.respond_to?(:to_path) - st = File.stat(path = body.to_path) - - if st.file? - write(response_header(status, headers)) if headers - @body = stream_file_data(path) - @body.errback do - body.close if body.respond_to?(:close) - quit - end - @body.callback do - body.close if body.respond_to?(:close) - @body = nil - alive ? receive_data('') : quit - end - return - elsif st.socket? || st.pipe? - @body = io = body_to_io(body) - chunk = stream_response_headers(status, headers) if headers - m = chunk ? ResponseChunkPipe : ResponsePipe - return EM.watch(io, m, self, alive, body).notify_readable = true + @body.callback do + body.close if body.respond_to?(:close) + @body = nil + alive ? receive_data('') : quit end - # char or block device... WTF? fall through to body.each + return + elsif st.socket? || st.pipe? + @body = io = body_to_io(body) + chunk = stream_response_headers(status, headers) if headers + m = chunk ? ResponseChunkPipe : ResponsePipe + return EM.watch(io, m, self, alive, body).notify_readable = true end - - write(response_header(status, headers)) if headers - write_body_each(self, body) - quit unless alive + # char or block device... WTF? fall through to body.each end - def unbind - async_close = @env[ASYNC_CLOSE] and async_close.succeed - @body.respond_to?(:fail) and @body.fail - begin - @_io.close - rescue Errno::EBADF - # EventMachine's EventableDescriptor::Close() may close - # the underlying file descriptor without invalidating the - # associated IO object on errors, so @_io.closed? isn't - # sufficient. - end - end + write(response_header(status, headers)) if headers + write_body_each(self, body) + quit unless alive end - module Server # :nodoc: all - def close - detach - @io.close - end - - def notify_readable - return if CUR.size >= MAX - io = @io.kgio_tryaccept or return - sig = EM.attach_fd(io.fileno, false) - CUR[sig] = CL.new(sig, io) + def unbind + async_close = @env[ASYNC_CLOSE] and async_close.succeed + @body.respond_to?(:fail) and @body.fail + begin + @_io.close + rescue Errno::EBADF + # EventMachine's EventableDescriptor::Close() may close + # the underlying file descriptor without invalidating the + # associated IO object on errors, so @_io.closed? isn't + # sufficient. end end + end - def init_worker_process(worker) # :nodoc: - Rainbows::Response.setup(Rainbows::EventMachine::Client) - super + module Server # :nodoc: all + def close + detach + @io.close end - # runs inside each forked worker, this sits around and waits - # for connections and doesn't die until the parent dies (or is - # given a INT, QUIT, or TERM signal) - def worker_loop(worker) # :nodoc: - init_worker_process(worker) - G.server.app.respond_to?(:deferred?) and - G.server.app = TryDefer[G.server.app] - - # enable them both, should be non-fatal if not supported - EM.epoll - EM.kqueue - logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}" - client_class = Rainbows.const_get(@use).const_get(:Client) - Server.const_set(:MAX, worker_connections + LISTENERS.size) - Server.const_set(:CL, client_class) - client_class.const_set(:APP, G.server.app) - EM.run { - conns = EM.instance_variable_get(:@conns) or - raise RuntimeError, "EM @conns instance variable not accessible!" - Server.const_set(:CUR, conns) - EM.add_periodic_timer(1) do - unless G.tick - conns.each_value { |c| client_class === c and c.quit } - EM.stop if conns.empty? && EM.reactor_running? - end - end - LISTENERS.map! do |s| - EM.watch(s, Server) { |c| c.notify_readable = true } - end - } + def notify_readable + return if CUR.size >= MAX + io = @io.kgio_tryaccept or return + sig = EM.attach_fd(io.fileno, false) + CUR[sig] = CL.new(sig, io) end + end + + def init_worker_process(worker) # :nodoc: + Rainbows::Response.setup(Rainbows::EventMachine::Client) + super + end + # runs inside each forked worker, this sits around and waits + # for connections and doesn't die until the parent dies (or is + # given a INT, QUIT, or TERM signal) + def worker_loop(worker) # :nodoc: + init_worker_process(worker) + G.server.app.respond_to?(:deferred?) and + G.server.app = TryDefer[G.server.app] + + # enable them both, should be non-fatal if not supported + EM.epoll + EM.kqueue + logger.info "#@use: epoll=#{EM.epoll?} kqueue=#{EM.kqueue?}" + client_class = Rainbows.const_get(@use).const_get(:Client) + Server.const_set(:MAX, worker_connections + LISTENERS.size) + Server.const_set(:CL, client_class) + client_class.const_set(:APP, G.server.app) + EM.run { + conns = EM.instance_variable_get(:@conns) or + raise RuntimeError, "EM @conns instance variable not accessible!" + Server.const_set(:CUR, conns) + EM.add_periodic_timer(1) do + unless G.tick + conns.each_value { |c| client_class === c and c.quit } + EM.stop if conns.empty? && EM.reactor_running? + end + end + LISTENERS.map! do |s| + EM.watch(s, Server) { |c| c.notify_readable = true } + end + } end end diff --git a/lib/rainbows/fiber.rb b/lib/rainbows/fiber.rb index 94f7d94..b516fb9 100644 --- a/lib/rainbows/fiber.rb +++ b/lib/rainbows/fiber.rb @@ -30,4 +30,5 @@ module Rainbows::Fiber autoload :Base, 'rainbows/fiber/base' autoload :Queue, 'rainbows/fiber/queue' + autoload :IO, 'rainbows/fiber/io' end diff --git a/lib/rainbows/http_server.rb b/lib/rainbows/http_server.rb index 0ed6717..3826a23 100644 --- a/lib/rainbows/http_server.rb +++ b/lib/rainbows/http_server.rb @@ -1,100 +1,97 @@ # -*- encoding: binary -*- # :enddoc: -module Rainbows - class HttpServer < ::Unicorn::HttpServer - include Rainbows +class Rainbows::HttpServer < Unicorn::HttpServer + G = Rainbows::G - class << self - def setup(block) - G.server.instance_eval(&block) - end - end - - def initialize(app, options) - G.server = self - @logger = Unicorn::Configurator::DEFAULTS[:logger] - rv = super(app, options) - defined?(@use) or use(:Base) - @worker_connections ||= MODEL_WORKER_CONNECTIONS[@use] - end + def self.setup(block) + G.server.instance_eval(&block) + end - def reopen_worker_logs(worker_nr) - logger.info "worker=#{worker_nr} reopening logs..." - Unicorn::Util.reopen_logs - logger.info "worker=#{worker_nr} done reopening logs" - rescue - G.quit! # let the master reopen and refork us - end + def initialize(app, options) + G.server = self + @logger = Unicorn::Configurator::DEFAULTS[:logger] + rv = super(app, options) + defined?(@use) or use(:Base) + @worker_connections ||= Rainbows::MODEL_WORKER_CONNECTIONS[@use] + end - #:stopdoc: - # - # Add one second to the timeout since our fchmod heartbeat is less - # precise (and must be more conservative) than Unicorn does. We - # handle many clients per process and can't chmod on every - # connection we accept without wasting cycles. That added to the - # fact that we let clients keep idle connections open for long - # periods of time means we have to chmod at a fixed interval. - def timeout=(nr) - @timeout = nr + 1 - end - #:startdoc: + def reopen_worker_logs(worker_nr) + logger.info "worker=#{worker_nr} reopening logs..." + Unicorn::Util.reopen_logs + logger.info "worker=#{worker_nr} done reopening logs" + rescue + G.quit! # let the master reopen and refork us + end - def use(*args) - model = args.shift or return @use - mod = begin - Rainbows.const_get(model) - rescue NameError => e - logger.error "error loading #{model.inspect}: #{e}" - e.backtrace.each { |l| logger.error l } - raise ArgumentError, "concurrency model #{model.inspect} not supported" - end + #:stopdoc: + # + # Add one second to the timeout since our fchmod heartbeat is less + # precise (and must be more conservative) than Unicorn does. We + # handle many clients per process and can't chmod on every + # connection we accept without wasting cycles. That added to the + # fact that we let clients keep idle connections open for long + # periods of time means we have to chmod at a fixed interval. + def timeout=(nr) + @timeout = nr + 1 + end + #:startdoc: - Module === mod or - raise ArgumentError, "concurrency model #{model.inspect} not supported" - extend(mod) - args.each do |opt| - case opt - when Hash; O.update(opt) - when Symbol; O[opt] = true - else; raise ArgumentError, "can't handle option: #{opt.inspect}" - end - end - mod.setup if mod.respond_to?(:setup) - Const::RACK_DEFAULTS['rainbows.model'] = @use = model.to_sym - Const::RACK_DEFAULTS['rack.multithread'] = !!(model.to_s =~ /Thread/) + def use(*args) + model = args.shift or return @use + mod = begin + Rainbows.const_get(model) + rescue NameError => e + logger.error "error loading #{model.inspect}: #{e}" + e.backtrace.each { |l| logger.error l } + raise ArgumentError, "concurrency model #{model.inspect} not supported" + end - case @use - when :Rev, :EventMachine, :NeverBlock - Const::RACK_DEFAULTS['rainbows.autochunk'] = true + Module === mod or + raise ArgumentError, "concurrency model #{model.inspect} not supported" + extend(mod) + args.each do |opt| + case opt + when Hash; O.update(opt) + when Symbol; O[opt] = true + else; raise ArgumentError, "can't handle option: #{opt.inspect}" end end - - def worker_connections(*args) - return @worker_connections if args.empty? - nr = args[0] - (Integer === nr && nr > 0) or - raise ArgumentError, "worker_connections must be a positive Integer" - @worker_connections = nr + mod.setup if mod.respond_to?(:setup) + new_defaults = { + 'rainbows.model' => (@use = model.to_sym), + 'rack.multithread' => !!(model.to_s =~ /Thread/), + } + case @use + when :Rev, :EventMachine, :NeverBlock + new_defaults['rainbows.autochunk'] = true end + Rainbows::Const::RACK_DEFAULTS.update(new_defaults) + end - def keepalive_timeout(nr) - (Integer === nr && nr >= 0) or - raise ArgumentError, "keepalive must be a non-negative Integer" - G.kato = nr - end + def worker_connections(*args) + return @worker_connections if args.empty? + nr = args[0] + (Integer === nr && nr > 0) or + raise ArgumentError, "worker_connections must be a positive Integer" + @worker_connections = nr + end - def client_max_body_size(nr) - err = "client_max_body_size must be nil or a non-negative Integer" - case nr - when nil - when Integer - nr >= 0 or raise ArgumentError, err - else - raise ArgumentError, err - end - Rainbows.max_bytes = nr - end + def keepalive_timeout(nr) + (Integer === nr && nr >= 0) or + raise ArgumentError, "keepalive must be a non-negative Integer" + G.kato = nr end + def client_max_body_size(nr) + err = "client_max_body_size must be nil or a non-negative Integer" + case nr + when nil + when Integer + nr >= 0 or raise ArgumentError, err + else + raise ArgumentError, err + end + Rainbows.max_bytes = nr + end end diff --git a/lib/rainbows/max_body.rb b/lib/rainbows/max_body.rb index d825d2f..223c57e 100644 --- a/lib/rainbows/max_body.rb +++ b/lib/rainbows/max_body.rb @@ -1,17 +1,17 @@ # -*- encoding: binary -*- # :enddoc: -module Rainbows # middleware used to enforce client_max_body_size for TeeInput users, # there is no need to configure this middleware manually, it will # automatically be configured for you based on the client_max_body_size # setting -class MaxBody < Struct.new(:app) +class Rainbows::MaxBody < Struct.new(:app) # this is meant to be included in Rainbows::TeeInput (and derived # classes) to limit body sizes module Limit TmpIO = Unicorn::TmpIO + MAX_BODY = Rainbows::Const::MAX_BODY def initialize(socket, request) @parser = request @@ -21,7 +21,7 @@ class MaxBody < Struct.new(:app) max = Rainbows.max_bytes # never nil, see MaxBody.setup if @len && @len > max - socket.write(Const::ERROR_413_RESPONSE) + socket.write(Rainbows::Const::ERROR_413_RESPONSE) socket.close raise IOError, "Content-Length too big: #@len > #{max}", [] end @@ -32,7 +32,7 @@ class MaxBody < Struct.new(:app) parser.filter_body(@buf2, @buf) and finalize_input @buf2.size > max and raise IOError, "chunked request body too big", [] end - @tmp = @len && @len < Const::MAX_BODY ? StringIO.new("") : TmpIO.new + @tmp = @len && @len < MAX_BODY ? StringIO.new("") : TmpIO.new if @buf2.size > 0 @tmp.write(@buf2) @tmp.rewind @@ -58,15 +58,15 @@ class MaxBody < Struct.new(:app) # if it's reconfigured def self.setup Rainbows.max_bytes or return - case G.server.use + case Rainbows::G.server.use when :Rev, :EventMachine, :NeverBlock return end - TeeInput.class_eval { include Limit } + Rainbows::TeeInput.__send__(:include, Limit) # force ourselves to the outermost middleware layer - G.server.app = MaxBody.new(G.server.app) + Rainbows::G.server.app = self.new(Rainbows::G.server.app) end # Rack response returned when there's an error @@ -78,6 +78,4 @@ class MaxBody < Struct.new(:app) def call(env) catch(:rainbows_EFBIG) { app.call(env) } || err(env) end - -end # class -end # module +end diff --git a/lib/rainbows/queue_pool.rb b/lib/rainbows/queue_pool.rb index 3ae899c..4a2ab8c 100644 --- a/lib/rainbows/queue_pool.rb +++ b/lib/rainbows/queue_pool.rb @@ -2,32 +2,29 @@ # :enddoc: require 'thread' -module Rainbows +# Thread pool class based on pulling off a single Ruby Queue. +# This is NOT used for the ThreadPool class, since that class does not +# need a userspace Queue. +class Rainbows::QueuePool < Struct.new(:queue, :threads) + G = Rainbows::G - # Thread pool class based on pulling off a single Ruby Queue. - # This is NOT used for the ThreadPool class, since that class does not - # need a userspace Queue. - class QueuePool < Struct.new(:queue, :threads) - G = Rainbows::G - - def initialize(size = 20, &block) - q = Queue.new - self.threads = (1..size).map do - Thread.new do - while job = q.shift - block.call(job) - end + def initialize(size = 20, &block) + q = Queue.new + self.threads = (1..size).map do + Thread.new do + while job = q.shift + block.call(job) end end - self.queue = q end + self.queue = q + end - def quit! - threads.each { |_| queue << nil } - threads.delete_if do |t| - G.tick - t.alive? ? t.join(0.01) : true - end until threads.empty? - end + def quit! + threads.each { |_| queue << nil } + threads.delete_if do |t| + G.tick + t.alive? ? t.join(0.01) : true + end until threads.empty? end end diff --git a/lib/rainbows/rev/core.rb b/lib/rainbows/rev/core.rb index 2273b24..abd7ca7 100644 --- a/lib/rainbows/rev/core.rb +++ b/lib/rainbows/rev/core.rb @@ -26,7 +26,7 @@ module Rainbows require 'rainbows/rev/sendfile' Rainbows::Rev::Client.__send__(:include, Rainbows::Rev::Sendfile) init_worker_process(worker) - mod = self.class.const_get(@use) + mod = Rainbows.const_get(@use) rloop = Server.const_set(:LOOP, ::Rev::Loop.default) Server.const_set(:MAX, @worker_connections) Server.const_set(:CL, mod.const_get(:Client)) diff --git a/lib/rainbows/rev_fiber_spawn.rb b/lib/rainbows/rev_fiber_spawn.rb index 522ae71..39483b3 100644 --- a/lib/rainbows/rev_fiber_spawn.rb +++ b/lib/rainbows/rev_fiber_spawn.rb @@ -1,31 +1,28 @@ # -*- encoding: binary -*- require 'rainbows/fiber/rev' -module Rainbows +# A combination of the Rev and FiberSpawn models. This allows Ruby +# 1.9 Fiber-based concurrency for application processing while +# exposing a synchronous execution model and using scalable network +# concurrency provided by Rev. A "rack.input" is exposed as well +# being Sunshowers-compatible. Applications are strongly advised to +# wrap all slow IO objects (sockets, pipes) using the +# Rainbows::Fiber::IO or a Rev-compatible class whenever possible. +module Rainbows::RevFiberSpawn - # A combination of the Rev and FiberSpawn models. This allows Ruby - # 1.9 Fiber-based concurrency for application processing while - # exposing a synchronous execution model and using scalable network - # concurrency provided by Rev. A "rack.input" is exposed as well - # being Sunshowers-compatible. Applications are strongly advised to - # wrap all slow IO objects (sockets, pipes) using the - # Rainbows::Fiber::IO or a Rev-compatible class whenever possible. - module RevFiberSpawn + include Rainbows::Base + include Rainbows::Fiber::Rev - include Base - include Fiber::Rev - - def worker_loop(worker) # :nodoc: - Rainbows::Response.setup(Rainbows::Fiber::Rev::Server) - init_worker_process(worker) - Server.const_set(:MAX, @worker_connections) - Rainbows::Fiber::Base.setup(Rainbows::Fiber::Rev::Server, nil) - Server.const_set(:APP, G.server.app) - Heartbeat.new(1, true).attach(::Rev::Loop.default) - kato = Kato.new.attach(::Rev::Loop.default) - Rainbows::Fiber::Rev::Methods.const_set(:KATO, kato) - LISTENERS.map! { |s| Server.new(s).attach(::Rev::Loop.default) } - ::Rev::Loop.default.run - end + def worker_loop(worker) # :nodoc: + Rainbows::Response.setup(Server) + init_worker_process(worker) + Server.const_set(:MAX, @worker_connections) + Rainbows::Fiber::Base.setup(Server, nil) + Server.const_set(:APP, G.server.app) + Heartbeat.new(1, true).attach(Rev::Loop.default) + kato = Kato.new.attach(Rev::Loop.default) + Rainbows::Fiber::Rev::Methods.const_set(:KATO, kato) + LISTENERS.map! { |s| Server.new(s).attach(Rev::Loop.default) } + Rev::Loop.default.run end end -- cgit v1.2.3-24-ge0c7