HEX
Server: Apache
System: Linux a16-asgard6.hospedagemuolhost.com.br 5.14.0-570.52.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Oct 15 06:39:08 EDT 2025 x86_64
User: maoristu4c3dbd03 (1436)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/3006601/root/usr/lib64/python3.9/site-packages/subscription_manager/cli_command/status.py
#
# Subscription manager command line utility.
#
# Copyright (c) 2021 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
import logging
import os
import subscription_manager.injection as inj

from rhsm.certificate2 import CONTENT_ACCESS_CERT_TYPE
from rhsm.connection import ConnectionException

from rhsmlib.services import entitlement
from rhsmlib.services.refresh import Refresh

from subscription_manager import syspurposelib
from subscription_manager.cli import system_exit
from subscription_manager.cli_command.cli import CliCommand
from subscription_manager.i18n import ugettext as _
from subscription_manager.printing_utils import format_name
from subscription_manager.utils import is_simple_content_access, get_terminal_width
from time import localtime, strftime

log = logging.getLogger(__name__)


class StatusCommand(CliCommand):
    def __init__(self):
        shortdesc = _("Show status information for this system's subscriptions and products")
        super(StatusCommand, self).__init__("status", shortdesc, True)
        self.parser.add_argument(
            "--ondate",
            dest="on_date",
            help=_("future date to check status on, defaults to today's date (example: {example})").format(
                example=strftime("%Y-%m-%d", localtime())
            ),
        )

    def _get_date_cli_option(self):
        """
        Try to get and validate command line options date
        :return: Return date or None, when date was not provided
        """
        on_date = None
        if self.options.on_date:
            try:
                on_date = entitlement.EntitlementService.parse_date(self.options.on_date)
            except ValueError as err:
                system_exit(os.EX_DATAERR, err)
        return on_date

    def _print_status(self, service_status):
        """
        Print only status
        :return: Print overall status
        """

        print("+-------------------------------------------+")
        print("   " + _("System Status Details"))
        print("+-------------------------------------------+")

        ca_message = ""
        has_cert = _(
            "Content Access Mode is set to Simple Content Access. "
            "This host has access to content, regardless of subscription status.\n"
        )

        certs = self.entitlement_dir.list_with_content_access()
        sca_certs = [cert for cert in certs if cert.entitlement_type == CONTENT_ACCESS_CERT_TYPE]
        sca_mode_detected = False

        refresh_service = Refresh(cp=self.cp, ent_cert_lib=self.entcertlib)

        if sca_certs:
            sca_mode_detected = True
        else:
            # When there are no entitlement SCA certificates, but status_id is "disabled", then
            # it means that content access mode has changed on the server and entitlement certificates
            # have to be refreshed
            if service_status["status_id"] == "disabled":
                refresh_service.refresh()
            if is_simple_content_access(uep=self.cp, identity=self.identity):
                sca_mode_detected = True

        if sca_mode_detected is True:
            # When SCA mode was detected using cache or installed SCA entitlement certificates, but status_id
            # is not "disabled", then it means that content access mode has changed on the server and
            # entitlement certificates have to be refreshed
            status_id = service_status["status_id"]
            if status_id != "disabled":
                log.debug(
                    f"Found SCA cert, but status ID is not 'disabled' ({status_id}). "
                    "Refreshing entitlement certs..."
                )
                refresh_service.refresh()
            else:
                ca_message = has_cert

        print(
            _("Overall Status: {status}\n{message}").format(
                status=service_status["status"], message=ca_message
            )
        )

    def _print_reasons(self, service_status):
        """
        Print reasons for overall status
        :param service_status:
        :return: None
        """
        reasons = service_status["reasons"]

        columns = get_terminal_width()
        for name in reasons:
            print(format_name(name + ":", 0, columns))
            for message in reasons[name]:
                print("- {name}".format(name=format_name(message, 2, columns)))
            print("")

    def _print_syspurpose_status(self, on_date):
        """
        Print syspurpose status
        :return: None
        """
        try:
            store = syspurposelib.get_sys_purpose_store()
            if store:
                store.sync()
        except (OSError, ConnectionException) as ne:
            log.exception(ne)

        syspurpose_cache = inj.require(inj.SYSTEMPURPOSE_COMPLIANCE_STATUS_CACHE)
        syspurpose_cache.load_status(self.cp, self.identity.uuid, on_date)
        print(_("System Purpose Status: {status}").format(status=syspurpose_cache.get_overall_status()))

        syspurpose_status_code = syspurpose_cache.get_overall_status_code()
        if syspurpose_status_code != "matched":
            reasons = syspurpose_cache.get_status_reasons()
            if reasons is not None:
                for reason in reasons:
                    print("- {reason}".format(reason=reason))
        print("")

    def _do_command(self):
        """
        Print status and all reasons it is not valid
        """

        # First get/check if provided date is valid
        on_date = self._get_date_cli_option()

        service_status = entitlement.EntitlementService(cp=self.cp).get_status(on_date)

        self._print_status(service_status)

        self._print_reasons(service_status)

        self._print_syspurpose_status(on_date)

        if service_status["valid"]:
            result = 0
        else:
            result = 1

        return result