# -*- encoding: binary -*-
# :stopdoc:
# Copyright (C) 2008-2012, Eric Wong <normalperson@yhbt.net>
# License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt)
# :startdoc:
require "omgf/pool"
require "omgf/verify_paths"
require "time"
require "json"
require "mogilefs"
require "rack"
require "rack/utils"
require "rack/mime"
# Basic REST interface. This is bug-for-bug compatible with an old app
# that's been deployed for years in a private LAN.
#
# This started out as a WORM (write-once, read-many) system, but
# eventually gained the ability to handle deletes.
#
# This was meant to behave somewhat like the MogileFS protocol (but is
# pure HTTP), so it redirects clients to the storage nodes and bypasses
# Ruby for bulk I/O when retrieving files. PUTs (writes) still go
# through Ruby, however.
#
# Routes dispatched by #call:
#   GET    /$DOMAIN?prefix=&after=&limit=  list keys (text/plain or JSON)
#   GET    /$DOMAIN/$KEY                   302 redirect to a storage URL
#   HEAD   /$DOMAIN/$KEY                   size, type and storage URLs
#   PUT    /$DOMAIN/$KEY                   store the request body
#   DELETE /$DOMAIN/$KEY                   delete the key
class OMGF::HystericalRaisins
  include OMGF::Pool

  # +opts+ keys (all read from the Hash given here):
  # +:hosts+                  - passed to pool_init as the MogileFS :hosts
  # +:fail_timeout+           - MogileFS client option (default 0.5)
  # +:timeout+                - MogileFS client option (default 30)
  # +:get_file_data_timeout+  - MogileFS client option (default 30)
  # +:default_class_cb+       - indexed with +[domain]+ to choose the class
  #                             of a PUT lacking ?class= (default {})
  # +:reproxy_header+         - Rack env key whose non-zero integer value
  #                             requests reproxying (see #reproxy?)
  # +:reproxy_path+           - X-Accel-Redirect target; reproxying is off
  #                             when this is unset
  # +:noverify+, +:pathcount+ - options for path lookups (pathcount
  #                             defaults to 0x7fffffff)
  # +:content_md5+            - default :content_md5 for new files
  # +:largefile+              - default :largefile for new files (:stream)
  # +:put_overwrite_header+   - Rack env key; value "true" lets a PUT
  #                             overwrite an existing key
  # +:logger+                 - handed to OMGF::VerifyPaths
  # +:verify_timeout+         - timeout for path verification (0.5)
  # +:db+                     - if set, extends self with OMGF::Regurgitator
  def initialize(opts)
    @default_class_cb = opts[:default_class_cb] || {}
    mg_opts = {
      domain: "any",
      hosts: opts[:hosts],
      fail_timeout: opts[:fail_timeout] || 0.5,
      # high defaults because of slow seeks on storage nodes (for size verify)
      timeout: opts[:timeout] || 30,
      get_file_data_timeout: opts[:get_file_data_timeout] || 30,
    }
    @reproxy_header = opts[:reproxy_header] || "HTTP_X_OMGF_REPROXY"
    @reproxy_path = opts[:reproxy_path]
    @get_paths_opts = {
      noverify: opts[:noverify],
      pathcount: opts[:pathcount] || 0x7fffffff,
    }
    @new_file_opts = {
      content_md5: opts[:content_md5], # :trailer is acceptable here
      # largefile: auto-selects based on env["CONTENT_LENGTH"]
      largefile: opts[:largefile] || :stream,
    }
    @put_overwrite_header = opts[:put_overwrite_header] || "HTTP_X_OMGF_FORCE"
    @vp = OMGF::VerifyPaths.new(opts[:logger])
    @verify_timeout = opts[:verify_timeout] || 0.5
    pool_init(mg_opts)
    # we may use regurgitator for reads
    if db = opts[:db]
      require 'omgf/regurgitator'
      extend OMGF::Regurgitator
      regurgitator_init(db)
    end
  end

  # The entry point for Rack: dispatches on REQUEST_METHOD.
  #
  # Unknown keys and unknown/unregistered domains become 404. Any other
  # StandardError is logged to rack.logger (when one is present) and
  # becomes a 500.
  def call(env)
    case env["REQUEST_METHOD"]
    when "GET"
      get(env)
    when "HEAD"
      head(env)
    when "PUT"
      put(env)
    when "DELETE"
      delete(env)
    else
      # RFC 9110 requires a 405 response to carry an Allow header
      status, headers, body = r(405)
      headers["Allow"] = "GET, HEAD, PUT, DELETE"
      [ status, headers, body ]
    end
  rescue MogileFS::Backend::UnknownKeyError,
         MogileFS::Backend::DomainNotFoundError,
         MogileFS::Backend::UnregDomainError
    r(404, "")
  rescue => e
    # rack.logger is optional; without this guard a missing logger would
    # raise NoMethodError here and the client would never get the 500
    if logger = env["rack.logger"]
      logger.error "#{e.message} (#{e.class})"
      (e.backtrace || []).each { |line| logger.error(line) }
    end
    r(500, "")
  end

  # GET /$DOMAIN?prefix=foo - list keys
  # GET /$DOMAIN/$KEY - redirects to FIDs on storage nodes
  # GET / - empty 200; anything else is a 404.
  # Repeated slashes in PATH_INFO are collapsed before matching.
  def get(env)
    case env["PATH_INFO"].squeeze("/")
    when %r{\A/([^/]+)/(.+)\z} # /$DOMAIN/$KEY
      redirect_key(env, $1, $2)
    when %r{\A/([^/]+)/?\z} # /$DOMAIN
      get_keys(env, $1)
    when "/"
      r(200, "")
    else
      r(404, "")
    end
  end

  # Returns metadata for a given domain/key (see #stat_key). For any other
  # path, the GET response is computed and its body discarded so the
  # headers of listings are still passed on.
  def head(env)
    case env["PATH_INFO"].squeeze("/")
    when %r{\A/([^/]+)/(.+)\z} # HEAD /$DOMAIN/$KEY
      stat_key(env, $1, $2)
    else
      # pass on headers from listing results
      status, headers, _ = get(env)
      [ status, headers, [] ]
    end
  end

  # HEAD /$DOMAIN/$KEY
  #
  # 200 with Content-Length set to the stored size and a Content-Type
  # guessed from the filename/key. Unless the request is reproxied
  # (see #reproxy?), every verified URL is listed as X-Url-0, X-Url-1, ...
  # 503 when no storage URL passes verification.
  def stat_key(env, domain, key)
    size, uris = mg_size_and_uris(env, domain, key, @get_paths_opts)
    # Array#flatten! returns nil when nothing was nested, which would make
    # a flat verify result look like "no URIs"; the copying form is safe.
    uris = @vp.verify(uris, 1, @verify_timeout).flatten
    return r(503, "") unless uris[0]
    h = { "Content-Length" => size.to_s }
    fn = filename(h, query(env)) || key
    h["Content-Type"] = key_mime_type(fn)
    unless reproxy?(env, key, h, uris[0].to_s)
      uris.each_with_index { |uri,i| h["X-Url-#{i}"] = uri.to_s }
    end
    [ 200, h, [] ]
  end

  # GET /$DOMAIN/$KEY
  #
  # 302 to the first verified storage URL. Unless reproxied
  # (see #reproxy?), the remaining URLs are offered as
  # X-Alt-Location-0, X-Alt-Location-1, ...
  # 503 when no storage URL passes verification.
  def redirect_key(env, domain, key)
    uris = mg_get_uris(env, domain, key, @get_paths_opts)
    # see stat_key: flatten! would return nil for an already-flat result
    uris = @vp.verify(uris, 1, @verify_timeout).flatten
    return r(503, "") unless dest = uris.shift
    location = dest.to_s
    h = {
      'Content-Length' => '0',
      'Location' => location,
      'Content-Type' => 'text/html'
    }
    unless reproxy?(env, key, h, location)
      uris.each_with_index { |uri,i| h["X-Alt-Location-#{i}"] = uri.to_s }
    end
    [ 302, h, [] ]
  end

  # GET /$DOMAIN/
  #
  # Lists keys using the +prefix+, +after+ and +limit+ (default 1000)
  # query parameters. With "Accept: application/json" the body is a JSON
  # array of the values yielded per key; otherwise it is one
  # "key|length|devcount" line per key. 404 for unknown domains.
  def get_keys(env, domain) # :nodoc:
    params = query(env)
    prefix = params["prefix"] || ""
    after = params["after"]
    limit = (params["limit"] || 1000).to_i
    case env["HTTP_ACCEPT"]
    when "application/json"
      tmp = []
      h = { "Content-Type" => "application/json" }
      mg_list_keys(env, domain, prefix, after, limit) { |*x| tmp << x }
      tmp = tmp.to_json
    else
      tmp = ""
      h = { "Content-Type" => "text/plain" }
      mg_list_keys(env, domain, prefix, after, limit) do |dkey,length,devcount|
        tmp << "#{dkey}|#{length}|#{devcount}\n"
      end
    end
    # Content-Length counts octets, not characters
    h["Content-Length"] = tmp.bytesize.to_s
    [ 200, h, [ tmp ] ]
  rescue MogileFS::Backend::UnregDomainError,
         MogileFS::Backend::DomainNotFoundError
    r(404, "Domain not found")
  end

  # Returns a plain-text Rack response for +code+.
  #
  # +msg+ defaults to the standard reason phrase for +code+; a non-empty
  # message gets a trailing newline. Statuses that forbid a body get no
  # headers and an empty body. When +env+ has a rack.logger, the request
  # line, code and message are logged as a warning.
  def r(code, msg = nil, env = nil) # :nodoc:
    if env && logger = env["rack.logger"]
      logger.warn("#{env['REQUEST_METHOD']} #{env['PATH_INFO']} " \
                  "#{code} #{msg.inspect}")
    end
    if Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code)
      [ code, {}, [] ]
    else
      msg ||= Rack::Utils::HTTP_STATUS_CODES[code] || ""
      if msg.size > 0
        # using += to not modify original string (owned by Rack)
        msg += "\n"
      end
      # Content-Length counts octets: messages may echo multibyte user input
      [ code,
        { 'Content-Type' => 'text/plain',
          'Content-Length' => msg.bytesize.to_s },
        [ msg ] ]
    end
  end

  # Returns the MIME type guessed from the last extension of +key+.
  # Rack::Mime.mime_type looks extensions up *with* their leading dot
  # (".jpg"), so the dot must be part of the capture; no extension
  # (nil) yields 'application/octet-stream'.
  def key_mime_type(key) # :nodoc:
    /(\.[^.]+)\z/ =~ key
    Rack::Mime.mime_type($1)
  end

  # Sets the Content-Disposition response header in +h+ if requested by
  # the user in a query string via the "inline" param, or else the
  # "attachment" (or legacy "filename") param.
  # Only names that Rack::Utils.escape leaves untouched are accepted, so
  # the header value never needs quoting.
  # Returns the requested filename, or nil if absent/invalid.
  def filename(h, params) # :nodoc:
    if (fn = params['inline']) && Rack::Utils.escape(fn) == fn
      h['Content-Disposition'] = "inline; filename=#{fn}"
      fn
    elsif (fn = params['attachment'] || params['filename']) &&
          Rack::Utils.escape(fn) == fn
      h['Content-Disposition'] = "attachment; filename=#{fn}"
      fn
    end
  end

  # This relies on nginx to reproxy paths for us.
  #
  # Returns false (leaving +h+ untouched) unless a reproxy path is
  # configured and the client sent a non-zero reproxy header. Otherwise
  # sets an ETag from the FID number found in +location+ and, for GET,
  # replaces Content-Type/Content-Length with X-Redirect-* headers plus
  # X-Accel-Redirect; for HEAD it sets Last-Modified. Returns true.
  # Raises RuntimeError for any other request method.
  def reproxy?(env, key, h, location) # :nodoc:
    return false unless @reproxy_path && env[@reproxy_header].to_i != 0
    %r{/0*(\d+)\.fid\z} =~ location
    fid = $1
    h['ETag'] = %("#{fid}")
    fn = filename(h, query(env)) || key
    case env["REQUEST_METHOD"]
    when "GET"
      # Fake a Last-Modified time (enough to bust caches)
      h['X-Redirect-Last-Modified'] = Time.at(fid.to_i).httpdate
      # yes we violate Rack::Lint here, no Content-Type or Content-Length
      h.delete("Content-Length")
      h.delete("Content-Type")
      h['X-Redirect-Content-Type'] = key_mime_type(fn)
      h['X-Accel-Redirect'] = @reproxy_path
    when "HEAD"
      # Fake a Last-Modified time (enough to bust caches)
      h['Last-Modified'] = Time.at(fid.to_i).httpdate
    else
      raise "BUG: bad request method #{env["REQUEST_METHOD"]}"
    end
    true
  end

  # PUT /domain/key - 404 unless PATH_INFO is /$DOMAIN/$KEY
  def put(env)
    case env["PATH_INFO"]
    when %r{\A/([^/]+)/(.+)\z} # /$DOMAIN/$KEY
      put_key(env, $1, $2)
    else
      r(404, "")
    end
  end

  # We're very picky about keys; we only allow keys which won't cause
  # ambiguity when passed through URI escapers/normalizers: word
  # characters, '.', '/' and '-' only, no "//", and neither the first
  # nor the last character may be '.' or '/'.
  def bad_key?(key) # :nodoc:
    %r{\A[\w./-]+\z} !~ key ||
      %r{//} =~ key ||
      %r{[./]} =~ "#{key[0]}#{key[-1]}"
  end

  # PUT /$DOMAIN/$KEY
  #
  # Streams rack.input into a new MogileFS file.
  # * 403 when the reproxy header is set, for an explicit empty body, or
  #   when the key already has paths and the overwrite header is not
  #   "true" (the body then lists the existing paths)
  # * 406 for unfriendly/overlong keys and unknown domains
  # * 204 when an existing key was overwritten, else 201 for
  #   authenticated (REMOTE_USER) clients and 200 for the rest
  # Any other error is retried once if rack.input can be rewound.
  def put_key(env, domain, key)
    return r(403, "") if env[@reproxy_header].to_i != 0
    return r(406, "key `#{key}' is not URI-friendly") if bad_key?(key)
    return r(406, "key is too long") if key.size > 128
    clen = env["CONTENT_LENGTH"]
    # this was written before MogileFS supported empty files,
    # but empty files waste DB space so we don't support them
    # Not bothering with Transfer-Encoding: chunked, though...
    return r(403, "empty files forbidden") if "0" == clen
    params = query(env)
    input = env["rack.input"]
    paths = nil
    retried = false
    # prepare options for create_open/create_close:
    new_file_opts = @new_file_opts.dup
    # the original deployment of this created a class for every
    # domain with the same class having the same name as the domain
    new_file_opts[:class] = params['class'] || @default_class_cb[domain]
    # try to give a Content-Length to the tracker
    new_file_opts[:content_length] = clen.to_i if clen
    if /\bContent-MD5\b/i =~ env["HTTP_TRAILER"]
      # if the client will give the Content-MD5 as the trailer,
      # we must lazily populate it since we're not guaranteed to
      # have the trailer, yet (rack.input is lazily read on unicorn)
      new_file_opts[:content_md5] = lambda { env["HTTP_CONTENT_MD5"] }
    elsif cmd5 = env["HTTP_CONTENT_MD5"]
      # maybe the client gave the Content-MD5 in the header
      new_file_opts[:content_md5] = cmd5
    end
    begin
      pool_use(domain) do |mg|
        begin
          # TOCTOU issue, but probably not worth worrying about
          # Nothing we can do about it without explicit MogileFS support
          # or a 3rd-party locking daemon
          paths = mg.get_paths(key)
          if paths && paths[0]
            # overwriting existing files is not permitted by default
            if env[@put_overwrite_header] != "true"
              # show the existing paths in response
              return r(403, paths.join("\n"))
            end
          end
        rescue MogileFS::Backend::UnknownKeyError
          # good, not clobbering anything
        end
        # finally, upload the file
        mg.new_file(key, new_file_opts) do |io|
          IO.copy_stream(input, io)
        end
      end # pool_use
      # should always return 201 if ! found, but we keep 200 for legacy
      # compat if they're not logged in (via REMOTE_USER)
      status = paths ? 204 : (env["REMOTE_USER"] ? 201 : 200)
      r(status, "")
    rescue MogileFS::Backend::UnregDomainError,
           MogileFS::Backend::DomainNotFoundError
      r(406, "Invalid domain: #{domain}")
    rescue => e
      # one retry at most: +retried+ bounds it
      if !retried && input.respond_to?(:rewind)
        begin
          retried = true
          input.rewind # may raise on future perfectly compliant Rack servers
          # rack.logger is optional; a nil logger must not cancel the retry
          if logger = env["rack.logger"]
            logger.warn("#{e.message} (#{e.class})")
          end
          retry
        rescue
          # rewinding failed: fall through and re-raise the original error
        end
      end
      raise
    end
  end

  # DELETE /domain/key - 404 unless PATH_INFO is /$DOMAIN/$KEY
  def delete(env)
    case env["PATH_INFO"]
    when %r{\A/([^/]+)/(.+)\z} # /$DOMAIN/$KEY
      delete_key(env, $1, $2)
    else
      r(404, "")
    end
  end

  # DELETE /domain/key
  # 204 on success, 406 for an unknown domain, 404 for an unknown key.
  def delete_key(env, domain, key)
    pool_use(domain) { |mg| mg.delete(key) }
    r(204, "")
  rescue MogileFS::Backend::UnregDomainError,
         MogileFS::Backend::DomainNotFoundError
    r(406, "Invalid domain: #{domain}")
  rescue MogileFS::Backend::UnknownKeyError
    r(404, "")
  end

  # Parses QUERY_STRING into a Hash (repeated keys become Arrays).
  def query(env)
    Rack::Utils.parse_query(env["QUERY_STRING"])
  end
end