403Webshell
Server IP : 172.24.0.40  /  Your IP : 216.73.216.10
Web Server : Apache
System : Linux dbweb26.ust.edu.ph 4.18.0-513.5.1.el8_9.x86_64 #1 SMP Fri Sep 29 05:21:10 EDT 2023 x86_64
User : apache ( 48)
PHP Version : 8.2.18
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib64/python3.6/site-packages/subscription_manager/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib64/python3.6/site-packages/subscription_manager//identity.py
from __future__ import print_function, division, absolute_import

#
# Copyright (c) 2013 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#

import logging
import os
import errno
import threading
import six

from rhsm.certificate import create_from_pem
from rhsm.config import get_config_parser
from subscription_manager.certdirectory import Path

from rhsmlib.services import config
from rhsm.certificate import CertificateException
from rhsm.certificate2 import CertificateLoadingError

# Wrapper around the rhsm config parser; read below as conf[section][option].
conf = config.Config(get_config_parser())
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class ConsumerIdentity(object):
    """Consumer key/certificate pair plus helpers to load and store it on disk.

    The key and certificate live in the directory named by ``PATH`` under the
    file names ``KEY`` and ``CERT``.
    """

    PATH = conf['rhsm']['consumerCertDir']
    KEY = 'key.pem'
    CERT = 'cert.pem'

    @staticmethod
    def keypath():
        """Return the path of the consumer key file as a ``str``."""
        return str(Path.join(ConsumerIdentity.PATH, ConsumerIdentity.KEY))

    @staticmethod
    def certpath():
        """Return the path of the consumer certificate file as a ``str``."""
        return str(Path.join(ConsumerIdentity.PATH, ConsumerIdentity.CERT))

    @classmethod
    def read(cls):
        """Load the key and certificate files and return a ConsumerIdentity.

        Errors raised while opening/reading either file, or while parsing the
        certificate in ``__init__``, propagate to the caller.
        """
        with open(cls.keypath()) as key_file:
            key = key_file.read()
        with open(cls.certpath()) as cert_file:
            cert = cert_file.read()
        return ConsumerIdentity(key, cert)

    @classmethod
    def exists(cls):
        """Return True when both the key file and the certificate file exist."""
        return (os.path.exists(cls.keypath()) and
                os.path.exists(cls.certpath()))

    @classmethod
    def existsAndValid(cls):
        """Return True when both files exist and can be loaded via ``read()``.

        Any exception raised while loading is logged and reported as False.
        """
        if not cls.exists():
            return False
        try:
            cls.read()
        except Exception as e:
            # Deliberately broad: whatever went wrong, the identity on disk
            # is not usable.
            log.warning('possible certificate corruption')
            log.error(e)
            return False
        return True

    def __init__(self, keystring, certstring):
        """Store the key and certificate text and parse the certificate.

        :param keystring: content of the key file
        :param certstring: PEM content of the certificate file
        """
        self.key = keystring
        # TODO: bad variables, cert should be the certificate object, x509 is
        # used elsewhere for the rhsm._certificate object of the same name.
        self.cert = certstring
        self.x509 = create_from_pem(certstring)

    def getConsumerId(self):
        """Return the 'CN' entry of the certificate subject."""
        subject = self.x509.subject
        return subject.get('CN')

    def getConsumerName(self):
        """Return the consumer name extracted from the certificate alt name."""
        altName = self.x509.alt_name
        # must account for old format and new
        return altName.replace("DirName:/CN=", "").replace("URI:CN=", "").split(", ")[-1]

    def getSerialNumber(self):
        """Return the serial of the parsed certificate."""
        return self.x509.serial

    # TODO: we're using a Certificate which has its own write/delete, no idea
    # why this landed in a parallel disjoint class wrapping the actual cert.
    def write(self):
        """Write the key and certificate to disk with ID_CERT_PERMS permissions.

        The certificate directory is created first when it is missing.
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import -- TODO confirm).
        from subscription_manager import managerlib
        self.__mkdir()
        self._write_restricted(self.keypath(), self.key, managerlib.ID_CERT_PERMS)
        self._write_restricted(self.certpath(), self.cert, managerlib.ID_CERT_PERMS)

    @staticmethod
    def _write_restricted(path, data, perms):
        """Write ``data`` to ``path``, applying ``perms`` before any content lands.

        A newly created file never gets a mode looser than ``perms``, and a
        pre-existing file is re-moded before it is written, so the content is
        not exposed under the wrong mode while the write is in progress.
        """
        fd = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, perms)
        try:
            # os.open() masks the mode with the umask and leaves the mode of an
            # existing file alone, so set the exact mode explicitly.
            os.fchmod(fd, perms)
        except OSError:
            os.close(fd)
            raise
        with os.fdopen(fd, 'w') as out_file:
            out_file.write(data)

    def delete(self):
        """Remove the key and certificate files; missing files are ignored."""
        for path in (self.keypath(), self.certpath()):
            try:
                os.unlink(path)
            except OSError as err:
                # Already gone (or the directory itself is missing): nothing
                # left to delete. Anything else is a real failure.
                if err.errno not in (errno.ENOENT, errno.ENOTDIR):
                    raise

    def __mkdir(self):
        """Create the certificate directory unless it already exists."""
        path = Path.abs(self.PATH)
        try:
            os.mkdir(path)
        except OSError as err:
            # Tolerate the path already existing, including when another
            # process created it concurrently.
            if err.errno != errno.EEXIST:
                raise

    def __str__(self):
        return 'consumer: name="%s", uuid=%s' % \
            (self.getConsumerName(),
             self.getConsumerId())


class Identity(object):
    """Cached consumer identity, so callers need not re-read the certificate.

    ``reload()`` refreshes the cached name, uuid and certificate directory;
    the read-only properties hand them out under the instance lock.
    """

    def __init__(self):
        """Set up empty cached state and perform the initial ``reload()``."""
        self._lock = threading.Lock()
        self.consumer = None
        self._name = None
        self._uuid = None
        self._cert_dir_path = conf['rhsm']['consumerCertDir']
        self.reload()

    def reload(self):
        """Re-read the consumer certificate from disk and refresh cached info.

        Certificate and IO errors are logged and leave the identity empty;
        a missing file is only logged at debug level.
        """
        log.debug("Loading consumer info from identity certificates.")
        with self._lock:
            try:
                self.consumer = self._get_consumer_identity()
            # XXX shouldn't catch the global exception here, but that's what
            # existsAndValid did, so this is better.
            except (CertificateException, CertificateLoadingError, IOError) as err:
                self.consumer = None
                # Python 2 needs the message decoded to unicode; on Python 3
                # the exception object is formatted directly.
                err_msg = str(err).decode('utf-8') if six.PY2 else err
                msg = (
                    "Reload of consumer identity cert %s raised an exception with msg: %s"
                    % (ConsumerIdentity.certpath(), err_msg)
                )
                cert_missing = isinstance(err, IOError) and err.errno == errno.ENOENT
                report = log.debug if cert_missing else log.error
                report(msg)

            loaded = self.consumer
            if loaded is None:
                self._name = None
                self._uuid = None
                self._cert_dir_path = conf['rhsm']['consumerCertDir']
            else:
                self._name = loaded.getConsumerName()
                self._uuid = loaded.getConsumerId()
                # since Identity gets dep injected, let's look up
                # the cert dir on the active id instead of the global config
                self._cert_dir_path = loaded.PATH

    def _get_consumer_identity(self):
        """Return the identity loaded via ``ConsumerIdentity.read()``."""
        return ConsumerIdentity.read()

    # this name is weird, since Certificate.is_valid actually checks the data
    # and this is a thin wrapper
    def is_valid(self):
        """Return True when a uuid is currently cached."""
        return self.uuid is not None

    @property
    def name(self):
        """Cached consumer name, or None when no identity is loaded."""
        with self._lock:
            return self._name

    @property
    def uuid(self):
        """Cached consumer uuid, or None when no identity is loaded."""
        with self._lock:
            return self._uuid

    @property
    def cert_dir_path(self):
        """Directory of the consumer certificate as of the last reload."""
        with self._lock:
            return self._cert_dir_path

    @staticmethod
    def is_present():
        """Return whether the identity files exist (``ConsumerIdentity.exists()``)."""
        return ConsumerIdentity.exists()

    def __str__(self):
        current_name = self.name
        current_uuid = self.uuid
        return "Consumer Identity name=%s uuid=%s" % (current_name, current_uuid)

Youez - 2016 - github.com/yon3zu
LinuXploit