ACIL FM
Dark
Refresh
Current DIR:
/opt/imunify360/venv/lib/python3.11/site-packages/click
/
opt
imunify360
venv
lib
python3.11
site-packages
click
Upload
Zip Selected
Delete Selected
Pilih semua
Nama
Ukuran
Permission
Aksi
__pycache__
-
chmod
Open
Rename
Delete
core.py
111.41 MB
chmod
View
DL
Edit
Rename
Delete
decorators.py
18.28 MB
chmod
View
DL
Edit
Rename
Delete
exceptions.py
9.06 MB
chmod
View
DL
Edit
Rename
Delete
formatting.py
9.48 MB
chmod
View
DL
Edit
Rename
Delete
globals.py
1.92 MB
chmod
View
DL
Edit
Rename
Delete
parser.py
18.62 MB
chmod
View
DL
Edit
Rename
Delete
py.typed
0 B
chmod
View
DL
Edit
Rename
Delete
shell_completion.py
18.03 MB
chmod
View
DL
Edit
Rename
Delete
termui.py
27.66 MB
chmod
View
DL
Edit
Rename
Delete
testing.py
15.71 MB
chmod
View
DL
Edit
Rename
Delete
types.py
35.54 MB
chmod
View
DL
Edit
Rename
Delete
utils.py
19.82 MB
chmod
View
DL
Edit
Rename
Delete
_compat.py
18.3 MB
chmod
View
DL
Edit
Rename
Delete
_termui_impl.py
23.5 MB
chmod
View
DL
Edit
Rename
Delete
_textwrap.py
1.32 MB
chmod
View
DL
Edit
Rename
Delete
_winconsole.py
7.68 MB
chmod
View
DL
Edit
Rename
Delete
__init__.py
3.06 MB
chmod
View
DL
Edit
Rename
Delete
Edit file: /opt/imunify360/venv/lib/python3.11/site-packages/click/testing.py
import contextlib
import io
import os
import shlex
import shutil
import sys
import tempfile
import typing as t
from types import TracebackType

from . import formatting
from . import termui
from . import utils
from ._compat import _find_binary_reader

if t.TYPE_CHECKING:
    from .core import BaseCommand


class EchoingStdin:
    """Wrap a binary input stream and copy everything read from it to an
    output stream, unless echoing is currently paused.

    Attributes not defined here are looked up on the wrapped input stream.
    """

    def __init__(self, input: t.BinaryIO, output: t.BinaryIO) -> None:
        self._input = input
        self._output = output
        # Toggled by ``_pause_echo`` while a prompt function is running.
        self._paused = False

    def __getattr__(self, x: str) -> t.Any:
        return getattr(self._input, x)

    def _echo(self, rv: bytes) -> bytes:
        """Write ``rv`` to the output stream (if not paused) and return it."""
        if not self._paused:
            self._output.write(rv)

        return rv

    def read(self, n: int = -1) -> bytes:
        return self._echo(self._input.read(n))

    def read1(self, n: int = -1) -> bytes:
        return self._echo(self._input.read1(n))  # type: ignore

    def readline(self, n: int = -1) -> bytes:
        return self._echo(self._input.readline(n))

    def readlines(self) -> t.List[bytes]:
        return [self._echo(x) for x in self._input.readlines()]

    def __iter__(self) -> t.Iterator[bytes]:
        return iter(self._echo(x) for x in self._input)

    def __repr__(self) -> str:
        return repr(self._input)


@contextlib.contextmanager
def _pause_echo(stream: t.Optional[EchoingStdin]) -> t.Iterator[None]:
    """Suspend echoing on ``stream`` for the duration of the context.

    Does nothing when ``stream`` is ``None``.
    """
    if stream is None:
        yield
    else:
        stream._paused = True

        try:
            yield
        finally:
            # Always resume echoing, even if the wrapped code raised;
            # otherwise the stream would stay silently paused afterwards.
            stream._paused = False


class _NamedTextIOWrapper(io.TextIOWrapper):
    """A ``TextIOWrapper`` that reports a caller-supplied ``name`` and
    ``mode`` instead of those of the underlying buffer.
    """

    def __init__(
        self, buffer: t.BinaryIO, name: str, mode: str, **kwargs: t.Any
    ) -> None:
        super().__init__(buffer, **kwargs)
        self._name = name
        self._mode = mode

    @property
    def name(self) -> str:
        return self._name

    @property
    def mode(self) -> str:
        return self._mode


def make_input_stream(
    input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]], charset: str
) -> t.BinaryIO:
    """Turn ``input`` into a binary stream.

    :param input: ``None`` (treated as empty input), a ``str`` (encoded
        with ``charset``), ``bytes``, or an object with a ``read``
        attribute.
    :param charset: the encoding used when ``input`` is a ``str``.
    :raises TypeError: if ``input`` looks like a stream but no binary
        reader can be found for it.
    """
    # Is already an input stream.
    if hasattr(input, "read"):
        rv = _find_binary_reader(t.cast(t.IO[t.Any], input))

        if rv is not None:
            return rv

        raise TypeError("Could not find binary reader for input stream.")

    if input is None:
        input = b""
    elif isinstance(input, str):
        input = input.encode(charset)

    return io.BytesIO(input)


class Result:
    """Holds the captured result of an invoked CLI script."""

    def __init__(
        self,
        runner: "CliRunner",
        stdout_bytes: bytes,
        stderr_bytes: t.Optional[bytes],
        return_value: t.Any,
        exit_code: int,
        exception: t.Optional[BaseException],
        exc_info: t.Optional[
            t.Tuple[t.Type[BaseException], BaseException, TracebackType]
        ] = None,
    ):
        #: The runner that created the result
        self.runner = runner
        #: The standard output as bytes.
        self.stdout_bytes = stdout_bytes
        #: The standard error as bytes, or None if not available
        self.stderr_bytes = stderr_bytes
        #: The value returned from the invoked command.
        #:
        #: .. versionadded:: 8.0
        self.return_value = return_value
        #: The exit code as integer.
        self.exit_code = exit_code
        #: The exception that happened if one did.
        self.exception = exception
        #: The traceback
        self.exc_info = exc_info

    @property
    def output(self) -> str:
        """The (standard) output as unicode string."""
        return self.stdout

    @property
    def stdout(self) -> str:
        """The standard output as unicode string."""
        return self.stdout_bytes.decode(self.runner.charset, "replace").replace(
            "\r\n", "\n"
        )

    @property
    def stderr(self) -> str:
        """The standard error as unicode string."""
        if self.stderr_bytes is None:
            raise ValueError("stderr not separately captured")
        return self.stderr_bytes.decode(self.runner.charset, "replace").replace(
            "\r\n", "\n"
        )

    def __repr__(self) -> str:
        exc_str = repr(self.exception) if self.exception else "okay"
        return f"<{type(self).__name__} {exc_str}>"


