ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model
/
opt
imunify360
venv
lib
python3.11
site-packages
defence360agent
model
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
analyst_cleanup.py
3.4 MB
chmod
View
DL
Edit
Rename
Delete
event_hook.py
1.74 MB
chmod
View
DL
Edit
Rename
Delete
icontact.py
1.16 MB
chmod
View
DL
Edit
Rename
Delete
infected_domain.py
4.24 MB
chmod
View
DL
Edit
Rename
Delete
instance.py
536 B
chmod
View
DL
Edit
Rename
Delete
messages_to_send.py
1.37 MB
chmod
View
DL
Edit
Rename
Delete
simplification.py
5.01 MB
chmod
View
DL
Edit
Rename
Delete
tls_check.py
1.29 MB
chmod
View
DL
Edit
Rename
Delete
wordpress_incident.py
13.02 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
963 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/defence360agent/model/analyst_cleanup.py
import peewee as pw
from defence360agent.model import Model, instance
from datetime import datetime, timezone, timedelta


def _utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.now(timezone.utc)


class AnalystCleanupRequest(Model):
    """
    Model for storing analyst cleanup requests.

    Tracks request details and status for each cleanup request submitted.
    """

    class Meta:
        database = instance.db
        db_table = "analyst_cleanup_requests"

    id = pw.AutoField()
    username = pw.CharField(null=False)
    zendesk_id = pw.CharField(null=False)
    ticket_link = pw.TextField(null=False)
    # The default must be a callable: a plain ``datetime.now(...)`` value
    # would be evaluated once at import time and then shared by every row
    # created for the lifetime of the process.
    # NOTE(review): the field is declared without ``utc=True`` while the
    # default is an aware UTC datetime -- confirm the stored epoch value is
    # the intended one on hosts whose local timezone is not UTC.
    created_at = pw.TimestampField(null=False, default=_utc_now)
    status = pw.CharField(
        null=False,
        default="pending",
        constraints=[
            pw.Check("status in ('pending','in_progress','completed')")
        ],
    )
    # Same callable default as ``created_at``; see the comment above.
    last_updated = pw.TimestampField(null=False, default=_utc_now)

    @classmethod
    def create_request(cls, username, zendesk_id, ticket_link):
        """
        Create and persist a new cleanup request.

        ``status``, ``created_at`` and ``last_updated`` take their field
        defaults.

        :param username: value stored in the ``username`` column
        :param zendesk_id: value stored in the ``zendesk_id`` column
        :param ticket_link: value stored in the ``ticket_link`` column
        :return: the created model instance
        """
        return cls.create(
            username=username, zendesk_id=zendesk_id, ticket_link=ticket_link
        )

    @classmethod
    def get_user_requests(cls, username, limit=50, offset=0):
        """
        Get the requests of a specific user, newest first.

        :param username: user whose requests are selected
        :param limit: maximum number of rows to return
        :param offset: number of rows to skip
        :return: a select query ordered by ``created_at`` descending
        """
        return (
            cls.select()
            .where(cls.username == username)
            .order_by(cls.created_at.desc())
            .limit(limit)
            .offset(offset)
        )

    @classmethod
    def get_all_requests(cls, limit=50, offset=0):
        """
        Get the requests of all users on the server, newest first.

        :param limit: maximum number of rows to return
        :param offset: number of rows to skip
        :return: a select query ordered by ``created_at`` descending
        """
        return (
            cls.select()
            .order_by(cls.created_at.desc())
            .limit(limit)
            .offset(offset)
        )

    @classmethod
    def get_active_request_link(cls, username) -> str | None:
        """
        Return the ticket link of an active request of the user, if any.

        A request is active while its status is ``pending`` or
        ``in_progress``. The query has no ordering, so when several active
        requests exist, which one is returned is up to the database.

        :param username: user whose requests are checked
        :return: ``ticket_link`` of an active request, otherwise ``None``
        """
        active_request = (
            cls.select()
            .where(
                (cls.username == username)
                & (cls.status.in_(["pending", "in_progress"]))
            )
            .limit(1)
        ).first()
        return active_request.ticket_link if active_request else None

    @classmethod
    def update_status(cls, zendesk_id, new_status, last_updated):
        """
        Update the status of every request with the given ``zendesk_id``.

        :param zendesk_id: identifier of the request(s) to update
        :param new_status: new value for ``status``
        :param last_updated: new value for ``last_updated``
        :return: the result of executing the update query
        """
        return (
            cls.update(status=new_status, last_updated=last_updated)
            .where(cls.zendesk_id == zendesk_id)
            .execute()
        )

    @classmethod
    def get_all_relevant_requests(cls):
        """
        Return a query for active and recently completed cleanup requests.

        Selects ``username``, ``zendesk_id``, ``status`` and
        ``last_updated`` of every request that is either ``pending`` /
        ``in_progress``, or ``completed`` with ``last_updated`` within the
        last three days. The query is not restricted to particular users.
        """
        # Cutoff for "recently completed": three days before now (UTC).
        three_days_ago = datetime.now(timezone.utc) - timedelta(days=3)

        is_active = cls.status.in_(["pending", "in_progress"])
        is_recently_completed = (cls.status == "completed") & (
            cls.last_updated >= three_days_ago
        )
        return cls.select(
            cls.username,
            cls.zendesk_id,
            cls.status,
            cls.last_updated,
        ).where(is_active | is_recently_completed)
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply