HEX
Server: Apache
System: Linux a16-asgard6.hospedagemuolhost.com.br 5.14.0-570.52.1.el9_6.x86_64 #1 SMP PREEMPT_DYNAMIC Wed Oct 15 06:39:08 EDT 2025 x86_64
User: maoristu4c3dbd03 (1436)
PHP: 8.1.33
Disabled: NONE
Upload Files
File: //proc/self/root/proc/thread-self/root/usr/lib64/python3.9/site-packages/rhsmlib/facts/hwprobe.py
# Module to probe Hardware info from the system
#
# Copyright (c) 2010 Red Hat, Inc.
#
# Authors: Pradeep Kilambi <pkilambi@redhat.com>
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#
import json
import logging
import os
import platform
import re
import subprocess
import sys

import datetime
from rhsmlib.facts import cpuinfo
from rhsmlib.facts import collector

from typing import Callable, Dict, Optional, List, TextIO, Tuple, Union

log = logging.getLogger(__name__)


class ClassicCheck:
    def is_registered_with_classic(self) -> bool:
        try:
            sys.path.append("/usr/share/rhn")
            from up2date_client import up2dateAuth
        except ImportError:
            return False

        return up2dateAuth.getSystemId() is not None


# take a string like '1-4' and returns a list of
# ints like [1,2,3,4]
# 31-37 return [31,32,33,34,35,36,37]
def parse_range(range_str: str) -> List[int]:
    range_list: List[str] = range_str.split("-")
    start = int(range_list[0])
    end = int(range_list[-1])

    return list(range(start, end + 1))


# util to total up the values represented by a cpu siblings list
# ala /sys/devices/cpu/cpu0/topology/core_siblings_list
#
# which can be a comma separated list of ranges
#  1,2,3,4
#  1-2, 4-6, 8-10, 12-14
#
def gather_entries(entries_string: str) -> List[int]:
    entries: List[int] = []
    entry_parts: List[str] = entries_string.split(",")
    for entry_part in entry_parts:
        # return a list of enumerated items
        entry_range: List[int] = parse_range(entry_part)
        for entry in entry_range:
            entries.append(entry)
    return entries


# FIXME This class does not seem to be used anywhere
class GenericPlatformSpecificInfoProvider:
    """Default provider for platform without a specific platform info provider.
    ie, all platforms except those with DMI (ie, intel platforms)"""

    def __init__(self, hardware_info, dump_file=None):
        self.info = {}

    @staticmethod
    def log_warnings():
        pass


