HEX
Server: Apache
System: Linux a16-asgard6.hospedagemuolhost.com.br 5.14.0-570.52.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Oct 15 06:39:08 EDT 2025 x86_64
User: maoristu4c3dbd03 (1436)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/3006601/root/usr/lib64/python3.9/site-packages/subscription_manager/cli_command/cli.py
#
# Subscription manager command line utility.
#
# Copyright (c) 2021 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
import logging
import os
import sys
from typing import List, Optional

import rhsm.config
import rhsm.connection as connection
import subscription_manager.injection as inj

from rhsm.certificate import CertificateException
from rhsm.certificate2 import CertificateLoadingError
from rhsm.connection import ProxyException, ConnectionOSErrorException
from rhsm.https import ssl
import rhsm.utils
from rhsm.utils import cmd_name, ServerUrlParseError

from rhsmlib.services import config

from subscription_manager.cli import AbstractCLICommand, InvalidCLIOptionError, system_exit
from subscription_manager.entcertlib import EntCertActionInvoker
from subscription_manager.i18n import ugettext as _
from subscription_manager.utils import (
    generate_correlation_id,
    get_client_versions,
    get_server_versions,
    parse_server_info,
    parse_baseurl_info,
    format_baseurl,
    is_valid_server_info,
    MissingCaCertException,
    get_current_owner,
)

ERR_NOT_REGISTERED_MSG = _(
    "This system is not yet registered. Try 'subscription-manager register --help' for more information."
)
ERR_NOT_REGISTERED_CODE = 1

log = logging.getLogger(__name__)
conf = config.Config(rhsm.config.get_config_parser())


def handle_exception(msg, ex):
    # On Python 2.4 and earlier, sys.exit triggers a SystemExit exception,
    # which can land us into this block of code. We do not want to handle
    # this or print any messages as the caller would already have done so,
    # so just re-throw and let Python have at it.
    if isinstance(ex, SystemExit):
        raise ex

    # GoneException will be handled uniformly for every command except unregister.
    if isinstance(ex, connection.GoneException):
        raise ex

    log.error(msg)
    log.exception(ex)

    # Directly pass the exception to system_exit for exception message formatting and exit.
    system_exit(os.EX_SOFTWARE, ex)


class CliCommand(AbstractCLICommand):
    """Base class for all sub-commands."""

    def __init__(self, name="cli", shortdesc=None, primary=False):
        AbstractCLICommand.__init__(self, name=name, shortdesc=shortdesc, primary=primary)

        self.log = self._get_logger()

        if self.require_connection():
            self._add_proxy_options()
            self._add_progress_messages_option()

        self.server_url = None

        # TODO
        self.server_hostname = None
        self.server_port = None
        self.server_prefix = None
        self.proxy_user = None
        self.proxy_password = None
        #
        self.proxy_url = None
        self.proxy_hostname = None
        self.proxy_port = None
        self.no_proxy = None
        #
        self.progress_messages = rhsm.utils.PROGRESS_MESSAGES

        self.entitlement_dir = inj.require(inj.ENT_DIR)
        self.product_dir = inj.require(inj.PROD_DIR)

        self.client_versions = self._default_client_version()
        self.server_versions = self._default_server_version()

        self.plugin_manager = inj.require(inj.PLUGIN_MANAGER)

        self.identity = inj.require(inj.IDENTITY)

        self.correlation_id = generate_correlation_id()

    def _print_ignore_auto_attach_message(self):
        """
        This message is shared by attach command and register command, because
        both commands can do auto-attach.
        :return: None
        """
        owner = get_current_owner(self.cp, self.identity)
        # We displayed Owner name: `owner_name = owner['displayName']`, but such behavior
        # was not consistent with rest of subscription-manager
        # Look at this comment: https://bugzilla.redhat.com/show_bug.cgi?id=1826300#c8
        owner_id = owner["key"]
        print(
            _(
                "Ignoring the request to auto-attach. "
                'Attaching subscriptions is disabled for organization "{owner_id}" '
                "because Simple Content Access (SCA) is enabled."
            ).format(owner_id=owner_id)
        )

    def _get_logger(self):
        return logging.getLogger(
            "rhsm-app.{module}.{name}".format(module=self.__module__, name=self.__class__.__name__)
        )

    def _request_validity_check(self):
        # Make sure the sorter is fresh (low footprint if it is)
        inj.require(inj.CERT_SORTER).force_cert_check()

    def _add_url_options(self):
        """Add options that allow the setting of the server URL."""
        self.parser.add_argument(
            "--serverurl",
            dest="server_url",
            default=None,
            help=_("server URL in the form of https://hostname:port/prefix"),
        )
        self.parser.add_argument(
            "--insecure",
            action="store_true",
            default=False,
            help=_(
                "do not check the entitlement server SSL certificate against "
                "available certificate authorities"
            ),
        )

    def _add_proxy_options(self):
        """Add proxy options that apply to sub-commands that require network connections."""
        self.parser.add_argument(
            "--proxy",
            dest="proxy_url",
            default=None,
            help=_("proxy URL in the form of hostname:port"),
        )
        self.parser.add_argument(
            "--proxyuser",
            dest="proxy_user",
            default=None,
            help=_("user for HTTP proxy with basic authentication"),
        )
        self.parser.add_argument(
            "--proxypassword",
            dest="proxy_password",
            default=None,
            help=_("password for HTTP proxy with basic authentication"),
        )
        self.parser.add_argument(
            "--noproxy",
            dest="no_proxy",
            default=None,
            help=_("host suffixes that should bypass HTTP proxy"),
        )

    def _add_progress_messages_option(self):
        """Add option to disable progress messages."""
        self.parser.add_argument(
            "--no-progress-messages",
            action="store_false",
            dest="progress_messages",
            help=_("Do not display progress messages"),
        )

    def _do_command(self):
        pass

    def assert_should_be_registered(self):
        if not self.is_consumer_cert_present():
            system_exit(ERR_NOT_REGISTERED_CODE, ERR_NOT_REGISTERED_MSG)
        elif not self.is_registered():
            system_exit(
                os.EX_DATAERR,
                _("Consumer identity either does not exist or is corrupted. Try register --help"),
            )

    def is_consumer_cert_present(self):
        self.identity = inj.require(inj.IDENTITY)
        return self.identity.is_present()

    def is_registered(self):
        self.identity = inj.require(inj.IDENTITY)
        log.debug("{identity}".format(identity=self.identity))
        return self.identity.is_valid()

    def persist_server_options(self):
        """
        Whether to persist options like --serverurl or --baseurl to the
        rhsm.conf file when used.  For modules like register, we want this to
        be true.  For modules like orgs or environments, we want false.
        """
        return False

    def require_connection(self):
        return True

    def _default_client_version(self):
        return {"subscription-manager": _("Unknown")}

    def _default_server_version(self):
        return {
            "candlepin": _("Unknown"),
            "rules-type": _("Unknown"),
            "server-type": _("Unknown"),
        }

    def log_client_version(self):
        self.client_versions = get_client_versions()
        log.debug("Client Versions: {versions}".format(versions=self.client_versions))

    def log_server_version(self):
        # can't check the server version without a connection
        # and valid registration
        if not self.require_connection():
            return

        # get_server_versions needs to handle any exceptions
        # and return the server dict
        if self.is_registered():
            cp = self.cp
        else:
            cp = self.no_auth_cp
        self.server_versions = get_server_versions(cp, exception_on_timeout=False)
        log.debug("Server Versions: {versions}".format(versions=self.server_versions))

    def main(self, args: Optional[List[str]] = None) -> Optional[int]:
        # TODO: For now, we disable the CLI entirely. We may want to allow some commands in the future.
        if rhsm.config.in_container():
            system_exit(
                os.EX_CONFIG,
                _(
                    "subscription-manager is operating in container mode. "
                    "Use your host system to manage subscriptions.\n"
                ),
            )

        config_changed: bool = False

        # In testing we sometimes specify args, otherwise use the default:
        if args is None:
            args = sys.argv[2:]

        (self.options, unknown_args) = self.parser.parse_known_args(args)

        # check for unparsed arguments
        if unknown_args:
            message: str = _("{prog}: error: no such option: {args}").format(
                prog=cmd_name(sys.argv),
                args=" ".join(unknown_args),
            )
            system_exit(os.EX_USAGE, message)

        if getattr(self.options, "progress_messages", None) is False:
            rhsm.utils.PROGRESS_MESSAGES = False

        if hasattr(self.options, "insecure") and self.options.insecure:
            conf["server"]["insecure"] = "1"
            config_changed = True

        if hasattr(self.options, "server_url") and self.options.server_url:
            try:
                (self.server_hostname, self.server_port, self.server_prefix) = parse_server_info(
                    self.options.server_url, conf
                )
            except ServerUrlParseError as e:
                print(_("Error parsing serverurl:"))
                handle_exception("Error parsing serverurl:", e)

            conf["server"]["hostname"] = self.server_hostname
            conf["server"]["port"] = self.server_port
            conf["server"]["prefix"] = self.server_prefix
            if self.server_port:
                self.server_port = int(self.server_port)
            config_changed = True

        if hasattr(self.options, "base_url") and self.options.base_url:
            try:
                (baseurl_server_hostname, baseurl_server_port, baseurl_server_prefix) = parse_baseurl_info(
                    self.options.base_url
                )
            except ServerUrlParseError as e:
                print(_("Error parsing baseurl:"))
                handle_exception("Error parsing baseurl:", e)

            conf["rhsm"]["baseurl"] = format_baseurl(
                baseurl_server_hostname, baseurl_server_port, baseurl_server_prefix
            )
            config_changed = True

        if hasattr(self.options, "proxy_url") and self.options.proxy_url:
            default_proxy_port = conf["server"].get_int("proxy_port") or rhsm.config.DEFAULT_PROXY_PORT
            proxy_user, proxy_pass, proxy_hostname, proxy_port, proxy_prefix = rhsm.utils.parse_url(
                self.options.proxy_url, default_port=default_proxy_port
            )
            self.proxy_user = proxy_user
            self.proxy_password = proxy_pass
            self.proxy_hostname = proxy_hostname
            self.proxy_port = int(proxy_port)
            config_changed = True

        if hasattr(self.options, "proxy_user") and self.options.proxy_user:
            self.proxy_user = self.options.proxy_user
        if hasattr(self.options, "proxy_password") and self.options.proxy_password:
            self.proxy_password = self.options.proxy_password
        if hasattr(self.options, "no_proxy") and self.options.no_proxy:
            self.no_proxy = self.options.no_proxy

        # Proxy information isn't written to the config, so we have to make sure
        # the sorter gets it
        connection_info = {}
        if self.proxy_hostname:
            connection_info["proxy_hostname_arg"] = self.proxy_hostname
        if self.proxy_port:
            connection_info["proxy_port_arg"] = self.proxy_port
        if self.proxy_user:
            connection_info["proxy_user_arg"] = self.proxy_user
        if self.proxy_password:
            connection_info["proxy_password_arg"] = self.proxy_password
        if self.server_hostname:
            connection_info["host"] = self.server_hostname
        if self.server_port:
            connection_info["ssl_port"] = self.server_port
        if self.server_prefix:
            connection_info["handler"] = self.server_prefix
        if self.no_proxy:
            connection_info["no_proxy_arg"] = self.no_proxy

        self.cp_provider = inj.require(inj.CP_PROVIDER)
        self.cp_provider.set_connection_info(**connection_info)
        self.log.debug("X-Correlation-ID: {id}".format(id=self.correlation_id))
        self.cp_provider.set_correlation_id(self.correlation_id)

        self.log_client_version()

        if self.require_connection():
            # make sure we pass in the new server info, otherwise we
            # we use the defaults from connection module init
            # we've set self.proxy* here, so we'll use them if they
            # are set
            self.cp = self.cp_provider.get_consumer_auth_cp()

            # no auth cp for get / (resources) and
            # get /status (status and versions)
            self.no_auth_cp = self.cp_provider.get_no_auth_cp()

            self.entcertlib = EntCertActionInvoker()

            if config_changed:
                try:
                    # this tries to actually connect to the server and ping it
                    if not is_valid_server_info(self.no_auth_cp):
                        system_exit(
                            os.EX_UNAVAILABLE,
                            _("Unable to reach the server at {host}:{port}{handler}").format(
                                host=connection.normalized_host(self.no_auth_cp.host),
                                port=self.no_auth_cp.ssl_port,
                                handler=self.no_auth_cp.handler,
                            ),
                        )

                except MissingCaCertException:
                    system_exit(
                        os.EX_CONFIG,
                        _("Error: CA certificate for subscription service has not been installed."),
                    )
                except (ProxyException, ConnectionOSErrorException) as exc:
                    system_exit(os.EX_UNAVAILABLE, exc)

        else:
            self.cp = None

        # do the work, catch most common errors here:
        try:
            return_code = self._do_command()

            # Only persist the config changes if there was no exception
            if config_changed and self.persist_server_options():
                conf.persist()

            if return_code is not None:
                return return_code
        except CertificateLoadingError as exc:
            system_exit(os.EX_SOFTWARE, exc)
        except (CertificateException, ssl.SSLError) as e:
            log.error(e)
            system_exit(os.EX_SOFTWARE, _("System certificates corrupted. Please reregister."))
        except connection.GoneException as ge:
            if ge.deleted_id == self.identity.uuid:
                log.critical(
                    'Consumer profile "{uuid}" has been deleted from the server.'.format(
                        uuid=self.identity.uuid
                    )
                )
                system_exit(
                    os.EX_UNAVAILABLE,
                    _(
                        'Consumer profile "{uuid}" has been deleted from the server. '
                        "You can use command clean or unregister to remove local profile."
                    ).format(uuid=self.identity.uuid),
                )
            else:
                raise ge
        except InvalidCLIOptionError as err:
            # This exception is handled in cli module
            raise err
        except Exception as err:
            handle_exception("exception caught in subscription-manager", err)