From f309cfaf70cbffd7a39208da869e47784e4cb41b Mon Sep 17 00:00:00 2001 From: Eric Wong Date: Wed, 28 Jul 2010 11:20:46 +0000 Subject: event_machine: better handling of staggered pipelines Since TCP sockets stream, HTTP requests do not come in at well-defined boundaries and it's possible for pipelined requests to come in in a staggered form. We need to ensure our receive_data callback doesn't fire any actions at all while responding with a deferrable @body. We still need to be careful about buffering, since EM does not appear to allow temporarily disabling read events (without pausing writes), so we shutdown the read end of the socket if it reaches a maximum header size limit. --- lib/rainbows/event_machine.rb | 27 +++++++++++++++++---------- t/t0024-pipelined-sendfile-response.sh | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 49 insertions(+), 12 deletions(-) diff --git a/lib/rainbows/event_machine.rb b/lib/rainbows/event_machine.rb index 7fe9864..757817d 100644 --- a/lib/rainbows/event_machine.rb +++ b/lib/rainbows/event_machine.rb @@ -62,7 +62,19 @@ module Rainbows end alias write send_data - alias receive_data on_read + + def receive_data(data) + # To avoid clobbering the current streaming response + # (often a static file), we do not attempt to process another + # request on the same connection until the first is complete + if @body + @buf << data + @_io.shutdown(Socket::SHUT_RD) if @buf.size > 0x1c000 + return EM.next_tick { receive_data('') } + else + on_read(data) + end + end def quit super @@ -70,11 +82,6 @@ module Rainbows end def app_call - # To avoid clobbering the current streaming response - # (often a static file), we do not attempt to process another - # request on the same connection until the first is complete - return EM.next_tick { app_call } if @body - set_comm_inactivity_timeout 0 @env[RACK_INPUT] = @input @env[REMOTE_ADDR] = @remote_addr @@ -93,10 +100,10 @@ module Rainbows @env.clear @hp.reset @state = :headers - if @body.nil? && @hp.headers(@env, @buf) - EM.next_tick { on_read('') } - else + if @buf.empty? set_comm_inactivity_timeout(G.kato) + else + EM.next_tick { receive_data('') } end end end @@ -130,7 +137,7 @@ module Rainbows @body.callback do body.close if body.respond_to?(:close) @body = nil - alive ? on_read('') : quit + alive ? receive_data('') : quit end return elsif st.socket? || st.pipe? diff --git a/t/t0024-pipelined-sendfile-response.sh b/t/t0024-pipelined-sendfile-response.sh index b0f5d56..9111ce9 100755 --- a/t/t0024-pipelined-sendfile-response.sh +++ b/t/t0024-pipelined-sendfile-response.sh @@ -1,10 +1,10 @@ #!/bin/sh . ./test-lib.sh -t_plan 5 "pipelined sendfile response for $model" +t_plan 6 "pipelined sendfile response for $model" t_begin "setup and startup" && { - rtmpfiles err out + rtmpfiles err out dd_fifo rainbows_setup $model echo 'require "sendfile"' >> $unicorn_config echo 'def (::IO).copy_stream(*x); abort "NO"; end' >> $unicorn_config @@ -36,7 +36,37 @@ end $stdout.syswrite("ok\n") ' +t_begin "staggered pipeline of 3 HTTP requests" && { + req='GET /random_blob HTTP/1.1\r\nHost: example.com\r\n' + rm -f $ok + ( + export random_blob_sha1 + $RUBY -e "$script" < $fifo >> $ok & + printf "$req"'X-Req:0\r\n\r\n' + exec 6>&1 + ( + dd bs=16384 count=1 + printf "$req" >&6 + dd bs=16384 count=1 + printf 'X-Req:1\r\n\r\n' >&6 + dd bs=16384 count=1 + printf "$req" >&6 + dd bs=16384 count=1 + printf 'X-Req:2\r\n' >&6 + dd bs=16384 count=1 + printf 'Connection: close\r\n\r' >&6 + dd bs=16384 count=1 + printf '\n' >&6 + cat + ) < $dd_fifo > $fifo & + wait + echo ok >> $ok + ) | socat - TCP:$listen > $dd_fifo + test 2 -eq $(grep '^ok$' $ok |wc -l) +} + t_begin "pipeline 3 HTTP requests" && { + rm -f $ok req='GET /random_blob HTTP/1.1\r\nHost: example.com\r\n' req="$req"'\r\n'"$req"'\r\n'"$req" req="$req"'Connection: close\r\n\r\n' -- cgit v1.2.3-24-ge0c7