HEX
Server: Apache
System: Linux a16-asgard6.hospedagemuolhost.com.br 5.14.0-570.52.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Oct 15 06:39:08 EDT 2025 x86_64
User: maoristu4c3dbd03 (1436)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/3006601/root/usr/lib64/python3.9/site-packages/subscription_manager/cli_command/attach.py
#
# Subscription manager command line utility.
#
# Copyright (c) 2021 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
import fileinput
import logging
import os
import re

import rhsm.connection as connection
import subscription_manager.injection as inj

from rhsmlib.services import attach, products

from subscription_manager.action_client import ProfileActionClient, ActionClient
from subscription_manager.cli import system_exit
from subscription_manager.cli_command.cli import CliCommand, handle_exception
from subscription_manager.cli_command.list import show_autosubscribe_output
from subscription_manager.exceptions import ExceptionMapper
from subscription_manager.i18n import ugettext as _
from subscription_manager.packageprofilelib import PackageProfileActionInvoker
from subscription_manager.syspurposelib import save_sla_to_syspurpose_metadata
from subscription_manager.utils import is_simple_content_access, get_current_owner

log = logging.getLogger(__name__)


class AttachCommand(CliCommand):
    def __init__(self):
        """
        Initialise the "attach" sub-command and declare its command line options.

        The command name, short description and "primary" flag are handed to
        the base class; afterwards the options --pool, --quantity, --auto,
        --servicelevel and --file are registered on self.parser.
        """
        super().__init__(self._command_name(), self._short_description(), self._primary())

        self.product = self.substoken = None
        # Auto-attach is the default; _do_command() turns it off when pools are given
        self.auto_attach = True

        register = self.parser.add_argument

        register(
            "--pool",
            dest="pool",
            action="append",
            help=_("The ID of the pool to attach (can be specified more than once)"),
        )
        register(
            "--quantity",
            dest="quantity",
            type=int,
            help=_("Number of subscriptions to attach. May not be used with an auto-attach."),
        )
        register(
            "--auto",
            action="store_true",
            help=_(
                "Automatically attach the best-matched compatible subscriptions to this system. "
                "This is the default action."
            ),
        )
        register(
            "--servicelevel",
            dest="service_level",
            help=_(
                "Automatically attach only subscriptions matching the specified service level; "
                "only used with --auto"
            ),
        )
        register(
            "--file",
            dest="file",
            help=_(
                "A file from which to read pool IDs. If a hyphen is provided, pool IDs will be "
                "read from stdin."
            ),
        )

        # re bz #864207
        # NOTE(review): the results of these two calls are discarded; presumably
        # they only exist so that both sentences are picked up as separate
        # translatable strings -- confirm against the bug report.
        _("All installed products are covered by valid entitlements.")
        _("No need to update subscriptions at this time.")

    def _read_pool_ids(self, f):
        if not self.options.pool:
            self.options.pool = []

        for line in fileinput.input(f):
            for pool in filter(bool, re.split(r"\s+", line.strip())):
                self.options.pool.append(pool)

    def _short_description(self):
        """
        Build the short description that __init__ passes to the base class.
        :return: translated description text
        """
        description = _(
            "Deprecated, this command will be removed from the future major releases. "
            "Attach a specified subscription to the registered system, when system does not use "
            "Simple Content Access mode"
        )
        return description

    def _command_name(self):
        return "attach"

    def _primary(self):
        return False

    def _validate_options(self):
        if self.options.pool or self.options.file:
            if self.options.auto:
                system_exit(os.EX_USAGE, _("Error: --auto may not be used when specifying pools."))
            if self.options.service_level:
                system_exit(
                    os.EX_USAGE, _("Error: The --servicelevel option cannot be used when specifying pools.")
                )

        # Quantity must be positive
        if self.options.quantity is not None:
            if self.options.quantity <= 0:
                system_exit(os.EX_USAGE, _("Error: Quantity must be a positive integer."))
            elif self.options.auto or not (self.options.pool or self.options.file):
                system_exit(os.EX_USAGE, _("Error: --quantity may not be used with an auto-attach"))

        # If a pools file was specified, process its contents and append it to options.pool
        if self.options.file:
            self.options.file = os.path.expanduser(self.options.file)
            if self.options.file == "-" or os.path.isfile(self.options.file):
                self._read_pool_ids(self.options.file)

                if len(self.options.pool) < 1:
                    if self.options.file == "-":
                        system_exit(os.EX_DATAERR, _("Error: Received data does not contain any pool IDs."))
                    else:
                        system_exit(
                            os.EX_DATAERR,
                            _('Error: The file "{file}" does not contain any pool IDs.').format(
                                file=self.options.file
                            ),
                        )
            else:
                system_exit(
                    os.EX_DATAERR,
                    _('Error: The file "{file}" does not exist or cannot be read.').format(
                        file=self.options.file
                    ),
                )

    def _print_ignore_attach_message(self):
        """
        Inform the user that the attach request is being ignored because
        Simple Content Access is enabled for the organization.

        The organization is looked up with get_current_owner() and identified
        in the message by its "key" value.
        :return: None
        """
        org_key = get_current_owner(self.cp, self.identity)["key"]
        template = _(
            "Ignoring the request to attach. "
            'Attaching subscriptions is disabled for organization "{owner_id}" '
            "because Simple Content Access (SCA) is enabled."
        )
        print(template.format(owner_id=org_key))

    def _do_command(self):
        """
        Executes the command.

        Attaches the pools given through --pool/--file, or runs auto-attach
        when no pool was specified. When the organization uses Simple Content
        Access nothing is attached; only a notice is printed.

        :return: exit code of the command:
            0 when the request is ignored because of SCA;
            1 when pools were requested but none could be attached, or when
            auto-attach ran without any installed products;
            the result of show_autosubscribe_output() after auto-attach on a
            system with installed products (unless the certificate update
            reported exceptions);
            0 otherwise
        """
        self.assert_should_be_registered()
        self._validate_options()

        # --pool or --file turns off default auto attach
        if self.options.pool or self.options.file:
            self.auto_attach = False

        # Do not try to do auto-attach, when simple content access mode is used
        # BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1826300
        #
        if is_simple_content_access(uep=self.cp, identity=self.identity):
            if self.auto_attach is True:
                self._print_ignore_auto_attach_message()
            else:
                self._print_ignore_attach_message()
            return 0

        installed_products_num = 0
        return_code = 0
        report = None

        # TODO: change to if self.auto_attach: else: pool/file stuff
        try:
            # Refresh everything except the package profile before attaching;
            # the profile is updated separately further below.
            cert_action_client = ActionClient(skips=[PackageProfileActionInvoker])
            cert_action_client.update()
            cert_update = True

            attach_service = attach.AttachService(self.cp)
            if self.options.pool:
                subscribed = False

                for pool in self.options.pool:
                    # odd html strings will cause issues, reject them here.
                    if "#" in pool:
                        system_exit(os.EX_USAGE, _("Please enter a valid numeric pool ID."))

                    try:
                        ents = attach_service.attach_pool(pool, self.options.quantity)
                        # Usually just one, but may as well be safe:
                        for ent in ents:
                            product_name = ent["pool"]["productName"]
                            print(
                                _("Successfully attached a subscription for: {name}").format(
                                    name=product_name
                                )
                            )
                            log.debug("Attached a subscription for %s", product_name)
                            subscribed = True
                    # The exception is deliberately not named "re": that would
                    # shadow the module-level "re" import inside this method.
                    except connection.RestlibException as rest_err:
                        log.exception(rest_err)

                        exception_mapper = ExceptionMapper()
                        mapped_message = exception_mapper.get_message(rest_err)

                        if rest_err.code in (400, 403, 404):
                            # 403: already subscribed; 400/404: no such pool.
                            # Report it and carry on with the remaining pools.
                            print(mapped_message)
                        else:
                            system_exit(os.EX_SOFTWARE, mapped_message)  # some other error.. don't try again
                if not subscribed:
                    return_code = 1
            # must be auto
            else:
                installed_products_num = len(products.InstalledProducts(self.cp).list())
                # if we are green, we don't need to go to the server
                self.sorter = inj.require(inj.CERT_SORTER)

                if self.sorter.is_valid():
                    if not installed_products_num:
                        print(_("No Installed products on system. " "No need to attach subscriptions."))
                    else:
                        print(
                            _(
                                "All installed products are covered by valid entitlements. "
                                "No need to update subscriptions at this time."
                            )
                        )
                    # Nothing was attached, so the entitlement certificates need no refresh
                    cert_update = False
                else:
                    # If service level specified, make an additional request to
                    # verify service levels are supported on the server:
                    if self.options.service_level:
                        consumer = self.cp.getConsumer(self.identity.uuid)
                        if "serviceLevel" not in consumer:
                            system_exit(
                                os.EX_UNAVAILABLE,
                                _(
                                    "Error: The --servicelevel option is not "
                                    "supported by the server. Did not "
                                    "complete your request."
                                ),
                            )

                    attach_service.attach_auto(self.options.service_level)
                    if self.options.service_level is not None:
                        #  RHBZ 1632797 we should only save the sla if the sla was actually
                        #  specified. The uep and consumer_uuid are None, because service_level was sent
                        # to candlepin server using attach_service.attach_auto()
                        save_sla_to_syspurpose_metadata(
                            uep=None, consumer_uuid=None, service_level=self.options.service_level
                        )
                        print(_("Service level set to: {}").format(self.options.service_level))

            if cert_update:
                report = self.entcertlib.update()

            profile_action_client = ProfileActionClient()
            profile_action_client.update()

            if report and report.exceptions():
                print(_("Entitlement Certificate(s) update failed due to the following reasons:"))
                for e in report.exceptions():
                    print("\t-", str(e))
            elif self.auto_attach:
                if not installed_products_num:
                    return_code = 1
                else:
                    self.sorter.force_cert_check()
                    # Make sure that we get fresh status of installed products
                    status_cache = inj.require(inj.ENTITLEMENT_STATUS_CACHE)
                    status_cache.load_status(
                        self.sorter.cp_provider.get_consumer_auth_cp(),
                        self.sorter.identity.uuid,
                        self.sorter.on_date,
                    )
                    self.sorter.load()
                    # run this after entcertlib update, so we have the new entitlements
                    return_code = show_autosubscribe_output(self.cp, self.identity)

        except Exception as e:
            handle_exception("Unable to attach: {e}".format(e=e), e)

        # it is okay to call this no matter what happens above,
        # it's just a notification to perform a check
        self._request_validity_check()

        return return_code