Module cvtool.ESGF.ESGConfigParser

Expand source code
# #!/usr/bin/env python
# # -*- coding: utf-8 -*-

# """
# .. module:: ESGConfigParser
# .. moduleauthor:: Guillaume Levavasseur <glipsl@ipsl.fr>

# """

# import os
# import re
# import string
# from ConfigParser import ConfigParser, _Chainmap, DEFAULTSECT, MAX_INTERPOLATION_DEPTH

# from custom_exceptions import *


# class SectionParser(ConfigParser):
#     """
#     Custom ConfigParser class to parse ESGF .ini files from a source directory.
#     Parse a configuration section (mandatory).

#     """

#     def __init__(self, section, directory=None):
#         ConfigParser.__init__(self)
#         self.reset()
#         ConfigException.SECTION = section
#         self.section = section
#         if directory:
#             self.file = None
#             self.parse(directory)
#         else:
#             self.add_section(section)

#     def set(self, option, value=None, newline=False, section=None):
#         """
#         Overwrite the original method to set an option value for the given section.
#         The value can be written on a new line.

#         """
#         if self.section == DEFAULTSECT:
#             sectdict = self._defaults
#         else:
#             try:
#                 sectdict = self._sections[self.section]
#             except KeyError:
#                 raise NoConfigSection()
#         if newline:
#             value = '\n{}'.format(value)
#         sectdict[self.optionxform(option)] = value

#     def write(self, fp):
#         """
#         Overwrite the original method to write options value for the given section.
#         An ending newline is added in any case.

#         """
#         if self._defaults:
#             fp.write("[%s]\n" % DEFAULTSECT)
#             fp.write("\n")
#             for (key, value) in self._defaults.items():
#                 fp.write("%s = %s\n" % (key, str(value)))
#                 fp.write("\n")
#         for section in self._sections:
#             fp.write("[%s]\n" % section)
#             fp.write("\n")
#             for (key, value) in self._sections[section].items():
#                 if key == "__name__":
#                     continue
#                 if (value is not None) or (self._optcre == self.OPTCRE):
#                     key = " = ".join((key, str(value)))
#                 fp.write("%s\n" % key)
#                 fp.write("\n")

#     def get(self, option, raw=True, variables=None, section=None):
#         """
#         Overwrite the original method to get an option value for the given section.

#         """
#         sectiondict = {}
#         try:
#             sectiondict = self._sections[self.section]
#         except KeyError:
#             if self.section != DEFAULTSECT:
#                 raise NoConfigSection()
#         # Update with the entry specific variables
#         vardict = {}
#         if variables:
#             for key, value in variables.items():
#                 vardict[self.optionxform(key)] = value
#         d = _Chainmap(vardict, sectiondict, self._defaults)
#         option = self.optionxform(option)
#         try:
#             value = d[option]
#         except KeyError:
#             raise NoConfigOption(option)
#         if raw or value is None:
#             return value
#         else:
#             return self._interpolate(self.section, option, value, d)

#     def has_option(self, option, section=None):
#         """
#         Overwrite the original method to check for the existence of a given option in the given section.

#         """
#         if not self.section or self.section == DEFAULTSECT:
#             option = self.optionxform(option)
#             return option in self._defaults
#         elif self.section not in self._sections:
#             return False
#         else:
#             option = self.optionxform(option)
#             return (option in self._sections[self.section]
#                     or option in self._defaults)

#     @staticmethod
#     def reset():
#         """
#         Resets exception constants

#         """
#         ConfigException.FILE = None
#         ConfigException.SECTION = None

#     def parse(self, path):
#         """
#         Parses the configuration files.

#         :param str path: The directory path of configuration files
#         :returns: The configuration file
#         :rtype: *CfgParser*
#         :raises Error: If no configuration file exists
#         :raises Error: If no configuration section exist
#         :raises Error: If the configuration file parsing fails

#         """
#         # If the section is not "[project:.*]", only read esg.ini
#         ConfigException.FILE = os.path.join(path, 'esg.ini')
#         self.file = os.path.join(path, 'esg.ini')
#         self.read(self.file)
#         if re.match(r'project:.*', self.section) and self.section not in self.sections():
#             project = self.section.split('project:')[1]
#             ConfigException.FILE = os.path.join(path, 'esg.{}.ini'.format(project))
#             self.file = os.path.join(path, 'esg.{}.ini'.format(project))
#             if not os.path.isfile(self.file):
#                 raise NoConfigFile(self.file)
#             self.read(self.file)
#         if self.section not in self.sections():
#             raise NoConfigSection()
#         if not self:
#             raise EmptyConfigFile()

#     def read(self, filenames):
#         """
#         Read and parse a filename or a list of filenames, and records their paths.

