Skip to content

Mqtt

The mqtt sink connects to an MQTT broker to publish messages to specified topic. It supports configuration of basic attributes like the broker’s host and port and topic filter. Additionally it supports basic authentication, TLS as well as OAuth.

MqttSinkConfigBuilder.builder()
     .withAddress("mqtt.example.com", 1883)
     .withIdentifier("Tests")
     .withWebsocketBuilder(websocket -> {
             websocket.withSubpath("/topic");
     })
     .withAuthBuilder(auth -> {
         auth.withUsername("admin");
         auth.withPassword("admin");
     })
sink:
  mqtt:
    address: "mqtt.example.com:1883"
    identifier: "Tests"
    websocket:
      subpath: /topic

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-mqtt-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.0-20250910-124207-release-1.5.3'

Properties

identifier

Type: string

address

Sets the address of the MQTT broker to connect to in host:port format. When omitted the port is set to 1883. Required.

Type: object

Default value: :1883

Fields of address:

address.host

Type: string

address.port

Type: number

address.hasBracketlessColons

Type: boolean

websocket

Configuration for a WebSocket transport to use by MQTT clients. Type: object

Fields of websocket:

websocket.subpath

The path that should be used when connecting to websocket endpoint. Type: string

ssl

Secure transport configuration. Type: object

Fields of ssl:

ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM. Type: string

ssl.trustStorePassword

Truststore password. Type: string

ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, or PEM Type: string

ssl.keyStorePassword

Keystore password. Type: string

ssl.keyPassword

Private key password. Optional — if not set, the key store password will be used. Type: string

ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.

Type: object

auth

If not specified no authentication is used. Type: object

Fields of auth:

auth.username

Username used for authentication. Type: string

auth.password

Password used for authentication. Type: string

connect

General connection options such as keep alive and user properties to pass to client. Type: object

Fields of connect:

connect.userProperties

List of key value paris that will be passed to MQTT client as user properties Type: array

connect.keepAlive

Sets the keep alive property, must be in the range from 0 to 2^16 -1 (unsigned short) Type: object

Unit: unsigned short

reconnect

Configuration for an exponential backoff reconnect strategy. Type: object

Fields of reconnect:

reconnect.initialDelay

The initial delay before attempting to reconnect. Type: object

reconnect.maxDelay

Whatever the delay value that backoff strategy arrives at it will be capped by the maxDelay specified. Type: object

oauth

Configures the OAuth details. Type: object

Fields of oauth:

oauth.scopes

Type: array

oauth.tokenUrl

Type: string

oauth.clientId

Type: string

oauth.clientSecret

Type: string

oauth.grantType

Type: string

Default value: client_credentials

oauth.username

Type: string

oauth.password

Type: string

retry

Type: object

Fields of retry:

retry.retries

Number of retry attempts after a request failure. Type: number

Default value: 3

retry.backoffDelay

Initial delay before the first retry attempt. Type: object

Default value: PT0.2S

retry.maxBackoffDelay

Maximum delay between consecutive retry attempts. Type: object

Default value: PT3S

exceptionHandler

Custom exception handler enabling interception of all errors related to this sink. Type: object

Usage Examples

stream
    .withName("Send data to Mqtt")
    .consumeFromSource(Sources.collection(SOURCE_MESSAGE))
    .processWith(message -> MqttPublishMessage
            .builder(topic, MqttMessageQoS.AT_LEAST_ONCE)
            .withPayload(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)))
            .build())
    .terminateWithSink(
            MqttSinkConfigBuilder.builder()
                    .withAddress("mqtt.example.com", 1883)
                    .withSslBuilder(ssl -> {
                        ssl.withIgnoreHostnameValidation(true);
                    })
                    .withOauthBuilder(oauth -> {
                        oauth.withClientId("admin");
                        oauth.withTokenUrl("...");
                    })
    );

This configuration connects to an MQTT broker at mqtt.example.com on port 1883 using websockets and oauth.

sink:
   mqtt:
       address: "mqtt.example.com:1883"
       auth:
           username: "admin
           password: "admin
       ssl:
           ignoreHostnameValidation: true
       websocket:
           subpath: /topic

This configuration connects to an MQTT broker at mqtt.example.com on port 1883 using websockets and authentication.