Source code for saltfactories.utils.processes

"""
saltfactories.utils.processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Process related utilities
"""
import errno
import logging
import pprint
import signal
import weakref

import attr
import psutil


log = logging.getLogger(__name__)


[docs]@attr.s(frozen=True) class ProcessResult: """ This class serves the purpose of having a common result class which will hold the resulting data from a subprocess command. :keyword int exitcode: The exitcode returned by the process :keyword str stdout: The ``stdout`` returned by the process :keyword str stderr: The ``stderr`` returned by the process :keyword list,tuple cmdline: The command line used to start the process .. admonition:: Note Cast :py:class:`~saltfactories.utils.processes.ProcessResult` to a string to pretty-print it. """ exitcode = attr.ib() stdout = attr.ib() stderr = attr.ib() cmdline = attr.ib(default=None, kw_only=True) @exitcode.validator def _validate_exitcode(self, attribute, value): if not isinstance(value, int): raise ValueError("'exitcode' needs to be an integer, not '{}'".format(type(value))) def __str__(self): message = self.__class__.__name__ if self.cmdline: message += "\n Command Line: {}".format(self.cmdline) if self.exitcode is not None: message += "\n Exitcode: {}".format(self.exitcode) if self.stdout or self.stderr: message += "\n Process Output:" if self.stdout: message += "\n >>>>> STDOUT >>>>>\n{}\n <<<<< STDOUT <<<<<".format(self.stdout) if self.stderr: message += "\n >>>>> STDERR >>>>>\n{}\n <<<<< STDERR <<<<<".format(self.stderr) return message + "\n"
[docs]@attr.s(frozen=True) class ShellResult(ProcessResult): """ This class serves the purpose of having a common result class which will hold the resulting data from a subprocess command. :keyword dict json: The dictionary returned from the process ``stdout`` if it could JSON decode it. Please look at :py:class:`~saltfactories.utils.processes.ProcessResult` for the additional supported keyword arguments documentation. """ json = attr.ib(default=None, kw_only=True) def __str__(self): message = super().__str__().rstrip() if self.json: message += "\n JSON Object:\n" message += "".join(" {}".format(line) for line in pprint.pformat(self.json)) return message + "\n" def __eq__(self, other): """ Allow comparison against the parsed JSON or the output """ if self.json: return self.json == other return self.stdout == other
[docs]def collect_child_processes(pid): """ Try to collect any started child processes of the provided pid :param int pid: The PID of the process """ # Let's get the child processes of the started subprocess try: parent = psutil.Process(pid) children = parent.children(recursive=True) except psutil.NoSuchProcess: children = [] return children
def _get_cmdline(proc): # pylint: disable=protected-access try: return proc._cmdline except AttributeError: # Cache the cmdline since that will be inaccessible once the process is terminated # and we use it in log calls try: cmdline = proc.cmdline() except (psutil.NoSuchProcess, psutil.AccessDenied): # OSX is more restrictive about the above information cmdline = None except OSError: # pragma: no cover # On Windows we've seen something like: # File " c: ... \lib\site-packages\pytestsalt\utils\__init__.py", line 182, in terminate_process # terminate_process_list(process_list, kill=slow_stop is False, slow_stop=slow_stop) # File " c: ... \lib\site-packages\pytestsalt\utils\__init__.py", line 130, in terminate_process_list # _terminate_process_list(process_list, kill=kill, slow_stop=slow_stop) # File " c: ... \lib\site-packages\pytestsalt\utils\__init__.py", line 78, in _terminate_process_list # cmdline = process.cmdline() # File " c: ... \lib\site-packages\psutil\__init__.py", line 786, in cmdline # return self._proc.cmdline() # File " c: ... \lib\site-packages\psutil\_pswindows.py", line 667, in wrapper # return fun(self, *args, **kwargs) # File " c: ... \lib\site-packages\psutil\_pswindows.py", line 745, in cmdline # ret = cext.proc_cmdline(self.pid, use_peb=True) # OSError: [WinError 299] Only part of a ReadProcessMemory or WriteProcessMemory request was completed: 'originated from ReadProcessMemory(ProcessParameters) cmdline = None except RuntimeError: # pragma: no cover # Also on windows # saltfactories\utils\processes\helpers.py:68: in _get_cmdline # cmdline = proc.as_dict() # c: ... \lib\site-packages\psutil\__init__.py:634: in as_dict # ret = meth() # c: ... \lib\site-packages\psutil\__init__.py:1186: in memory_full_info # return self._proc.memory_full_info() # c: ... \lib\site-packages\psutil\_pswindows.py:667: in wrapper # return fun(self, *args, **kwargs) # _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ # # self = <psutil._pswindows.Process object at 0x0000029B7FDA5558> # # @wrap_exceptions # def memory_full_info(self): # basic_mem = self.memory_info() # > uss = cext.proc_memory_uss(self.pid) # E RuntimeError: NtQueryVirtualMemory failed # # c: ... \lib\site-packages\psutil\_pswindows.py:806: RuntimeError cmdline = None if not cmdline: try: cmdline = proc.as_dict() except psutil.NoSuchProcess: cmdline = "<could not be retrived; dead process: {}>".format(proc) except (psutil.AccessDenied, OSError): # pragma: no cover cmdline = weakref.proxy(proc) proc._cmdline = cmdline return proc._cmdline # pylint: enable=protected-access def _terminate_process_list(process_list, kill=False, slow_stop=False): log.info( "Terminating process list:\n%s", pprint.pformat([_get_cmdline(proc) for proc in process_list]), ) for process in process_list[:]: # Iterate over copy of the list if not psutil.pid_exists(process.pid): process_list.remove(process) continue try: if not kill and process.status() == psutil.STATUS_ZOMBIE: # pragma: no cover # Zombie processes will exit once child processes also exit continue if kill: log.info("Killing process(%s): %s", process.pid, _get_cmdline(process)) process.kill() else: log.info("Terminating process(%s): %s", process.pid, _get_cmdline(process)) try: if slow_stop: # Allow coverage data to be written down to disk process.send_signal(signal.SIGTERM) try: process.wait(2) except psutil.TimeoutExpired: # pragma: no cover if psutil.pid_exists(process.pid): continue else: process.terminate() except OSError as exc: # pragma: no cover if exc.errno not in (errno.ESRCH, errno.EACCES): raise if not psutil.pid_exists(process.pid): process_list.remove(process) except psutil.NoSuchProcess: process_list.remove(process)
[docs]def terminate_process_list(process_list, kill=False, slow_stop=False): """ Terminate a list of processes :param ~collections.abc.Iterable process_list: An iterable of :py:class:`psutil.Process` instances to terminate :keyword bool kill: Kill the process instead of terminating it. :keyword bool slow_stop: First try to terminate each process in the list, and if termination was not successful, kill it. """ def on_process_terminated(proc): log.info( "Process %s terminated with exit code: %s", getattr(proc, "_cmdline", proc), proc.returncode, ) # Try to terminate processes with the provided kill and slow_stop parameters log.info("Terminating process list. 1st step. kill: %s, slow stop: %s", kill, slow_stop) # Remove duplicates from the process list seen_pids = [] start_count = len(process_list) for proc in process_list[:]: if proc.pid in seen_pids: process_list.remove(proc) seen_pids.append(proc.pid) end_count = len(process_list) if end_count < start_count: log.debug("Removed %d duplicates from the initial process list", start_count - end_count) _terminate_process_list(process_list, kill=kill, slow_stop=slow_stop) psutil.wait_procs(process_list, timeout=5, callback=on_process_terminated) if process_list: # If there's still processes to be terminated, retry and kill them if slow_stop is False log.info( "Terminating process list. 2nd step. kill: %s, slow stop: %s", slow_stop is False, slow_stop, ) _terminate_process_list(process_list, kill=slow_stop is False, slow_stop=slow_stop) psutil.wait_procs(process_list, timeout=5, callback=on_process_terminated) if process_list: # If there's still processes to be terminated, just kill them, no slow stopping now log.info("Terminating process list. 3rd step. kill: True, slow stop: False") _terminate_process_list(process_list, kill=True, slow_stop=False) psutil.wait_procs(process_list, timeout=5, callback=on_process_terminated) if process_list: # In there's still processes to be terminated, log a warning about it log.warning("Some processes failed to properly terminate: %s", process_list)
[docs]def terminate_process(pid=None, process=None, children=None, kill_children=None, slow_stop=False): """ Try to terminate/kill the started process :keyword int pid: The PID of the process :keyword ~psutil.Process process: An instance of :py:class:`psutil.Process` :keyword ~collections.abc.Iterable children: An iterable of :py:class:`psutil.Process` instances, children to the process being terminated :keyword bool kill_children: Also try to terminate/kill child processes :keyword bool slow_stop: First try to terminate each process in the list, and if termination was not successful, kill it. """ children = children or [] process_list = [] if kill_children is None: # Always kill children if kill the parent process and kill_children was not set kill_children = True if slow_stop is False else kill_children if pid and not process: try: process = psutil.Process(pid) process_list.append(process) except psutil.NoSuchProcess: # Process is already gone process = None if kill_children: if process: children.extend(collect_child_processes(pid)) if children: process_list.extend(children) if process_list: if process: log.info("Stopping process %s and respective children: %s", process, children) else: log.info("Terminating process list: %s", process_list) terminate_process_list(process_list, kill=slow_stop is False, slow_stop=slow_stop)