#         """
#         if isinstance(filenames, str):
#             filenames = [filenames]
#         for filename in filenames:
#             try:
#                 fp = open(filename)
#             except IOError:
#                 continue
#             self._read(fp, filename)
#             fp.close()

#     def sections(self, default=True):
#         """
#         Returns the list of section names with/without [DEFAULT]

#         """
#         if default:
#             return self._sections.keys() + ['DEFAULT']
#         else:
#             return self._sections.keys()

#     def options(self, defaults=True):
#         """
#         Returns the list of options names into the section with/without defaults.

#         """
#         opts = self._sections[self.section].copy()
#         if defaults:
#             opts.update(self._defaults)
#         if '__name__' in opts:
#             del opts['__name__']
#         return opts.keys()

#     def translate(self, option, add_ending_filename=False, add_ending_version=False, sep='/'):
#         """
#         Return a regular expression associated with a ``pattern_format`` option
#         in the configuration file. This can be passed to the Python ``re`` methods.

#         :returns: The corresponding ``re`` pattern

#         """
#         # Default is to translate the pattern string as it
#         # Without forcing ending version or filename
#         if not self.has_option(option):
#             raise NoConfigOption(option)
#         pattern = self.get(option, raw=True).strip()
#         # Start translation
#         pattern = pattern.replace('\.', '__ESCAPE_DOT__')
#         pattern = pattern.replace('.', r'\.')
#         pattern = pattern.replace('__ESCAPE_DOT__', r'\.')
#         # Translate all patterns matching [.*] as optional pattern
#         pattern = re.sub(re.compile(r'\[(.*)\]'), r'(\1)?', pattern)
#         # Remove underscore from latest mandatory pattern to allow optional brackets
#         pattern = re.sub(re.compile(r'%\(([^()]*)\)s\('), r'(?P<\1>[^-_]+)(', pattern)
#         # Translate all patterns matching %(digit)s
#         pattern = re.sub(re.compile(r'%\((digit)\)s'), r'[\d]+', pattern)
#         # Translate all patterns matching %(string)s
#         pattern = re.sub(re.compile(r'%\((string)\)s'), r'[\w-]+', pattern)
#         # Translate %(root)s variable if exists but not required. Can include the project name.
#         if re.compile(r'%\((root)\)s').search(pattern):
#             pattern = re.sub(re.compile(r'%\((root)\)s'), r'(?P<\1>[\w./-]+)', pattern)
#         # Constraint on %(version)s number
#         pattern = re.sub(re.compile(r'%\((version)\)s'), r'(?P<\1>v[\d]+|latest)', pattern)
#         # Translate all patterns matching %(name)s
#         pattern = re.sub(re.compile(r'%\(([^()]*)\)s'), r'(?P<\1>[\w.-]+)', pattern)
#         # Add ending version pattern if needed and missing
#         if add_ending_version and 'version' not in pattern:
#             pattern = '{}{}(?P<version>v[\d]+|latest)$'.format(pattern, sep)
#         # Add ending filename pattern if needed and missing
#         if add_ending_filename and 'filename' not in pattern:
#             pattern = '{}{}(?P<filename>[\w.-]+)$'.format(pattern, sep)
#         return pattern

#     def get_facets(self, option, ignored=None):
#         """
#         Returns the set of facets declared into "*_format" attributes in the configuration file.
#         :param str option: The option to get facet names
#         :param list ignored: The list of facets to ignored
#         :returns: The collection of facets
#         :rtype: *set*

#         """
#         facets = re.findall(re.compile(r'%\(([^()]*)\)s'), self.get(option, raw=True))
#         if ignored:
#             return [f for f in facets if f not in ignored]
#         else:
#             return facets

#     def check_options(self, pairs):
#         """
#         Checks a {key: value} pairs against the corresponding options from the configuration file.

#         :param dict pairs: A dictionary of {key: value} to check
#         :raises Error: If the value is missing in the corresponding options list

#         """
#         for key in pairs.keys():
#             # Do the check only if value exists (i.e.,not None)
#             if pairs[key]:
#                 options, option = self.get_options(key)
#                 try:
#                     # get_options returned a list
#                     if pairs[key] not in options:
#                         raise NoConfigValue(pairs[key], option)
#                 except TypeError:
#                     # get_options returned a regex from pattern
#                     if not options.match(pairs[key]):
#                         raise NoConfigValue(pairs[key], options.pattern)
#                     else:
#                         self.check_options(options.match(pairs[key]).groupdict())

#     def get_options(self, option):
#         """
#         Returns the list of attribute options.

#         :param str option: The option to get available values
#         :returns: The option values
#         :rtype: *list* or *re.RegexObject*
#         :raises Error: If the option is missing

