ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
plugins
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
accumulate.py
3.63 MB
chmod
View
DL
Edit
Rename
Delete
analyst_cleanup_update.py
5.53 MB
chmod
View
DL
Edit
Rename
Delete
backup_info_sender.py
3.06 MB
chmod
View
DL
Edit
Rename
Delete
cagefs.py
5.17 MB
chmod
View
DL
Edit
Rename
Delete
checkpoint.py
1.23 MB
chmod
View
DL
Edit
Rename
Delete
client.py
10.6 MB
chmod
View
DL
Edit
Rename
Delete
config_merger.py
828 B
chmod
View
DL
Edit
Rename
Delete
config_watcher.py
1.89 MB
chmod
View
DL
Edit
Rename
Delete
event_hook_executor.py
777 B
chmod
View
DL
Edit
Rename
Delete
event_monitor.py
3.32 MB
chmod
View
DL
Edit
Rename
Delete
event_monitor_message_processor.py
6.33 MB
chmod
View
DL
Edit
Rename
Delete
files_recurring_update.py
1.09 MB
chmod
View
DL
Edit
Rename
Delete
icontact_sender.py
4.42 MB
chmod
View
DL
Edit
Rename
Delete
idle_time_out.py
1.21 MB
chmod
View
DL
Edit
Rename
Delete
lve_utils_install.py
1.58 MB
chmod
View
DL
Edit
Rename
Delete
myimunify.py
1.98 MB
chmod
View
DL
Edit
Rename
Delete
ping.py
536 B
chmod
View
DL
Edit
Rename
Delete
send_domain_list.py
2.78 MB
chmod
View
DL
Edit
Rename
Delete
send_server_config.py
10.96 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
0 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/plugins/client.py
import asyncio
import concurrent.futures
import contextlib
import json
import logging
import os
import time
import uuid
from typing import Generator

from defence360agent.api.server import (
    APIError,
    APIErrorTooManyRequests,
    APITokenError,
    send_message,
)
from defence360agent.contracts import license
from defence360agent.contracts.config import Core
from defence360agent.contracts.messages import (
    Message,
    MessageList,
    MessageType,
)
from defence360agent.contracts.plugins import MessageSink, expect
from defence360agent.internals.persistent_message import (
    PersistentMessagesQueue,
)
from defence360agent.utils import recurring_check, Scope
from defence360agent.utils.json import ServerJSONEncoder

logger = logging.getLogger(__name__)


class SendToServerClient:
    """Send messages to server.

    * process Reportable messages;
    * add them to a pending messages list;
    * send all pending messages to server when list is full
      (contains _PENDING_MESSAGES_LIMIT items or more);
    * send all pending messages on plugin shutdown."""

    # Queue size at which _send() flushes; overridable through the
    # IMUNIFYAV_MESSAGES_COUNT_TO_SEND environment variable.
    _PENDING_MESSAGES_LIMIT = int(
        os.environ.get("IMUNIFYAV_MESSAGES_COUNT_TO_SEND", 20)
    )
    _SEND_MESSAGE_RECURRING_TIME = 60 * 5  # 5 minutes
    # 50 second because it should be less than DefaultTimeoutStopSec
    _SHUTDOWN_SEND_TIMEOUT = 50

    async def create_sink(self, loop: asyncio.AbstractEventLoop):
        """Initialise queue, synchronisation primitives and background tasks.

        Starts two tasks on *loop*: the sender (``_send``) and the periodic
        trigger (``_invoke_send_message``).
        """
        self._loop = loop
        self._pending = PersistentMessagesQueue()
        self._try_send = asyncio.Event()
        self._lock = asyncio.Lock()
        self._sender_task = loop.create_task(self._send())
        self._invoke_send_message_task = loop.create_task(
            self._invoke_send_message()
        )

    async def shutdown(self) -> None:
        """
        When shutdown begins it gives 50 seconds to send _pending messages
        to the server (after 60 seconds process will be killed by systemd).
        If stop() isn't done in 50 second it terminates process of sending
        messages and logs error.

        Whatever is still buffered afterwards is pushed to persistent
        storage.
        """
        try:
            await asyncio.wait_for(self.stop(), self._SHUTDOWN_SEND_TIMEOUT)
        except asyncio.TimeoutError:
            # Used logger.error to notify sentry
            logger.error(
                "Timeout (%ds) sending messages to server on shutdown.",
                self._SHUTDOWN_SEND_TIMEOUT,
            )
            # stop() was interrupted, so the sender task may still be alive.
            if not self._sender_task.cancelled():
                self._sender_task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await self._sender_task

        if self._pending.buffer_size > 0:
            logger.warning(
                "Save %s messages to persistent storage",
                self._pending.buffer_size,
            )
            self._pending.push_buffer_to_storage()
        logger.warning("Stored queue %r", self._pending.qsize())

    async def stop(self):
        """
        Stop sending.

        1. wait for the lock being available i.e., while _sender_task
           finishes the current round of sending message (if it takes too
           long, then the timeout in shutdown() is triggered)
        2. once the sending round complete (we got the lock), cancel the
           next iteration of the _sender_task (it exits)
        3. send _pending messages (again, if it takes too long, the timeout
           in shutdown() is triggered and the coroutine is cancelled)

        That method makes sure that the coroutine that was started in it
        has ended. It excludes a situation when:
        -> The result of a coroutine that started BEFORE shutdown()
           is started.
        -> And the process of sending messages from _pending is interrupted
           because of it
        """
        # The _lock allows you to be sure that the _send_pending_messages
        # coroutine is not running and _pending is not being used
        logger.info("SendToServer.stop cancel _invoke_send_message_task")
        self._invoke_send_message_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._invoke_send_message_task
        logger.info("SendToServer.stop wait lock")
        async with self._lock:
            # Cancel _sender_task. The lock ensures that the coroutine
            # is not in its critical part
            logger.info("SendToServer.stop lock acquired, cancel _sender_task")
            self._sender_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._sender_task
            # send messages that are in _pending at the time of agent shutdown
            await self._send_pending_messages()

    @staticmethod
    def _set_api_attrs(api):
        """Copy product name, server id and license token onto *api*.

        Returns the same *api* object.
        """
        api.set_product_name(license.LicenseCLN.get_product_name())
        api.set_server_id(license.LicenseCLN.get_server_id())
        api.set_license(license.LicenseCLN.get_token())
        return api

    @contextlib.contextmanager
    def _get_api(self) -> Generator[send_message.SendMessageAPI, None, None]:
        """Yield a configured SendMessageAPI backed by a short-lived executor.

        The executor is shut down when the context exits.
        """
        base_url = os.environ.get("IMUNIFYAV_API_BASE")
        # we send messages sequentially, so max_workers=1
        with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
            api = send_message.SendMessageAPI(
                Core.VERSION, base_url, executor=executor
            )
            yield self._set_api_attrs(api)

    @expect(MessageType.Reportable)
    async def send_to_server(self, message: Message) -> None:
        """Stamp *message*, enqueue it and wake the sender task."""
        # add message handling time if it does not exist, so that
        # the server does not depend on the time it was received
        if "timestamp" not in message:
            message["timestamp"] = time.time()
        if "message_id" not in message:
            message["message_id"] = uuid.uuid4().hex
        self._pending.put(self._encode_data_to_put_in_queue(message))
        self._try_send.set()

    @recurring_check(_SEND_MESSAGE_RECURRING_TIME)
    async def _invoke_send_message(self):
        """Periodically wake the sender task."""
        self._try_send.set()

    @recurring_check(0)
    async def _send(self):
        """Wait for a wake-up and flush the queue once it is large enough."""
        await self._try_send.wait()
        self._try_send.clear()
        if self._pending.qsize() >= self._PENDING_MESSAGES_LIMIT:
            # The _lock protects critical part of _send method
            logger.info("SendToServer._send wait lock")
            need_to_cancel = None
            async with self._lock:
                logger.info("SendToServer._send lock acquired")
                try:
                    await self._send_pending_messages()
                except asyncio.CancelledError as e:
                    # Defer re-raising until the lock has been released.
                    logger.info("SendToServer._send cancelled unlocking")
                    need_to_cancel = e
            logger.info("SendToServer._send lock released")
            if need_to_cancel:
                raise need_to_cancel

    def _encode_data_to_put_in_queue(self, data: Message) -> bytes:
        """Serialise *data* to newline-terminated JSON bytes."""
        msg = json.dumps(data, cls=ServerJSONEncoder) + "\n"
        return msg.encode()

    def _decode_message(self, message: bytes) -> Message:
        """Parse queued bytes back into a Message (or MessageList).

        A payload with a truthy ``"list"`` key becomes a MessageList.
        """
        data = json.loads(message)
        if data.get("list"):
            return MessageList(data.get("list"))
        return Message(data)

    @staticmethod
    def _message_ref(message) -> dict:
        """Return the short identifying dict used in log records."""
        return {
            "method": message.get("method"),
            "message_id": message.get("message_id"),
        }

    def _requeue_for_retry(self, message, timestamp) -> None:
        """Bump the retry counter of *message* and put it back in the queue."""
        message["api_retries_count"] = message.get("api_retries_count", 0) + 1
        self._pending.put(
            self._encode_data_to_put_in_queue(message),
            timestamp=timestamp,
        )

    async def _send_pending_messages(self) -> None:
        """Pop every queued message and try to send each one in order.

        A failed message is re-queued with an incremented
        ``api_retries_count``. After a "too many requests" or token error
        the remaining messages are re-queued untouched without attempting
        to send them.
        """
        messages = self._pending.pop_all()
        logger.info("Sending %s messages", len(messages))
        with self._get_api() as api:
            # NOTE(review): when server_id is None the popped messages are
            # not re-queued, i.e. they are dropped — confirm this is intended.
            if api.server_id is not None:
                stop_attempt = False
                for timestamp, message_bytes in messages:
                    if stop_attempt:
                        self._pending.put(message_bytes, timestamp=timestamp)
                        continue
                    try:
                        message = self._decode_message(message_bytes)
                        await api.send_message(message)
                        logger.info(
                            "message sent %s", self._message_ref(message)
                        )
                    except APIErrorTooManyRequests as exc:
                        logger.warning(
                            "Too many requests on send message %s to"
                            " server: %s",
                            self._message_ref(message),
                            exc,
                        )
                        self._requeue_for_retry(message, timestamp)
                        stop_attempt = True
                    except APITokenError as exc:
                        logger.warning(
                            "Token error on send message %s to server: %s",
                            self._message_ref(message),
                            exc,
                        )
                        self._requeue_for_retry(message, timestamp)
                        stop_attempt = True
                    except APIError as exc:
                        logger.warning(
                            "Failed to send message %s to server: %s",
                            self._message_ref(message),
                            exc,
                        )
                        self._requeue_for_retry(message, timestamp)
        logger.info("Unsuccessful to send %s messages", self._pending.qsize())


class SendToServer(SendToServerClient, MessageSink):
    """Message sink plugin that forwards Reportable messages to the server."""

    SCOPE = Scope.AV
    SHUTDOWN_PRIORITY = 900  # Shutdown late, after Accumulate has flushed
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply