From 86e9c7013308d77def5fe41b52a35dea60c7361c Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Mon, 28 Jun 2010 06:29:54 +0000 Subject: fiber/base: reuse process_client logic in base This fleshes out Rainbows::Fiber::IO with a few more methods for people using it. --- lib/rainbows/base.rb | 6 +++++- lib/rainbows/fiber/base.rb | 51 ++++++++++++++++++---------------------------- lib/rainbows/fiber/io.rb | 18 ++++++++++++++++ 3 files changed, 43 insertions(+), 32 deletions(-) (limited to 'lib/rainbows') diff --git a/lib/rainbows/base.rb b/lib/rainbows/base.rb index 435c818..2627719 100644 --- a/lib/rainbows/base.rb +++ b/lib/rainbows/base.rb @@ -59,6 +59,10 @@ module Rainbows::Base module_function :write_body + def wait_headers_readable(client) + IO.select([client], nil, nil, G.kato) + end + # once a client is accepted, it is processed in its entirety here # in 3 easy steps: read request, call app, write app response # this is used by synchronous concurrency models @@ -72,7 +76,7 @@ module Rainbows::Base begin # loop until hp.headers(env, buf) - IO.select([client], nil, nil, G.kato) or return + wait_headers_readable(client) or return buf << client.readpartial(CHUNK_SIZE) end diff --git a/lib/rainbows/fiber/base.rb b/lib/rainbows/fiber/base.rb index e0be912..0298948 100644 --- a/lib/rainbows/fiber/base.rb +++ b/lib/rainbows/fiber/base.rb @@ -72,42 +72,31 @@ module Rainbows max.nil? || max > (now + 1) ? 1 : max - now end - def process_client(client) - G.cur += 1 - io = client.to_io - buf = client.read_timeout or return - hp = HttpParser.new - env = {} - alive = true - remote_addr = Rainbows.addr(io) - - begin # loop - until hp.headers(env, buf) - buf << (client.read_timeout or return) - end - - env[CLIENT_IO] = client - env[RACK_INPUT] = 0 == hp.content_length ? - NULL_IO : TeeInput.new(client, env, hp, buf) - env[REMOTE_ADDR] = remote_addr - response = APP.call(env.update(RACK_DEFAULTS)) + def write_body(client, body) + body.each { |chunk| client.write(chunk) } + ensure + body.respond_to?(:close) and body.close + end - if 100 == response[0].to_i - client.write(EXPECT_100_RESPONSE) - env.delete(HTTP_EXPECT) - response = APP.call(env) - end + def wait_headers_readable(client) + io = client.to_io + expire = nil + begin + return io.recv_nonblock(1, Socket::MSG_PEEK) + rescue Errno::EAGAIN + return if expire && expire < Time.now + expire ||= Time.now + G.kato + client.wait_readable + retry + end + end - alive = hp.keepalive? && G.alive - out = [ alive ? CONN_ALIVE : CONN_CLOSE ] if hp.headers? - HttpResponse.write(client, response, out) - end while alive and hp.reset.nil? and env.clear - rescue => e - Error.write(io, e) + def process_client(client) + G.cur += 1 + super(client) # see Rainbows::Base ensure G.cur -= 1 ZZ.delete(client.f) - client.close end end diff --git a/lib/rainbows/fiber/io.rb b/lib/rainbows/fiber/io.rb index 5f925ca..d4f2512 100644 --- a/lib/rainbows/fiber/io.rb +++ b/lib/rainbows/fiber/io.rb @@ -8,6 +8,20 @@ module Rainbows # the underlying IO object cannot read or write class IO < Struct.new(:to_io, :f) + # :stopdoc: + LOCALHOST = Unicorn::HttpRequest::LOCALHOST + + # needed to write errors with + def write_nonblock(buf) + to_io.write_nonblock(buf) + end + + # enough for Rainbows.addr + def peeraddr + to_io.respond_to?(:peeraddr) ? to_io.peeraddr : [ LOCALHOST ] + end + # :stopdoc: + # for wrapping output response bodies def each(&block) begin @@ -24,6 +38,10 @@ module Rainbows to_io.close unless to_io.closed? end + def closed? + to_io.closed? + end + def wait_readable fileno = to_io.fileno RD[fileno] = self -- cgit v1.2.3-24-ge0c7