Event Listener#

Salt Factories Event Listener.

A salt events store for all daemons started by salt-factories

class saltfactories.plugins.event_listener.Event(*, daemon_id, tag, stamp, data, full_data, expire_seconds)[source]#

Bases: object

Event wrapper class.

The Event class is a container for a salt event which will live on the EventListener store.

Parameters:
  • daemon_id (str) – The daemon ID which received this event.

  • tag (str) – The event tag of the event.

  • stamp (datetime) – When the event occurred

  • data (dict) – The event payload, filtered of all of Salt’s private keys like _stamp which prevents proper assertions against it.

  • full_data (dict) – The full event payload, as received by the daemon, including all of Salt’s private keys.

  • expire_seconds (int,float) – The time, in seconds, after which the event should be considered as expired and removed from the store.

property expired#

Property to identify if the event has expired, at which time it should be removed from the store.

class saltfactories.plugins.event_listener.MatchedEvents(*, matches, missed)[source]#

Bases: object

MatchedEvents implementation.

The MatchedEvents class is a container which is returned by wait_for_events().

Parameters:
  • matches (set) – A set of Event instances that matched.

  • missed (set) – A set of Event instances that remained unmatched.

One can also easily iterate through all matched events of this class:

matched_events = MatchedEvents(..., ...)
for event in matched_events:
    print(event.tag)
property found_all_events#
Return bool:

True if all events were matched, or False otherwise.

class saltfactories.plugins.event_listener.EventListenerServer(_event_listener, *args, **kwargs)[source]#

Bases: Protocol

TCP Server to receive events forwarded.

connection_made(transport)[source]#

Connection established.

data_received(data)[source]#

Received data.

connection_lost(exc)#

Called when the connection is lost or closed.

The argument is an exception object or None (the latter meaning a regular EOF is received or the connection was aborted or closed).

eof_received()#

Called when the other end calls write_eof() or equivalent.

If this returns a false value (including None), the transport will close itself. If it returns a true value, closing the transport is up to the protocol.

pause_writing()#

Called when the transport’s buffer goes over the high-water mark.

Pause and resume calls are paired – pause_writing() is called once when the buffer goes strictly over the high-water mark (even if subsequent writes increases the buffer size even more), and eventually resume_writing() is called once when the buffer size reaches the low-water mark.

Note that if the buffer size equals the high-water mark, pause_writing() is not called – it must go strictly over. Conversely, resume_writing() is called when the buffer size is equal or lower than the low-water mark. These end conditions are important to ensure that things go as expected when either mark is zero.

NOTE: This is the only Protocol callback that is not called through EventLoop.call_soon() – if it were, it would have no effect when it’s most needed (when the app keeps writing without yielding until pause_writing() is called).

resume_writing()#

Called when the transport’s buffer drains below the low-water mark.

See pause_writing() for details.

class saltfactories.plugins.event_listener.EventListener(*, timeout=120)[source]#

Bases: object

EventListener implementation.

The EventListener is a service started by salt-factories which receives all the events of all the salt masters that it starts. The service runs throughout the whole pytest session.

Parameters:

timeout (int) – How long, in seconds, should a forwarded event stay in the store, after which, it will be deleted.

start_server()[source]#

Start the TCP server.

start()[source]#

Start the event listener.

stop()[source]#

Stop the event listener.

get_events(patterns, after_time=None)[source]#

Get events from the internal store.

Parameters:
  • pattern (Sequence) – An iterable of tuples in the form of ("<daemon-id>", "<event-tag-pattern>"), ie, which daemon ID we’re targeting and the event tag pattern which will be passed to fnmatch() to assert a match.

  • after_time (datetime,float) – After which time to start matching events.

Return set:

A set of matched events

wait_for_events(patterns, timeout=30, after_time=None)[source]#

Wait for a set of patterns to match or until timeout is reached.

Parameters:
  • pattern (Sequence) – An iterable of tuples in the form of ("<daemon-id>", "<event-tag-pattern>"), ie, which daemon ID we’re targeting and the event tag pattern which will be passed to fnmatch() to assert a match.

  • timeout (int,float) – The amount of time to wait for the events, in seconds.

  • after_time (datetime,float) – After which time to start matching events.

Returns:

An instance of MatchedEvents.

Rtype ~saltfactories.plugins.event_listener.MatchedEvents:

register_auth_event_handler(master_id, callback)[source]#

Register a callback to run for every authentication event, to accept or reject the minion authenticating.

Parameters:
  • master_id (str) – The master ID for which the callback should run

  • callback (Callable) – The function while should be called

unregister_auth_event_handler(master_id)[source]#

Un-register the authentication event callback, if any, for the provided master ID.

Parameters:

master_id (str) – The master ID for which the callback is registered

saltfactories.plugins.event_listener.event_listener()[source]#

Event listener session scoped fixture.

All started daemons will forward their events into an instance of EventListener.

This fixture can be used to wait for events:

def test_send(event_listener, salt_master, salt_minion, salt_call_cli):
    event_tag = random_string("salt/test/event/")
    data = {"event.fire": "just test it!!!!"}
    start_time = time.time()
    ret = salt_call_cli.run("event.send", event_tag, data=data)
    assert ret.returncode == 0
    assert ret.data
    assert ret.data is True

    event_pattern = (salt_master.id, event_tag)
    matched_events = event_listener.wait_for_events(
        [event_pattern], after_time=start_time, timeout=30
    )
    assert matched_events.found_all_events
    # At this stage, we got all the events we were waiting for

And assert against those events events:

def test_send(event_listener, salt_master, salt_minion, salt_call_cli):
    # ... check the example above for the initial code ...
    assert matched_events.found_all_events
    # At this stage, we got all the events we were waiting for
    for event in matched_events:
        assert event.data["id"] == salt_minion.id
        assert event.data["cmd"] == "_minion_event"
        assert "event.fire" in event.data["data"]