Overview
MQTT 5.0, published by OASIS in 2019, is the latest version of the MQTT protocol with significant enhancements over MQTT 3.1.1. HiveMQ CE provides complete support for all MQTT 5.0 features, making it ideal for modern IoT applications.
MQTT 5.0 is not backward compatible with MQTT 3.x, but HiveMQ CE supports both versions simultaneously, automatically detecting the client’s protocol version.
Key Enhancements Over MQTT 3.1.1
User Properties Add custom key-value metadata to any packet for application-specific data.
Reason Codes Detailed error and success codes for better diagnostics and error handling.
Request-Response Built-in pattern for request/response communication.
Shared Subscriptions Load balance messages across multiple subscribers.
Topic Aliases Reduce bandwidth by replacing long topic names with integers.
Message Expiry Automatically expire messages that are too old.
User Properties
User properties allow you to attach custom metadata to MQTT packets:
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
client = mqtt.Client( protocol = mqtt.MQTTv5)
client.connect( "localhost" , 1883 , 60 )
# Create properties
properties = Properties(PacketTypes. PUBLISH )
properties.UserProperty = [( "source" , "sensor1" ), ( "location" , "warehouse" )]
# Publish with user properties
client.publish(
"sensors/temperature" ,
"22.5" ,
properties = properties
)
Use cases:
Message routing metadata
Correlation data
Application-specific flags
Tracking information
Reason Codes and Strings
MQTT 5.0 provides detailed reason codes for better error handling:
Common Reason Codes
Code Name Description 0 Success Operation completed successfully 16 No matching subscribers No subscribers for the published topic 128 Unspecified error Generic error 129 Malformed packet Invalid packet structure 130 Protocol error Protocol violation 131 Implementation specific error Broker-specific error 135 Not authorized Client lacks authorization 144 Topic name invalid Invalid topic format 145 Packet identifier in use Duplicate packet ID 151 Quota exceeded Message or connection quota reached
def on_publish ( client , userdata , mid , reason_codes , properties ):
if reason_codes[ 0 ] == 0 :
print ( "Message published successfully" )
elif reason_codes[ 0 ] == 16 :
print ( "No subscribers for this topic" )
elif reason_codes[ 0 ] == 135 :
print ( "Not authorized to publish to this topic" )
client.on_publish = on_publish
Reason strings provide human-readable descriptions alongside reason codes for easier debugging.
Request-Response Pattern
MQTT 5.0 includes built-in support for request-response communication:
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
import uuid
def on_message ( client , userdata , message ):
# Process request
request_data = message.payload.decode()
# Extract response topic from message properties
response_topic = None
correlation_data = None
if message.properties:
response_topic = message.properties.ResponseTopic
correlation_data = message.properties.CorrelationData
if response_topic:
# Send response
response_props = Properties(PacketTypes. PUBLISH )
if correlation_data:
response_props.CorrelationData = correlation_data
response = f "Processed: { request_data } "
client.publish(response_topic, response, properties = response_props)
# Requester side
client = mqtt.Client( protocol = mqtt.MQTTv5)
client.connect( "localhost" , 1883 , 60 )
# Subscribe to response topic
response_topic = f "responses/ { uuid.uuid4() } "
client.subscribe(response_topic)
# Send request with response topic
request_props = Properties(PacketTypes. PUBLISH )
request_props.ResponseTopic = response_topic
request_props.CorrelationData = b "req-123"
client.publish( "requests/process" , "data to process" , properties = request_props)
Shared Subscriptions
Load balance messages across multiple subscribers:
# Multiple consumers subscribe to the same shared subscription
# Messages are distributed among them
# Consumer 1
client1 = mqtt.Client( protocol = mqtt.MQTTv5)
client1.connect( "localhost" , 1883 , 60 )
client1.subscribe( "$share/group1/orders/#" )
# Consumer 2
client2 = mqtt.Client( protocol = mqtt.MQTTv5)
client2.connect( "localhost" , 1883 , 60 )
client2.subscribe( "$share/group1/orders/#" )
# Consumer 3
client3 = mqtt.Client( protocol = mqtt.MQTTv5)
client3.connect( "localhost" , 1883 , 60 )
client3.subscribe( "$share/group1/orders/#" )
# Messages published to orders/# are distributed round-robin
Syntax: $share/{group_name}/{topic_filter}
Use cases:
Horizontal scaling of message processing
Load balancing across workers
High-throughput message handling
Use shared subscriptions to build scalable message processing systems without external load balancers.
Topic Aliases
Reduce bandwidth by replacing long topic names with 2-byte integers:
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
client = mqtt.Client( protocol = mqtt.MQTTv5)
client.connect( "localhost" , 1883 , 60 )
# First publish: establish alias
properties = Properties(PacketTypes. PUBLISH )
properties.TopicAlias = 1
client.publish(
"sensors/factory/line1/machine5/temperature" ,
"22.5" ,
properties = properties
)
# Subsequent publishes: use alias instead of full topic
properties = Properties(PacketTypes. PUBLISH )
properties.TopicAlias = 1
client.publish( "" , "23.1" , properties = properties) # Empty topic uses alias
Benefits:
Reduced bandwidth for repeated publishes
Lower overhead for constrained devices
Faster message processing
Message Expiry Interval
Automatically expire messages that are too old:
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
client = mqtt.Client( protocol = mqtt.MQTTv5)
client.connect( "localhost" , 1883 , 60 )
properties = Properties(PacketTypes. PUBLISH )
properties.MessageExpiryInterval = 300 # 5 minutes
client.publish(
"time-sensitive/command" ,
"execute_now" ,
qos = 1 ,
properties = properties
)
Use cases:
Time-sensitive commands
Real-time data that becomes stale
Preventing queued message buildup
Expired messages are not delivered to subscribers. Use this feature carefully for critical messages.
Session Expiry Interval
Control how long the broker maintains session state after disconnection:
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
client = mqtt.Client( protocol = mqtt.MQTTv5, clean_session = False )
# Set session expiry in CONNECT properties
connect_props = Properties(PacketTypes. CONNECT )
connect_props.SessionExpiryInterval = 3600 # 1 hour
client.connect(
"localhost" ,
1883 ,
60 ,
properties = connect_props
)
0 (default): Session expires immediately on disconnect
> 0 : Session maintained for specified seconds
0xFFFFFFFF : Session never expires
Enhanced Authentication
MQTT 5.0 supports SASL-style challenge-response authentication:
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
client = mqtt.Client( protocol = mqtt.MQTTv5)
def on_connect ( client , userdata , flags , reason_code , properties ):
if reason_code == 24 : # Continue authentication
# Handle authentication challenge
auth_props = Properties(PacketTypes. AUTH )
auth_props.AuthenticationMethod = "SCRAM-SHA-256"
auth_props.AuthenticationData = b "response_data"
client.publish( None , None , properties = auth_props)
client.on_connect = on_connect
# Initiate authentication
connect_props = Properties(PacketTypes. CONNECT )
connect_props.AuthenticationMethod = "SCRAM-SHA-256"
connect_props.AuthenticationData = b "initial_data"
client.connect( "localhost" , 1883 , 60 , properties = connect_props)
Subscription Options
MQTT 5.0 provides fine-grained subscription control:
import paho.mqtt.client as mqtt
import paho.mqtt.subscribeoptions as SubscribeOptions
client = mqtt.Client( protocol = mqtt.MQTTv5)
client.connect( "localhost" , 1883 , 60 )
# Subscribe with options
subscribe_options = SubscribeOptions( qos = 1 )
subscribe_options.noLocal = True # Don't receive own messages
subscribe_options.retainAsPublished = True # Preserve retain flag
subscribe_options.retainHandling = 0 # Send retained messages on subscribe
client.subscribe(
"sensors/#" ,
options = subscribe_options
)
Subscription Options
Option Description noLocal If true, don’t receive messages published by this client retainAsPublished If true, preserve retain flag in forwarded messages retainHandling 0 = send retained on subscribe, 1 = send if new subscription, 2 = never send
Server Keep-Alive
Broker can override client’s keep-alive value:
def on_connect ( client , userdata , flags , reason_code , properties ):
if properties and hasattr (properties, 'ServerKeepAlive' ):
keep_alive = properties.ServerKeepAlive
print ( f "Server keep-alive: { keep_alive } seconds" )
client.on_connect = on_connect
Content Type and Payload Format Indicator
Indicate message content type:
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
properties = Properties(PacketTypes. PUBLISH )
properties.PayloadFormatIndicator = 1 # UTF-8 string
properties.ContentType = "application/json"
client.publish(
"sensors/data" ,
'{"temperature": 22.5}' ,
properties = properties
)
Maximum Packet Size
Negotiate maximum packet size:
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
connect_props = Properties(PacketTypes. CONNECT )
connect_props.MaximumPacketSize = 10240 # 10 KB
client.connect( "localhost" , 1883 , 60 , properties = connect_props)
Server-Sent DISCONNECT
In MQTT 5.0, the broker can send DISCONNECT packets with reason codes:
def on_disconnect ( client , userdata , reason_code , properties ):
if reason_code == 142 : # Session taken over
print ( "Session taken over by another connection" )
elif reason_code == 149 : # Keep alive timeout
print ( "Keep-alive timeout" )
elif reason_code == 151 : # Quota exceeded
print ( "Quota exceeded" )
client.on_disconnect = on_disconnect
Complete MQTT 5 Example
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
import json
import time
def on_connect ( client , userdata , flags , reason_code , properties ):
if reason_code == 0 :
print ( "Connected with MQTT 5.0" )
# Print server properties
if properties:
print ( f "Server Keep-Alive: { properties.ServerKeepAlive if hasattr (properties, 'ServerKeepAlive' ) else 'default' } " )
# Subscribe with options
client.subscribe( "sensors/#" , qos = 1 )
else :
print ( f "Connection failed: { reason_code } " )
def on_message ( client , userdata , message ):
print ( f " \n Received message:" )
print ( f " Topic: { message.topic } " )
print ( f " Payload: { message.payload.decode() } " )
print ( f " QoS: { message.qos } " )
# Access MQTT 5 properties
if message.properties:
if hasattr (message.properties, 'UserProperty' ):
print ( f " User Properties: { message.properties.UserProperty } " )
if hasattr (message.properties, 'ContentType' ):
print ( f " Content Type: { message.properties.ContentType } " )
# Create MQTT 5 client
client = mqtt.Client(
client_id = "mqtt5_example" ,
protocol = mqtt.MQTTv5,
clean_session = None # Use MQTT 5 clean start flag
)
client.on_connect = on_connect
client.on_message = on_message
# Set CONNECT properties
connect_props = Properties(PacketTypes. CONNECT )
connect_props.SessionExpiryInterval = 3600 # 1 hour
connect_props.ReceiveMaximum = 100
connect_props.MaximumPacketSize = 1048576 # 1 MB
# Connect with properties
client.connect(
"localhost" ,
1883 ,
keepalive = 60 ,
properties = connect_props
)
client.loop_start()
# Publish with MQTT 5 features
for i in range ( 5 ):
publish_props = Properties(PacketTypes. PUBLISH )
publish_props.UserProperty = [( "sensor_id" , "temp_01" ), ( "location" , "room_a" )]
publish_props.ContentType = "application/json"
publish_props.MessageExpiryInterval = 300
publish_props.PayloadFormatIndicator = 1
payload = json.dumps({
"temperature" : 20 + i * 0.5 ,
"timestamp" : int (time.time())
})
client.publish(
"sensors/temperature" ,
payload,
qos = 1 ,
properties = publish_props
)
time.sleep( 2 )
time.sleep( 5 )
client.disconnect()
client.loop_stop()
Migration from MQTT 3.1.1
Breaking Changes
Clean Session → Clean Start + Session Expiry Interval
Return codes replaced with reason codes
Some packet formats changed
Migration Tips
Test clients with both MQTT 3.1.1 and MQTT 5 endpoints
Update error handling to use reason codes
Leverage new features incrementally
Use user properties instead of embedding metadata in payload
HiveMQ CE supports both MQTT 3.x and MQTT 5 simultaneously, allowing gradual migration.
Next Steps
MQTT Features Detailed feature documentation
Transport Protocols Configure transports for MQTT 5
Connecting Clients MQTT 5 client examples
Extensions Extend MQTT 5 functionality