omgf.git  about / heads / tags
Unnamed repository; edit this file 'description' to name the repository.
blob 604e454e1a6968c4866adfbeaf0a885805406eba 11392 bytes (raw)
$ git show HEAD:lib/omgf/hysterical_raisins.rb	# shows this blob on the CLI

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
 
# -*- encoding: binary -*-
# :stopdoc:
# Copyright (C) 2008-2012, Eric Wong <normalperson@yhbt.net>
# License: AGPLv3 or later (https://www.gnu.org/licenses/agpl-3.0.txt)
# :startdoc:
require "omgf/pool"
require "omgf/verify_paths"
require "time"
require "json"
require "mogilefs"
require "rack"
require "rack/utils"
require "rack/mime"

# Basic REST interface.  This is bug-for-bug compatible with an old app
# that's been deployed for years in a private LAN.
#
# This started out as a WORM (write-once, read-many) system, but
# eventually gained the ability to handle deletes.
#
# This was meant to behave like the MogileFS protocol somewhat (but is
# pure-HTTP), so it redirects to the storage nodes and bypasses Ruby
# for bulk I/O when retrieving files.  PUTs (writes) still go through
# Ruby, however.
class OMGF::HystericalRaisins
  include OMGF::Pool

  # Builds the Rack application from the +opts+ Hash.  Keys read here:
  #
  # - :hosts, :fail_timeout, :timeout, :get_file_data_timeout - MogileFS
  #   client options handed to pool_init (the client domain is "any";
  #   presumably pool_use(domain) selects the real domain per request)
  # - :default_class_cb - indexed by domain name (via #[]) to choose the
  #   storage class for PUTs without a "class" query parameter
  # - :reproxy_header, :reproxy_path - nginx X-Accel-Redirect reproxying
  # - :noverify, :pathcount - options for path lookups on GET/HEAD
  # - :content_md5, :largefile - default new_file options for PUT
  # - :put_overwrite_header - Rack env key that must equal "true" for a
  #   PUT to overwrite an existing key
  # - :logger, :verify_timeout - storage-node path verification
  # - :db - when set, OMGF::Regurgitator is mixed in for reads
  def initialize(opts)
    @default_class_cb = opts[:default_class_cb] || {}
    mg_opts = {
      domain: "any",
      hosts: opts[:hosts],
      fail_timeout: opts[:fail_timeout] || 0.5,

      # high defaults because of slow seeks on storage nodes (for size verify)
      timeout: opts[:timeout] || 30,
      get_file_data_timeout: opts[:get_file_data_timeout] || 30,
    }
    @reproxy_header = opts[:reproxy_header] || "HTTP_X_OMGF_REPROXY"
    @reproxy_path = opts[:reproxy_path]
    @get_paths_opts = {
      noverify: opts[:noverify],
      pathcount: opts[:pathcount] || 0x7fffffff,
    }
    @new_file_opts = {
      content_md5: opts[:content_md5], # :trailer is acceptable here

      # largefile: auto-selects based on env["CONTENT_LENGTH"]
      largefile: opts[:largefile] || :stream,
    }
    @put_overwrite_header = opts[:put_overwrite_header] || "HTTP_X_OMGF_FORCE"
    @vp = OMGF::VerifyPaths.new(opts[:logger])
    @verify_timeout = opts[:verify_timeout] || 0.5
    pool_init(mg_opts)

    # we may use regurgitator for reads
    if db = opts[:db]
      require 'omgf/regurgitator'
      extend OMGF::Regurgitator
      regurgitator_init(db)
    end
  end

  # The entry point for Rack: dispatches on REQUEST_METHOD.
  #
  # Unsupported methods get 405.  Unknown keys/domains raised by MogileFS
  # become 404.  Any other StandardError is logged and becomes 500.
  def call(env)
    case env["REQUEST_METHOD"]
    when "GET"    then get(env)
    when "HEAD"   then head(env)
    when "PUT"    then put(env)
    when "DELETE" then delete(env)
    else r(405)
    end
  rescue MogileFS::Backend::UnknownKeyError,
         MogileFS::Backend::DomainNotFoundError,
         MogileFS::Backend::UnregDomainError
    r(404, "")
  rescue => e
    # rack.logger is optional in Rack; don't let its absence turn the
    # error handler itself into a NoMethodError.  Fall back to rack.errors.
    if logger = env["rack.logger"]
      logger.error("#{e.message} (#{e.class})")
      Array(e.backtrace).each { |line| logger.error(line) }
    elsif errors = env["rack.errors"]
      errors.puts("#{e.message} (#{e.class})")
      Array(e.backtrace).each { |line| errors.puts(line) }
    end
    r(500, "")
  end

  # GET /$DOMAIN?prefix=foo - list keys
  # GET /$DOMAIN/$KEY - redirects to FIDs on storage nodes
  # GET / - empty 200; anything else is 404
  #
  # Runs of slashes in PATH_INFO are collapsed before matching.
  def get(env)
    case env["PATH_INFO"].squeeze("/")
    when %r{\A/([^/]+)/(.+)\z} # /$DOMAIN/$KEY
      redirect_key(env, $1, $2)
    when %r{\A/([^/]+)/?\z}    # /$DOMAIN
      get_keys(env, $1)
    when "/"
      r(200, "")
    else
      r(404, "")
    end
  end

  # Returns metadata for a given domain/key.  For any other path, the
  # status and headers of the equivalent GET are returned with an
  # empty body.
  def head(env)
    case env["PATH_INFO"].squeeze("/")
    when %r{\A/([^/]+)/(.+)\z} # HEAD /$DOMAIN/$KEY
      stat_key(env, $1, $2)
    else
      # pass on headers from listing results
      status, headers, _ = get(env)
      [ status, headers, [] ]
    end
  end

  # HEAD /$DOMAIN/$KEY
  #
  # Responds 200 with Content-Length set to the stored size.  Content-Type
  # is guessed from the requested filename (if any) or the key.  The URLs
  # surviving path verification are listed as X-Url-N headers unless the
  # request is reproxied.  Responds 503 when no URL survives verification.
  def stat_key(env, domain, key)
    size, uris = mg_size_and_uris(env, domain, key, @get_paths_opts)

    # Non-bang flatten: Array#flatten! returns nil when there is nothing
    # to flatten, which would turn a usable result into a spurious 503.
    uris = @vp.verify(uris, 1, @verify_timeout).flatten
    return r(503, "") unless uris[0]

    h = { "Content-Length" => size.to_s }
    fn = filename(h, query(env)) || key
    h["Content-Type"] = key_mime_type(fn)
    unless reproxy?(env, key, h, uris[0].to_s)
      uris.each_with_index { |uri, i| h["X-Url-#{i}"] = uri.to_s }
    end
    [ 200, h, [] ]
  end

  # GET /$DOMAIN/$KEY
  #
  # Responds 302 to the first URL surviving path verification.  The
  # remaining URLs go in X-Alt-Location-N headers unless the request is
  # reproxied (in which case nginx follows X-Accel-Redirect instead).
  # Responds 503 when no URL survives verification.
  def redirect_key(env, domain, key)
    uris = mg_get_uris(env, domain, key, @get_paths_opts)

    # see stat_key: flatten! may return nil on an already-flat result
    uris = @vp.verify(uris, 1, @verify_timeout).flatten
    dest = uris.shift
    return r(503, "") unless dest

    location = dest.to_s
    h = {
      'Content-Length' => '0',
      'Location' => location,
      'Content-Type' => 'text/html'
    }

    unless reproxy?(env, key, h, location)
      uris.each_with_index { |uri, i| h["X-Alt-Location-#{i}"] = uri.to_s }
    end
    [ 302, h, [] ]
  end

  # GET /$DOMAIN/
  #
  # Lists keys using the "prefix", "after" and "limit" (default 1000)
  # query parameters.  With "Accept: application/json" the body is a JSON
  # array of the value tuples yielded by mg_list_keys; otherwise it is
  # text/plain with one "key|length|devcount" line per key.  Responds 404
  # for unknown domains.
  def get_keys(env, domain) # :nodoc:
    params = query(env)
    prefix = params["prefix"] || ""
    after = params["after"]
    limit = (params["limit"] || 1000).to_i

    case env["HTTP_ACCEPT"]
    when "application/json"
      tmp = []
      h = { "Content-Type" => "application/json" }
      mg_list_keys(env, domain, prefix, after, limit) { |*x| tmp << x }
      tmp = tmp.to_json
    else
      tmp = ""
      h = { "Content-Type" => "text/plain" }
      mg_list_keys(env, domain, prefix, after, limit) do |dkey,length,devcount|
        tmp << "#{dkey}|#{length}|#{devcount}\n"
      end
    end

    # Content-Length is a byte count; JSON output is UTF-8 and keys may be
    # multibyte, so String#size (characters) would under-report it.
    h["Content-Length"] = tmp.bytesize.to_s

    [ 200, h, [ tmp ] ]
  rescue MogileFS::Backend::UnregDomainError,
         MogileFS::Backend::DomainNotFoundError
    r(404, "Domain not found")
  end

  # Returns a plain-text Rack response for +code+.
  #
  # +msg+ defaults to Rack's reason phrase for +code+ (or "").  A newline
  # is appended to a non-empty message.  Statuses that must not carry a
  # body get empty headers and body.  When +env+ has a rack.logger, the
  # request and response are logged at warn level.
  def r(code, msg = nil, env = nil) # :nodoc:
    if env && logger = env["rack.logger"]
      logger.warn("#{env['REQUEST_METHOD']} #{env['PATH_INFO']} " \
                  "#{code} #{msg.inspect}")
    end

    if Rack::Utils::STATUS_WITH_NO_ENTITY_BODY.include?(code)
      return [ code, {}, [] ]
    end

    msg ||= Rack::Utils::HTTP_STATUS_CODES[code] || ""

    # using += to not modify original string (owned by Rack)
    msg += "\n" unless msg.empty?

    # bytesize: msg may embed user-supplied (possibly multibyte) keys
    [ code,
      { 'Content-Type' => 'text/plain', 'Content-Length' => msg.bytesize.to_s },
      [ msg ] ]
  end

  # Returns the MIME type based on the extension of +key+ (a key or a
  # user-requested filename).
  #
  # Rack::Mime.mime_type expects the extension WITH its leading dot
  # (".png"), so the dot must be included in the match.  A key with no
  # extension yields nil, which Rack maps to 'application/octet-stream'.
  def key_mime_type(key) # :nodoc:
    Rack::Mime.mime_type(key[/\.[^.]+\z/])
  end

  # Sets the Content-Disposition response header if requested by the user
  # in a query string (via the "inline", "attachment" or "filename" param).
  # A name is only accepted if URL-escaping leaves it unchanged, so it is
  # safe to emit unquoted.
  #
  # Returns the requested filename, or nil if none/invalid.
  def filename(h, params) # :nodoc:
    if (fn = params['inline']) && Rack::Utils.escape(fn) == fn
      h['Content-Disposition'] = "inline; filename=#{fn}"
      fn
    elsif (fn = params['attachment'] || params['filename']) &&
          Rack::Utils.escape(fn) == fn
      h['Content-Disposition'] = "attachment; filename=#{fn}"
      fn
    end
  end

  # Relies on nginx to reproxy paths for us.
  #
  # Returns false (leaving +h+ untouched) unless a reproxy path is
  # configured and the request carries a non-zero reproxy header.
  # Otherwise it mutates +h+ for reproxying and returns true:
  # - ETag is derived from the FID in +location+ (".../0*NNN.fid").
  # - GET: Content-Length/Content-Type are replaced by the
  #   X-Redirect-* and X-Accel-Redirect headers for nginx.
  # - HEAD: a fake Last-Modified is added.
  # Raises RuntimeError for any other request method.
  def reproxy?(env, key, h, location) # :nodoc:
    return false unless @reproxy_path && env[@reproxy_header].to_i != 0

    fid = location[%r{/0*(\d+)\.fid\z}, 1]
    h['ETag'] = %("#{fid}")
    fn = filename(h, query(env)) || key

    case env["REQUEST_METHOD"]
    when "GET"
      # Fake a Last-Modified time (enough to bust caches)
      h['X-Redirect-Last-Modified'] = Time.at(fid.to_i).httpdate

      # yes we violate Rack::Lint here, no Content-Type or Content-Length
      h.delete("Content-Length")
      h.delete("Content-Type")
      h['X-Redirect-Content-Type'] = key_mime_type(fn)
      h['X-Accel-Redirect'] = @reproxy_path
    when "HEAD"
      # Fake a Last-Modified time (enough to bust caches)
      h['Last-Modified'] = Time.at(fid.to_i).httpdate
    else
      raise "BUG: bad request method #{env["REQUEST_METHOD"]}"
    end

    true
  end

  # PUT /domain/key
  # Unlike GET/HEAD, PATH_INFO is matched as-is (slashes not squeezed).
  def put(env)
    case env["PATH_INFO"]
    when %r{\A/([^/]+)/(.+)\z} # /$DOMAIN/$KEY
      put_key(env, $1, $2)
    else
      r(404, "")
    end
  end

  # We're very picky about keys: we only allow keys which won't cause
  # ambiguity when passed through URI escapers/normalizers.
  #
  # Returns truthy for a bad key.  A key is bad when it has characters
  # outside [\w./-], contains "//", or starts or ends with "." or "/".
  def bad_key?(key) # :nodoc:
    %r{\A[\w./-]+\z} !~ key ||
      %r{//} =~ key ||
      %r{[./]} =~ "#{key[0]}#{key[-1]}"
  end

  # PUT /$DOMAIN/$KEY
  #
  # Uploads rack.input to MogileFS under +key+.
  #
  # Responses:
  # - 403: the request carries the reproxy header.
  # - 406: the key is not URI-friendly or longer than 128 bytes.
  # - 403: Content-Length is "0".
  # - 403 listing existing paths: the key already exists and the
  #   overwrite header is not "true".
  # - 406: the domain is invalid.
  # - On success: 204 if the key already existed, otherwise 201 for
  #   authenticated (REMOTE_USER) clients and 200 for legacy clients.
  #
  # Any other error is retried once, after rewinding rack.input when the
  # input supports it; otherwise the error is re-raised.
  def put_key(env, domain, key)
    return r(403, "") if env[@reproxy_header].to_i != 0
    return r(406, "key `#{key}' is not URI-friendly") if bad_key?(key)
    return r(406, "key is too long") if key.size > 128

    clen = env["CONTENT_LENGTH"]

    # this was written before MogileFS supported empty files,
    # but empty files waste DB space so we don't support them
    # Not bothering with Transfer-Encoding: chunked, though...
    return r(403, "empty files forbidden") if "0" == clen

    params = query(env)
    input = env["rack.input"]
    paths = nil
    retried = false

    # prepare options for create_open/create_close:
    new_file_opts = @new_file_opts.dup

    # the original deployment of this created a class for every
    # domain with the same class having the same name as the domain
    new_file_opts[:class] = params['class'] || @default_class_cb[domain]

    # try to give a Content-Length to the tracker
    new_file_opts[:content_length] = clen.to_i if clen

    if /\bContent-MD5\b/i =~ env["HTTP_TRAILER"]
      # if the client will give the Content-MD5 as the trailer,
      # we must lazily populate it since we're not guaranteed to
      # have the trailer, yet (rack.input is lazily read on unicorn)
      new_file_opts[:content_md5] = lambda { env["HTTP_CONTENT_MD5"] }
    elsif cmd5 = env["HTTP_CONTENT_MD5"]
      # maybe the client gave the Content-MD5 in the header
      new_file_opts[:content_md5] = cmd5
    end

    begin
      pool_use(domain) do |mg|
        begin
          # TOCTOU issue, but probably not worth worrying about
          # Nothing we can do about it without explicit MogileFS support
          # or a 3rd-party locking daemon
          paths = mg.get_paths(key)
          if paths && paths[0]

            # overwriting existing files is not permitted by default
            if env[@put_overwrite_header] != "true"
              # show the existing paths in response
              return r(403, paths.join("\n"))
            end
          end
        rescue MogileFS::Backend::UnknownKeyError
          # good, not clobbering anything
        end

        # finally, upload the file
        mg.new_file(key, new_file_opts) do |io|
          IO.copy_stream(input, io)
        end
      end # pool_use

      # should always return 201 if ! found, but we keep 200 for legacy
      # compat if they're not logged in (via REMOTE_USER)
      status = paths ? 204 : (env["REMOTE_USER"] ? 201 : 200)
      r(status, "")
    rescue MogileFS::Backend::UnregDomainError,
           MogileFS::Backend::DomainNotFoundError
      r(406, "Invalid domain: #{domain}")
    rescue => e
      if retried == false && input.respond_to?(:rewind)
        begin
          retried = true
          input.rewind # may raise on future perfectly compliant Rack servers

          # rack.logger is optional; an unguarded call here would raise
          # NoMethodError and silently disable the retry below
          if logger = env["rack.logger"]
            logger.warn("#{e.message} (#{e.class})")
          end
          retry
        rescue
          # rewinding failed: we cannot resend the body, so fall through
          # and re-raise the original error
        end
      end

      raise
    end
  end

  # DELETE /domain/key
  # Unlike GET/HEAD, PATH_INFO is matched as-is (slashes not squeezed).
  def delete(env)
    case env["PATH_INFO"]
    when %r{\A/([^/]+)/(.+)\z} # /$DOMAIN/$KEY
      delete_key(env, $1, $2)
    else
      r(404, "")
    end
  end

  # DELETE /domain/key
  # Responses: 204 on success, 406 for an invalid domain, 404 for an
  # unknown key.
  def delete_key(env, domain, key)
    pool_use(domain) { |mg| mg.delete(key) }
    r(204, "")
  rescue MogileFS::Backend::UnregDomainError,
         MogileFS::Backend::DomainNotFoundError
    r(406, "Invalid domain: #{domain}")
  rescue MogileFS::Backend::UnknownKeyError
    r(404, "")
  end

  # Parses QUERY_STRING into a Hash via Rack::Utils.parse_query.
  def query(env)
    Rack::Utils.parse_query(env["QUERY_STRING"])
  end
end

git clone https://yhbt.net/omgf.git