HEX
Server: Apache
System: Linux a16-asgard6.hospedagemuolhost.com.br 5.14.0-570.52.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Oct 15 06:39:08 EDT 2025 x86_64
User: maoristu4c3dbd03 (1436)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //lib64/python3.9/site-packages/subscription_manager/repofile.py
# Copyright (c) 2010 Red Hat, Inc.
# Copyright (c) 2017 ATIX AG
#
# Authors: Jeff Ortel <jortel@redhat.com>
#          Matthias Dellweg <dellweg@atix.de>
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
from typing import Dict, List, Literal, Optional, TextIO, Tuple, TYPE_CHECKING

from iniparse import RawConfigParser as ConfigParser
import logging
import os
import re
import string
import sys

try:
    from debian.deb822 import Deb822

    HAS_DEB822 = True
except ImportError:
    HAS_DEB822 = False

from subscription_manager import utils
from subscription_manager.certdirectory import Path
import configparser
from urllib.parse import parse_qs, urlparse, urlunparse, urlencode

from rhsm.config import get_config_parser

from rhsmlib.services import config

if TYPE_CHECKING:
    from subscription_manager.model import Content
    from subscription_manager.repolib import YumReleaseverSource

log = logging.getLogger(__name__)

conf = config.Config(get_config_parser())

repo_files = []

# detect if running with yum, otherwise it's dnf
HAS_YUM = "yum" in sys.modules


