mqtt - MQTT gateway extension

This extension provides a gateway to MQTT-based communication architectures.

It includes a connector implementing the core built-in mechanisms. Applications needing MQTT connectivity only have to sub-class or monkey-patch it to define the methods handling inbound and outbound communications, in terms of filtering and transformations.

Package modules

The extension is implemented by the pycstbox.mqtt package, composed of the modules documented hereafter.

pycstbox.mqtt.core

Base classes and definitions used for the MQTT gateway.

This gateway is built on the MQTTConnector class, which wraps the connection with the MQTT broker in a convenient way and offers the extension points needed for assembling the complete gateway.

class pycstbox.mqtt.core.MQTTConnector(broker, port, keep_alive=60, client_id=None, login=None, password=None, tls=None, topics=None, logger=None)

The base class taking care of bridging the CSTBox internal message bus and the MQTT one.

This implementation uses the paho MQTT client.

Parameters:
  • broker (str) – the broker server public name or IP
  • port (int) – the server connection port
  • keep_alive (int) – delay (in seconds) during which the connection is maintained in the absence of communication
  • client_id (str) – optional client identification, for further exchanges tracking
  • login (str) – optional authentication login
  • password (str) – optional authentication password
  • tls – configuration for TLS if used (not used in current implementation)
  • topics – an optional list of MQTT topics to subscribe to when connected
  • logger – optional logger
publish(*args, **kwargs)

Publish a message on MQTT.

This method is a facade for the MQTT client method of the same name, checking that the communication is established before forwarding the call.

It uses the same signature as the homonym method of the paho Client. Refer to its documentation for details.
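The guard described above can be pictured with the following minimal sketch. It is not the actual MQTTConnector code: GuardedPublisher and FakeClient are hypothetical names, and raising RuntimeError when disconnected is an assumption made for illustration, since the real connector may react differently. Only the publish(topic, payload, qos, retain) call shape is taken from the paho Client.

```python
class FakeClient(object):
    """Stand-in for a paho Client, recording what gets published (hypothetical)."""

    def __init__(self):
        self.sent = []

    def publish(self, topic, payload=None, qos=0, retain=False):
        self.sent.append((topic, payload, qos, retain))


class GuardedPublisher(object):
    """Illustrative facade: forwards publish() only when connected."""

    def __init__(self, client):
        self._client = client
        self.connected = False

    def publish(self, *args, **kwargs):
        # Assumption: an exception is raised when the link is down;
        # the real connector may handle this case in another way.
        if not self.connected:
            raise RuntimeError('not connected to the MQTT broker')
        # Same signature as the wrapped client method.
        return self._client.publish(*args, **kwargs)
```

Since the arguments are passed through untouched, callers can use every option of the underlying publish method without the facade having to know about them.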

connected

Tells if we are currently connected to the broker.

on_message

Handler of MQTT messages.

See paho MQTT Client documentation for details about the signature of the callback.

run()

Starts the connector.

Establishes the connection with the MQTT broker and starts the client loop in threaded mode. Subscription to the MQTT topics of interest (if any) is done in the _on_connect() private callback, to avoid race conditions.

shutdown()

Shuts down the connector.

class pycstbox.mqtt.core.InboundFilter

Prototype of the filter for incoming messages.

Its role is to define how messages received from the MQTT domain are accepted and, for those which are, how they are translated into messages injected into the CSTBox domain.

Important

The default implementation accepts nothing, for security reasons. It has to be sub-classed by the application.

accept_event(client, user_data, message)

Tells if an incoming MQTT event is to be published on the CSTBox bus.

The method returns the corresponding CSTBox event(s) which must be published on the CSTBox message bus in case the MQTT event is accepted, and the target channel for each one. The returned events are either instances of pycstbox.events.BasicEvent or the equivalent tuples (see class definition for their attributes). To ensure the consistency of the time line, the CSTBox events will be dated by the event manager at publication time. Consequently, the time stamp included in the MQTT message will be discarded.

The method result is an iterable of tuples, each one containing the event and the name of the channel on which it must be published.

If the incoming MQTT event is not accepted, the method must return None or an empty list.

Parameters:
  • client – the MQTT client instance
  • user_data – application specific data which can be attached to the client
  • message – the MQTT message
Returns:

the list of CSTBox events to be published

Return type:

iterable of (CSTBox event, channel name) tuples
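As an illustration of the contract described above, here is a minimal sketch of an application filter. It is written with stand-ins so that it runs without CSTBox or paho installed: the InboundFilter base class below only mimics the default "accept nothing" behaviour, Message mimics the topic and payload attributes of a paho message, and the topic layout, the 'sensor' channel name and the (var_type, var_name, data) tuple layout are assumptions, not taken from the package.

```python
from collections import namedtuple

# Stand-ins (assumptions) replacing the real paho message and CSTBox channel.
Message = namedtuple('Message', 'topic payload')
SENSOR_CHANNEL = 'sensor'  # hypothetical channel name


class InboundFilter(object):
    """Stand-in for pycstbox.mqtt.core.InboundFilter: accepts nothing."""

    def accept_event(self, client, user_data, message):
        return None


class TemperatureInboundFilter(InboundFilter):
    """Accepts only topics of the form home/<room>/temperature."""

    def accept_event(self, client, user_data, message):
        parts = message.topic.split('/')
        if len(parts) != 3 or parts[0] != 'home' or parts[2] != 'temperature':
            return []  # rejected: nothing to publish
        # Event given as a plain tuple; the real attribute layout is the one
        # defined by pycstbox.events.BasicEvent.
        event = ('temperature', parts[1], {'value': float(message.payload)})
        return [(event, SENSOR_CHANNEL)]
```

No time stamp is put in the event, in line with the rule that events are dated by the event manager at publication time.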

class pycstbox.mqtt.core.REInboundFilter(rules, logger=None)

Regular expressions based inbound filter.

It relies on regex-based rules applied to the message topic, which yield the variable type and variable name of the event to be emitted, together with the channel on which it must be emitted.

Initializes the rules list by compiling their specifications.

The specifications are provided as an iterable of tuples, whose items are:

  • a string containing the regex to be applied to the MQTT message topic

  • an events dispatch list, composed of tuples specifying each of the corresponding CSTBox events to be produced. Each tuple is composed of:

    • the variable type
    • the variable name
    • the channel on which the event will be emitted. If omitted, it is defaulted to pycstbox.evtmgr.CONTROL_EVENT_CHANNEL

    Using an empty dispatch list is allowed, and is equivalent to blocking the incoming message. This can be useful for dynamic dispatch strategies, since the rule can be left in place while producing no CSTBox event in response to the matching MQTT message.

Parameters:
  • rules – an iterable of tuples, as described above
  • logger – optional logging object
Raises:
  • ValueError – if invalid regex, or if the topic format contains replaceable parameters not found in the regex
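The structure of the rules can be sketched as follows. This is an illustration and not the REInboundFilter code: compile_rules and dispatch_for are hypothetical helpers, the 'control' and 'sensor' channel names are placeholders, and stopping at the first matching rule is an assumption.

```python
import re

# Placeholder for pycstbox.evtmgr.CONTROL_EVENT_CHANNEL (actual value not shown here).
CONTROL_EVENT_CHANNEL = 'control'

rules = [
    # (topic regex, [(var_type, var_name, channel), ...])
    (r'^home/kitchen/temperature$', [('temperature', 'kitchen_temp', 'sensor')]),
    (r'^home/[^/]+/switch$', [('switch', 'any_switch')]),  # channel omitted
    (r'^home/debug/.*$', []),                              # matched but blocked
]


def compile_rules(rules):
    """Compiles the regexes and completes the dispatch entries."""
    compiled = []
    for pattern, dispatch in rules:
        try:
            regex = re.compile(pattern)
        except re.error as e:
            raise ValueError('invalid regex %r: %s' % (pattern, e))
        # Entries without a channel get the default one.
        full = [
            tuple(d) if len(d) == 3 else tuple(d) + (CONTROL_EVENT_CHANNEL,)
            for d in dispatch
        ]
        compiled.append((regex, full))
    return compiled


def dispatch_for(compiled, topic):
    """Returns the dispatch list of the first rule matching the topic."""
    for regex, dispatch in compiled:
        if regex.match(topic):
            return dispatch
    return []
```

With these rules, a message received on home/debug/anything matches the third rule and produces no CSTBox event, which is the blocking behaviour of an empty dispatch list.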

make_event_payload(client, user_data, message, var_type, var_name)

Returns the event payload of the CSTBox event to be produced, based on the content of the incoming MQTT message.

To make things easier for implementors, the result is expected as a tuple, whose first item is the value and whose second item is an optional dictionary of additional information. The effective event payload will be built by the framework, by assembling these two pieces of data.

This method is called by accept_event() when a rule matches, the corresponding variable type and name being determined by that rule.

Parameters:
  • client – the MQTT client instance
  • user_data – application specific data which can be attached to the client
  • message – the MQTT message
  • var_type (str) – the variable type of the CSTBox event
  • var_name (str) – the variable name of the CSTBox event
Returns:

the value and optional additional information to be used for the CSTBox event

Return type:

tuple
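A possible implementation is sketched below, assuming the MQTT payload is a JSON object carrying a "value" key plus optional extra fields. This payload format is an assumption for the example and is not imposed by the package. The function is shown standalone so that it can be run as is; in an application it would be the method of a REInboundFilter sub-class. Message is a stand-in for the paho message.

```python
import json
from collections import namedtuple

Message = namedtuple('Message', 'topic payload')  # stand-in for the paho message


def make_event_payload(client, user_data, message, var_type, var_name):
    """Returns (value, extra_info) extracted from a JSON payload.

    Example of expected payload: {"value": 21.5, "unit": "degC"}
    """
    data = json.loads(message.payload)
    value = data.pop('value')
    # What remains (if anything) is returned as the additional information.
    return value, (data or None)
```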

class pycstbox.mqtt.core.OutboundFilter

Prototype of the filter for outgoing messages.

Its role is to define whether a given CSTBox message can be published on the MQTT domain and, if so, which MQTT topic is to be used.

Important

To be sub-classed, since the default implementation forbids any event to go outside of the CSTBox domain.

get_mqtt_topic(timestamp, var_type, var_name, data)

Returns the MQTT topic conforming to the broker conventions and corresponding to the CSTBox candidate message.

Returns None if the message is not to be passed to the MQTT world (which is the behaviour of the default implementation).

Parameters:
  • timestamp (int) – the event time stamp
  • var_type (str) – the type of the variable the event is related to
  • var_name (str) – the name of the variable
  • data (dict) – the event specific payload
Returns:

the MQTT topic to be used, or None if the message is not allowed to go outside

Return type:

str or None
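A minimal sub-class could look like the sketch below. The OutboundFilter class defined here is a stand-in reproducing the documented default behaviour, so that the example runs without CSTBox installed, and the cstbox/temperature/<var_name> topic layout is purely hypothetical.

```python
class OutboundFilter(object):
    """Stand-in for pycstbox.mqtt.core.OutboundFilter: nothing goes out."""

    def get_mqtt_topic(self, timestamp, var_type, var_name, data):
        return None


class TemperatureOutboundFilter(OutboundFilter):
    """Lets temperature events out, under a hypothetical topic layout."""

    def get_mqtt_topic(self, timestamp, var_type, var_name, data):
        if var_type != 'temperature':
            return None  # not allowed to leave the CSTBox domain
        return 'cstbox/temperature/%s' % var_name
```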

class pycstbox.mqtt.core.REOutboundFilter(rules, logger=None)

Regular expressions based outbound filter.

It relies on regex-based rules, applied to the variable type and variable name, which give the MQTT topic to be used for publication. If no match is found, the message is not published.

The rules are applied in the sequence defined by the rules table, by checking if the compound key built as <var_type>:<var_name> matches the rule regex. The corresponding MQTT topic will be used when sending the message.

Regexes can include named capturing groups, whose values are used for computing the topic value. To use this feature, the topic must include named replaceable parameters, whose names match those of the regex groups.

Initializes the rules list by compiling their specifications.

The specifications are provided as an iterable of tuples, whose items are:

  • a string containing the regex to be applied to fully qualified variable names
  • the MQTT topic string (or topic format string) to be used for publication
Parameters:
  • rules – an iterable of tuples, as described above
  • logger – optional logging object
Raises:
  • ValueError – if invalid regex, or if the topic format contains replaceable parameters not found in the regex

get_mqtt_topic(timestamp, var_type, var_name, data)

Returns the MQTT topic matching the event variable type and name, based on the known regex rules.

For efficiency's sake, the result is cached, to avoid scanning the rules table and applying the regexes again for already processed variable types and names.
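The mechanism (ordered rules, named groups feeding the topic format, validation at compile time, result caching) can be sketched as follows. TopicResolver and compile_rules are hypothetical names, and the {name} syntax used for replaceable parameters is an assumption: the real filter may use another format syntax.

```python
import re
import string


def compile_rules(rules):
    """Compiles (regex, topic format) pairs, checking their consistency."""
    compiled = []
    for pattern, topic_format in rules:
        try:
            regex = re.compile(pattern)
        except re.error as e:
            raise ValueError('invalid regex %r: %s' % (pattern, e))
        # Names of the replaceable parameters used by the topic format.
        fields = set(
            name for _, name, _, _ in string.Formatter().parse(topic_format) if name
        )
        missing = fields - set(regex.groupindex)
        if missing:
            raise ValueError(
                'topic parameters not found in regex: %s' % ', '.join(sorted(missing))
            )
        compiled.append((regex, topic_format))
    return compiled


class TopicResolver(object):
    """Illustrative resolver, caching the topic of already seen variables."""

    def __init__(self, rules):
        self._rules = compile_rules(rules)
        self._cache = {}

    def get_mqtt_topic(self, timestamp, var_type, var_name, data):
        key = '%s:%s' % (var_type, var_name)
        if key not in self._cache:
            topic = None
            for regex, topic_format in self._rules:
                match = regex.match(key)
                if match:
                    topic = topic_format.format(**match.groupdict())
                    break  # rules are applied in sequence, first match wins
            # None is cached too, so rejected variables are not rescanned.
            self._cache[key] = topic
        return self._cache[key]
```

For instance, the rule (r'^temperature:(?P<room>\w+)_temp$', 'cstbox/{room}/temperature') maps the variable kitchen_temp of type temperature to the topic cstbox/kitchen/temperature.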

pycstbox.mqtt.dbus_binding

D-Bus binding layer of the MQTT gateway.

This module implements the building blocks for interfacing the MQTTConnector class with the CSTBox D-Bus layer.

The feature is provided by the MQTTGatewayServiceObject which is the worker instance of the service.

The gateway service object works in association with the filters provided by the application. The role of these filters is to select the messages allowed to cross the CSTBox/MQTT boundary, and to translate the accepted ones. Their prototypes are specified by the classes InboundFilter and OutboundFilter, provided by the package module pycstbox.mqtt.core.

pycstbox.mqtt.dbus_binding.load_configuration(cfgfile_path)

Loads the configuration from a file and returns it as a dictionary.

Parameters:

cfgfile_path (str) – configuration file path

Returns:

the configuration data

Return type:

dict

Raises:
  • ValueError – if path is not provided or configuration data are invalid
  • IOError – if not found or not a file
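The error contract above can be illustrated by the following sketch. It is not the package implementation: the function name is hypothetical, and both the JSON file format and the check on the broker key are assumptions made for the example.

```python
import json
import os


def load_configuration_sketch(cfgfile_path):
    """Illustrative loader following the documented error contract."""
    if not cfgfile_path:
        raise ValueError('configuration file path not provided')
    if not os.path.isfile(cfgfile_path):
        raise IOError('not found or not a file: %s' % cfgfile_path)
    with open(cfgfile_path) as fp:
        # Assumption: JSON content. A malformed file raises ValueError.
        cfg = json.load(fp)
    # Assumption: a minimal validity check on the expected structure.
    if not isinstance(cfg, dict) or 'broker' not in cfg:
        raise ValueError('invalid configuration data')
    return cfg
```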
class pycstbox.mqtt.dbus_binding.MQTTGatewayServiceObject(cfg, inbound_filter=None, outbound_filter=None)

The service object wrapping the gateway process.

It takes care of all the generic tasks. It:

  • listens to both the sensor and control CSTBox buses, to process the circulating messages and forward them to the MQTT world
  • creates a MQTT client for the broker, and listens to the relevant MQTT topics, for injecting the corresponding messages in the CSTBox internal communication bus

In both cases, messages are first passed to an application-dependent filtering layer, which decides which ones are allowed to cross the boundaries. This layer also takes care of converting the messages to the proper form before publishing.

It is the responsibility of the application to provide these filters when creating an instance of this service object (refer to the __init__ method signature). It is valid to omit one of them, the absence of filter meaning that nothing will go through in the corresponding direction.

The service object is configured based on the content of the configuration data, provided as a dictionary by the process which instantiates it.

See configure() method documentation for specifications of the configuration data.

Note that providing no filter at all is invalid, since it would make the gateway useless.

Parameters:
  • cfg (dict) – the configuration data
  • inbound_filter (InboundFilter) – the filter for MQTT to CSTBox messages (default: None)
  • outbound_filter (OutboundFilter) – the filter for CSTBox to MQTT messages (default: None)
Raises:
  • ValueError – if both filters are unset

DEFAULT_BROKER_PORT = 61613

default MQTT broker connection port

configure(cfg)

Configures the service object.

The configuration data are provided as a dictionary, whose keys and structure are:

  • broker : sub-dictionary providing the broker connection information

    • host : public host name or IP of the server
    • port : connection port, defaulted to DEFAULT_BROKER_PORT if not provided
    • client_id : optional identifier attached to the MQTT client
  • auth : optional sub-dictionary containing the authentication information

    • username : login
    • password : password
  • listened_topics : optional list of MQTT topics or topic patterns the gateway must subscribe to

Parameters:
  • cfg (dict) – configuration data as a dictionary
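For illustration, a configuration dictionary following the structure described above could look like this. The host name, client id, credentials and topics are placeholders, and the last line only shows how the documented default port would apply when the port key is omitted.

```python
DEFAULT_BROKER_PORT = 61613  # value documented for MQTTGatewayServiceObject

cfg = {
    'broker': {
        'host': 'broker.example.com',  # placeholder host name
        # 'port' is omitted here, so the default port applies
        'client_id': 'cstbox-gw-01',   # optional
    },
    'auth': {                          # optional
        'username': 'gateway',
        'password': 'secret',
    },
    # optional, MQTT wildcards can be used in topic patterns
    'listened_topics': ['home/+/temperature', 'building/#'],
}

port = cfg['broker'].get('port', DEFAULT_BROKER_PORT)
```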
start()

Starts the service object.

stop()

Stops the service object.

Extension service

The extension includes a service, whose daemon is provided as the mqttgwd.py script, usually deployed in the /opt/cstbox/bin directory.