# -*- encoding: binary -*-

# \MogileFS file manipulation client.
#
# Create a new instance that will communicate with these trackers:
#
#   hosts = %w[192.168.1.69:6001 192.168.1.70:6001]
#   mg = MogileFS::MogileFS.new(:domain => 'test', :hosts => hosts)
#
#   # Stores "A bunch of text to store" into 'some_key' with a class of 'text'.
#   mg.store_content('some_key', 'text', "A bunch of text to store")
#
#   # Retrieve data from 'some_key' as a string
#   data = mg.get_file_data('some_key')
#
#   # Store the contents of 'image.jpeg' into the key 'my_image' with a
#   # class of 'image'.
#   mg.store_file('my_image', 'image', 'image.jpeg')
#
#   # Store the contents of 'image.jpeg' into the key 'my_image' with a
#   # class of 'image' using an open IO object.
#   File.open('image.jpeg') { |fp| mg.store_file('my_image', 'image', fp) }
#
#   # Retrieve the contents of 'my_image' into '/path/to/huge_file'
#   # without slurping the entire contents into memory:
#   mg.get_file_data('my_image', '/path/to/huge_file')
#
#   # Remove the key 'my_image' and 'some_key'.
#   mg.delete('my_image')
#   mg.delete('some_key')
#
class MogileFS::MogileFS < MogileFS::Client
  include MogileFS::Bigfile

  # The domain of keys for this MogileFS client.
  attr_accessor :domain

  # The timeout for get_file_data (per-read() system call).
  # Defaults to five seconds.
  attr_accessor :get_file_data_timeout

  # The maximum allowed time for creating a new_file.  Defaults to 1 hour.
  attr_accessor :new_file_max_time

  # Creates a new MogileFS::MogileFS instance.  +args+ must include a key
  # :domain specifying the domain of this client.
  #
  # Optional parameters for +args+:
  #
  # [:get_file_data_timeout => Numeric]
  #
  #   See get_file_data_timeout
  #
  # [:new_file_max_time => Numeric]
  #
  #   See new_file_max_time
  #
  # [:fail_timeout => Numeric]
  #
  #   Delay before retrying a failed tracker backends.
  #   Defaults to 5 seconds.
  #
  # [:timeout => Numeric]
  #
  #   Timeout for tracker backend responses.
  #   Defaults to 3 seconds.
  #
  # [:connect_timeout => Integer]
  #
  #   Timeout for connecting to a tracker
  #   Defaults to 3 seconds
  def initialize(args = {})
    @domain = args[:domain]
    @get_file_data_timeout = args[:get_file_data_timeout] || 5
    @new_file_max_time = args[:new_file_max_time] || 3600.0

    # one persistent-connection pool each for reads and writes;
    # reads get the short timeout, uploads get the long one
    @nhp_get = nhp_new('get')
    @nhp_get.open_timeout = @nhp_get.read_timeout = @get_file_data_timeout
    @nhp_put = nhp_new('put')
    @nhp_put.open_timeout = @nhp_put.read_timeout = @new_file_max_time

    raise ArgumentError, "you must specify a domain" unless @domain

    if @backend = args[:db_backend]
      @readonly = true
    else
      super
    end
  end

  # Enumerates keys, limited by optional +prefix+
  # +args+ may also be specified for an optional +:limit+
  # and +:after+ (default: nil)
  def each_key(prefix = "", args = nil, &block)
    after = limit = nil
    if args
      after = args[:after]
      limit = args[:limit]
    end

    # do-while: always fetch at least one batch, stop once the
    # requested limit is exhausted or the listing runs dry
    begin
      batch, after = list_keys(prefix, after, limit || 1000, &block)
      return unless batch && batch[0]
      limit -= batch.size if limit
    end while limit.nil? || limit > 0
    nil
  end

  # Enumerates keys and yields a +file_info+ hash for each key matched by
  # +prefix+
  def each_file_info(prefix = "", args = nil)
    # FIXME: there's a lot of duplicate code from list_keys_verbose here...
    raise ArgumentError, "need block" unless block_given?

    ordered = ready = nil
    on_file_info = lambda do |info|
      Hash === info or raise info
      file_info_cleanup(info)

      # deal with trackers with multiple queryworkers responding out-of-order
      ready[info["key"]] = info
      while info = ready.delete(ordered[-1])
        ordered.pop
        yield info
      end
    end

    nr = 0
    opts = { :domain => @domain }
    opts[:devices] = 1 if args && args[:devices]
    after = args ? args[:after] : nil
    limit = args ? args[:limit] : nil

    begin
      keys, after = list_keys(prefix, after, limit || 1000)
      return nr unless keys && keys[0]

      ordered = keys.reverse
      ready = {}
      nr += keys.size
      limit -= keys.size if limit

      keys.each do |key|
        opts[:key] = key
        @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
      end
      @backend.pipeline_wait
    rescue MogileFS::PipelineError, SystemCallError,
           MogileFS::RequestTruncatedError,
           MogileFS::UnreadableSocketError,
           MogileFS::InvalidResponseError, # truncated response
           MogileFS::Timeout
      # drop the dead connection and retry only the keys not yet yielded
      @backend.shutdown
      keys = (ordered - ready.keys).reverse!
      retry
    end while limit.nil? || limit > 0
  rescue
    @backend.shutdown
    raise
  end

  # Retrieves the contents of +key+.  If +dst+ is specified, +dst+
  # should be an IO-like object capable of receiving the +write+ method
  # or a path name.  +copy_length+ may be specified to limit the number of
  # bytes to retrieve, and +src_offset+ can be specified to specified the
  # start position of the copy.
  def get_file_data(key, dst = nil, copy_length = nil, src_offset = nil)
    paths = get_paths(key)

    if src_offset || copy_length
      src_offset ||= 0
      last_byte = copy_length ? src_offset + copy_length - 1 : nil
      range = [ src_offset, last_byte ]
    end

    if dst
      sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
      sock.stream_to(dst)
    elsif block_given?
      sock = MogileFS::HTTPReader.first(paths, @get_file_data_timeout, range)
      yield(sock)
    else
      # no destination given: slurp the body, trying each path in turn
      errors = nil
      paths.each do |path|
        uri = URI.parse(path)
        get = Net::HTTP::Get.new(uri.path)
        get["range"] = "bytes=#{range[0]}-#{range[1]}" if range
        begin
          res = @nhp_get.request(uri, get)
          case res.code.to_i
          when 200, 206 # 206 is the Partial Content response for ranges
            return res.body
          end
          (errors ||= []) << "#{path} - #{res.message} (#{res.class})"
        rescue => e
          (errors ||= []) << "#{path} - #{e.message} (#{e.class})"
        end
      end
      raise MogileFS::Error,
            "all paths failed with GET: #{errors.join(', ')}", []
    end
  ensure
    sock.close if sock && !sock.closed?
  end

  # Get the paths (URLs as strings) for +key+.  If +args+ is specified,
  # it may contain:
  # - :noverify -> boolean, whether or not the tracker checks (default: true)
  # - :pathcount -> a positive integer of URLs to retrieve (default: 2)
  # - :zone -> "alt" or nil (default: nil)
  #
  # :noverify defaults to true because this client library is capable of
  # verifying paths for readability itself.  It is also faster and more
  # reliable to verify paths on the client.
  def get_paths(key, *args)
    opts = {
      :domain => @domain,
      :key => key,
      :noverify => args[0],
      :zone => args[1],
    }

    # new-style: a single options hash instead of positional booleans
    if Hash === args[0]
      args = args[0]
      opts[:noverify] = args[:noverify]
      zone = args[:zone] and opts[:zone] = zone
      pathcount = args[:pathcount] and opts[:pathcount] = pathcount.to_i
    end

    # only an explicit +false+ asks the tracker to verify
    opts[:noverify] = opts[:noverify] == false ? 0 : 1
    @backend.respond_to?(:_get_paths) and return @backend._get_paths(opts)
    res = @backend.get_paths(opts)
    (1..res['paths'].to_i).map { |i| res["path#{i}"] }
  end

  # Returns +true+ if +key+ exists, +false+ if not
  def exist?(key)
    args = { :key => key, :domain => @domain, :ruby_no_raise => true }
    case rv = @backend.get_paths(args)
    when Hash
      true
    when MogileFS::Backend::UnknownKeyError
      false
    else
      raise rv
    end
  end

  # Get the URIs for +key+ (paths) as URI::HTTP objects
  def get_uris(key, *args)
    get_paths(key, *args).map! { |path| URI.parse(path) }
  end

  # Creates a new file +key+ in the domain of this object.
  #
  # +bytes+ is the expected size of the file if known in advance
  #
  # It operates like File.open(..., "w") and may take an optional
  # block, yielding an IO-like object with support for the methods
  # documented in MogileFS::NewFile::Writer.
  #
  # This atomically replaces existing data stored as +key+
  # when the block exits or when the returned object is closed.
  #
  # +args+ may contain the following options:
  #
  # [:content_length => Integer]
  #
  #   This has the same effect as the (deprecated) +bytes+ parameter.
  #
  # [ :largefile => :stream, :content_range or :tempfile ]
  #
  #   See MogileFS::NewFile for more information on this
  #
  # [ :class => String]
  #
  #   The MogileFS storage class of the object.
  #
  # [:content_md5 => String, Proc, or :trailer]
  #
  #   This can either be a Base64-encoded String, a Proc object, or
  #   the :trailer symbol.  If given a String, it will be used as the
  #   Content-MD5 HTTP header.  If given the :trailer symbol, this library
  #   will automatically generate an Content-MD5 HTTP trailer.  If given
  #   a Proc object, this Proc object should give a Base64-encoded string
  #   which can be used as the Content-MD5 HTTP trailer when called at the
  #   end of the request.
  #
  #   Keep in mind most HTTP servers do not support HTTP trailers, so
  #   passing a String is usually the safest way to use this.
  #
  # [:info => Hash]
  #
  #   This is an empty hash that will be filled the same information
  #   MogileFS::MogileFS#file_info.
  #
  #   Additionally, it contains one additional key: :uris,
  #   an array of URI::HTTP objects to the stored destinations
  def new_file(key, args = nil, bytes = nil) # :yields: file
    raise MogileFS::ReadOnlyError if readonly?
    opts = { :key => key, :multi_dest => 1 }
    case args
    when Hash
      opts[:domain] = args[:domain]
      open_args = args[:create_open_args]
      klass = args[:class] and "default" != klass and opts[:class] = klass
    when String
      opts[:class] = args if "default" != args
    end
    opts[:domain] ||= @domain
    res = @backend.create_open(open_args ? open_args.merge(opts) : opts)

    opts[:nhp_put] = @nhp_put
    dests = if dev_count = res['dev_count'] # multi_dest succeeded
      (1..dev_count.to_i).map { |i| [res["devid_#{i}"], res["path_#{i}"]] }
    else # single destination returned
      # 0x0040: d0e4 4f4b 2064 6576 6964 3d31 2666 6964 ..OK.devid=1&fid
      # 0x0050: 3d33 2670 6174 683d 6874 7470 3a2f 2f31 =3&path=http://1
      # 0x0060: 3932 2e31 3638 2e31 2e37 323a 3735 3030 92.168.1.72:7500
      # 0x0070: 2f64 6576 312f 302f 3030 302f 3030 302f /dev1/0/000/000/
      # 0x0080: 3030 3030 3030 3030 3033 2e66 6964 0d0a 0000000003.fid..
      [[res['devid'], res['path']]]
    end

    opts.merge!(args) if Hash === args
    opts[:backend] = @backend
    opts[:fid] = res['fid']
    opts[:content_length] ||= bytes if bytes
    opts[:new_file_max_time] ||= @new_file_max_time
    opts[:start_time] = MogileFS.now
    info = opts[:info] and info["class"] = klass || "default"

    case (dests[0][1] rescue nil)
    when %r{\Ahttp://}
      http_file = MogileFS::NewFile.new(dests, opts)
      if block_given?
        yield http_file
        return http_file.commit # calls create_close
      else
        return http_file
      end
    when nil, ''
      raise MogileFS::EmptyPathError,
            "Empty path for mogile upload res=#{res.inspect}"
    else
      raise MogileFS::UnsupportedPathError,
            "paths '#{dests.inspect}' returned by backend is not supported"
    end
  end

  # Copies the contents of +file+ into +key+ in class +klass+.  +file+ can be
  # either a path name (String or Pathname object) or an IO-like object that
  # responds to #read or #readpartial.  Returns size of +file+ stored.
  # This atomically replaces existing data stored as +key+
  def store_file(key, klass, file, opts = nil)
    raise MogileFS::ReadOnlyError if readonly?
    (opts ||= {})[:class] = klass if String === klass

    new_file(key, opts) { |mfp| mfp.big_io = file }
  end

  # Stores +content+ into +key+ in class +klass+, where +content+ is a String
  # This atomically replaces existing data stored as +key+
  def store_content(key, klass, content, opts = nil)
    raise MogileFS::ReadOnlyError if readonly?
    (opts ||= {})[:class] = klass if String === klass

    new_file(key, opts) do |mfp|
      if content.is_a?(MogileFS::Util::StoreContent)
        mfp.streaming_io = content
      else
        mfp << content
      end
    end
  end

  # Removes +key+.
  def delete(key)
    raise MogileFS::ReadOnlyError if readonly?

    @backend.delete :domain => @domain, :key => key
    true
  end

  # Updates +key+ to +newclass+
  def updateclass(key, newclass)
    raise MogileFS::ReadOnlyError if readonly?

    @backend.updateclass(:domain => @domain, :key => key, :class => newclass)
    true
  end

  # Sleeps +duration+, only used for testing
  def sleep(duration) # :nodoc:
    @backend.sleep :duration => duration
  end

  # Renames a key +from+ to key +to+.
  def rename(from, to)
    raise MogileFS::ReadOnlyError if readonly?

    @backend.rename :domain => @domain, :from_key => from, :to_key => to
    nil
  end

  # Returns the size of +key+.
  def size(key)
    @backend.respond_to?(:_size) and return @backend._size(domain, key)
    begin
      file_info(key)["length"].to_i
    rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
      paths_size(get_paths(key))
    end
  end

  def paths_size(paths) # :nodoc:
    require "mogilefs/paths_size"
    MogileFS::PathsSize.call(paths)
  end

  # Lists keys starting with +prefix+ following +after+ up to +limit+.  If
  # +after+ is nil the list starts at the beginning.
  def list_keys(prefix = "", after = nil, limit = 1000, &block)
    @backend.respond_to?(:_list_keys) and
      return @backend._list_keys(domain, prefix, after, limit, &block)

    res = @backend.list_keys(:domain => domain, :prefix => prefix,
                             :after => after, :limit => limit,
                             :ruby_no_raise => true)
    MogileFS::Backend::NoneMatchError === res and return
    raise res if MogileFS::Error === res

    keys = (1..res['key_count'].to_i).map { |i| res["key_#{i}"] }
    if block
      if 1 == block.arity
        keys.each { |key| block.call(key) }
      else
        list_keys_verbose(keys, block)
      end
    end

    [ keys, res['next_after'] ]
  end

  def list_keys_verbose(keys, block) # :nodoc:
    # emulate the MogileFS::Mysql interface, slowly...
    ordered = keys.reverse
    ready = {}
    on_file_info = lambda do |info|
      Hash === info or raise info
      file_info_cleanup(info)

      # deal with trackers with multiple queryworkers responding out-of-order
      ready[info["key"]] = info
      while info = ready.delete(ordered[-1])
        block.call(ordered.pop, info["length"], info["devcount"])
      end
    end

    opts = { :domain => @domain }
    begin
      keys.each do |key|
        opts[:key] = key
        @backend.pipeline_dispatch(:file_info, opts, &on_file_info)
      end
      @backend.pipeline_wait
    rescue MogileFS::Backend::UnknownCommandError # MogileFS < 2.45
      @backend.shutdown # reset the socket
      args = { :pathcount => 0x7fffffff }
      keys.each do |key|
        paths = get_paths(key, args)
        block.call(key, paths_size(paths), paths.size)
      end
    rescue MogileFS::PipelineError, SystemCallError,
           MogileFS::RequestTruncatedError,
           MogileFS::UnreadableSocketError,
           MogileFS::InvalidResponseError, # truncated response
           MogileFS::Timeout
      # reconnect and retry only the keys we have not handed to +block+ yet
      @backend.shutdown
      keys = (ordered - ready.keys).reverse!
      retry
    rescue
      @backend.shutdown
      raise
    end
  end

  # Return metadata about a file as a hash.
  # Returns the domain, class, length, devcount, etc. as keys.
  # Optionally, device ids (not paths) can be returned as
  # well if :devices is specified and +true+.
  #
  # This should only be used for informational purposes, and not usually
  # for dynamically serving files.
  #
  #   mg.file_info("bar")
  #
  # Returns:
  #
  #   {
  #     "domain" => "foo",
  #     "key" => "bar",
  #     "class" => "default",
  #     "devcount" => 2,
  #     "length => 666
  #   }
  def file_info(key, args = nil)
    opts = { :domain => @domain, :key => key }
    args and devices = args[:devices] and opts[:devices] = devices ? 1 : 0
    file_info_cleanup(@backend.file_info(opts))
  end

  # converts the numeric fields of a raw tracker file_info response
  # from Strings to Integers, in-place
  def file_info_cleanup(rv) # :nodoc:
    %w(fid length devcount).each { |f| rv[f] = rv[f].to_i }
    devids = rv["devids"] and
      rv["devids"] = devids.split(','.freeze).map! { |x| x.to_i }
    rv
  end

  # Given an Integer +fid+ or String +key+ and domain, thorougly search
  # the database for all occurences of a particular fid.
  #
  # Use this sparingly, this command hits the master database numerous
  # times and is very expensive.  This is not for production use, only
  # troubleshooting and debugging.
  #
  # Searches for fid=666:
  #
  #   client.file_debug(666)
  #
  # Search for key=foo using the default domain for this object:
  #
  #   client.file_debug("foo")
  #
  # Search for key=foo in domain="bar":
  #
  #   client.file_debug(:key => "foo", :domain => "bar")
  #
  def file_debug(args)
    case args
    when Integer then args = { "fid" => args }
    when String then args = { "key" => args }
    end
    opts = { :domain => args[:domain] || @domain }.merge!(args)

    rv = @backend.file_debug(opts)
    rv.each do |k, v|
      case k
      when /_(?:classid|devcount|dmid|fid|length|
               nexttry|fromdevid|failcount|flags|devid|type)\z/x
        rv[k] = v.to_i
      when /devids\z/
        rv[k] = v.split(','.freeze).map! { |x| x.to_i }
      end
    end
  end

  # builds a Net::HTTP::Persistent pool, papering over the
  # constructor-signature change in net-http-persistent 3.0
  def nhp_new(name) # :nodoc:
    if Net::HTTP::Persistent::VERSION.to_f >= 3.0
      MogileFS::NHP.new(:name => name)
    else
      MogileFS::NHP.new(name)
    end
  rescue NameError
    MogileFS::NHP.new(name)
  end
end