#         """
#         if self.has_option('{}_options'.format(option)):
#             option = '{}_options'.format(option)
#             return self.get_options_from_list(option), option
#         elif self.has_option('{}_map'.format(option)):
#             option = '{}_map'.format(option)
#             return self.get_options_from_map(option), option
#         elif self.has_option('{}_pattern'.format(option)):
#             option = '{}_pattern'.format(option)
#             return self.get_options_from_pattern(option), option
#         elif self.has_option('category_defaults'):
#             return self.get_option_from_pairs('category_defaults', option), 'category_defaults'
#         else:
#             raise NoConfigOptions(option)

#     def get_options_from_list(self, option):
#         """
#         Returns the list of option values from options list.

#         :param str option: The option to get available values
#         :returns: The option values
#         :rtype: *list*
#         :raises Error: If the section does not exist
#         :raises Error: If the option does not exist

#         """
#         if not self.has_option(option):
#             raise NoConfigOption(option)
#         if option in ['experiment_options', 'project_options']:
#             options = self.get_options_from_table(option, field_id=2)
#         else:
#             options = split_line(self.get(option), sep=',')
#         return options

#     def get_options_from_table(self, option, field_id=None):
#         """
#         Returns the list of options from options table (i.e., <field1> | <field2> | <field3> | etc.).

#         :param str option: The option to get available values
#         :param int field_id: The field number starting from 1 (if not return the tuple)
#         :returns: The option values
#         :rtype: *list*
#         :raises Error: If the section does not exist
#         :raises Error: If the option does not exist
#         :raises Error: If the options table is misdeclared

#         """
#         if not self.has_option(option):
#             raise NoConfigOption(option)
#         option_lines = split_line(self.get(option).lstrip(), sep='\n')
#         if len(option_lines) == 1 and not option_lines[0]:
#             return list()
#         try:
#             if field_id:
#                 options = [tuple(option)[field_id - 1] for option in map(lambda x: split_line(x), option_lines)]
#             else:
#                 options = [tuple(option) for option in map(lambda x: split_line(x), option_lines)]
#         except:
#             raise MisdeclaredOption(option)
#         return options

#     def get_option_from_pairs(self, option, key):
#         """
#         Returns the list of option values from pairs table (i.e., <key> | <value>).

#         :param str option: The option to get available values
#         :param str key: The key to get the value
#         :returns: The key value
#         :rtype: *str*
#         :raises Error: If the section does not exist
#         :raises Error: If the option does not exist
#         :raises Error: If the key does not exist
#         :raises Error: If the options table is misdeclared

#         """
#         if not self.has_option(option):
#             raise NoConfigOption(option)
#         options_lines = split_line(self.get(option), sep='\n')
#         try:
#             options = dict((k, v) for k, v in map(lambda x: split_line(x), options_lines[1:]))
#         except:
#             raise MisdeclaredOption(option)
#         try:
#             return options[key]
#         except KeyError:
#             raise NoConfigKey(key, option)

#     def get_options_from_map(self, option, key=None):
#         """
#         Returns the list of option values from maptable.
#         If no key submitted, the option name has to be ``<key>_map``.

#         :param str option: The option to get available values
#         :param str key: The key to get the values
#         :returns: The option values
#         :rtype: *list*
#         :raises Error: If the section does not exist
#         :raises Error: If the option does not exist
#         :raises Error: If the related key is not in the source/destination keys of the maptable

#         """
#         if not self.has_option(option):
#             raise NoConfigOption(option)
#         if not key:
#             key = option.split('_map')[0]
#         from_keys, to_keys, value_map = split_map(self.get(option))
#         if key in from_keys:
#             return list(set([value[from_keys.index(key)] for value in value_map.keys()]))
#         else:
#             return list(set([value[to_keys.index(key)] for value in value_map.values()]))

#     def get_option_from_map(self, option, pairs):
#         """
#         Returns the destination values corresponding to key values from maptable.
#         The option name has to be ``<key>_map``. The key has to be in the destination keys of the maptable header.

#         :param str option: The option to get the value
#         :param dict pairs: A dictionary of {from_key: value} to input the maptable
#         :returns: The corresponding option value
#         :rtype: *list*
#         :raises Error: If the section does not exist
#         :raises Error: If the option does not exist
#         :raises Error: If the key values are not in the destination keys of the maptable

#         """
#         if not self.has_option(option):
#             raise NoConfigOption(option)
#         from_keys, to_keys, value_map = split_map(self.get(option))
#         key = option.split('_map')[0]
#         if key not in to_keys:
#             raise MisdeclaredOption(option, details="'{}' has to be in 'destination key'".format(key))
#         from_values = tuple(pairs[k] for k in from_keys)
#         to_values = value_map[from_values]
#         return to_values[to_keys.index(key)]

