ACIL FM
Dark
Refresh
Current DIR:
/usr/lib/python3.9/site-packages/cockpit/channels
/
usr
lib
python3.9
site-packages
cockpit
channels
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
dbus.py
21.69 MB
chmod
View
DL
Edit
Rename
Delete
filesystem.py
24.97 MB
chmod
View
DL
Edit
Rename
Delete
http_channel.py
6.08 MB
chmod
View
DL
Edit
Rename
Delete
info.py
1001 B
chmod
View
DL
Edit
Rename
Delete
metrics.py
6.67 MB
chmod
View
DL
Edit
Rename
Delete
packages.py
3.96 MB
chmod
View
DL
Edit
Rename
Delete
pcp.py
27.01 MB
chmod
View
DL
Edit
Rename
Delete
stream.py
4.91 MB
chmod
View
DL
Edit
Rename
Delete
trivial.py
1.25 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
1.43 MB
chmod
View
DL
Edit
Rename
Delete
Edit file: /usr/lib/python3.9/site-packages/cockpit/channels/dbus.py
# This file is part of Cockpit. # # Copyright (C) 2022 Red Hat, Inc. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. # Missing stuff compared to the C bridge that we should probably add: # # - removing matches # - removing watches # - emitting of signals # - publishing of objects # - failing more gracefully in some cases (during open, etc) # # Stuff we might or might not do: # # - using non-default service names # # Stuff we should probably not do: # # - emulation of ObjectManager via recursive introspection # - automatic detection of ObjectManager below the given path_namespace # - recursive scraping of properties for new object paths # (for path_namespace watches that don't hit an ObjectManager) import asyncio import errno import json import logging import traceback import xml.etree.ElementTree as ET from cockpit._vendor import systemd_ctypes from cockpit._vendor.systemd_ctypes import Bus, BusError, introspection from ..channel import Channel, ChannelError logger = logging.getLogger(__name__) # The dbusjson3 payload # # This channel payload type translates JSON encoded messages on a # Cockpit channel to D-Bus messages, in a mostly straightforward way. # See doc/protocol.md for a description of the basics. # # However, dbusjson3 offers some advanced features as well that are # meant to support the "magic" DBusProxy objects implemented by # cockpit.js. Those proxy objects "magically" expose all the methods # and properties of a D-Bus interface without requiring any explicit # binding code to be generated for a JavaScript client. A dbusjson3 # channel does this by doing automatic introspection and property # retrieval without much direction from the JavaScript client. # # The details of what exactly is done is not specified very strictly, # and the Python bridge will likely differ from the C bridge # significantly. This will be informed by what existing code actually # needs, and we might end up with a more concrete description of what # a client can actually expect. # # Here is an example of a more complex scenario: # # - The client adds a "watch" for a path namespace. There is a # ObjectManager at the given path and the bridge emits "meta" and # "notify" messages to describe all interfaces and objects reported # by that ObjectManager. # # - The client makes a method call that causes a new object with a new # interface to appear at the ObjectManager. The bridge will send a # "meta" and "notify" message to describe this new object. # # - Since the InterfacesAdded signal was emitted before the method # reply, the bridge must send the "meta" and "notify" messages # before the method reply message. # # - However, in order to construct the "meta" message, the bridge must # perform a Introspect call, and consequently must delay sending the # method reply until that call has finished. # # The Python bridge implements this delaying of messages with # coroutines and a fair mutex. Every message coming from D-Bus will # wait on the mutex for its turn to send its message on the Cockpit # channel, and will keep that mutex locked until it is done with # sending. Since the mutex is fair, everyone will nicely wait in line # without messages getting re-ordered. # # The scenario above will play out like this: # # - While adding the initial "watch", the lock is held until the # "meta" and "notify" messages have been sent. # # - Later, when the InterfacesAdded signal comes in that has been # triggered by the method call, the mutex will be locked while the # necessary introspection is going on. # # - The method reply will likely come while the mutex is locked, and # the task for sending that reply on the Cockpit channel will enter # the wait queue of the mutex. # # - Once the introspection is done and the new "meta" and "notify" # messages have been sent, the mutex is unlocked, the method reply # task acquires it, and sends its message. class InterfaceCache: def __init__(self): self.cache = {} self.old = set() # Interfaces already returned by get_interface_if_new def inject(self, interfaces) -> None: self.cache.update(interfaces) async def introspect_path(self, bus, destination, object_path): xml, = await bus.call_method_async(destination, object_path, 'org.freedesktop.DBus.Introspectable', 'Introspect') et = ET.fromstring(xml) interfaces = {tag.attrib['name']: introspection.parse_interface(tag) for tag in et.findall('interface')} # Add all interfaces we found: we might use them later self.inject(interfaces) return interfaces async def get_interface(self, interface_name, bus=None, destination=None, object_path=None): try: return self.cache[interface_name] except KeyError: pass if bus and object_path: try: await self.introspect_path(bus, destination, object_path) except BusError: pass return self.cache.get(interface_name) async def get_interface_if_new(self, interface_name, bus, destination, object_path): if interface_name in self.old: return None self.old.add(interface_name) return await self.get_interface(interface_name, bus, destination, object_path) async def get_signature(self, interface_name, method, bus=None, destination=None, object_path=None): interface = await self.get_interface(interface_name, bus, destination, object_path) if interface is None: raise KeyError(f'Interface {interface_name} is not found') return ''.join(interface['methods'][method]['in']) def notify_update(notify, path, interface_name, props): notify.setdefault(path, {})[interface_name] = {k: v.value for k, v in props.items()} class DBusChannel(Channel): json_encoder = systemd_ctypes.JSONEncoder(indent=2) payload = 'dbus-json3' matches = None name = None bus = None owner = None async def setup_name_owner_tracking(self): def send_owner(owner): # We must be careful not to send duplicate owner # notifications. cockpit.js relies on that. if self.owner != owner: self.owner = owner self.send_json(owner=owner) def handler(message): _name, _old, new = message.get_body() send_owner(owner=new if new != "" else None) self.add_signal_handler(handler, sender='org.freedesktop.DBus', path='/org/freedesktop/DBus', interface='org.freedesktop.DBus', member='NameOwnerChanged', arg0=self.name) try: unique_name, = await self.bus.call_method_async("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "GetNameOwner", "s", self.name) except BusError as error: if error.name == "org.freedesktop.DBus.Error.NameHasNoOwner": # Try to start it. If it starts successfully, we will # get a NameOwnerChanged signal (which will set # self.owner) before StartServiceByName returns. try: await self.bus.call_method_async("org.freedesktop.DBus", "/org/freedesktop/DBus", "org.freedesktop.DBus", "StartServiceByName", "su", self.name, 0) except BusError as start_error: logger.debug("Failed to start service '%s': %s", self.name, start_error.message) self.send_json(owner=None) else: logger.debug("Failed to get owner of service '%s': %s", self.name, error.message) else: send_owner(unique_name) def do_open(self, options): self.cache = InterfaceCache() self.name = options.get('name') self.matches = [] bus = options.get('bus') address = options.get('address') try: if address is not None: if bus is not None and bus != 'none': raise ChannelError('protocol-error', message='only one of "bus" and "address" can be specified') logger.debug('get bus with address %s for %s', address, self.name) self.bus = Bus.new(address=address, bus_client=self.name is not None) elif bus == 'internal': logger.debug('get internal bus for %s', self.name) self.bus = self.router.internal_bus.client else: if bus == 'session': logger.debug('get session bus for %s', self.name) self.bus = Bus.default_user() elif bus == 'system' or bus is None: logger.debug('get system bus for %s', self.name) self.bus = Bus.default_system() else: raise ChannelError('protocol-error', message=f'invalid bus "{bus}"') except OSError as exc: raise ChannelError('protocol-error', message=f'failed to connect to {bus} bus: {exc}') from exc try: self.bus.attach_event(None, 0) except OSError as err: if err.errno != errno.EBUSY: raise # This needs to be a fair mutex so that outgoing messages don't # get re-ordered. asyncio.Lock is fair. self.watch_processing_lock = asyncio.Lock() if self.name is not None: async def get_ready() -> None: async with self.watch_processing_lock: await self.setup_name_owner_tracking() if self.owner: self.ready(unique_name=self.owner) else: self.close({'problem': 'not-found'}) self.create_task(get_ready()) else: self.ready() def add_signal_handler(self, handler, **kwargs): r = dict(**kwargs) r['type'] = 'signal' if 'sender' not in r and self.name is not None: r['sender'] = self.name # HACK - https://github.com/bus1/dbus-broker/issues/309 # path_namespace='/' in a rule does not work. if r.get('path_namespace') == "/": del r['path_namespace'] def filter_owner(message): if self.owner is not None and self.owner == message.get_sender(): handler(message) if self.name is not None and 'sender' in r and r['sender'] == self.name: func = filter_owner else: func = handler r_string = ','.join(f"{key}='{value}'" for key, value in r.items()) if not self.is_closing(): # this gets an EINTR very often especially on RHEL 8 while True: try: match = self.bus.add_match(r_string, func) break except InterruptedError: pass self.matches.append(match) def add_async_signal_handler(self, handler, **kwargs): def sync_handler(message): self.create_task(handler(message)) self.add_signal_handler(sync_handler, **kwargs) async def do_call(self, message): path, iface, method, args = message['call'] cookie = message.get('id') flags = message.get('flags') timeout = message.get('timeout') if timeout is not None: # sd_bus timeout is μs, cockpit API timeout is ms timeout *= 1000 else: # sd_bus has no "indefinite" timeout, so use MAX_UINT64 timeout = 2 ** 64 - 1 # We have to figure out the signature of the call. Either we got told it: signature = message.get('type') # ... or there aren't any arguments if signature is None and len(args) == 0: signature = '' # ... or we need to introspect if signature is None: try: logger.debug('Doing introspection request for %s %s', iface, method) signature = await self.cache.get_signature(iface, method, self.bus, self.name, path) except BusError as error: self.send_json(error=[error.name, [f'Introspection: {error.message}']], id=cookie) return except KeyError: self.send_json( error=[ "org.freedesktop.DBus.Error.UnknownMethod", [f"Introspection data for method {iface} {method} not available"]], id=cookie) return except Exception as exc: self.send_json(error=['python.error', [f'Introspection: {exc!s}']], id=cookie) return try: method_call = self.bus.message_new_method_call(self.name, path, iface, method, signature, *args) reply = await self.bus.call_async(method_call, timeout=timeout) # If the method call has kicked off any signals related to # watch processing, wait for that to be done. async with self.watch_processing_lock: # TODO: stop hard-coding the endian flag here. self.send_json( reply=[reply.get_body()], id=cookie, flags="<" if flags is not None else None, type=reply.get_signature(True)) # noqa: FBT003 except BusError as error: # actually, should send the fields from the message body self.send_json(error=[error.name, [error.message]], id=cookie) except Exception: logger.exception("do_call(%s): generic exception", message) self.send_json(error=['python.error', [traceback.format_exc()]], id=cookie) async def do_add_match(self, message): add_match = message['add-match'] logger.debug('adding match %s', add_match) async def match_hit(message): logger.debug('got match') async with self.watch_processing_lock: self.send_json(signal=[ message.get_path(), message.get_interface(), message.get_member(), list(message.get_body()) ]) self.add_async_signal_handler(match_hit, **add_match) async def setup_objectmanager_watch(self, path, interface_name, meta, notify): # Watch the objects managed by the ObjectManager at "path". # Properties are not watched, that is done by setup_path_watch # below via recursive_props == True. async def handler(message): member = message.get_member() if member == "InterfacesAdded": (path, interface_props) = message.get_body() logger.debug('interfaces added %s %s', path, interface_props) meta = {} notify = {} async with self.watch_processing_lock: for name, props in interface_props.items(): if interface_name is None or name == interface_name: mm = await self.cache.get_interface_if_new(name, self.bus, self.name, path) if mm: meta.update({name: mm}) notify_update(notify, path, name, props) self.send_json(meta=meta) self.send_json(notify=notify) elif member == "InterfacesRemoved": (path, interfaces) = message.get_body() logger.debug('interfaces removed %s %s', path, interfaces) async with self.watch_processing_lock: notify = {path: dict.fromkeys(interfaces)} self.send_json(notify=notify) self.add_async_signal_handler(handler, path=path, interface="org.freedesktop.DBus.ObjectManager") objects, = await self.bus.call_method_async(self.name, path, 'org.freedesktop.DBus.ObjectManager', 'GetManagedObjects') for p, ifaces in objects.items(): for iface, props in ifaces.items(): if interface_name is None or iface == interface_name: mm = await self.cache.get_interface_if_new(iface, self.bus, self.name, p) if mm: meta.update({iface: mm}) notify_update(notify, p, iface, props) async def setup_path_watch(self, path, interface_name, recursive_props, meta, notify): # Watch a single object at "path", but maybe also watch for # property changes for all objects below "path". async def handler(message): async with self.watch_processing_lock: path = message.get_path() name, props, invalids = message.get_body() logger.debug('NOTIFY: %s %s %s %s', path, name, props, invalids) for inv in invalids: try: reply, = await self.bus.call_method_async(self.name, path, 'org.freedesktop.DBus.Properties', 'Get', 'ss', name, inv) except BusError as exc: logger.debug('failed to fetch property %s.%s on %s %s: %s', name, inv, self.name, path, str(exc)) continue props[inv] = reply notify = {} notify_update(notify, path, name, props) self.send_json(notify=notify) this_meta = await self.cache.introspect_path(self.bus, self.name, path) if interface_name is not None: interface = this_meta.get(interface_name) this_meta = {interface_name: interface} meta.update(this_meta) if recursive_props: self.add_async_signal_handler(handler, interface="org.freedesktop.DBus.Properties", path_namespace=path) else: self.add_async_signal_handler(handler, interface="org.freedesktop.DBus.Properties", path=path) for name in meta: if name.startswith("org.freedesktop.DBus."): continue try: props, = await self.bus.call_method_async(self.name, path, 'org.freedesktop.DBus.Properties', 'GetAll', 's', name) notify_update(notify, path, name, props) except BusError: pass async def do_watch(self, message): watch = message['watch'] path = watch.get('path') path_namespace = watch.get('path_namespace') interface_name = watch.get('interface') cookie = message.get('id') path = path or path_namespace recursive = path == path_namespace if path is None or cookie is None: logger.debug('ignored incomplete watch request %s', message) self.send_json(error=['x.y.z', ['Not Implemented']], id=cookie) self.send_json(reply=[], id=cookie) return try: async with self.watch_processing_lock: meta = {} notify = {} await self.setup_path_watch(path, interface_name, recursive, meta, notify) if recursive: await self.setup_objectmanager_watch(path, interface_name, meta, notify) self.send_json(meta=meta) self.send_json(notify=notify) self.send_json(reply=[], id=message['id']) except BusError as error: logger.debug("do_watch(%s) caught D-Bus error: %s", message, error.message) self.send_json(error=[error.name, [error.message]], id=cookie) async def do_meta(self, message): self.cache.inject(message['meta']) def do_data(self, data): message = json.loads(data) logger.debug('receive dbus request %s %s', self.name, message) if 'call' in message: self.create_task(self.do_call(message)) elif 'add-match' in message: self.create_task(self.do_add_match(message)) elif 'watch' in message: self.create_task(self.do_watch(message)) elif 'meta' in message: self.create_task(self.do_meta(message)) else: logger.debug('ignored dbus request %s', message) return def do_close(self): for slot in self.matches: slot.cancel() self.matches = [] self.close()
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply