Server IP : 172.24.0.40 / Your IP : 216.73.216.10 Web Server : Apache System : Linux dbweb26.ust.edu.ph 4.18.0-513.5.1.el8_9.x86_64 #1 SMP Fri Sep 29 05:21:10 EDT 2023 x86_64 User : apache ( 48) PHP Version : 8.2.18 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /usr/lib64/python3.6/site-packages/subscription_manager/ |
Upload File : |
from __future__ import print_function, division, absolute_import # # Subscription manager command line utility. This script is a modified version of # cp_client.py from candlepin scripts # # Copyright (c) 2012 Red Hat, Inc. # # This software is licensed to you under the GNU General Public License, # version 2 (GPLv2). There is NO WARRANTY for this software, express or # implied, including the implied warranties of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 # along with this software; if not, see # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. # # Red Hat trademarks are not licensed under GPLv2. No permission is # granted to use or replicate Red Hat trademarks that are incorporated # in this software or its documentation. # import logging import socket import six import six.moves.http_client from rhsm.https import ssl from rhsm.connection import NoValidEntitlement import rhsm.config from subscription_manager import injection as inj from subscription_manager import listing from subscription_manager import rhelproduct from subscription_manager.i18n import ugettext as _ log = logging.getLogger(__name__) cfg = rhsm.config.get_config_parser() class MultipleReleaseProductsError(ValueError): def __init__(self, certificates): self.certificates = certificates self.certificate_paths = ", ".join([certificate.path for certificate in certificates]) super(ValueError, self).__init__(("More than one release product certificate installed. Certificate paths: %s" % self.certificate_paths)) def translated_message(self): return (_("Error: More than one release product certificate installed. Certificate paths: %s") % ", ".join([certificate.path for certificate in self.certificates])) class ContentConnectionProvider(object): def __init__(self): pass class ReleaseBackend(object): def get_releases(self): provider = self._get_release_version_provider() return provider.get_releases() def _get_release_version_provider(self): release_provider = ApiReleaseVersionProvider() if release_provider.api_supported(): return release_provider return CdnReleaseVersionProvider() class ApiReleaseVersionProvider(object): def __init__(self): self.cp_provider = inj.require(inj.CP_PROVIDER) self.identity = inj.require(inj.IDENTITY) def api_supported(self): return self._conn().supports_resource("available_releases") def get_releases(self): return self._conn().getAvailableReleases(self.identity.uuid) def _conn(self): return self.cp_provider.get_consumer_auth_cp() class CdnReleaseVersionProvider(object): def __init__(self): self.entitlement_dir = inj.require(inj.ENT_DIR) self.product_dir = inj.require(inj.PROD_DIR) self.cp_provider = inj.require(inj.CP_PROVIDER) self.content_connection = self.cp_provider.get_content_connection() def get_releases(self): # cdn base url # Find the rhel products release_products = [] certificates = set() installed_products = self.product_dir.get_installed_products() for product_hash in installed_products: product_cert = installed_products[product_hash] products = product_cert.products for product in products: rhel_matcher = rhelproduct.RHELProductMatcher(product) if rhel_matcher.is_rhel(): release_products.append(product) certificates.add(product_cert) if len(release_products) == 0: log.debug("No products with RHEL product tags found") return [] elif len(release_products) > 1: raise MultipleReleaseProductsError(certificates=certificates) # Note: only release_products with one item can pass previous if-elif release_product = release_products[0] entitlements = self.entitlement_dir.list_for_product(release_product.id) # When there is SCA entitlement certificate, then add this cert to # the list too. See: https://bugzilla.redhat.com/show_bug.cgi?id=1924921 sca_entitlements = self.entitlement_dir.list_with_sca_mode() entitlements.extend(sca_entitlements) listings = [] ent_cert_key_pairs = set() for entitlement in entitlements: contents = entitlement.content for content in contents: # ignore content that is not enabled # see bz #820639 if not content.enabled: continue if self._is_correct_rhel(release_product.provided_tags, content.required_tags): listing_path = self._build_listing_path(content.url) listings.append(listing_path) ent_cert_key_pairs.add((entitlement.path, entitlement.key_path())) # FIXME: not sure how to get the "base" content if we have multiple # entitlements for a product # for a entitlement, grant the corresponding entitlement cert # use it for this connection # hmm. We are really only supposed to have one product # with one content with one listing file. We shall see. releases = [] listings = sorted(set(listings)) for listing_path in listings: try: data = self.content_connection.get_versions( path=listing_path, cert_key_pairs=ent_cert_key_pairs ) except (socket.error, six.moves.http_client.HTTPException, ssl.SSLError, NoValidEntitlement) as e: # content connection doesn't handle any exceptions # and the code that invokes this doesn't either, so # swallow them here. log.exception(e) continue # any non 200 response on fetching the release version # listing file returns a None here if not data: continue ver_listing = listing.ListingFile(data=str(data)) # ver_listing.releases can be empty releases = releases + ver_listing.get_releases() releases_set = sorted(set(releases)) return releases_set def _build_listing_path(self, content_url): listing_parts = content_url.split('$releasever', 1) listing_base = listing_parts[0] # FIXME: cleanup paths ("//"'s, etc) return u"%s/listing" % listing_base # FIXME(khowell): ensure that my changes here don't break earlier fix # require tags provided by installed products? def _is_correct_rhel(self, product_tags, content_tags): # easy to pass a string instead of a list assert not isinstance(product_tags, six.string_types) assert not isinstance(content_tags, six.string_types) for product_tag in product_tags: # we are comparing the lists to see if they # have a matching rhel-# product_split = product_tag.split('-', 2) if product_split[0] == "rhel": # look for match in content tags for content_tag in content_tags: content_split = content_tag.split('-', 2) # ignore non rhel content tags if content_split[0] != "rhel": continue # exact match if product_tag == content_tag: return True # is this content for a base of this variant if product_tag.startswith(content_tag): return True # else, we don't match, keep looking log.debug("Ignoring content with tags [%s] because it does not match installed product tags [%s]" % ( ','.join(content_tags), ','.join(product_tags) )) return False