ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
utils
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
antivirus_mode.py
497 B
chmod
View
DL
Edit
Rename
Delete
async_utils.py
718 B
chmod
View
DL
Edit
Rename
Delete
benchmark.py
538 B
chmod
View
DL
Edit
Rename
Delete
buffer.py
1.24 MB
chmod
View
DL
Edit
Rename
Delete
check_db.py
7.72 MB
chmod
View
DL
Edit
Rename
Delete
check_lock.py
636 B
chmod
View
DL
Edit
Rename
Delete
cli.py
7.39 MB
chmod
View
DL
Edit
Rename
Delete
common.py
14.41 MB
chmod
View
DL
Edit
Rename
Delete
config.py
999 B
chmod
View
DL
Edit
Rename
Delete
cronjob.py
902 B
chmod
View
DL
Edit
Rename
Delete
doctor.py
1 MB
chmod
View
DL
Edit
Rename
Delete
hyperscan.py
149 B
chmod
View
DL
Edit
Rename
Delete
importer.py
2.67 MB
chmod
View
DL
Edit
Rename
Delete
ipecho.py
3.17 MB
chmod
View
DL
Edit
Rename
Delete
json.py
953 B
chmod
View
DL
Edit
Rename
Delete
kwconfig.py
1.56 MB
chmod
View
DL
Edit
Rename
Delete
parsers.py
11.12 MB
chmod
View
DL
Edit
Rename
Delete
resource_limits.py
2.29 MB
chmod
View
DL
Edit
Rename
Delete
safe_fileops.py
7.99 MB
chmod
View
DL
Edit
Rename
Delete
safe_sequence.py
363 B
chmod
View
DL
Edit
Rename
Delete
serialization.py
1.72 MB
chmod
View
DL
Edit
Rename
Delete
sshutil.py
7.94 MB
chmod
View
DL
Edit
Rename
Delete
subprocess.py
1.53 MB
chmod
View
DL
Edit
Rename
Delete
support.py
5.2 MB
chmod
View
DL
Edit
Rename
Delete
threads.py
1005 B
chmod
View
DL
Edit
Rename
Delete
validate.py
4.27 MB
chmod
View
DL
Edit
Rename
Delete
whmcs.py
7.6 MB
chmod
View
DL
Edit
Rename
Delete
wordpress_mu_plugin.py
1.41 MB
chmod
View
DL
Edit
Rename
Delete
_shutil.py
795 B
chmod
View
DL
Edit
Rename
Delete
__init__.py
55.96 MB
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/utils/check_db.py
"""Integrity check and repair helpers for the agent's SQLite database.

The entry point is :func:`check_and_repair`; the remaining functions are the
individual steps it performs (integrity check, backup, SQL dump, reload) plus
helpers that verify or recreate the schema.
"""
import logging
import itertools
import os
from contextlib import closing, suppress
from datetime import datetime
from shutil import copy
from sqlite3 import connect, DatabaseError

from playhouse.sqlite_ext import SqliteExtDatabase

from defence360agent.application import app
from defence360agent import simple_rpc
from defence360agent.contracts.config import Model
from defence360agent.model import simplification

logger = logging.getLogger(__name__)


class OperationError(Exception):
    """Raised when the check/repair procedure cannot be completed."""


WORKAROUND_MSG = "Blank database will be created on agent start "


def check_and_repair():
    """Check the database at ``Model.PATH`` and try to repair it if corrupted.

    When the integrity check fails, the database is backed up, dumped to SQL,
    removed, re-created from the dump, re-checked and migrated.

    Raises:
        OperationError: if the agent is running, the database file is
            missing, or any step of the repair procedure fails.
    """
    base = Model.PATH
    if simple_rpc.is_running():
        raise OperationError(
            "Cannot perform database check and backup while agent is running. "
            "Please, stop the imunify360 agent with `service imunify360 stop`"
        )
    if not os.path.isfile(base):
        raise OperationError(
            "DB %s is not exists. %s" % (base, WORKAROUND_MSG)
        )
    if not is_db_corrupted(base):
        return

    backup = make_backup(base)
    if not backup:
        raise OperationError(
            "Cannot proceed without backup copy of the database. "
            "Please contact imunify360 support team at "
            "https://cloudlinux.zendesk.com"
        )

    dump = dump_to_sql(base)
    logger.info("Removing original corrupted database at %s", base)
    # TODO: Notify user in UI that original DB was dropped
    # The corrupted file is removed even when the dump failed: the error
    # message below relies on a blank database being created afterwards.
    os.remove(base)
    if not dump:
        raise OperationError(
            "Cannot dump database to sql. Old DB backuped at %s. %s"
            % (backup, WORKAROUND_MSG)
        )

    restored = load_from_sql(base, dump)
    if not restored:
        raise OperationError(
            "Loading dump to new database failed. Database will "
            "be recreated during migrations."
        )
    if is_db_corrupted(restored):
        os.remove(restored)
        raise OperationError(
            "Restored database is still corrupt. Removing "
            "restored database. %s" % WORKAROUND_MSG
        )

    logger.info("Database restored successfully. Removing dump %s", dump)
    os.remove(dump)

    try:
        logger.info("Performing migrations on restored database")
        simplification.migrate()
    except Exception as e:
        os.remove(base)
        raise OperationError(
            "Migrations on restored database failed: %s. %s"
            % (e, WORKAROUND_MSG)
        ) from e

    if not all_tables_are_present():
        os.remove(base)
        raise OperationError(
            "Restored database does not "
            "contain all necessary tables. "
            "%s" % WORKAROUND_MSG
        )


def mark_with_timestamp(filename, extension=None):
    """Return *filename* suffixed with the current local timestamp.

    The timestamp is ``datetime.now().isoformat("_")``; when *extension* is
    truthy, ``.<extension>`` is appended after it.

    >>> mark_with_timestamp('/var/imunify360/imunify360.db')  # doctest: +SKIP
    '/var/imunify360/imunify360.db_2017-09-26_03:33:44.705967'
    >>> mark_with_timestamp(
    ...     '/var/imunify360/imunify360.db', extension='sql'
    ... )  # doctest: +SKIP
    '/var/imunify360/imunify360.db_2017-09-26_03:34:01.098544.sql'
    """
    instant = datetime.now()
    basename = "{}_{}".format(filename, instant.isoformat("_"))
    if extension:
        return basename + ".%s" % extension
    return basename


def is_db_corrupted(db_path):
    """Run ``PRAGMA INTEGRITY_CHECK`` on *db_path*.

    Returns:
        ``False`` only when the first result row contains ``"ok"``; ``True``
        otherwise, including when sqlite raises ``DatabaseError``.
    """
    logger.info("Database %s integrity check...", db_path)
    is_corrupted = True
    # `with connection:` only scopes a transaction; closing() is what
    # actually releases the handle.
    with closing(connect(db_path)) as connection:
        try:
            cursor = connection.execute("PRAGMA INTEGRITY_CHECK;")
            result = next(cursor)
            if "ok" in result:
                logger.info("Database integrity check succeeded.")
                is_corrupted = False
        except DatabaseError as e:
            logger.warning("DatabaseError detected: %s", e)
    return is_corrupted


def dump_to_sql(db_path):
    """Dump the database at *db_path* to a timestamped ``.sql`` file.

    Returns:
        Path of the dump file, or ``None`` if dumping failed (a partially
        written dump file is removed).
    """
    dumpfile = mark_with_timestamp(db_path, extension="sql")
    logger.info("Dumping imunify360 database to %s", dumpfile)
    try:
        with open(dumpfile, "w") as dump, closing(
            connect(db_path)
        ) as connection:
            # iterdump() yields statements without a trailing newline.
            dump.writelines(
                "%s\n" % statement for statement in connection.iterdump()
            )
    except (DatabaseError, OSError) as e:
        logger.error("Error during dump: %s. Operation aborted", e)
        with suppress(OSError):
            os.remove(dumpfile)
        dumpfile = None
    return dumpfile


