MQTT Translator
Status: Production — The MQTT translator is production-ready, built on mqtt.js v5.
The MQTT translator enables Conduit to subscribe to MQTT topics, bringing data from IoT devices, gateways, and message brokers into the context mesh. The translator integrates with the query planner for DAG-based multi-source execution, and connector templates are available for common broker configurations.
Overview
The MQTT translator supports:
- MQTT 5.0: Full protocol support via mqtt.js v5
- Wildcard Subscriptions:
+and#wildcards - Sparkplug B: Native Sparkplug decoding
- JSON Extraction: JSONPath for nested values
- QoS Levels: 0, 1, and 2
- TLS/SSL: Secure connections
Prerequisites
- MQTT Broker: Any compliant broker (Mosquitto, HiveMQ, EMQX, etc.)
- Network Access: Reach the broker (typically port 1883 or 8883)
- Topic Structure: Understanding of your topic hierarchy
Configuration
Basic Configuration
translator:
type: mqtt
name: mqtt-plant-floor
broker:
host: mqtt.company.com
port: 1883
protocol: tcp # or ssl
credentials:
username: ${MQTT_USER}
password: ${MQTT_PASS}
subscriptions:
- topic: plant/+/temperature
qos: 1
TLS Configuration
broker:
host: mqtt.company.com
port: 8883
protocol: ssl
tls:
ca: /etc/conduit/certs/ca.pem
cert: /etc/conduit/certs/client.pem
key: /etc/conduit/certs/client.key
verify: true
Subscriptions
Simple Subscriptions
subscriptions:
- topic: sensors/temperature
qos: 1
- topic: sensors/pressure
qos: 0
Wildcard Subscriptions
subscriptions:
# Single-level wildcard (+)
- topic: plant/+/temperature
# Matches: plant/area1/temperature, plant/area2/temperature
# Multi-level wildcard (#)
- topic: plant/building1/#
# Matches: plant/building1/*, plant/building1/floor1/*, etc.
QoS Levels
| QoS | Delivery | Use Case | | --- | ------------- | ----------------------------- | | 0 | At most once | Non-critical telemetry | | 1 | At least once | Important data, may duplicate | | 2 | Exactly once | Critical, ordered data |
Payload Parsing
JSON Payloads
subscriptions:
- topic: sensors/+/data
format: json
mapping:
value: "$.temperature"
timestamp: "$.ts"
quality: "$.quality"
# Extract multiple values from nested JSON
- topic: device/+/telemetry
format: json
extract:
- path: "$.sensors.temp"
name: "${topic[1]}_temperature"
- path: "$.sensors.humidity"
name: "${topic[1]}_humidity"
- path: "$.sensors.pressure"
name: "${topic[1]}_pressure"
Plain Text
subscriptions:
- topic: simple/temperature
format: text
dataType: float
Binary
subscriptions:
- topic: binary/sensor
format: binary
structure:
- name: temperature
type: float32
offset: 0
- name: pressure
type: float32
offset: 4
- name: status
type: uint8
offset: 8
Sparkplug B
Native support for the Sparkplug B industrial IoT specification:
subscriptions:
- topic: spBv1.0/#
format: sparkplug
sparkplug:
groupId: Plant1
edgeNodeId: Gateway1
metrics:
include:
- "Temperature/*"
- "Pressure/*"
exclude:
- "*/Internal/*"
Sparkplug Message Types
The translator handles all Sparkplug message types:
| Type | Description | | ------ | ------------------------ | | NBIRTH | Node birth certificate | | DBIRTH | Device birth certificate | | NDATA | Node data | | DDATA | Device data | | NCMD | Node command | | DCMD | Device command | | NDEATH | Node death | | DDEATH | Device death |
Topic-to-Tag Mapping
Pattern-Based Naming
subscriptions:
- topic: plant/+/+/temperature
format: json
mapping:
value: "$.value"
tagName: "${topic[1]}_${topic[2]}_Temperature"
# plant/area1/tank1/temperature -> area1_tank1_Temperature
# plant/area2/tank3/temperature -> area2_tank3_Temperature
Metadata Extraction
subscriptions:
- topic: devices/+/sensors/+
metadata:
device: "${topic[1]}"
sensor: "${topic[3]}"
source: "mqtt"
Message Processing
Deduplication
processing:
deduplicate:
enabled: true
window: 1000 # ms
key: [topic, value]
Rate Limiting
processing:
rateLimit:
enabled: true
maxPerSecond: 1000
strategy: drop # or buffer
Buffering
processing:
buffer:
enabled: true
maxSize: 10000
flushInterval: 1000 # ms
Connection Management
Keep-Alive
broker:
keepAlive: 60 # seconds
cleanSession: true
Reconnection
broker:
reconnect:
enabled: true
initialDelay: 1000 # ms
maxDelay: 30000
multiplier: 2
Last Will
broker:
lastWill:
topic: conduit/translators/mqtt-plant-floor/status
message: "offline"
qos: 1
retain: true
Publishing (Optional)
Enable publishing for command responses:
publishing:
enabled: true
defaultQos: 1
defaultRetain: false
topics:
- pattern: conduit/responses/+
qos: 1
Performance Tuning
Connection Settings
broker:
connectionTimeout: 30000 # ms
maxInflight: 100
maxPacketSize: 1048576 # 1MB
Subscription Batching
subscriptions:
batchSize: 100
parallelProcessing: true
workers: 4
Troubleshooting
Connection Issues
Connection Refused
- Verify broker host and port
- Check firewall rules
- Confirm credentials
TLS Handshake Failed
- Verify certificate chain
- Check certificate expiration
- Confirm TLS version compatibility
Message Issues
No Messages Received
- Verify topic pattern matches published topics
- Check subscription QoS vs publish QoS
- Review broker ACLs
Parse Errors
- Verify payload format matches configuration
- Check JSON structure
- Review Sparkplug encoding
Performance Issues
High Latency
- Increase worker count
- Enable parallel processing
- Review message rate limits
Example: IoT Gateway Integration
translator:
type: mqtt
name: mqtt-iot-gateway
broker:
host: iot-hub.company.com
port: 8883
protocol: ssl
tls:
verify: true
ca: /etc/conduit/certs/iot-ca.pem
credentials:
username: gateway-conduit
password: ${MQTT_PASSWORD}
subscriptions:
# Temperature sensors
- topic: sensors/+/temperature
format: json
mapping:
value: "$.value"
timestamp: "$.timestamp"
tagName: "IoT_${topic[1]}_Temperature"
unit: "°C"
# Sparkplug devices
- topic: spBv1.0/Plant1/#
format: sparkplug
# Raw binary sensors
- topic: legacy/+/data
format: binary
structure:
- name: temp
type: int16
scale: 0.1
- name: humidity
type: uint8
Next Steps
- MCP IoT Gateway - Connect to field devices
- OPC-UA Translator - Standard industrial protocol
- Architecture - Translator architecture