ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
subsys
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
features
-
chmod
Open
Rename
Delete
panels
-
chmod
Open
Rename
Delete
__pycache__
-
chmod
Open
Rename
Delete
backup_systems.py
11.19 MB
chmod
View
DL
Edit
Rename
Delete
clcagefs.py
7.79 MB
chmod
View
DL
Edit
Rename
Delete
notifier.py
1.82 MB
chmod
View
DL
Edit
Rename
Delete
persistent_state.py
1.88 MB
chmod
View
DL
Edit
Rename
Delete
svcctl.py
8.67 MB
chmod
View
DL
Edit
Rename
Delete
sysctl.py
379 B
chmod
View
DL
Edit
Rename
Delete
systemd_notifier.py
1.58 MB
chmod
View
DL
Edit
Rename
Delete
web_server.py
21.22 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
0 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/subsys/notifier.py
"""Send events via Notification service""" import asyncio import base64 import json SOCKET_PATH = "/opt/imunify360/lib/event.sock" SOCKET_TIMEOUT = 10.0 # seconds _LEN_BYTES = 4 _MAX_SIZE = 1024 * 1024 CONFIG_UPDATED_EVENT_ID = "CONFIG_UPDATED" USER_SCAN_STARTED_EVENT_ID = "USER_SCAN_STARTED" USER_SCAN_FINISHED_EVENT_ID = "USER_SCAN_FINISHED" USER_SCAN_MALWARE_FOUND_EVENT_ID = "USER_SCAN_MALWARE_FOUND" CUSTOM_SCAN_STARTED_EVENT_ID = "CUSTOM_SCAN_STARTED" CUSTOM_SCAN_FINISHED_EVENT_ID = "CUSTOM_SCAN_FINISHED" CUSTOM_SCAN_MALWARE_FOUND_EVENT_ID = "CUSTOM_SCAN_MALWARE_FOUND" SCRIPT_BLOCKED_EVENT_ID = "SCRIPT_BLOCKED" def _prepare_event(event_id: str, user: str, body: dict) -> bytes: event = json.dumps( { "event_id": event_id, "user": user, "body": base64.b64encode(json.dumps(body).encode("utf-8")).decode( "utf-8" ), } ) binary = event.encode("utf-8") if len(binary) > _MAX_SIZE: raise Exception( "message size {} exceeds limit of {}".format( len(binary), _MAX_SIZE ) ) return len(binary).to_bytes(_LEN_BYTES, byteorder="big") + binary async def _send_event(event: bytes) -> None: _, writer = await asyncio.open_unix_connection(SOCKET_PATH) try: writer.write(event) await writer.drain() finally: writer.close() async def trigger_event(event_id: str, user: str, body: dict) -> None: """Send an event with given event_id and user, having given body.""" event = _prepare_event(event_id, user, body) await asyncio.wait_for(_send_event(event), SOCKET_TIMEOUT) async def config_updated() -> None: """Send CONFIG_UPDATED event. This forces imunify-notifier to reread its config.""" await trigger_event(CONFIG_UPDATED_EVENT_ID, "", {})
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply