Mqtt¶
The mqtt sink connects to an MQTT broker to publish messages to specified topic.
It supports configuration of basic attributes like the broker’s host and port and topic filter.
Additionally it supports basic authentication, TLS as well as OAuth.
MqttSinkConfigBuilder.builder()
.withAddress("mqtt.example.com", 1883)
.withIdentifier("Tests")
.withWebsocketBuilder(websocket -> {
websocket.withSubpath("/topic");
})
.withAuthBuilder(auth -> {
auth.withUsername("admin");
auth.withPassword("admin");
})
sink:
mqtt:
address: "mqtt.example.com:1883"
identifier: "Tests"
websocket:
subpath: /topic
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
<groupId>org.voltdb</groupId>
<artifactId>volt-stream-plugin-mqtt-api</artifactId>
<version>1.8.0</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-mqtt-api', version: '1.8.0'
Properties¶
identifier¶
Type: string
address¶
Sets the address of the MQTT broker to connect to in host:port format. When omitted the port is set to 1883.
Required.
Type: object
Default value: :1883
Fields of address:
address.host¶
Type: string
address.port¶
Type: number
address.hasBracketlessColons¶
Type: boolean
websocket¶
Configuration for a WebSocket transport to use by MQTT clients.
Type: object
Fields of websocket:
websocket.subpath¶
The path that should be used when connecting to websocket endpoint.
Type: string
ssl¶
Secure transport configuration.
Type: object
Fields of ssl:
ssl.trustStoreFile¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.trustStorePassword¶
Truststore password.
Type: string
ssl.keyStoreFile¶
Keystore file; supported formats include JKS, PKCS#12, or PEM
Type: string
ssl.keyStorePassword¶
Keystore password.
Type: string
ssl.keyPassword¶
Private key password. Optional — if not set, the key store password will be used.
Type: string
ssl.insecure¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
auth¶
If not specified no authentication is used.
Type: object
Fields of auth:
auth.username¶
Username used for authentication.
Type: string
auth.password¶
Password used for authentication.
Type: string
connect¶
General connection options such as keep alive and user properties to pass to client.
Type: object
Fields of connect:
connect.userProperties¶
List of key value paris that will be passed to MQTT client as user properties
Type: array
connect.keepAlive¶
Sets the keep alive property, must be in the range from 0 to 2^16 -1 (unsigned short)
Type: object
Unit: unsigned short
reconnect¶
Configuration for an exponential backoff reconnect strategy.
Type: object
Fields of reconnect:
reconnect.initialDelay¶
The initial delay before attempting to reconnect.
Type: object
reconnect.maxDelay¶
Whatever the delay value that backoff strategy arrives at it will be capped by the maxDelay specified.
Type: object
oauth¶
Configures the OAuth details.
Type: object
Fields of oauth:
oauth.scopes¶
List of scopes to request together with the access token.
Type: array
oauth.tokenUrl¶
OAuth 2.0 token endpoint URL (e.g. https://idp.example.com/realms/foo/protocol/openid-connect/token).
Type: string
oauth.clientId¶
OAuth 2.0 client identifier registered with the identity provider.
Type: string
oauth.clientSecret¶
OAuth 2.0 client secret paired with clientId.
Type: string
oauth.grantType¶
OAuth 2.0 grant type used when requesting the token.
Type: string
Default value: client_credentials
oauth.username¶
Resource owner username
Type: string
oauth.password¶
Resource owner password
Type: string
oauth.audience¶
OAuth 2.0 audience parameter restricting the recipients the issued access
token is intended for. The value is propagated to the aud claim of the JWT
by identity providers that honor it (e.g. Keycloak via an audience mapper).
Audience handling depends on the identity provider: some providers
(e.g. Auth0, Okta) reject unknown audiences with HTTP 400, others
(e.g. Keycloak with the client_credentials grant) silently ignore
the parameter. In the latter case, audience enforcement happens at
the resource server when it validates the aud claim.
Type: string
oauth.ssl¶
TLS configuration for the HTTP client that calls the OAuth token endpoint. Set this when the token endpoint uses HTTPS with a certificate that the JVM does not already trust (e.g. private CA, self-signed) or when the identity provider requires mTLS.
Type: object
Fields of oauth.ssl:
oauth.ssl.trustStoreFile¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
oauth.ssl.trustStorePassword¶
Truststore password.
Type: string
oauth.ssl.keyStoreFile¶
Keystore file; supported formats include JKS, PKCS#12, or PEM
Type: string
oauth.ssl.keyStorePassword¶
Keystore password.
Type: string
oauth.ssl.keyPassword¶
Private key password. Optional — if not set, the key store password will be used.
Type: string
oauth.ssl.insecure¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
oauth.ssl.hostnameVerifier¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
retry¶
Type: object
Fields of retry:
retry.retries¶
Number of retry attempts after a request failure.
Type: number
Default value: 3
retry.backoffDelay¶
Initial delay before the first retry attempt.
Type: object
Default value: PT0.2S
retry.maxBackoffDelay¶
Maximum delay between consecutive retry attempts.
Type: object
Default value: PT3S
exceptionHandler¶
Custom exception handler enabling interception of all errors related to this sink.
Type: object
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Usage Examples¶
stream
.withName("Send data to Mqtt")
.consumeFromSource(Sources.collection(SOURCE_MESSAGE))
.processWith(message -> MqttPublishMessage
.builder(topic, MqttMessageQoS.AT_LEAST_ONCE)
.withPayload(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)))
.build())
.terminateWithSink(
MqttSinkConfigBuilder.builder()
.withAddress("mqtt.example.com", 1883)
.withSslBuilder(ssl -> {
ssl.withIgnoreHostnameValidation(true);
})
.withOauthBuilder(oauth -> {
oauth.withClientId("admin");
oauth.withTokenUrl("...");
})
);
This configuration connects to an MQTT broker at mqtt.example.com on port 1883
using websockets and oauth.
sink:
mqtt:
address: "mqtt.example.com:1883"
auth:
username: "admin
password: "admin
ssl:
ignoreHostnameValidation: true
websocket:
subpath: /topic
This configuration connects to an MQTT broker at mqtt.example.com on port 1883
using websockets and authentication.
Metrics¶
Mqtt metrics¶
Metric enum: org.voltdb.stream.plugin.mqtt.MqttMetric
| Prometheus name | Type | Description |
|---|---|---|
voltsp_mqtt_sink_disconnected_total |
counter |
Number of MQTT sink disconnect events. |
voltsp_mqtt_sink_messages_published_total |
counter |
Number of MQTT messages published by the sink. |
voltsp_mqtt_sink_request_retries_total |
counter |
Number of MQTT sink request retries. |
voltsp_mqtt_sink_requests_failed_total |
counter |
Number of failed MQTT sink requests. |
voltsp_mqtt_sink_requests_succeeded_total |
counter |
Number of successful MQTT sink requests. |
voltsp_mqtt_sink_token_expired_total |
counter |
Number of MQTT sink token expiration events. |
voltsp_mqtt_source_disconnected_total |
counter |
Number of MQTT source disconnect events. |
voltsp_mqtt_source_messages_received_total |
counter |
Number of messages received by the MQTT source. |
voltsp_mqtt_source_receive_errors_total |
counter |
Number of MQTT source receive errors. |
voltsp_mqtt_source_token_expired_total |
counter |
Number of MQTT source token expiration events. |
voltsp_mqtt_source_batch_size_total |
histogram |
MQTT source receive batch size. |
Mqtt tags¶
Tag enum: org.voltdb.stream.plugin.mqtt.MqttTag
Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.
| Tag | Description |
|---|---|
group_name |
MQTT consumer group name. |
mqtt_host |
MQTT broker host. |
syslog_protocol |
Protocol tag retained for compatibility with existing MQTT metric labels. |
topic_filter |
MQTT topic filter used by the source. |