1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
| | # -*- encoding: binary -*-
# here are internal implementation details, do not use them in your code
require 'stringio'
require 'mogilefs/new_file'
##
# HTTPFile wraps up the new file operations for storing files onto an HTTP
# storage node.
#
# You really don't want to create an HTTPFile by hand. Instead you want to
# create a new file using MogileFS::MogileFS.new_file.
#
class MogileFS::HTTPFile < StringIO
include MogileFS::NewFile::Common
##
# The big_io name in case we have file > 256M
attr_accessor :big_io
attr_accessor :streaming_io
##
# Creates a new HTTPFile with MogileFS-specific data. Use
# MogileFS::MogileFS#new_file instead of this method.
def initialize(dests, opts = nil)
super ""
@md5 = @streaming_io = @big_io = @active = nil
@dests = dests
@opts = Integer === opts ? { :content_length => opts } : opts
end
def request_put(sock, uri, file_size, input = nil)
host_with_port = "#{uri.host}:#{uri.port}"
clen = @opts[:content_length]
file_size ||= clen
content_md5 = @opts[:content_md5]
if String === content_md5
file_size or
raise ArgumentError,
":content_length must be specified with :content_md5 String"
file_size = "#{file_size}\r\nContent-MD5: #{content_md5}"
elsif content_md5.respond_to?(:call) ||
:trailer == content_md5 ||
MD5_TRAILER_NODES[host_with_port]
file_size = nil
@md5 = Digest::MD5.new
end
if file_size
sock.write("PUT #{uri.request_uri} HTTP/1.0\r\n" \
"Content-Length: #{file_size}\r\n\r\n")
rv = input ? MogileFS.io.copy_stream(@active = input, sock) : yield(sock)
else
trailers = @md5 ? "Trailer: Content-MD5\r\n".freeze : "".freeze
sock.write("PUT #{uri.request_uri} HTTP/1.1\r\n" \
"Host: #{host_with_port}\r\n#{trailers}" \
"Transfer-Encoding: chunked\r\n\r\n")
tmp = MogileFS::Chunker.new(sock, @md5, content_md5)
rv = input ? MogileFS.io.copy_stream(@active = input, tmp) : yield(tmp)
tmp.flush
end
if clen && clen != rv
raise MogileFS::SizeMismatchError,
":content_length expected: #{clen.inspect}, actual: #{rv.inspect}"
end
rv
end
def put_streaming_io(sock, uri) # unlikely to be used
file_size = @streaming_io.length
written = 0
request_put(sock, uri, file_size) do |wr|
@streaming_io.call(Proc.new do |data_to_write|
written += wr.write(data_to_write)
end)
end
file_size ? file_size : written
end
def rewind_or_raise!(uri, err)
@active.rewind if @active
rescue => e
msg = "#{uri} failed with #{err.message} (#{err.class}) and " \
"retrying is impossible as rewind on " \
"#{@active.inspect} failed with: #{e.message} (#{e.class})"
raise NonRetryableError, msg, e.backtrace
end
##
# Writes an HTTP PUT request to +sock+ to upload the file and
# returns file size if the socket finished writing
def upload(devid, uri) # :nodoc:
sock = MogileFS::Socket.tcp(uri.host, uri.port)
file_size = length
if @streaming_io
file_size = put_streaming_io(sock, uri)
elsif @big_io
stat = file = size = nil
if @big_io.respond_to?(:stat)
stat = @big_io.stat
elsif String === @big_io || @big_io.respond_to?(:to_path)
begin
file = File.open(@big_io)
rescue => e
msg = "Failed to open input (#{@big_io.inspect}): " \
"#{e.message} (#{e.class})"
raise NonRetryableError, msg, e.backtrace
end
stat = file.stat
elsif @big_io.respond_to?(:size)
size = @big_io.size
end
if stat && stat.file?
size ||= stat.size
file ||= @big_io.to_io if @big_io.respond_to?(:to_io)
end
file_size = request_put(sock, uri, size, file || @big_io)
else
rewind
request_put(sock, uri, file_size, self)
end
read_response(sock) # raises on errors
file_size
rescue SystemCallError, RetryableError => err
rewind_or_raise!(uri, err)
raise
ensure
file.close if file && @big_io != file
sock.close if sock
end
def nhp_put(devid, uri)
clen = @opts[:content_length]
if clen && clen != size
raise MogileFS::SizeMismatchError,
":content_length expected: #{clen.inspect}, actual: #{size}"
end
put = Net::HTTP::Put.new(uri.path)
put["Content-Type".freeze] = "application/octet-stream".freeze
if md5 = @opts[:content_md5]
if md5.respond_to?(:call)
md5 = md5.call.strip
elsif md5 == :trailer
md5 = [ Digest::MD5.digest(string) ].pack("m".freeze).chomp!
end
put["Content-MD5".freeze] = md5
end
put.body = string
res = @opts[:nhp_put].request(uri, put)
return size if Net::HTTPSuccess === res
raise BadResponseError, "#{res.code} #{res.message}"
rescue => e
/\ANet::/ =~ "#{e.class}" and
raise RetryableError, "#{e.message} (#{e.class})", e.backtrace
raise
end
def commit
errors = nil
@dests.each do |devid, path|
begin
uri = URI.parse(path)
bytes_uploaded = size > 0 ? nhp_put(devid, uri) : upload(devid, uri)
return create_close(devid, uri, bytes_uploaded)
rescue Timeout::Error, SystemCallError, RetryableError => e
errors ||= []
errors << "#{path} - #{e.message} (#{e.class})"
end
end
raise NoStorageNodesError,
"all paths failed with PUT: #{errors.join(', ')}", []
end
def close
commit
super
end
end
|