ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
plugins
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
accumulate.py
3.63 MB
chmod
View
DL
Edit
Rename
Delete
analyst_cleanup_update.py
5.53 MB
chmod
View
DL
Edit
Rename
Delete
backup_info_sender.py
3.06 MB
chmod
View
DL
Edit
Rename
Delete
cagefs.py
5.17 MB
chmod
View
DL
Edit
Rename
Delete
checkpoint.py
1.23 MB
chmod
View
DL
Edit
Rename
Delete
client.py
10.6 MB
chmod
View
DL
Edit
Rename
Delete
config_merger.py
828 B
chmod
View
DL
Edit
Rename
Delete
config_watcher.py
1.89 MB
chmod
View
DL
Edit
Rename
Delete
event_hook_executor.py
777 B
chmod
View
DL
Edit
Rename
Delete
event_monitor.py
3.32 MB
chmod
View
DL
Edit
Rename
Delete
event_monitor_message_processor.py
6.33 MB
chmod
View
DL
Edit
Rename
Delete
files_recurring_update.py
1.09 MB
chmod
View
DL
Edit
Rename
Delete
icontact_sender.py
4.42 MB
chmod
View
DL
Edit
Rename
Delete
idle_time_out.py
1.21 MB
chmod
View
DL
Edit
Rename
Delete
lve_utils_install.py
1.58 MB
chmod
View
DL
Edit
Rename
Delete
myimunify.py
1.98 MB
chmod
View
DL
Edit
Rename
Delete
ping.py
536 B
chmod
View
DL
Edit
Rename
Delete
send_domain_list.py
2.78 MB
chmod
View
DL
Edit
Rename
Delete
send_server_config.py
10.96 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
0 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins/send_server_config.py
import binascii import datetime import hashlib import os import re import uuid from functools import lru_cache from logging import getLogger from pathlib import Path from typing import Dict, List from defence360agent.contracts import sentry from defence360agent.contracts.config import ( ConfigFile, Core, CustomBillingConfig, Malware, MalwareSignatures, SystemConfig, int_from_envvar, FREEMIUM_FEATURE_FLAG, ) from defence360agent.contracts.license import LicenseCLN from defence360agent.contracts.messages import MessageType from defence360agent.contracts.plugins import ( MessageSink, MessageSource, expect, ) from defence360agent.contracts.myimunify_id import get_myimunify_users from defence360agent.feature_management.control import ( is_native_feature_management_enabled, is_native_feature_management_supported, ) from defence360agent.internals.iaid import IndependentAgentIDAPI from defence360agent.subsys.panels.hosting_panel import HostingPanel from defence360agent.subsys.panels.cpanel import cPanel from defence360agent.utils import ( log_error_and_ignore, recurring_check, Scope, stub_unexpected_error, safe_run, system_packages_info, ) from defence360agent.subsys.persistent_state import load_state, save_state from defence360agent.utils.whmcs import WhmcsConf #: info about these paths is sent to server in SERVER_CONFIG CH_PATHS = ( "/var/imunify360/imunify360.db", "/var/imunify360/imunify360.db-shm", "/var/imunify360/imunify360.db-wal", "/var/imunify360/gw.dir/", ) logger = getLogger(__name__) # Components which version sends to CH PACKAGES_TO_REPORT = { "imunify360-firewall", "imunify-antivirus", "ai-bolit", "app-version-detector", "imunify360-php-i360", "imunify360-webshield-bundle", "imunify-realtime-av", "imunify-auditd-log-reader", "imunify360-pam", "imunify-notifier", "imunify360-unified-access-logger", "imunify360-ossec-server", "imunify-core", "imunify-ui", "imunify360-venv", "imunify-patchman", } def read_cpu_info(): with open("/proc/cpuinfo") as f: return f.read() @lru_cache(maxsize=1) def get_cpu_info(): text = read_cpu_info() tuples = re.findall("^(.*?)[ \t]*:[ \t]*(.*)$", text, flags=re.M) res: List[dict] = [] current = {} for key, value in tuples: if key == "processor": if current and "processor" in current: res.append(current) current = {} current[key] = value res.append(current) return res @stub_unexpected_error def get_cpu_cores(): physical_ids = {} for processor in get_cpu_info(): if ( physical_id := processor.get("physical id", processor["processor"]) ) not in physical_ids: physical_ids[physical_id] = int(processor.get("cpu cores", 1)) return sum(physical_ids.values()) @stub_unexpected_error def get_cpu_model_and_flags(): processor = get_cpu_info()[0] return "{} {}".format( processor.get("model name") or processor["Processor"], processor.get("flags") or processor["Features"], ) @stub_unexpected_error async def get_hosting_panel_version(hp): return await hp.version() @stub_unexpected_error async def get_users_amount(hp): return await hp.users_count() @stub_unexpected_error async def get_domains_amount(hp): return len(await hp.get_domain_to_owner()) @stub_unexpected_error def get_malware_db_update_time(): if os.path.exists(MalwareSignatures.AI_BOLIT_HOSTER): return int(os.path.getmtime(MalwareSignatures.AI_BOLIT_HOSTER)) @stub_unexpected_error async def get_nfm_state(): if await is_native_feature_management_supported(): return await is_native_feature_management_enabled() def get_sha256_machine_id(): machine_id = Path("/etc/machine-id") if machine_id.exists(): return hashlib.sha256(machine_id.read_bytes()).hexdigest() return None @stub_unexpected_error def get_myimunify_whmcs_activation_state(): """ True only if active in whmcs config otherwise False """ activation_state = WhmcsConf().read().get("status") return activation_state == "active" async def get_additional_info(hp): return { "cpu_cores": get_cpu_cores(), "cpuinfo": get_cpu_model_and_flags(), "hosting_panel_version": await get_hosting_panel_version(hp), "users_amount": await get_users_amount(hp), "domains_amount": await get_domains_amount(hp), "trim_malicious": Malware.CLEANUP_TRIM, "days_to_keep_backup": Malware.CLEANUP_KEEP, "malware_db_update_time": get_malware_db_update_time(), "native_feature_management_enabled": await get_nfm_state(), "machine_id": get_sha256_machine_id(), "myimunify_freemium_flag_exists": os.path.exists( FREEMIUM_FEATURE_FLAG ), "myimunify_whmcs_activated": get_myimunify_whmcs_activation_state(), } async def get_users_configs(hp) -> Dict: """ Return dict that includes users config values that are explicitly set in the corresponding config files. """ result = dict() try: current_users = frozenset(await hp.get_users()) except Exception: logger.exception("Failed to get the list of panel's users.") current_users = frozenset() users_conf = os.path.join("*", Core.USER_CONFIG_FILE_NAME) for userconf_file in Path(Core.USER_CONFDIR).glob(users_conf): if userconf_file.parent.name in current_users: result[userconf_file.parent.name] = ConfigFile( username=userconf_file.parent.name ).config_to_dict(normalize=False) return result class SendServerConfig(MessageSink, MessageSource): """ This plugin is to provide central server with different server metrics. Message is sent on plugin creation, and then every :period: seconds """ SCOPE = Scope.AV def __init__(self, period=None): self._task = None # timestamp of the last ConfigUpdate sent to the server self._last_send_time = None # avoid duplicate on startup if period: self._period = period else: self._period = int_from_envvar( "IMUNIFY360_SEND_SERVER_CONFIG_PERIOD", int(datetime.timedelta(days=1).total_seconds() / 3), ) async def create_sink(self, loop): """MessageSink method""" @expect(MessageType.ConfigUpdate) @log_error_and_ignore() async def on_config_update_message(self, message): if self._last_send_time is None: # 1st ConfigUpdate # enable sending config on 2nd+ ConfigUpdate self._last_send_time = 0 return if not isinstance(message["conf"], SystemConfig): # ignore user configs, we do not need to send them on each change # all user configs will be send in # recurring_check(self._period)(self._send_server_config)() return if message["conf"].modified_since(self._last_send_time): self._last_send_time = message["timestamp"] self._loop.create_task(self._send_server_config()) async def create_source(self, loop, sink): self._loop = loop self._sink = sink self._task = self._loop.create_task( recurring_check(self._period)(self._send_server_config)() ) async def shutdown(self): if self._task is not None: self._task, t = None, self._task t.cancel() await t async def _create_server_config_msg(self): msg = MessageType.ServerConfig(uname=_uname_info()) diskstat = _diskstat() if diskstat: msg["diskstats"] = diskstat hp = HostingPanel() license_info = LicenseCLN.license_info() msg.update(sentry.tags()) msg.update(await get_additional_info(hp)) if hp.NAME == cPanel.NAME: msg["users"] = await get_myimunify_users() msg["iaid"] = IndependentAgentIDAPI.get_iaid() msg["status_license"] = LicenseCLN.is_valid() msg["system_info"] = { "uptime_since": await _uptime(), "devices": await _blkid(), "mac": await _mac_address(), } msg["agent_global_config"] = ( ConfigFile().config_to_dict() | CustomBillingConfig().config_to_dict() ) msg["agent_users_configs"] = await get_users_configs(hp) msg["paths"] = await _get_path_sizes(CH_PATHS) msg["components_versions"] = await system_packages_info( PACKAGES_TO_REPORT ) msg["agent_global_config"][ "CUSTOM_BILLING.effective_upgrade_url" ] = license_info.get("upgrade_url") msg["agent_global_config"][ "CUSTOM_BILLING.effective_upgrade_url_360" ] = license_info.get("upgrade_url_360") msg["agent_global_config"]["CORE.doctor_report"] = load_state( "doctor_key" ).get("doctor_key") save_state("doctor_key", {"doctor_key": None}) return msg async def _send_server_config(self): await self._sink.process_message( await self._create_server_config_msg() ) async def _get_path_sizes(paths) -> Dict[str, int]: """Return path->size mapping for *paths*. Send -errno on error. """ sizes = {} for path in map(os.fspath, paths): try: if os.path.isdir(path): size = _compute_dir_size(path) else: size = os.path.getsize(path) except OSError as e: logger.warning("Can't get size for %s, reason: %s", path, e) sizes[path] = -e.errno else: sizes[path] = size return sizes def _compute_dir_size(directory_path: str) -> int: total_size = 0 def _onerror(err: OSError): raise err for root, _dirs, files in os.walk( directory_path, onerror=_onerror, followlinks=False ): for file_name in files: file_path = os.path.join(root, file_name) try: total_size += os.path.getsize(file_path) except OSError as e: raise e return total_size def _diskstat(): try: with open("/proc/diskstats") as f: return f.read() except OSError as e: # pragma: no cover logger.warning("Can't get diskstat: %s", str(e)) def _uname_info() -> dict: return dict( zip( ("sysname", "nodename", "release", "version", "machine"), os.uname(), ) ) async def _uptime() -> str: """System up since""" return await safe_run(["uptime", "--since"]) async def _blkid() -> str: """Executes utility to locate/print block device attributes""" return await safe_run(["blkid"]) async def _mac_address() -> str: """MAC address in formatted way, like it specifies in /sys/class/net/*/address""" return binascii.hexlify(uuid.getnode().to_bytes(6, "big"), ":").decode()
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply