Source code for saltfactories.client

"""
Salt Client in-process implementation.
"""
import logging
import re

import attr
import pytest

log = logging.getLogger(__name__)


[docs] @attr.s(kw_only=True, slots=True) class LocalClient: """ Wrapper class around Salt's local client. """ STATE_FUNCTION_RUNNING_RE = re.compile( r"""The function (?:"|')(?P<state_func>.*)(?:"|') is running as PID """ r"(?P<pid>[\d]+) and was started at (?P<date>.*) with jid (?P<jid>[\d]+)" ) master_config = attr.ib(repr=False) functions_known_to_return_none = attr.ib(repr=False) __client = attr.ib(init=False, repr=False) @functions_known_to_return_none.default def _set_functions_known_to_return_none(self): return ( "data.get", "file.chown", "file.chgrp", "pkg.refresh_db", "ssh.recv_known_host_entries", "time.sleep", ) @__client.default def _set_client(self): # Do not move these deferred imports. It allows running against a Salt # onedir build in salt's repo checkout. import salt.client # pylint: disable=import-outside-toplevel return salt.client.get_local_client(mopts=self.master_config)
[docs] def run(self, function, *args, minion_tgt="minion", timeout=300, **kwargs): """ Run a single salt function. Additional condition the return down to match the behavior of the raw function call. """ if "f_arg" in kwargs: kwargs["arg"] = kwargs.pop("f_arg") if "f_timeout" in kwargs: kwargs["timeout"] = kwargs.pop("f_timeout") ret = self.__client.cmd(minion_tgt, function, args, timeout=timeout, kwarg=kwargs) if minion_tgt not in ret: pytest.fail( "WARNING(SHOULD NOT HAPPEN #1935): Failed to get a reply " "from the minion '{}'. Command output: {}".format(minion_tgt, ret) ) elif ret[minion_tgt] is None and function not in self.functions_known_to_return_none: pytest.fail( "WARNING(SHOULD NOT HAPPEN #1935): Failed to get '{}' from " "the minion '{}'. Command output: {}".format(function, minion_tgt, ret) ) # Try to match stalled state functions ret[minion_tgt] = self._check_state_return(ret[minion_tgt]) return ret[minion_tgt]
def _check_state_return(self, ret): if isinstance(ret, dict): # This is the supposed return format for state calls return ret if isinstance(ret, list): jids = [] # These are usually errors for item in ret[:]: if not isinstance(item, str): # We don't know how to handle this continue match = self.STATE_FUNCTION_RUNNING_RE.match(item) if not match: # We don't know how to handle this continue jid = match.group("jid") if jid in jids: continue jids.append(jid) job_data = self.run("saltutil.find_job", jid) job_kill = self.run("saltutil.kill_job", jid) msg = ( "A running state.single was found causing a state lock. " "Job details: '{}' Killing Job Returned: '{}'".format(job_data, job_kill) ) ret.append(f"[TEST SUITE ENFORCED]{msg}[/TEST SUITE ENFORCED]") return ret