HEX
Server: Apache
System: Linux a16-asgard6.hospedagemuolhost.com.br 5.14.0-570.52.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Oct 15 06:39:08 EDT 2025 x86_64
User: maoristu4c3dbd03 (1436)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/3006601/root/usr/lib64/python3.9/site-packages/subscription_manager/cli_command/identity.py
#
# Subscription manager command line utility.
#
# Copyright (c) 2021 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
import logging
import os

import rhsm.connection as connection
import subscription_manager.injection as inj

from rhsmlib.facts.hwprobe import ClassicCheck

from subscription_manager import managerlib
from subscription_manager.branding import get_branding
from subscription_manager.cli import system_exit
from subscription_manager.cli_command.cli import handle_exception
from subscription_manager.cli_command.environments import MULTI_ENV
from subscription_manager.cli_command.user_pass import UserPassCommand
from subscription_manager.i18n import ugettext as _
from subscription_manager.i18n import ungettext
from subscription_manager.utils import get_current_owner, get_supported_resources

log = logging.getLogger(__name__)


class IdentityCommand(UserPassCommand):
    def __init__(self):
        shortdesc = _("Display the identity certificate for this system or " "request a new one")

        super(IdentityCommand, self).__init__("identity", shortdesc, False)

        self.parser.add_argument(
            "--regenerate",
            action="store_true",
            help=_("request a new certificate be generated"),
        )
        self.parser.add_argument(
            "--force",
            action="store_true",
            help=_(
                "force certificate regeneration (requires username and password); "
                "Only used with --regenerate"
            ),
        )

    def _validate_options(self):
        self.assert_should_be_registered()
        if self.options.force and not self.options.regenerate:
            system_exit(os.EX_USAGE, _("--force can only be used with --regenerate"))
        if (self.options.username or self.options.password) and not self.options.force:
            system_exit(os.EX_USAGE, _("--username and --password can only be used with --force"))
        if self.options.token and not self.options.force:
            system_exit(os.EX_USAGE, _("--token can only be used with --force"))

    def _do_command(self):
        # get current consumer identity
        identity = inj.require(inj.IDENTITY)

        # check for Classic before doing anything else
        if ClassicCheck().is_registered_with_classic():
            if identity.is_valid():
                print(_("server type: {type}").format(type=get_branding().REGISTERED_TO_BOTH_SUMMARY))
            else:
                # no need to continue if user is only registered to Classic
                print(_("server type: {type}").format(type=get_branding().REGISTERED_TO_OTHER_SUMMARY))
                return

        try:
            self._validate_options()
            consumerid = self.identity.uuid
            consumer_name = self.identity.name
            if not self.options.regenerate:
                owner = get_current_owner(self.cp, self.identity)
                ownername = owner["displayName"]
                ownerid = owner["key"]

                print(_("system identity: {consumerid}").format(consumerid=consumerid))
                print(_("name: {consumer_name}").format(consumer_name=consumer_name))
                print(_("org name: {ownername}").format(ownername=ownername))
                print(_("org ID: {ownerid}").format(ownerid=ownerid))

                supported_resources = get_supported_resources(self.cp, self.identity)
                if "environments" in supported_resources:
                    consumer = self.cp.getConsumer(consumerid)
                    evn_key = "environments" if self.cp.has_capability(MULTI_ENV) else "environment"
                    environments = consumer[evn_key]
                    if environments:
                        if evn_key == "environment":
                            environment_names = environments["name"]
                        else:
                            environment_names = ",".join(
                                [environment["name"] for environment in environments]
                            )
                    else:
                        environment_names = _("None")
                    print(
                        ungettext(
                            "environment name: {environment_name}",
                            "environment names: {environment_name}",
                            len(environment_names.split(",")),
                        ).format(environment_name=environment_names)
                    )
            else:
                if self.options.force:
                    # get an UEP with basic auth or keycloak auth
                    if self.options.token:
                        self.cp = self.cp_provider.get_keycloak_auth_cp(self.options.token)
                    else:
                        self.cp_provider.set_user_pass(self.username, self.password)
                        self.cp = self.cp_provider.get_basic_auth_cp()
                consumer = self.cp.regenIdCertificate(consumerid)
                managerlib.persist_consumer_cert(consumer)

                # do this in persist_consumer_cert? or some other
                # high level, "I just registered" thing
                self.identity.reload()

                print(_("Identity certificate has been regenerated."))

                log.debug("Successfully generated a new identity from server.")
        except connection.GoneException as ge:
            # Gone exception is caught in CliCommand and a consistent message
            # is printed there for all commands
            raise ge
        except connection.RestlibException as re:
            log.exception(re)
            log.error("Error: Unable to generate a new identity for the system: {re}".format(re=re))

            system_exit(os.EX_SOFTWARE, re)
        except Exception as e:
            handle_exception(_("Error: Unable to generate a new identity for the system"), e)