HEX
Server: Apache
System: Linux a16-asgard6.hospedagemuolhost.com.br 5.14.0-570.52.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Oct 15 06:39:08 EDT 2025 x86_64
User: maoristu4c3dbd03 (1436)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //usr/lib64/python3.9/site-packages/rhsmlib/facts/cloud_facts.py
# Copyright (c) 2019 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#

import logging
import json
from typing import Any, Dict, Callable, List, Optional, Union

from cloud_what._base_provider import BaseCloudProvider

from cloud_what.provider import get_cloud_provider, DetectionMethod
from rhsmlib.facts import collector


log = logging.getLogger(__name__)


class CloudFactsCollector(collector.FactsCollector):
    """
    Class used for collecting facts related to Cloud instances
    """

    def __init__(
        self,
        arch: str = None,
        prefix: str = None,
        testing: bool = None,
        collected_hw_info: Dict[str, Union[str, None]] = None,
    ):
        """
        Initialize the collector and register the fact-gathering method
        matching the detected cloud provider (if any)
        :param arch: passed unchanged to the parent collector
        :param prefix: passed unchanged to the parent collector
        :param testing: passed unchanged to the parent collector
        :param collected_hw_info: passed unchanged to the parent collector
        """
        super().__init__(
            arch=arch,
            prefix=prefix,
            testing=testing,
            collected_hw_info=collected_hw_info,
        )

        # Nothing is collected unless a supported cloud provider is detected below
        self.hardware_methods: List[Callable] = []

        # Detection is limited to the strong method only
        self.cloud_provider: Optional[BaseCloudProvider] = get_cloud_provider(
            facts=self._collected_hw_info,
            methods=DetectionMethod.STRONG,
        )
        if self.cloud_provider is None:
            return

        # Provider ID -> method gathering facts for that provider
        fact_getters: Dict[str, Callable] = {
            "aws": self.get_aws_facts,
            "azure": self.get_azure_facts,
            "gcp": self.get_gcp_facts,
        }
        getter = fact_getters.get(self.cloud_provider.CLOUD_PROVIDER_ID)
        if getter is not None:
            self.hardware_methods = [getter]

    def get_aws_facts(self) -> Dict[str, Union[str, None]]:
        """
        Try to get AWS facts (only instance ID ATM) of machine running on AWS public cloud
        :return:
            dictionary containing {"aws_instance_id": some_instance_ID} when
            the machine is able to gather metadata from AWS cloud provider;
            otherwise returns empty dictionary {}
        """

        metadata_str: str = self.cloud_provider.get_metadata()

        facts: Dict[str, Union[str, None]] = {}
        if metadata_str is not None:
            values: Dict[str, Union[str, None]] = self.parse_json_content(metadata_str)

            # Add these three attributes to system facts
            if "instanceId" in values:
                facts["aws_instance_id"] = values["instanceId"]

            if "accountId" in values:
                facts["aws_account_id"] = values["accountId"]

            # BTW: There should be only two types of billing codes: bp-63a5400a and bp-6fa54006 in the list,
            # when RHEL is used. When the subscription-manager is used by some other Linux distribution,
            # then there could be different codes, or it could be null
            if "billingProducts" in values:
                billing_products: Optional[List[str]] = values["billingProducts"]
                if isinstance(billing_products, list):
                    facts["aws_billing_products"] = " ".join(billing_products)
                elif billing_products is None:
                    facts["aws_billing_products"] = billing_products
                else:
                    log.debug("AWS metadata attribute billingProducts has to be list or null")

            if "marketplaceProductCodes" in values:
                marketplace_product_codes = values["marketplaceProductCodes"]
                if isinstance(marketplace_product_codes, list):
                    facts["aws_marketplace_product_codes"] = " ".join(marketplace_product_codes)
                elif marketplace_product_codes is None:
                    facts["aws_marketplace_product_codes"] = marketplace_product_codes
                else:
                    log.debug("AWS metadata attribute marketplaceProductCodes has to be list or null")

            if "instanceType" in values:
                facts["aws_instance_type"] = values["instanceType"]

            if "region" in values:
                facts["aws_region"] = values["region"]

        return facts

    def get_azure_facts(self) -> Dict[str, str]:
        """
        Try to get facts of VM running on Azure public cloud. Returned dictionary has the following format:
            {
                "azure_instance_id": some_instance_ID,
                "azure_offer": some_offer,
                "azure_sku": some_sku,
                "azure_subscription_id": some_subscription_ID
                "azure_location: azure region the VM is running in
            }
        :return: dictionary containing Azure facts, when the machine is able to gather metadata
            from Azure cloud provider; otherwise returns empty dictionary {}
        """

        metadata_str: str = self.cloud_provider.get_metadata()

        facts: Dict[str, str] = {}
        if metadata_str is not None:
            values: Dict[str, Any] = self.parse_json_content(metadata_str)
            if "compute" in values:
                if "vmId" in values["compute"]:
                    facts["azure_instance_id"] = values["compute"]["vmId"]
                if "sku" in values["compute"]:
                    facts["azure_sku"] = values["compute"]["sku"]
                if "offer" in values["compute"]:
                    facts["azure_offer"] = values["compute"]["offer"]
                if "subscriptionId" in values["compute"]:
                    facts["azure_subscription_id"] = values["compute"]["subscriptionId"]
                if "location" in values["compute"]:
                    facts["azure_location"] = values["compute"]["location"]
                if "extendedLocation" in values["compute"]:
                    if "name" in values["compute"]["extendedLocation"]:
                        facts["azure_extended_location_name"] = values["compute"]["extendedLocation"]["name"]
                    if "type" in values["compute"]["extendedLocation"]:
                        facts["azure_extended_location_type"] = values["compute"]["extendedLocation"]["type"]
        return facts

    def get_gcp_facts(self) -> Dict[str, str]:
        """
        Try to get facts of VM running on GCP public cloud. Only instance_id is reported ATM.
        :return: dictionary containing GCP facts, when the machine is able to gather metadata
            from GCP cloud provider; otherwise returns empty dictionary {}
        """

        encoded_jwt_token: str = self.cloud_provider.get_metadata()

        facts: Dict[str, str] = {}
        if encoded_jwt_token is not None:
            jose_header, metadata, signature = self.cloud_provider.decode_jwt(encoded_jwt_token)
            if metadata is not None:
                values: Dict[str, Any] = self.parse_json_content(metadata)
                if "google" in values and "compute_engine" in values["google"]:
                    # ID of instance
                    if "instance_id" in values["google"]["compute_engine"]:
                        facts["gcp_instance_id"] = values["google"]["compute_engine"]["instance_id"]
                    else:
                        log.debug("GCP instance_id not found in JWT token")
                    # IDs of licenses
                    if "license_id" in values["google"]["compute_engine"]:
                        gcp_license_codes = values["google"]["compute_engine"]["license_id"]
                        facts["gcp_license_codes"] = " ".join(gcp_license_codes)
                    else:
                        log.debug("GCP license codes not found in JWT token")
                    # ID of project
                    if "project_id" in values["google"]["compute_engine"]:
                        facts["gcp_project_id"] = values["google"]["compute_engine"]["project_id"]
                    else:
                        log.debug("GCP project_id not found in JWT token")
                    # number of project
                    if "project_number" in values["google"]["compute_engine"]:
                        facts["gcp_project_number"] = values["google"]["compute_engine"]["project_number"]
                    else:
                        log.debug("GCP project_number not found in JWT token")
                    # zone where the machine is located
                    if "zone" in values["google"]["compute_engine"]:
                        facts["gcp_zone"] = values["google"]["compute_engine"]["zone"]
                    else:
                        log.debug("GCP zone not found in JWT token")
                else:
                    log.debug("GCP google.compute_engine on found in JWT token")
        return facts

    @staticmethod
    def parse_json_content(content: str) -> Dict[str, Any]:
        """
        Parse content returned from AWS metadata provider
        :param content: string of JSON document
        :return: Dictionary containing values from parsed JSON document
        """
        try:
            return json.loads(content)
        except ValueError as e:
            raise ValueError("Failed to parse json data with error: %s", str(e))