Server IP : 172.24.0.40 / Your IP : 216.73.216.10 Web Server : Apache System : Linux dbweb26.ust.edu.ph 4.18.0-513.5.1.el8_9.x86_64 #1 SMP Fri Sep 29 05:21:10 EDT 2023 x86_64 User : apache ( 48) PHP Version : 8.2.18 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /usr/lib64/python3.6/site-packages/rhsm/ |
Upload File : |
from __future__ import print_function, division, absolute_import # # This module has been originally modified and enhanced from Red Hat Update # Agent's config module. # # Copyright (c) 2010 - 2012 Red Hat, Inc. # # This software is licensed to you under the GNU General Public License, # version 2 (GPLv2). There is NO WARRANTY for this software, express or # implied, including the implied warranties of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 # along with this software; if not, see # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. # # Red Hat trademarks are not licensed under GPLv2. No permission is # granted to use or replicate Red Hat trademarks that are incorporated # in this software or its documentation. # import sys import os import logging from iniparse import SafeConfigParser from iniparse.compat import NoOptionError, InterpolationMissingOptionError, NoSectionError import re import tempfile CONFIG_ENV_VAR = "RHSM_CONFIG" DEFAULT_CONFIG_DIR = "/etc/rhsm/" HOST_CONFIG_DIR = "/etc/rhsm-host/" # symlink inside docker containers DEFAULT_CONFIG_PATH = "%srhsm.conf" % DEFAULT_CONFIG_DIR DEFAULT_PROXY_PORT = "3128" DEFAULT_SERVER_TIMEOUT = "180" # Defaults for connecting to RHSM, used to "reset" the configuration file # if requested by the user: DEFAULT_HOSTNAME = "subscription.rhsm.redhat.com" DEFAULT_PORT = "443" DEFAULT_PREFIX = "/subscription" DEFAULT_CDN_HOSTNAME = "cdn.redhat.com" DEFAULT_CDN_PORT = "443" DEFAULT_CDN_PREFIX = "/" DEFAULT_CA_CERT_DIR = '/etc/rhsm/ca/' DEFAULT_ENT_CERT_DIR = '/etc/pki/entitlement' HOST_ENT_CERT_DIR = '/etc/pki/entitlement-host' SERVER_DEFAULTS = { 'hostname': DEFAULT_HOSTNAME, 'prefix': DEFAULT_PREFIX, 'port': DEFAULT_PORT, 'server_timeout': DEFAULT_SERVER_TIMEOUT, 'insecure': '0', 'ssl_verify_depth': '3', 'proxy_hostname': '', 'proxy_scheme': 'http', 'proxy_user': '', 'proxy_port': '', 'proxy_password': '', 'no_proxy': '', } RHSM_DEFAULTS = { 'baseurl': 'https://' + DEFAULT_CDN_HOSTNAME, 'repomd_gpg_url': '', 'ca_cert_dir': DEFAULT_CA_CERT_DIR, 'repo_ca_cert': '%(ca_cert_dir)sredhat-uep.pem', 'productcertdir': '/etc/pki/product', 'entitlementcertdir': DEFAULT_ENT_CERT_DIR, 'consumercertdir': '/etc/pki/consumer', 'manage_repos': '1', 'full_refresh_on_yum': '0', 'report_package_profile': '1', 'plugindir': '/usr/share/rhsm-plugins', 'pluginconfdir': '/etc/rhsm/pluginconf.d', 'auto_enable_yum_plugins': '1', 'package_profile_on_trans': '0', 'inotify': '1' } RHSMCERTD_DEFAULTS = { 'certcheckinterval': '240', 'autoattachinterval': '1440', 'splay': '1', 'disable': '0', 'auto_registration': '0', 'auto_registration_interval': '60' } LOGGING_DEFAULTS = { 'default_log_level': 'INFO' } # Defaults are applied to each section in the config file. DEFAULTS = { 'server': SERVER_DEFAULTS, 'rhsm': RHSM_DEFAULTS, 'rhsmcertd': RHSMCERTD_DEFAULTS, 'logging': LOGGING_DEFAULTS } log = logging.getLogger(__name__) def in_container(): """Are we running in a docker container or not?""" # In UBI containers (RHEL, CentOS), paths HOST_CONFIG_DIR and HOST_ENT_CERT_DIR # are symlinks to container's directories: # /etc/rhsm-host -> /run/secrets/rhsm/ # /etc/pki/entitlement-host -> /run/secrets/etc-pki-entitlement/ # # The container secrets are bind-mounted to a directory on the host: # /run/secrets (container) -> /usr/share/rhel/secrets (host) # which is specified in '/usr/share/containers/mounts.conf' (= Podman secret). # # The directories inside this host's directory are themselves # symlinks to other host directories populated by subscription-manager: # /usr/share/rhel/secrets/etc-pki-entitlement -> /etc/pki/entitlement # /usr/share/rhel/secrets/redhat.repo -> /etc/yum.repos.d/redhat.repo # /usr/share/rhel/secrets/rhsm -> /etc/rhsm # # If the container secrets exists, the system is considered to be a container: # /etc/rhsm-host/ exists # /etc/pki/entitlement/host/ exists and is not empty if os.path.isdir(HOST_CONFIG_DIR) and ( os.path.isdir(HOST_ENT_CERT_DIR) and any(os.walk(HOST_ENT_CERT_DIR)) ): log.debug(f"Container detected: found directories {HOST_CONFIG_DIR} and {HOST_ENT_CERT_DIR}.") return True return False class RhsmConfigParser(SafeConfigParser): """Config file parser for rhsm configuration.""" # defaults unused but kept to preserve compatibility def __init__(self, config_file=None, defaults=None): self.config_file = config_file SafeConfigParser.__init__(self) self.read(self.config_file) def read(self, file_names=None): """ Read configuration files. When configuration files are not specified, then read self.config_file :param file_names: list of configuration files :return: number of configuration files read """ if file_names is None: return super(RhsmConfigParser, self).read(self.config_file) else: return super(RhsmConfigParser, self).read(file_names) def save(self, config_file=None): """Writes config file to storage.""" rhsm_conf_dir = os.path.dirname(self.config_file) # When /etc/rhsm does not exist, then try to create it if os.path.isdir(rhsm_conf_dir) is False: os.makedirs(rhsm_conf_dir) with tempfile.NamedTemporaryFile(mode="w", dir=rhsm_conf_dir, delete=False) as fo: self.write(fo) fo.flush() try: mode = os.stat(self.config_file).st_mode except IOError: mode = 0o644 os.rename(fo.name, self.config_file) os.chmod(self.config_file, mode) def get(self, section, prop): """Get a value from rhsm config. :param section: config file section :type section: str :param prop: what config propery to find, the config item name :type prop: str :return: The string value of the config item. :rtype: str If config item exists, but is not set, an empty string is return. """ try: return SafeConfigParser.get(self, section, prop) except InterpolationMissingOptionError: # if there is an interpolation error, resolve it raw_val = super(RhsmConfigParser, self).get(section, prop, True) interpolations = re.findall("%\((.*?)\)s", raw_val) changed = False for interp in interpolations: # Defaults aren't interpolated by default, so bake them in as necessary # has_option throws an exception if the section doesn't exist, but at this point we know it does if self.has_option(section, interp): super(RhsmConfigParser, self).set(section, interp, self.get(section, interp)) changed = True if changed: # Now that we have the required values, we can interpolate return self.get(section, prop) # If nothing has been changed (we couldn't fix it) re-raise the exception raise except (NoOptionError, NoSectionError) as er: try: return DEFAULTS[section][prop.lower()] except KeyError: # re-raise the NoOptionError, not the key error raise er def set(self, section, name, value): try: # If the value doesn't exist, or isn't equal, write it if self.get(section, name) != value: raise NoOptionError except: if not self.has_section(section): self.add_section(section) super(RhsmConfigParser, self).set(section, name, value) if section == "logging" and name == "default_log_level": self.is_log_level_valid(value) def is_log_level_valid(self, value, print_warning=True): """ Check if provided default_log_level value is valid or not :param value: value of default_log_level :param print_warning: print warning, when provided value is not valid :return: True, when value is valid. Otherwise return False """ valid = ['CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG'] if value not in valid + ["NOTSET"]: if print_warning is True: print("Invalid Log Level: {lvl}, setting to INFO for this run.".format(lvl=value), file=sys.stderr) print( "Please use: subscription-manager config --logging.default_log_level=<Log Level> to set " "the default_log_level to a valid value.", file=sys.stderr) valid_str = ", ".join(valid) print("Valid Values: {valid_str}".format(valid_str=valid_str), file=sys.stderr) return False return True def get_int(self, section, prop): """Get a int value from the config. :param section: the config section :type section: str :param prop: the config item name :type prop: str :return: An int cast from the string read from the config. If config item is unset, return None :rtype: int or None :raises ValueError: if the config value found can not be coerced into an int """ value_string = self.get(section, prop) if value_string == "": return None try: value_int = int(value_string) # we could also try to handle port name # strings (ie, 'http') here with getservbyname except (ValueError, TypeError): raise ValueError( "Section: %s, Property: %s - Integer value expected" % (section, prop)) return value_int # Overriding this method to address # http://code.google.com/p/iniparse/issues/detail?id=9 def defaults(self): result = [] for section in DEFAULTS: result += [(key, value) for (key, value) in list(DEFAULTS[section].items())] return dict(result) def sections(self): result = super(RhsmConfigParser, self).sections() for section in DEFAULTS: if section not in result: result.append(section) return result def has_option(self, section, prop): try: self.get(section, prop) return True except NoOptionError: return False def items(self, section): result = {} for key in DEFAULTS.get(section, {}): result[key] = DEFAULTS[section][key] if self.has_section(section): super_result = super(RhsmConfigParser, self).options(section) for key in super_result: if self.get(section, key) and len(self.get(section, key).strip()) > 0: result[key] = self.get(section, key) return list(result.items()) def options(self, section): # This is necessary because with the way we handle defaults, parser.has_section('xyz') # will return True if 'xyz' exists only in the defaults but parser.options('xyz') # will throw an exception. items = set() for key in DEFAULTS.get(section, {}): items.add(key) if self.has_section(section): super_result = super(RhsmConfigParser, self).options(section) items.update(super_result) return list(items) def is_default(self, section, prop, value): if self.get_default(section, prop) == value: return True return False def has_default(self, section, prop): return section in DEFAULTS and prop.lower() in DEFAULTS[section] def get_default(self, section, prop): if self.has_default(section, prop.lower()): return DEFAULTS[section][prop.lower()] return None class RhsmHostConfigParser(RhsmConfigParser): """ Sub-class of config parser automatically loaded when we detect that we're running in a container environment. Host config is shared with containers as /etc/rhsm-host. However the rhsm.conf within will still be referencing /etc/rhsm for a couple properties. (ca_cert_dir, repo_ca_cert) Instead we load config file normally, and assume to replace occurrences of /etc/rhsm with /etc/rhsm-host in these properties. A similar adjustment is necessary for /etc/pki/entitlement-host if present. """ def __init__(self, config_file=None, defaults=None): RhsmConfigParser.__init__(self, config_file, defaults) # Override the ca_cert_dir and repo_ca_cert if necessary: ca_cert_dir = self.get('rhsm', 'ca_cert_dir') repo_ca_cert = self.get('rhsm', 'repo_ca_cert') ca_cert_dir = ca_cert_dir.replace(DEFAULT_CONFIG_DIR, HOST_CONFIG_DIR) repo_ca_cert = repo_ca_cert.replace(DEFAULT_CONFIG_DIR, HOST_CONFIG_DIR) self.set('rhsm', 'ca_cert_dir', ca_cert_dir) self.set('rhsm', 'repo_ca_cert', repo_ca_cert) # Similarly if /etc/pki/entitlement-host exists, override this too. # If for some reason the host config is pointing to another directory # we leave the config setting alone, our tooling isn't going to be # able to handle it anyhow. if os.path.exists(HOST_ENT_CERT_DIR): ent_cert_dir = self.get('rhsm', 'entitlementcertdir') if ent_cert_dir == DEFAULT_ENT_CERT_DIR or \ ent_cert_dir == DEFAULT_ENT_CERT_DIR + "/": ent_cert_dir = HOST_ENT_CERT_DIR self.set('rhsm', 'entitlementcertdir', ent_cert_dir) def get_config_parser(): """ Get an :class:`RhsmConfig` instance Will use the first config file defined in the following list: - /etc/rhsm-host/rhsm.conf if it exists (only in containers) - /etc/rhsm/rhsm.conf """ global CFG try: CFG = CFG except NameError: CFG = None if CFG is None: # Load alternate config file implementation if we detect that we're # running in a container. if in_container(): CFG = RhsmHostConfigParser( config_file=os.path.join(HOST_CONFIG_DIR, 'rhsm.conf')) else: CFG = RhsmConfigParser(config_file=DEFAULT_CONFIG_PATH) return CFG # Deprecated but still in use by other applications def initConfig(configFile=None): return get_config_parser()