Server IP : 172.24.0.40 / Your IP : 216.73.216.10 Web Server : Apache System : Linux dbweb26.ust.edu.ph 4.18.0-513.5.1.el8_9.x86_64 #1 SMP Fri Sep 29 05:21:10 EDT 2023 x86_64 User : apache ( 48) PHP Version : 8.2.18 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : OFF | Sudo : ON | Pkexec : ON Directory : /lib64/python3.6/site-packages/subscription_manager/ |
Upload File : |
from __future__ import print_function, division, absolute_import # # Copyright (c) 2014 Red Hat, Inc. # # This software is licensed to you under the GNU General Public License, # version 2 (GPLv2). There is NO WARRANTY for this software, express or # implied, including the implied warranties of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2 # along with this software; if not, see # http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt. # # Red Hat trademarks are not licensed under GPLv2. No permission is # granted to use or replicate Red Hat trademarks that are incorporated # in this software or its documentation. # import logging import socket from rhsm.certificate import Key, create_from_pem from rhsm.certificate2 import CONTENT_ACCESS_CERT_TYPE from subscription_manager.certdirectory import Writer from subscription_manager import certlib from subscription_manager import content_action_client from subscription_manager import utils from subscription_manager.injection import IDENTITY, require from subscription_manager import rhelentbranding import subscription_manager.injection as inj from subscription_manager.i18n import ungettext, ugettext as _ log = logging.getLogger(__name__) CONTENT_ACCESS_CERT_CAPABILITY = "org_level_content_access" class EntCertActionInvoker(certlib.BaseActionInvoker): """Invoker for entitlement certificate updating actions.""" def _do_update(self): action = EntCertUpdateAction() return action.perform() # this guy is an oddball # NOTE: this lib and EntCertDeleteAction are currently # unused. Intention is to replace managerlib.clean_all_data # with a CertActionClient.delete invocation class EntCertDeleteLib(object): """Invoker for entitlement certificate delete actions.""" def __init__(self, serial_numbers=None, ent_dir=None): self.locker = certlib.Locker() self.ent_dir = ent_dir def delete(self): self.locker.run(self._do_delete) def _do_delete(self): action = EntCertDeleteAction(ent_dir=self.ent_dir, serial_numbers=self.serial_numbers) return action.perform() # FIXME: currently unused class EntCertDeleteAction(object): """Action for deleting all entitlement certs.""" def __init__(self, ent_dir=None): self.ent_dir = ent_dir def perform(self, serial_numbers): for sn in serial_numbers: cert = self.ent_dir.find(sn) if cert is None: continue cert.delete() return self class EntCertUpdateAction(object): """Action for syncing entitlement certificates. EntCertUpdateAction is used to sync entitlement certs based on currently entitlement status. An EntCertUpdateReport is returned containing information about the changes that were applied. install() and delete() methods are expected to update self.report. New and updated ent certs are installed via a EntitlementCertBundlesInstaller. Expired or extraneous entitlement certs are deleted. If there are changes applied to the EntitltementDirectory, repo_hook() and branding_hook() are triggered. Certificates will have been updated, and written to disk, and EntitlementDirectory refresh before these hooks are called. The injected self.uep is used to query RHSM API for a list of expected entitlement certificate serial numbers. If local system is missing certs matching those serial numbers, the API is queried for the list of serial numbers to update. rogue: ent certs installed on system but not known by RHSM API. missing: ent certs RHSM API knows, but are not installed on system. """ def __init__(self, report=None): self.cp_provider = inj.require(inj.CP_PROVIDER) self.uep = self.cp_provider.get_consumer_auth_cp() self.ent_dir = inj.require(inj.ENT_DIR) self.identity = require(IDENTITY) self.report = EntCertUpdateReport() self.content_access_cache = inj.require(inj.CONTENT_ACCESS_CACHE) # NOTE: this is slightly at odds with the manual cert import # path, manual import certs wont get a 'report', etc def perform(self): local = self._get_local_serials() try: expected = self._get_expected_serials() except socket.error as ex: log.exception(ex) log.error('Cannot modify subscriptions while disconnected') raise Disconnected() cert_changed = False missing_serials = self._find_missing_serials(local, expected) rogue_serials = self._find_rogue_serials(local, expected) self.delete(rogue_serials) installed_serials = self.install(missing_serials) log.info('certs updated:\n%s', self.report) self.syslog_results() # We call EntCertlibActionInvoker.update() solo from # the 'attach' cli instead of an ActionClient. So # we need to refresh the ent_dir object before calling # content updating actions. self.ent_dir.refresh() if missing_serials or rogue_serials: cert_changed = True if self.uep.has_capability(CONTENT_ACCESS_CERT_CAPABILITY): content_access_certs = self._find_content_access_certs() if len(content_access_certs) > 0: # This addresses BZs: 1448855, 1450862 obsolete_certs = [] for cont_access_cert in content_access_certs: if cont_access_cert.serial in installed_serials: continue if cont_access_cert.serial not in expected: obsolete_certs.append(cont_access_cert) if len(obsolete_certs) > 0: log.info('Deleting obsolete content access certificate') self.delete(obsolete_certs) update_data = self.content_access_hook() if update_data is not None: cert_changed = True if cert_changed: self.repo_hook() # NOTE: Since we have the yum repos defined here now # we could update product id certs here, or install # them if they are needed, but missing. That way the # branding installs could be more accurate. # reload certs and update branding self.branding_hook() # if we want the full report, we can get it, but # this makes CertLib.update() have same sig as reset # of *Lib.update return self.report def install(self, missing_serials): """Install any missing entitlement certificates.""" cert_bundles = self.get_certificates_by_serial_list(missing_serials) ent_cert_bundles_installer = EntitlementCertBundlesInstaller(self.report) return ent_cert_bundles_installer.install(cert_bundles) def _find_content_access_certs(self): certs = self.ent_dir.list_with_content_access() return [cert for cert in certs if cert.entitlement_type == CONTENT_ACCESS_CERT_TYPE] def content_access_hook(self): if not self.uep.has_capability(CONTENT_ACCESS_CERT_CAPABILITY): return # do nothing if we cannot check for content access cert updates content_access_certs = self._find_content_access_certs() update_data = None if len(content_access_certs) > 0: update_data = self.content_access_cache.check_for_update() for content_access_cert in content_access_certs: self.content_access_cache.update_cert(content_access_cert, update_data) if len(content_access_certs) == 0 and self.content_access_cache.exists(): self.content_access_cache.remove() if update_data is not None: self.ent_dir.refresh() return update_data def branding_hook(self): """Update branding info based on entitlement cert changes.""" # RHELBrandsInstaller will use latest ent_dir contents brands_installer = rhelentbranding.RHELBrandsInstaller() brands_installer.install() def repo_hook(self): """Update content repos.""" log.debug("entcerlibaction.repo_hook") try: # NOTE: this may need a lock content_action = content_action_client.ContentActionClient() content_action.update() except Exception as e: log.debug(e) log.debug("Failed to update repos") def _find_missing_serials(self, local, expected): """ Find serials from the server we do not have locally. """ missing = [sn for sn in expected if sn not in local] return missing def _find_rogue_serials(self, local, expected): """Find serials we have locally but are not on the server.""" rogue = [local[sn] for sn in local if sn not in expected] return rogue def syslog_results(self): """Write generated EntCertUpdateReport info to syslog.""" for cert in self.report.added: utils.system_log("Added subscription for '%s' contract '%s'" % (cert.order.name, cert.order.contract)) for product in cert.products: utils.system_log("Added subscription for product '%s'" % (product.name)) for cert in self.report.rogue: utils.system_log("Removed subscription for '%s' contract '%s'" % (cert.order.name, cert.order.contract)) for product in cert.products: utils.system_log("Removed subscription for product '%s'" % (product.name)) def _get_local_serials(self): local = {} # certificates in grace period were being renamed everytime. # this makes sure we don't try to re-write certificates in # grace period # XXX since we don't use grace period, this might not be needed self.ent_dir.refresh() ent_certs = self.ent_dir.list() + self.ent_dir.list_with_content_access() ent_certs = list(set(ent_certs)) for valid in ent_certs: sn = valid.serial self.report.valid.append(sn) local[sn] = valid return local def get_certificate_serials_list(self): """Query RHSM API for list of expected ent cert serial numbers.""" results = [] # if there is no UEP object, short circuit if self.uep is None: return results identity = inj.require(inj.IDENTITY) if not identity.is_valid(): # We can get here on unregister, with no id or ent certs or repos, # but don't want to raise an exception that would be logged. So # empty result set is returned. return results reply = self.uep.getCertificateSerials(identity.uuid) for d in reply: sn = d['serial'] results.append(sn) return results def get_certificates_by_serial_list(self, sn_list): """Fetch a list of entitlement certificates specified by a list of serial numbers.""" result = [] if sn_list: sn_list = [str(sn) for sn in sn_list] # NOTE: use injected IDENTITY, need to validate this # handles disconnected errors properly reply = self.uep.getCertificates(self.identity.uuid, serials=sn_list) for cert in reply: result.append(cert) return result def _get_expected_serials(self): exp = self.get_certificate_serials_list() self.report.expected = exp return exp def delete(self, rogue): for cert in rogue: try: cert.delete() self.report.rogue.append(cert) except OSError as er: log.exception(er) log.warn("Failed to delete cert") # If we just deleted certs, we need to refresh the now stale # entitlement directory before we go to delete expired certs. rogue_count = len(self.report.rogue) if rogue_count > 0: print(ungettext("%s local certificate has been deleted.", "%s local certificates have been deleted.", rogue_count) % rogue_count) self.ent_dir.refresh() class EntitlementCertBundlesInstaller(object): """Install a list of entitlement cert bundles. pre_install() is triggered before any of the ent cert bundles are installed. post_install() is triggered after all of the ent cert bundles are installed. """ def __init__(self, report): self.exceptions = [] self.report = report def install(self, cert_bundles): """Fetch entitliement certs, install them, and update the report.""" bundle_installer = EntitlementCertBundleInstaller(self.report) installed_serials = [] for cert_bundle in cert_bundles: cert_serial = bundle_installer.install(cert_bundle) if cert_serial is not None: installed_serials.append(cert_serial) self.exceptions = bundle_installer.exceptions self.post_install() return installed_serials # TODO: add subman plugin slot,conduit,hooks def pre_install(self): """Hook called before any ent cert bundles are installed.""" log.debug("cert bundles pre_install") def post_install(self): """Hook called after all cert bundles have been installed.""" for installed in self._get_installed(): log.debug("cert bundles post_install: %s" % installed) def get_installed(self): """Return a list of the ent cert bundles that were installed.""" return self._get_installed() def _get_installed(self): """Return the bundles installed based on this impl's EntCertUpdateReport.""" return self.report.added class EntitlementCertBundleInstaller(object): """Install an entitlement cert bundle (cert/key). Split a bundle into an certificate.EntitlementCertificate and a certificate.Key, and persist them. pre_install() is called before the cert bundle is installed. post_install() is called after the cert bundle is installed. Note that EntitlementCertBundlesInstaller's pre and post install hooks are before and after installing the full list of ent cert bundles, while this is pre/post each ent cert bundle. """ def __init__(self, report): self.exceptions = [] self.report = report def install(self, bundle): """Persist an ent cert and it's key after splitting it from the bundle.""" self.pre_install(bundle) cert_bundle_writer = Writer() cert_serial = None try: key, cert = self.build_cert(bundle) cert_bundle_writer.write(key, cert) self.report.added.append(cert) cert_serial = cert.serial except Exception as e: self.install_exception(bundle, e) self.post_install(bundle) return cert_serial # TODO: add subman plugin, slot, and conduit def pre_install(self, bundle): """Hook called before an ent cert bundle is installed.""" log.debug("Ent cert bundle pre_install") # should probably be in python-rhsm/certificate def build_cert(self, bundle): """Split a cert bundle into a EntitlementCertificate and a Key.""" keypem = bundle['key'] crtpem = bundle['cert'] key = Key(keypem) cert = create_from_pem(crtpem) return (key, cert) def install_exception(self, bundle, exception): """Log exceptions and add them to the EntCertUpdateReport.""" log.exception(exception) log.error('Bundle not loaded:\n%s\n%s', bundle, exception) self.report._exceptions.append(exception) def post_install(self, bundle): """Hook called after an ent cert bundle is installed.""" log.debug("ent cert bundle post_install") class Disconnected(Exception): pass class EntCertUpdateReport(certlib.ActionReport): """Report entitlement cert update action changes.""" name = "Entitlement Cert Updates" def __init__(self): self.valid = [] self.expected = [] self.added = [] self.rogue = [] self._exceptions = [] def updates(self): """Total number of ent certs installed and deleted.""" return (len(self.added) + len(self.rogue)) # need an ExceptionsReport? # FIXME: needs to be properties def exceptions(self): return self._exceptions def write(self, s, title, certificates): """Generate a report stanza for a list of certs.""" indent = ' ' s.append(title) if certificates: for c in certificates: products = c.products if not products: s.append('%s[sn:%d (%s) @ %s]' % (indent, c.serial, c.order.name, c.path)) for product in products: s.append('%s[sn:%d (%s,) @ %s]' % (indent, c.serial, product.name, c.path)) else: s.append('%s<NONE>' % indent) def __str__(self): """__str__ of report. Used in rhsm and rhsmcertd logging.""" s = [] s.append(_('Total updates: %d') % self.updates()) s.append(_('Found (local) serial# %s') % self.valid) s.append(_('Expected (UEP) serial# %s') % self.expected) self.write(s, _('Added (new)'), self.added) self.write(s, _('Deleted (rogue):'), self.rogue) return '\n'.join(s)