Mqtt¶
The mqtt
source connects to an MQTT broker to consume publish messages from specified topics.
It supports configuration of basic attributes like the broker’s host and port, topic filter,
shared subscription group name, and message Quality of Service (QoS) level. Additionally it supports
basic authentication, TLS as well as OAuth.
Consumed messages are represented by the MqttPublishMessage
record, which encapsulates MQTT-specific
attributes such as the payload, retain flag, message expiry, payload format, and user properties.
MQTT source uses shared subscription to distribute incoming messages between all the workers.
MqttSourceConfigBuilder.builder()
.withGroupName("group1")
.withTopicFilter("sensors/#")
.withAddressHost("localhost")
.withAddressPort(61616)
.withQos(MqttMessageQoS.AT_LEAST_ONCE)
source:
mqtt:
address: "mqtt.example.com:1883"
topicFilter: "sensors/#"
groupName: "group1"
qos: "AT_LEAST_ONCE"
Properties¶
identifier
¶
Identifier of the client used to connect to brokers. If no value was provided it will use a random UUID with voltsp-source-
prefix.
Type: string
address
¶
Sets the address of the MQTT broker to connect to in host:port
format. When omitted the port is set to 1883
.
Required.
Type: object
Default value: 1883
Fields of address
:
address.host
¶
Type: string
address.port
¶
Type: number
address.hasBracketlessColons
¶
Type: boolean
topicFilter
¶
Specifies the MQTT topic filter to subscribe to, supporting wildcards for flexible subscriptions. Required.
Type: string
groupName
¶
Shared subscription group name. Required.
Type: string
websocket
¶
Configuration for a WebSocket transport to use by MQTT clients.
Type: object
Fields of websocket
:
websocket.subpath
¶
The path that should be used when connecting to websocket endpoint.
Type: string
qos
¶
Quality of Service (QoS) according to the MQTT specification.
Type: object
Supported values: at_most_once
, at_least_once
, exactly_once
.
Default value: AT_LEAST_ONCE
ssl
¶
Secure transport configuration.
Type: object
Fields of ssl
:
ssl.keystoreFile
¶
Type: string
ssl.keystorePassword
¶
Type: string
ssl.keyPassword
¶
Type: string
ssl.pemEncodedFile
¶
Type: string
ssl.truststoreFile
¶
Type: string
ssl.truststorePassword
¶
Type: string
ssl.insecure
¶
Disables certificate checking and hostname validation. Typically used for debugging connection issues.
Type: boolean
ssl.ignoreHostnameValidation
¶
Type: boolean
auth
¶
If not specified no authentication is used.
Type: object
Fields of auth
:
auth.username
¶
The username to present to the broker.
Type: string
auth.password
¶
The password to present to the broker.
Type: string
connect
¶
Additional connect options.
Type: object
Fields of connect
:
connect.userProperties
¶
List of key value paris that will be passed to MQTT client as user properties
Type: array
connect.keepAlive
¶
Sets the keep alive property, must be in the range from 0 to 2^16 -1 (unsigned short)
Type: object
Unit: unsigned short
reconnect
¶
Automatic reconnect strategy using an exponential backoff with configurable initial and maximum delays.
Type: object
Fields of reconnect
:
reconnect.initialDelay
¶
The initial delay before attempting to reconnect.
Type: object
reconnect.maxDelay
¶
Whatever the delay value that backoff strategy arrives at it will be capped by the maxDelay
specified.
Type: object
oauth
¶
Configures the OAuth details.
Type: object
Fields of oauth
:
oauth.scopes
¶
Type: array
oauth.tokenUrl
¶
Type: string
oauth.clientId
¶
Type: string
oauth.clientSecret
¶
Type: string
oauth.grantType
¶
Type: string
Default value: client_credentials
oauth.username
¶
Type: string
oauth.password
¶
Type: string
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-mqtt-api</artifactId>
<version>1.4.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.4.0'
Usage Examples¶
stream
.withName("Read data from Mqtt and convert payload to string")
.consumeFromSource(
MqttSourceConfigBuilder.builder()
.withAddress("mqtt.example.com", "1883")
.withGroupName("device-group")
.withTopicFilter("devices/+/data")
.withQos(MqttMessageQoS.AT_LEAST_ONCE)
)
.processWith(mqttPublishMessage -> mqttPublishMessage.payload())
.processWith(readOnlyBuffer -> {
ByteBuffer tempBuffer = readOnlyBuffer.duplicate();
byte[] bytes = new byte[tempBuffer.remaining()];
tempBuffer.get(bytes);
return new String(bytes);
})
.terminateWithSink(sink);
This configuration connects to an MQTT broker at mqtt.example.com
on port 1883
subscribes to topics matching devices/+/data
,
and processes messages with an at-least-once QoS level.
source:
mqtt:
address: "mqtt.example.com:1883'
topicFilter: "devices/+/data"
groupName: "device-group"
qos: "AT_LEAST_ONCE"
This configuration connects to an MQTT broker at mqtt.example.com
on port 1883
subscribes to topics matching devices/+/data
,
and processes messages with an at-least-once QoS level.