Mqtt

The mqtt source connects to an MQTT broker and consumes PUBLISH messages from the topics matched by a topic filter. It supports configuration of basic attributes such as the broker's host and port, topic filter, shared subscription group name, and message Quality of Service (QoS) level. It also supports basic (username and password) authentication, TLS, and OAuth.

Consumed messages are represented by the MqttPublishMessage record, which encapsulates MQTT-specific attributes such as the payload, retain flag, message expiry, payload format, and user properties.

The MQTT source uses a shared subscription to distribute incoming messages among all workers.
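As a rough illustration of what a shared subscription looks like on the wire: MQTT 5 defines shared subscriptions as topic filters of the form $share/{ShareName}/{filter}. The sketch below combines a group name and a topic filter that way. The class and method names are hypothetical, and whether the plugin builds the filter exactly like this is an assumption.

```java
/** Sketch only: combines a group name and topic filter into an MQTT 5 shared subscription filter. */
final class SharedSubscription {

    private SharedSubscription() {}

    /** Returns "$share/<groupName>/<topicFilter>", the shared subscription form from the MQTT 5 specification. */
    static String sharedFilter(String groupName, String topicFilter) {
        // The share name must be non-empty and must not contain '/', '+' or '#'.
        if (groupName.isEmpty() || groupName.matches(".*[/+#].*")) {
            throw new IllegalArgumentException("invalid share name: " + groupName);
        }
        if (topicFilter.isEmpty()) {
            throw new IllegalArgumentException("topic filter must not be empty");
        }
        return "$share/" + groupName + "/" + topicFilter;
    }

    public static void main(String[] args) {
        System.out.println(sharedFilter("group1", "sensors/#")); // prints $share/group1/sensors/#
    }
}
```

Every client that subscribes with the same share name and filter joins the same group, and the broker delivers each matching message to only one member of that group.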

MqttSourceConfigBuilder.builder()
     .withGroupName("group1")
     .withTopicFilter("sensors/#")
     .withAddressHost("localhost")
     .withAddressPort(61616)
     .withQos(MqttMessageQoS.AT_LEAST_ONCE)

source:
   mqtt:
     address: "mqtt.example.com:1883"
     topicFilter: "sensors/#"
     groupName: "group1"
     qos: "AT_LEAST_ONCE"

Properties

identifier

Identifier of the client used to connect to the broker. If no value is provided, a random UUID with the voltsp-source- prefix is used. Type: string
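The default described above can be pictured with the following sketch. The class and method names are hypothetical, and treating a blank string the same as a missing value is an assumption, not documented plugin behavior.

```java
import java.util.UUID;

/** Sketch only: resolves the client identifier, falling back to a prefixed random UUID. */
final class ClientIdentifier {

    static final String DEFAULT_PREFIX = "voltsp-source-";

    private ClientIdentifier() {}

    static String resolve(String configured) {
        // Assumption: null and blank values both count as "not provided".
        if (configured != null && !configured.isBlank()) {
            return configured;
        }
        return DEFAULT_PREFIX + UUID.randomUUID();
    }

    public static void main(String[] args) {
        System.out.println(resolve("edge-reader-1")); // explicit identifier is kept as-is
        System.out.println(resolve(null));            // generated, differs on every call
    }
}
```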

address

Sets the address of the MQTT broker to connect to, in host:port format. When the port is omitted, it defaults to 1883. Required.

Type: object

Default value: 1883

Fields of address:

address.host

Type: string

address.port

Type: number

address.hasBracketlessColons

Type: boolean
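To make the host:port format and the default port concrete, here is a minimal parsing sketch. BrokerAddress is a hypothetical class, not part of the plugin API, and the sketch handles only host names and IPv4 addresses (IPv6 literals are out of scope).

```java
/** Sketch only: parses "host" or "host:port", defaulting the port to 1883. */
final class BrokerAddress {

    static final int DEFAULT_PORT = 1883;

    final String host;
    final int port;

    private BrokerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    static BrokerAddress parse(String address) {
        int colon = address.lastIndexOf(':');
        // No colon means no explicit port, so the default applies.
        String host = colon < 0 ? address : address.substring(0, colon);
        int port = colon < 0 ? DEFAULT_PORT : Integer.parseInt(address.substring(colon + 1));
        if (host.isEmpty() || port < 0 || port > 65535) {
            throw new IllegalArgumentException("invalid broker address: " + address);
        }
        return new BrokerAddress(host, port);
    }

    public static void main(String[] args) {
        BrokerAddress explicit = parse("mqtt.example.com:1883");
        BrokerAddress defaulted = parse("localhost");
        System.out.println(explicit.host + " " + explicit.port);   // prints mqtt.example.com 1883
        System.out.println(defaulted.host + " " + defaulted.port); // prints localhost 1883
    }
}
```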

topicFilter

Specifies the MQTT topic filter to subscribe to, supporting wildcards for flexible subscriptions. Required.

Type: string
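For readers unfamiliar with MQTT wildcards: + matches exactly one topic level and # matches the remaining levels (including none) and must be the last level of the filter. The sketch below shows these rules in plain Java. In practice the broker performs this matching. TopicFilterMatcher is a hypothetical helper, not part of the plugin.

```java
/** Sketch only: checks whether a topic name matches an MQTT topic filter. */
final class TopicFilterMatcher {

    private TopicFilterMatcher() {}

    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        // Topics starting with '$' are not matched by a leading wildcard.
        if (topic.startsWith("$") && (f[0].equals("#") || f[0].equals("+"))) {
            return false;
        }
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: valid only as the last level, matches everything below.
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level mismatch
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("sensors/#", "sensors/room1/temp"));      // prints true
        System.out.println(matches("devices/+/data", "devices/42/data"));    // prints true
        System.out.println(matches("devices/+/data", "devices/42/status"));  // prints false
    }
}
```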

groupName

Shared subscription group name. Required.

Type: string

websocket

Configuration of the WebSocket transport used by MQTT clients. Type: object

Fields of websocket:

websocket.subpath

The path to use when connecting to the WebSocket endpoint. Type: string

qos

Quality of Service (QoS) according to the MQTT specification. Type: object

Supported values: at_most_once, at_least_once, exactly_once.

Default value: AT_LEAST_ONCE
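The three values correspond to MQTT QoS levels 0, 1, and 2. The sketch below uses a hypothetical QosLevel enum (not the plugin's MqttMessageQoS) to show that mapping. Accepting both lower-case and upper-case names is an assumption made because both spellings appear in this page.

```java
import java.util.Locale;

/** Sketch only: maps the documented QoS names to the numeric levels in the MQTT specification. */
enum QosLevel {
    AT_MOST_ONCE(0),   // delivered zero or one time, no acknowledgement
    AT_LEAST_ONCE(1),  // acknowledged delivery, duplicates are possible
    EXACTLY_ONCE(2);   // acknowledged delivery without duplicates

    private final int code;

    QosLevel(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    /** Assumption: names are matched case-insensitively. */
    static QosLevel parse(String name) {
        return valueOf(name.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse("at_least_once").code()); // prints 1
    }
}
```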

ssl

Secure transport configuration. Type: object

Fields of ssl:

ssl.keystoreFile

Type: string

ssl.keystorePassword

Type: string

ssl.keyPassword

Type: string

ssl.pemEncodedFile

Type: string

ssl.truststoreFile

Type: string

ssl.truststorePassword

Type: string

ssl.insecure

Disables certificate checking and hostname validation. Typically used for debugging connection issues. Type: boolean

ssl.ignoreHostnameValidation

Type: boolean

auth

If not specified, no authentication is used. Type: object

Fields of auth:

auth.username

The username to present to the broker. Type: string

auth.password

The password to present to the broker. Type: string

connect

Additional connect options. Type: object

Fields of connect:

connect.userProperties

List of key-value pairs that are passed to the MQTT client as user properties. Type: array

connect.keepAlive

Sets the keep alive property. The value must be in the range 0 to 2^16 - 1 (65535), the range of an unsigned short. Type: object

Unit: unsigned short
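The range limit follows from the MQTT CONNECT packet, where Keep Alive is a two-byte, big-endian integer expressing seconds. The sketch below shows that constraint. KeepAlive is a hypothetical helper, and how the plugin validates or converts the configured value is not stated here.

```java
/** Sketch only: validates a keep alive value and encodes it as the two-byte integer used in CONNECT. */
final class KeepAlive {

    static final int MAX_SECONDS = 0xFFFF; // 2^16 - 1 = 65535

    private KeepAlive() {}

    static byte[] encode(int seconds) {
        if (seconds < 0 || seconds > MAX_SECONDS) {
            throw new IllegalArgumentException("keep alive out of range: " + seconds);
        }
        // Most significant byte first, as required for MQTT two-byte integers.
        return new byte[] {(byte) (seconds >>> 8), (byte) seconds};
    }

    static int decode(byte[] encoded) {
        return ((encoded[0] & 0xFF) << 8) | (encoded[1] & 0xFF);
    }

    public static void main(String[] args) {
        System.out.println(decode(encode(60))); // prints 60
    }
}
```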

reconnect

Automatic reconnect strategy using an exponential backoff with configurable initial and maximum delays. Type: object

Fields of reconnect:

reconnect.initialDelay

The initial delay before attempting to reconnect. Type: object

reconnect.maxDelay

The delay computed by the backoff strategy is capped at maxDelay. Type: object
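How initialDelay and maxDelay interact can be sketched as follows. The doubling factor and the absence of jitter are assumptions, since the page does not state the multiplier the plugin uses, and ReconnectBackoff is a hypothetical class.

```java
import java.time.Duration;

/** Sketch only: exponential backoff that starts at initialDelay and is capped at maxDelay. */
final class ReconnectBackoff {

    private ReconnectBackoff() {}

    /** Delay before reconnect attempt number {@code attempt} (0-based), assuming the delay doubles each time. */
    static Duration delayForAttempt(Duration initialDelay, Duration maxDelay, int attempt) {
        long max = maxDelay.toMillis();
        long delay = Math.min(initialDelay.toMillis(), max);
        for (int i = 0; i < attempt && delay < max; i++) {
            // Compare against max / 2 first so the doubling cannot overflow.
            delay = delay > max / 2 ? max : delay * 2;
        }
        return Duration.ofMillis(delay);
    }

    public static void main(String[] args) {
        Duration initial = Duration.ofSeconds(1);
        Duration max = Duration.ofSeconds(30);
        // Prints the delays in seconds: 1, 2, 4, 8, 16, 30, 30
        for (int attempt = 0; attempt < 7; attempt++) {
            System.out.println(delayForAttempt(initial, max, attempt).toSeconds());
        }
    }
}
```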

oauth

Configures the OAuth details. Type: object

Fields of oauth:

oauth.scopes

Type: array

oauth.tokenUrl

Type: string

oauth.clientId

Type: string

oauth.clientSecret

Type: string

oauth.grantType

Type: string

Default value: client_credentials

oauth.username

Type: string

oauth.password

Type: string
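As a rough guide to how these fields relate, an OAuth 2.0 token request (RFC 6749) is a form-encoded POST to the token URL. The sketch below builds only the request body. OAuthTokenRequest is a hypothetical helper, and sending the client credentials in the body (rather than in an Authorization header) and using username and password only for the password grant are assumptions about how the plugin uses these settings.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch only: builds the form-encoded body of an OAuth 2.0 token request. */
final class OAuthTokenRequest {

    private OAuthTokenRequest() {}

    static String formBody(String grantType, String clientId, String clientSecret,
                           List<String> scopes, String username, String password) {
        Map<String, String> params = new LinkedHashMap<>(); // keeps insertion order
        params.put("grant_type", grantType);
        params.put("client_id", clientId);
        params.put("client_secret", clientSecret);
        if (!scopes.isEmpty()) {
            params.put("scope", String.join(" ", scopes)); // scopes are space-delimited
        }
        if ("password".equals(grantType)) {
            // Resource owner credentials apply only to the password grant.
            params.put("username", username);
            params.put("password", password);
        }
        return params.entrySet().stream()
                .map(e -> encode(e.getKey()) + "=" + encode(e.getValue()))
                .collect(Collectors.joining("&"));
    }

    private static String encode(String value) {
        return URLEncoder.encode(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(formBody("client_credentials", "volt", "s3cret", List.of("mqtt:read"), null, null));
        // prints grant_type=client_credentials&client_id=volt&client_secret=s3cret&scope=mqtt%3Aread
    }
}
```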

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-mqtt-api</artifactId>
    <version>1.4.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.4.0'

Usage Examples

stream
       .withName("Read data from Mqtt and convert payload to string")
       .consumeFromSource(
               MqttSourceConfigBuilder.builder()
                       .withAddress("mqtt.example.com", "1883")
                       .withGroupName("device-group")
                       .withTopicFilter("devices/+/data")
                       .withQos(MqttMessageQoS.AT_LEAST_ONCE)
       )
       .processWith(mqttPublishMessage -> mqttPublishMessage.payload())
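       .processWith(readOnlyBuffer -> {
            // Duplicate the buffer so the original position and limit are left untouched.
            ByteBuffer tempBuffer = readOnlyBuffer.duplicate();
            byte[] bytes = new byte[tempBuffer.remaining()];
            tempBuffer.get(bytes);
            // Decode explicitly as UTF-8 (assumes text payloads) instead of the platform default charset.
            return new String(bytes, java.nio.charset.StandardCharsets.UTF_8);
       })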
       .terminateWithSink(sink);

This configuration connects to an MQTT broker at mqtt.example.com on port 1883, subscribes to topics matching devices/+/data, and consumes messages with an at-least-once QoS level.
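The payload conversion step in the pipeline above can also be written as a small standalone helper, which may be easier to test in isolation. PayloadDecoding is a hypothetical class, and the sketch assumes the payload holds UTF-8 text.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Sketch only: decodes a (possibly read-only) payload buffer into a String. */
final class PayloadDecoding {

    private PayloadDecoding() {}

    static String toUtf8String(ByteBuffer payload) {
        // duplicate() shares the content but has its own position and limit,
        // so decoding does not consume the caller's buffer.
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap("21.5".getBytes(StandardCharsets.UTF_8)).asReadOnlyBuffer();
        System.out.println(toUtf8String(payload)); // prints 21.5
        System.out.println(payload.remaining());   // prints 4, the original buffer is unchanged
    }
}
```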

source:
   mqtt:
     address: "mqtt.example.com:1883"
     topicFilter: "devices/+/data"
     groupName: "device-group"
     qos: "AT_LEAST_ONCE"
