Server IP : 172.24.0.40 / Your IP : 216.73.216.10 Web Server : Apache System : Linux dbweb26.ust.edu.ph 4.18.0-513.5.1.el8_9.x86_64 #1 SMP Fri Sep 29 05:21:10 EDT 2023 x86_64 User : apache ( 48) PHP Version : 8.2.18 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /usr/lib64/python3.6/site-packages/rhsm/ |
Upload File : |
from __future__ import print_function, division, absolute_import # # Copyright (c) 2011 - 2012 Red Hat, Inc. # # This software is licensed to you under the GNU General Public # License as published by the Free Software Foundation; either version # 2 of the License (GPLv2) or (at your option) any later version. # There is NO WARRANTY for this software, express or implied, # including the implied warranties of MERCHANTABILITY, # NON-INFRINGEMENT, or FITNESS FOR A PARTICULAR PURPOSE. You should # have received a copy of GPLv2 along with this software; if not, see # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. # import logging import imp import rpm import six import os.path from rhsm import ourjson as json from rhsm.utils import suppress_output from iniparse import SafeConfigParser, ConfigParser from cloud_what import provider try: import dnf except ImportError: dnf = None try: import libdnf except ImportError: libdnf = None try: import yum except ImportError: yum = None try: imp.find_module('zypp_plugin') use_zypper = True except ImportError: use_zypper = False if use_zypper: REPOSITORY_PATH = "/etc/rhsm/zypper.repos.d/redhat.repo" else: REPOSITORY_PATH = "/etc/yum.repos.d/redhat.repo" log = logging.getLogger(__name__) class InvalidProfileType(Exception): """ Thrown when attempting to get a profile of an unsupported type. """ pass class ModulesProfile(object): def __init__(self): self.content = self.__generate() def __str__(self): return str(self.content) def __eq__(self, other): return self.content == other.content @staticmethod def _uniquify(module_list): ret = {} for module in module_list: key = (module["name"], module["stream"], module["version"], module["context"], module["arch"]) # Prefer duplicates that are Active. # There are "enabled" duplicates from dnf that are marked as inactive. if key not in ret: ret[key] = module else: if ret[key].get("active", False) is False and module.get("active", False) is True: ret[key] = module return list(ret.values()) @staticmethod def fix_aws_rhui_repos(base) -> None: """ Try to fix RHUI repos on AWS systems. When the system is running on AWS, then we have to fix repository URL. See: https://bugzilla.redhat.com/show_bug.cgi?id=1924126 :param base: DNF base :return: None """ # First try to detect if system is running on AWS cloud_provider = provider.get_cloud_provider() if cloud_provider is None or cloud_provider.CLOUD_PROVIDER_ID != "aws": log.debug("This system is not running on AWS. Skipping fixing of RHUI repos.") return log.debug("This system is running on AWS. Trying to collect AWS metadata") metadata_str = cloud_provider.get_metadata() if metadata_str is None: log.debug("Unable to gather metadata from IMDS. Skipping fixing of RHUI repos.") return try: metadata = json.loads(metadata_str) except json.JSONDecodeError: log.warning("AWS provided corrupted json metadata document. Skipping fixing of RHUI repos.") return if 'region' not in metadata: log.debug("Region is not specified in AWS metadata. Skipping fixing of RHUI repos.") return region = metadata['region'] log.debug(f"Trying to fix URLs of RHUI repos using region: {region}") for repo in cloud_provider.rhui_repos(base): log.debug(f"Trying to fix repository: {repo.id}") try: cloud_provider.fix_rhui_url_template(repo, region) except ValueError as error: log.debug("Unable to fix RHUI URL: {error}".format(error=error)) @suppress_output def __generate(self): module_list = [] if dnf is not None and libdnf is not None: base = dnf.Base() # Read yum/dnf variables from <install_root>/etc/yum/vars and <install_root>/etc/dnf/vars # See: https://bugzilla.redhat.com/show_bug.cgi?id=1863039 base.conf.substitutions.update_from_etc(base.conf.installroot) base.read_all_repos() # Try to fix repo names, when AWS cloud provider is detected self.fix_aws_rhui_repos(base) try: log.debug("Trying to fill dnf sack object...") base.fill_sack() except dnf.exceptions.RepoError as err: log.error("Unable to create sack object: %s" % str(err)) return [] # FIXME: DNF doesn't provide public API for modulemd try: modules = base._moduleContainer except AttributeError: log.warning("DNF does not provide modulemd functionality") return [] all_module_list = modules.getModulePackages() for module_pkg in all_module_list: status = "default" active = False if modules.isEnabled(module_pkg.getName(), module_pkg.getStream()): status = "enabled" elif modules.isDisabled(module_pkg.getName()): status = "disabled" if modules.isModuleActive(module_pkg.getId()): active = True installed_profiles = [] if status == "enabled": # It has to be list, because we compare this with cached json document and # JSON does not support anything like a tuple :-) installed_profiles = list(modules.getInstalledProfiles(module_pkg.getName())) module_list.append({ "name": module_pkg.getName(), "stream": module_pkg.getStream(), "version": module_pkg.getVersion(), "context": module_pkg.getContext(), "arch": module_pkg.getArch(), "profiles": [profile.getName() for profile in module_pkg.getProfiles()], "installed_profiles": installed_profiles, "status": status, "active": active }) return ModulesProfile._uniquify(module_list) def collect(self): """ Gather list of modules reported to candlepin server :return: List of modules """ return self.content class EnabledRepos(object): def __generate(self): if not os.path.exists(self.repofile): return [] # Unfortuantely, we can not use the SafeConfigParser for zypper repo # files because the repository urls contains strings which the # SafeConfigParser don't like. It would crash with # ConfigParser.InterpolationSyntaxError: '%' must be followed by '%' or '(' if use_zypper: config = ConfigParser() else: config = SafeConfigParser() config.read(self.repofile) enabled_sections = [section for section in config.sections() if config.getboolean(section, "enabled")] enabled_repos = [] for section in enabled_sections: try: enabled_repos.append( { "repositoryid": section, "baseurl": [self._format_baseurl(config.get(section, "baseurl"))] } ) except ImportError: break return enabled_repos def __init__(self, repo_file): """ :param path: A .repo file path used to filter the report. :type path: str """ if dnf is not None: self.db = dnf.dnf.Base() elif yum is not None: self.yb = yum.YumBase() self.repofile = repo_file self.content = self.__generate() def __str__(self): return str(self.content) def _format_baseurl(self, repo_url): """ returns a well formatted baseurl string :param repo_url: a repo URL that you want to format :type path: str """ if use_zypper: return self._cut_question_mark(repo_url) else: mappings = self._obtain_mappings() return repo_url.replace('$releasever', mappings['releasever']).replace('$basearch', mappings['basearch']) def _cut_question_mark(self, repo_url): """ returns a string where everything after the first occurence of ? is truncated :param repo_url: a repo URL that you want to modify :type path: str """ return repo_url[:repo_url.find('?')] @suppress_output def _obtain_mappings(self): """ returns a hash with "basearch" and "releasever" set. This will try dnf first, and them yum if dnf is not installed. """ if dnf is not None: return self._obtain_mappings_dnf() elif yum is not None: return self._obtain_mappings_yum() else: log.error('Unable to load module for any supported package manager (dnf, yum).') raise ImportError def _obtain_mappings_dnf(self): return {'releasever': self.db.conf.substitutions['releasever'], 'basearch': self.db.conf.substitutions['basearch']} def _obtain_mappings_yum(self): return {'releasever': self.yb.conf.yumvar['releasever'], 'basearch': self.yb.conf.yumvar['basearch']} class EnabledReposProfile(object): """ Collect information about enabled repositories """ def __init__(self, repo_file=REPOSITORY_PATH): self._enabled_repos = EnabledRepos(repo_file) def __eq__(self, other): return self._enabled_repos.content == other._enabled_repos.content def collect(self): """ Gather list of enabled repositories :return: List of enabled repositories """ return self._enabled_repos.content class Package(object): """ Represents a package installed on the system. """ def __init__(self, name, version, release, arch, epoch=0, vendor=None): self.name = name self.version = version self.release = release self.arch = arch self.epoch = epoch self.vendor = vendor def to_dict(self): """ Returns a dict representation of this packages info. """ return { 'name': self._normalize_string(self.name), 'version': self._normalize_string(self.version), 'release': self._normalize_string(self.release), 'arch': self._normalize_string(self.arch), 'epoch': self._normalize_string(self.epoch), 'vendor': self._normalize_string(self.vendor), # bz1519512 handle vendors that aren't utf-8 } def __eq__(self, other): """ Compare one profile to another to determine if anything has changed. """ if not isinstance(self, type(other)): return False if self.name == other.name and \ self.version == other.version and \ self.release == other.release and \ self.arch == other.arch and \ self.epoch == other.epoch and \ self._normalize_string(self.vendor) == self._normalize_string(other.vendor): return True return False def __str__(self): return "<Package: %s %s %s>" % (self.name, self.version, self.release) # added in support of bz1519512, bz1543639 @staticmethod def _normalize_string(value): if type(value) is six.binary_type: return value.decode('utf-8', 'replace') return value class RPMProfile(object): def __init__(self, from_file=None): """ Load the RPM package profile from a given file, or from rpm itself. NOTE: from_file is a file descriptor, not a file name. """ self.packages = [] if from_file: log.debug("Loading RPM profile from file: %s" % from_file.name) json_buffer = from_file.read() pkg_dicts = json.loads(json_buffer) for pkg_dict in pkg_dicts: self.packages.append(Package( name=pkg_dict['name'], version=pkg_dict['version'], release=pkg_dict['release'], arch=pkg_dict['arch'], epoch=pkg_dict['epoch'], vendor=pkg_dict['vendor'] )) else: log.debug("Loading current RPM profile.") ts = rpm.TransactionSet() ts.setVSFlags(-1) installed = ts.dbMatch() self.packages = self._accumulate_profile(installed) @staticmethod def _accumulate_profile(rpm_header_list): """ Accumulates list of installed rpm info @param rpm_header_list: list of rpm headers @type rpm_header_list: list @return: list of package info dicts @rtype: list """ pkg_list = [] for h in rpm_header_list: if h['name'] == "gpg-pubkey": # dbMatch includes imported gpg keys as well # skip these for now as there isn't compelling # reason for server to know this info continue pkg_list.append(Package( name=h['name'], version=h['version'], release=h['release'], arch=h['arch'], epoch=h['epoch'] or 0, vendor=h['vendor'] or None )) return pkg_list def collect(self): """ Returns a list of dicts containing the package info. See 'packages' member on this object for a list of actual objects. @return : list of package info dicts @rtype: list """ pkg_dicts = [] for pkg in self.packages: pkg_dicts.append(pkg.to_dict()) return pkg_dicts def __eq__(self, other): """ Compare one profile to another to determine if anything has changed. """ if not isinstance(self, type(other)): return False # Quickly check if we have a different number of packages for an # easy answer before we start checking everything: if len(self.packages) != len(other.packages): return False for pkg in self.packages: if pkg not in other.packages: return False return True def get_profile(profile_type): """ Returns an instance of a Profile object @param profile_type: profile type @type profile_type: string Returns an instance of a Profile object """ if profile_type not in PROFILE_MAP: raise InvalidProfileType('Could not find profile for type [%s]', profile_type) profile = PROFILE_MAP[profile_type]() return profile # Profile types we support: PROFILE_MAP = { "rpm": RPMProfile, "enabled_repos": EnabledReposProfile, "modulemd": ModulesProfile }