Source code for saltfactories.bases

Factories base classes.
import atexit
import contextlib
import json
import logging
import os
import pprint
from typing import TYPE_CHECKING

import attr
import psutil
import pytest
import yaml
from pytestshellutils.exceptions import FactoryNotStarted
from import Daemon
from import DaemonImpl
from import ScriptSubprocess
from import Subprocess
from import SubprocessImpl
from pytestshellutils.utils import time
from pytestshellutils.utils.processes import ProcessResult
from pytestshellutils.utils.processes import terminate_process

from saltfactories.utils import running_username

log = logging.getLogger(__name__)

# If any --timeout is passed on the CLI, we'll feed it to salt,
# and will increase it internally as a hard timeout by the following amount

[docs] @attr.s(kw_only=True) class SaltMixin: """ Base factory for salt cli's and daemon's. :param dict config: The Salt config dictionary :param str python_executable: The path to the python executable to use :param bool system_service: If true, the daemons and CLI's are run against a system installed salt setup, ie, the default salt system paths apply. """ config = attr.ib(repr=False) id = attr.ib(init=False) # noqa: A003 pylint: disable=invalid-name config_file = attr.ib(init=False) config_dir = attr.ib() python_executable = attr.ib(default=None) system_service = attr.ib(repr=False, default=False) display_name = attr.ib(init=False, default=None) def __attrs_post_init__(self): """ Post attrs initialization routines. """ # We really do not want buffered output self.environ.setdefault("PYTHONUNBUFFERED", "1") # Don't write .pyc files or create them in __pycache__ directories self.environ.setdefault("PYTHONDONTWRITEBYTECODE", "1") from salt.utils.immutabletypes import freeze self.config = freeze(self.config) @config_file.default def _default_config_file(self): return self.config["conf_file"] @config_dir.default def _default_config_dir(self): return os.path.dirname(self.config_file) @id.default def _default_id(self): return self.config["id"]
[docs] def get_display_name(self): """ Returns a human readable name for the factory. """ if self.display_name is None: self.display_name = f"{self.__class__.__name__}(id={!r})" if self.display_name is not None: return self.display_name return super().get_display_name()
[docs] @attr.s(kw_only=True) class SaltCliImpl(SubprocessImpl): """ Salt CLI's subprocess interaction implementation. Please look at :py:class:`` for the additional supported keyword arguments documentation. """
[docs] def cmdline(self, *args, minion_tgt=None, **kwargs): # pylint: disable=arguments-differ """ Construct a list of arguments to use when starting the subprocess. :param str args: Additional arguments to use when starting the subprocess :keyword str minion_tgt: The minion ID to target :keyword kwargs: Additional keyword arguments will be converted into ``key=value`` pairs to be consumed by the salt CLI's """ return self.factory.cmdline(*args, minion_tgt=minion_tgt, **kwargs)
[docs] @attr.s(kw_only=True) class SaltCli(SaltMixin, ScriptSubprocess): """ Base factory for salt cli's. :param bool hard_crash: Pass ``--hard-crash`` to Salt's CLI's Please look at :py:class:`~saltfactories.bases.Salt` and :py:class:`` for the additional supported keyword arguments documentation. """ hard_crash = attr.ib(repr=False, default=False) # Override the following to default to non-mandatory and to None display_name = attr.ib(init=False, default=None) _minion_tgt = attr.ib(repr=False, init=False, default=None) merge_json_output = attr.ib(repr=False, default=True) __cli_timeout_supported__ = attr.ib(repr=False, init=False, default=False) __cli_log_level_supported__ = attr.ib(repr=False, init=False, default=True) __cli_output_supported__ = attr.ib(repr=False, init=False, default=True) __json_output__ = attr.ib(repr=False, init=False, default=False) __merge_json_output__ = attr.ib(repr=False, init=False, default=True) def _get_impl_class(self): return SaltCliImpl def __attrs_post_init__(self): """ Post attrs initialization routines. """ ScriptSubprocess.__attrs_post_init__(self) SaltMixin.__attrs_post_init__(self)
[docs] def get_script_args(self): """ Returns any additional arguments to pass to the CLI script. """ if not self.hard_crash: return super().get_script_args() return ["--hard-crash"]
[docs] def get_minion_tgt(self, minion_tgt=None): """ Return the minion target ID. """ return minion_tgt
[docs] def cmdline( # noqa: C901,PLR0912,PLR0915 self, *args, minion_tgt=None, merge_json_output=None, **kwargs ): # pylint: disable=arguments-differ """ Construct a list of arguments to use when starting the subprocess. :param str args: Additional arguments to use when starting the subprocess :keyword str minion_tgt: The minion ID to target :keyword bool merge_json_output: The default behavior of salt outputters is to print one line per minion return, which makes parsing the whole output as JSON impossible when targeting multiple minions. If this value is ``True``, an attempt is made to merge each JSON line into a single dictionary. :keyword kwargs: Additional keyword arguments will be converted into ``key=value`` pairs to be consumed by the salt CLI's """ log.debug( "Building cmdline. Minion target: %s; Input args: %s; Input kwargs: %s;", minion_tgt, args, kwargs, ) minion_tgt = self._minion_tgt = self.get_minion_tgt(minion_tgt=minion_tgt) if merge_json_output is None: self.__merge_json_output__ = self.merge_json_output else: self.__merge_json_output__ = merge_json_output cmdline = [] # Convert all passed in arguments to strings args = [str(arg) for arg in args] # Handle the config directory flag for arg in args: if arg.startswith("--config-dir="): break if arg in ("-c", "--config-dir"): break else: cmdline.append(f"--config-dir={self.config_dir}") # Handle the timeout CLI flag, if supported if self.__cli_timeout_supported__: salt_cli_timeout_next = False for arg in args: if arg.startswith("--timeout="): # Let's actually change the _terminal_timeout value which is used to # calculate when the run() method should actually timeout try: salt_cli_timeout = int(arg.split("--timeout=")[-1]) except ValueError: # Not a number? Let salt do it's error handling break if ( self.impl._terminal_timeout is None or salt_cli_timeout >= self.impl._terminal_timeout ): self.impl._terminal_timeout = ( int(salt_cli_timeout) + SALT_TIMEOUT_FLAG_INCREASE ) break if salt_cli_timeout_next: try: salt_cli_timeout = int(arg) except ValueError: # Not a number? Let salt do it's error handling break if ( self.impl._terminal_timeout is None or salt_cli_timeout >= self.impl._terminal_timeout ): self.impl._terminal_timeout = ( int(salt_cli_timeout) + SALT_TIMEOUT_FLAG_INCREASE ) break if arg == "-t" or arg.startswith("--timeout"): salt_cli_timeout_next = True continue else: salt_cli_timeout = self.timeout if ( salt_cli_timeout and self.impl._terminal_timeout and self.impl._terminal_timeout > salt_cli_timeout ): salt_cli_timeout = self.impl._terminal_timeout if not salt_cli_timeout and self.impl._terminal_timeout: salt_cli_timeout = self.impl._terminal_timeout if salt_cli_timeout: self.impl._terminal_timeout = salt_cli_timeout + SALT_TIMEOUT_FLAG_INCREASE # Add it to the salt command CLI flags cmdline.append(f"--timeout={salt_cli_timeout}") # Handle the output flag if self.__cli_output_supported__: for idx, arg in enumerate(args): if arg in ("--out", "--output"): self.__json_output__ = args[idx + 1] == "json" break if arg.startswith(("--out=", "--output=")): self.__json_output__ = arg.split("=")[-1].strip() == "json" break else: # No output was passed, the default output is JSON cmdline.append("--out=json") self.__json_output__ = True if self.__json_output__: for arg in args: if arg in ("--out-indent", "--output-indent"): break if arg.startswith(("--out-indent=", "--output-indent=")): break else: # Default to one line per output cmdline.append("--out-indent=0") if self.__cli_log_level_supported__: # Handle the logging flag for arg in args: if arg in ("-l", "--log-level"): break if arg.startswith("--log-level="): break else: # Default to being almost quiet on console output cmdline.append("--log-level=critical") if minion_tgt: cmdline.append(minion_tgt) # Add the remaining args cmdline.extend(args) # Keyword arguments get passed as KEY=VALUE pairs to the CLI for key, value in kwargs.items(): if not isinstance(value, str): value = json.dumps(value) # noqa: PLW2901 cmdline.append(f"{key}={value}") cmdline = super().cmdline(*cmdline) if self.python_executable and cmdline[0] != self.python_executable: cmdline.insert(0, self.python_executable) log.debug("Built cmdline: %s", cmdline) return cmdline
[docs] def process_output(self, stdout, stderr, cmdline=None): # noqa: ARG002 """ Process the output. When possible JSON is loaded from the output. :return: Returns a tuple in the form of ``(stdout, stderr, loaded_json)`` :rtype: tuple """ json_out = None if stdout and self.__json_output__: try: json_out = json.loads(stdout) except ValueError: if self.__merge_json_output__: with contextlib.suppress(ValueError): json_out = json.loads(stdout.replace("}\n{", ", ")) if json_out is None: log.debug("%s failed to load JSON from the following output:\n%r", self, stdout) if ( self.__cli_output_supported__ and json_out and isinstance(json_out, str) and self.__json_output__ ): # Sometimes the parsed JSON is just a string, for example: # OUTPUT: '"The salt master could not be contacted. Is master running?"\n' # LOADED JSON: 'The salt master could not be contacted. Is master running?' # # In this case, we assign the loaded JSON to stdout and reset json_out stdout = json_out json_out = None if ( self.__cli_output_supported__ and json_out and self._minion_tgt and self._minion_tgt != "*" ): with contextlib.suppress(KeyError): json_out = json_out[self._minion_tgt] return stdout, stderr, json_out
[docs] @attr.s(kw_only=True) class SystemdSaltDaemonImpl(DaemonImpl): """ Daemon systemd interaction implementation. Please look at :py:class:`` for the additional supported keyword arguments documentation. """ _process = attr.ib(init=False, repr=False, default=None) _service_name = attr.ib(init=False, repr=False, default=None)
[docs] def cmdline(self, *args): # pylint: disable=arguments-differ """ Construct a list of arguments to use when starting the subprocess. :param str args: Additional arguments to use when starting the subprocess """ if args: # pragma: no cover log.debug( " is ignoring the passed in arguments: %r", self.__class__.__name__, args ) return ("systemctl", "start", self.get_service_name())
[docs] def get_service_name(self): """ Return the systemd service name. """ if self._service_name is None: script_path = self.factory.get_script_path() if os.path.isabs(script_path): script_path = os.path.basename(script_path) self._service_name = script_path return self._service_name
def _internal_run(self, *cmdline): """ Run the given command synchronously. """ result = Subprocess( cwd=self.factory.cwd, environ=self.factory.environ.copy(), system_encoding=self.factory.system_encoding, ).run(*cmdline)"%s %s", self.factory.__class__.__name__, result) return result
[docs] def is_running(self): """ Returns true if the sub-process is alive. """ if self._process is None: ret = self._internal_run("systemctl", "show", "-p", "MainPID", self.get_service_name()) mainpid = ret.stdout.split("=")[-1].strip() if mainpid == "0": return False self._process = psutil.Process(int(mainpid)) return self._process.is_running()
@property def pid(self): """ Return the ``pid`` of the running process. """ if self.is_running(): return return None
[docs] def start(self, *extra_cli_arguments, max_start_attempts=None, start_timeout=None): """ Start the daemon. """ started = super().start( *extra_cli_arguments, max_start_attempts=max_start_attempts, start_timeout=start_timeout ) atexit.register(self.terminate) return started
def _terminate(self): """ This method actually terminates the started daemon. """ # We completely override the parent class method because we're not using the # self._terminal property, it's a systemd service if self._process is None: # pragma: no cover if TYPE_CHECKING: # Make mypy happy assert self._terminal_result return self._terminal_result # pylint: disable=access-member-before-definition atexit.unregister(self.terminate)"Stopping %s", self.factory) pid = # Collect any child processes information before terminating the process with contextlib.suppress(psutil.NoSuchProcess): for child in psutil.Process(pid).children(recursive=True): if child not in self._children: # pylint: disable=access-member-before-definition self._children.append(child) # pylint: disable=access-member-before-definition if self._process.is_running(): # pragma: no cover cmdline = self._process.cmdline() else: # The main pid is not longer alive, try to get the cmdline from systemd ret = self._internal_run( "systemctl", "show", "-p", "ExecStart", self.get_service_name() ) cmdline = ret.stdout.split("argv[]=")[-1].split(";")[0].strip().split() # Tell systemd to stop the service self._internal_run("systemctl", "stop", self.get_service_name()) if self._process.is_running(): # pragma: no cover cmdline = self._process.cmdline() try: self._process.wait() except psutil.TimeoutExpired: self._process.terminate() with contextlib.suppress(psutil.TimeoutExpired): self._process.wait() exitcode = self._process.wait() or 0 # Dereference the internal _process attribute self._process = None # Lets log and kill any child processes left behind, including the main subprocess # if it failed to properly stop terminate_process( pid=pid, kill_children=True, children=self._children, # pylint: disable=access-member-before-definition slow_stop=self.factory.slow_stop, ) if self._terminal_stdout is not None: self._terminal_stdout.close() # pylint: disable=access-member-before-definition if self._terminal_stderr is not None: self._terminal_stderr.close() # pylint: disable=access-member-before-definition stdout = "" ret = self._internal_run("journalctl", "--no-pager", "-u", self.get_service_name()) stderr = ret.stdout try: self._terminal_result = ProcessResult( returncode=exitcode, stdout=stdout, stderr=stderr, cmdline=cmdline )"%s %s", self.factory.__class__.__name__, self._terminal_result) return self._terminal_result finally: self._terminal = None self._terminal_stdout = None self._terminal_stderr = None self._terminal_timeout = None self._children = []
[docs] @attr.s(kw_only=True) class SaltDaemon(SaltMixin, Daemon): """ Base factory for salt daemon's. Please look at :py:class:`~saltfactories.bases.SaltMixin` and :py:class:`` for the additional supported keyword arguments documentation. """ display_name = attr.ib(init=False, default=None) event_listener = attr.ib(repr=False, default=None) factories_manager = attr.ib(repr=False, hash=False, default=None) _started_at = attr.ib(repr=False, default=None) def __attrs_post_init__(self): """ Post attrs initialization routines. """ Daemon.__attrs_post_init__(self) SaltMixin.__attrs_post_init__(self) if self.system_service is True and self.extra_cli_arguments_after_first_start_failure: msg = "You cannot pass `extra_cli_arguments_after_first_start_failure` to a salt system installation setup." raise pytest.UsageError(msg) if self.system_service is False: for arg in self.extra_cli_arguments_after_first_start_failure: if arg in ("-l", "--log-level"): break if arg.startswith("--log-level="): break else: self.extra_cli_arguments_after_first_start_failure.append("--log-level=debug") # Register before start function self.before_start(self._set_started_at) # Register start check function self.start_check(self._check_start_events) def _get_impl_class(self): if self.system_service: return SystemdSaltDaemonImpl return super()._get_impl_class()
[docs] @classmethod def configure( cls, factories_manager, daemon_id, root_dir=None, defaults=None, overrides=None, **configure_kwargs, ): """ Configure the salt daemon. """ return cls._configure( factories_manager, daemon_id, root_dir=root_dir, defaults=defaults, overrides=overrides, **configure_kwargs, )
@classmethod def _configure( cls, factories_manager, daemon_id, root_dir=None, defaults=None, overrides=None, ): raise NotImplementedError
[docs] @classmethod def verify_config(cls, config): """ Verify the configuration dictionary. """ # Do not move these deferred imports. It allows running against a Salt # onedir build in salt's repo checkout. import salt.utils.verify # pylint: disable=import-outside-toplevel salt.utils.verify.verify_env( cls._get_verify_config_entries(config), running_username(), pki_dir=config.get("pki_dir") or "", root_dir=config["root_dir"], )
@classmethod def _get_verify_config_entries(cls, config): raise NotImplementedError
[docs] @classmethod def write_config(cls, config): """ Write the configuration to file. """ config_file = config.pop("conf_file") log.debug( "Writing to configuration file %s. Configuration:\n%s", config_file, pprint.pformat(config), ) # Write down the computed configuration into the config file with open(config_file, "w", encoding="utf-8") as wfh: yaml.safe_dump(config, wfh, default_flow_style=False) loaded_config = cls.load_config(config_file, config) cls.verify_config(loaded_config) return loaded_config
[docs] @classmethod def load_config(cls, config_file, config): """ Return the loaded configuration. Should return the configuration as the daemon would have loaded after parsing the CLI """ raise NotImplementedError
[docs] def get_check_events(self): """ Return salt events to check. Returns list of tuples in the form of `(master_id, event_tag)` check against to ensure the daemon is running. """ raise NotImplementedError
[docs] def cmdline(self, *args): """ Construct a list of arguments to use when starting the subprocess. :param str args: Additional arguments to use when starting the subprocess """ _args = [] # Handle the config directory flag for arg in args: if not isinstance(arg, str): continue if arg.startswith("--config-dir="): break if arg in ("-c", "--config-dir"): break else: _args.append(f"--config-dir={self.config_dir}") # Handle the logging flag for arg in args: if not isinstance(arg, str): continue if arg in ("-l", "--log-level"): break if arg.startswith("--log-level="): break else: # Default to being almost quiet on console output _args.append("--log-level=critical") cmdline = super().cmdline(*(_args + list(args))) if self.python_executable and cmdline[0] != self.python_executable: cmdline.insert(0, self.python_executable) return cmdline
def _set_started_at(self): """ Set the ``_started_at`` attribute on the daemon instance. """ self._started_at = time.time() def _check_start_events(self, timeout_at): """ Check for start events in the Salt event bus to confirm that the daemon is running. """ if not self.event_listener: # pragma: no cover log.debug("The 'event_listener' attribute is not set. Not checking events...") return True check_events = set(self.get_check_events()) if not check_events: log.debug("No events to listen to for %s", self) return True log.debug("Events to check for %s: %s", self, set(self.get_check_events())) checks_start_time = time.time() while time.time() <= timeout_at: if not self.is_running(): msg = f"{self} is no longer running" raise FactoryNotStarted(msg) if not check_events: break check_events -= { (event.daemon_id, event.tag) for event in self.event_listener.get_events( check_events, after_time=self._started_at ) } if check_events: time.sleep(1.5) else: log.error( "Failed to check events after %1.2f seconds for %s. Remaining events to check: %s", time.time() - checks_start_time, self, check_events, ) return False log.debug("All events checked for %s: %s", self, set(self.get_check_events())) return True