Skip to content

Mqtt

The mqtt source connects to an MQTT broker to consume publish messages from specified topics. It supports configuration of basic attributes like the broker’s host and port, topic filter, shared subscription group name, and message Quality of Service (QoS) level. Additionally it supports basic authentication, TLS as well as OAuth.

Consumed messages are represented by the MqttPublishMessage record, which encapsulates MQTT-specific attributes such as the payload, retain flag, message expiry, payload format, and user properties.

MQTT source uses shared subscription to distribute incoming messages between all the workers.

MqttSourceConfigBuilder.builder()
     .withGroupName("group1")
     .withTopicFilter("sensors/#")
     .withAddressHost("localhost")
     .withAddressPort(61616)
     .withQos(MqttMessageQoS.AT_LEAST_ONCE)
source:
   mqtt:
     address: "mqtt.example.com:1883"
     topicFilter: "sensors/#"
     groupName: "group1"
     qos: "AT_LEAST_ONCE"

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-mqtt-api</artifactId>
    <version>1.8.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.8.0'

Properties

identifier

Identifier of the client used to connect to brokers. If no value was provided it will use a random UUID with voltsp-source- prefix. Type: string

address

Sets the address of the MQTT broker to connect to in host:port format. When omitted the port is set to 1883. Required.

Type: object

Default value: 1883

Fields of address:

address.host

Type: string

address.port

Type: number

address.hasBracketlessColons

Type: boolean

topicFilter

Specifies the MQTT topic filter to subscribe to, supporting wildcards for flexible subscriptions. Required.

Type: string

groupName

Shared subscription group name. Required.

Type: string

websocket

Configuration for a WebSocket transport to use by MQTT clients. Type: object

Fields of websocket:

websocket.subpath

The path that should be used when connecting to websocket endpoint. Type: string

qos

Quality of Service (QoS) according to the MQTT specification. Type: object

Supported values: at_most_once, at_least_once, exactly_once.

Default value: AT_LEAST_ONCE

ssl

Secure transport configuration. Type: object

Fields of ssl:

ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM. Type: string

ssl.trustStorePassword

Truststore password. Type: string

ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, or PEM Type: string

ssl.keyStorePassword

Keystore password. Type: string

ssl.keyPassword

Private key password. Optional — if not set, the key store password will be used. Type: string

ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.

Type: object

auth

If not specified no authentication is used. Type: object

Fields of auth:

auth.username

Username used for authentication. Type: string

auth.password

Password used for authentication. Type: string

connect

Additional connect options. Type: object

Fields of connect:

connect.userProperties

List of key value paris that will be passed to MQTT client as user properties Type: array

connect.keepAlive

Sets the keep alive property, must be in the range from 0 to 2^16 -1 (unsigned short) Type: object

Unit: unsigned short

reconnect

Automatic reconnect strategy using an exponential backoff with configurable initial and maximum delays. Type: object

Fields of reconnect:

reconnect.initialDelay

The initial delay before attempting to reconnect. Type: object

reconnect.maxDelay

Whatever the delay value that backoff strategy arrives at it will be capped by the maxDelay specified. Type: object

oauth

Configures the OAuth details. Type: object

Fields of oauth:

oauth.scopes

List of scopes to request together with the access token. Type: array

oauth.tokenUrl

OAuth 2.0 token endpoint URL (e.g. https://idp.example.com/realms/foo/protocol/openid-connect/token). Type: string

oauth.clientId

OAuth 2.0 client identifier registered with the identity provider. Type: string

oauth.clientSecret

OAuth 2.0 client secret paired with clientId. Type: string

oauth.grantType

OAuth 2.0 grant type used when requesting the token. Type: string

Default value: client_credentials

oauth.username

Resource owner username Type: string

oauth.password

Resource owner password Type: string

oauth.audience

OAuth 2.0 audience parameter restricting the recipients the issued access token is intended for. The value is propagated to the aud claim of the JWT by identity providers that honor it (e.g. Keycloak via an audience mapper).

Audience handling depends on the identity provider: some providers (e.g. Auth0, Okta) reject unknown audiences with HTTP 400, others (e.g. Keycloak with the client_credentials grant) silently ignore the parameter. In the latter case, audience enforcement happens at the resource server when it validates the aud claim.

Type: string

oauth.ssl

TLS configuration for the HTTP client that calls the OAuth token endpoint. Set this when the token endpoint uses HTTPS with a certificate that the JVM does not already trust (e.g. private CA, self-signed) or when the identity provider requires mTLS.

Type: object

Fields of oauth.ssl:

oauth.ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM. Type: string

oauth.ssl.trustStorePassword

Truststore password. Type: string

oauth.ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, or PEM Type: string

oauth.ssl.keyStorePassword

Keystore password. Type: string

oauth.ssl.keyPassword

Private key password. Optional — if not set, the key store password will be used. Type: string

oauth.ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

oauth.ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.

Type: object

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

stream
       .withName("Read data from Mqtt and convert payload to string")
       .consumeFromSource(
               MqttSourceConfigBuilder.builder()
                       .withAddress("mqtt.example.com", "1883")
                       .withGroupName("device-group")
                       .withTopicFilter("devices/+/data")
                       .withQos(MqttMessageQoS.AT_LEAST_ONCE)
       )
       .processWith(mqttPublishMessage -> mqttPublishMessage.payload())
       .processWith(readOnlyBuffer -> {
               ByteBuffer tempBuffer = readOnlyBuffer.duplicate();
            byte[] bytes = new byte[tempBuffer.remaining()];
            tempBuffer.get(bytes);
            return new String(bytes);
       })
       .terminateWithSink(sink);

This configuration connects to an MQTT broker at mqtt.example.com on port 1883 subscribes to topics matching devices/+/data, and processes messages with an at-least-once QoS level.

source:
   mqtt:
     address: "mqtt.example.com:1883'
     topicFilter: "devices/+/data"
     groupName: "device-group"
     qos: "AT_LEAST_ONCE"

This configuration connects to an MQTT broker at mqtt.example.com on port 1883 subscribes to topics matching devices/+/data, and processes messages with an at-least-once QoS level.

Metrics

Mqtt metrics

Metric enum: org.voltdb.stream.plugin.mqtt.MqttMetric

Prometheus name Type Description
voltsp_mqtt_sink_disconnected_total counter Number of MQTT sink disconnect events.
voltsp_mqtt_sink_messages_published_total counter Number of MQTT messages published by the sink.
voltsp_mqtt_sink_request_retries_total counter Number of MQTT sink request retries.
voltsp_mqtt_sink_requests_failed_total counter Number of failed MQTT sink requests.
voltsp_mqtt_sink_requests_succeeded_total counter Number of successful MQTT sink requests.
voltsp_mqtt_sink_token_expired_total counter Number of MQTT sink token expiration events.
voltsp_mqtt_source_disconnected_total counter Number of MQTT source disconnect events.
voltsp_mqtt_source_messages_received_total counter Number of messages received by the MQTT source.
voltsp_mqtt_source_receive_errors_total counter Number of MQTT source receive errors.
voltsp_mqtt_source_token_expired_total counter Number of MQTT source token expiration events.
voltsp_mqtt_source_batch_size_total histogram MQTT source receive batch size.

Mqtt tags

Tag enum: org.voltdb.stream.plugin.mqtt.MqttTag

Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.

Tag Description
group_name MQTT consumer group name.
mqtt_host MQTT broker host.
syslog_protocol Protocol tag retained for compatibility with existing MQTT metric labels.
topic_filter MQTT topic filter used by the source.