diff options
author | Eric Wong <normalperson@yhbt.net> | 2009-11-07 12:23:26 -0800 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2009-11-07 12:30:47 -0800 |
commit | 7e35ea595f4742ace9579402323515031d69fc87 (patch) | |
tree | 006c5a6a57c159da466afddc5a7edfb086f6b1cf /lib/rainbows/rev.rb | |
parent | 1266417999aeb939d4e2a7d01aa6730f13cae9fa (diff) | |
download | rainbows-7e35ea595f4742ace9579402323515031d69fc87.tar.gz |
This will make things easier to manage with more Rev-based concurrency models.
Diffstat (limited to 'lib/rainbows/rev.rb')
-rw-r--r-- | lib/rainbows/rev.rb | 163 |
1 files changed, 4 insertions, 159 deletions
diff --git a/lib/rainbows/rev.rb b/lib/rainbows/rev.rb index 0d8b6c9..602545d 100644 --- a/lib/rainbows/rev.rb +++ b/lib/rainbows/rev.rb @@ -1,6 +1,7 @@ # -*- encoding: binary -*- -require 'rainbows/rev/heartbeat' -require 'rainbows/ev_core' +require 'rainbows/rev/core' +require 'rainbows/rev/client' +require 'rainbows/rev/deferred_response' module Rainbows @@ -23,162 +24,6 @@ module Rainbows # temporary file before the application is entered. module Rev - - include Base - - class Client < ::Rev::IO - include Rainbows::EvCore - G = Rainbows::G - - def initialize(io) - G.cur += 1 - super(io) - post_init - @deferred_bodies = [] # for (fast) regular files only - end - - # queued, optional response bodies, it should only be unpollable "fast" - # devices where read(2) is uninterruptable. Unfortunately, NFS and ilk - # are also part of this. We'll also stick DeferredResponse bodies in - # here to prevent connections from being closed on us. - def defer_body(io) - @deferred_bodies << io - on_write_complete unless @hp.headers? # triggers a write - end - - def app_call - begin - (@env[RACK_INPUT] = @input).rewind - @env[REMOTE_ADDR] = @remote_addr - response = APP.call(@env.update(RACK_DEFAULTS)) - alive = @hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if @hp.headers? - - DeferredResponse.write(self, response, out) - if alive - @env.clear - @hp.reset - @state = :headers - # keepalive requests are always body-less, so @input is unchanged - @hp.headers(@env, @buf) and next - else - quit - end - return - end while true - end - - def on_write_complete - if body = @deferred_bodies.first - return if DeferredResponse === body - begin - begin - write(body.sysread(CHUNK_SIZE)) - rescue EOFError # expected at file EOF - @deferred_bodies.shift - body.close - close if :close == @state && @deferred_bodies.empty? - end - rescue Object => e - handle_error(e) - end - else - close if :close == @state - end - end - - def on_close - G.cur -= 1 - end - end - - class Server < ::Rev::IO - G = Rainbows::G - - def on_readable - return if G.cur >= MAX - begin - Client.new(@_io.accept_nonblock).attach(::Rev::Loop.default) - rescue Errno::EAGAIN, Errno::ECONNABORTED - end - end - - end - - class DeferredResponse < ::Rev::IO - include Unicorn - include Rainbows::Const - G = Rainbows::G - - def self.defer!(client, response, out) - body = response.last - headers = Rack::Utils::HeaderHash.new(response[1]) - - # to_io is not part of the Rack spec, but make an exception - # here since we can't get here without checking to_path first - io = body.to_io if body.respond_to?(:to_io) - io ||= ::IO.new($1.to_i) if body.to_path =~ %r{\A/dev/fd/(\d+)\z} - io ||= File.open(body.to_path, 'rb') - st = io.stat - - if st.socket? || st.pipe? - do_chunk = !!(headers['Transfer-Encoding'] =~ %r{\Achunked\z}i) - do_chunk = false if headers.delete('X-Rainbows-Autochunk') == 'no' - # too tricky to support keepalive/pipelining when a response can - # take an indeterminate amount of time here. - if out.nil? - do_chunk = false - else - out[0] = CONN_CLOSE - end - - io = new(io, client, do_chunk, body).attach(::Rev::Loop.default) - elsif st.file? - headers.delete('Transfer-Encoding') - headers['Content-Length'] ||= st.size.to_s - else # char/block device, directory, whatever... nobody cares - return response - end - client.defer_body(io) - [ response.first, headers.to_hash, [] ] - end - - def self.write(client, response, out) - response.last.respond_to?(:to_path) and - response = defer!(client, response, out) - HttpResponse.write(client, response, out) - end - - def initialize(io, client, do_chunk, body) - super(io) - @client, @do_chunk, @body = client, do_chunk, body - end - - def on_read(data) - @do_chunk and @client.write(sprintf("%x\r\n", data.size)) - @client.write(data) - @do_chunk and @client.write("\r\n") - end - - def on_close - @do_chunk and @client.write("0\r\n\r\n") - @client.quit - @body.respond_to?(:close) and @body.close - end - end - - # runs inside each forked worker, this sits around and waits - # for connections and doesn't die until the parent dies (or is - # given a INT, QUIT, or TERM signal) - def worker_loop(worker) - init_worker_process(worker) - Client.const_set(:APP, G.server.app) - Server.const_set(:MAX, G.server.worker_connections) - rloop = ::Rev::Loop.default - Heartbeat.new(1, true).attach(rloop) - LISTENERS.map! { |s| Server.new(s).attach(rloop) } - rloop.run - end - + include Core end end |