#     def get_options_from_pattern(self, option):
#         """
#         Returns the expanded regex from ``key_pattern``.
#         The option name has to be ``<attr>_pattern``.

#         :param str option: The option to get available values
#         :returns: The expanded regex
#         :rtype: *re.RegexObject*
#         :raises Error: If the section does not exist
#         :raises Error: If the option does not exist

#         """
#         return re.compile(self.translate(option))


# def interpolate(rawval, variables):
#     """
#     Makes string interpolation outside of ``ConfigParser.ConfigParser`` class.

#     :param str rawval: The string to interpolate
#     :param dict variables: The dictionary of variables to replace with
#     :return:
#     """
#     pattern = re.compile(r"%\(([^)]*)\)s|.")
#     value = rawval
#     depth = MAX_INTERPOLATION_DEPTH
#     while depth:
#         depth -= 1
#         if value and "%(" in value:
#             value = pattern.sub(interpolation_replace, value)
#             try:
#                 value = value % variables
#             except KeyError:
#                 raise BadInterpolation(value, variables)
#         else:
#             break
#     if value and "%(" in value:
#         raise InterpolationDepthError(rawval)
#     return value


# def interpolation_replace(match):
#     """
#     Used to interpolate deep strings.
#     """
#     s = match.group(1)
#     if s is None:
#         return match.group()
#     else:
#         return "%%(%s)s" % s.lower()


# def split_line(line, sep='|'):
#     """
#     Split a line into fields removing trailing and leading characters.

#     :param str line: String line to split
#     :param str sep: Separator character
#     :returns: The fields
#     :rtype: *list*

#     """
#     fields = map(string.strip, line.split(sep))
#     return fields


# def build_line(fields, sep=' | ', length=None, indent=False):
#     """
#     Build a line from fields adding trailing and leading characters.

#     :param tuple fields: Tuple of ordered fields
#     :param str sep: Separator character
#     :param tuple length: The fields length
#     :param boolean indent: True to indent the line
#     :returns: The line
#     :rtype: *str*

#     """
#     if length:
#         fields = [format(fields[i], str(length[i])) for i in range(len(fields))]
#     line = sep.join(fields)
#     if indent:
#         return ' ' * 4 + line
#     else:
#         return line


# def lengths(fields):
#     """
#     Returns the maximum length among items of a list of tuples.
#     :param list fields:
#     :returns: The fields lengths
#     :rtype: *tuple*

#     """
#     return tuple([max(map(len, f)) for f in zip(*fields)])


# def split_record(option, sep='|'):
#     """
#     Split a multi-line record in a configuration file.

#     :param str option: Option in the configuration file.
#     :param str sep: Separator character.
#     :returns: A list of the form [[field1A, field2A, ...], [field1B, field2B, ...]]

#     """
#     result = []
#     for record in option.split('\n'):
#         if record == '':
#             continue
#         fields = split_line(record, sep)
#         result.append(fields)
#     return result


# def split_map_header(header):
#     """
#     Split header of a multi-line map in a configuration file.
#     A map header defines the mapping between two sets of facets id.

#     :param str header: Header line of multi-line map
#     :returns: 'from' and 'to' tuples representing the keys for the mapping

#     """
#     header_pattern = re.compile(r'map\s*\((?P<from_keys>[^(:)]*):(?P<to_keys>[^(:)]*)\)')
#     result = re.match(header_pattern, header).groupdict()
#     if result is None:
#         raise InvalidMapHeader(header_pattern, header)
#     from_keys = split_line(result['from_keys'], sep=',')
#     to_keys = split_line(result['to_keys'], sep=',')
#     return from_keys, to_keys


# def split_map(option, sep='|'):
#     """
#     Split a multi-line map in a configuration file.

#     :param str option: Option in the configuration file
#     :param str sep: Separator character
#     :returns: A dictionary mapping the 'from' tuples to the 'to' tuples

#     """
#     lines = option.split('\n')
#     header = lines[0]
#     from_keys, to_keys = split_map_header(header)
#     n_from = len(from_keys)
#     result = {}
#     for record in lines[1:]:
#         if record == '':
#             continue
#         fields = map(string.strip, record.split(sep))
#         from_values = tuple(fields[0:n_from])
#         to_values = tuple(fields[n_from:])
#         if from_values not in result.keys():
#             result[from_values] = to_values
#         else:
#             raise DuplicatedMapEntry(fields, option)
#         if len(from_values) != n_from:
#             raise InvalidMapEntry(fields, header, option)
#     return from_keys, to_keys, result

Sub-modules

cvtool.ESGF.ESGConfigParser.constants

:platform: Unix :synopsis: Constants used in this package.

cvtool.ESGF.ESGConfigParser.custom_exceptions

:platform: Unix :synopsis: Custom exceptions.