diff options
author | Eric Wong <normalperson@yhbt.net> | 2011-11-08 00:52:30 +0000 |
---|---|---|
committer | Eric Wong <normalperson@yhbt.net> | 2011-11-08 00:52:30 +0000 |
commit | e814b15f1386e2ea53bc018aaa92aecb147400ae (patch) | |
tree | 2e023b0d011d699a31f5aa217d5e03c670d6afd5 /lib/mogilefs/http_file.rb | |
parent | 9bcff8f08dc084b880ef978c890a2706a1bbb304 (diff) | |
download | mogilefs-client-e814b15f1386e2ea53bc018aaa92aecb147400ae.tar.gz |
If a user tries to pipe something to us and we can't rewind on failure, propagate that error all the way up to avoid risking a corrupted upload.
Diffstat (limited to 'lib/mogilefs/http_file.rb')
-rw-r--r-- | lib/mogilefs/http_file.rb | 54 |
1 files changed, 35 insertions, 19 deletions
diff --git a/lib/mogilefs/http_file.rb b/lib/mogilefs/http_file.rb index 0ba895e..ac836e4 100644 --- a/lib/mogilefs/http_file.rb +++ b/lib/mogilefs/http_file.rb @@ -18,6 +18,7 @@ class MogileFS::HTTPFile < StringIO class NoStorageNodesError < MogileFS::Error def message; 'Unable to open socket to storage node'; end end + class NonRetryableError < MogileFS::Error; end ## # The URI this file will be stored to. @@ -39,27 +40,46 @@ class MogileFS::HTTPFile < StringIO def initialize(dests, content_length) super "" - @streaming_io = @big_io = @uri = @devid = nil + @streaming_io = @big_io = @uri = @devid = @active = nil @dests = dests - @tried = {} end - def request_put(sock, uri, file_size) + def request_put(sock, uri, file_size, input = nil) if file_size sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \ "Content-Length: #{file_size}\r\n\r\n") - yield sock + input ? MogileFS::X.copy_stream(@active = input, sock) : yield(sock) else sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \ "Host: #{uri.host}:#{uri.port}\r\n" \ "Transfer-Encoding: chunked\r\n\r\n") tmp = MogileFS::Chunker.new(sock) - rv = yield tmp + rv = input ? MogileFS::X.copy_stream(@active = input, tmp) : yield(tmp) tmp.flush rv end end + def put_streaming_io(sock, uri) # unlikely to be used + file_size = @streaming_io.length + written = 0 + request_put(sock, uri, file_size) do |wr| + @streaming_io.call(Proc.new do |data_to_write| + written += wr.write(data_to_write) + end) + end + file_size ? file_size : written + end + + def rewind_or_raise!(uri, err) + @active.rewind if @active + rescue => e + msg = "#{uri} failed with #{err.message} (#{err.class}) and " \ + "retrying is impossible as rewind on " \ + "#{@active.inspect} failed with: #{e.message} (#{e.class})" + raise NonRetryableError, msg, e.backtrace + end + ## # Writes an HTTP PUT request to +sock+ to upload the file and # returns file size if the socket finished writing @@ -68,31 +88,22 @@ class MogileFS::HTTPFile < StringIO file_size = length if @streaming_io - file_size = @streaming_io.length - written = 0 - request_put(sock, uri, file_size) do |wr| - @streaming_io.call(Proc.new do |data_to_write| - written += wr.write(data_to_write) - end) - end - file_size = written if file_size.nil? + file_size = put_streaming_io(sock, uri) elsif @big_io if String === @big_io || @big_io.respond_to?(:to_path) File.open(@big_io) do |rd| stat = rd.stat - request_put(sock, uri, stat.file? ? stat.size : nil) do |wr| - file_size = MogileFS::X.copy_stream(rd, wr) - end + file_size = request_put(sock, uri, stat.file? ? stat.size : nil, rd) end else size = nil if @big_io.respond_to?(:stat) stat = @big_io.stat size = stat.size if stat.file? + elsif @big_io.respond_to?(:size) + size = @big_io.size end - request_put(sock, uri, size) do |wr| - file_size = MogileFS::X.copy_stream(@big_io, wr) - end + file_size = request_put(sock, uri, size, @big_io) end else sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \ @@ -109,6 +120,9 @@ class MogileFS::HTTPFile < StringIO else raise UnparseableResponseError, "Response line not understood: #{line}" end + rescue => err + rewind_or_raise!(uri, err) + raise ensure sock.close if sock end @@ -121,6 +135,8 @@ class MogileFS::HTTPFile < StringIO bytes_uploaded = upload(devid, uri) @devid, @uri = devid, uri return bytes_uploaded + rescue NonRetryableError + raise rescue => e errors ||= [] errors << "#{path} failed with #{e.message} (#{e.class})" |