class Repo(dict):
    # (name, mutable, default) - The mutability information is only used in disconnected cases
    PROPERTIES: Dict[str, Tuple[int, Optional[Literal["0", "1"]]]] = {
        "name": (0, None),
        "baseurl": (0, None),
        "enabled": (1, "1"),
        "gpgcheck": (1, "1"),
        "gpgkey": (0, None),
        "sslverify": (1, "1"),
        "sslcacert": (0, None),
        "sslclientkey": (0, None),
        "sslclientcert": (0, None),
        "sslverifystatus": (1, None),
        "metadata_expire": (1, None),
        "enabled_metadata": (1, "0"),
        "proxy": (0, None),
        "proxy_username": (0, None),
        "proxy_password": (0, None),
        "ui_repoid_vars": (0, None),
    }

    def __init__(self, repo_id: str, existing_values: List = None):
        super().__init__()
        if HAS_DEB822 is True:
            self.PROPERTIES["arches"] = (1, None)

        # existing_values is a list of 2-tuples
        existing_values = existing_values or []
        self.id: str = self._clean_id(repo_id)

        # used to store key order, so we can write things out in the order
        # we read them from the config.
        self._order: List[str] = []

        self.content_type = None

        for key, value in existing_values:
            # only set keys that have a non-empty value, to not clutter the
            # file.
            if value:
                self[key] = value

        # NOTE: This sets the above properties to the default values even if
        # they are not defined on disk. i.e. these properties will always
        # appear in this dict, but their values may be None.
        for k, (_m, d) in list(self.PROPERTIES.items()):
            if k not in list(self.keys()):
                self[k] = d

    def copy(self):
        new_repo = Repo(self.id)
        for key, value in list(self.items()):
            new_repo[key] = value
        return new_repo

    @classmethod
    def from_ent_cert_content(
        cls, content: "Content", baseurl: str, ca_cert: str, release_source: "YumReleaseverSource"
    ) -> "Repo":
        """Create an instance of Repo() from an ent_cert.EntitlementCertContent().

        And the other out of band info we need including baseurl, ca_cert, and
        the release version string.
        """
        repo: Repo = cls(content.label)

        repo.content_type = content.content_type

        repo["name"] = content.name

        if content.enabled:
            repo["enabled"] = "1"
            repo["enabled_metadata"] = "1"
        else:
            repo["enabled"] = "0"
            repo["enabled_metadata"] = "0"

        expanded_url_path = Repo._expand_releasever(release_source, content.url)
        repo["baseurl"] = utils.url_base_join(baseurl, expanded_url_path)

        # Extract the variables from the url
        repo_parts = repo["baseurl"].split("/")
        repoid_vars = [part[1:] for part in repo_parts if part.startswith("$")]
        if HAS_YUM and repoid_vars:
            repo["ui_repoid_vars"] = " ".join(repoid_vars)

        # If no GPG key URL is specified, turn gpgcheck off:
        gpg_url = content.gpg
        if not gpg_url:
            gpg_url = ""
            repo["gpgcheck"] = "0"
        else:
            gpg_url = utils.url_base_join(baseurl, gpg_url)
            # Leave gpgcheck as the default of 1
        repomd_gpg_url = conf["rhsm"]["repomd_gpg_url"]
        if repomd_gpg_url:
            repomd_gpg_url = utils.url_base_join(baseurl, repomd_gpg_url)
            if not gpg_url or gpg_url in ["https://", "http://"]:
                gpg_url = repomd_gpg_url
            elif repomd_gpg_url not in gpg_url:
                gpg_url += "," + repomd_gpg_url
        repo["gpgkey"] = gpg_url

        repo["sslclientkey"] = content.cert.key_path()
        repo["sslclientcert"] = content.cert.path
        repo["sslcacert"] = ca_cert
        repo["metadata_expire"] = content.metadata_expire
        if "arches" in repo and len(content.arches) > 0:
            repo["arches"] = content.arches

        repo = Repo._set_proxy_info(repo)

        return repo

    @staticmethod
    def _set_proxy_info(repo: "Repo") -> "Repo":
        proxy = ""

        proxy_scheme = conf["server"]["proxy_scheme"]

        if proxy_scheme.endswith("://"):
            proxy_scheme = proxy_scheme[:-3]

        # Proxy scheme can be empty: 1704662
        if proxy_scheme == "":
            defaults = conf.defaults()
            proxy_scheme = defaults.get("proxy_scheme", "http")

        # Worth passing in proxy config info to from_ent_cert_content()?
        # That would decouple Repo some
        proxy_host = conf["server"]["proxy_hostname"]

        # proxy_port as string is fine here
        proxy_port = conf["server"]["proxy_port"]

        if proxy_host != "":
            if proxy_port:
                proxy_host = proxy_host + ":" + proxy_port
            proxy = proxy_scheme + "://" + proxy_host

        # These could be empty string, in which case they will not be
        # set in the yum repo file:
        repo["proxy"] = proxy
        repo["proxy_username"] = conf["server"]["proxy_user"]
        repo["proxy_password"] = conf["server"]["proxy_password"]

        return repo

    @staticmethod
    def _expand_releasever(release_source: "YumReleaseverSource", contenturl: str) -> str:
        # no $releasever to expand
        if release_source.marker not in contenturl:
            return contenturl

        expansion = release_source.get_expansion()

        # NOTE: This is building a url from external info
        #       so likely needs more validation. In our case, the
        #       external source is trusted (release list from tls
        #       mutually authed cdn, or a tls mutual auth api)
        # NOTE: The on disk cache is more vulnerable, since it is
        #       trusted.
        return contenturl.replace(release_source.marker, expansion)

    def _clean_id(self, repo_id: str) -> str:
        """
        Format the config file id to contain only characters that yum expects
        (we'll just replace 'bad' chars with -)
        """
        new_id = ""
        valid_chars = string.ascii_letters + string.digits + "-_.:"
        for byte in repo_id:
            if byte not in valid_chars:
                new_id += "-"
            else:
                new_id += byte

        return new_id

    def items(self) -> Tuple[Tuple[str, Tuple[int, Optional[Literal["0", "1"]]]], ...]:
        """
        Called when we fetch the items for this yum repo to write to disk.
        """
        # Skip anything set to 'None' or empty string, as this is likely
        # not intended for a yum repo file. None can result here if the
        # default is None, or the entitlement certificate did not have the
        # value set.
        #
        # all values will be in _order, since the key has to have been set
        # to get into our dict.
        return tuple([(k, self[k]) for k in self._order if k in self and self[k]])

    def __setitem__(self, key: str, value: Optional[Literal["0", "1"]]):
        if key not in self._order:
            self._order.append(key)
        dict.__setitem__(self, key, value)

    def __str__(self) -> str:
        s = []
        s.append("[%s]" % self.id)
        for k in self.PROPERTIES:
            v = self.get(k)
            if v is None:
                continue
            s.append("%s=%s" % (k, v))

        return "\n".join(s)

    def __eq__(self, other: "Repo") -> bool:
        return self.id == other.id

    def __hash__(self) -> int:
        return hash(self.id)


