mqtt - MQTT gateway extension¶
This extension provides a gateway with MQTT based communication architectures.
It includes a connector with the core builtin mechanisms implemented. Applications needing a MQTT connectivity only need to sub-class or monkey patch it in order to define the methods handling the in and out communications, in terms of filtering and transformations.
Package modules¶
The extension is implemented by the pycstbox.mqtt
package, composed of the modules documented hereafter.
pycstbox.mqtt.core¶
Base classes and definitions used for the MQTT gateway.
This gateway is basically based on the MQTTConnector
class, which
wraps the connection with the MQTT broken in a convenient way, and offers the proper
extension points for assembling the complete gateway.
-
class
pycstbox.mqtt.core.
MQTTConnector
(broker, port, keep_alive=60, client_id=None, login=None, password=None, tls=None, topics=None, logger=None)¶ The base class taking care of bridging the CSTBox internal message bus and the MQTT one.
This implementation uses paho MQTT client.
Parameters: - broker (str) – the broker server public name or IP
- port (int) – the server connection port
- keep_alive (int) – delay (in seconds) during which the connection is maintained in the absence of communication
- client_id (str) – optional client identification, for further exchanges tracking
- login (str) – optional authentication login
- password (str) – optional authentication password
- tls – configuration for TLS if used (not used in current implementation)
- topics – an optional list of MQTT topics to subscribe to when connected
- logger – optional logger
-
publish
(*args, **kwargs)¶ Publish a message on MQTT.
This method is a facade of the MQTT client homonym one, checking that the communication is established before passing the call.
It uses the same signature as the homonym method of the paho Client. Refer to its documentation for details.
-
connected
¶ Tells if we are currently connected to the broker.
-
on_message
¶ Handler of MQTT messages.
See paho MQTT Client documentation for details about the signature of the callback.
-
run
()¶ Starts the connector.
Establish the connection with the MQTT broker, and starts the client loop in threaded mode. Subscription to the MQTT topics of interest (if any) is done in the
_on_connect()
private callback, to avoid race conditions.
-
shutdown
()¶ Shutdowns the connector.
-
class
pycstbox.mqtt.core.
InboundFilter
¶ Prototype of the filter for incoming messages.
Its role is to define how messages received from the MQTT domain are accepted, and for those which are, translated into messages injected in the CSTBox domain.
Important
The default implementation accept nothing by security. It has to be sub-classed by the application.
-
accept_event
(client, user_data, message)¶ Tells if an incoming MQTT event is to be published on the CSTBox bus.
The method returns the corresponding CSTBox event(s) which must be published on CSTBox message bus in case the MQTT event is accepted, and the target channel for each one. The return events are either instances of
pycstbox.events.BasicEvent
or the equivalent tuples (see class definition for their attributes). To ensure the consistency of the time line, the CSTBox events will be dated by the event manager at publication time. Consequently, the time stamp included in the MQTT message will be discarded.The method result is an iterable of tuples, each one containing the event and the name of the channel on which it must be published.
If the incoming MQTT event is not accepted, the method must return None or an empty list.
Parameters: - client – the MQTT client instance
- user_data – application specific data which can be attached to the client
- message – the MQTT message
Returns: the list of CSTBox events to be published
Return type: iterable of (CSTBox event, channel name) tuples
-
-
class
pycstbox.mqtt.core.
REInboundFilter
(rules, logger=None)¶ Regular expressions based inbound filter.
It is based on regex based rules applied to the message topic, which give in return the variable type and variable name of the message to be emitted, with the channel on which this must be done.
Initializes the rules list by compiling their specifications.
The specifications are provided as an iterable of tuples, which items are :
a string containing the regex to be applied to the MQTT message topic
an events dispatch list, composed of tuples specifying each of the corresponding CSTBox events to be produced. Each tuple is composed of :
- the variable type
- the variable name
- the channel on which the event will be emitted. If omitted, it is defaulted to
pycstbox.evtmgr.CONTROL_EVENT_CHANNEL
Using an empty dispatch list is allowed, which is equivalent to block the incoming message. This can be useful for dynamically dispatch strategies, so that the rule can be left in place but will produce no CSTBox event in result to the matching MQTT message.
Parameters: - rules – an iterable of tuples, as described above
- logger – optional logging object
Raises ValueError: if invalid regex, or if the topic format contains replaceable parameters not found in the regex
-
make_event_payload
(client, user_data, message, var_type, var_name)¶ Returns the event payload of the CSTBox event to be produced, based on the content of the incoming MQTT message.
To make things easier for implementors, the result is expected as a tuple, which first item is the value, and the second an optional dictionary of additional information. The effective event payload will be built by the framework, by assembling these two pieces of data.
This method is called by
accept_event()
when a rule matches and corresponding variable type and name could thus have been determined.Parameters: - client – the MQTT client instance
- user_data – application specific data which can be attached to the client
- message – the MQTT message
- var_type (str) – the variable type of the CSTBox event
- var_name (str) – the variable name of the CSTBox event
Returns: the value and optional additional information to be used for the CSTBox event
Type: tuple
-
class
pycstbox.mqtt.core.
OutboundFilter
¶ Prototype of the filter for outgoing messages.
Its role is to define if a given CSTBox message can be published on the MQTT domain, and in this case which is the relevant MQTT topic to be used.
Important
To be sub-classed, since the default implementation forbid any event to go outside of the CSTBox domain.
-
get_mqtt_topic
(timestamp, var_type, var_name, data)¶ Returns the MQTT topic conforming the broker conventions and corresponding to the CSTBox candidate message.
Return None if the message is not to be passed to the MQTT world (which is the behaviour if the default implementation).
Parameters: - timestamp (int) – the event time stamp
- var_type (str) – the type of the variable the event is related to
- var_name (str) – the name of the variable
- data (dict) – the event specific payload
Returns: the MQTT topic to be used, or None if the message is not allowed to go outside
Return type: str or None
-
-
class
pycstbox.mqtt.core.
REOutboundFilter
(rules, logger=None)¶ Regular expressions based outbound filter.
It is based on regex based rules, applied to the variable type and variable name, which give the MQTT topic to be used for publication. If no matching is found, the message is not published.
The rules are applied in the sequence defined by the rules table, by checking if the compound key built as <var_type>:<var_name> matches with the rule regex. The corresponding MQTT topîc will be used when sending the message.
Regex can include named capturing groups, which value are used for computing the the topic value. For using this feature, the topic must include named replaceable parameters, which name match regex group ones.
Initializes the rules list by compiling their specifications.
The specifications are provided as an iterable of tuples, which items are :
- a string containing the regex to be applied to fully qualified variable names
- the MQTT topic string (or topic format string) to be used for publication
Parameters: - rules – an iterable of tuples, as described above
- logger – optional logging object
Raises ValueError: if invalid regex, or if the topic format contains replaceable parameters not found in the regex
-
get_mqtt_topic
(timestamp, var_type, var_name, data)¶ Returns the MQTT topic matching the event variable type and name, based on the known regex rules.
For efficiency sake, the result is cached for avoiding scanning the rules tables and applying the regex for already processed variable types and names.
pycstbox.mqtt.dbus_binding¶
D-Bus binding layer of the MQTT gateway.
This module implements the building blocks for interfacing the MQTTConnector
class
with the CSTBox D-Bus layer.
The feature is provided by the MQTTGatewayServiceObject
which is the worker instance
of the service.
The gateway service object works in association with the filters provided by the
application. The role of these filters is to filter messages allowing to cross the CSTBox/MQTT
boundary, and to translate accepted ones. Their prototypes is specified by classes InboundFilter
and OutboundFilter
provided by the package module pycstbox.mqtt.core
.
-
pycstbox.mqtt.dbus_binding.
load_configuration
(cfgfile_path)¶ Loads the configuration from a file and returns it as a dictionnary.
Parameters: cfgfile_path (str) – configuration file path
Returns: the configuration data
Return type: dict
Raises: - ValueError – if path is not provided of configuration data are invalid
- IOError – if not found or not a file
-
class
pycstbox.mqtt.dbus_binding.
MQTTGatewayServiceObject
(cfg, inbound_filter=None, outbound_filter=None)¶ The service object wrapping the gateway process.
It takes care of all the generic tasks. It :
- listens to both sensor and control CSTBox buses for processing circulating messages for forwarding them in the MQTT world
- creates a MQTT client for the broker, and listens to the relevant MQTT topîcs, for injecting the corresponding messages in the CSTBox internal communication bus
In both cases, messages are first passed to an application dependant filtering layer, which decides which ones are allowed to cross the boundaries. This layer takes also care of converting the messages in the proper form before publishing.
It is the responsibility of the application to provide these filters when creating an instance of this service object (refer to the __init__ method signature). It is valid to omit one of them, the absence of filter meaning that nothing will go through in the corresponding direction.
The service object is configured based on the content of the configuration data, provided as a dictionary by the process which instantiates it.
See
configure()
method documentation for specifications of the configuration data.Note that providing no filter at all is invalid, since it would make the gateway useless.
Parameters: - cfg (dict) – the configuration data
- inbound_filter (InboundFilter) – the filter for MQTT to CSTBox messages (default: None)
- outbound_filter (OutboundFilter) – the filter for CSTBox to MQTT messages (default: None)
Raises ValueError: if both filters are unset
-
DEFAULT_BROKER_PORT
= 61613¶ default MQTT broker connection port
-
configure
(cfg)¶ Configures the service object.
The configuration data are provided as a dictionary, which keys and structure are :
broker : sub-dictionary providing the broker connection information
- host : public host name or IP of the server
- port : connection port, defaulted to
DEFAULT_BROKER_PORT
if not provided - client_id : optional identifier attached to the MQTT client
auth : optional sub-dictionary containing the authentication information
- username : login
- password : guess what...
listened_topics : optional list of MQTT topics or topic patterns the gateway must subscribe
Parameters: cfg (dict) – configuration data as a dictionary
-
start
()¶ Starts the service object.
-
stop
()¶ Stops the service object
Extension service¶
The extension includes a service, which daemon is provided as the mqttgwd.py script, usually deployed in /opt/cstbox/bin directory.