""" This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program.  If not, see . Copyright © 2019 Cloud Linux Software Inc. This software is also available under ImunifyAV commercial license, see """ import logging import os import pwd from defence360agent.rpc_tools.lookup import ( CommonEndpoints, RootEndpoints, bind, ) from defence360agent.utils import Scope, is_root_user from defence360agent.contracts.messages import MessageType from defence360agent.model.wordpress_incident import get_wordpress_incidents logger = logging.getLogger(__name__) def get_user_id_and_site_for_query( user: str | None = None, site_search: str | None = None ) -> tuple[int | None, str | None]: """ Determine the user_id and site_path for filtering WordPress incidents. Three calling contexts: 1. Root user: Can query all incidents or filter by specific user 2. Non-root user: Can only query their own incidents (user/site_search ignored) 3. Proxy service: Both user and site_search must be set, restricted to that site Args: user: Username to filter by site_search: Site path to filter by Returns: Tuple of (user_id, site_path) to filter by, or (None, None) for all Raises: KeyError: If the specified user doesn't exist ValueError: If proxy service call is missing required parameters """ current_uid = os.getuid() if is_root_user(): # Root user can see all incidents or filter by user logger.debug("Root user querying incidents, user filter: %s", user) user_id = None # Root can see all incidents by default if user is not None: # Root user specified a username to filter by try: user_id = pwd.getpwnam(user).pw_uid logger.debug( "Filtering incidents for user %s (uid=%d)", user, user_id ) except KeyError: logger.warning("User not found: %s", user) raise KeyError(f"User '{user}' not found") return user_id, site_search return current_uid, site_search class WordpressEndpoints(RootEndpoints): SCOPE = Scope.AV_IM360 @bind("wordpress-plugin", "install-on-new-sites") async def wordpress_plugin_install(self): await self._sink.process_message( MessageType.WordpressPluginAction(action="install_on_new_sites") ) @bind("wordpress-plugin", "tidy-up") async def wordpress_plugin_tidy_up(self): await self._sink.process_message( MessageType.WordpressPluginAction(action="tidy_up") ) @bind("wordpress-plugin", "update") async def wordpress_plugin_update(self): await self._sink.process_message( MessageType.WordpressPluginAction(action="update_existing") ) @bind("wordpress-plugin", "install-and-update") async def wordpress_plugin_install_and_update(self): await self._sink.process_message( MessageType.WordpressPluginAction(action="install_and_update") ) class WordpressCommonEndpoints(CommonEndpoints): SCOPE = Scope.AV_IM360 @bind("wordpress-plugin", "list-incidents") async def wordpress_plugin_list_incidents( self, user: str | None = None, site_search: str | None = None, limit: int = 50, offset: int = 0, by_abuser_ip: str | None = None, by_country_code: str | None = None, by_domain: str | None = None, search: str | None = None, since: int | None = None, to: int | None = None, order_by: list | None = None, ) -> list[dict] | str: """ List WordPress security incidents. Three calling contexts: 1. Root user: Can query all incidents or filter by specific user 2. Non-root user: Can only query their own incidents 3. Proxy service: Both user and site_search must be set, restricted to that site Args: user: Username to filter by (root or proxy service) site_search: Site path to filter by (proxy service only) limit: Maximum number of incidents to return offset: Number of incidents to skip by_abuser_ip: Filter by attacker IP address by_country_code: Filter by country code by_domain: Filter by domain search: Search across multiple fields since: Filter by timestamp >= this value (unix timestamp) to: Filter by timestamp <= this value (unix timestamp) order_by: List of fields to order by (e.g., ['timestamp-', 'severity-']) Returns: List of incident dictionaries or error message string """ try: user_id, site_path = get_user_id_and_site_for_query( user, site_search ) except KeyError as e: return f"WARNING: {e}" incidents = get_wordpress_incidents( limit=limit, offset=offset, user_id=user_id, by_abuser_ip=by_abuser_ip, by_country_code=by_country_code, by_domain=by_domain, search=search, site_search=site_path, since=since, to=to, order_by=order_by, ) # Fields transformation for UI for incident in incidents: incident["times"] = incident.pop("retries") incident["country"] = {"code": incident.pop("country")} return incidents