# -*- coding: utf-8 -*-
"""
pytest_engine
~~~~~~~~~~~~~
Simple salt engine which will set up a socket to accept connections, allowing us to know
when a daemon is up and running
"""
import atexit
import datetime
import logging
import threading
import zmq
try:
from collections.abc import MutableMapping
except ImportError:
# Py2 compat
from collections import MutableMapping
try:
import msgpack
HAS_MSGPACK = True
except ImportError:
HAS_MSGPACK = False
import salt.utils.event
try:
import salt.utils.immutabletypes as immutabletypes
except ImportError:
immutabletypes = None
try:
from salt.utils.data import CaseInsensitiveDict
except ImportError:
CaseInsensitiveDict = None
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
# NOTE(review): presumably the name under which Salt's loader exposes this
# engine -- confirm against the Salt loader documentation.
__virtualname__ = "pytest"
def __virtual__():
    """
    Report whether this engine has everything it needs to run.

    Returns ``True`` when the ``pytest-<role>`` section exists in ``__opts__``,
    that section carries a ``returner_address`` and msgpack could be imported.
    Otherwise returns a ``(False, reason)`` tuple describing the first failed
    check.
    """
    opts_key = "pytest-{}".format(__opts__["__role"])  # pylint: disable=undefined-variable
    if opts_key not in __opts__:  # pylint: disable=undefined-variable
        reason = "No '{}' key in opts dictionary".format(opts_key)
    elif "returner_address" not in __opts__[opts_key]:  # pylint: disable=undefined-variable
        reason = "No 'returner_address' key in opts['{}'] dictionary".format(opts_key)
    elif not HAS_MSGPACK:
        reason = "msgpack was not importable. Please install msgpack."
    else:
        return True
    return False, reason
def start():
    """
    Engine entry point: build a ``PyTestEventForwardEngine`` from ``__opts__``
    and run it.

    Any exception raised while constructing or running the engine is logged
    together with its traceback and then re-raised to the caller.
    """
    engine_opts = __opts__  # pylint: disable=undefined-variable
    try:
        PyTestEventForwardEngine(opts=engine_opts).start()
    except Exception:  # pragma: no cover pylint: disable=broad-except
        # ``log.exception`` logs at ERROR level and attaches the traceback
        log.exception("Failed to start PyTestEventForwardEngine")
        raise
def ext_type_encoder(obj):
    """
    Convert any types that msgpack cannot handle on its own.

    This function is passed as the ``default`` callback to ``msgpack.dumps``.

    :param obj: The object msgpack could not serialize by itself.
    :return: A msgpack-friendly replacement: a formatted string for dates and
        datetimes, a ``tuple`` for sets, a ``dict`` for mappings, a ``list``
        for immutable lists. Unknown types are returned unchanged.
    """
    # Standard library types are checked first; they need no optional imports.
    if isinstance(obj, (datetime.datetime, datetime.date)):
        # msgpack doesn't support datetime.datetime and datetime.date datatypes.
        return obj.strftime("%Y%m%dT%H:%M:%S.%f")
    if isinstance(obj, (set, frozenset)):
        # msgpack can't handle sets (mutable or frozen) so translate to tuple
        return tuple(obj)
    if isinstance(obj, MutableMapping):
        return dict(obj)

    # Salt's immutable containers, only when that module was importable.
    if immutabletypes is not None:
        if isinstance(obj, immutabletypes.ImmutableDict):
            return dict(obj)
        if isinstance(obj, immutabletypes.ImmutableList):
            return list(obj)
        if isinstance(obj, immutabletypes.ImmutableSet):
            # msgpack can't handle set so translate it to tuple
            return tuple(obj)
    if CaseInsensitiveDict is not None and isinstance(obj, CaseInsensitiveDict):
        return dict(obj)

    # No known conversion applies. Hand the object back and let msgpack raise
    # its own error.
    return obj
class PyTestEventForwardEngine:
    """
    Forward events from the Salt event bus over a ZeroMQ PUSH socket.

    Every event read from the bus is sent, msgpack-encoded, as an
    ``(id, tag, data)`` tuple to the ``returner_address`` configured under
    ``opts["pytest-<role>"]``.
    """

    __slots__ = ("opts", "id", "role", "returner_address", "running_event")

    def __init__(self, opts):
        """
        :param opts: Options dictionary. It must provide ``id``, ``__role`` and
            a ``pytest-<role>`` mapping containing ``returner_address``.
        :raises KeyError: If any of those keys is missing.
        """
        self.opts = opts
        self.id = self.opts["id"]
        self.role = self.opts["__role"]
        self.returner_address = self.opts["pytest-{}".format(self.role)]["returner_address"]
        # Set while the forwarding loop should keep running
        self.running_event = threading.Event()

    def __repr__(self):
        return "<{} role={!r} id={!r}, returner_address={!r} running={!r}>".format(
            self.__class__.__name__,
            self.role,
            self.id,
            self.returner_address,
            self.running_event.is_set(),
        )

    def start(self):
        """
        Connect the PUSH socket and forward event bus events until stopped.

        Returns immediately when the engine is already running. Otherwise it
        registers :meth:`stop` with ``atexit`` and blocks in the forwarding
        loop. On exit, whether clean or through an exception, the running flag
        is cleared and whichever ZeroMQ resources were created are released.
        """
        if self.running_event.is_set():
            return
        log.info("%s is starting", self)
        atexit.register(self.stop)

        self.running_event.set()
        # Bound before the ``try`` so the ``finally`` clause can tell which
        # resources were actually created. Without this, a failure in
        # ``zmq.Context()`` or ``context.socket()`` would be masked by a
        # NameError raised during cleanup.
        context = None
        push = None
        try:
            context = zmq.Context()
            push = context.socket(zmq.PUSH)
            log.debug("%s connecting PUSH socket to %s", self, self.returner_address)
            push.connect(self.returner_address)
            # Work on a copy so the caller's opts are not modified
            opts = self.opts.copy()
            opts["file_client"] = "local"
            with salt.utils.event.get_event(
                self.role,
                sock_dir=opts["sock_dir"],
                opts=opts,
                listen=True,
            ) as eventbus:
                if self.role == "master":
                    event_tag = "salt/master/{}/start".format(self.id)
                    log.info("%s firing event on engine start. Tag: %s", self, event_tag)
                    load = {"id": self.id, "tag": event_tag, "data": {}}
                    eventbus.fire_event(load, event_tag)
                log.info("%s started", self)
                while self.running_event.is_set():
                    for event in eventbus.iter_events(full=True, auto_reconnect=True):
                        if not self.running_event.is_set():
                            # stop() was called; leave the inner loop so the
                            # outer ``while`` condition is evaluated again.
                            break
                        if not event:
                            continue
                        self._forward_event(push, event)
        finally:
            # Reached on clean exit and on exceptions alike
            self.running_event.clear()
            if push is not None and not push.closed:
                # 1500 is passed as the socket's linger argument
                push.close(1500)
            if context is not None and not context.closed:
                context.term()

    def _forward_event(self, push, event):
        """
        Serialize one event with msgpack and send it over ``push``.

        Serialization and send failures are logged and swallowed, so a single
        bad event does not end the forwarding loop.

        :param push: The connected PUSH socket.
        :param event: Event mapping providing the ``tag`` and ``data`` keys.
        """
        tag = event["tag"]
        data = event["data"]
        log.debug("%s Received Event; TAG: %r DATA: %r", self, tag, data)
        forward = (self.id, tag, data)
        try:
            dumped = msgpack.dumps(forward, use_bin_type=True, default=ext_type_encoder)
            push.send(dumped)
            log.info("%s forwarded event: %r", self, forward)
        except Exception:  # pragma: no cover pylint: disable=broad-except
            log.error("%s failed to forward event: %r", self, forward, exc_info=True)

    def stop(self):
        """
        Ask the forwarding loop to end by clearing the running flag.

        Does nothing when the engine is not running.
        """
        if self.running_event.is_set() is False:
            return
        log.info("Stopping %s", self)
        self.running_event.clear()
        log.info("%s stopped", self)