ACIL FM
Dark
Refresh
Current DIR:
/usr/libexec/kcare/python/kcarectl
/
usr
libexec
kcare
python
kcarectl
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
anomaly.py
11.02 MB
chmod
View
DL
Edit
Rename
Delete
auth.py
10.87 MB
chmod
View
DL
Edit
Rename
Delete
capabilities.py
956 B
chmod
View
DL
Edit
Rename
Delete
config.py
2.1 MB
chmod
View
DL
Edit
Rename
Delete
config_handlers.py
8.54 MB
chmod
View
DL
Edit
Rename
Delete
constants.py
1.35 MB
chmod
View
DL
Edit
Rename
Delete
errors.py
1.34 MB
chmod
View
DL
Edit
Rename
Delete
fetch.py
4.81 MB
chmod
View
DL
Edit
Rename
Delete
http_utils.py
7.27 MB
chmod
View
DL
Edit
Rename
Delete
ipv6_support.py
4.93 MB
chmod
View
DL
Edit
Rename
Delete
kcare.py
10.31 MB
chmod
View
DL
Edit
Rename
Delete
libcare.py
17.61 MB
chmod
View
DL
Edit
Rename
Delete
log_utils.py
2.84 MB
chmod
View
DL
Edit
Rename
Delete
platform_utils.py
8.67 MB
chmod
View
DL
Edit
Rename
Delete
process_utils.py
3.79 MB
chmod
View
DL
Edit
Rename
Delete
py23.py
2.15 MB
chmod
View
DL
Edit
Rename
Delete
selinux.py
1.64 MB
chmod
View
DL
Edit
Rename
Delete
serverid.py
1.85 MB
chmod
View
DL
Edit
Rename
Delete
server_info.py
3.58 MB
chmod
View
DL
Edit
Rename
Delete
update_utils.py
897 B
chmod
View
DL
Edit
Rename
Delete
utils.py
8.27 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
75.93 MB
chmod
View
DL
Edit
Rename
Delete
__main__.py
803 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /usr/libexec/kcare/python/kcarectl/kcare.py
# Copyright (c) Cloud Linux Software, Inc # Licensed under CLOUD LINUX LICENSE AGREEMENT # http://cloudlinux.com/docs/LICENCE.TXT import ast import glob import hashlib import json import os import platform from . import config, constants, log_utils, process_utils, utils from .errors import SafeExceptionWrapper from .py23 import json_loads_nstr if False: # pragma: no cover from typing import List, Optional, Tuple # noqa: F401 UNAME_LABEL = 'uname: ' def is_uname_char(c): # type: (str) -> bool return str.isalnum(c) or c in '.-_+' def parse_uname(patch_level): khash = get_kernel_hash() f = open(get_cache_path(khash, patch_level, config.PATCH_INFO), 'r') try: for line in f.readlines(): if line.startswith(UNAME_LABEL): return ''.join(filter(is_uname_char, line[len(UNAME_LABEL) :].strip())) finally: f.close() return '' def kcare_update_effective_version(new_version): if os.path.exists(config.KCARE_UNAME_FILE): try: f = open(config.KCARE_UNAME_FILE, 'w') f.write(new_version) f.close() return True except Exception: pass return False def get_kernel_hash(): # type: () -> str f = open(config.KERNEL_VERSION_FILE, 'rb') try: # sha1 is not used for security, turn off bandit warning # bandit issues a warning that B324 has no test when `nosec B324` is # set here. Using broad `nosec` here to bypass the warning. return hashlib.sha1(f.read()).hexdigest() # nosec B324 finally: f.close() def get_last_stop(): # type: () -> str """Returns timestamp from PATCH_CACHE/stoped.at if its exsits""" stopped_at_filename = os.path.join(constants.PATCH_CACHE, 'stopped.at') if os.path.exists(stopped_at_filename): with open(stopped_at_filename, 'r') as fh: value = fh.read().rstrip() try: int(value) except ValueError: return str(int(os.path.getctime(stopped_at_filename))) except Exception: # pragma: no cover, it should not happen return 'error' return value return '-1' def get_cache_path(khash, plevel, fname): prefix = config.PREFIX or 'none' ptype = config.PATCH_TYPE or 'default' patch_dir = '-'.join([prefix, khash, str(plevel), ptype]) result = (constants.PATCH_CACHE, 'patches', patch_dir) # type: Tuple[str, ...] if fname: result += (fname,) return os.path.join(*result) def get_kernel_prefixed_url(*parts): return utils.get_patch_server_url(config.PREFIX, *parts) class BaseKernelPatchLevel(int): def cache_path(self, *parts): return get_cache_path(self.khash, str(self), *parts) # type: ignore[attr-defined] def as_dict(self): return { 'level': self.level, 'khash': self.khash, 'baseurl': self.baseurl, 'release': self.release, } class KernelPatchLevel(BaseKernelPatchLevel): def __new__(cls, khash, level, baseurl, release=None): return super(cls, cls).__new__(cls, level) def __init__(self, khash, level, baseurl, release=None): self.level = level self.khash = khash self.baseurl = baseurl self.release = release def kmod_url(self, *parts): return utils.get_patch_server_url(self.baseurl, self.khash, *parts) def file_url(self, *parts): return utils.get_patch_server_url(self.baseurl, self.khash, str(self), *parts) class LegacyKernelPatchLevel(BaseKernelPatchLevel): def __new__(cls, khash, level): try: return super(cls, cls).__new__(cls, level) except ValueError as exc: # common error with this class raise SafeExceptionWrapper(exc) def __init__(self, khash, level): self.level = level self.khash = khash self.baseurl = None self.release = None def kmod_url(self, *parts): if 'patches.kernelcare.com' in config.PATCH_SERVER: return get_kernel_prefixed_url(self.khash, str(self), *parts) # ePortal workaround, it doesn't support leveled links to kmod return get_kernel_prefixed_url(self.khash, *parts) def file_url(self, *parts): return get_kernel_prefixed_url(self.khash, str(self), *parts) def upgrade(self, baseurl): return KernelPatchLevel(self.khash, int(self), baseurl) def dump_kernel_patch_level(kernel_patch_level): # type: (BaseKernelPatchLevel) -> None try: with open(os.path.join(constants.PATCH_CACHE, 'kernel_patch_level.json'), 'w') as f: json.dump(kernel_patch_level.as_dict(), f) except Exception: log_utils.logexc('failed to dump kernel patch level', print_msg=False) def read_dumped_kernel_patch_level(): try: with open(os.path.join(constants.PATCH_CACHE, 'kernel_patch_level.json')) as f: return json_loads_nstr(f.read()) except Exception: log_utils.logexc('failed to read dumped kernel patch level', print_msg=False) def sort_files_by_ctime(files_list): # type: (List[str]) -> List[Tuple[str, float]] return sorted( [(it, os.path.getctime(it)) for it in files_list], key=lambda pair: pair[1], reverse=True, ) def get_kdump_root(): # type: () -> str kdump_path = "/var/crash" if not os.path.isfile("/etc/kdump.conf"): return kdump_path with open("/etc/kdump.conf") as kdump_conf: for line in kdump_conf: line = line.strip() if line.startswith('path '): _, kdump_path = line.split(None, 1) return kdump_path def list_kdump_vcore_files(): # type: () -> List[str] kdump_root = get_kdump_root() if not os.path.isdir(kdump_root): return [] return glob.glob(os.path.join(kdump_root, '*/vmcore')) def list_kdump_txt_files(): # type: () -> List[str] kdump_root = get_kdump_root() if not os.path.isdir(kdump_root): return [] return glob.glob(os.path.join(kdump_root, '*/*.txt')) def list_crashreporter_log_files(): # type: () -> List[str] if not os.path.isdir(config.KDUMPS_DIR): return [] return glob.glob(os.path.join(config.KDUMPS_DIR, '*.log')) def list_crashreporter_artifacts(): # type: () -> List[str] if not os.path.isdir(config.KDUMPS_DIR): return [] return [os.path.join(config.KDUMPS_DIR, it) for it in os.listdir(config.KDUMPS_DIR)] @utils.cached def kdumps_latest_event_timestamp(): # type: () -> Optional[float] kdumps = list_kdump_vcore_files() if not kdumps: return None return sort_files_by_ctime(kdumps)[0][1] @utils.cached def kdump_status(): if constants.SKIP_SYSTEMCTL_CHECK or os.path.isfile(constants.SYSTEMCTL): _, stdout, _ = process_utils.run_command([constants.SYSTEMCTL, 'is-active', 'kdump'], catch_stdout=True, catch_stderr=True) return stdout.strip() return 'systemd-absent' @utils.cached def crashreporter_latest_event_timestamp(): # type: () -> Optional[float] artifacts = list_crashreporter_artifacts() if not artifacts: return None return sort_files_by_ctime(artifacts)[0][1] def get_current_kmod_version(): kmod_version_file = '/sys/module/kcare/version' if not os.path.exists(kmod_version_file): return with open(kmod_version_file, 'r') as f: version = f.read().strip() return version def is_kmod_version_changed(khash, plevel): old_version = get_current_kmod_version() if not old_version: return True new_version = process_utils.check_output( ['/sbin/modinfo', '-F', 'version', get_cache_path(khash, plevel, constants.KMOD_BIN)] ).strip() return old_version != new_version def kcare_uname_su(): patch_level = loaded_patch_level() if not patch_level: return platform.release() return parse_uname(patch_level) def kcare_uname(): if os.path.exists(config.KCARE_UNAME_FILE): return open(config.KCARE_UNAME_FILE, 'r').read().strip() else: # TODO: talk to @kolshanov about runtime results from KPATCH_CTL info # (euname from kpatch-description -- not from kpatch.info file) return kcare_uname_su() def loaded_patch_level(): # mocked: tests/unit pl = parse_patch_description(loaded_patch_description())['patch-level'] if pl: try: int(pl) except ValueError as e: raise SafeExceptionWrapper(e, 'Unexpected patch state', _patch_info()) return LegacyKernelPatchLevel(get_kernel_hash(), pl) def _patch_info(): return process_utils.check_output([constants.KPATCH_CTL, 'info']) @utils.cached def get_loaded_modules(): try: return [line.split()[0] for line in open('/proc/modules')] except (OSError, IOError) as ex: log_utils.logerror('Error getting loaded modules list: ' + str(ex), print_msg=False) return [] def loaded_patch_description(): if 'kcare' not in get_loaded_modules(): return None # example: 28-:1532349972;4.4.0-128.154 # (patch level: number)-(patch type: free/extra/empty):(timestamp);(effective kernel version from kpatch.info) return get_patch_value(_patch_info(), 'kpatch-description') def get_patch_value(info, label): return utils.data_as_dict(info).get(label) def parse_patch_description(desc): result = {'patch-level': None, 'patch-type': 'default', 'last-update': '', 'kernel-version': ''} if not desc: return result level_type_timestamp, _, kernel = desc.partition(';') level_type, _, timestamp = level_type_timestamp.partition(':') patch_level, _, patch_type = level_type.partition('-') # need to return patch_level=None not to break old code # TODO: refactor all loaded_patch_level() usages to work with empty string instead of None result['patch-level'] = patch_level or None result['patch-type'] = patch_type or 'default' result['last-update'] = timestamp result['kernel-version'] = kernel return result def get_state(): state_file = os.path.join(constants.PATCH_CACHE, 'kcare.state') if os.path.exists(state_file): with open(state_file, 'r') as f: try: state = f.read() return ast.literal_eval(state) except (SyntaxError, OSError, ValueError, TypeError, UnicodeDecodeError): pass
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply