403Webshell
Server IP : 172.24.0.40  /  Your IP : 216.73.216.10
Web Server : Apache
System : Linux dbweb26.ust.edu.ph 4.18.0-513.5.1.el8_9.x86_64 #1 SMP Fri Sep 29 05:21:10 EDT 2023 x86_64
User : apache ( 48)
PHP Version : 8.2.18
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /lib64/python3.6/site-packages/rhsmlib/services/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /lib64/python3.6/site-packages/rhsmlib/services/entitlement.py
from __future__ import print_function, division, absolute_import

# Copyright (c) 2017 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.

import collections
import datetime
import logging
import six
import time

from subscription_manager import injection as inj
from subscription_manager.i18n import ugettext as _
from subscription_manager import managerlib, utils
from subscription_manager.entcertlib import EntCertActionInvoker

from rhsm import certificate
from rhsmlib.services import exceptions, products
import rhsm.connection as connection

log = logging.getLogger(__name__)


class EntitlementService(object):
    def __init__(self, cp=None):
        self.cp = cp
        self.identity = inj.require(inj.IDENTITY)
        self.product_dir = inj.require(inj.PROD_DIR)
        self.entitlement_dir = inj.require(inj.ENT_DIR)
        self.entcertlib = EntCertActionInvoker()

    @classmethod
    def parse_date(cls, on_date):
        """
        Return new datetime parsed from date
        :param on_date: String representing date
        :return It returns datetime.datime structure representing date
        """
        try:
            on_date = datetime.datetime.strptime(on_date, '%Y-%m-%d')
        except ValueError:
            raise ValueError(
                _("Date entered is invalid. Date should be in YYYY-MM-DD format (example: ") +
                time.strftime("%Y-%m-%d", time.localtime()) + " )"
            )
        if on_date.date() < datetime.datetime.now().date():
            raise ValueError(_("Past dates are not allowed"))
        return on_date

    def get_status(self, on_date=None, force=False):
        sorter = inj.require(inj.CERT_SORTER, on_date)
        # When singleton CertSorter was created with different argument on_date, then
        # it is necessary to update corresponding attribute in object (dependency
        # injection doesn't do it automatically).
        if sorter.on_date != on_date:
            sorter.on_date = on_date

        # Force reload status from the server to be sure that we get valid status for new date.
        # It is necessary to do it for rhsm.service, because it can run for very long time without
        # restart.
        if force is True:
            log.debug('Deleting cache entitlement status cache')
            status_cache = inj.require(inj.ENTITLEMENT_STATUS_CACHE)
            status_cache.server_status = None
            status_cache.delete_cache()

        sorter.load()

        if self.identity.is_valid():
            overall_status = sorter.get_system_status()
            overall_status_id = sorter.get_system_status_id()
            reasons = sorter.reasons.get_name_message_map()
            reason_ids = sorter.reasons.get_reason_ids_map()
            valid = sorter.is_valid()
            status = {
                'status': overall_status,
                'status_id': overall_status_id,
                'reasons': reasons,
                'reason_ids': reason_ids,
                'valid': valid
            }
        else:
            status = {
                'status': _('Unknown'),
                'status_id': 'unknown',
                'reasons': {},
                'reason_ids': {},
                'valid': False
            }
        log.debug('entitlement status: %s' % str(status))
        return status

    def get_pools(self, pool_subsets=None, matches=None, pool_only=None, match_installed=None,
                  no_overlap=None, service_level=None, show_all=None, on_date=None, future=None,
                  after_date=None, page=0, items_per_page=0, **kwargs):
        # We accept a **kwargs argument so that the DBus object can pass whatever dictionary it receives
        # via keyword expansion.
        if kwargs:
            raise exceptions.ValidationError(_("Unknown arguments: %s") % kwargs.keys())

        if isinstance(pool_subsets, six.string_types):
            pool_subsets = [pool_subsets]

        # [] or None means look at all pools
        if not pool_subsets:
            pool_subsets = ['installed', 'consumed', 'available']

        options = {
            'pool_subsets': pool_subsets,
            'matches': matches,
            'pool_only': pool_only,
            'match_installed': match_installed,
            'no_overlap': no_overlap,
            'service_level': service_level,
            'show_all': show_all,
            'on_date': on_date,
            'future': future,
            'after_date': after_date,
        }
        self.validate_options(options)
        results = {}
        if 'installed' in pool_subsets:
            installed = products.InstalledProducts(self.cp).list(matches, iso_dates=True)
            results['installed'] = [x._asdict() for x in installed]
        if 'consumed' in pool_subsets:
            consumed = self.get_consumed_product_pools(service_level=service_level, matches=matches,
                                                       iso_dates=True)
            if pool_only:
                results['consumed'] = [x._asdict()['pool_id'] for x in consumed]
            else:
                results['consumed'] = [x._asdict() for x in consumed]
        if 'available' in pool_subsets:
            available = self.get_available_pools(
                show_all=show_all,
                on_date=on_date,
                no_overlap=no_overlap,
                match_installed=match_installed,
                matches=matches,
                service_level=service_level,
                future=future,
                after_date=after_date,
                page=int(page),
                items_per_page=int(items_per_page),
                iso_dates=True
            )
            if pool_only:
                results['available'] = [x['id'] for x in available]
            else:
                results['available'] = available

        return results

    def get_consumed_product_pools(self, service_level=None, matches=None, iso_dates=False):
        # Use a named tuple so that the result can be unpacked into other functions
        OldConsumedStatus = collections.namedtuple('OldConsumedStatus', [
            'subscription_name',
            'provides',
            'sku',
            'contract',
            'account',
            'serial',
            'pool_id',
            'provides_management',
            'active',
            'quantity_used',
            'service_type',
            'service_level',
            'status_details',
            'subscription_type',
            'starts',
            'ends',
            'system_type',
        ])
        # Use a named tuple so that the result can be unpacked into other functions
        ConsumedStatus = collections.namedtuple('ConsumedStatus', [
            'subscription_name',
            'provides',
            'sku',
            'contract',
            'account',
            'serial',
            'pool_id',
            'provides_management',
            'active',
            'quantity_used',
            'service_type',
            'roles',
            'service_level',
            'usage',
            'addons',
            'status_details',
            'subscription_type',
            'starts',
            'ends',
            'system_type',
        ])
        sorter = inj.require(inj.CERT_SORTER)
        cert_reasons_map = sorter.reasons.get_subscription_reasons_map()
        pooltype_cache = inj.require(inj.POOLTYPE_CACHE)

        consumed_statuses = []
        # FIXME: the cache of CertificateDirectory should be smart enough and refreshing
        # should not be necessary. When new certificate is installed/deleted and rhsm-service
        # is running, then list of certificate is not automatically refreshed ATM.
        self.entitlement_dir.refresh()
        certs = self.entitlement_dir.list()
        cert_filter = utils.EntitlementCertificateFilter(filter_string=matches, service_level=service_level)

        if service_level is not None or matches is not None:
            certs = list(filter(cert_filter.match, certs))

        if iso_dates:
            date_formatter = managerlib.format_iso8601_date
        else:
            date_formatter = managerlib.format_date

        # Now we need to transform the EntitlementCertificate object into
        # something JSON-like for consumption
        for cert in certs:
            # for some certs, order can be empty
            # so we default the values and populate them if
            # they exist. BZ974587
            name = ""
            sku = ""
            contract = ""
            account = ""
            quantity_used = ""
            service_type = ""
            roles = ""
            service_level = ""
            usage = ""
            addons = ""
            system_type = ""
            provides_management = "No"

            order = cert.order

            if order:
                service_type = order.service_type or ""
                service_level = order.service_level or ""
                if cert.version.major >= 3 and cert.version.minor >= 4:
                    roles = order.roles or ""
                    usage = order.usage or ""
                    addons = order.addons or ""
                else:
                    roles = None
                    usage = None
                    addons = None
                name = order.name
                sku = order.sku
                contract = order.contract or ""
                account = order.account or ""
                quantity_used = order.quantity_used
                if order.virt_only:
                    system_type = _("Virtual")
                else:
                    system_type = _("Physical")

                if order.provides_management:
                    provides_management = _("Yes")
                else:
                    provides_management = _("No")

            pool_id = _("Not Available")
            if hasattr(cert.pool, "id"):
                pool_id = cert.pool.id

            provided_products = {p.id: p.name for p in cert.products}

            reasons = []
            pool_type = ''

            if inj.require(inj.CERT_SORTER).are_reasons_supported():
                if cert.subject and 'CN' in cert.subject:
                    if cert.subject['CN'] in cert_reasons_map:
                        reasons = cert_reasons_map[cert.subject['CN']]
                    pool_type = pooltype_cache.get(pool_id)

                # 1180400: Status details is empty when GUI is not
                if not reasons:
                    if cert in sorter.valid_entitlement_certs:
                        reasons.append(_("Subscription is current"))
                    else:
                        if cert.valid_range.end() < datetime.datetime.now(certificate.GMT()):
                            reasons.append(_("Subscription is expired"))
                        else:
                            reasons.append(_("Subscription has not begun"))
            else:
                reasons.append(_("Subscription management service doesn't support Status Details."))

            if roles is None and usage is None and addons is None:
                consumed_statuses.append(OldConsumedStatus(
                    name,
                    provided_products,
                    sku,
                    contract,
                    account,
                    cert.serial,
                    pool_id,
                    provides_management,
                    cert.is_valid(),
                    quantity_used,
                    service_type,
                    service_level,
                    reasons,
                    pool_type,
                    date_formatter(cert.valid_range.begin()),
                    date_formatter(cert.valid_range.end()),
                    system_type))
            else:
                consumed_statuses.append(ConsumedStatus(
                    name,
                    provided_products,
                    sku,
                    contract,
                    account,
                    cert.serial,
                    pool_id,
                    provides_management,
                    cert.is_valid(),
                    quantity_used,
                    service_type,
                    roles,
                    service_level,
                    usage,
                    addons,
                    reasons,
                    pool_type,
                    date_formatter(cert.valid_range.begin()),
                    date_formatter(cert.valid_range.end()),
                    system_type))
        return consumed_statuses

    def get_available_pools(self, show_all=None, on_date=None, no_overlap=None,
                            match_installed=None, matches=None, service_level=None, future=None,
                            after_date=None, page=0, items_per_page=0, iso_dates=False):
        """
        Get list of available pools
        :param show_all:
        :param on_date:
        :param no_overlap:
        :param match_installed:
        :param matches:
        :param service_level:
        :param future:
        :param after_date:
        :param page:
        :param items_per_page:
        :param iso_dates:
        :return:
        """

        # Values used for REST API calls and caching are bigger, because it makes using of cache and
        # API more efficient
        if show_all is not True:
            _page = int(page / 4)
            _items_per_page = 4 * items_per_page
        else:
            page = items_per_page = 0
            _page = _items_per_page = 0

        filter_options = {
            "show_all": show_all,
            "on_date": on_date,
            "no_overlap": no_overlap,
            "match_installed": match_installed,
            "matches": matches,
            "service_level": service_level,
            "future": future,
            "after_date": after_date,
            "page": _page,
            "items_per_page": _items_per_page
        }

        # Try to get identity
        identity = inj.require(inj.IDENTITY)

        # Try to get available pools from cache
        cache = inj.require(inj.AVAILABLE_ENTITLEMENT_CACHE)
        available_pools = cache.get_not_obsolete_data(identity, filter_options)

        if len(available_pools) == 0:
            available_pools = managerlib.get_available_entitlements(
                get_all=show_all,
                active_on=on_date,
                overlapping=no_overlap,
                uninstalled=match_installed,
                filter_string=matches,
                future=future,
                after_date=after_date,
                page=_page,
                items_per_page=_items_per_page,
                iso_dates=iso_dates
            )

            timeout = cache.timeout()

            data = {
                identity.uuid: {
                    'filter_options': filter_options,
                    'pools': available_pools,
                    'timeout': time.time() + timeout
                }
            }
            cache.available_entitlements = data
            cache.write_cache()

        def filter_pool_by_service_level(pool_data):
            pool_level = ""
            if pool_data['service_level']:
                pool_level = pool_data['service_level']
            return service_level.lower() == pool_level.lower()

        if service_level is not None:
            available_pools = list(filter(filter_pool_by_service_level, available_pools))

        # When pagination result of available pools is requested, then reduce too long list
        if items_per_page > 0:
            # Reduce too long list to requested "page"
            lo_idx = (page * items_per_page) % _items_per_page
            hi_idx = ((page + 1) * items_per_page) % _items_per_page
            if hi_idx == 0:
                hi_idx = _items_per_page

            # Own filtering of the list
            available_pools = available_pools[lo_idx:hi_idx]

            # Add requested page and number of items per page to the result too
            for item in available_pools:
                item["page"] = page
                item["items_per_page"] = items_per_page

        return available_pools

    def validate_options(self, options):
        if not set(['installed', 'consumed', 'available']).issuperset(options['pool_subsets']):
            raise exceptions.ValidationError(
                _('Error: invalid listing type provided.  Only "installed", '
                  '"consumed", or "available" are allowed')
            )
        if options['show_all'] and 'available' not in options['pool_subsets']:
            raise exceptions.ValidationError(
                _("Error: --all is only applicable with --available")
            )
        elif options['on_date'] and 'available' not in options['pool_subsets']:
            raise exceptions.ValidationError(
                _("Error: --ondate is only applicable with --available")
            )
        elif options['service_level'] is not None \
                and not set(['consumed', 'available']).intersection(options['pool_subsets']):
            raise exceptions.ValidationError(
                _("Error: --servicelevel is only applicable with --available or --consumed")
            )
        elif options['match_installed'] and 'available' not in options['pool_subsets']:
            raise exceptions.ValidationError(
                _("Error: --match-installed is only applicable with --available")
            )
        elif options['no_overlap'] and 'available' not in options['pool_subsets']:
            raise exceptions.ValidationError(
                _("Error: --no-overlap is only applicable with --available")
            )
        elif options['pool_only'] \
                and not set(['consumed', 'available']).intersection(options['pool_subsets']):
            raise exceptions.ValidationError(
                _("Error: --pool-only is only applicable with --available and/or --consumed")
            )
        elif not self.identity.is_valid() and 'available' in options['pool_subsets']:
            raise exceptions.ValidationError(_("Error: this system is not registered"))

    def _unbind_ids(self, unbind_method, consumer_uuid, ids):
        """
        Method for unbinding entitlements
        :param unbind_method: unbindByPoolId or unbindBySerial
        :param consumer_uuid: UUID of consumer
        :param ids: List of serials or pool_ids
        :return: Tuple of two lists containing unbinded and not-unbinded subscriptions
        """
        success = []
        failure = []
        for id_ in ids:
            try:
                unbind_method(consumer_uuid, id_)
                success.append(id_)
            except connection.RestlibException as re:
                if re.code == 410:
                    raise
                failure.append(id_)
                log.error(re)
        return success, failure

    def remove_all_entitlements(self):
        """
        Try to remove all entilements
        :return: Result of REST API call
        """

        response = self.cp.unbindAll(self.identity.uuid)
        self.entcertlib.update()

        return response

    def remove_entitlements_by_pool_ids(self, pool_ids):
        """
        Try to remove entitlements by pool IDs
        :param pool_ids: List of pool IDs
        :return: List of serial numbers of removed subscriptions
        """

        removed_serials = []
        _pool_ids = utils.unique_list_items(pool_ids)  # Don't allow duplicates
        # FIXME: the cache of CertificateDirectory should be smart enough and refreshing
        # should not be necessary. I vote for i-notify to be used there somehow.
        self.entitlement_dir.refresh()
        pool_id_to_serials = self.entitlement_dir.list_serials_for_pool_ids(_pool_ids)
        removed_pools, unremoved_pools = self._unbind_ids(self.cp.unbindByPoolId, self.identity.uuid, _pool_ids)
        if removed_pools:
            for pool_id in removed_pools:
                removed_serials.extend(pool_id_to_serials[pool_id])
        self.entcertlib.update()

        return removed_pools, unremoved_pools, removed_serials

    def remove_entitlements_by_serials(self, serials):
        """
        Try to remove pools by Serial numbers
        :param serials: List of serial numbers
        :return: List of serial numbers of already removed subscriptions
        """

        _serials = utils.unique_list_items(serials)  # Don't allow duplicates
        removed_serials, unremoved_serials = self._unbind_ids(self.cp.unbindBySerial, self.identity.uuid, _serials)
        self.entcertlib.update()

        return removed_serials, unremoved_serials

    def reload(self):
        """
        This callback function is called, when there is detected any change in directory with entitlement
        certificates (e.g. certificate is installed or removed)
        :return:
        """
        sorter = inj.require(inj.CERT_SORTER, on_date=None)
        status_cache = inj.require(inj.ENTITLEMENT_STATUS_CACHE)
        log.debug('Clearing in-memory cache of file %s' % status_cache.CACHE_FILE)
        status_cache.server_status = None
        sorter.load()

    def refresh(self, remove_cache=False, force=False):
        """
        Try to refresh entitlement certificate(s) from candlepin server
        :return: Report of EntCertActionInvoker
        """

        if remove_cache is True:
            # remove content_access cache, ensuring we get it fresh
            content_access = inj.require(inj.CONTENT_ACCESS_CACHE)
            if content_access.exists():
                content_access.remove()
            # Also remove the content access mode cache to be sure we display
            # SCA or regular mode correctly
            content_access_mode = inj.require(inj.CONTENT_ACCESS_MODE_CACHE)
            if content_access_mode.exists():
                content_access_mode.delete_cache()

        if force is True:
            # Force a regen of the entitlement certs for this consumer
            if not self.cp.regenEntitlementCertificates(self.identity.uuid, True):
                log.debug("Warning: Unable to refresh entitlement certificates; service likely unavailable")

        return self.entcertlib.update()

Youez - 2016 - github.com/yon3zu
LinuXploit