Mqtt¶
The mqtt source connects to an MQTT broker to consume publish messages from specified topics.
It supports configuration of basic attributes like the broker’s host and port, topic filter,
shared subscription group name, and message Quality of Service (QoS) level. It also supports
basic authentication, TLS, and OAuth.
Consumed messages are represented by the MqttPublishMessage record, which encapsulates MQTT-specific
attributes such as the payload, retain flag, message expiry, payload format, and user properties.
The MQTT source uses a shared subscription to distribute incoming messages among all workers.
MqttSourceConfigBuilder.builder()
    .withGroupName("group1")
    .withTopicFilter("sensors/#")
    .withAddressHost("localhost")
    .withAddressPort(61616)
    .withQos(MqttMessageQoS.AT_LEAST_ONCE)
source:
  mqtt:
    address: "mqtt.example.com:1883"
    topicFilter: "sensors/#"
    groupName: "group1"
    qos: "AT_LEAST_ONCE"
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-mqtt-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.0-20250910-124207-release-1.5.3'
Properties¶
identifier
¶
Identifier of the client used to connect to brokers. If no value is provided, a random UUID with the voltsp-source- prefix is used.
Type: string
address
¶
Sets the address of the MQTT broker to connect to, in host:port format. When the port is omitted, it defaults to 1883.
Required.
Type: object
Default value: 1883
Fields of address:
address.host
¶
Type: string
address.port
¶
Type: number
address.hasBracketlessColons
¶
Type: boolean
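The host:port convention with 1883 as the fallback port can be illustrated with a small standalone sketch. It is not the plugin's own parser, which this page does not show. The class and method names are hypothetical, and IPv6 literals are deliberately not handled.

```java
import java.net.InetSocketAddress;

// Hypothetical helper, for illustration only; not part of the plugin API.
final class BrokerAddressSketch {
    static final int DEFAULT_PORT = 1883;

    private BrokerAddressSketch() {}

    /** Splits "host" or "host:port" and falls back to port 1883 when no port is given. */
    static InetSocketAddress parse(String address) {
        int colon = address.lastIndexOf(':');
        if (colon < 0) {
            // No port present: use the MQTT default.
            return InetSocketAddress.createUnresolved(address, DEFAULT_PORT);
        }
        String host = address.substring(0, colon);
        // parseInt rejects non-numeric ports; createUnresolved rejects ports outside 0..65535.
        int port = Integer.parseInt(address.substring(colon + 1));
        return InetSocketAddress.createUnresolved(host, port);
    }

    public static void main(String[] args) {
        InetSocketAddress a = parse("mqtt.example.com");
        System.out.println(a.getHostString() + ":" + a.getPort());
    }
}
```

Using an unresolved address keeps the sketch free of DNS lookups, so it only demonstrates the string handling.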
topicFilter
¶
Specifies the MQTT topic filter to subscribe to, supporting wildcards for flexible subscriptions. Required.
Type: string
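To make the wildcard behavior concrete, the following sketch checks whether a topic name matches a filter under the standard MQTT rules: + matches exactly one level, and # matches the remaining levels, including the parent level. Matching is actually performed by the broker; this standalone class and its names are hypothetical and only illustrate which topics a given filter would select.

```java
// Hypothetical helper, for illustration only; the broker performs the real matching.
final class TopicFilterSketch {
    private TopicFilterSketch() {}

    /** Returns true if the topic name matches the filter under MQTT wildcard rules. */
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        // A wildcard in the first level must not match topics that start with '$'.
        if (topic.startsWith("$") && (f[0].equals("+") || f[0].equals("#"))) {
            return false;
        }
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // '#' is only valid as the last level; it matches everything that remains.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("devices/+/data", "devices/42/data"));
        System.out.println(matches("sensors/#", "sensors/kitchen/temperature"));
    }
}
```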
groupName
¶
Shared subscription group name. Required.
Type: string
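In MQTT 5, a shared subscription is expressed as a topic filter of the form $share/{ShareName}/{filter}. The sketch below shows how a group name and a topic filter combine into that form. Whether the plugin builds exactly this string internally is an assumption; the class and method names are hypothetical.

```java
// Hypothetical helper, for illustration only; not part of the plugin API.
final class SharedSubscriptionSketch {
    private SharedSubscriptionSketch() {}

    /** Builds the MQTT 5 shared-subscription filter for a group name and topic filter. */
    static String sharedFilter(String groupName, String topicFilter) {
        // The share name must be non-empty and must not contain '/', '+' or '#'.
        if (groupName.isEmpty()
                || groupName.contains("/")
                || groupName.contains("+")
                || groupName.contains("#")) {
            throw new IllegalArgumentException("Invalid share name: " + groupName);
        }
        return "$share/" + groupName + "/" + topicFilter;
    }

    public static void main(String[] args) {
        System.out.println(sharedFilter("group1", "sensors/#"));
    }
}
```

All clients that subscribe with the same share name form one group, and the broker delivers each matching message to only one member of that group.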
websocket
¶
Configuration for the WebSocket transport used by MQTT clients.
Type: object
Fields of websocket:
websocket.subpath
¶
The path to use when connecting to the WebSocket endpoint.
Type: string
qos
¶
Quality of Service (QoS) according to the MQTT specification.
Type: object
Supported values: at_most_once, at_least_once, exactly_once.
Default value: AT_LEAST_ONCE
ssl
¶
Secure transport configuration.
Type: object
Fields of ssl:
ssl.trustStoreFile
¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.trustStorePassword
¶
Truststore password.
Type: string
ssl.keyStoreFile
¶
Keystore file; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.keyStorePassword
¶
Keystore password.
Type: string
ssl.keyPassword
¶
Private key password. Optional — if not set, the key store password will be used.
Type: string
ssl.insecure
¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier
¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
auth
¶
Authentication credentials. If not specified, no authentication is used.
Type: object
Fields of auth:
auth.username
¶
Username used for authentication.
Type: string
auth.password
¶
Password used for authentication.
Type: string
connect
¶
Additional connect options.
Type: object
Fields of connect:
connect.userProperties
¶
List of key-value pairs that will be passed to the MQTT client as user properties.
Type: array
connect.keepAlive
¶
Sets the keep alive property. The value must be in the range 0 to 2^16 - 1 (65535), the range of an unsigned short.
Type: object
Unit: unsigned short
reconnect
¶
Automatic reconnect strategy using an exponential backoff with configurable initial and maximum delays.
Type: object
Fields of reconnect:
reconnect.initialDelay
¶
The initial delay before attempting to reconnect.
Type: object
reconnect.maxDelay
¶
The upper bound for the reconnect delay. Whatever value the backoff strategy arrives at, it is capped at the specified maxDelay.
Type: object
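The interaction between initialDelay and maxDelay can be sketched as follows. The doubling factor is an assumption made for illustration, since this page does not state the multiplier the plugin uses. The class and method names are hypothetical.

```java
import java.time.Duration;

// Hypothetical helper, for illustration only; the multiplier of 2 is an assumption.
final class ReconnectBackoffSketch {
    private ReconnectBackoffSketch() {}

    /** Delay before the given reconnect attempt (0-based), doubling each time and capped at maxDelay. */
    static Duration delayForAttempt(Duration initialDelay, Duration maxDelay, int attempt) {
        long max = maxDelay.toMillis();
        long delay = initialDelay.toMillis();
        for (int i = 0; i < attempt; i++) {
            if (delay > max / 2) {
                // Doubling would exceed the cap (or overflow), so stop at the cap.
                delay = max;
                break;
            }
            delay *= 2;
        }
        return Duration.ofMillis(Math.min(delay, max));
    }

    public static void main(String[] args) {
        Duration initial = Duration.ofSeconds(1);
        Duration max = Duration.ofSeconds(30);
        for (int attempt = 0; attempt < 7; attempt++) {
            System.out.println("attempt " + attempt + ": " + delayForAttempt(initial, max, attempt));
        }
    }
}
```

With a one-second initial delay and a 30-second cap, the delay under these assumptions grows as 1, 2, 4, 8, 16 seconds and then stays at 30 seconds.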
oauth
¶
Configures the OAuth details.
Type: object
Fields of oauth:
oauth.scopes
¶
Type: array
oauth.tokenUrl
¶
Type: string
oauth.clientId
¶
Type: string
oauth.clientSecret
¶
Type: string
oauth.grantType
¶
Type: string
Default value: client_credentials
oauth.username
¶
Type: string
oauth.password
¶
Type: string
Usage Examples¶
stream
    .withName("Read data from Mqtt and convert payload to string")
    .consumeFromSource(
        MqttSourceConfigBuilder.builder()
            .withAddress("mqtt.example.com", "1883")
            .withGroupName("device-group")
            .withTopicFilter("devices/+/data")
            .withQos(MqttMessageQoS.AT_LEAST_ONCE)
    )
    // Extract the read-only payload buffer from each message.
    .processWith(mqttPublishMessage -> mqttPublishMessage.payload())
    .processWith(readOnlyBuffer -> {
        // Duplicate the buffer so the original position is left untouched.
        ByteBuffer tempBuffer = readOnlyBuffer.duplicate();
        byte[] bytes = new byte[tempBuffer.remaining()];
        tempBuffer.get(bytes);
        // Assumes UTF-8 payloads; requires java.nio.charset.StandardCharsets.
        return new String(bytes, StandardCharsets.UTF_8);
    })
    .terminateWithSink(sink);
This configuration connects to an MQTT broker at mqtt.example.com on port 1883, subscribes to topics matching devices/+/data, and processes messages with an at-least-once QoS level.
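The payload conversion step can also be written as a standalone helper, which makes it easy to test outside a pipeline. This is a sketch that assumes UTF-8 text payloads; the class and method names are hypothetical and not part of the plugin.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, for illustration only; assumes the payload is UTF-8 text.
final class PayloadDecodingSketch {
    private PayloadDecodingSketch() {}

    /** Decodes the remaining bytes of a (possibly read-only) buffer without moving its position. */
    static String toUtf8String(ByteBuffer payload) {
        // duplicate() shares the content but has its own position and limit.
        ByteBuffer copy = payload.duplicate();
        return StandardCharsets.UTF_8.decode(copy).toString();
    }

    public static void main(String[] args) {
        ByteBuffer payload =
            ByteBuffer.wrap("21.5".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
        System.out.println(toUtf8String(payload));
    }
}
```

Because only the duplicate is consumed, the original buffer can still be read by later processing steps.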
source:
  mqtt:
    address: "mqtt.example.com:1883"
    topicFilter: "devices/+/data"
    groupName: "device-group"
    qos: "AT_LEAST_ONCE"