1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
| | # -*- encoding: binary -*-
# here are internal implementation details, do not rely on them in your code
require 'net/http'
require 'mogilefs/new_file/writer'
# an IO-like object
class MogileFS::NewFile::ContentRange
include MogileFS::NewFile::Writer
include MogileFS::NewFile::Common
attr_reader :md5
# :stopdoc:
begin
require 'net/http/persistent'
NHP = Net::HTTP::Persistent.new('mogilefs')
def hit(uri, req)
NHP.request(uri, req).value
end
rescue LoadError
def hit(uri, req)
Net::HTTP.start(uri.host, uri.port) { |h| h.request(req).value }
end
end
# :startdoc:
def initialize(dests, opts) # :nodoc:
@dests = dests
@opts = opts
@devid = @uri = @md5 = nil
@bytes_uploaded = 0
@errors = []
end
def get_dest # :nodoc:
return [ @devid, @uri ] if @uri
rv = @dests.shift or no_nodes!
rv[1] = URI.parse(rv[1])
rv
end
def no_nodes! # :nodoc:
raise NoStorageNodesError,
"all paths failed with PUT: #{@errors.join(', ')}", []
end
def request_for(uri, buf) # :nodoc:
put = Net::HTTP::Put.new(uri.path)
put["Content-Type"] = "application/octet-stream"
put["Content-MD5"] = [ Digest::MD5.digest(buf) ].pack("m").chomp!
if @bytes_uploaded > 0
last_byte = @bytes_uploaded + buf.bytesize - 1
put["Content-Range"] = "bytes #@bytes_uploaded-#{last_byte}/*"
end
put.body = buf
put
end
# see IO#write
def write(buf)
buf = String buf
len = buf.bytesize
return 0 if 0 == len
devid, uri = get_dest
put = request_for(uri, buf)
begin
hit(uri, put) # raises on error
rescue => e
raise if @bytes_uploaded > 0
# nothing uploaded, try another dest
@errors << "#{uri.to_s} - #{e.message} (#{e.class})"
devid, uri = get_dest
put = request_for(uri, buf)
retry
end
@uri, @devid = uri, devid if 0 == @bytes_uploaded
@bytes_uploaded += len
len
end
# called on close, do not use
def commit # :nodoc:
zero_byte_special if @bytes_uploaded == 0
create_close(@devid, @uri, @bytes_uploaded)
end
# special case for zero-byte files :<
def zero_byte_special # :nodoc:
@devid, @uri = get_dest
put = request_for(@uri, "")
begin
hit(@uri, put) # raises on error
rescue => e
@errors << "#{@uri.to_s} - #{e.message} (#{e.class})"
@devid, @uri = get_dest
put = request_for(@uri, "")
retry
end
end
end
|