def manage_repos_enabled() -> bool:
    try:
        manage_repos = conf["rhsm"].get_int("manage_repos")
    except ValueError as e:
        log.exception(e)
        return True
    except configparser.Error as e:
        log.exception(e)
        return True
    else:
        if manage_repos is None:
            return True

    return bool(manage_repos)


class TidyWriter:
    """
    ini file reader that removes successive newlines,
    and adds a trailing newline to the end of a file.

    used to keep our repo file clean after removals and additions of
    new sections, as iniparser's tidy function is not available in all
    versions.
    """

    def __init__(self, backing_file: TextIO):
        self.backing_file = backing_file
        self.ends_with_newline: bool = False
        self.writing_empty_lines: bool = False

    def write(self, line: str) -> None:
        lines = line.split("\n")
        i = 0
        while i < len(lines):
            line = lines[i]
            if line == "":
                if i != len(lines) - 1:
                    if not self.writing_empty_lines:
                        self.backing_file.write("\n")
                    self.writing_empty_lines = True
            else:
                self.writing_empty_lines = False
                self.backing_file.write(line)
                if i != len(lines) - 1:
                    self.backing_file.write("\n")

            i += 1

        if lines[-1] == "":
            self.ends_with_newline = True
        else:
            self.ends_with_newline = False

    def close(self) -> None:
        if not self.ends_with_newline:
            self.backing_file.write("\n")


class RepoFileBase:
    """
    Base class for managing repository.
    """

    PATH: str = None
    NAME: str = None
    REPOFILE_HEADER: str = None

    def __init__(self, path: Optional[str] = None, name: Optional[str] = None):
        # PATH gets expanded with chroot info, etc
        path = path or self.PATH
        name = name or self.NAME
        self.path: str = Path.join(path, name)
        self.repos_dir: str = Path.abs(path)
        self.manage_repos: bool = manage_repos_enabled()
        if self.manage_repos is True:
            self.create()

    # Easier than trying to mock/patch os.path.exists
    def path_exists(self, path: str) -> bool:
        """
        Wrapper around os.path.exists
        """
        return os.path.exists(path)

    def exists(self) -> bool:
        return self.path_exists(self.path)

    def create_dir_path(self) -> None:
        """
        Try to create directory for .repo files
        """
        if not self.path_exists(self.repos_dir):
            log.debug("The directory %s does not exist. Trying to create it" % self.PATH)
            try:
                os.makedirs(name=self.repos_dir, mode=0o755)
            except Exception as err:
                log.warning("Unable to create directory: %s, error: %s" % (self.repos_dir, err))
        else:
            log.debug("The directory %s already exists" % self.repos_dir)

    def create(self) -> None:
        """
        Try to create new repo file.
        """
        pass

    def fix_content(self, content: str) -> str:
        return content

    @classmethod
    def installed(cls) -> bool:
        return os.path.exists(Path.abs(cls.PATH))

    @classmethod
    def server_value_repo_file(cls) -> "RepoFileBase":
        return cls("var/lib/rhsm/repo_server_val/")


