Skip to content

Mqtt

The mqtt sink connects to an MQTT broker to publish messages to specified topic. It supports configuration of basic attributes like the broker’s host and port and topic filter. Additionally it supports basic authentication, TLS as well as OAuth.

MqttSinkConfigBuilder.builder()
     .withAddress("mqtt.example.com", 1883)
     .withIdentifier("Tests")
     .withWebsocketBuilder(websocket -> {
             websocket.withSubpath("/topic");
     })
     .withAuthBuilder(auth -> {
         auth.withUsername("admin");
         auth.withPassword("admin");
     })
sink:
  mqtt:
    address: "mqtt.example.com:1883"
    identifier: "Tests"
    websocket:
      subpath: /topic

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-mqtt-api</artifactId>
    <version>1.8.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.8.0'

Properties

identifier

Type: string

address

Sets the address of the MQTT broker to connect to in host:port format. When omitted the port is set to 1883. Required.

Type: object

Default value: :1883

Fields of address:

address.host

Type: string

address.port

Type: number

address.hasBracketlessColons

Type: boolean

websocket

Configuration for a WebSocket transport to use by MQTT clients. Type: object

Fields of websocket:

websocket.subpath

The path that should be used when connecting to websocket endpoint. Type: string

ssl

Secure transport configuration. Type: object

Fields of ssl:

ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM. Type: string

ssl.trustStorePassword

Truststore password. Type: string

ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, or PEM Type: string

ssl.keyStorePassword

Keystore password. Type: string

ssl.keyPassword

Private key password. Optional — if not set, the key store password will be used. Type: string

ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.

Type: object

auth

If not specified no authentication is used. Type: object

Fields of auth:

auth.username

Username used for authentication. Type: string

auth.password

Password used for authentication. Type: string

connect

General connection options such as keep alive and user properties to pass to client. Type: object

Fields of connect:

connect.userProperties

List of key value paris that will be passed to MQTT client as user properties Type: array

connect.keepAlive

Sets the keep alive property, must be in the range from 0 to 2^16 -1 (unsigned short) Type: object

Unit: unsigned short

reconnect

Configuration for an exponential backoff reconnect strategy. Type: object

Fields of reconnect:

reconnect.initialDelay

The initial delay before attempting to reconnect. Type: object

reconnect.maxDelay

Whatever the delay value that backoff strategy arrives at it will be capped by the maxDelay specified. Type: object

oauth

Configures the OAuth details. Type: object

Fields of oauth:

oauth.scopes

List of scopes to request together with the access token. Type: array

oauth.tokenUrl

OAuth 2.0 token endpoint URL (e.g. https://idp.example.com/realms/foo/protocol/openid-connect/token). Type: string

oauth.clientId

OAuth 2.0 client identifier registered with the identity provider. Type: string

oauth.clientSecret

OAuth 2.0 client secret paired with clientId. Type: string

oauth.grantType

OAuth 2.0 grant type used when requesting the token. Type: string

Default value: client_credentials

oauth.username

Resource owner username Type: string

oauth.password

Resource owner password Type: string

oauth.audience

OAuth 2.0 audience parameter restricting the recipients the issued access token is intended for. The value is propagated to the aud claim of the JWT by identity providers that honor it (e.g. Keycloak via an audience mapper).

Audience handling depends on the identity provider: some providers (e.g. Auth0, Okta) reject unknown audiences with HTTP 400, others (e.g. Keycloak with the client_credentials grant) silently ignore the parameter. In the latter case, audience enforcement happens at the resource server when it validates the aud claim.

Type: string

oauth.ssl

TLS configuration for the HTTP client that calls the OAuth token endpoint. Set this when the token endpoint uses HTTPS with a certificate that the JVM does not already trust (e.g. private CA, self-signed) or when the identity provider requires mTLS.

Type: object

Fields of oauth.ssl:

oauth.ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM. Type: string

oauth.ssl.trustStorePassword

Truststore password. Type: string

oauth.ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, or PEM Type: string

oauth.ssl.keyStorePassword

Keystore password. Type: string

oauth.ssl.keyPassword

Private key password. Optional — if not set, the key store password will be used. Type: string

oauth.ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

oauth.ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.

Type: object

retry

Type: object

Fields of retry:

retry.retries

Number of retry attempts after a request failure. Type: number

Default value: 3

retry.backoffDelay

Initial delay before the first retry attempt. Type: object

Default value: PT0.2S

retry.maxBackoffDelay

Maximum delay between consecutive retry attempts. Type: object

Default value: PT3S

exceptionHandler

Custom exception handler enabling interception of all errors related to this sink. Type: object

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

stream
    .withName("Send data to Mqtt")
    .consumeFromSource(Sources.collection(SOURCE_MESSAGE))
    .processWith(message -> MqttPublishMessage
            .builder(topic, MqttMessageQoS.AT_LEAST_ONCE)
            .withPayload(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)))
            .build())
    .terminateWithSink(
            MqttSinkConfigBuilder.builder()
                    .withAddress("mqtt.example.com", 1883)
                    .withSslBuilder(ssl -> {
                        ssl.withIgnoreHostnameValidation(true);
                    })
                    .withOauthBuilder(oauth -> {
                        oauth.withClientId("admin");
                        oauth.withTokenUrl("...");
                    })
    );

This configuration connects to an MQTT broker at mqtt.example.com on port 1883 using websockets and oauth.

sink:
   mqtt:
       address: "mqtt.example.com:1883"
       auth:
           username: "admin
           password: "admin
       ssl:
           ignoreHostnameValidation: true
       websocket:
           subpath: /topic

This configuration connects to an MQTT broker at mqtt.example.com on port 1883 using websockets and authentication.

Metrics

Mqtt metrics

Metric enum: org.voltdb.stream.plugin.mqtt.MqttMetric

Prometheus name Type Description
voltsp_mqtt_sink_disconnected_total counter Number of MQTT sink disconnect events.
voltsp_mqtt_sink_messages_published_total counter Number of MQTT messages published by the sink.
voltsp_mqtt_sink_request_retries_total counter Number of MQTT sink request retries.
voltsp_mqtt_sink_requests_failed_total counter Number of failed MQTT sink requests.
voltsp_mqtt_sink_requests_succeeded_total counter Number of successful MQTT sink requests.
voltsp_mqtt_sink_token_expired_total counter Number of MQTT sink token expiration events.
voltsp_mqtt_source_disconnected_total counter Number of MQTT source disconnect events.
voltsp_mqtt_source_messages_received_total counter Number of messages received by the MQTT source.
voltsp_mqtt_source_receive_errors_total counter Number of MQTT source receive errors.
voltsp_mqtt_source_token_expired_total counter Number of MQTT source token expiration events.
voltsp_mqtt_source_batch_size_total histogram MQTT source receive batch size.

Mqtt tags

Tag enum: org.voltdb.stream.plugin.mqtt.MqttTag

Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.

Tag Description
group_name MQTT consumer group name.
mqtt_host MQTT broker host.
syslog_protocol Protocol tag retained for compatibility with existing MQTT metric labels.
topic_filter MQTT topic filter used by the source.