ACIL FM
Dark
Refresh
Current DIR:
/usr/lib/python3.9/site-packages/cockpit
/
usr
lib
python3.9
site-packages
cockpit
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
channels
-
chmod
Open
Rename
Delete
data
-
chmod
Open
Rename
Delete
_vendor
-
chmod
Open
Rename
Delete
__pycache__
-
chmod
Open
Rename
Delete
beiboot.py
22.15 MB
chmod
View
DL
Edit
Rename
Delete
beipack.py
2.99 MB
chmod
View
DL
Edit
Rename
Delete
bridge.py
12 MB
chmod
View
DL
Edit
Rename
Delete
channel.py
22.32 MB
chmod
View
DL
Edit
Rename
Delete
config.py
3.37 MB
chmod
View
DL
Edit
Rename
Delete
internal_endpoints.py
5.95 MB
chmod
View
DL
Edit
Rename
Delete
jsonutil.py
7.42 MB
chmod
View
DL
Edit
Rename
Delete
osinfo.py
929 B
chmod
View
DL
Edit
Rename
Delete
packages.py
21.25 MB
chmod
View
DL
Edit
Rename
Delete
peer.py
12.45 MB
chmod
View
DL
Edit
Rename
Delete
polkit.py
7.4 MB
chmod
View
DL
Edit
Rename
Delete
polyfills.py
2.27 MB
chmod
View
DL
Edit
Rename
Delete
protocol.py
9.52 MB
chmod
View
DL
Edit
Rename
Delete
remote.py
8.93 MB
chmod
View
DL
Edit
Rename
Delete
router.py
10.08 MB
chmod
View
DL
Edit
Rename
Delete
samples.py
17.02 MB
chmod
View
DL
Edit
Rename
Delete
superuser.py
9.74 MB
chmod
View
DL
Edit
Rename
Delete
transports.py
17.92 MB
chmod
View
DL
Edit
Rename
Delete
_version.py
20 B
chmod
View
DL
Edit
Rename
Delete
__init__.py
68 B
chmod
View
DL
Edit
Rename
Delete
Edit file: /usr/lib/python3.9/site-packages/cockpit/remote.py
# This file is part of Cockpit. # # Copyright (C) 2022 Red Hat, Inc. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see <https://www.gnu.org/licenses/>. import getpass import logging import re import socket from typing import Dict, List, Optional, Tuple from cockpit._vendor import ferny from .jsonutil import JsonObject, JsonValue, get_str, get_str_or_none from .peer import Peer, PeerError from .router import Router, RoutingRule logger = logging.getLogger(__name__) class PasswordResponder(ferny.AskpassHandler): PASSPHRASE_RE = re.compile(r"Enter passphrase for key '(.*)': ") password: Optional[str] hostkeys_seen: List[Tuple[str, str, str, str, str]] error_message: Optional[str] password_attempts: int def __init__(self, password: Optional[str]): self.password = password self.hostkeys_seen = [] self.error_message = None self.password_attempts = 0 async def do_hostkey(self, reason: str, host: str, algorithm: str, key: str, fingerprint: str) -> bool: self.hostkeys_seen.append((reason, host, algorithm, key, fingerprint)) return False async def do_askpass(self, messages: str, prompt: str, hint: str) -> Optional[str]: logger.debug('Got askpass(%s): %s', hint, prompt) match = PasswordResponder.PASSPHRASE_RE.fullmatch(prompt) if match is not None: # We never unlock private keys — we rather need to throw a # specially-formatted error message which will cause the frontend # to load the named key into the agent for us and try again. path = match.group(1) logger.debug("This is a passphrase request for %s, but we don't do those. Abort.", path) self.error_message = f'locked identity: {path}' return None assert self.password is not None assert self.password_attempts == 0 self.password_attempts += 1 return self.password class SshPeer(Peer): session: Optional[ferny.Session] = None host: str user: Optional[str] password: Optional[str] private: bool async def do_connect_transport(self) -> None: assert self.session is not None logger.debug('Starting ssh session user=%s, host=%s, private=%s', self.user, self.host, self.private) basename, colon, portstr = self.host.rpartition(':') if colon and portstr.isdigit(): host = basename port = int(portstr) else: host = self.host port = None responder = PasswordResponder(self.password) options = {"StrictHostKeyChecking": 'yes'} if self.password is not None: options.update(NumberOfPasswordPrompts='1') else: options.update(PasswordAuthentication="no", KbdInteractiveAuthentication="no") try: await self.session.connect(host, login_name=self.user, port=port, handle_host_key=self.private, options=options, interaction_responder=responder) except (OSError, socket.gaierror) as exc: logger.debug('connecting to host %s failed: %s', host, exc) raise PeerError('no-host', error='no-host', message=str(exc)) from exc except ferny.SshHostKeyError as exc: if responder.hostkeys_seen: # If we saw a hostkey then we can issue a detailed error message # containing the key that would need to be accepted. That will # cause the front-end to present a dialog. _reason, host, algorithm, key, fingerprint = responder.hostkeys_seen[0] error_args = {'host-key': f'{host} {algorithm} {key}', 'host-fingerprint': fingerprint} else: error_args = {} if isinstance(exc, ferny.SshChangedHostKeyError): error = 'invalid-hostkey' elif self.private: error = 'unknown-hostkey' else: # non-private session case. throw a generic error. error = 'unknown-host' logger.debug('SshPeer got a %s %s; private %s, seen hostkeys %r; raising %s with extra args %r', type(exc), exc, self.private, responder.hostkeys_seen, error, error_args) raise PeerError(error, error_args, error=error, auth_method_results={}) from exc except ferny.SshAuthenticationError as exc: logger.debug('authentication to host %s failed: %s', host, exc) results = dict.fromkeys(exc.methods, "not-provided") if 'password' in results and self.password is not None: if responder.password_attempts == 0: results['password'] = 'not-tried' else: results['password'] = 'denied' raise PeerError('authentication-failed', error=responder.error_message or 'authentication-failed', auth_method_results=results) from exc except ferny.SshError as exc: logger.debug('unknown failure connecting to host %s: %s', host, exc) raise PeerError('internal-error', message=str(exc)) from exc args = self.session.wrap_subprocess_args(['cockpit-bridge']) await self.spawn(args, []) def do_kill(self, host: 'str | None', group: 'str | None', message: JsonObject) -> None: if host == self.host: self.close() elif host is None: super().do_kill(host, group, message) def do_authorize(self, message: JsonObject) -> None: if get_str(message, 'challenge').startswith('plain1:'): cookie = get_str(message, 'cookie') self.write_control(command='authorize', cookie=cookie, response=self.password or '') self.password = None # once is enough... def do_superuser_init_done(self) -> None: self.password = None def __init__(self, router: Router, host: str, user: Optional[str], options: JsonObject, *, private: bool) -> None: super().__init__(router) self.host = host self.user = user self.password = get_str(options, 'password', None) self.private = private self.session = ferny.Session() superuser: JsonValue init_superuser = get_str_or_none(options, 'init-superuser', None) if init_superuser in (None, 'none'): superuser = False else: superuser = {'id': init_superuser} self.start_in_background(init_host=host, superuser=superuser) class HostRoutingRule(RoutingRule): remotes: Dict[Tuple[str, Optional[str], Optional[str]], Peer] def __init__(self, router): super().__init__(router) self.remotes = {} def apply_rule(self, options: JsonObject) -> Optional[Peer]: assert self.router is not None assert self.router.init_host is not None host = get_str(options, 'host', self.router.init_host) if host == self.router.init_host: return None user = get_str(options, 'user', None) # HACK: the front-end relies on this for tracking connections without an explicit user name; # the user will then be determined by SSH (`User` in the config or the current user) # See cockpit_router_normalize_host_params() in src/bridge/cockpitrouter.c if user == getpass.getuser(): user = None if not user: user_from_host, _, _ = host.rpartition('@') user = user_from_host or None # avoid '' if get_str(options, 'session', None) == 'private': nonce = get_str(options, 'channel') else: nonce = None assert isinstance(host, str) assert user is None or isinstance(user, str) assert nonce is None or isinstance(nonce, str) key = host, user, nonce logger.debug('Request for channel %s is remote.', options) logger.debug('key=%s', key) if key not in self.remotes: logger.debug('%s is not among the existing remotes %s. Opening a new connection.', key, self.remotes) peer = SshPeer(self.router, host, user, options, private=nonce is not None) peer.add_done_callback(lambda: self.remotes.__delitem__(key)) self.remotes[key] = peer return self.remotes[key] def shutdown(self) -> None: for peer in set(self.remotes.values()): peer.close()
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply