File: //proc/self/root/proc/thread-self/root/usr/lib64/python3.9/site-packages/rhsmlib/dbus/dbus_utils.py
# Copyright (C) 2011,2012 Red Hat, Inc.
#
# Authors:
# Thomas Woerner <twoerner@redhat.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
import logging
import pwd
import xml.etree.ElementTree as Et
import dbus
log = logging.getLogger(__name__)
def command_of_pid(pid):
"""Get command for pid from /proc"""
try:
with open("/proc/%d/cmdline" % pid, "r") as f:
cmd = f.readlines()[0].replace("\0", " ").strip()
except Exception as err:
log.warning("Unable to get cmdline of PID: %s, error: %s" % (pid, err))
return None
return cmd
def pid_of_sender(bus, sender):
"""Get pid from sender string using
org.freedesktop.DBus.GetConnectionUnixProcessID"""
dbus_obj = bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus")
dbus_iface = dbus.Interface(dbus_obj, "org.freedesktop.DBus")
try:
pid = int(dbus_iface.GetConnectionUnixProcessID(sender))
except ValueError:
return None
# It seems that Python D-Bus implementation contains error. This should not happen,
# when we try to call GetConnectionUnixProcessID() with sender that does not exist
# anymore
except dbus.exceptions.DBusException as err:
log.debug(f"D-Bus raised exception: {err}")
return None
return pid
def uid_of_sender(bus, sender):
"""Get user id from sender string using
org.freedesktop.DBus.GetConnectionUnixUser"""
dbus_obj = bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus")
dbus_iface = dbus.Interface(dbus_obj, "org.freedesktop.DBus")
try:
uid = int(dbus_iface.GetConnectionUnixUser(sender))
except ValueError:
return None
return uid
def user_of_uid(uid):
"""Get user for uid from pwd"""
try:
pws = pwd.getpwuid(uid)
except Exception:
return None
return pws[0]
def context_of_sender(bus, sender):
"""Get SELinux context from sender string using
org.freedesktop.DBus.GetConnectionSELinuxSecurityContext"""
dbus_obj = bus.get_object("org.freedesktop.DBus", "/org/freedesktop/DBus")
dbus_iface = dbus.Interface(dbus_obj, "org.freedesktop.DBus")
try:
context = dbus_iface.GetConnectionSELinuxSecurityContext(sender)
except Exception as exc:
log.debug(f"Could not get SELinux context: {exc}")
return None
return "".join(map(chr, dbus_to_python(context)))
def command_of_sender(bus, sender):
"""Return command of D-Bus sender"""
return command_of_pid(pid_of_sender(bus, sender))
def user_of_sender(bus, sender):
return user_of_uid(uid_of_sender(bus, sender))
def dbus_to_python(obj, expected_type=None):
if obj is None:
python_obj = obj
elif isinstance(obj, dbus.Boolean):
python_obj = bool(obj)
elif isinstance(obj, dbus.String):
python_obj = str(obj)
elif isinstance(obj, dbus.ObjectPath):
python_obj = str(obj)
elif (
isinstance(obj, dbus.Byte)
or isinstance(obj, dbus.Int16)
or isinstance(obj, dbus.Int32)
or isinstance(obj, dbus.Int64)
or isinstance(obj, dbus.UInt16)
or isinstance(obj, dbus.UInt32)
or isinstance(obj, dbus.UInt64)
):
python_obj = int(obj)
elif isinstance(obj, dbus.Double):
python_obj = float(obj)
elif isinstance(obj, dbus.Array):
python_obj = [dbus_to_python(x) for x in obj]
elif isinstance(obj, dbus.Struct):
python_obj = tuple([dbus_to_python(x) for x in obj])
elif isinstance(obj, dbus.Dictionary):
python_obj = dict([dbus_to_python(k), dbus_to_python(v)] for k, v in list(obj.items()))
elif (
isinstance(obj, bool)
or isinstance(obj, str)
or isinstance(obj, bytes)
or isinstance(obj, int)
or isinstance(obj, float)
or isinstance(obj, list)
or isinstance(obj, tuple)
or isinstance(obj, dict)
):
python_obj = obj
else:
raise TypeError("Unhandled %s" % obj)
if expected_type is not None:
if (
(expected_type == bool and not isinstance(python_obj, bool))
or (expected_type == str and not isinstance(python_obj, str))
or (expected_type == int and not isinstance(python_obj, int))
or (expected_type == float and not isinstance(python_obj, float))
or (expected_type == list and not isinstance(python_obj, list))
or (expected_type == tuple and not isinstance(python_obj, tuple))
or (expected_type == dict and not isinstance(python_obj, dict))
):
raise TypeError("%s is %s, expected %s" % (python_obj, type(python_obj), expected_type))
return python_obj
# From lvm-dubstep/lvmdbus/utils.py (GPLv2, copyright Red Hat)
# https://github.com/tasleson/lvm-dubstep
_type_map = dict(
s=dbus.String,
o=dbus.ObjectPath,
t=dbus.UInt64,
x=dbus.Int64,
u=dbus.UInt32,
i=dbus.Int32,
n=dbus.Int16,
q=dbus.UInt16,
d=dbus.Double,
y=dbus.Byte,
b=dbus.Boolean,
)
def _pass_through(v):
"""
If we have something which is not a simple type we return the original
value un-wrapped.
:param v:
:return:"""
return v
def _dbus_type(t, value):
return _type_map.get(t, _pass_through)(value)
def add_properties(xml, interface, props):
"""
Given xml that describes the interface, add property values to the XML
for the specified interface.
:param xml: XML to edit
:param interface: Interface to add the properties too
:param props: Output from get_properties
:return: updated XML string
"""
root = Et.fromstring(xml)
if props:
for c in root:
# print c.attrib['name']
if c.attrib["name"] == interface:
for p in props:
temp = '<property type="%s" name="%s" access="%s"/>\n' % (
p["p_t"],
p["p_name"],
p["p_access"],
)
log.debug("intro xml temp buf=%s", temp)
c.append(Et.fromstring(temp))
return Et.tostring(root, encoding="utf8")
return xml
def dict_to_variant_dict(in_dict):
# Handle creating dbus.Dictionaries with signatures of 'sv'
for key, value in in_dict.items():
if isinstance(value, dict):
in_dict[key] = dict_to_variant_dict(value)
return dbus.Dictionary(in_dict, signature="sv")