def load_from_sql(db_path, dumpfile):
    """Create a new database at *db_path* from the SQL in *dumpfile*.

    Returns:
        *db_path* on success; ``None`` if *db_path* already exists or the
        dump could not be loaded (a partially created database is removed).
    """
    # This is unlikely to happen because we delete the original file
    # but to be defensive here won't hurt in case of reuse in other places.
    if os.path.exists(db_path):
        logger.warning(
            "Database already exists. Loading dump to existing "
            "database may cause errors. Operation aborted"
        )
        return None

    logger.info("Reading dump %s into new database %s...", dumpfile, db_path)
    try:
        with open(dumpfile, "r") as dump, closing(
            connect(db_path)
        ) as connection:
            # We cannot read line by line because a single SQL statement
            # may span several lines, so the whole script is loaded at once.
            with connection:
                connection.executescript(dump.read())
    except (MemoryError, DatabaseError, OSError) as e:
        logger.error("Loading dump failed: %s", e)
        # Do not leave a half-populated database behind.
        with suppress(OSError):
            os.remove(db_path)
        return None
    return db_path


def make_backup(db_path):
    """Copy *db_path* to a timestamped ``.backup`` file.

    Returns:
        Path of the backup, or ``None`` if copying failed (a partially
        written backup is removed).
    """
    logger.info("Making backup of the %s...", db_path)
    backup_filename = mark_with_timestamp(db_path, "backup")
    try:
        copy(db_path, backup_filename)
        logger.info("Database copied successfully to: %s ", backup_filename)
    except Exception as e:
        logger.error("Making backup failed: %s", e)
        with suppress(OSError):
            os.remove(backup_filename)
        backup_filename = None
    return backup_filename


def all_tables_are_present():
    """Return ``True`` if every model's table exists.

    Models are collected via ``simplification.get_models`` for each module
    in ``app.MODULES_WITH_MODELS``.
    """
    logger.info(
        "Verifying that db schema is up-to-date and all tables are present..."
    )
    models = itertools.chain.from_iterable(
        simplification.get_models(module)
        for module in app.MODULES_WITH_MODELS
    )
    if all(model.table_exists() for model in models):
        logger.info("All tables are present")
        return True
    logger.error("Some tables are missing in db.")
    return False


def recreate_schema() -> None:
    """Attach the databases from ``app.MIGRATIONS_ATTACHED_DBS`` and create
    the tables of the models that belong to those attached schemas.
    """
    simplification.instance.db.init(Model.PATH)
    logger.info("Recreating schema for linked DBs...")
    attached_schemas = []
    for db_path, schema in app.MIGRATIONS_ATTACHED_DBS:
        logger.info("Attach db: %s", db_path)
        simplification.instance.db.execute_sql(
            "ATTACH ? AS ?", (db_path, schema)
        )
        attached_schemas.append(schema)
    recreate_schema_models(simplification.instance.db, attached_schemas)
    logger.info("Schema recreated successfully.")


def recreate_schema_models(
    db: SqliteExtDatabase, target_schemas: list[str]
) -> None:
    """Bind to *db* and create tables for every model whose
    ``_meta.schema`` is listed in *target_schemas*.
    """
    all_models = itertools.chain.from_iterable(
        simplification.get_models(module)
        for module in app.MODULES_WITH_MODELS
    )
    models_to_create = [
        model for model in all_models if model._meta.schema in target_schemas
    ]
    logger.info("%r", models_to_create)
    # bind models to the db to avoid issues related to initialization order
    db.bind(models_to_create)
    db.create_tables(models_to_create)
    logger.info("Schema models recreated successfully.")
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply