HEX
Server: Apache
System: Linux a16-asgard6.hospedagemuolhost.com.br 5.14.0-570.52.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Oct 15 06:39:08 EDT 2025 x86_64
User: maoristu4c3dbd03 (1436)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //lib64/python3.9/site-packages/subscription_manager/identity.py
# Copyright (c) 2013 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#

import logging
import os
import errno
import threading
from typing import Optional, TYPE_CHECKING

from rhsm.certificate import create_from_pem
from rhsm.config import get_config_parser
from subscription_manager.certdirectory import Path

from rhsmlib.services import config
from rhsm.certificate import CertificateException
from rhsm.certificate2 import CertificateLoadingError

if TYPE_CHECKING:
    from rhsm.certificate2 import EntitlementCertificate

# Module-wide configuration object: wraps the parser returned by
# get_config_parser(). Used below to look up conf["rhsm"]["consumerCertDir"].
conf = config.Config(get_config_parser())
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


class ConsumerIdentity:
    """Consumer info and certificate information.

    Includes helpers for reading/writing consumer identity certificates
    from disk. The key and the certificate live in the files named by
    ``KEY`` and ``CERT`` inside the directory ``PATH``.
    """

    # Looked up in the configuration once, when the class body is executed.
    PATH = conf["rhsm"]["consumerCertDir"]
    KEY = "key.pem"
    CERT = "cert.pem"

    @staticmethod
    def keypath() -> str:
        """Return the path of the identity key file (``PATH``/``KEY``)."""
        return str(Path.join(ConsumerIdentity.PATH, ConsumerIdentity.KEY))

    @staticmethod
    def certpath() -> str:
        """Return the path of the identity certificate file (``PATH``/``CERT``)."""
        return str(Path.join(ConsumerIdentity.PATH, ConsumerIdentity.CERT))

    @classmethod
    def read(cls) -> "ConsumerIdentity":
        """Load the key and certificate from disk and build a ConsumerIdentity.

        Raises whatever ``open()`` raises when a file cannot be read, and
        whatever ``create_from_pem()`` raises when the certificate text
        cannot be parsed.
        """
        with open(cls.keypath()) as key_file:
            key: str = key_file.read()
        with open(cls.certpath()) as cert_file:
            cert: str = cert_file.read()
        return ConsumerIdentity(key, cert)

    @classmethod
    def exists(cls) -> bool:
        """Return True when both the key file and the certificate file exist."""
        return os.path.exists(cls.keypath()) and os.path.exists(cls.certpath())

    @classmethod
    def existsAndValid(cls) -> bool:
        """Return True when both identity files exist and can be loaded.

        Any exception raised while reading is logged and reported as
        "not valid" instead of being propagated.
        """
        if not cls.exists():
            return False
        try:
            cls.read()
        except Exception as e:
            # Deliberately broad: every read/parse failure means "not valid".
            # log.warning() replaces the deprecated log.warn() alias.
            log.warning("possible certificate corruption")
            log.error(e)
            return False
        return True

    def __init__(self, keystring: str, certstring: str) -> None:
        """Store the key/certificate text and parse the certificate.

        :param keystring: content of the key file
        :param certstring: PEM text of the certificate, handed to
            ``create_from_pem()``
        """
        self.key = keystring
        # TODO: bad variables, cert should be the certificate object, x509 is
        # used elsewhere for the rhsm._certificate object of the same name.
        self.cert = certstring
        # The annotation is quoted because EntitlementCertificate is only
        # imported under TYPE_CHECKING.
        self.x509: "EntitlementCertificate" = create_from_pem(certstring)

    def getConsumerId(self) -> Optional[str]:
        """Return the 'CN' entry of the certificate subject, or None if absent."""
        subject: dict = self.x509.subject
        return subject.get("CN")

    def getConsumerName(self) -> str:
        """Return the consumer name derived from the certificate's alt name.

        The "DirName:/CN=" and "URI:CN=" markers are stripped and the last
        ", "-separated element of the remaining text is returned.
        """
        # FIXME Unresolved attribute
        altName = self.x509.alt_name
        # FIXME Do we still need to support 'old' format?
        #       The comment is from 2014, the line below from 2017
        # must account for old format and new
        return altName.replace("DirName:/CN=", "").replace("URI:CN=", "").split(", ")[-1]

    def getSerialNumber(self) -> int:
        """Return the serial of the parsed certificate."""
        return self.x509.serial

    def getConsumerOwner(self) -> Optional[str]:
        """Get the name of the organization of the consumer.

        The value is stored in the 'Subject Name' > 'O (Organization)' field
        of the consumer certificate.
        """
        subject: dict = self.x509.subject
        return subject.get("O")

    @staticmethod
    def _write_restricted(path: str, data: str, mode: int) -> None:
        """Write *data* to *path*, creating the file with permissions *mode*."""

        def _opener(file: str, flags: int) -> int:
            # Create the file with the restricted mode right away, so the
            # content is never on disk with the default (umask-derived) mode.
            return os.open(file, flags, mode)

        with open(path, "w", opener=_opener) as handle:
            handle.write(data)
        # The creation mode is filtered by the umask and is ignored for a file
        # that already existed, so set the exact permissions explicitly.
        os.chmod(path, mode)

    # TODO: we're using a Certificate which has its own write/delete, no idea
    # why this landed in a parallel disjoint class wrapping the actual cert.
    def write(self) -> None:
        """Write the key and the certificate to ``keypath()``/``certpath()``.

        The certificate directory is created when missing and both files end
        up with ``managerlib.ID_CERT_PERMS`` permissions.
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import -- confirm before moving).
        from subscription_manager import managerlib

        self.__mkdir()
        self._write_restricted(self.keypath(), self.key, managerlib.ID_CERT_PERMS)
        self._write_restricted(self.certpath(), self.cert, managerlib.ID_CERT_PERMS)

    def delete(self) -> None:
        """Remove the key and certificate files; missing files are ignored."""
        for path in (self.keypath(), self.certpath()):
            # Unlink directly instead of checking os.path.exists() first: the
            # file may disappear between the check and the unlink.
            try:
                os.unlink(path)
            except (FileNotFoundError, NotADirectoryError):
                pass

    def __mkdir(self) -> None:
        """Create the certificate directory unless it already exists."""
        path: str = Path.abs(self.PATH)
        # Attempt the mkdir and tolerate "already exists" rather than testing
        # for existence first, which is racy.
        try:
            os.mkdir(path)
        except FileExistsError:
            pass

    def __str__(self) -> str:
        return 'consumer: name="%s", uuid=%s' % (self.getConsumerName(), self.getConsumerId())


class Identity:
    """Wrapper for sharing consumer identity without constant reloading.

    The name, uuid, owner and certificate directory are cached on the
    instance by ``reload()`` and exposed through read-only properties.
    ``reload()`` and the properties all acquire ``self._lock``.
    """

    def __init__(self) -> None:
        """Initialise the cached fields and load the identity from disk."""
        self.consumer: Optional[ConsumerIdentity] = None
        self._lock = threading.Lock()
        self._name: Optional[str] = None
        self._uuid: Optional[str] = None
        self._owner: Optional[str] = None
        self._cert_dir_path: str = conf["rhsm"]["consumerCertDir"]
        self.reload()

    def reload(self) -> None:
        """Check for consumer certificate on disk and update our info accordingly.

        When the identity cannot be loaded, ``consumer`` and every cached
        field (name, uuid, owner) are reset to None and the certificate
        directory falls back to the configured value.
        """
        log.debug("Loading consumer info from identity certificates.")
        with self._lock:
            try:
                self.consumer = self._get_consumer_identity()
            # XXX shouldn't catch the global exception here, but that's what
            # existsAndValid did, so this is better.
            except (CertificateException, CertificateLoadingError, IOError) as err:
                self.consumer = None
                msg = "Reload of consumer identity cert %s raised an exception with msg: %s" % (
                    ConsumerIdentity.certpath(),
                    err,
                )
                # A missing file only means there is no identity on disk, so
                # it is logged at debug level; anything else is an error.
                if isinstance(err, IOError) and err.errno == errno.ENOENT:
                    log.debug(msg)
                else:
                    log.error(msg)

            if self.consumer is not None:
                self._name = self.consumer.getConsumerName()
                self._uuid = self.consumer.getConsumerId()
                self._owner = self.consumer.getConsumerOwner()
                # since Identity gets dep injected, lets look up
                # the cert dir on the active id instead of the global config
                self._cert_dir_path = self.consumer.PATH
            else:
                self._name = None
                self._uuid = None
                # Reset the owner as well, otherwise the organisation of a
                # previously loaded identity would still be reported.
                self._owner = None
                self._cert_dir_path = conf["rhsm"]["consumerCertDir"]

    def _get_consumer_identity(self) -> ConsumerIdentity:
        """Read the identity from disk; kept separate so it can be overridden."""
        return ConsumerIdentity.read()

    # this name is weird, since Certificate.is_valid actually checks the data
    # and this is a thin wrapper
    def is_valid(self) -> bool:
        """Return True when a consumer uuid is currently cached."""
        return self.uuid is not None

    @property
    def name(self) -> Optional[str]:
        """Cached consumer name, or None when no identity is loaded."""
        with self._lock:
            return self._name

    @property
    def uuid(self) -> Optional[str]:
        """Cached consumer uuid, or None when no identity is loaded."""
        with self._lock:
            return self._uuid

    @property
    def owner(self) -> Optional[str]:
        """Cached consumer organisation, or None when no identity is loaded."""
        with self._lock:
            return self._owner

    @property
    def cert_dir_path(self) -> str:
        """Directory of the identity certificate as of the last reload."""
        with self._lock:
            return self._cert_dir_path

    @staticmethod
    def is_present() -> bool:
        """Return True when both identity files exist on disk."""
        return ConsumerIdentity.exists()

    def __str__(self) -> str:
        return "Consumer Identity name=%s uuid=%s" % (self.name, self.uuid)