class HardwareCollector(collector.FactsCollector):
    LSCPU_CMD: str = "/usr/bin/lscpu"

    def __init__(
        self,
        arch: str = None,
        prefix: str = None,
        testing: bool = None,
        collected_hw_info: Dict[str, Union[str, int, bool, None]] = None,
    ):
        super(HardwareCollector, self).__init__(
            arch=arch, prefix=prefix, testing=testing, collected_hw_info=None
        )

        self.hardware_methods: List[Callable] = [
            self.get_uname_info,
            self.get_release_info,
            self.get_mem_info,
            self.get_last_boot,
            self.get_proc_cpuinfo,
            self.get_proc_stat,
            self.get_cpu_info,
            self.get_ls_cpu_info,
        ]

    def get_uname_info(self) -> Dict[str, str]:
        uname_data: os.uname_result = os.uname()
        uname_keys: Tuple[str, ...] = (
            "uname.sysname",
            "uname.nodename",
            "uname.release",
            "uname.version",
            "uname.machine",
        )
        uname_info: Dict[str, str] = dict(list(zip(uname_keys, uname_data)))
        return uname_info

    def get_release_info(self) -> Dict[str, str]:
        distro_info: tuple = self.get_distribution()
        release_info: Dict[str, str] = {
            "distribution.name": distro_info[0],
            "distribution.version": distro_info[1],
            "distribution.id": distro_info[2],
            "distribution.version.modifier": distro_info[3],
        }
        return release_info

    def _open_release(self, filename: str) -> TextIO:
        return open(filename, "r")

    # this version os very RHEL/Fedora specific...
    def get_distribution(self) -> Tuple[str, str, str, str, str, List[str]]:
        version: str = "Unknown"
        distname: str = "Unknown"
        dist_id: str = "Unknown"
        id: str = "Unknown"
        id_like: List[str] = [""]
        version_modifier: str = ""

        if os.path.exists("/etc/os-release"):
            f: TextIO = open("/etc/os-release", "r")
            os_release: List[str] = f.readlines()
            f.close()
            data: Dict[str, str] = {
                "PRETTY_NAME": "Unknown",
                "NAME": distname,
                "ID": id,
                "ID_LIKE": "",
                "VERSION": dist_id,
                "VERSION_ID": version,
                "CPE_NAME": "Unknown",
            }
            for line in os_release:
                split: List[str] = [piece.strip('"\n ') for piece in line.split("=")]
                if len(split) != 2:
                    continue
                data[split[0]] = split[1]

            version = data["VERSION_ID"]
            distname = data["NAME"]
            dist_id = data["VERSION"]
            dist_id_search: Optional[re.Match] = re.search(r"\((.*?)\)", dist_id)
            if dist_id_search:
                dist_id: str = dist_id_search.group(1)
            id_like = data["ID_LIKE"].split()
            id = data["ID"]
            # Split on ':' that is not preceded by '\'
            vers_mod_data: List[str] = re.split(r"(?<!\\):", data["CPE_NAME"])
            if len(vers_mod_data) >= 6:
                version_modifier = vers_mod_data[5].lower().replace("\\:", ":")
        elif os.path.exists("/etc/redhat-release"):
            # from platform.py from python2.
            _lsb_release_version: re.Pattern = re.compile(
                r"(.+) release ([\d.]+)\s*(?!\()(\S*)\s*[^(]*(?:\((.+)\))?"
            )
            f: TextIO = self._open_release("/etc/redhat-release")
            firstline: str = f.readline()
            f.close()

            m: re.Match = _lsb_release_version.match(firstline)

            if m is not None:
                (distname, version, tmp_modifier, dist_id) = tuple(m.groups())
                if tmp_modifier:
                    version_modifier = tmp_modifier.lower()

        elif hasattr(platform, "linux_distribution"):
            (distname, version, dist_id) = platform.linux_distribution()
            version_modifier = "Unknown"

        # FIXME Return collections.namedtuple instead?
        return distname, version, dist_id, version_modifier, id, id_like

    def get_mem_info(self) -> Dict[str, str]:
        meminfo: Dict[str, str] = {}

        # most of this mem info changes constantly, which makes deciding
        # when to update facts painful, so lets try to just collect the
        # useful bits

        useful: List[str] = ["MemTotal", "SwapTotal"]
        try:
            parser: re.Pattern = re.compile(r"^(?P<key>\S*):\s*(?P<value>\d*)\s*kB")
            memdata: TextIO = open("/proc/meminfo")
            for info in memdata:
                match: Optional[re.Match] = parser.match(info)
                if not match:
                    continue
                key, value = match.groups(["key", "value"])
                if key in useful:
                    nkey: str = ".".join(["memory", key.lower()])
                    meminfo[nkey] = "%s" % int(value)
        except Exception as e:
            log.warning("Error reading system memory information: %s", e)
        return meminfo

    def get_last_boot(self) -> Dict[str, str]:
        last_boot: str = "unknown"

        # Use a date for uptime instead of the actual uptime since that
        # would force a refresh for every run. This was inspired by the
        # spacewalk client at https://github.com/spacewalkproject/
        # spacewalk/blob/master/client/rhel/rhn-client-tools/src/bin/rhn_check.py
        try:
            uptime = float(open("/proc/uptime", "r").read().split()[0])
            uptime_delta = datetime.timedelta(seconds=uptime)
            now = datetime.datetime.now(datetime.timezone.utc)
            last_boot_date: datetime.datetime = now - uptime_delta
            last_boot: str = last_boot_date.strftime("%Y-%m-%d %H:%M:%S") + " UTC"
        except Exception as e:
            log.warning("Error reading uptime information %s", e)
        return {"last_boot": last_boot}

    def count_cpumask_entries(self, cpu: str, field: str) -> Optional[int]:
        try:
            f: TextIO = open("%s/topology/%s" % (cpu, field), "r")
        except IOError:
            return None

        # ia64 entries seem to be null padded, or perhaps
        # that's a collection error
        # FIXME
        entries: str = f.read().rstrip("\n\x00")
        f.close()
        # these fields can exist, but be empty. For example,
        # thread_siblings_list from s390x-rhel64-zvm-2cpu-has-topo
        # test data

        if len(entries):
            cpumask_entries: List[int] = gather_entries(entries)
            return len(cpumask_entries)
        # that field was empty
        return None

    def is_a64fx(self) -> bool:
        if self.arch != "aarch64":
            return False
        try:
            # MIDR_EL1 register shows the identifer of ARM64 CPU.
            # MIDR_EL1 of A64FX is as follows:
            # FX700:  0x00000000461f0010
            # FX1000: 0x00000000460f0010
            f: TextIO = open("/sys/devices/system/cpu/cpu0/regs/identification/midr_el1", "r")
        except IOError:
            return False

        midr: str = f.read().rstrip("\n")
        f.close()
        if midr == "0x00000000461f0010" or midr == "0x00000000460f0010":
            return True

        return False

    # replace/add with getting CPU Totals for s390x
    # FIXME cpu_count is not used
    def _parse_s390x_sysinfo_topology(self, cpu_count: int, sysinfo: List[str]) -> Optional[Dict[str, int]]:
        # to quote lscpu.c:
        # CPU Topology SW:      0 0 0 4 6 4
        # /* s390 detects its cpu topology via /proc/sysinfo, if present.
        # * Using simply the cpu topology masks in sysfs will not give
        # * usable results since everything is virtualized. E.g.
        # * virtual core 0 may have only 1 cpu, but virtual core 2 may
        # * five cpus.
        # * If the cpu topology is not exported (e.g. 2nd level guest)
        # * fall back to old calculation scheme.
        # */
        for line in sysinfo:
            if line.startswith("CPU Topology SW:"):
                parts: List[str] = line.split(":", 1)
                s390_topo_str: str = parts[1]
                topo_parts: List[str] = s390_topo_str.split()

                # indexes 3/4/5 being books/sockets_per_book,
                # and cores_per_socket based on lscpu.c
                book_count = int(topo_parts[3])
                sockets_per_book = int(topo_parts[4])
                cores_per_socket = int(topo_parts[5])

                socket_count: int = book_count * sockets_per_book
                cores_count: int = socket_count * cores_per_socket

                return {
                    "socket_count": socket_count,
                    "cores_count": cores_count,
                    "book_count": book_count,
                    "sockets_per_book": sockets_per_book,
                    "cores_per_socket": cores_per_socket,
                }
        log.debug("Looking for 'CPU Topology SW' in sysinfo, but it was not found")
        return None

    def has_s390x_sysinfo(self, proc_sysinfo: str) -> bool:
        return os.access(proc_sysinfo, os.R_OK)

    # FIXME cpu_count is not used
    def read_s390x_sysinfo(self, cpu_count, proc_sysinfo: str) -> List[str]:
        lines: List[str] = []
        try:
            f: TextIO = open(proc_sysinfo, "r")
        except IOError:
            return lines

        lines = f.readlines()
        f.close()
        return lines

    def read_physical_id(self, cpu_file: str) -> Optional[str]:
        try:
            f: TextIO = open("%s/physical_id" % cpu_file, "r")
        except IOError:
            return None

        buf: str = f.read().strip()
        f.close()
        return buf

    def _ppc64_fallback(self, cpu_files: List[str]) -> Optional[int]:
        # ppc64, particular POWER5/POWER6 machines, show almost
        # no cpu information on rhel5. There is a "physical_id"
        # associated with each cpu that seems to map to a
        # cpu, in a socket
        log.debug("trying ppc64 specific cpu topology detection")
        # try to find cpuN/physical_id
        physical_ids = set()
        for cpu_file in cpu_files:
            physical_id: Optional[str] = self.read_physical_id(cpu_file)
            # offline cpu's show physical id of -1. Normally
            # we count all present cpu's even if offline, but
            # in this case, we can't get any cpu info from the
            # cpu since it is offline, so don't count it
            if physical_id != "-1":
                physical_ids.add(physical_id)

        if physical_ids:
            # For rhel6 or newer, we have more cpu topology info
            # exposed by the kernel which will override this
            socket_count: int = len(physical_ids)
            # add marker here so we know we fail back to this
            log.debug("Using /sys/devices/system/cpu/cpu*/physical_id for cpu info on ppc64")
            return socket_count

        return None

    def check_for_cpu_topo(self, cpu_topo_dir: str) -> bool:
        return os.access(cpu_topo_dir, os.R_OK)

    def get_proc_cpuinfo(self) -> Dict[str, str]:
        proc_cpuinfo: Dict[str, str] = {}
        fact_namespace: str = "proc_cpuinfo"

        proc_cpuinfo_source = cpuinfo.SystemCpuInfoFactory.from_uname_machine(self.arch, prefix=self.prefix)

        for key, value in list(proc_cpuinfo_source.cpu_info.common.items()):
            proc_cpuinfo["%s.common.%s" % (fact_namespace, key)] = value

        # NOTE: cpu_info.other is a potentially ordered non-uniq list, so may
        # not make sense for shoving into a list.
        for key, value in proc_cpuinfo_source.cpu_info.other:
            proc_cpuinfo["%s.system.%s" % (fact_namespace, key)] = value

        # we could enumerate each processor here as proc_cpuinfo.cpu.3.key =
        # value, but that is a lot of fact table entries
        return proc_cpuinfo

    def get_proc_stat(self) -> Dict[str, str]:
        proc_stat: Dict[str, str] = {}
        fact_namespace: str = "proc_stat"
        proc_stat_path: str = "/proc/stat"

        btime_re: str = r"btime\W*([0-9]+)\W*$"
        try:
            with open(proc_stat_path, "r") as proc_stat_file:
                for line in proc_stat_file.readlines():
                    match: re.Match = re.match(btime_re, line.strip())
                    if match:
                        proc_stat["%s.btime" % (fact_namespace)] = match.group(1)
                        break  # We presently only care about the btime fact
        except Exception as e:
            # This fact is not required, only log failure to gather it at the debug level
            log.debug("Could not gather proc_stat facts: %s", e)
        return proc_stat

    def get_cpu_info(self) -> Dict[str, Union[str, int]]:
        cpu_info: Dict[str, Union[str, int]] = {}
        # we also have cpufreq, etc. in this dir, so match just the numbers
        cpu_re: str = r"cpu([0-9]+$)"

        cpu_files: List[str] = []
        sys_cpu_path: str = self.prefix + "/sys/devices/system/cpu/"
        for cpu in os.listdir(sys_cpu_path):
            if re.match(cpu_re, cpu):
                cpu_topo_dir: str = os.path.join(sys_cpu_path, cpu, "topology")

                # see rhbz#1070908
                # ppc64 machines running on LPARs will add
                # a sys cpu entry for every cpu thread on the
                # physical machine, regardless of how many are
                # allocated to the LPAR. This throws off the cpu
                # thread count, which throws off the cpu socket count.
                # The entries for the unallocated or offline cpus
                # do not have topology info however.
                # So, skip sys cpu entries without topology info.
                #
                # NOTE: this assumes RHEL6+, prior to rhel5, on
                # some arches like ppc and s390, there is no topology
                # info ever, so this will break.
                if self.check_for_cpu_topo(cpu_topo_dir):
                    cpu_files.append("%s/%s" % (sys_cpu_path, cpu))

        # for systems with no cpus
        if not cpu_files:
            return cpu_info

        cpu_count: int = len(cpu_files)

        # see if we have a /proc/sysinfo ala s390, if so
        # prefer that info
        proc_sysinfo: str = self.prefix + "/proc/sysinfo"
        has_sysinfo: bool = self.has_s390x_sysinfo(proc_sysinfo)

        # s390x can have cpu 'books'
        books: bool = False

        socket_count: int
        cores_per_socket: int
        cores_per_cpu: int

        # assume each socket has the same number of cores, and
        # each core has the same number of threads.
        #
        # This is not actually true sometimes... *cough*s390x*cough*
        # but lscpu makes the same assumption

        threads_per_core: int = self.count_cpumask_entries(cpu_files[0], "thread_siblings_list")
        if self.is_a64fx():
            # core_siblings_list of A64FX shows the cpu mask which is in a same CMG,
            # not the socket. A64FX has always 4 CMGs on the socket.
            cores_per_cpu = self.count_cpumask_entries(cpu_files[0], "core_siblings_list") * 4
        else:
            cores_per_cpu = self.count_cpumask_entries(cpu_files[0], "core_siblings_list")

        # if we find valid values in cpu/cpuN/topology/*siblings_list
        # sometimes it's not there, particularly on rhel5
        if threads_per_core and cores_per_cpu:
            cores_per_socket = cores_per_cpu // threads_per_core
            cpu_info["cpu.topology_source"] = "kernel /sys cpu sibling lists"

            # rhel6 s390x can have /sys cpu topo, but we can't make assumption
            # about it being evenly distributed, so if we also have topo info
            # in sysinfo, prefer that
            if self.arch == "s390x" and has_sysinfo:
                # for s390x on lpar, try to see if /proc/sysinfo has any
                # topo info
                log.debug("/proc/sysinfo found, attempting to gather cpu topology info")
                sysinfo_lines: List[str] = self.read_s390x_sysinfo(cpu_count, proc_sysinfo)
                if sysinfo_lines:
                    sysinfo = self._parse_s390x_sysinfo_topology(cpu_count, sysinfo_lines)

                    # verify the sysinfo has system level virt info
                    if sysinfo:
                        cpu_info["cpu.topology_source"] = "s390x sysinfo"
                        socket_count = sysinfo["socket_count"]
                        book_count = sysinfo["book_count"]
                        sockets_per_book = sysinfo["sockets_per_book"]
                        cores_per_socket = sysinfo["cores_per_socket"]
                        threads_per_core = 1

                        # we can have a mismatch between /sys and /sysinfo. We
                        # defer to sysinfo in this case even for cpu_count
                        # cpu_count = sysinfo['cores_count'] * threads_per_core
                        books = True

        else:
            # we have found no valid socket information, I only know
            # the number of cpu's, but no threads, no cores, no sockets
            log.debug("No cpu socket information found")

            # how do we get here?
            #   no cpu topology info, ala s390x on rhel5,
            #   no sysinfo topology info, ala s390x with zvm on rhel5
            # we have no great topo info here,
            # assume each cpu thread = 1 core = 1 socket
            threads_per_core = 1
            cores_per_cpu = 1
            cores_per_socket = 1
            socket_count = None

            # lets try some arch/platform specific approaches
            if self.arch == "ppc64":
                socket_count = self._ppc64_fallback(cpu_files)

                if socket_count:
                    log.debug("Using ppc64 cpu physical id for cpu topology info")
                    cpu_info["cpu.topology_source"] = "ppc64 physical_package_id"

            else:
                # all of our usual methods failed us...
                log.debug("No cpu socket info found for real or virtual hardware")
                # so we can track if we get this far
                cpu_info["cpu.topology_source"] = "fallback one socket"
                socket_count = cpu_count

            # for some odd cases where there are offline ppc64 cpu's,
            # this can end up not being a whole number...
            cores_per_socket = cpu_count // socket_count

        if cores_per_socket and threads_per_core:
            # for s390x with sysinfo topo, we use the sysinfo numbers except
            # for cpu_count, which takes offline cpus into account. This is
            # mostly just to match lscpu behaviour here
            if cpu_info["cpu.topology_source"] != "s390x sysinfo":
                socket_count = cpu_count // cores_per_socket // threads_per_core

        # s390 etc
        # for s390, socket calculations are per book, and we can have multiple
        # books, so multiply socket count by book count
        # see if we are on a s390 with book info
        # all s390 platforms show book siblings, even the ones that also
        # show sysinfo (lpar)... Except on rhel5, where there is no
        # cpu topology info with lpar
        #
        # if we got book info from sysinfo, prefer it
        book_siblings_per_cpu: int = None
        if not books:
            book_siblings_per_cpu = self.count_cpumask_entries(cpu_files[0], "book_siblings_list")
            if book_siblings_per_cpu:
                book_count = cpu_count // book_siblings_per_cpu
                sockets_per_book = book_count // socket_count
                cpu_info["cpu.topology_source"] = "s390 book_siblings_list"
                books = True

        # we should always know this...
        cpu_info["cpu.cpu(s)"] = cpu_count

        # these may be unknown...
        if socket_count:
            cpu_info["cpu.cpu_socket(s)"] = socket_count
        if cores_per_socket:
            cpu_info["cpu.core(s)_per_socket"] = cores_per_socket
        if threads_per_core:
            cpu_info["cpu.thread(s)_per_core"] = threads_per_core

        if book_siblings_per_cpu:
            cpu_info["cpu.book(s)_per_cpu"] = book_siblings_per_cpu

        if books:
            cpu_info["cpu.socket(s)_per_book"] = sockets_per_book
            cpu_info["cpu.book(s)"] = book_count

        return cpu_info

    def get_ls_cpu_info(self) -> Dict:
        # if we have `lscpu`, let's use it for facts as well, under
        # the `lscpu` name space
        if not os.access(self.LSCPU_CMD, os.R_OK):
            return {}

        # copy of parent process environment
        lscpu_env: Dict[str, str] = dict(os.environ)

        # # LANGUAGE trumps LC_ALL, LC_CTYPE, LANG. See rhbz#1225435, rhbz#1450210
        lscpu_env.update({"LANGUAGE": "en_US.UTF-8"})

        if self._check_lscpu_json(lscpu_env):
            return self._parse_lscpu_json_output(lscpu_env)

        return self._parse_lscpu_human_readable_output(lscpu_env)

    def _check_lscpu_json(self, lscpu_env: Dict[str, str]) -> bool:
        lscpu_cmd: List[str] = [self.LSCPU_CMD, "--help"]

        try:
            output: bytes = subprocess.check_output(lscpu_cmd, env=lscpu_env)
        except subprocess.CalledProcessError as e:
            log.warning("Failed to run 'lscpu --help': %s", e)
            return False

        return b"--json" in output

    def _parse_lscpu_human_readable_output(self, lscpu_env: Dict[str, str]) -> Dict[str, str]:
        lscpu_info: Dict[str, str] = {}
        lscpu_cmd: List[str] = [self.LSCPU_CMD]

        if self.testing:
            lscpu_cmd += ["-s", self.prefix]

        # For display/message only
        lscpu_cmd_string: str = " ".join(lscpu_cmd)

        try:
            lscpu_out_raw: bytes = subprocess.check_output(lscpu_cmd, env=lscpu_env)
            lscpu_out: str = lscpu_out_raw.decode("utf-8")
        except subprocess.CalledProcessError as e:
            log.exception(e)
            log.warning("Error with lscpu (%s) subprocess: %s", lscpu_cmd_string, e)
            return lscpu_info

        errors: List[Exception] = []
        try:
            cpu_data: List[str] = lscpu_out.strip().split("\n")
            for info in cpu_data:
                try:
                    key, value = info.split(":")
                    nkey: str = ".".join(["lscpu", key.lower().strip().replace(" ", "_")])
                    lscpu_info[nkey] = "%s" % value.strip()
                except ValueError as e:
                    # sometimes lscpu outputs weird things. Or fails.
                    # But this is per line, so keep track but let it pass.
                    errors.append(e)

        except Exception as e:
            log.warning("Error reading system CPU information: %s", e)
        if errors:
            log.debug("Errors while parsing lscpu output: %s", errors)

        return lscpu_info

    def _parse_lscpu_json_output(self, lscpu_env: Dict[str, str]) -> Dict[str, str]:
        lscpu_cmd: List[str] = [self.LSCPU_CMD, "--json"]
        if self.testing:
            lscpu_cmd += ["-s", self.prefix]

        try:
            output: bytes = subprocess.check_output(lscpu_cmd, env=lscpu_env)
        except subprocess.CalledProcessError as e:
            log.warning("Failed to run 'lscpu --json': %s", e)
            return {}

        try:
            output_json: dict = json.loads(output)
        except json.JSONDecodeError:
            log.exception("Failed to load the lscpu JSON: %s", output)
            return {}

        try:
            main_object: dict = output_json["lscpu"]
        except KeyError:
            log.warning("Failed to load the lscpu JSON: missing 'lscpu' " "root object")
            return {}

        lscpu_info: Dict[str, str] = {}

        def parse_item(obj: dict) -> None:
            try:
                # get 'field' and 'data', considering them mandatory, and thus
                # ignoring this element and all its children if they are
                # missing; note that 'data' can be null, see later on
                key = obj["field"]
                value = obj["data"]
            except KeyError:
                log.warning(
                    "Failed to load the lscpu JSON: object lacks " "missing 'field' and 'data' fields: %s",
                    obj,
                )
                return
            # 'data' is null, which means the field is an "header"; ignore it
            if value is not None:
                if key.endswith(":"):
                    key = key[:-1]
                nkey: str = ".".join(["lscpu", key.lower().strip().replace(" ", "_")])
                lscpu_info[nkey] = value.strip()
            try:
                children = obj["children"]
                parse_list(children)
            except KeyError:
                # no 'children' field available, which is OK
                pass

        def parse_list(json_list) -> None:
            for list_item in json_list:
                parse_item(list_item)

        parse_list(main_object)

        return lscpu_info


if __name__ == "__main__":
    _LIBPATH = "/usr/share/rhsm"
    # add to the path if need be
    if _LIBPATH not in sys.path:
        sys.path.append(_LIBPATH)

    from rhsm import logutil

    logutil.init_logger()

    hw = HardwareCollector(prefix=sys.argv[1], testing=True)

    if len(sys.argv) > 1:
        hw.prefix = sys.argv[1]
        hw.testing = True
    hw_dict = hw.get_all()

    # just show the facts collected, unless we specify data dir and well,
    # anything else
    if len(sys.argv) > 2:
        for hkey, hvalue in sorted(hw_dict.items()):
            print("'%s' : '%s'" % (hkey, hvalue))

    if not hw.testing:
        sys.exit(0)

    # verify the cpu socket info collection we use for rhel5 matches lscpu
    cpu_items: List[Tuple[str, str]] = [
        ("cpu.core(s)_per_socket", "lscpu.core(s)_per_socket"),
        ("cpu.cpu(s)", "lscpu.cpu(s)"),
        # NOTE: the substring is different for these two folks...
        # FIXME: follow up to see if this has changed
        ("cpu.cpu_socket(s)", "lscpu.socket(s)"),
        ("cpu.book(s)", "lscpu.book(s)"),
        ("cpu.thread(s)_per_core", "lscpu.thread(s)_per_core"),
        ("cpu.socket(s)_per_book", "lscpu.socket(s)_per_book"),
    ]
    failed: bool = False
    failed_list: List[Tuple[str, str, int, int]] = []
    for cpu_item in cpu_items:
        value_0 = int(hw_dict.get(cpu_item[0], -1))
        value_1 = int(hw_dict.get(cpu_item[1], -1))

        if value_0 != value_1 and ((value_0 != -1) and (value_1 != -1)):
            failed_list.append((cpu_item[0], cpu_item[1], value_0, value_1))

    must_haves = ["cpu.cpu_socket(s)", "cpu.cpu(s)", "cpu.core(s)_per_socket", "cpu.thread(s)_per_core"]
    missing_set = set(must_haves).difference(set(hw_dict))

    if failed:
        print("cpu detection error")
    for failed in failed_list:
        print("The values %s %s do not match (|%s| != |%s|)" % (failed[0], failed[1], failed[2], failed[3]))
    if missing_set:
        for missing in missing_set:
            print("cpu info fact: %s was missing" % missing)

    if failed:
        sys.exit(1)