Source code for saltfactories.plugins.event_listener

"""
Event Listener
==============

A salt events store for all daemons started by salt-factories
"""
import copy
import fnmatch
import logging
import threading
import weakref
from collections import deque
from datetime import datetime
from datetime import timedelta

import attr
import msgpack
import pytest
import zmq

from saltfactories.utils import ports
from saltfactories.utils import time

log = logging.getLogger(__name__)


def _convert_stamp(stamp):
    try:
        return datetime.fromisoformat(stamp)
    except AttributeError:  # pragma: no cover
        # Python < 3.7
        return datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S.%f")


@attr.s(kw_only=True, slots=True, hash=True, frozen=True)
class Event:
    """
    The ``Event`` class is a container for a salt event which will live on the
    :py:class:`~saltfactories.plugins.event_listener.EventListener` store.

    :keyword str daemon_id:
        The daemon ID which received this event.
    :keyword str tag:
        The event tag of the event.
    :keyword ~datetime.datetime stamp:
        When the event occurred.
    :keyword dict data:
        The event payload, filtered of all of Salt's private keys, like ``_stamp``, which would
        otherwise prevent proper assertions against it.
    :keyword dict full_data:
        The full event payload, as received by the daemon, including all of Salt's private keys.
    :keyword int,float expire_seconds:
        The time, in seconds, after which the event should be considered as expired and removed
        from the store.
    """

    daemon_id = attr.ib()
    tag = attr.ib()
    stamp = attr.ib(converter=_convert_stamp)
    data = attr.ib(hash=False)
    full_data = attr.ib(hash=False)
    expire_seconds = attr.ib(hash=False)
    _expire_at = attr.ib(init=False, hash=False)

    @_expire_at.default
    def _set_expire_at(self):
        return self.stamp + timedelta(seconds=self.expire_seconds)

    @property
    def expired(self):
        """
        Property to identify if the event has expired, at which time it should be removed
        from the store.
        """
        if datetime.utcnow() < self._expire_at:
            return False
        return True

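# Illustrative example (not executed; the daemon ID, tag and payload below are
# hypothetical): ``data`` holds the payload with Salt's private keys stripped,
# while ``full_data`` keeps the original payload, including ``_stamp``:
#
#   event = Event(
#       daemon_id="master-1",
#       tag="salt/test/event",
#       stamp="2021-01-01T00:00:00.000000",
#       data={"cmd": "_minion_event"},
#       full_data={"cmd": "_minion_event", "_stamp": "2021-01-01T00:00:00.000000"},
#       expire_seconds=120,
#   )
#   assert "_stamp" not in event.data
#   assert "_stamp" in event.full_data
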
@attr.s(kw_only=True, slots=True, hash=True, frozen=True)
class MatchedEvents:
    """
    The ``MatchedEvents`` class is a container which is returned by
    :py:func:`~saltfactories.plugins.event_listener.EventListener.wait_for_events`.

    :keyword set matches:
        A :py:class:`set` of :py:class:`~saltfactories.plugins.event_listener.Event`
        instances that matched.
    :keyword set missed:
        A :py:class:`set` of :py:class:`~saltfactories.plugins.event_listener.Event`
        instances that remained unmatched.

    One can also easily iterate through all matched events of this class:

    .. code-block:: python

        matched_events = MatchedEvents(..., ...)
        for event in matched_events:
            print(event.tag)
    """

    matches = attr.ib()
    missed = attr.ib()

    @property
    def found_all_events(self):
        """
        :return bool: :py:class:`True` if all events were matched, or :py:class:`False` otherwise.
        """
        return not self.missed

    def __iter__(self):
        return iter(self.matches)

@attr.s(kw_only=True, slots=True, hash=True)
class EventListener:
    """
    The ``EventListener`` is a service started by salt-factories which receives all the events
    of all the salt masters that it starts. The service runs throughout the whole pytest session.

    :keyword int timeout:
        How long, in seconds, a forwarded event should stay in the store, after which it will
        be deleted.
    """

    timeout = attr.ib(default=120)
    address = attr.ib(init=False)
    store = attr.ib(init=False, repr=False, hash=False)
    sentinel = attr.ib(init=False, repr=False, hash=False)
    sentinel_event = attr.ib(init=False, repr=False, hash=False)
    running_event = attr.ib(init=False, repr=False, hash=False)
    running_thread = attr.ib(init=False, repr=False, hash=False)
    cleanup_thread = attr.ib(init=False, repr=False, hash=False)
    auth_event_handlers = attr.ib(init=False, repr=False, hash=False)

    def __attrs_post_init__(self):
        self.store = deque(maxlen=10000)
        self.address = "tcp://127.0.0.1:{}".format(ports.get_unused_localhost_port())
        self.running_event = threading.Event()
        self.running_thread = threading.Thread(target=self._run)
        self.cleanup_thread = threading.Thread(target=self._cleanup)
        self.sentinel = msgpack.dumps(None)
        self.sentinel_event = threading.Event()
        self.auth_event_handlers = weakref.WeakValueDictionary()

    def _run(self):
        context = zmq.Context()
        puller = context.socket(zmq.PULL)
        log.debug("%s Binding PULL socket to %s", self, self.address)
        puller.bind(self.address)
        if msgpack.version >= (0, 5, 2):
            msgpack_kwargs = {"raw": False}
        else:  # pragma: no cover
            msgpack_kwargs = {"encoding": "utf-8"}
        log.debug("%s started", self)
        self.running_event.set()
        while self.running_event.is_set():
            payload = puller.recv()
            if payload == self.sentinel:
                log.info("%s Received stop sentinel...", self)
                self.sentinel_event.set()
                break
            try:
                decoded = msgpack.loads(payload, **msgpack_kwargs)
            except ValueError:  # pragma: no cover
                log.error(
                    "%s Failed to msgpack.load message with payload: %s",
                    self,
                    payload,
                    exc_info=True,
                )
                continue
            if decoded is None:
                log.info("%s Received stop sentinel...", self)
                self.sentinel_event.set()
                break
            try:
                daemon_id, tag, data = decoded
                # Salt's event data has some "private" keys, for example, "_stamp", which
                # get in the way of direct assertions.
                # We store the original payload in a full_data attribute and strip those
                # keys from the regular data.
                full_data = copy.deepcopy(data)
                for key in list(data):
                    if key.startswith("_"):
                        data.pop(key)
                event = Event(
                    daemon_id=daemon_id,
                    tag=tag,
                    stamp=full_data["_stamp"],
                    data=data,
                    full_data=full_data,
                    expire_seconds=self.timeout,
                )
                log.info("%s received event: %s", self, event)
                self.store.append(event)
                if tag == "salt/auth":
                    auth_event_callback = self.auth_event_handlers.get(daemon_id)
                    if auth_event_callback:
                        try:
                            auth_event_callback(data)
                        except Exception as exc:  # pragma: no cover pylint: disable=broad-except
                            log.error(
                                "%s Error calling %r: %s",
                                self,
                                auth_event_callback,
                                exc,
                                exc_info=True,
                            )
                log.debug("%s store size after event received: %d", self, len(self.store))
            except Exception:  # pragma: no cover pylint: disable=broad-except
                log.error("%s Something funky happened", self, exc_info=True)
                puller.close(0)
                context.term()
                # We need to keep these events stored, restart the zmq socket
                context = zmq.Context()
                puller = context.socket(zmq.PULL)
                log.debug("%s Binding PULL socket to %s", self, self.address)
                puller.bind(self.address)
        puller.close(1500)
        context.term()
        log.debug("%s is no longer running", self)

    def _cleanup(self):
        cleanup_at = time.time() + 30
        while self.running_event.is_set():
            if time.time() < cleanup_at:
                time.sleep(1)
                continue

            # Reset cleanup time
            cleanup_at = time.time() + 30

            # Cleanup expired events
            to_remove = []
            for event in self.store:
                if event.expired:
                    to_remove.append(event)

            for event in to_remove:
                log.debug("%s Removing from event store: %s", self, event)
                self.store.remove(event)
            log.debug("%s store size after cleanup: %s", self, len(self.store))

    def start(self):
        if self.running_event.is_set():  # pragma: no cover
            return
        log.debug("%s is starting", self)
        self.running_thread.start()
        # Wait for the thread to start
        if self.running_event.wait(5) is not True:
            self.running_event.clear()
            raise RuntimeError("Failed to start the event listener")
        self.cleanup_thread.start()

    def stop(self):
        if self.running_event.is_set() is False:  # pragma: no cover
            return
        log.debug("%s is stopping", self)
        self.store.clear()
        self.auth_event_handlers.clear()
        context = zmq.Context()
        push = context.socket(zmq.PUSH)
        push.connect(self.address)
        try:
            push.send(self.sentinel)
            log.debug("%s Sent sentinel to trigger event listener shutdown", self)
            if self.sentinel_event.wait(5) is not True:  # pragma: no cover
                log.warning(
                    "%s Failed to wait for the reception of the stop sentinel message. "
                    "Stopping anyway.",
                    self,
                )
        finally:
            push.close(1500)
            context.term()
        self.running_event.clear()
        log.debug("%s Joining running thread...", self)
        self.running_thread.join(7)
        if self.running_thread.is_alive():  # pragma: no cover
            log.debug("%s The running thread is still alive. Waiting a little longer...", self)
            self.running_thread.join(5)
            if self.running_thread.is_alive():
                log.debug(
                    "%s The running thread is still alive. Exiting anyway and letting the GC take care of it",
                    self,
                )
        log.debug("%s Joining cleanup thread...", self)
        self.cleanup_thread.join(7)
        if self.cleanup_thread.is_alive():  # pragma: no cover
            log.debug("%s The cleanup thread is still alive. Waiting a little longer...", self)
            self.cleanup_thread.join(5)
            if self.cleanup_thread.is_alive():
                log.debug(
                    "%s The cleanup thread is still alive. Exiting anyway and letting the GC take care of it",
                    self,
                )
        log.debug("%s stopped", self)

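    # How events get here (a sketch, not part of the public API): the daemons started
    # by salt-factories forward each event to ``self.address`` as a msgpack-serialized
    # ``(daemon_id, tag, data)`` tuple, where ``data`` still contains Salt's private
    # keys such as "_stamp". That is what ``_run`` unpacks above. A hypothetical
    # forwarder would therefore look roughly like this (names are illustrative):
    #
    #   context = zmq.Context()
    #   pusher = context.socket(zmq.PUSH)
    #   pusher.connect(event_listener_address)
    #   pusher.send(msgpack.dumps((daemon_id, tag, salt_event_data)))
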
    def get_events(self, patterns, after_time=None):
        """
        Get events from the internal store.

        :param ~collections.abc.Sequence patterns:
            An iterable of tuples in the form of ``("<daemon-id>", "<event-tag-pattern>")``, i.e.,
            which daemon ID we're targeting and the event tag pattern which will be passed to
            :py:func:`~fnmatch.fnmatch` to assert a match.
        :keyword ~datetime.datetime,float after_time:
            After which time to start matching events.

        :return set: A set of matched events.
        """
        if after_time is None:
            after_time = datetime.utcnow()
        elif isinstance(after_time, float):
            after_time = datetime.utcfromtimestamp(after_time)
        after_time_iso = after_time.isoformat()
        log.debug(
            "%s is checking for event patterns happening after %s: %s",
            self,
            after_time_iso,
            set(patterns),
        )
        found_events = set()
        patterns = set(patterns)
        for event in copy.copy(self.store):
            if event.expired:
                # Too old, carry on
                continue
            if event.stamp < after_time:
                continue
            for pattern in set(patterns):
                _daemon_id, _pattern = pattern
                if event.daemon_id != _daemon_id:
                    continue
                if fnmatch.fnmatch(event.tag, _pattern):
                    log.debug("%s Found matching pattern: %s", self, pattern)
                    found_events.add(event)
        if found_events:
            log.debug(
                "%s found the following patterns happening after %s: %s",
                self,
                after_time_iso,
                found_events,
            )
        else:
            log.debug(
                "%s did not find any matching event patterns happening after %s",
                self,
                after_time_iso,
            )
        return found_events

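    # Example usage from a test (illustrative; ``event_listener`` and ``salt_master``
    # are assumed to be available as fixtures, and the tag pattern is a standard
    # Salt job-return tag):
    #
    #   events = event_listener.get_events(
    #       [(salt_master.id, "salt/job/*/ret/*")],
    #       after_time=time.time() - 30,
    #   )
    #   for event in events:
    #       print(event.tag, event.data)
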
    def wait_for_events(self, patterns, timeout=30, after_time=None):
        """
        Wait for a set of patterns to match or until the timeout is reached.

        :param ~collections.abc.Sequence patterns:
            An iterable of tuples in the form of ``("<daemon-id>", "<event-tag-pattern>")``, i.e.,
            which daemon ID we're targeting and the event tag pattern which will be passed to
            :py:func:`~fnmatch.fnmatch` to assert a match.
        :keyword int,float timeout:
            The amount of time to wait for the events, in seconds.
        :keyword ~datetime.datetime,float after_time:
            After which time to start matching events.

        :return:
            An instance of :py:class:`~saltfactories.plugins.event_listener.MatchedEvents`.
        :rtype: ~saltfactories.plugins.event_listener.MatchedEvents
        """
        if after_time is None:
            after_time = datetime.utcnow()
        elif isinstance(after_time, float):
            after_time = datetime.utcfromtimestamp(after_time)
        after_time_iso = after_time.isoformat()
        log.debug(
            "%s is waiting for event patterns happening after %s: %s",
            self,
            after_time_iso,
            set(patterns),
        )
        found_events = set()
        patterns = set(patterns)
        timeout_at = time.time() + timeout
        while True:
            if not patterns:
                break
            for event in copy.copy(self.store):
                if event.expired:
                    # Too old, carry on
                    continue
                if event.stamp < after_time:
                    continue
                for pattern in set(patterns):
                    _daemon_id, _pattern = pattern
                    if event.daemon_id != _daemon_id:
                        continue
                    if fnmatch.fnmatch(event.tag, _pattern):
                        log.debug("%s Found matching pattern: %s", self, pattern)
                        found_events.add(event)
                        patterns.remove((event.daemon_id, _pattern))
            if not patterns:
                break
            if time.time() > timeout_at:
                break
            time.sleep(0.5)
        return MatchedEvents(matches=found_events, missed=patterns)

    def register_auth_event_handler(self, master_id, callback):
        """
        Register a callback to run for every authentication event, to accept or reject the
        minion authenticating.

        :param str master_id:
            The master ID for which the callback should run
        :type callback: ~collections.abc.Callable
        :param callback:
            The function which should be called
        """
        self.auth_event_handlers[master_id] = callback

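    # Example (illustrative; ``event_listener`` and ``salt_master`` are assumed to be
    # available in the test): collect every "salt/auth" payload the master receives.
    # Note that ``auth_event_handlers`` is a ``weakref.WeakValueDictionary``, so the
    # callback must be a function object the test keeps a reference to.
    #
    #   auth_events = []
    #
    #   def handle_auth_event(data):
    #       auth_events.append(data)
    #
    #   event_listener.register_auth_event_handler(salt_master.id, handle_auth_event)
    #   # ... run the minion, assert on auth_events ...
    #   event_listener.unregister_auth_event_handler(salt_master.id)
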
    def unregister_auth_event_handler(self, master_id):
        """
        Un-register the authentication event callback, if any, for the provided master ID.

        :param str master_id:
            The master ID for which the callback is registered
        """
        self.auth_event_handlers.pop(master_id, None)


@pytest.fixture(scope="session")
def event_listener(request):
    """
    All started daemons will forward their events into an instance of
    :py:class:`~saltfactories.plugins.event_listener.EventListener`.

    This fixture can be used to wait for events:

    .. code-block:: python

        def test_send(event_listener, salt_master, salt_minion, salt_call_cli):
            event_tag = random_string("salt/test/event/")
            data = {"event.fire": "just test it!!!!"}
            start_time = time.time()
            ret = salt_call_cli.run("event.send", event_tag, data=data)
            assert ret.exitcode == 0
            assert ret.json
            assert ret.json is True

            event_pattern = (salt_master.id, event_tag)
            matched_events = event_listener.wait_for_events(
                [event_pattern], after_time=start_time, timeout=30
            )
            assert matched_events.found_all_events
            # At this stage, we got all the events we were waiting for

    And to assert against those events:

    .. code-block:: python

        def test_send(event_listener, salt_master, salt_minion, salt_call_cli):
            # ... check the example above for the initial code ...

            assert matched_events.found_all_events
            # At this stage, we got all the events we were waiting for

            for event in matched_events:
                assert event.data["id"] == salt_minion.id
                assert event.data["cmd"] == "_minion_event"
                assert "event.fire" in event.data["data"]
    """
    return request.config.pluginmanager.get_plugin("saltfactories-event-listener")


def pytest_configure(config):
    event_listener = EventListener()
    config.pluginmanager.register(event_listener, "saltfactories-event-listener")


@pytest.hookimpl(tryfirst=True)
def pytest_sessionstart(session):
    event_listener = session.config.pluginmanager.get_plugin("saltfactories-event-listener")
    event_listener.start()


@pytest.hookimpl(trylast=True)
def pytest_sessionfinish(session):
    event_listener = session.config.pluginmanager.get_plugin("saltfactories-event-listener")
    event_listener.stop()