Mqtt¶
The mqtt
sink connects to an MQTT broker to publish messages to specified topic.
It supports configuration of basic attributes like the broker’s host and port and topic filter.
Additionally it supports basic authentication, TLS as well as OAuth.
MqttSinkConfigBuilder.builder()
.withAddress("mqtt.example.com", 1883)
.withIdentifier("Tests")
.withWebsocketBuilder(websocket -> {
websocket.withSubpath("/topic");
})
.withAuthBuilder(auth -> {
auth.withUsername("admin");
auth.withPassword("admin");
})
sink:
mqtt:
address: "mqtt.example.com:1883"
identifier: "Tests"
websocket:
subpath: /topic
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-mqtt-api</artifactId>
<version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.0-20250910-124207-release-1.5.3'
Properties¶
identifier
¶
Type: string
address
¶
Sets the address of the MQTT broker to connect to in host:port
format. When omitted the port is set to 1883
.
Required.
Type: object
Default value: :1883
Fields of address
:
address.host
¶
Type: string
address.port
¶
Type: number
address.hasBracketlessColons
¶
Type: boolean
websocket
¶
Configuration for a WebSocket transport to use by MQTT clients.
Type: object
Fields of websocket
:
websocket.subpath
¶
The path that should be used when connecting to websocket endpoint.
Type: string
ssl
¶
Secure transport configuration.
Type: object
Fields of ssl
:
ssl.trustStoreFile
¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.trustStorePassword
¶
Truststore password.
Type: string
ssl.keyStoreFile
¶
Keystore file; supported formats include JKS, PKCS#12, or PEM
Type: string
ssl.keyStorePassword
¶
Keystore password.
Type: string
ssl.keyPassword
¶
Private key password. Optional — if not set, the key store password will be used.
Type: string
ssl.insecure
¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier
¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
auth
¶
If not specified no authentication is used.
Type: object
Fields of auth
:
auth.username
¶
Username used for authentication.
Type: string
auth.password
¶
Password used for authentication.
Type: string
connect
¶
General connection options such as keep alive and user properties to pass to client.
Type: object
Fields of connect
:
connect.userProperties
¶
List of key value paris that will be passed to MQTT client as user properties
Type: array
connect.keepAlive
¶
Sets the keep alive property, must be in the range from 0 to 2^16 -1 (unsigned short)
Type: object
Unit: unsigned short
reconnect
¶
Configuration for an exponential backoff reconnect strategy.
Type: object
Fields of reconnect
:
reconnect.initialDelay
¶
The initial delay before attempting to reconnect.
Type: object
reconnect.maxDelay
¶
Whatever the delay value that backoff strategy arrives at it will be capped by the maxDelay
specified.
Type: object
oauth
¶
Configures the OAuth details.
Type: object
Fields of oauth
:
oauth.scopes
¶
Type: array
oauth.tokenUrl
¶
Type: string
oauth.clientId
¶
Type: string
oauth.clientSecret
¶
Type: string
oauth.grantType
¶
Type: string
Default value: client_credentials
oauth.username
¶
Type: string
oauth.password
¶
Type: string
retry
¶
Type: object
Fields of retry
:
retry.retries
¶
Number of retry attempts after a request failure.
Type: number
Default value: 3
retry.backoffDelay
¶
Initial delay before the first retry attempt.
Type: object
Default value: PT0.2S
retry.maxBackoffDelay
¶
Maximum delay between consecutive retry attempts.
Type: object
Default value: PT3S
exceptionHandler
¶
Custom exception handler enabling interception of all errors related to this sink.
Type: object
Usage Examples¶
stream
.withName("Send data to Mqtt")
.consumeFromSource(Sources.collection(SOURCE_MESSAGE))
.processWith(message -> MqttPublishMessage
.builder(topic, MqttMessageQoS.AT_LEAST_ONCE)
.withPayload(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)))
.build())
.terminateWithSink(
MqttSinkConfigBuilder.builder()
.withAddress("mqtt.example.com", 1883)
.withSslBuilder(ssl -> {
ssl.withIgnoreHostnameValidation(true);
})
.withOauthBuilder(oauth -> {
oauth.withClientId("admin");
oauth.withTokenUrl("...");
})
);
This configuration connects to an MQTT broker at mqtt.example.com
on port 1883
using websockets and oauth.
sink:
mqtt:
address: "mqtt.example.com:1883"
auth:
username: "admin
password: "admin
ssl:
ignoreHostnameValidation: true
websocket:
subpath: /topic
This configuration connects to an MQTT broker at mqtt.example.com
on port 1883
using websockets and authentication.