ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/contracts
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
contracts
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
config.py
42.01 MB
chmod
View
DL
Edit
Rename
Delete
config_provider.py
9.36 MB
chmod
View
DL
Edit
Rename
Delete
eula.py
1.47 MB
chmod
View
DL
Edit
Rename
Delete
hooks.py
6.13 MB
chmod
View
DL
Edit
Rename
Delete
hook_events.py
1.51 MB
chmod
View
DL
Edit
Rename
Delete
license.py
21.27 MB
chmod
View
DL
Edit
Rename
Delete
messages.py
12 MB
chmod
View
DL
Edit
Rename
Delete
myimunify_id.py
4.4 MB
chmod
View
DL
Edit
Rename
Delete
permissions.py
6.01 MB
chmod
View
DL
Edit
Rename
Delete
plugins.py
8.1 MB
chmod
View
DL
Edit
Rename
Delete
sentry.py
2.97 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
0 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/contracts/config.py
"""
All the config settings for defence360 in one place
"""
import functools
import logging
import os
from abc import abstractmethod
from bisect import bisect_left, bisect_right
from copy import deepcopy
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    List,
    Mapping,
    Optional,
    Protocol,
    Sequence,
    Tuple,
    Union,
    _ProtocolMeta,
)

from cerberus import Validator

from defence360agent.contracts.config_provider import (
    CachedConfigReader,
    ConfigError,
    ConfigReader,
    UserConfigReader,
    WriteOnlyConfigReader,
)
from defence360agent.feature_management.checkers import (
    config_cleanup as fm_config_cleanup,
)
from defence360agent.utils import Singleton, dict_deep_update, importer

# Version string looked up from imav._version; None when the lookup falls
# back to the default.
av_version = importer.get(
    module="imav._version", name="__version__", default=None
)

logger = logging.getLogger(__name__)

# True when importer.exists("im360") is false.
# NOTE(review): presumably this means the full Imunify360 package is not
# installed and the agent runs as the antivirus-only product -- confirm.
ANTIVIRUS_MODE = not importer.exists("im360")
# feature flag for those clients who want to test Myimunify
FREEMIUM_FEATURE_FLAG = "/var/imunify360/myimunify-freemium.flag"
MY_IMUNIFY_KEY = "MY_IMUNIFY"

# Version string looked up from im360._version, falling back to av_version.
_version = importer.get(
    module="im360._version", name="__version__", default=av_version
)

# Relative path joined onto this module's directory by conf_rel2abs().
AGENT_CONF = "../."
# Directory scanned for config schema modules; overridable through the
# IM360_CONFIG_SCHEMA_PATH environment variable.
CONFIG_VALIDATORS_DIR_PATH = Path(
    os.environ.get(
        "IM360_CONFIG_SCHEMA_PATH",
        "/opt/imunify360/venv/share/imunify360/config_schema/",
    )
)

# TODO: remove after av-7.16.0 release
NOTIFY, CLEANUP = "notify", "cleanup"

NONE, DAY, WEEK, MONTH = "none", "day", "week", "month"
DEFAULT_INTENSITY_CPU = 2
DEFAULT_INTENSITY_IO = 2
DEFAULT_INTENSITY_RAM = 2048
DEFAULT_RESOURCE_MANAGEMENT_CPU_LIMIT = 2
DEFAULT_RESOURCE_MANAGEMENT_IO_LIMIT = 2
DEFAULT_RESOURCE_MANAGEMENT_RAM_LIMIT = 500
MODSEC_RULESET_FULL = "FULL"
MODSEC_RULESET_MINIMAL = "MINIMAL"
_DOS_DETECTOR_DEFAULT_LIMIT = 250
_DOS_DETECTOR_MIN_LIMIT = 1
_DOS_DETECTOR_MIN_INTERVAL = 1
PORT_BLOCKING_MODE_DENY = "DENY"
PORT_BLOCKING_MODE_ALLOW = "ALLOW"
# NOTE(review): the line breaks and padding inside the two disclaimer
# strings below were reconstructed from a whitespace-collapsed copy of this
# file -- confirm against the packaged original before relying on the
# exact bytes.
DO_NOT_MODIFY_DISCLAMER = """\
############################################################################
# DO NOT MODIFY THIS FILE!!!                                               #
# USE /etc/sysconfig/imunify360/imunify360.config.d/ TO OVERRIDE DEFAULTS  #
############################################################################
"""
DEFAULT_CONFIG_DISCLAMER = """\
############################################################################
# DO NOT MODIFY THIS FILE!!!                                               #
# USE /etc/sysconfig/imunify360/imunify360.config.d/ TO OVERRIDE DEFAULTS  #
# This is an example of default values only                                #
# Changing this file will have no effect                                   #
############################################################################
"""

CPANEL, PLESK, DIRECTADMIN = "cpanel", "plesk", "directadmin"
ACRONIS, R1SOFT, CLUSTERLOGICS = "acronis", "r1soft", "clusterlogics"
SAMPLE_BACKEND = "sample"
CLOUDLINUX, CLOUDLINUX_ON_PREMISE = "cloudlinux", "cloudlinux_on_premise"

GENERIC_SENSOR_SOCKET_PATH = "/var/run/defence360agent/generic_sensor.sock.2"


def int_from_envvar(var: str, default: int, env: Mapping = os.environ) -> int:
    """
    Read an integer from the mapping *env* (os.environ by default).

    :param var: name of the variable to read
    :param default: value returned when *var* is not present in *env*
    :param env: mapping to read from
    :raises ValueError: when the value is present but is not an integer
    """
    try:
        return int(env[var])
    except KeyError:
        return default
    except ValueError as e:
        raise ValueError("{}: integer required".format(var)) from e


def bool_from_envvar(
    var: str, default: bool, env: Mapping = os.environ
) -> bool:
    """
    Read a boolean from the mapping *env* (os.environ by default).

    The value is compared case-insensitively against TRUE_VALS / FALSE_VALS.

    :param var: name of the variable to read
    :param default: value returned when *var* is not present in *env*
    :param env: mapping to read from
    :raises ValueError: when the value is not one of the accepted spellings
    """
    TRUE_VALS = ("true", "t", "yes", "y", "1")
    FALSE_VALS = ("false", "f", "no", "n", "0")
    try:
        val = env[var]
    except KeyError:
        return default
    else:
        val = val.lower()
        if val in TRUE_VALS:
            return True
        if val in FALSE_VALS:
            return False
        raise ValueError(
            "{}: should be one of {}".format(var, TRUE_VALS + FALSE_VALS)
        )


