MQTT Gateway connects to an MQTT broker to receive messages. Broker configuration is stored in the database and can be updated without restarting the service.

Broker connection settings

MQTT broker configurations are stored in the mqtt_servers table in the database. The gateway connects to a single broker: among the enabled rows, it prefers one marked is_default and breaks ties by lowest id.

Server selection logic

When the gateway starts, it loads the MQTT server using this query:
# From src/mqtt_client.py:25
def load_mqtt_server(self) -> MqttServer:
    with self.session_factory() as session:
        stmt = (
            select(MqttServer)
            .where(MqttServer.enabled.is_(True))
            .order_by(MqttServer.is_default.desc(), MqttServer.id.asc())
        )
        server = session.execute(stmt).scalars().first()
        if not server:
            raise RuntimeError("No enabled MQTT server found in mqtt_servers")
        return server
The gateway prioritizes servers where is_default=True, then by lowest id.
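The ordering above can be tried out in plain SQL. The following is a minimal sketch using an in-memory SQLite database; the table layout is an assumption that only includes the columns relevant to selection, the host names are made up, and the real gateway issues the query through its ORM model rather than raw SQL.

```python
import sqlite3


def build_demo_db() -> sqlite3.Connection:
    """Create an in-memory mqtt_servers table with illustrative rows (assumed schema)."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE mqtt_servers (
            id INTEGER PRIMARY KEY,
            host TEXT NOT NULL,
            enabled INTEGER NOT NULL DEFAULT 1,
            is_default INTEGER NOT NULL DEFAULT 1
        )
        """
    )
    conn.executemany(
        "INSERT INTO mqtt_servers (id, host, enabled, is_default) VALUES (?, ?, ?, ?)",
        [
            (1, "broker-a.example", 1, 0),  # enabled, not default
            (2, "broker-b.example", 0, 1),  # default, but disabled
            (3, "broker-c.example", 1, 1),  # enabled default with the lowest id
            (4, "broker-d.example", 1, 1),  # enabled default, higher id
        ],
    )
    return conn


def select_server(conn: sqlite3.Connection) -> tuple:
    """Return (id, host) of the first enabled server: defaults first, then lowest id."""
    row = conn.execute(
        "SELECT id, host FROM mqtt_servers "
        "WHERE enabled = 1 "
        "ORDER BY is_default DESC, id ASC "
        "LIMIT 1"
    ).fetchone()
    if row is None:
        raise RuntimeError("No enabled MQTT server found in mqtt_servers")
    return row


print(select_server(build_demo_db()))  # (3, 'broker-c.example')
```

Row 2 is skipped even though it is marked as default, because the enabled filter is applied before the ordering.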

Server parameters

host (string, required): MQTT broker hostname or IP address. Example: 192.168.0.137
port (integer, default: 1883): MQTT broker port number. Example: 1883. Common ports:
  • 1883 - Standard MQTT
  • 8883 - MQTT over TLS
username (string): Optional username for MQTT broker authentication. Leave empty if the broker doesn’t require authentication.
password (string): Optional password for MQTT broker authentication. Leave empty if the broker doesn’t require authentication.
enabled (boolean, default: true): Whether this MQTT server configuration is active. Set to false to disable without deleting the configuration.
is_default (boolean, default: true): Whether this is the default MQTT server. When multiple enabled servers exist, the gateway uses the one marked as default.

Authentication

If your MQTT broker requires authentication, set both username and password fields:
# From src/mqtt_client.py:107
if server.username and server.password:
    self.client.username_pw_set(server.username, server.password)
Both fields must be set for authentication to be used. If either is missing, the gateway connects without credentials.
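One consequence of this check is that a username with an empty password is treated the same as no credentials at all. The sketch below isolates that rule in a standalone helper; `resolve_credentials` is a hypothetical name used for illustration and is not part of the gateway.

```python
from typing import Optional, Tuple


def resolve_credentials(
    username: Optional[str], password: Optional[str]
) -> Optional[Tuple[str, str]]:
    """Return (username, password) only when both are non-empty, otherwise None."""
    if username and password:
        return username, password
    # Missing or empty username/password: connect without credentials.
    return None


print(resolve_credentials("gateway", "s3cret"))  # ('gateway', 's3cret')
print(resolve_credentials("gateway", ""))        # None
```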

Connection parameters

The MQTT client connection is configured using environment variables:
MQTT_CLIENT_ID (string, default: "mqtt-gateway"): Unique client identifier for the MQTT connection. See environment variables for details.
MQTT_KEEPALIVE (integer, default: 60): Keepalive interval in seconds for maintaining the MQTT connection. See environment variables for details.
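As a rough illustration of how these two variables and their defaults could be resolved, here is a minimal sketch using only the standard library. `MqttConnectionSettings` and `load_connection_settings` are hypothetical names; the gateway's actual settings loader is not shown in this page and may work differently.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class MqttConnectionSettings:
    """Hypothetical container for the two documented connection variables."""
    client_id: str = "mqtt-gateway"
    keepalive: int = 60


def load_connection_settings(env: Mapping[str, str] = os.environ) -> MqttConnectionSettings:
    """Read MQTT_CLIENT_ID and MQTT_KEEPALIVE, falling back to the documented defaults."""
    return MqttConnectionSettings(
        client_id=env.get("MQTT_CLIENT_ID", "mqtt-gateway"),
        keepalive=int(env.get("MQTT_KEEPALIVE", "60")),  # seconds
    )


# Passing an explicit mapping makes the behaviour easy to check without touching os.environ.
print(load_connection_settings({"MQTT_KEEPALIVE": "30"}))
# MqttConnectionSettings(client_id='mqtt-gateway', keepalive=30)
```

If several gateway instances share one broker, each normally needs its own client ID, since MQTT brokers disconnect an existing session when a second client connects with the same identifier.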

Connection establishment

The gateway establishes the MQTT connection at startup:
# From src/mqtt_client.py:112
self.client.connect(server.host, server.port, keepalive=self.settings.mqtt_keepalive)
self.client.loop_start()
loop_start() runs the client's network loop in a background thread, leaving the main thread free to handle flow reloading.

Topic subscriptions

The gateway automatically subscribes to MQTT topics based on enabled flows in the database.

Initial subscription

When the connection is established, the gateway subscribes to all topics from enabled flows:
# From src/mqtt_client.py:65
def on_connect(self, client: mqtt.Client, userdata: Any, flags: Any, reason_code: Any, properties: Any) -> None:
    if reason_code != 0:
        self.logger.error("MQTT connection failed with reason code: %s", reason_code)
        return

    subscriptions = set(topic for topic, _ in self.flow_topics)
    for topic in subscriptions:
        client.subscribe(topic)

Dynamic subscription updates

The gateway periodically reloads flows from the database and updates subscriptions:
# From src/mqtt_client.py:55
def reload_flows(self) -> None:
    flows = self.load_flows()
    new_flow_topics = self._topic_to_flow_ids(flows)

    previous_topics = set(topic for topic, _ in self.flow_topics)
    current_topics = set(topic for topic, _ in new_flow_topics)

    self.flow_topics = new_flow_topics
    self._sync_subscriptions(previous_topics, current_topics)
This allows you to add or remove topic subscriptions by updating flows in the database without restarting the gateway.
The reload interval is controlled by the FLOWS_RELOAD_INTERVAL_SECONDS environment variable (default: 600 seconds).
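The body of _sync_subscriptions is not shown here. A plausible way to reconcile the two topic sets is a pair of set differences, sketched below under that assumption; `plan_subscription_changes` is a hypothetical helper, not the gateway's code.

```python
from typing import List, Set, Tuple


def plan_subscription_changes(
    previous_topics: Set[str], current_topics: Set[str]
) -> Tuple[List[str], List[str]]:
    """Return (topics to subscribe, topics to unsubscribe) for one reload cycle."""
    to_subscribe = sorted(current_topics - previous_topics)    # newly enabled flows
    to_unsubscribe = sorted(previous_topics - current_topics)  # removed or disabled flows
    return to_subscribe, to_unsubscribe


previous = {"sensors/temperature", "sensors/humidity"}
current = {"sensors/temperature", "alerts/#"}
print(plan_subscription_changes(previous, current))
# (['alerts/#'], ['sensors/humidity'])
```

Topics present in both sets are left alone, so an unchanged flow keeps its subscription across reloads.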

Topic matching

The gateway supports MQTT wildcard patterns in topic subscriptions:
  • + - Single-level wildcard (matches one topic level)
  • # - Multi-level wildcard (matches zero or more topic levels)
Example topics:
  • sensors/temperature - Exact match
  • sensors/+/temperature - Matches sensors/kitchen/temperature, sensors/bedroom/temperature
  • sensors/# - Matches sensors itself and all topics under sensors/
When a message arrives, the gateway matches it against all flow topics:
# From src/mqtt_client.py:84
matched_flow_ids = [
    flow_id for sub_topic, flow_id in self.flow_topics if topic_matches_sub(sub_topic, topic)
]
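To make the wildcard rules concrete, the following is a simplified, dependency-free matcher written for illustration. It is not Paho's topic_matches_sub: it ignores the special handling of topics starting with $ and does not validate malformed filters beyond requiring # to be the last level. `topic_matches` and `match_flow_ids` are hypothetical names.

```python
from typing import List, Tuple


def topic_matches(subscription: str, topic: str) -> bool:
    """Simplified MQTT filter match supporting the + and # wildcards."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for index, sub_level in enumerate(sub_levels):
        if sub_level == "#":
            # Multi-level wildcard: only valid as the last level, matches the remainder.
            return index == len(sub_levels) - 1
        if index >= len(topic_levels):
            return False  # the filter has more levels than the topic
        if sub_level != "+" and sub_level != topic_levels[index]:
            return False  # literal level mismatch
    # No trailing '#': both sides must have the same number of levels.
    return len(sub_levels) == len(topic_levels)


def match_flow_ids(flow_topics: List[Tuple[str, int]], topic: str) -> List[int]:
    """Collect the IDs of every flow whose filter matches the incoming topic."""
    return [flow_id for sub_topic, flow_id in flow_topics if topic_matches(sub_topic, topic)]


flows = [("sensors/+/temperature", 1), ("sensors/#", 2), ("alerts/fire", 3)]
print(match_flow_ids(flows, "sensors/kitchen/temperature"))  # [1, 2]
```

A single message can therefore trigger several flows when their topic filters overlap.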

Message processing

Incoming messages must be valid JSON objects:
# From src/mqtt_client.py:74
def on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage) -> None:
    topic = msg.topic
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
        if not isinstance(payload, dict):
            raise ValueError("Payload must be a JSON object")
    except (json.JSONDecodeError, UnicodeDecodeError, ValueError) as exc:
        self.logger.error("Invalid payload in topic %s: %s", topic, exc)
        return
Messages with invalid JSON or non-object payloads are logged and discarded. Ensure your MQTT publishers send valid JSON objects.
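The validation rule can be checked against sample payloads before wiring up a publisher. This is a minimal standalone sketch of the same decode-and-check steps; `parse_payload` is a hypothetical helper that returns None where the gateway would log the error and discard the message.

```python
import json
from typing import Any, Dict, Optional


def parse_payload(raw: bytes) -> Optional[Dict[str, Any]]:
    """Return the decoded JSON object, or None if the payload would be rejected."""
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # not UTF-8 or not JSON
    # Arrays, numbers, strings, booleans and null are valid JSON but not objects.
    return payload if isinstance(payload, dict) else None


print(parse_payload(b'{"temperature": 21.5}'))  # {'temperature': 21.5}
print(parse_payload(b"[21.5, 22.0]"))           # None
print(parse_payload(b"21.5"))                   # None
```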

Connection error handling

If the connection fails, the error is logged but the gateway continues running:
# From src/mqtt_client.py:66
if reason_code != 0:
    self.logger.error("MQTT connection failed with reason code: %s", reason_code)
    return
The Paho MQTT client library automatically attempts to reconnect using its built-in reconnection logic.
