1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
| | # -*- encoding: binary -*-
# read-only interface that can be a backend for MogileFS::MogileFS
#
# This provides direct, read-only access to any slave MySQL database to
# provide better performance, scalability and eliminate mogilefsd as a
# point of failure
class MogileFS::Mysql
attr_reader :my
attr_reader :query_method
##
# Creates a new MogileFS::Mysql instance. +args+ must include a key
# :domain specifying the domain of this client and :mysql, specifying
# an already-initialized Mysql object.
#
# The Mysql object can be either the standard Mysql driver or the
# Mysqlplus one supporting c_async_query.
def initialize(args = {})
@my = args[:mysql]
@query_method = @my.respond_to?(:c_async_query) ? :c_async_query : :query
@last_update_device = @last_update_domain = Time.at(0)
@cache_domain = @cache_device = nil
end
##
# Lists keys starting with +prefix+ follwing +after+ up to +limit+. If
# +after+ is nil the list starts at the beginning.
def _list_keys(domain, prefix = '', after = '', limit = 1000)
# this code is based on server/lib/MogileFS/Worker/Query.pm
dmid = get_dmid(domain)
# don't modify passed arguments
limit ||= 1000
limit = limit.to_i
limit = 1000 if limit > 1000 || limit <= 0
after, prefix = "#{after}", "#{prefix}"
if after.length > 0 && /^#{Regexp.quote(prefix)}/ !~ after
raise MogileFS::Backend::AfterMismatchError
end
raise MogileFS::Backend::InvalidCharsError if /[%\\]/ =~ prefix
prefix.gsub!(/_/, '\_') # not sure why MogileFS::Worker::Query does this...
sql = <<-EOS
SELECT dkey,length,devcount FROM file
WHERE dmid = #{dmid}
AND dkey LIKE '#{@my.quote(prefix)}%'
AND dkey > '#{@my.quote(after)}'
ORDER BY dkey LIMIT #{limit}
EOS
keys = []
query(sql).each do |dkey,length,devcount|
yield(dkey, length.to_i, devcount.to_i) if block_given?
keys << dkey
end
keys.empty? ? nil : [ keys, (keys.last || '') ]
end
##
# Returns the size of +key+.
def _size(domain, key)
dmid = get_dmid(domain)
sql = <<-EOS
SELECT length FROM file
WHERE dmid = #{dmid} AND dkey = '#{@my.quote(key)}'
LIMIT 1
EOS
res = query(sql).fetch_row
return res[0].to_i if res && res[0]
raise MogileFS::Backend::UnknownKeyError
end
##
# Get the paths for +key+.
def _get_paths(params = {})
zone = params[:zone]
noverify = (params[:noverify] == 1) # TODO this is unused atm
dmid = get_dmid(params[:domain])
devices = refresh_device or raise MogileFS::Backend::NoDevicesError
urls = []
sql = <<-EOS
SELECT fid FROM file
WHERE dmid = #{dmid} AND dkey = '#{@my.quote(params[:key])}'
LIMIT 1
EOS
res = query(sql).fetch_row
res && res[0] or raise MogileFS::Backend::UnknownKeyError
fid = res[0]
sql = "SELECT devid FROM file_on WHERE fid = '#{@my.quote(fid)}'"
query(sql).each do |devid,|
unless devinfo = devices[devid.to_i]
devices = refresh_device(true)
devinfo = devices[devid.to_i] or next
end
devinfo[:readable] or next
port = devinfo[:http_get_port]
host = zone && zone == 'alt' ? devinfo[:altip] : devinfo[:hostip]
nfid = '%010u' % fid
b, mmm, ttt = /(\d)(\d{3})(\d{3})(?:\d{3})/.match(nfid)[1..3]
uri = "/dev#{devid}/#{b}/#{mmm}/#{ttt}/#{nfid}.fid"
urls << "http://#{host}:#{port}#{uri}"
end
urls
end
def sleep(params); Kernel.sleep(params[:duration] || 10); {}; end
private
unless defined? GET_DEVICES
GET_DOMAINS = 'SELECT dmid,namespace FROM domain'.freeze
GET_DEVICES = <<-EOS
SELECT d.devid, h.hostip, h.altip, h.http_port, h.http_get_port,
d.status, h.status
FROM device d
LEFT JOIN host h ON d.hostid = h.hostid
EOS
GET_DEVICES.freeze
end
def query(sql)
@my.send(@query_method, sql)
end
DEV_STATUS_READABLE = {
"alive" => true,
"readonly" => true,
"drain" => true,
}.freeze
def refresh_device(force = false)
return @cache_device if ! force && ((Time.now - @last_update_device) < 60)
tmp = {}
res = query(GET_DEVICES)
res.each do |devid, hostip, altip, http_port, http_get_port,
dev_status, host_status|
http_port = http_port ? http_port.to_i : 80
tmp[devid.to_i] = {
:hostip => hostip.freeze,
:altip => (altip || hostip).freeze,
:readable => (host_status == "alive" &&
DEV_STATUS_READABLE.include?(dev_status)),
:http_port => http_port,
:http_get_port => http_get_port ? http_get_port.to_i : http_port,
}.freeze
end
@last_update_device = Time.now
@cache_device = tmp.freeze
end
def refresh_domain(force = false)
return @cache_domain if ! force && ((Time.now - @last_update_domain) < 5)
tmp = {}
res = query(GET_DOMAINS)
res.each { |dmid,namespace| tmp[namespace] = dmid.to_i }
@last_update_domain = Time.now
@cache_domain = tmp.freeze
end
def get_dmid(domain)
refresh_domain[domain] || refresh_domain(true)[domain] or \
raise MogileFS::Backend::DomainNotFoundError, domain
end
end
|