def _self_rel2abs(relpath):
    """Join *relpath* onto the directory that contains this module."""
    return os.path.join(os.path.dirname(__file__), relpath)


def conf_rel2abs(relpath):
    """Join *relpath* onto the directory derived from AGENT_CONF."""
    return os.path.join(os.path.dirname(_self_rel2abs(AGENT_CONF)), relpath)


def _slurp_file(path):
    """Returns content for existing file, otherwise None"""
    try:
        with open(path, "r") as f:
            return f.read().strip()
    except OSError:
        return None


def choose_value_from_config(
    section, option, username: str | None = None
) -> Tuple[Any, str | None]:
    """
    Choose action for config option by checking EndUser's Imunify360 config
    and Admin config. Admins config applies only if EndUser didn't set
    the default action

    :return: tuple of (value, who set it): *username* when the value comes
        from the user config, UserType.ROOT otherwise
    """
    user_config = ConfigFile(username=username).config_to_dict()
    user_section = user_config.get(section)
    if user_section is not None:
        user_value = user_section.get(option)
        if user_value is not None:
            return user_value, username

    # NOTE(review): nesting reconstructed -- the fallback below is assumed
    # to run whenever the user config did not yield a value.
    logger.debug("Cannot read %s:%s from user config", section, option)
    root_config = ConfigFile().config_to_dict()
    return root_config[section][option], UserType.ROOT


def is_mi_freemium_license():
    """
    Just checks if this is server with MyImunify Freemium license
    """
    return os.path.exists(FREEMIUM_FEATURE_FLAG)


class FromConfig:
    """
    Descriptor that reads a section (or one option of a section) from a
    config object on every attribute access.

    The config object is created on first access -- ConfigFile() unless
    *config_cls* was given -- and then kept on the descriptor.
    """

    def __init__(self, section, option=None, config_cls=None):
        self.section = section
        self.option = option
        self._config_cls = config_cls
        self._config_instance = None

    def __get__(self, instance, owner):
        if self._config_instance is None:
            if self._config_cls is None:
                self._config_instance = ConfigFile()
            else:
                self._config_instance = self._config_cls()

        section_value = self._config_instance.config_to_dict()[self.section]
        if self.option is not None:
            return section_value[self.option]
        return section_value


class FromFlagFile:
    """
    Descriptor whose value comes from a file named *name* under LOCATION.
    """

    LOCATION = Path("/var/imunify360")

    def __init__(self, name, *, coerce=bool, default=...):
        self.name = name
        self.coerce = coerce
        self.default = default

    def __get__(self, instance, owner):
        path = self.LOCATION / self.name
        # When the file is missing the method falls through and returns
        # None; *default* only replaces empty file content.
        if path.exists():
            return self.coerce(path.read_text() or self.default)


def _get_combined_validation_schema(*, root=True):
    """
    Merge the schemas returned by every module found in
    CONFIG_VALIDATORS_DIR_PATH.

    Each module is asked for "get_root_config" (root=True) or
    "get_non_root_config" (root=False); modules without that function
    contribute an empty schema.
    """
    func_name = "get_root_config" if root else "get_non_root_config"
    combined_schema = {}
    for module in importer.iter_modules([CONFIG_VALIDATORS_DIR_PATH]):
        get_schema = getattr(module, func_name, lambda: {})
        schema = get_schema()
        dict_deep_update(combined_schema, schema, allow_overwrite=False)
    return combined_schema


@functools.lru_cache(maxsize=1)
def config_schema_root():
    """Return the combined root schema (computed once, then cached)."""
    return _get_combined_validation_schema()


@functools.lru_cache(maxsize=1)
def config_schema_non_root():
    """Return the combined non-root schema (computed once, then cached)."""
    return _get_combined_validation_schema(root=False)


CONFIG_SCHEMA_CUSTOM_BILLING = {
    "CUSTOM_BILLING": {
        "type": "dict",
        "schema": {
            "upgrade_url": {
                "type": "string",
                "default": None,
                "nullable": True,
            },
            "upgrade_url_360": {
                "type": "string",
                "default": None,
                "nullable": True,
            },
            "billing_notifications": {"type": "boolean", "default": True},
            "ip_license": {"type": "boolean", "default": True},
        },
        "default": {},
    }
}


class ConfigValidationError(Exception):
    pass


class Core:
    PRODUCT = "imunify360"
    NAME = "%s agent" % PRODUCT if not ANTIVIRUS_MODE else "imunify antivirus"
    AV_VERSION = av_version
    VERSION = _version  # AV or IM360
    API_BASE_URL = os.environ.get(
        "IMUNIFY360_API_URL", "https://api.imunify360.com"
    )
    DEFAULT_SOCKET_TIMEOUT = 10
    DIST = ".el9"
    FILE_UMASK = 0o007
    TMPDIR = "/var/imunify360/tmp"
    MERGED_CONFIG_FILE_NAME = "imunify360-merged.config"
    USER_CONFIG_FILE_NAME = "imunify360.config"
    LOCAL_CONFIG_FILE_NAME = "imunify360.config"
    CONFIG_DIR = "/etc/imunify360"
    USER_CONFDIR = os.path.join(CONFIG_DIR, "user_config")
    GLOBAL_CONFDIR = "/etc/sysconfig/imunify360"
    MERGED_CONFIG_FILE_PATH = os.path.join(
        GLOBAL_CONFDIR, MERGED_CONFIG_FILE_NAME
    )
    MERGED_CONFIG_FILE_PERMISSION = 0o644
    LOCAL_CONFIG_FILE_PATH = os.path.join(GLOBAL_CONFDIR, "imunify360.config")
    CONFIG_D_NAME = "imunify360.config.d"
    BACKUP_CONFIGFILENAME = ".imunify360.backup_config"
    HOOKS_CONFIGFILENAME = "hooks.yaml"
    CUSTOM_BILLING_CONFIGFILENAME = "custom_billing.config"
    INBOX_HOOKS_DIR = "/var/imunify360/hooks"
    SVC_NAME = (
        "imunify360-agent" if not ANTIVIRUS_MODE else "imunify-antivirus"
    )
    UNIFIED_ACCESS_LOGGER_CONFIGFILENAME = "unified-access-logger.conf"
    GO_FLAG_FILE = Path("/etc/sysconfig/imunify360/.go_agent")
    SIGNAL_HANDLER_MIGRATION_TIMEOUT_SECS = 60


class IConfig(Protocol):
    """Interface shared by every config object in this module."""

    @abstractmethod
    def config_to_dict(
        self, normalize: bool = True, force_read: bool = False
    ) -> dict:
        raise NotImplementedError

    @abstractmethod
    def dict_to_config(
        self,
        data: Mapping,
        validate: bool = True,
        normalize: bool = True,
        overwrite: bool = False,
        without_defaults: bool = False,
    ) -> None:
        raise NotImplementedError

    @abstractmethod
    def validate(self) -> None:
        raise NotImplementedError

    @abstractmethod
    def normalize(
        self, config: Mapping, without_defaults: bool = False
    ) -> dict:
        raise NotImplementedError

    @abstractmethod
    def modified_since(self, timestamp: Optional[float]) -> bool:
        raise NotImplementedError

    def get(self, section: str, option: Optional[str]) -> Any:
        # A falsy *option* yields None rather than the whole section.
        if option:
            return self.config_to_dict().get(section, {}).get(option)
        return None

    def set(self, section: str, option: Optional[str], value: Any) -> None:
        # A falsy *option* makes this a no-op.
        if option:
            self.dict_to_config({section: {option: value}})


class IConfigFile(IConfig, Protocol):
    path: str


class ProtocolSingleton(_ProtocolMeta, Singleton):
    """
    Needed to avoid metaclass conflict when implementing protocols that are
    also Singletons
    """


class Normalizer:
    """
    Normalizes config dicts against a schema, with and without defaults.

    The result of the last with-defaults normalization is kept and reused
    while the same config is passed in again.
    """

    def __init__(self, validation_schema):
        self._config: Optional[Mapping] = None
        self._normalized_config: dict = {}
        self._schema = ConfigsValidator.get_validation_schema(
            validation_schema
        )
        self._schema_without_defaults = self._get_schema_without_defaults(
            self._schema
        )

    @classmethod
    def _get_schema_without_defaults(cls, _dict: Mapping) -> dict:
        """Return a copy of the schema with the "default" and
        "default_setter" keys removed at every nesting level."""
        return {
            key: cls._get_schema_without_defaults(value)
            if isinstance(value, dict)
            else value
            for key, value in _dict.items()
            if key not in ["default", "default_setter"]
        }

    @staticmethod
    def remove_null(config: Mapping) -> dict:
        """Return *config* without the options whose value is None."""
        new_config: Dict[str, Union[dict, List[Tuple]]] = {}
        for section, options in config.items():
            # We only support dict for option removal
            if isinstance(options, dict):
                for option, value in options.items():
                    if value is not None:
                        new_config.setdefault(section, {})[option] = value
            elif options:
                # Let cerberus coerce this to dict
                # and leave the None's as is
                new_config[section] = options
        return new_config

    @staticmethod
    def _normalize_with_schema(config: Mapping, schema) -> dict:
        validator = ConfigValidator(schema)
        normalized: Optional[dict] = validator.normalized(config)
        if validator.errors:
            raise ConfigValidationError(validator.errors)
        if normalized is None:
            raise ConfigValidationError(f"Cerberus returned None for {config}")
        return normalized

    def normalize(self, config: Mapping, without_defaults: bool) -> dict:
        schema = (
            self._schema_without_defaults if without_defaults else self._schema
        )
        # The without-defaults path bypasses the cache entirely.
        if without_defaults:
            config = self.remove_null(config)
            return self._normalize_with_schema(config, schema)

        # Utilize cache
        if config == self._config:
            return self._normalized_config

        normalized = self._normalize_with_schema(config, schema)
        self._config = config
        self._normalized_config = normalized
        return self._normalized_config


class Config(IConfig, metaclass=ProtocolSingleton):
    """
    Config backed by a config reader and checked against a validation
    schema (config_schema_root unless another one is given).
    """

    DISCLAIMER = ""

    def __init__(
        self,
        *,
        # FIXME: allow pathlib.Path
        path: str = None,
        config_reader: ConfigReader = None,
        validation_schema: Union[Mapping, Callable[[], Mapping]] = None,
        disclaimer: str = None,
        cached: bool = True,
        permissions: int = None,
    ):
        super().__init__()

        assert path or config_reader
        config_reader_cls = CachedConfigReader if cached else ConfigReader
        # An explicitly passed reader wins; otherwise one is built for *path*.
        self._config_reader = config_reader or config_reader_cls(
            path,
            disclaimer=disclaimer or self.DISCLAIMER,
            permissions=permissions,
        )
        self.path = self._config_reader.path
        self.validation_schema = validation_schema or config_schema_root
        self._normalizer = Normalizer(self.validation_schema)

    def __repr__(self):
        return (
            "<{classname}(config_reader={config_reader!r},"
            " validation_schema={validation_schema!r})"
            ">"
        ).format(
            classname=self.__class__.__qualname__,
            config_reader=self._config_reader,
            validation_schema=self.validation_schema,
        )

    def normalize(self, config: Mapping, without_defaults=False) -> dict:
        return self._normalizer.normalize(config, without_defaults)

    def config_to_dict(self, normalize=True, force_read: bool = False) -> dict:
        """
        Converts config file to dict

        :return dict: dictionary (key (section) / value (options))
        """
        config = self._config_reader.read_config_file(force_read=force_read)
        if normalize:
            config = self.normalize(config)
        # Hand out a copy so callers cannot mutate the reader's or the
        # normalizer's stored dict.
        return deepcopy(config)

    def dict_to_config(
        self,
        data: Mapping,
        validate: bool = True,
        normalize: bool = True,
        overwrite: bool = False,
        without_defaults: bool = False,
    ) -> None:
        """
        Converts dict to config file
        New options will be mixed in with old ones
        unless overwrite is specified

        :param dict data: dictionary (key (section) / value (options))
        :param bool validate: indicates if we need validation
        :param bool normalize: normalize config
        :param overwrite: overwrite existing conf
        :param without_defaults: do not fill defaults
        :return: None
        """
        if overwrite:
            self._dict_to_config_overwrite(
                data=data,
                validate=validate,
                normalize=normalize,
                without_defaults=without_defaults,
            )
        else:
            self._dict_to_config(
                data=data,
                validate=validate,
                normalize=normalize,
                without_defaults=without_defaults,
            )

    def _dict_to_config_overwrite(
        self, data, validate, normalize, without_defaults
    ):
        if validate:
            ConfigsValidator.validate(data, self.validation_schema)
        if normalize:
            data = self.normalize(data, without_defaults=without_defaults)

        self._config_reader.write_config_file(data)

    def _dict_to_config(self, data, validate, normalize, without_defaults):
        config = deepcopy(self._config_reader.read_config_file())
        # NOTE(review): nesting reconstructed -- validation, normalization
        # and the write are assumed to happen only when dict_deep_update()
        # returns a truthy value; confirm against the original.
        if dict_deep_update(config, data):
            if validate:
                ConfigsValidator.validate(config, self.validation_schema)
            if normalize:
                config = self.normalize(
                    config, without_defaults=without_defaults
                )
            self._config_reader.write_config_file(config)

    def validate(self):
        """
        :raises ConfigsValidatorError
        """
        try:
            config_dict = self._config_reader.read_config_file(
                ignore_errors=False
            )
        except ConfigError as e:
            message = "Error during config validation"
            logger.error("%s: %s", message, e)
            raise ConfigsValidatorError({self: message}) from e

        try:
            ConfigsValidator.validate(config_dict, self.validation_schema)
        except ConfigValidationError as e:
            message = "Imunify360 config does not match the scheme"
            logger.error("%s: %s", message, e)
            raise ConfigsValidatorError({self: message}) from e

    def modified_since(self, timestamp: Optional[float]) -> bool:
        return self._config_reader.modified_since(timestamp)


class UserConfig(Config):
    """Per-user config stored under Core.USER_CONFDIR/<username>/."""

    def __init__(self, *, username):
        self.username = username
        path = os.path.join(
            Core.USER_CONFDIR, username, Core.USER_CONFIG_FILE_NAME
        )
        super().__init__(
            path=path,
            config_reader=UserConfigReader(path, username),
            validation_schema=config_schema_non_root,
        )


class SystemConfig(IConfig, metaclass=ProtocolSingleton):
    """
    System-wide config: reads, validation and normalization go to the
    merged config, writes go to the local config.
    """

    def __init__(
        self, *, local_config: Config = None, merged_config: Config = None
    ):
        super().__init__()
        self._local_config = local_config or LocalConfig()
        self._merged_config = merged_config or MergedConfig()

    def config_to_dict(
        self, normalize: bool = True, force_read: bool = False
    ) -> dict:
        return self._merged_config.config_to_dict(
            normalize=normalize, force_read=force_read
        )

    def dict_to_config(
        self,
        data: Mapping,
        validate: bool = True,
        normalize: bool = True,
        overwrite: bool = False,
        without_defaults: bool = True,
    ) -> None:
        self._local_config.dict_to_config(
            data,
            validate=validate,
            normalize=normalize,
            overwrite=overwrite,
            without_defaults=without_defaults,
        )

    def validate(self) -> None:
        self._merged_config.validate()

    def normalize(
        self, config: Mapping, without_defaults: bool = False
    ) -> dict:
        return self._merged_config.normalize(
            config=config, without_defaults=without_defaults
        )

    def modified_since(self, timestamp: Optional[float]) -> bool:
        return self._merged_config.modified_since(timestamp)


def config_file_factory(
    username: Optional[Union[str, int]] = None, path: Optional[str] = None
) -> IConfig:
    """
    Pick the config object for the given arguments.

    A truthy non-int *username* gives a UserConfig, otherwise a truthy
    *path* gives a Config for that path, otherwise the SystemConfig.
    """
    if username and not isinstance(username, int):
        return UserConfig(username=username)
    elif path:
        return Config(path=path)
    else:
        return SystemConfig()


# TODO: move all layer related functions to another module
def any_layer_modified_since(timestamp) -> bool:
    """Return True when any config layer reports a modification since
    *timestamp*."""
    merger = Merger(Merger.get_layer_names())
    for layer in merger.layers:
        if layer.modified_since(timestamp):
            return True
    return False


# Config bound to a write-only reader for the merged config file.
MergedConfig = functools.partial(
    Config,
    config_reader=WriteOnlyConfigReader(
        path=Core.MERGED_CONFIG_FILE_PATH,
        disclaimer=DO_NOT_MODIFY_DISCLAMER,
        permissions=Core.MERGED_CONFIG_FILE_PERMISSION,
    ),
)


class LocalConfig(Config):
    """
    Config (/etc/sysconfig/imunify360/imunify360.config) should contain
    options changed by a customer only
    """

    def __init__(
        self,
        *,
        path=Core.LOCAL_CONFIG_FILE_PATH,  # overrides the parent default value
        config_reader: ConfigReader = None,
        validation_schema: Union[Mapping, Callable[[], Mapping]] = None,
        disclaimer: str = None,
        cached=False,  # overrides the parent default value
    ):
        super().__init__(
            path=path,
            config_reader=config_reader,
            validation_schema=validation_schema,
            disclaimer=disclaimer,
            cached=cached,
        )

    def dict_to_config(
        self,
        data: Mapping,
        validate: bool = True,
        normalize: bool = True,
        overwrite: bool = False,
        without_defaults: bool = True,  # overrides the parent default value
    ) -> None:
        return super().dict_to_config(
            data,
            validate=validate,
            normalize=normalize,
            overwrite=overwrite,
            without_defaults=without_defaults,
        )


class BaseMerger:
    """
    Builds one effective config dict out of the layer files found in DIR.
    """

    DIR = os.path.join(Core.GLOBAL_CONFDIR, Core.CONFIG_D_NAME)
    LOCAL_CONFIG_NAME = "90-local.config"

    def __init__(self, names, include_defaults=False):
        self._include_defaults = include_defaults
        self.layers = [
            Config(path=os.path.join(self.DIR, name)) for name in names
        ]

    @classmethod
    def get_layer_names(cls):
        """Return the sorted entries of DIR, or [] when DIR is not a
        directory."""
        return sorted(os.listdir(cls.DIR)) if os.path.isdir(cls.DIR) else []

    def configs_to_dict(self, force_read=False):
        layer_dict_list = []
        if self._include_defaults:
            # Normalizing an empty config with defaults enabled yields the
            # schema defaults, which form the lowest layer.
            defaults = Normalizer(config_schema_root).normalize(
                {}, without_defaults=False
            )
            layer_dict_list.append(defaults)
        layer_dict_list += [
            layer.config_to_dict(normalize=False, force_read=force_read)
            for layer in self.layers
        ]
        return self._build_effective_config(layer_dict_list)

    @classmethod
    def _build_effective_config(cls, layer_dict_list: list):
        """Overlay the layers in order; later layers override earlier ones,
        None sections and None option values are skipped."""
        effective: Dict[str, dict] = {}
        for layer_dict in layer_dict_list:
            for section, options in layer_dict.items():
                if options is None:
                    continue
                if section not in effective:
                    effective[section] = {}
                for option, value in options.items():
                    if value is not None:
                        effective[section][option] = value
        return effective