class CliRunner:
    """The CLI runner provides functionality to invoke a Click command line
    script for unittesting purposes in a isolated environment.  This only
    works in single-threaded systems without any concurrency as it changes the
    global interpreter state.

    :param charset: the character set for the input and output data.
    :param env: a dictionary with environment variables for overriding.
    :param echo_stdin: if this is set to `True`, then reading from stdin writes
                       to stdout.  This is useful for showing examples in
                       some circumstances.  Note that regular prompts
                       will automatically echo the input.
    :param mix_stderr: if this is set to `False`, then stdout and stderr are
                       preserved as independent streams.  This is useful for
                       Unix-philosophy apps that have predictable stdout and
                       noisy stderr, such that each may be measured
                       independently
    """

    def __init__(
        self,
        charset: str = "utf-8",
        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
        echo_stdin: bool = False,
        mix_stderr: bool = True,
    ) -> None:
        self.charset = charset
        self.env: t.Mapping[str, t.Optional[str]] = env or {}
        self.echo_stdin = echo_stdin
        self.mix_stderr = mix_stderr

    def get_default_prog_name(self, cli: "BaseCommand") -> str:
        """Given a command object it will return the default program name
        for it.  The default is the `name` attribute or ``"root"`` if not
        set.
        """
        return cli.name or "root"

    def make_env(
        self, overrides: t.Optional[t.Mapping[str, t.Optional[str]]] = None
    ) -> t.Mapping[str, t.Optional[str]]:
        """Returns the environment overrides for invoking a script."""
        rv = dict(self.env)
        if overrides:
            rv.update(overrides)
        return rv

    @contextlib.contextmanager
    def isolation(
        self,
        input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,
        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
        color: bool = False,
    ) -> t.Iterator[t.Tuple[io.BytesIO, t.Optional[io.BytesIO]]]:
        """A context manager that sets up the isolation for invoking of a
        command line tool.  This sets up stdin with the given input data
        and `os.environ` with the overrides from the given dictionary.
        This also rebinds some internals in Click to be mocked (like the
        prompt functionality).

        This is automatically done in the :meth:`invoke` method.

        :param input: the input stream to put into sys.stdin.
        :param env: the environment overrides as dictionary.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionchanged:: 8.0
            ``stderr`` is opened with ``errors="backslashreplace"``
            instead of the default ``"strict"``.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.
        """
        bytes_input = make_input_stream(input, self.charset)
        echo_input = None

        old_stdin = sys.stdin
        old_stdout = sys.stdout
        old_stderr = sys.stderr
        old_forced_width = formatting.FORCED_WIDTH
        formatting.FORCED_WIDTH = 80

        env = self.make_env(env)

        bytes_output = io.BytesIO()

        if self.echo_stdin:
            bytes_input = echo_input = t.cast(
                t.BinaryIO, EchoingStdin(bytes_input, bytes_output)
            )

        sys.stdin = text_input = _NamedTextIOWrapper(
            bytes_input, encoding=self.charset, name="<stdin>", mode="r"
        )

        if self.echo_stdin:
            # Force unbuffered reads, otherwise TextIOWrapper reads a
            # large chunk which is echoed early.
            text_input._CHUNK_SIZE = 1  # type: ignore

        sys.stdout = _NamedTextIOWrapper(
            bytes_output, encoding=self.charset, name="<stdout>", mode="w"
        )

        bytes_error = None
        if self.mix_stderr:
            sys.stderr = sys.stdout
        else:
            bytes_error = io.BytesIO()
            sys.stderr = _NamedTextIOWrapper(
                bytes_error,
                encoding=self.charset,
                name="<stderr>",
                mode="w",
                errors="backslashreplace",
            )

        @_pause_echo(echo_input)  # type: ignore
        def visible_input(prompt: t.Optional[str] = None) -> str:
            sys.stdout.write(prompt or "")
            val = text_input.readline().rstrip("\r\n")
            sys.stdout.write(f"{val}\n")
            sys.stdout.flush()
            return val

        @_pause_echo(echo_input)  # type: ignore
        def hidden_input(prompt: t.Optional[str] = None) -> str:
            sys.stdout.write(f"{prompt or ''}\n")
            sys.stdout.flush()
            return text_input.readline().rstrip("\r\n")

        @_pause_echo(echo_input)  # type: ignore
        def _getchar(echo: bool) -> str:
            char = sys.stdin.read(1)

            if echo:
                sys.stdout.write(char)

            sys.stdout.flush()
            return char

        default_color = color

        def should_strip_ansi(
            stream: t.Optional[t.IO[t.Any]] = None, color: t.Optional[bool] = None
        ) -> bool:
            if color is None:
                return not default_color
            return not color

        old_visible_prompt_func = termui.visible_prompt_func
        old_hidden_prompt_func = termui.hidden_prompt_func
        old__getchar_func = termui._getchar
        old_should_strip_ansi = utils.should_strip_ansi  # type: ignore
        termui.visible_prompt_func = visible_input
        termui.hidden_prompt_func = hidden_input
        termui._getchar = _getchar
        utils.should_strip_ansi = should_strip_ansi  # type: ignore

        old_env = {}
        try:
            for key, value in env.items():
                # Remember the previous value (None if unset) so it can be
                # restored in the ``finally`` clause below.
                old_env[key] = os.environ.get(key)
                if value is None:
                    # Deliberately best-effort: the variable may simply not
                    # be set, and a failure to unset must not abort the run.
                    try:
                        del os.environ[key]
                    except Exception:
                        pass
                else:
                    os.environ[key] = value
            yield (bytes_output, bytes_error)
        finally:
            for key, value in old_env.items():
                if value is None:
                    # Best-effort, same as above.
                    try:
                        del os.environ[key]
                    except Exception:
                        pass
                else:
                    os.environ[key] = value
            sys.stdout = old_stdout
            sys.stderr = old_stderr
            sys.stdin = old_stdin
            termui.visible_prompt_func = old_visible_prompt_func
            termui.hidden_prompt_func = old_hidden_prompt_func
            termui._getchar = old__getchar_func
            utils.should_strip_ansi = old_should_strip_ansi  # type: ignore
            formatting.FORCED_WIDTH = old_forced_width

    def invoke(
        self,
        cli: "BaseCommand",
        args: t.Optional[t.Union[str, t.Sequence[str]]] = None,
        input: t.Optional[t.Union[str, bytes, t.IO[t.Any]]] = None,
        env: t.Optional[t.Mapping[str, t.Optional[str]]] = None,
        catch_exceptions: bool = True,
        color: bool = False,
        **extra: t.Any,
    ) -> Result:
        """Invokes a command in an isolated environment.  The arguments are
        forwarded directly to the command line script, the `extra` keyword
        arguments are passed to the :meth:`~clickpkg.Command.main` function of
        the command.

        This returns a :class:`Result` object.

        :param cli: the command to invoke
        :param args: the arguments to invoke. It may be given as an iterable
                     or a string. When given as string it will be interpreted
                     as a Unix shell command. More details at
                     :func:`shlex.split`.
        :param input: the input data for `sys.stdin`.
        :param env: the environment overrides.
        :param catch_exceptions: Whether to catch any other exceptions than
                                 ``SystemExit``.
        :param extra: the keyword arguments to pass to :meth:`main`.
        :param color: whether the output should contain color codes. The
                      application can still override this explicitly.

        .. versionchanged:: 8.0
            The result object has the ``return_value`` attribute with
            the value returned from the invoked command.

        .. versionchanged:: 4.0
            Added the ``color`` parameter.

        .. versionchanged:: 3.0
            Added the ``catch_exceptions`` parameter.

        .. versionchanged:: 3.0
            The result object has the ``exc_info`` attribute with the
            traceback if available.
        """
        exc_info = None
        with self.isolation(input=input, env=env, color=color) as outstreams:
            return_value = None
            exception: t.Optional[BaseException] = None
            exit_code = 0

            if isinstance(args, str):
                args = shlex.split(args)

            try:
                prog_name = extra.pop("prog_name")
            except KeyError:
                prog_name = self.get_default_prog_name(cli)

            try:
                return_value = cli.main(args=args or (), prog_name=prog_name, **extra)
            except SystemExit as e:
                exc_info = sys.exc_info()
                e_code = t.cast(t.Optional[t.Union[int, t.Any]], e.code)

                if e_code is None:
                    e_code = 0

                if e_code != 0:
                    exception = e

                # A non-integer exit code is written to stdout and the
                # reported exit code becomes 1.
                if not isinstance(e_code, int):
                    sys.stdout.write(str(e_code))
                    sys.stdout.write("\n")
                    e_code = 1

                exit_code = e_code

            except Exception as e:
                if not catch_exceptions:
                    raise
                exception = e
                exit_code = 1
                exc_info = sys.exc_info()
            finally:
                # Flush while the captured streams are still installed so
                # the byte buffers contain everything that was written.
                sys.stdout.flush()
                stdout = outstreams[0].getvalue()
                if self.mix_stderr:
                    stderr = None
                else:
                    stderr = outstreams[1].getvalue()  # type: ignore

        return Result(
            runner=self,
            stdout_bytes=stdout,
            stderr_bytes=stderr,
            return_value=return_value,
            exit_code=exit_code,
            exception=exception,
            exc_info=exc_info,  # type: ignore
        )

    @contextlib.contextmanager
    def isolated_filesystem(
        self, temp_dir: t.Optional[t.Union[str, "os.PathLike[str]"]] = None
    ) -> t.Iterator[str]:
        """A context manager that creates a temporary directory and
        changes the current working directory to it. This isolates tests
        that affect the contents of the CWD to prevent them from
        interfering with each other.

        :param temp_dir: Create the temporary directory under this
            directory. If given, the created directory is not removed
            when exiting.

        .. versionchanged:: 8.0
            Added the ``temp_dir`` parameter.
        """
        cwd = os.getcwd()
        dt = tempfile.mkdtemp(dir=temp_dir)

        try:
            # Inside the ``try`` so that a failing ``chdir`` still cleans
            # up the directory that was just created.
            os.chdir(dt)
            yield dt
        finally:
            os.chdir(cwd)

            if temp_dir is None:
                try:
                    shutil.rmtree(dt)
                except OSError:  # noqa: B014
                    pass
Simpan
Batal
Isi Zip:
Unzip
Create
Buat Folder
Buat File
Terminal / Execute
Run
Chmod Bulk
All File
All Folder
All File dan Folder
Apply