ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/api/server
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
api
server
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
analyst_cleanup.py
5.24 MB
chmod
View
DL
Edit
Rename
Delete
cleanup_revert.py
817 B
chmod
View
DL
Edit
Rename
Delete
events.py
1.82 MB
chmod
View
DL
Edit
Rename
Delete
reputation.py
2.24 MB
chmod
View
DL
Edit
Rename
Delete
send_message.py
5.31 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
2.79 MB
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/api/server/send_message.py
import base64 import json import os import time import urllib.error import urllib.request from abc import ABC, abstractmethod from logging import getLogger from typing import Optional import asyncio import uuid from defence360agent.api.server import ( API, APIError, APITokenError, FGWSendMessgeException, ) from defence360agent.contracts.config import Core from defence360agent.contracts.messages import Message from defence360agent.internals.global_scope import g from defence360agent.internals.iaid import ( IndependentAgentIDAPI, IAIDTokenError, ) from defence360agent.utils.async_utils import AsyncIterate from defence360agent.utils.json import ServerJSONEncoder logger = getLogger(__name__) class BaseSendMessageAPI(API, ABC): URL = "/api/v2/send-message/{method}" @abstractmethod async def _send_request(self, message_method, headers, post_data) -> dict: pass # pragma: no cover def check_response(self, result: dict) -> None: if "status" not in result: raise APIError("unexpected server response: {!r}".format(result)) if result["status"] != "ok": raise APIError("server error: {}".format(result.get("msg"))) async def send_data(self, method: str, post_data: bytes) -> None: try: token = await IndependentAgentIDAPI.get_token() except IAIDTokenError as e: raise APITokenError(f"IAID token error occurred {e}") headers = { "Content-Type": "application/json", "X-Auth": token, } result = await self._send_request(method, headers, post_data) self.check_response(result) class SendMessageAPI(BaseSendMessageAPI): _SOCKET_TIMEOUT = Core.DEFAULT_SOCKET_TIMEOUT def __init__(self, rpm_ver: str, base_url: str = None, executor=None): self._executor = executor self.rpm_ver = rpm_ver self.product_name = "" self.server_id = None # type: Optional[str] self.license = {} # type: dict if base_url: self.base_url = base_url else: self.base_url = self._BASE_URL def set_product_name(self, product_name: str) -> None: self.product_name = product_name def set_server_id(self, server_id: Optional[str]) -> None: self.server_id = server_id def set_license(self, license: dict) -> None: self.license = license async def _send_request(self, message_method, headers, post_data): request = urllib.request.Request( self.base_url + self.URL.format(method=message_method), data=post_data, headers=headers, method="POST", ) return await self.async_request(request, executor=self._executor) async def send_message(self, message: Message) -> None: # add message handling time if it does not exist, so that # the server does not depend on the time it was received if "timestamp" not in message: message["timestamp"] = time.time() if "message_id" not in message: message["message_id"] = uuid.uuid4().hex if "method" not in message: message["method"] = "INCIDENT_LIST" data2send = { "payload": message.payload, "rpm_ver": self.rpm_ver, "message_id": message.message_id, "server_id": self.server_id, "name": self.product_name, } post_data = json.dumps(data2send, cls=ServerJSONEncoder).encode() await self.send_data(message.method, post_data) class FileBasedGatewayAPI(SendMessageAPI): async def _prepare_message(self, message, semaphore) -> dict: async with semaphore: loaded = await asyncio.to_thread(json.loads, message) return { "method": loaded["method"], "data": {k: v for k, v in loaded.items() if k != "method"}, } async def send_messages(self, messages: list[tuple[float, bytes]]) -> None: max_threads = 5 semaphore = asyncio.Semaphore(max_threads) tasks = [ self._prepare_message(msg, semaphore) async for _, msg in AsyncIterate(messages) ] prepared_messages = await asyncio.gather(*tasks) dumped_messages = await asyncio.to_thread( json.dumps, prepared_messages ) bin_file_path = os.getenv( "I360_MESSAGE_GATEWAY_BIN_PATH", "/usr/libexec/" ) bin_file = os.path.join(bin_file_path, "imunify-message-gateway") command = [ bin_file, "send-many", "--producer=i360-agent-non-resident", ] process = await asyncio.create_subprocess_exec( *command, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) b64data = base64.b64encode(dumped_messages.encode()) stdout, stderr = await process.communicate(input=b64data) if g.get("DEBUG"): logger.info( "Message sent to fgw: %s %s %s", len(messages), stdout, stderr ) if process.returncode != 0: logger.error(f"Error sending message: {stderr.decode()}") raise FGWSendMessgeException( str(f"Error sending message: {stderr.decode()}") )
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply