ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/feature_management
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
feature_management
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
plugins
-
chmod
Open
Rename
Delete
rpc
-
chmod
Open
Rename
Delete
__pycache__
-
chmod
Open
Rename
Delete
checkers.py
2.45 MB
chmod
View
DL
Edit
Rename
Delete
constants.py
812 B
chmod
View
DL
Edit
Rename
Delete
control.py
2.5 MB
chmod
View
DL
Edit
Rename
Delete
exceptions.py
306 B
chmod
View
DL
Edit
Rename
Delete
hooks.py
2.39 MB
chmod
View
DL
Edit
Rename
Delete
lookup.py
2.76 MB
chmod
View
DL
Edit
Rename
Delete
model.py
2.32 MB
chmod
View
DL
Edit
Rename
Delete
utils.py
3.48 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
0 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/feature_management/utils.py
import logging from itertools import chain from typing import List, Any from defence360agent.feature_management import hooks from defence360agent.feature_management.model import FeatureManagementPerms from defence360agent.model import instance from defence360agent.utils import execute_iterable_expression from .lookup import features logger = logging.getLogger(__name__) async def set_feature(user: str, feature: str, value: str) -> bool: """Sets a `feature` to `value` for a given `user`. Calls appropriate hook and returns its (bool) result. Logs the result of setting change. If hook fails rollbacks changes to database. """ with instance.db.atomic() as trx: perm = FeatureManagementPerms.get_perm(user) perm.set_feature(feature, value) hook = hooks.get_hook(feature) ok = hook(user, value) if ok: logger.info( "Applied setting %s=%s for user %s", feature, value, user ) else: logger.error( "Failed to apply setting %s=%s for user %s", feature, value, user, ) trx.rollback() return ok async def update_users( feature: str, users: List[str], value: Any, existing_users: List[str] ): result = {"succeeded": [], "failed": []} for user in users: if user not in existing_users: logger.warning("No such user: %s", user) continue if await set_feature(user, feature, value): result["succeeded"].append(user) else: result["failed"].append(user) return result async def update_default(feature: str, value: Any): perm = FeatureManagementPerms.get_default() perm.set_feature(feature, value) hook = hooks.get_hook(feature) return hook(None, value) async def sync_users(users: List[str]) -> bool: """Synchronize existing permissions with panel users""" panel_users = set(users) perm_users = FeatureManagementPerms.select(FeatureManagementPerms.user) perm_users = set(chain(*perm_users.tuples())) perms_to_remove = perm_users - panel_users perms_to_remove.remove(FeatureManagementPerms.DEFAULT) if perms_to_remove: logger.info("Remove permissions of users %s", perms_to_remove) def expression(perms_to_remove): return FeatureManagementPerms.delete().where( FeatureManagementPerms.user.in_(perms_to_remove) ) execute_iterable_expression(expression, list(perms_to_remove)) perms_to_add = panel_users - perm_users if perms_to_add: logger.info("Add permissions to users %s", perms_to_add) for user in perms_to_add: perm = FeatureManagementPerms.get_perm(user) for feature in features: value = perm.get_feature(feature) callback = hooks.get_hook(feature) callback(user, value) return bool(perms_to_add) or bool(perms_to_remove) async def reset_features(**features): """Sets feature values for all existing users in feature management database to given values in `features`.""" users = list( chain( *FeatureManagementPerms.select(FeatureManagementPerms.user) .where( FeatureManagementPerms.user != FeatureManagementPerms.DEFAULT ) .tuples() ) ) for user in users: for feature, value in features.items(): await set_feature(user, feature, value)
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply