ACIL FM
Dark
Refresh
Current DIR:
/usr/libexec/kcare/python/kcarectl
/
usr
libexec
kcare
python
kcarectl
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
anomaly.py
11.02 MB
chmod
View
DL
Edit
Rename
Delete
auth.py
10.87 MB
chmod
View
DL
Edit
Rename
Delete
capabilities.py
956 B
chmod
View
DL
Edit
Rename
Delete
config.py
2.1 MB
chmod
View
DL
Edit
Rename
Delete
config_handlers.py
8.54 MB
chmod
View
DL
Edit
Rename
Delete
constants.py
1.35 MB
chmod
View
DL
Edit
Rename
Delete
errors.py
1.34 MB
chmod
View
DL
Edit
Rename
Delete
fetch.py
4.81 MB
chmod
View
DL
Edit
Rename
Delete
http_utils.py
7.27 MB
chmod
View
DL
Edit
Rename
Delete
ipv6_support.py
4.93 MB
chmod
View
DL
Edit
Rename
Delete
kcare.py
10.31 MB
chmod
View
DL
Edit
Rename
Delete
libcare.py
17.61 MB
chmod
View
DL
Edit
Rename
Delete
log_utils.py
2.84 MB
chmod
View
DL
Edit
Rename
Delete
platform_utils.py
8.67 MB
chmod
View
DL
Edit
Rename
Delete
process_utils.py
3.79 MB
chmod
View
DL
Edit
Rename
Delete
py23.py
2.15 MB
chmod
View
DL
Edit
Rename
Delete
selinux.py
1.64 MB
chmod
View
DL
Edit
Rename
Delete
serverid.py
1.85 MB
chmod
View
DL
Edit
Rename
Delete
server_info.py
3.58 MB
chmod
View
DL
Edit
Rename
Delete
update_utils.py
897 B
chmod
View
DL
Edit
Rename
Delete
utils.py
8.27 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
75.93 MB
chmod
View
DL
Edit
Rename
Delete
__main__.py
803 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /usr/libexec/kcare/python/kcarectl/platform_utils.py
# Copyright (c) Cloud Linux Software, Inc # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT import base64 import json import os import platform import re import socket import sys from . import config, constants, log_utils, process_utils, selinux, utils if False: # pragma: no cover from typing import Any, Dict, Optional, Tuple # noqa: F401 VIRTWHAT = '/usr/libexec/kcare/virt-what' PROC_DIR = '/proc' def get_distro(): if sys.version_info[:2] < (3, 6): # pragma: no py3 cover return platform.linux_distribution() else: # pragma: no distro cover import distro return distro.linux_distribution(full_distribution_name=False) @utils.cached def get_system_uname(): return platform.uname()[2] def get_python_version(): # type: () -> str return '%s.%s' % (sys.version_info[0], sys.version_info[1]) def app_info(is_json=False): # type: (bool) -> str info = { 'python_version': get_python_version(), 'agent_version': constants.VERSION, } if selinux.is_selinux_enabled(): rc, stdout, stderr = process_utils.run_command(['ps', '-Z', '--no-headers', '--pid', str(os.getpid())], catch_stdout=True) if not rc: selinux_context = stdout.split()[0] else: selinux_context = 'error: %s' % stderr info['selinux_context'] = selinux_context if is_json: return json.dumps(info) info_keys = sorted(info) info_str = '' for info_key in info_keys: info_str += '%s: %s\n' % (info_key, info[info_key]) return info_str.rstrip() EFIVARS_PATH = '/sys/firmware/efi/efivars' EFI_VENDORS = { 'global': '8be4df61-93ca-11d2-aa0d-00e098032b8c', 'shim': '605dab50-e046-4300-abb6-3dd810dd8b23', } def _read_uefi_var(name, vendor, max_bytes=256): # type: (str, str, Optional[int]) -> Optional[bytes] var_path = os.path.join(EFIVARS_PATH, '%s-%s' % (name, vendor)) if not os.path.exists(var_path): return None with open(var_path, 'rb') as var: return var.read(max_bytes) def is_secure_boot(): # mocked: tests/unit/test_load_kmod.py # type: () -> bool try: secure_boot_var = _read_uefi_var('SecureBoot', EFI_VENDORS['global']) if secure_boot_var: return secure_boot_var[-1:] == b'\x01' # Get last byte except Exception: # pragma: no cover pass return False def _get_uefi_var_encoded(name, vendor): # type: (str, str) -> Optional[str] try: value_bytes = _read_uefi_var(name, vendor) if value_bytes is None: return None except Exception as e: value_bytes = str(e).encode() return utils.nstr(base64.urlsafe_b64encode(value_bytes)) def secure_boot_info(): # type: () -> dict[str, Any] cmdline = utils.try_to_read(os.path.join(PROC_DIR, 'cmdline')) if cmdline and len(cmdline) > 1024: # pragma: no cover cmdline = cmdline[:1024] info = {'cmdline': cmdline, 'has_efi': os.path.exists(os.path.dirname(EFIVARS_PATH))} # type: dict[str, Any] if not info['has_efi']: return info try: info['global'] = dict((var, _get_uefi_var_encoded(var, EFI_VENDORS['global'])) for var in ('SecureBoot', 'SetupMode')) shim_vars = sorted( [var[0 : -len(EFI_VENDORS['shim']) - 1] for var in os.listdir(EFIVARS_PATH) if var.endswith(EFI_VENDORS['shim'])] ) info['shim'] = {'vars': shim_vars} shim_exclude_vars = set(['MokListRT', 'MokListXRT', 'MokListTrustedRT', 'SbatLevelRT']) for var in shim_vars: if var in ('HSIStatus', 'MokIgnoreDB') or (var.endswith('RT') and var not in shim_exclude_vars): info['shim'][var] = _get_uefi_var_encoded(var, EFI_VENDORS['shim']) except Exception as err: log_utils.logwarn(err) return info @utils.cached def get_hostname(): # type: () -> str # KCARE-1165 If fqdn gathering is forced if config.REPORT_FQDN: try: # getaddrinfo() -> [(family, socktypeget_hostname, proto, canonname, sockaddr), ...] hostname = socket.getaddrinfo(socket.gethostname(), 0, 0, 0, 0, socket.AI_CANONNAME)[0][3] except socket.gaierror as ge: log_utils.logerror(ge) hostname = platform.node() else: hostname = platform.node() return hostname @utils.cached def get_uptime(): # type: () -> str uptime_file = os.path.join(PROC_DIR, 'uptime') if os.path.isfile(uptime_file): f = open(uptime_file, 'r') line = f.readline() result = str(int(float(line.split()[0]))) f.close() return result return '-1' @utils.cached def get_virt(): if os.path.isfile(VIRTWHAT): return process_utils.check_output([VIRTWHAT]).strip() return 'no-virt-what' # pragma: no cover def is_cpanel(): return os.path.isfile('/usr/local/cpanel/cpanel') def inside_vz_container(): # mocked: tests/unit/test_load_kmod.py return os.path.exists(os.path.join(PROC_DIR, 'vz', 'veinfo')) and not os.path.exists(os.path.join(PROC_DIR, 'vz', 'version')) def inside_lxc_container(): # mocked: tests/unit/test_load_kmod.py return '/lxc/' in open(os.path.join(PROC_DIR, '1', 'cgroup')).read() def inside_docker_container(): # mocked: tests/unit/test_load_kmod.py return os.path.isfile('/.dockerenv') @utils.catch_errors(logger=log_utils.logwarn) def get_load_average(): # type: () -> Optional[Tuple[float, float, float]] loadavg = utils.try_to_read(os.path.join(PROC_DIR, 'loadavg')) if not loadavg: return None m1, m5, m15, _ = loadavg.split(' ', 3) return (float(m1), float(m5), float(m15)) @utils.catch_errors(logger=log_utils.logwarn) def get_mem_info(): # type: () -> Optional[Dict[str, int]] """Returns dict of memory info in kB""" meminfo = utils.try_to_read(os.path.join(PROC_DIR, 'meminfo')) if not meminfo: return None filter_params = ('MemTotal', 'MemFree', 'SwapTotal', 'SwapFree') # optional units are ignored (assumed to be always kB for mem size) return dict((k, int(v)) for k, v in (re.split(r'[\s:]+', line)[:2] for line in meminfo.splitlines()) if k in filter_params) @utils.cached @utils.catch_errors(logger=log_utils.logwarn) def get_cpu_info(): # type: () -> Optional[Dict[str, Any]] cpuinfo = utils.try_to_read(os.path.join(PROC_DIR, 'cpuinfo')) if not cpuinfo: return None cpus = [ dict(re.split(r'\s*:\s*', line) for line in cpu_lines.splitlines()) for cpu_lines in cpuinfo.split('\n\n') # cpu records are separated by an empty line if cpu_lines ] return { 'logical_cores': len(cpus), 'physical_cores': len(set((cpu.get('physical id'), cpu.get('core id')) for cpu in cpus)), 'vendor_id': cpus[0].get('vendor_id'), 'model': int(cpus[0].get('model', 0)), 'model_name': cpus[0].get('model name'), 'cpu_family': int(cpus[0].get('cpu family', 0)), 'stepping': int(cpus[0].get('stepping', 0)), 'microcode': cpus[0].get('microcode'), 'flags': cpus[0].get('flags', '').split(), } @utils.catch_errors(logger=log_utils.logwarn, default_return=0) def get_process_count(): # type: () -> int return len([d for d in os.listdir(PROC_DIR) if d.isdigit()]) @utils.catch_errors(logger=log_utils.logwarn, default_return=0) def get_opened_files_count(): # type: () -> int fd_info = utils.try_to_read(os.path.join(PROC_DIR, 'sys/fs/file-nr')) return int((fd_info or '0').split()[0]) @utils.catch_errors(logger=log_utils.logwarn) def get_vm_count_kvm(): # type: () -> Optional[int] for _root, dirs, _files in os.walk('/sys/kernel/debug/kvm'): return len(dirs) return None @utils.catch_errors(logger=log_utils.logwarn, default_return=(0, 0)) def get_network_connections_count(): # type: () -> Tuple[int, int] """Return tuple of total numbers of TCP and UDP connections""" def conn_records_count(proto): # type: (str) -> int records = utils.try_to_read(os.path.join(PROC_DIR, 'net', proto)) if not records: return 0 return max(len(records.splitlines()) - 1, 0) return conn_records_count('tcp') + conn_records_count('tcp6'), conn_records_count('udp') + conn_records_count('udp6') def get_performance_metrics(): # type: () -> Dict[str, Any] conn_tcp, conn_udp = get_network_connections_count() return { 'load_average': get_load_average(), 'mem_info': get_mem_info(), 'cpu_info': get_cpu_info(), 'vm_count': get_vm_count_kvm(), 'processes': get_process_count(), 'open_files': get_opened_files_count(), 'tcp_connections': conn_tcp, 'udp_connections': conn_udp, }
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply