ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
rpc_tools
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
exceptions.py
403 B
chmod
View
DL
Edit
Rename
Delete
lookup.py
4.39 MB
chmod
View
DL
Edit
Rename
Delete
middleware.py
5.8 MB
chmod
View
DL
Edit
Rename
Delete
utils.py
4.25 MB
chmod
View
DL
Edit
Rename
Delete
validate.py
6.54 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
694 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/rpc_tools/utils.py
import pickle import asyncio import os from contextlib import suppress from functools import lru_cache, wraps from itertools import chain from pathlib import Path from typing import Optional, Tuple, Generator import yaml import psutil from defence360agent.contracts.config import SimpleRpc as Config from defence360agent.model.simplification import run_in_executor from defence360agent.rpc_tools.validate import ValidationError from defence360agent.utils import ( AV_PID_PATH, IM360_NON_RESIDENT_PID_PATH, IM360_RESIDENT_PID_PATH, antivirus_mode, is_centos6_or_cloudlinux6, ) def rpc_is_running(): """Check if non-resident agent instance is running""" # we use socket activation, so we could not use socket for this purpose # check process instead rpc_process_pid_path = ( AV_PID_PATH if antivirus_mode.enabled else IM360_NON_RESIDENT_PID_PATH ) if rpc_process_pid_path.exists(): current_pid = os.getpid() with suppress(Exception): pid = int(rpc_process_pid_path.read_text()) if is_centos6_or_cloudlinux6(): # imunify agent process is a child process of minidaemon pid = psutil.Process(pid).children()[0].pid return pid != current_pid and psutil.pid_exists(pid) return False def is_running(): """Check if the agent instance is running""" if Config.SOCKET_ACTIVATION: return rpc_is_running() if IM360_RESIDENT_PID_PATH.exists(): current_pid = os.getpid() pid = int(IM360_RESIDENT_PID_PATH.read_text()) return pid != current_pid and psutil.pid_exists(pid) return False def _find_schema(base=None, schema_dir="schema"): for path in find_schema_files(base, schema_dir): if (p := path.with_suffix(".pickle")).exists(): document = pickle.loads(p.read_bytes()) else: document = yaml.safe_load(path.read_text()) for k, v in document.items(): # converting keys - from strings to tuples yield tuple(k.split(" ")), v def find_schema_files( base=None, schema_dir="schema" ) -> Generator[Path, None, None]: p = base / schema_dir for path in [*p.rglob("*.yaml"), *p.rglob("*.yml")]: yield path def get_schema_paths(paths: Optional[Tuple[Path]] = None): result = list(paths)[:] if paths else [] path = Path(__file__).parent.parent / "simple_rpc" result.extend( [ path, path.parent / "feature_management" / "rpc", ] ) return result @lru_cache(1) def prepare_schema(paths): schema = [] for base_path in get_schema_paths(paths): schema.append(_find_schema(base_path)) return dict(chain(*schema)) def run_in_executor_decorator(f): @wraps(f) async def wrapper(*args, **kwargs): return await run_in_executor( asyncio.get_event_loop(), lambda: f(*args, **kwargs) ) return wrapper def generate_warnings( affected, not_affected, dest_listname, all_list, success_warning, failure_warning, in_another_list_warning=None, ): """ :param list affected: IPs that were changed during operation :param list of tuples || list of str not_affected: IPs & it's listnames that weren't changed during operation :param list all_list: list of all IPs that take place in operation :param str success_warning: msg if IP was changed :param str failure_warning: msg if IPs wasn't changed and it's absent in any other lists :param str in_another_list_warning: msg if IPs wasn't changed , however it present in another list :return list of st warnings: msg to be printed """ warnings = [] for item in not_affected: record, listname = item["rec"], item.get("listname", dest_listname) if listname and in_another_list_warning: warnings.append(in_another_list_warning.format(record, listname)) else: warnings.append(failure_warning.format(record, dest_listname)) num_deleted = len(affected) total_num = len(all_list) if total_num > 1 and total_num != num_deleted: warnings.append(success_warning.format(num_deleted, total_num)) if warnings: raise ValidationError(warnings) return {}
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply