Mqtt¶
The mqtt source connects to an MQTT broker to consume publish messages from specified topics.
It supports configuration of basic attributes like the broker’s host and port, topic filter,
shared subscription group name, and message Quality of Service (QoS) level. Additionally it supports
basic authentication, TLS as well as OAuth.
Consumed messages are represented by the MqttPublishMessage record, which encapsulates MQTT-specific
attributes such as the payload, retain flag, message expiry, payload format, and user properties.
MQTT source uses shared subscription to distribute incoming messages between all the workers.
MqttSourceConfigBuilder.builder()
.withGroupName("group1")
.withTopicFilter("sensors/#")
.withAddressHost("localhost")
.withAddressPort(61616)
.withQos(MqttMessageQoS.AT_LEAST_ONCE)
source:
mqtt:
address: "mqtt.example.com:1883"
topicFilter: "sensors/#"
groupName: "group1"
qos: "AT_LEAST_ONCE"
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-mqtt-api</artifactId>
<version>1.8.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.8.0'
Properties¶
identifier¶
Identifier of the client used to connect to brokers. If no value was provided it will use a random UUID with voltsp-source- prefix.
Type: string
address¶
Sets the address of the MQTT broker to connect to in host:port format. When omitted the port is set to 1883.
Required.
Type: object
Default value: 1883
Fields of address:
address.host¶
Type: string
address.port¶
Type: number
address.hasBracketlessColons¶
Type: boolean
topicFilter¶
Specifies the MQTT topic filter to subscribe to, supporting wildcards for flexible subscriptions. Required.
Type: string
groupName¶
Shared subscription group name. Required.
Type: string
websocket¶
Configuration for a WebSocket transport to use by MQTT clients.
Type: object
Fields of websocket:
websocket.subpath¶
The path that should be used when connecting to websocket endpoint.
Type: string
qos¶
Quality of Service (QoS) according to the MQTT specification.
Type: object
Supported values: at_most_once, at_least_once, exactly_once.
Default value: AT_LEAST_ONCE
ssl¶
Secure transport configuration.
Type: object
Fields of ssl:
ssl.trustStoreFile¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.trustStorePassword¶
Truststore password.
Type: string
ssl.keyStoreFile¶
Keystore file; supported formats include JKS, PKCS#12, or PEM
Type: string
ssl.keyStorePassword¶
Keystore password.
Type: string
ssl.keyPassword¶
Private key password. Optional — if not set, the key store password will be used.
Type: string
ssl.insecure¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
auth¶
If not specified no authentication is used.
Type: object
Fields of auth:
auth.username¶
Username used for authentication.
Type: string
auth.password¶
Password used for authentication.
Type: string
connect¶
Additional connect options.
Type: object
Fields of connect:
connect.userProperties¶
List of key value paris that will be passed to MQTT client as user properties
Type: array
connect.keepAlive¶
Sets the keep alive property, must be in the range from 0 to 2^16 -1 (unsigned short)
Type: object
Unit: unsigned short
reconnect¶
Automatic reconnect strategy using an exponential backoff with configurable initial and maximum delays.
Type: object
Fields of reconnect:
reconnect.initialDelay¶
The initial delay before attempting to reconnect.
Type: object
reconnect.maxDelay¶
Whatever the delay value that backoff strategy arrives at it will be capped by the maxDelay specified.
Type: object
oauth¶
Configures the OAuth details.
Type: object
Fields of oauth:
oauth.scopes¶
List of scopes to request together with the access token.
Type: array
oauth.tokenUrl¶
OAuth 2.0 token endpoint URL (e.g. https://idp.example.com/realms/foo/protocol/openid-connect/token).
Type: string
oauth.clientId¶
OAuth 2.0 client identifier registered with the identity provider.
Type: string
oauth.clientSecret¶
OAuth 2.0 client secret paired with clientId.
Type: string
oauth.grantType¶
OAuth 2.0 grant type used when requesting the token.
Type: string
Default value: client_credentials
oauth.username¶
Resource owner username
Type: string
oauth.password¶
Resource owner password
Type: string
oauth.audience¶
OAuth 2.0 audience parameter restricting the recipients the issued access
token is intended for. The value is propagated to the aud claim of the JWT
by identity providers that honor it (e.g. Keycloak via an audience mapper).
Audience handling depends on the identity provider: some providers
(e.g. Auth0, Okta) reject unknown audiences with HTTP 400, others
(e.g. Keycloak with the client_credentials grant) silently ignore
the parameter. In the latter case, audience enforcement happens at
the resource server when it validates the aud claim.
Type: string
oauth.ssl¶
TLS configuration for the HTTP client that calls the OAuth token endpoint. Set this when the token endpoint uses HTTPS with a certificate that the JVM does not already trust (e.g. private CA, self-signed) or when the identity provider requires mTLS.
Type: object
Fields of oauth.ssl:
oauth.ssl.trustStoreFile¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
oauth.ssl.trustStorePassword¶
Truststore password.
Type: string
oauth.ssl.keyStoreFile¶
Keystore file; supported formats include JKS, PKCS#12, or PEM
Type: string
oauth.ssl.keyStorePassword¶
Keystore password.
Type: string
oauth.ssl.keyPassword¶
Private key password. Optional — if not set, the key store password will be used.
Type: string
oauth.ssl.insecure¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
oauth.ssl.hostnameVerifier¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Usage Examples¶
stream
.withName("Read data from Mqtt and convert payload to string")
.consumeFromSource(
MqttSourceConfigBuilder.builder()
.withAddress("mqtt.example.com", "1883")
.withGroupName("device-group")
.withTopicFilter("devices/+/data")
.withQos(MqttMessageQoS.AT_LEAST_ONCE)
)
.processWith(mqttPublishMessage -> mqttPublishMessage.payload())
.processWith(readOnlyBuffer -> {
ByteBuffer tempBuffer = readOnlyBuffer.duplicate();
byte[] bytes = new byte[tempBuffer.remaining()];
tempBuffer.get(bytes);
return new String(bytes);
})
.terminateWithSink(sink);
This configuration connects to an MQTT broker at mqtt.example.com on port 1883
subscribes to topics matching devices/+/data,
and processes messages with an at-least-once QoS level.
source:
mqtt:
address: "mqtt.example.com:1883'
topicFilter: "devices/+/data"
groupName: "device-group"
qos: "AT_LEAST_ONCE"
This configuration connects to an MQTT broker at mqtt.example.com on port 1883
subscribes to topics matching devices/+/data,
and processes messages with an at-least-once QoS level.
Metrics¶
Mqtt metrics¶
Metric enum: org.voltdb.stream.plugin.mqtt.MqttMetric
| Prometheus name | Type | Description |
|---|---|---|
voltsp_mqtt_sink_disconnected_total |
counter |
Number of MQTT sink disconnect events. |
voltsp_mqtt_sink_messages_published_total |
counter |
Number of MQTT messages published by the sink. |
voltsp_mqtt_sink_request_retries_total |
counter |
Number of MQTT sink request retries. |
voltsp_mqtt_sink_requests_failed_total |
counter |
Number of failed MQTT sink requests. |
voltsp_mqtt_sink_requests_succeeded_total |
counter |
Number of successful MQTT sink requests. |
voltsp_mqtt_sink_token_expired_total |
counter |
Number of MQTT sink token expiration events. |
voltsp_mqtt_source_disconnected_total |
counter |
Number of MQTT source disconnect events. |
voltsp_mqtt_source_messages_received_total |
counter |
Number of messages received by the MQTT source. |
voltsp_mqtt_source_receive_errors_total |
counter |
Number of MQTT source receive errors. |
voltsp_mqtt_source_token_expired_total |
counter |
Number of MQTT source token expiration events. |
voltsp_mqtt_source_batch_size_total |
histogram |
MQTT source receive batch size. |
Mqtt tags¶
Tag enum: org.voltdb.stream.plugin.mqtt.MqttTag
Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.
| Tag | Description |
|---|---|
group_name |
MQTT consumer group name. |
mqtt_host |
MQTT broker host. |
syslog_protocol |
Protocol tag retained for compatibility with existing MQTT metric labels. |
topic_filter |
MQTT topic filter used by the source. |