class MutableMerger(BaseMerger):
    """Merger over the layers sorted before LOCAL_CONFIG_NAME, plus the
    schema defaults."""

    def __init__(self, names: Sequence):
        idx = bisect_left(names, self.LOCAL_CONFIG_NAME)
        names = names[:idx]
        super().__init__(names, include_defaults=True)


class ImmutableMerger(BaseMerger):
    """Merger over the layers sorted after LOCAL_CONFIG_NAME, without the
    schema defaults."""

    def __init__(self, names: Sequence):
        idx = bisect_right(names, self.LOCAL_CONFIG_NAME)
        names = names[idx:]
        super().__init__(names, include_defaults=False)


class NonBaseMerger(BaseMerger):
    """Merger over all given layers, without the schema defaults."""

    def __init__(self, names: Sequence):
        super().__init__(names, include_defaults=False)


class Merger(BaseMerger):
    """Merger over all given layers, plus the schema defaults."""

    def __init__(self, names):
        super().__init__(names, include_defaults=True)

    @classmethod
    def update_merged_config(cls):
        """Rebuild the merged config from all layers; an invalid result is
        logged and not written."""
        merged_config = MergedConfig()
        merger = cls(cls.get_layer_names())
        config_dict = merger.configs_to_dict()
        try:
            ConfigsValidator.validate(config_dict)
        except (ConfigsValidatorError, ConfigValidationError) as e:
            logger.warning("Config file is invalid! %s", e)
        else:
            merged_config.dict_to_config(config_dict, validate=False)


class ConfigValidator(Validator):
    def __init__(
        self,
        *args,
        allow_unknown=ANTIVIRUS_MODE,
        purge_readonly=True,
        **kwargs,
    ):
        """
        Initialises ConfigValidator(Validator)
        for more details on Validator params please check
        https://docs.python-cerberus.org/en/stable/validation-rules.html
        """
        super().__init__(
            *args,
            allow_unknown=allow_unknown,
            purge_readonly=purge_readonly,
            **kwargs,
        )

    def _normalize_coerce_user_override_pd_rules(self, value):
        # Forces True whenever MY_IMUNIFY.enable is truthy in the document
        # being normalized.
        if self.root_document.get("MY_IMUNIFY", {}).get("enable", False):
            return True
        return value


class Packaging:
    DATADIR = "/opt/imunify360/venv/share/%s" % Core.PRODUCT


class SimpleRpc:
    # how long to wait for the response from RPC server in seconds
    CLIENT_TIMEOUT = 3600
    SOCKET_PATH = "/var/run/defence360agent/simple_rpc.sock"
    # end-user stuff
    NON_ROOT_SOCKET_PATH = "/var/run/defence360agent/non_root_simple_rpc.sock"
    TOKEN_FILE_TMPL = ".imunify360_token_{suffix}"
    # -r--------
    TOKEN_MASK = 0o400
    SOCKET_ACTIVATION = bool_from_envvar(
        "I360_SOCKET_ACTIVATION",
        ANTIVIRUS_MODE,
    )
    INACTIVITY_TIMEOUT = int_from_envvar(
        "IMUNIFY360_INACTIVITY_TIMEOUT",
        int(timedelta(minutes=5).total_seconds()),
    )


class Model:
    # type sqlite3 - sqlite only supported
    PATH = "/var/%s/%s.db" % (Core.PRODUCT, Core.PRODUCT)
    PROACTIVE_PATH = "/var/%s/proactive.db" % (Core.PRODUCT,)
    RESIDENT_PATH = "/var/%s/%s-resident.db" % (Core.PRODUCT, Core.PRODUCT)


class FilesUpdate:
    # Check files update periodically
    PERIOD = timedelta(minutes=30).total_seconds()
    # Timeout for single Index update (group of files) DEF-11501
    # It has to be less than watchdog timeout (60 min)
    TIMEOUT = timedelta(minutes=15).total_seconds()
    # Timeout for individual socket operations (connect, recv/read, etc.)
    # For instance ssl-handshake timeout == SOCKET_TIMEOUT
    # The lower the value the better, so as not to wait too long
    SOCKET_TIMEOUT = 3  # seconds
    # Following settings applicable only for IM360:
    # File types that should be downloaded but not applied
    # to the system (e.g. for testing purposes)
    DISABLED = []
    # How long to keep the files on the disk
    DAYS_TO_KEEP = 30


class CountryInfo:
    DB = "/var/imunify360/files/geo/v1/GeoLite2-Country.mmdb"

    LOCATIONS_DB = (
        "/var/imunify360/files/geo/v1/GeoLite2-Country-Locations-en.csv"
    )

    @staticmethod
    def country_subnets_file(country_code):
        return "/var/imunify360/files/geo/v1/CountrySubnets-{}.txt".format(
            country_code
        )


class Sentry:
    # Environment variable first, then the content of <DATADIR>/sentry
    # (None when that file cannot be read).
    DSN = os.getenv(
        "IMUNITY360_SENTRY_DSN", _slurp_file("%s/sentry" % Packaging.DATADIR)
    )
    ENABLE = FromConfig("ERROR_REPORTING", "enable")


class Malware:
    SCAN_CHECK_PERIOD = 300
    CONSECUTIVE_ERROR_LIMIT = 10
    INOTIFY_SCAN_PERIOD = 60
    CONFIG_CHECK_PERIOD = 30
    CONFLICTS_CHECK_PERIOD = 300
    INOTIFY_ENABLED = FromConfig("MALWARE_SCANNING", "enable_scan_inotify")
    PURE_SCAN = FromConfig("MALWARE_SCANNING", "enable_scan_pure_ftpd")
    SEND_FILES = FromConfig("MALWARE_SCANNING", "sends_file_for_analysis")
    CLOUD_ASSISTED_SCAN = FromConfig("MALWARE_SCANNING", "cloud_assisted_scan")
    RAPID_SCAN = FromConfig("MALWARE_SCANNING", "rapid_scan")
    CRONTABS_SCAN_ENABLED = FromConfig("MALWARE_SCANNING", "crontabs")
    SCANS_PATH = "/var/imunify360/aibolit/scans.pickle"
    FILE_PREVIEW_BYTES_NUM = 1024 * 100  # 100 KB
    CLEANUP_STORAGE = "/var/imunify360/cleanup_storage"
    CLEANUP_TRIM = FromConfig(
        section="MALWARE_CLEANUP",
        option="trim_file_instead_of_removal",
    )
    CLEANUP_KEEP = FromConfig(
        section="MALWARE_CLEANUP",
        option="keep_original_files_days",
    )
    SCAN_MODIFIED_FILES = FromConfig(
        section="MALWARE_SCANNING",
        option="scan_modified_files",
    )
    MAX_SIGNATURE_SIZE_TO_SCAN = FromConfig(
        "MALWARE_SCANNING", "max_signature_size_to_scan"
    )
    MAX_CLOUDSCAN_SIZE_TO_SCAN = FromConfig(
        "MALWARE_SCANNING", "max_cloudscan_size_to_scan"
    )
    MAX_MRS_UPLOAD_FILE = FromConfig("MALWARE_SCANNING", "max_mrs_upload_file")
    RAPID_SCAN_RESCAN_UNCHANGING_FILES_FREQUENCY = FromConfig(
        "MALWARE_SCANNING", "rapid_scan_rescan_unchanging_files_frequency"
    )
    HYPERSCAN = FromConfig("MALWARE_SCANNING", "hyperscan")
    DATABASE_SCAN_ENABLED = FromConfig("MALWARE_DATABASE_SCAN", "enable")
    CPANEL_SCAN_ENABLED = FromConfig("MALWARE_SCANNING", "enable_scan_cpanel")
    CLEANUP_DISABLE_CLOUDAV = FromFlagFile("disable_cloudav")
    MDS_DB_TIMEOUT = FromConfig("MALWARE_DATABASE_SCAN", "db_timeout")


class MalwareScanScheduleInterval:
    NONE = NONE
    DAY = DAY
    WEEK = WEEK
    MONTH = MONTH


class MalwareScanSchedule:
    CMD = "/usr/bin/imunify360-agent malware user scan --background"
    CRON_PATH = "/etc/cron.d/imunify_scan_schedule"
    # NOTE(review): line breaks inside this template were reconstructed from
    # a whitespace-collapsed copy of the file -- confirm against the original.
    CRON_STRING = """\
# DO NOT EDIT. AUTOMATICALLY GENERATED.
0 {0} {1} * {2} root {cmd} >/dev/null 2>&1
"""
    INTERVAL = FromConfig(
        section="MALWARE_SCAN_SCHEDULE",
        option="interval",
    )
    HOUR = FromConfig(
        section="MALWARE_SCAN_SCHEDULE",
        option="hour",
    )
    DAY_OF_WEEK = FromConfig(
        section="MALWARE_SCAN_SCHEDULE",
        option="day_of_week",
    )
    DAY_OF_MONTH = FromConfig(
        section="MALWARE_SCAN_SCHEDULE",
        option="day_of_month",
    )


class MalwareScanIntensity:
    CPU = FromConfig(
        section="MALWARE_SCAN_INTENSITY",
        option="cpu",
    )
    IO = FromConfig(
        section="MALWARE_SCAN_INTENSITY",
        option="io",
    )
    RAM = FromConfig(
        section="MALWARE_SCAN_INTENSITY",
        option="ram",
    )
    USER_CPU = FromConfig(
        section="MALWARE_SCAN_INTENSITY",
        option="user_scan_cpu",
    )
    USER_IO = FromConfig(
        section="MALWARE_SCAN_INTENSITY",
        option="user_scan_io",
    )
    USER_RAM = FromConfig(
        section="MALWARE_SCAN_INTENSITY",
        option="user_scan_ram",
    )


class FileBasedResourceLimits:
    CPU = FromConfig(
        section="RESOURCE_MANAGEMENT",
        option="cpu_limit",
    )
    IO = FromConfig(
        section="RESOURCE_MANAGEMENT",
        option="io_limit",
    )
    RAM = FromConfig(
        section="RESOURCE_MANAGEMENT",
        option="ram_limit",
    )


class KernelCare:
    EDF = FromConfig(
        section="KERNELCARE",
        option="edf",
    )


def get_rapid_rescan_frequency():
    """
    Return the configured rescan frequency for unchanging files.

    When the option is None the value is derived from the scan schedule
    interval instead (1 for an interval that is not in the table).
    """
    value = Malware.RAPID_SCAN_RESCAN_UNCHANGING_FILES_FREQUENCY
    if value is None:
        freq = {
            MalwareScanScheduleInterval.NONE: 1,
            MalwareScanScheduleInterval.MONTH: 2,
            MalwareScanScheduleInterval.WEEK: 5,
            MalwareScanScheduleInterval.DAY: 10,
        }
        return freq.get(MalwareScanSchedule.INTERVAL, 1)
    return value


class MalwareSignatures:
    _dir = "/var/imunify360/files/sigs/v1/"
    RFXN = os.path.join(_dir, "rfxn")
    i360 = os.path.join(_dir, "i360")
    AI_BOLIT_HOSTER = os.path.join(_dir, "aibolit", "ai-bolit-hoster-full.db")
    AI_BOLIT_HYPERSCAN = os.path.join(_dir, "aibolit", "hyperscan")
    MDS_AI_BOLIT_HOSTER = os.path.join(
        _dir, "aibolit", "mds-ai-bolit-hoster.db"
    )
    PROCU_DB = os.path.join(_dir, "aibolit", "procu2.db")
    MDS_PROCU_DB = os.path.join(_dir, "aibolit", "mds-procu2.db")


class Logger:
    MAX_LOG_FILE_SIZE = FromConfig(
        section="LOGGER",
        option="max_log_file_size",
    )
    BACKUP_COUNT = FromConfig(
        section="LOGGER",
        option="backup_count",
    )
    # directory mode for main log directory and all inner
    LOG_DIR_PERM = 0o700
    # file mode for main log directory and all inner
    LOG_FILE_PERM = 0o660


class UserType:
    ROOT = "root"
    NON_ROOT = "non_root"


class UIRole:
    CLIENT = "client"
    ADMIN = "admin"


class NoCP:
    # path to script implementing No CP API
    CLIENT_SCRIPT = "/etc/imunify360/scripts/domains"
    # latest version of the API supported by agent
    LATEST_VERSION = 1


class CustomBillingConfig(Config):
    """Config for the custom billing file under /etc/sysconfig/imunify360."""

    def __init__(self):
        path = os.path.join(
            "/etc/sysconfig/imunify360", Core.CUSTOM_BILLING_CONFIGFILENAME
        )
        super().__init__(
            path=path, validation_schema=CONFIG_SCHEMA_CUSTOM_BILLING
        )


class CustomBilling:
    UPGRADE_URL = FromConfig(
        section="CUSTOM_BILLING",
        option="upgrade_url",
        config_cls=CustomBillingConfig,
    )
    UPGRADE_URL_360 = FromConfig(
        section="CUSTOM_BILLING",
        option="upgrade_url_360",
        config_cls=CustomBillingConfig,
    )
    NOTIFICATIONS = FromConfig(
        section="CUSTOM_BILLING",
        option="billing_notifications",
        config_cls=CustomBillingConfig,
    )
    IP_LICENSE = FromConfig(
        section="CUSTOM_BILLING",
        option="ip_license",
        config_cls=CustomBillingConfig,
    )


class PermissionsConfig:
    USER_IGNORE_LIST = FromConfig(
        section="PERMISSIONS",
        option="user_ignore_list",
    )
    ALLOW_MALWARE_SCAN = FromConfig(
        section="PERMISSIONS",
        option="allow_malware_scan",
    )
    USER_OVERRIDE_MALWARE_ACTIONS = FromConfig(
        section="PERMISSIONS", option="user_override_malware_actions"
    )
    USER_OVERRIDE_PROACTIVE_DEFENSE = FromConfig(
        section="PERMISSIONS",
        option="user_override_proactive_defense",
    )
    ALLOW_LOCAL_MALWARE_IGNORE_LIST_MANAGEMENT = FromConfig(
        section="PERMISSIONS",
        option="allow_local_malware_ignore_list_management",
    )
    USE_PLESK_SERVICE_PLAN = FromConfig(
        section="PERMISSIONS",
        option="use_plesk_service_plan",
    )


class MyImunifyConfig:
    ENABLED = FromConfig(
        section="MY_IMUNIFY",
        option="enable",
    )
    PURCHASE_PAGE_URL = FromConfig(
        section="MY_IMUNIFY",
        option="purchase_page_url",
    )


class ControlPanelConfig:
    SMART_ADVICE_ALLOWED = FromConfig(
        section="CONTROL_PANEL",
        option="smart_advice_allowed",
    )
    ADVICE_EMAIL_NOTIFICATION = FromConfig(
        section="CONTROL_PANEL",
        option="advice_email_notification",
    )


def effective_user_config(admin_config, user_config):
    """
    Combine the admin and user configs into the config effective for the
    user.

    Only the sections in *allowed_sections* are included. For every option
    of the admin config the user's value is taken when it is not None and
    the option is overridable; otherwise the admin value is kept. The
    result is passed through fm_config_cleanup() together with
    user_config.username.
    """
    allowed_sections = [
        "BACKUP_RESTORE",
        "MALWARE_CLEANUP",
        "MALWARE_SCANNING",
        "ERROR_REPORTING",
        "PROACTIVE_DEFENCE",
        "PERMISSIONS",
        "MY_IMUNIFY",
        "CONTROL_PANEL",
        "MALWARE_SCAN_SCHEDULE",
    ]
    # (section, option) pairs whose override is gated by a PERMISSIONS
    # option; the permission values are read once, here.
    overridable = {
        (
            "MALWARE_SCAN_SCHEDULE",
            "interval",
        ): PermissionsConfig.USER_OVERRIDE_MALWARE_ACTIONS,
        (
            "MALWARE_SCAN_SCHEDULE",
            "hour",
        ): PermissionsConfig.USER_OVERRIDE_MALWARE_ACTIONS,
        (
            "MALWARE_SCAN_SCHEDULE",
            "day_of_week",
        ): PermissionsConfig.USER_OVERRIDE_MALWARE_ACTIONS,
        (
            "MALWARE_SCAN_SCHEDULE",
            "day_of_month",
        ): PermissionsConfig.USER_OVERRIDE_MALWARE_ACTIONS,
        (
            "MALWARE_SCANNING",
            "default_action",
        ): PermissionsConfig.USER_OVERRIDE_MALWARE_ACTIONS,
        (
            "PROACTIVE_DEFENCE",
            "mode",
        ): PermissionsConfig.USER_OVERRIDE_PROACTIVE_DEFENSE,
    }

    admin_dict = admin_config.config_to_dict()
    user_dict = user_config.config_to_dict()

    def normalize_section(section):
        admin_options = deepcopy(admin_dict.get(section, {}))
        user_options = deepcopy(user_dict.get(section, {}))
        resulting_dict = {}
        for option, admin_value in admin_options.items():
            user_value = user_options.get(option)
            if (
                user_value is not None
                # All options available to user are overridable by default
                and overridable.get((section, option), True)
            ):
                resulting_dict[option] = user_value
            else:
                resulting_dict[option] = admin_value
        return resulting_dict

    effective_config = {
        section: normalize_section(section) for section in allowed_sections
    }
    return fm_config_cleanup(effective_config, user_config.username)


class HookEvents:
    # The chained assignment defines the individual event names and the
    # IM360_EVENTS tuple in one statement.
    IM360_EVENTS = (
        AGENT,
        LICENSE,
        MALWARE_SCANNING,
        MALWARE_CLEANUP,
        MALWARE_DETECTED,
    ) = (
        "agent",
        "license",
        "malware-scanning",
        "malware-cleanup",
        "malware-detected",
    )
    IMAV_EVENTS = (
        LICENSE,
        MALWARE_SCANNING,
        MALWARE_CLEANUP,
        MALWARE_DETECTED,
    )
    EVENTS = IMAV_EVENTS if ANTIVIRUS_MODE else IM360_EVENTS


class ConfigsValidatorError(Exception):
    """Validation failure carrying a mapping of config -> error message."""

    def __init__(self, configs_to_errors: Dict[Config, str]):
        self.configs_to_errors = configs_to_errors

    def __repr__(self):
        errors = []
        for config, error in self.configs_to_errors.items():
            errors.append(f"{config!r}: {error}")
        return "\n".join(errors)


class ConfigsValidator:
    """A class that has methods to validate configs bypassing the cache"""

    @classmethod
    def validate_system_config(cls):
        """
        Validate merged config
        :raises ConfigsValidatorError
        """
        SystemConfig().validate()

    @classmethod
    def validate_config_layers(cls):
        """
        Validate all config layers, collect all errors
        :raises ConfigsValidatorError
        """
        configs_to_errors = {}
        for layer in Merger(Merger.get_layer_names()).layers:
            try:
                layer.validate()
            except ConfigsValidatorError as e:
                configs_to_errors.update(e.configs_to_errors)
        if configs_to_errors:
            raise ConfigsValidatorError(configs_to_errors)

    @classmethod
    def validate(
        cls,
        config_dict: dict,
        validation_schema: Union[dict, Callable] = config_schema_root,
    ) -> None:
        """
        Validate config represented by a dict
        :param config_dict: config to validate
        :param validation_schema: schema to validate config against
        :raises ConfigValidationError
        """
        schema = cls.get_validation_schema(validation_schema)
        v = ConfigValidator(schema)
        if not v.validate(config_dict):
            raise ConfigValidationError(v.errors)

    @staticmethod
    def get_validation_schema(
        validation_schema: Union[Mapping, Callable],
    ) -> Mapping:
        """Return the schema itself, calling it first when it is callable."""
        if callable(validation_schema):
            return validation_schema()
        return validation_schema


ConfigFile = config_file_factory


class AdminContacts:
    ENABLE_ICONTACT_NOTIFICATIONS = FromConfig(
        section="ADMIN_CONTACTS",
        option="enable_icontact_notifications",
    )


class IContactMessageType(str, Enum):
    MALWARE_FOUND = "MalwareFound"
    SCAN_NOT_SCHEDULED = "ScanNotScheduled"
    GENERIC = "Generic"

    def __str__(self):
        return self.value


CONFIG_SCHEMA_BACKUP_SYSTEM = {
    "BACKUP_SYSTEM": {
        "type": "dict",
        "schema": {
            "enabled": {
                "type": "boolean",
                "default": False,
            },
            "backup_system": {
                "type": "string",
                "default": None,
                "nullable": True,
                "allowed": [
                    CPANEL,
                    PLESK,
                    R1SOFT,
                    ACRONIS,
                    CLOUDLINUX,
                    DIRECTADMIN,
                    CLOUDLINUX_ON_PREMISE,
                    CLUSTERLOGICS,
                    SAMPLE_BACKEND,
                ],
            },
        },
        "default": {},
    }
}


class BackupConfig(Config):
    # NOTE(review): line breaks inside DISCLAIMER were reconstructed from a
    # whitespace-collapsed copy of the file -- confirm against the original.
    DISCLAIMER = """\
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
#
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
# DO NOT EDIT. AUTOMATICALLY GENERATED.
# !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
#
# Direct modifications to this file WILL be lost upon subsequent
# regeneration of this configuration file.
#
# To have your modifications retained, you should use CLI command
# imunify360-agent backup-systems <init|disable> <backup-system>
# or activate/deactivate appropriate feature in UI.
#
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
"""

    def __init__(
        self,
        *,
        path=os.path.join(
            "/etc/sysconfig/imunify360", Core.BACKUP_CONFIGFILENAME
        ),
        validation_schema=CONFIG_SCHEMA_BACKUP_SYSTEM,
    ):
        super().__init__(path=path, validation_schema=validation_schema)


class BackupRestore:
    ENABLED = FromConfig(
        section="BACKUP_SYSTEM", option="enabled", config_cls=BackupConfig
    )
    _BACKUP_SYSTEM = FromConfig(
        section="BACKUP_SYSTEM",
        option="backup_system",
        config_cls=BackupConfig,
    )
    CL_BACKUP_ALLOWED = FromConfig(
        section="BACKUP_RESTORE",
        option="cl_backup_allowed",
    )
    CL_ON_PREMISE_BACKUP_ALLOWED = FromConfig(
        section="BACKUP_RESTORE",
        option="cl_on_premise_backup_allowed",
    )

    @classmethod
    def backup_system(cls):
        return _get_backend_system(cls._BACKUP_SYSTEM)


def _get_backend_system(name):
    """
    Get backup module from its name
    :param name: backup system name
    :return: backup system module
    """
    # Imported here rather than at module level.
    from restore_infected import backup_backends

    if name in (CLOUDLINUX, CLOUDLINUX_ON_PREMISE):
        # cloudlinux backup is actually acronis one
        name = ACRONIS
    elif name is None:
        return None
    return backup_backends.backend(name, async_=True)


class AcronisBackup:
    # https://kb.acronis.com/content/1711
    # https://kb.acronis.com/content/47189
    LOG_NAME = "acronis-installer.log"
    PORTS = (8443, 44445, 55556)
    RANGE = {(7770, 7800)}


def should_try_autorestore_malicious(username: str) -> bool:
    """
    Check whether the agent should first try to restore a malicious file
    from backup: BackupRestore.ENABLED must be truthy and the
    "try_restore_from_backup_first" option must be set for the user
    (or, failing that, in the root config).
    """
    try_restore, _ = choose_value_from_config(
        "MALWARE_SCANNING", "try_restore_from_backup_first", username
    )
    return BackupRestore.ENABLED and try_restore


def choose_use_backups_start_from_date(username: str) -> datetime:
    """Return now minus the "max_days_in_backup" option, in days."""
    max_days, _ = choose_value_from_config(
        "BACKUP_RESTORE", "max_days_in_backup", username
    )
    until = datetime.now() - timedelta(days=max_days)
    return until


def should_send_user_notifications(username: str) -> bool:
    """Return the "generic_user_notifications" option effective for the
    user."""
    should_send, _ = choose_value_from_config(
        "CONTROL_PANEL",
        "generic_user_notifications",
        username=username,
    )
    return should_send


class Wordpress:
    SECURITY_PLUGIN_ENABLED = FromConfig(
        "WORDPRESS", "security_plugin_enabled"
    )


class HackerTrap:
    DIR = "/var/imunify360"
    NAME = "malware_found_b64.list"
    SA_NAME = "malware_standalone_b64.list"
    DIR_PD = "/opt/imunify360/proactive/dangerlist/"
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply