Kafka
The kafka source consumes data from one or more Apache Kafka topics. It exposes consumer settings such as topic names, bootstrap servers, poll timeout, and starting offset.
Use this source to feed real-time data from Kafka into your streaming pipeline.
KafkaSourceConfigBuilder.<String>builder()
    .withGroupId("my-group")
    .withTopicNames("topicA", "topicB")
    .withBootstrapServers("serverA:9092", "serverB:9092")
    .withStartingOffset(KafkaStartingOffset.EARLIEST)
    .withPollTimeout(Duration.ofMillis(250))
    .withMaxCommitRetries(3)
    .withMaxCommitTimeout(Duration.ofSeconds(10));
source:
  kafka:
    groupId: "my-group"
    bootstrapServers:
      - "serverA:9092"
      - "serverB:9092"
    topicNames:
      - "topicA"
      - "topicB"
    startingOffset: "EARLIEST"
    pollTimeout: "PT0.25S"
    maxCommitTimeout: "PT10S"
    maxCommitRetries: 3
    properties:
      key1: value1
      key2: value2
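The YAML form writes durations as ISO-8601 strings, while the Java builder takes java.time.Duration values. The following is a minimal sketch, using only the Java standard library, for checking that a YAML string and a builder argument denote the same duration. It assumes the plugin accepts the same ISO-8601 syntax as Duration.parse; the class and method names are illustrative only.

```java
import java.time.Duration;

final class DurationForms {
    /** Parses an ISO-8601 duration string such as "PT0.25S". */
    static Duration fromYaml(String text) {
        return Duration.parse(text);
    }

    public static void main(String[] args) {
        // "PT0.25S" in YAML corresponds to Duration.ofMillis(250) in Java.
        System.out.println(fromYaml("PT0.25S").equals(Duration.ofMillis(250)));
        // "PT10S" in YAML corresponds to Duration.ofSeconds(10) in Java.
        System.out.println(fromYaml("PT10S").equals(Duration.ofSeconds(10)));
        // Duration.toString() yields the ISO-8601 text to paste into YAML.
        System.out.println(Duration.ofMillis(500));
    }
}
```

Printing a Duration is a quick way to obtain the string form when converting an existing Java configuration to YAML.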
Java dependency management
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-kafka-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-kafka-api', version: '1.0-20250910-124207-release-1.5.3'
Properties
bootstrapServers
A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. Required.
Type: array
topicNames
The names of the Kafka topics to consume from. Required.
Type: array
groupId
A unique string that identifies the consumer group this consumer belongs to. Required.
Type: string
startingOffset
What to do when there is no initial offset in Kafka or the current offset no longer exists:
- earliest: automatically reset the offset to the earliest offset.
- latest: automatically reset the offset to the latest offset.
- none: throw an exception to the consumer if no previous offset is found for the consumer's group.
Equivalent to the Kafka auto.offset.reset setting.
Required.
Type: object
Supported values: earliest, latest, none.
Default value: earliest
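The three policies can be illustrated with a small standalone model of how a starting position is chosen for one partition. This is a sketch of the general auto.offset.reset behavior, not the plugin's or the Kafka client's code; the Reset enum and the resolve method are hypothetical names introduced for the example.

```java
import java.util.OptionalLong;

final class StartingOffsetDemo {
    // Hypothetical enum mirroring the three documented values.
    enum Reset { EARLIEST, LATEST, NONE }

    /**
     * Picks the offset to start reading from.
     * earliest is the first retained offset; latest is the log end offset.
     */
    static long resolve(OptionalLong committed, long earliest, long latest, Reset reset) {
        // A committed offset that still lies in the retained range is honored.
        if (committed.isPresent()
                && committed.getAsLong() >= earliest
                && committed.getAsLong() <= latest) {
            return committed.getAsLong();
        }
        // Otherwise the reset policy decides.
        switch (reset) {
            case EARLIEST: return earliest;
            case LATEST:   return latest;
            default:
                throw new IllegalStateException("no valid offset and reset policy is none");
        }
    }

    public static void main(String[] args) {
        // New consumer group, no committed offset.
        System.out.println(resolve(OptionalLong.empty(), 100, 500, Reset.EARLIEST)); // 100
        System.out.println(resolve(OptionalLong.empty(), 100, 500, Reset.LATEST));   // 500
        // Existing group with a valid committed offset: the policy is not consulted.
        System.out.println(resolve(OptionalLong.of(250), 100, 500, Reset.LATEST));   // 250
        // Committed offset 42 has been removed by retention, so the policy applies.
        System.out.println(resolve(OptionalLong.of(42), 100, 500, Reset.EARLIEST));  // 100
    }
}
```

Note that the policy only matters when no usable committed offset exists; a group that has already committed offsets resumes from them regardless of this setting.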
pollTimeout
The maximum time to block the receiving thread.
Type: object
Default value: 1s
maxCommitTimeout
Sets the maximum timeout for commit retries.
Type: object
Default value: 10s
maxCommitRetries
Configures the maximum number of retries for committing offsets.
Type: number
Default value: 3
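One way to picture how maxCommitRetries and maxCommitTimeout could work together is a loop bounded by both a retry count and a time budget. The sketch below is purely illustrative: the source does not describe how the plugin combines the two limits, so the interpretation here (one initial attempt plus up to maxRetries retries, all within maxTimeout) is an assumption, and commitWithRetry is a hypothetical helper rather than a plugin API.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class CommitRetrySketch {
    /**
     * Calls commit until it succeeds, the retry budget is used up,
     * or the time budget has elapsed.
     * Returns the number of attempts made, or -1 if no attempt succeeded.
     */
    static int commitWithRetry(BooleanSupplier commit, int maxRetries, Duration maxTimeout) {
        long deadline = System.nanoTime() + maxTimeout.toNanos();
        // Assumption: one initial attempt plus up to maxRetries retries.
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            if (commit.getAsBoolean()) {
                return attempt;
            }
            if (System.nanoTime() - deadline >= 0) {
                break; // time budget exhausted
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulated commit that fails twice, then succeeds on the third call.
        BooleanSupplier flaky = () -> ++calls[0] >= 3;
        System.out.println(commitWithRetry(flaky, 3, Duration.ofSeconds(10)));       // 3
        // A commit that always fails gives up after 1 + 3 attempts.
        System.out.println(commitWithRetry(() -> false, 3, Duration.ofSeconds(10))); // -1
    }
}
```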
properties
Adds or overrides a specific Kafka client property, allowing customization for advanced configurations.
Type: object
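The "adds or overrides" behavior can be sketched as a plain map merge in which user-supplied entries are applied last. The keys shown (group.id, bootstrap.servers, auto.offset.reset, max.poll.records) are standard Kafka consumer settings; the helper itself is hypothetical, and the order in which the plugin applies its own settings and the properties entries is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ClientPropertiesSketch {
    /** Builds base consumer settings, then lets overrides add to or replace them. */
    static Map<String, Object> effective(String groupId,
                                         List<String> servers,
                                         Map<String, Object> overrides) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("group.id", groupId);
        props.put("bootstrap.servers", String.join(",", servers));
        props.put("auto.offset.reset", "earliest");
        // Entries applied last win: new keys are added, existing keys replaced.
        props.putAll(overrides);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = effective(
                "my-group",
                List.of("serverA:9092", "serverB:9092"),
                Map.of("max.poll.records", 200, "auto.offset.reset", "latest"));
        System.out.println(props.get("bootstrap.servers")); // serverA:9092,serverB:9092
        System.out.println(props.get("auto.offset.reset")); // latest (overridden)
        System.out.println(props.get("max.poll.records"));  // 200 (added)
    }
}
```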
keyDeserializer
Kafka key deserializer. Required.
Type: class
Default value: org.apache.kafka.common.serialization.ByteBufferDeserializer
valueDeserializer
Kafka value deserializer. Required.
Type: class
Default value: org.apache.kafka.common.serialization.ByteBufferDeserializer
exceptionHandler
Custom exception handler enabling interception of all errors related to this source.
Type: object
ssl
SSL configuration.
Type: object
Fields of ssl:
ssl.trustStoreFile
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.trustStorePassword
Truststore password.
Type: string
ssl.keyStoreFile
Keystore file; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.keyStorePassword
Keystore password.
Type: string
ssl.keyPassword
Private key password. Optional; if not set, the keystore password is used.
Type: string
ssl.insecure
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
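For orientation, the store-related ssl fields correspond naturally to the standard Kafka client SSL settings. The sketch below shows one plausible mapping, including the documented fallback from the key password to the keystore password. The mapping itself is an assumption (the source does not state which client properties the plugin sets), toKafkaProperties is a hypothetical helper, and store types such as PEM are ignored for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SslPropertiesSketch {
    /** Translates ssl fields into standard Kafka client SSL properties. */
    static Map<String, String> toKafkaProperties(String trustStoreFile,
                                                 String trustStorePassword,
                                                 String keyStoreFile,
                                                 String keyStorePassword,
                                                 String keyPassword) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", trustStoreFile);
        props.put("ssl.truststore.password", trustStorePassword);
        // A keystore is only needed when the client presents its own certificate.
        if (keyStoreFile != null) {
            props.put("ssl.keystore.location", keyStoreFile);
            props.put("ssl.keystore.password", keyStorePassword);
            // Documented fallback: reuse the keystore password when no key password is set.
            props.put("ssl.key.password", keyPassword != null ? keyPassword : keyStorePassword);
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = toKafkaProperties(
                "/path/to/truststore.jks", "truststore-password",
                "/path/to/keystore.jks", "keystore-password",
                null); // no explicit key password
        System.out.println(props.get("ssl.key.password")); // keystore-password
    }
}
```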
schemaRegistryUrl
Schema registry URL.
Type: string
Usage Examples
KafkaSourceConfigBuilder.<String>builder()
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGroupId("my-group")
    .withTopicNames("topicA", "topicB")
    .withBootstrapServers(List.of("serverA:9092","serverB:9092"))
    .withStartingOffset(KafkaStartingOffset.LATEST)
    .withPollTimeout(Duration.ofMillis(500))
    .withMaxCommitRetries(5)
    .withMaxCommitTimeout(Duration.ofSeconds(15))
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password")
        .withKeyStoreFile("/path/to/keystore.jks")
        .withKeyStorePassword("keystore-password")
        .withKeyPassword("key-password"))
    .withExceptionHandler((kafkaConsumer, context, ex) ->
        System.err.println("Error while consuming data: " + ex.getMessage()));
This configuration consumes real-time data from two Kafka topics over SSL, starting from the latest offset, and reports consumer errors to standard error through a custom exception handler.