"""
..
PYTEST_DONT_REWRITE
Container based factories
"""
import atexit
import logging
import os
import attr
from saltfactories import bases
from saltfactories import CODE_ROOT_DIR
from saltfactories.daemons import minion
from saltfactories.exceptions import FactoryNotStarted
from saltfactories.utils import format_callback_to_string
from saltfactories.utils import ports
from saltfactories.utils import random_string
from saltfactories.utils import time
from saltfactories.utils.processes import ProcessResult
try:
import docker
from docker.errors import APIError
HAS_DOCKER = True
except ImportError: # pragma: no cover
HAS_DOCKER = False
class APIError(Exception):
pass
try:
from requests.exceptions import ConnectionError as RequestsConnectionError
HAS_REQUESTS = True
except ImportError: # pragma: no cover
HAS_REQUESTS = False
class RequestsConnectionError(ConnectionError):
pass
try:
import pywintypes
PyWinTypesError = pywintypes.error # pragma: no cover
except ImportError:
[docs] class PyWinTypesError(Exception):
pass
log = logging.getLogger(__name__)
[docs]@attr.s(kw_only=True)
class Container(bases.Factory):
image = attr.ib()
name = attr.ib(default=None)
check_ports = attr.ib(default=None)
docker_client = attr.ib(repr=False, default=None)
container_run_kwargs = attr.ib(repr=False, default=attr.Factory(dict))
container = attr.ib(init=False, default=None, repr=False)
start_timeout = attr.ib(repr=False, default=30)
max_start_attempts = attr.ib(repr=False, default=3)
_before_start_callbacks = attr.ib(repr=False, hash=False, default=attr.Factory(list))
_before_terminate_callbacks = attr.ib(repr=False, hash=False, default=attr.Factory(list))
_after_start_callbacks = attr.ib(repr=False, hash=False, default=attr.Factory(list))
_after_terminate_callbacks = attr.ib(repr=False, hash=False, default=attr.Factory(list))
_terminate_result = attr.ib(repr=False, hash=False, init=False, default=None)
def __attrs_post_init__(self):
super().__attrs_post_init__()
if self.name is None:
self.name = random_string("factories-")
if self.docker_client is None:
if not HAS_DOCKER:
raise RuntimeError("The docker python library was not found installed")
if not HAS_REQUESTS:
raise RuntimeError("The requests python library was not found installed")
self.docker_client = docker.from_env()
[docs] def before_start(self, callback, *args, **kwargs):
"""
Register a function callback to run before the container starts
:param ~collections.abc.Callable callback:
The function to call back
:keyword args:
The arguments to pass to the callback
:keyword kwargs:
The keyword arguments to pass to the callback
"""
self._before_start_callbacks.append((callback, args, kwargs))
[docs] def after_start(self, callback, *args, **kwargs):
"""
Register a function callback to run after the container starts
:param ~collections.abc.Callable callback:
The function to call back
:keyword args:
The arguments to pass to the callback
:keyword kwargs:
The keyword arguments to pass to the callback
"""
self._after_start_callbacks.append((callback, args, kwargs))
[docs] def before_terminate(self, callback, *args, **kwargs):
"""
Register a function callback to run before the container terminates
:param ~collections.abc.Callable callback:
The function to call back
:keyword args:
The arguments to pass to the callback
:keyword kwargs:
The keyword arguments to pass to the callback
"""
self._before_terminate_callbacks.append((callback, args, kwargs))
[docs] def after_terminate(self, callback, *args, **kwargs):
"""
Register a function callback to run after the container terminates
:param ~collections.abc.Callable callback:
The function to call back
:keyword args:
The arguments to pass to the callback
:keyword kwargs:
The keyword arguments to pass to the callback
"""
self._after_terminate_callbacks.append((callback, args, kwargs))
def start(self, *command, max_start_attempts=None, start_timeout=None):
if self.is_running():
log.warning("%s is already running.", self)
return True
connectable = Container.client_connectable(self.docker_client)
if connectable is not True:
self.terminate()
raise RuntimeError(connectable)
self._terminate_result = None
atexit.register(self.terminate)
factory_started = False
for callback, args, kwargs in self._before_start_callbacks:
try:
callback(*args, **kwargs)
except Exception as exc: # pragma: no cover pylint: disable=broad-except
log.info(
"Exception raised when running %s: %s",
format_callback_to_string(callback, args, kwargs),
exc,
exc_info=True,
)
start_time = time.time()
start_attempts = max_start_attempts or self.max_start_attempts
current_attempt = 0
while current_attempt <= start_attempts:
current_attempt += 1
if factory_started:
break
log.info("Starting %s. Attempt: %d of %d", self, current_attempt, start_attempts)
current_start_time = time.time()
start_running_timeout = current_start_time + (start_timeout or self.start_timeout)
# Start the container
self.container = self.docker_client.containers.run(
self.image,
name=self.name,
detach=True,
stdin_open=True,
command=list(command) or None,
**self.container_run_kwargs
)
while time.time() <= start_running_timeout:
# Don't know why, but if self.container wasn't previously in a running
# state, and now it is, we have to re-set the self.container attribute
# so that it gives valid status information
self.container = self.docker_client.containers.get(self.name)
if self.container.status != "running":
time.sleep(0.25)
continue
self.container = self.docker_client.containers.get(self.name)
logs = self.container.logs(stdout=True, stderr=True, stream=False)
if isinstance(logs, bytes):
stdout = logs.decode()
stderr = None
else:
stdout = logs[0].decode()
stderr = logs[1].decode()
if stdout and stderr:
log.info("Running Container Logs:\n%s\n%s", stdout, stderr)
elif stdout:
log.info("Running Container Logs:\n%s", stdout)
# If we reached this far it means that we got the running status above, and
# now that the container has started, run start checks
try:
if (
self.run_container_start_checks(current_start_time, start_running_timeout)
is False
):
time.sleep(0.5)
continue
except FactoryNotStarted:
self.terminate()
break
log.info(
"The %s factory is running after %d attempts. Took %1.2f seconds",
self,
current_attempt,
time.time() - start_time,
)
factory_started = True
break
else:
# We reached start_running_timeout, re-try
try:
self.container.remove(force=True)
self.container.wait()
except docker.errors.NotFound:
pass
self.container = None
else:
# The factory failed to confirm it's running status
self.terminate()
if factory_started:
for callback, args, kwargs in self._after_start_callbacks:
try:
callback(*args, **kwargs)
except Exception as exc: # pragma: no cover pylint: disable=broad-except
log.info(
"Exception raised when running %s: %s",
format_callback_to_string(callback, args, kwargs),
exc,
exc_info=True,
)
# TODO: Add containers to the processes stats?!
# if self.factories_manager and self.factories_manager.stats_processes is not None:
# self.factories_manager.stats_processes[self.get_display_name()] = psutil.Process(
# self.pid
# )
return factory_started
result = self.terminate()
raise FactoryNotStarted(
"The {} factory has failed to confirm running status after {} attempts, which "
"took {:.2f} seconds({:.2f} seconds each)".format(
self,
current_attempt - 1,
time.time() - start_time,
start_timeout or self.start_timeout,
),
stdout=result.stdout,
stderr=result.stderr,
exitcode=result.exitcode,
)
[docs] def started(self, *command, max_start_attempts=None, start_timeout=None):
"""
Start the container and return it's instance so it can be used as a context manager
"""
self.start(*command, max_start_attempts=max_start_attempts, start_timeout=start_timeout)
return self
def terminate(self):
if self._terminate_result is not None:
# The factory is already terminated
return self._terminate_result
atexit.unregister(self.terminate)
for callback, args, kwargs in self._before_terminate_callbacks:
try:
callback(*args, **kwargs)
except Exception as exc: # pragma: no cover pylint: disable=broad-except
log.info(
"Exception raised when running %s: %s",
format_callback_to_string(callback, args, kwargs),
exc,
exc_info=True,
)
stdout = stderr = None
try:
if self.container is not None:
container = self.docker_client.containers.get(self.name)
logs = container.logs(stdout=True, stderr=True, stream=False)
if isinstance(logs, bytes):
stdout = logs.decode()
else:
stdout = logs[0].decode()
stderr = logs[1].decode()
if stdout and stderr:
log.info("Stopped Container Logs:\n%s\n%s", stdout, stderr)
elif stdout:
log.info("Stopped Container Logs:\n%s", stdout)
if container.status == "running":
container.remove(force=True)
container.wait()
self.container = None
except docker.errors.NotFound:
pass
finally:
for callback, args, kwargs in self._after_terminate_callbacks:
try:
callback(*args, **kwargs)
except Exception as exc: # pragma: no cover pylint: disable=broad-except
log.info(
"Exception raised when running %s: %s",
format_callback_to_string(callback, args, kwargs),
exc,
exc_info=True,
)
self._terminate_result = ProcessResult(exitcode=0, stdout=stdout, stderr=stderr)
return self._terminate_result
[docs] def get_check_ports(self):
"""
Return a list of ports to check against to ensure the daemon is running
"""
return self.check_ports or []
def is_running(self):
if self.container is None:
return False
self.container = self.docker_client.containers.get(self.name)
return self.container.status == "running"
def run(self, *cmd, **kwargs):
if len(cmd) == 1:
cmd = cmd[0]
log.info("%s is running %r ...", self, cmd)
# We force dmux to True so that we always get back both stdout and stderr
container = self.docker_client.containers.get(self.name)
ret = container.exec_run(cmd, demux=True, **kwargs)
exitcode = ret.exit_code
stdout = stderr = None
if ret.output:
stdout, stderr = ret.output
if stdout is not None:
stdout = stdout.decode()
if stderr is not None:
stderr = stderr.decode()
return ProcessResult(exitcode=exitcode, stdout=stdout, stderr=stderr, cmdline=cmd)
@staticmethod
def client_connectable(docker_client):
try:
if not docker_client.ping():
return "The docker client failed to get a ping response from the docker daemon"
return True
except (APIError, RequestsConnectionError, PyWinTypesError) as exc:
return "The docker client failed to ping the docker server: {}".format(exc)
def run_container_start_checks(self, started_at, timeout_at):
checks_start_time = time.time()
while time.time() <= timeout_at:
if not self.is_running():
raise FactoryNotStarted("{} is no longer running".format(self))
if self._container_start_checks():
break
else:
log.error(
"Failed to run container start checks after %1.2f seconds",
time.time() - checks_start_time,
)
return False
check_ports = set(self.get_check_ports())
if not check_ports:
return True
while time.time() <= timeout_at:
if not self.is_running():
raise FactoryNotStarted("{} is no longer running".format(self))
if not check_ports:
break
check_ports -= ports.get_connectable_ports(check_ports)
if check_ports:
time.sleep(0.5)
else:
log.error("Failed to check ports after %1.2f seconds", time.time() - checks_start_time)
return False
return True
def _container_start_checks(self):
return True
def __enter__(self):
if not self.is_running():
raise RuntimeError(
"Factory not yet started. Perhaps you're after something like:\n\n"
"with {}.started() as factory:\n"
" yield factory".format(self.__class__.__name__)
)
return self
def __exit__(self, *_):
self.terminate()
[docs]@attr.s(kw_only=True)
class SaltDaemon(bases.SaltDaemon, Container):
def __attrs_post_init__(self):
self.daemon_started = self.daemon_starting = False
if self.python_executable is None:
# Default to whatever is the default python in the container
self.python_executable = "python"
bases.SaltDaemon.__attrs_post_init__(self)
Container.__attrs_post_init__(self)
# There are some volumes which NEED to exist on the container so
# that configs are in the right place and also our custom salt
# plugins along with the custom scripts to start the daemons.
root_dir = os.path.dirname(self.config["root_dir"])
config_dir = str(self.config_dir)
scripts_dir = str(self.factories_manager.scripts_dir)
volumes = {
root_dir: {"bind": root_dir, "mode": "z"},
scripts_dir: {"bind": scripts_dir, "mode": "z"},
config_dir: {"bind": self.config_dir, "mode": "z"},
str(CODE_ROOT_DIR): {"bind": str(CODE_ROOT_DIR), "mode": "z"},
}
if "volumes" not in self.container_run_kwargs:
self.container_run_kwargs["volumes"] = {}
self.container_run_kwargs["volumes"].update(volumes)
self.container_run_kwargs.setdefault("hostname", self.name)
self.container_run_kwargs.setdefault("auto_remove", True)
[docs] def cmdline(self, *args):
return ["docker", "exec", "-i", self.name] + super().cmdline(*args)
[docs] def start(self, *extra_cli_arguments, max_start_attempts=None, start_timeout=None):
# Start the container
Container.start(self, max_start_attempts=max_start_attempts, start_timeout=start_timeout)
self.daemon_starting = True
# Now that the container is up, let's start the daemon
self.daemon_started = bases.SaltDaemon.start(
self,
*extra_cli_arguments,
max_start_attempts=max_start_attempts,
start_timeout=start_timeout
)
return self.daemon_started
[docs] def terminate(self):
self.daemon_started = self.daemon_starting = False
ret = bases.SaltDaemon.terminate(self)
Container.terminate(self)
return ret
[docs] def is_running(self):
running = Container.is_running(self)
if running is False:
return running
if self.daemon_starting or self.daemon_started:
return bases.SaltDaemon.is_running(self)
return running
[docs] def get_check_ports(self):
"""
Return a list of ports to check against to ensure the daemon is running
"""
return Container.get_check_ports(self) + bases.SaltDaemon.get_check_ports(self)
[docs] def before_start(
self, callback, *args, on_container=False, **kwargs
): # pylint: disable=arguments-differ
"""
Register a function callback to run before the daemon starts
:param ~collections.abc.Callable callback:
The function to call back
:keyword bool on_container:
If true, the callback will be registered on the container and not the daemon.
:keyword args:
The arguments to pass to the callback
:keyword kwargs:
The keyword arguments to pass to the callback
"""
if on_container:
Container.before_start(self, callback, *args, **kwargs)
else:
bases.SaltDaemon.before_start(self, callback, *args, **kwargs)
[docs] def after_start(
self, callback, *args, on_container=False, **kwargs
): # pylint: disable=arguments-differ
"""
Register a function callback to run after the daemon starts
:param ~collections.abc.Callable callback:
The function to call back
:keyword bool on_container:
If true, the callback will be registered on the container and not the daemon.
:keyword args:
The arguments to pass to the callback
:keyword kwargs:
The keyword arguments to pass to the callback
"""
if on_container:
Container.after_start(self, callback, *args, **kwargs)
else:
bases.SaltDaemon.after_start(self, callback, *args, **kwargs)
[docs] def before_terminate(
self, callback, *args, on_container=False, **kwargs
): # pylint: disable=arguments-differ
"""
Register a function callback to run before the daemon terminates
:param ~collections.abc.Callable callback:
The function to call back
:keyword bool on_container:
If true, the callback will be registered on the container and not the daemon.
:keyword args:
The arguments to pass to the callback
:keyword kwargs:
The keyword arguments to pass to the callback
"""
if on_container:
Container.before_terminate(self, callback, *args, **kwargs)
else:
bases.SaltDaemon.before_terminate(self, callback, *args, **kwargs)
[docs] def after_terminate(
self, callback, *args, on_container=False, **kwargs
): # pylint: disable=arguments-differ
"""
Register a function callback to run after the daemon terminates
:param ~collections.abc.Callable callback:
The function to call back
:keyword bool on_container:
If true, the callback will be registered on the container and not the daemon.
:keyword args:
The arguments to pass to the callback
:keyword kwargs:
The keyword arguments to pass to the callback
"""
if on_container:
Container.after_terminate(self, callback, *args, **kwargs)
else:
bases.SaltDaemon.after_terminate(self, callback, *args, **kwargs)
[docs] def started(self, *extra_cli_arguments, max_start_attempts=None, start_timeout=None):
"""
Start the daemon and return it's instance so it can be used as a context manager
"""
return bases.SaltDaemon.started(
self,
*extra_cli_arguments,
max_start_attempts=max_start_attempts,
start_timeout=start_timeout
)
[docs] def get_check_events(self):
"""
Return a list of tuples in the form of `(master_id, event_tag)` check against to ensure the daemon is running
"""
raise NotImplementedError
[docs]@attr.s(kw_only=True, slots=True)
class SaltMinion(SaltDaemon, minion.SaltMinion):
"""
Salt minion daemon implementation running in a docker container
"""
[docs] def get_check_events(self):
"""
Return a list of tuples in the form of `(master_id, event_tag)` check against to ensure the daemon is running
"""
return minion.SaltMinion.get_check_events(self)
[docs] def run_start_checks(self, started_at, timeout_at):
return minion.SaltMinion.run_start_checks(self, started_at, timeout_at)