if HAS_DEB822:

    class AptRepoFile(RepoFileBase):
        PATH: str = "etc/apt/sources.list.d"
        NAME: str = "rhsm.sources"
        CONTENT_TYPES: List[str] = ["deb"]
        REPOFILE_HEADER: str = """#
# Certificate-Based Repositories
# Managed by (rhsm) subscription-manager
#
# *** This file is auto-generated.  Changes made here will be over-written. ***
# *** Use "subscription-manager repo-override --help" if you wish to make changes. ***
#
# If this file is empty and this system is subscribed consider
# a "apt-get update" to refresh available repos
#
# *** DO NOT EDIT THIS FILE ***
#
"""

        def __init__(self, path: Optional[str] = None, name: Optional[str] = None):
            super(AptRepoFile, self).__init__(path, name)
            self.repos822 = []

        def read(self):
            if not self.manage_repos:
                log.debug("Skipping read due to manage_repos setting: %s" % self.path)
                return
            with open(self.path, "r") as f:
                for repo822 in Deb822.iter_paragraphs(f, shared_storage=False):
                    self.repos822.append(repo822)

        def write(self):
            if not self.manage_repos:
                log.debug("Skipping write due to manage_repos setting: %s" % self.path)
                return
            with open(self.path, "w") as f:
                f.write(self.REPOFILE_HEADER)
                for repo822 in self.repos822:
                    f.write("\n")
                    repo822.dump(f, text_mode=True)

        def add(self, repo):
            repo_dict = dict([(str(k), str(v)) for (k, v) in repo.items()])
            repo_dict["id"] = repo.id
            self.repos822.append(Deb822(repo_dict))

        def delete(self, repo_id):
            self.repos822[:] = [repo822 for repo822 in self.repos822 if repo822["id"] != repo_id]

        def update(self, repo):
            repo_dict = dict([(str(k), str(v)) for (k, v) in repo.items()])
            repo_dict["id"] = repo.id
            self.repos822[:] = [
                repo822 if repo822["id"] != repo.id else Deb822(repo_dict) for repo822 in self.repos822
            ]

        def section(self, repo_id):
            result = [repo822 for repo822 in self.repos822 if repo822["id"] == repo_id]
            if len(result) > 0:
                return Repo(result[0]["id"], result[0].items())
            else:
                return None

        def sections(self):
            return [repo822["id"] for repo822 in self.repos822]

        def fix_content(self, content):
            # Luckily apt ignores all Fields it does not recognize
            baseurl = content["baseurl"]
            url_res = re.match(r"^https?://(?P<location>.*)$", baseurl)
            ent_res = re.match(r"^/etc/pki/entitlement/(?P<entitlement>.*).pem$", content["sslclientcert"])
            if url_res and ent_res:
                location = url_res.group("location")
                entitlement = ent_res.group("entitlement")
                baseurl = "katello://{}@{}".format(entitlement, location)

            apt_cont = content.copy()
            apt_cont["Types"] = "deb"
            apt_cont["URIs"] = baseurl
            apt_cont["Suites"] = "default"
            apt_cont["Components"] = "all"
            apt_cont["Trusted"] = "yes"

            if apt_cont["arches"] is None or apt_cont["arches"] == ["ALL"]:
                apt_cont["arches"] = "none"
            else:
                arches_str = " ".join(apt_cont["arches"])
                apt_cont["arches"] = arches_str
                apt_cont["Architectures"] = arches_str

            return apt_cont


class YumRepoFile(RepoFileBase, ConfigParser):
    PATH = "etc/yum.repos.d/"
    NAME = "redhat.repo"
    CONTENT_TYPES = ["yum"]
    REPOFILE_HEADER = """#
# Certificate-Based Repositories
# Managed by (rhsm) subscription-manager
#
# *** This file is auto-generated.  Changes made here will be over-written. ***
# *** Use "subscription-manager repo-override --help" if you wish to make changes. ***
#
# If this file is empty and this system is subscribed consider
# a "yum repolist" to refresh available repos
#
"""

    def __init__(self, path: Optional[str] = None, name: Optional[str] = None):
        ConfigParser.__init__(self)
        RepoFileBase.__init__(self, path, name)

    def read(self) -> None:
        ConfigParser.read(self, self.path)

    def _configparsers_equal(self, otherparser) -> bool:
        if set(otherparser.sections()) != set(self.sections()):
            return False

        for section in self.sections():
            # Sometimes we end up with ints, but values must be strings to compare
            current_items = dict([(str(k), str(v)) for (k, v) in self.items(section)])
            if current_items != dict(otherparser.items(section)):
                return False
        return True

    def _has_changed(self) -> bool:
        """
        Check if the version on disk is different from what we have loaded
        """
        on_disk = ConfigParser()
        on_disk.read(self.path)
        return not self._configparsers_equal(on_disk)

    def write(self) -> None:
        if not self.manage_repos:
            log.debug("Skipping write due to manage_repos setting: %s" % self.path)
            return
        if self._has_changed():
            with open(self.path, "w") as f:
                tidy_writer = TidyWriter(f)
                ConfigParser.write(self, tidy_writer)
                tidy_writer.close()

    def add(self, repo: "Repo") -> None:
        self.add_section(repo.id)
        self.update(repo)

    def delete(self, section) -> bool:
        return self.remove_section(section)

    def update(self, repo: "Repo") -> None:
        # Need to clear out the old section to allow unsetting options:
        # don't use remove section though, as that will reorder sections,
        # and move whitespace around (resulting in more and more whitespace
        # as time progresses).
        for k, v in self.items(repo.id):
            self.remove_option(repo.id, k)

        for k, v in list(repo.items()):
            ConfigParser.set(self, repo.id, k, v)

    def section(self, section: str) -> "Repo":
        if self.has_section(section):
            return Repo(section, self.items(section))


class ZypperRepoFile(YumRepoFile):
    """
    Class for manipulation of repo file on systems using Zypper (SuSE, OpenSuse).
    """

    ZYPP_RHSM_PLUGIN_CONFIG_FILE = "/etc/rhsm/zypper.conf"
    PATH = "etc/rhsm/zypper.repos.d"
    NAME = "redhat.repo"
    REPOFILE_HEADER = """#
# Certificate-Based Repositories
# Managed by (rhsm) subscription-manager
#
# *** This file is auto-generated.  Changes made here will be over-written. ***
# *** Use "subscription-manager repo-override --help" if you wish to make changes. ***
#
# If this file is empty and this system is subscribed consider
# a "zypper lr" to refresh available repos
#
"""

    def __init__(self, path: Optional[str] = None, name: Optional[str] = None):
        super(ZypperRepoFile, self).__init__(path, name)
        self.gpgcheck: bool = False
        self.repo_gpgcheck: bool = False
        self.autorefresh: bool = False
        # According to
        # https://github.com/openSUSE/libzypp/blob/67f55b474d67f77c1868955da8542a7acfa70a9f/zypp/media/MediaManager.h#L394
        #   the following values are valid: "yes", "no", "host", "peer"
        self.gpgkey_ssl_verify: Optional[str] = None
        self.repo_ssl_verify: Optional[str] = None

    def read_zypp_conf(self):
        """
        Read configuration file for zypper plugin
        :return: None
        """
        zypp_cfg = configparser.ConfigParser()
        zypp_cfg.read(self.ZYPP_RHSM_PLUGIN_CONFIG_FILE)
        if zypp_cfg.has_option("rhsm-plugin", "gpgcheck"):
            self.gpgcheck = zypp_cfg.getboolean("rhsm-plugin", "gpgcheck")
        if zypp_cfg.has_option("rhsm-plugin", "repo_gpgcheck"):
            self.repo_gpgcheck = zypp_cfg.getboolean("rhsm-plugin", "repo_gpgcheck")
        if zypp_cfg.has_option("rhsm-plugin", "autorefresh"):
            self.autorefresh = zypp_cfg.getboolean("rhsm-plugin", "autorefresh")
        if zypp_cfg.has_option("rhsm-plugin", "gpgkey-ssl-verify"):
            self.gpgkey_ssl_verify = zypp_cfg.get("rhsm-plugin", "gpgkey-ssl-verify")
        if zypp_cfg.has_option("rhsm-plugin", "repo-ssl-verify"):
            self.repo_ssl_verify = zypp_cfg.get("rhsm-plugin", "repo-ssl-verify")

    def fix_content(self, content: "Content") -> str:
        self.read_zypp_conf()
        zypper_cont = content.copy()
        sslverify = zypper_cont["sslverify"]
        sslcacert = zypper_cont["sslcacert"]
        sslclientkey = zypper_cont["sslclientkey"]
        sslclientcert = zypper_cont["sslclientcert"]
        proxy = zypper_cont["proxy"]
        proxy_username = zypper_cont["proxy_username"]
        proxy_password = zypper_cont["proxy_password"]

        del zypper_cont["sslverify"]
        del zypper_cont["sslcacert"]
        del zypper_cont["sslclientkey"]
        del zypper_cont["sslclientcert"]
        del zypper_cont["proxy"]
        del zypper_cont["proxy_username"]
        del zypper_cont["proxy_password"]
        # NOTE looks like metadata_expire and ui_repoid_vars are ignored by zypper

        # clean up data for zypper
        if zypper_cont["gpgkey"] in ["https://", "http://"]:
            del zypper_cont["gpgkey"]

        # make sure gpg key download doesn't fail because of private certs
        if zypper_cont["gpgkey"] and self.gpgkey_ssl_verify:
            zypper_cont["gpgkey"] += "?ssl_verify=%s" % self.gpgkey_ssl_verify

        # See BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1764265
        if self.gpgcheck is False:
            zypper_cont["gpgcheck"] = "0"

        # See BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1858231
        if self.repo_gpgcheck is True:
            zypper_cont["repo_gpgcheck"] = "1"
        else:
            zypper_cont["repo_gpgcheck"] = "0"

        # See BZ: https://bugzilla.redhat.com/show_bug.cgi?id=1797386
        if self.autorefresh is True:
            zypper_cont["autorefresh"] = "1"
        else:
            zypper_cont["autorefresh"] = "0"

        baseurl = zypper_cont["baseurl"]
        parsed = urlparse(baseurl)
        zypper_query_args: Dict[str, str] = parse_qs(parsed.query)

        if sslverify and sslverify in ["1"]:
            if self.repo_ssl_verify:
                zypper_query_args["ssl_verify"] = self.repo_ssl_verify
            else:
                zypper_query_args["ssl_verify"] = "host"

        if sslcacert:
            zypper_query_args["ssl_capath"] = os.path.dirname(sslcacert)
        if sslclientkey:
            zypper_query_args["ssl_clientkey"] = sslclientkey
        if sslclientcert:
            zypper_query_args["ssl_clientcert"] = sslclientcert
        if proxy:
            zypper_query_args["proxy"] = proxy
        if proxy_username:
            zypper_query_args["proxyuser"] = proxy_username
        if proxy_password:
            zypper_query_args["proxypass"] = proxy_password
        zypper_query = urlencode(zypper_query_args)

        new_url = urlunparse(
            (parsed.scheme, parsed.netloc, parsed.path, parsed.params, zypper_query, parsed.fragment)
        )
        zypper_cont["baseurl"] = new_url

        return zypper_cont

    # We need to overwrite this, to avoid name clashes with yum's server_val_repo_file
    @classmethod
    def server_value_repo_file(cls) -> "ZypperRepoFile":
        return cls("var/lib/rhsm/repo_server_val/", "zypper_{}".format(cls.NAME))


def init_repo_file_classes() -> List[Tuple[type(RepoFileBase), str]]:
    repo_file_classes: List[type(RepoFileBase)] = [YumRepoFile, ZypperRepoFile]
    if HAS_DEB822:
        repo_file_classes.append(AptRepoFile)
    _repo_files: List[Tuple[type(RepoFileBase), type(RepoFileBase)]] = [
        (RepoFile, RepoFile.server_value_repo_file) for RepoFile in repo_file_classes if RepoFile.installed()
    ]
    return _repo_files


def get_repo_file_classes() -> List[Tuple[type(RepoFileBase), type(RepoFileBase)]]:
    global repo_files
    if not repo_files:
        repo_files = init_repo_file_classes()
    return repo_files