Kafka

The Kafka source consumes data from one or more Apache Kafka topics. It allows configuration of consumer properties such as topic names, bootstrap servers, poll timeout, starting offset, and more. This source is suitable for integrating real-time data from Kafka into your streaming pipeline.

KafkaSourceConfigBuilder.<String>builder()
   .withGroupId("my-group")
   .withTopicNames("topicA", "topicB")
   .withBootstrapServers("serverA:9092", "serverB:9092")
   .withStartingOffset(KafkaStartingOffset.EARLIEST)
   .withPollTimeout(Duration.ofMillis(250))
   .withMaxCommitRetries(3)
   .withMaxCommitTimeout(Duration.ofSeconds(10));

source:
  kafka:
    groupId: "my-group"
    bootstrapServers:
      - "serverA:9092"
      - "serverB:9092"
    topicNames:
      - "topicA"
      - "topicB"
    startingOffset: "EARLIEST"
    pollTimeout: "PT0.25S"
    maxCommitTimeout: "PT10S"
    maxCommitRetries: 3
    properties:
      key1: value1
      key2: value2

Java dependency management

Add one of the following declarations (Maven or Gradle) to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-kafka-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-kafka-api', version: '1.0-20250910-124207-release-1.5.3'

Properties

bootstrapServers

A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. Required.

Type: array

topicNames

The list of Kafka topic names to consume from. Required.

Type: array

groupId

A unique string that identifies the consumer group this consumer belongs to. Required.

Type: string

startingOffset

What to do when there is no initial offset in Kafka, or when the current offset no longer exists:

- earliest: automatically reset the offset to the earliest offset.
- latest: automatically reset the offset to the latest offset.
- none: throw an exception to the consumer if no previous offset is found for the consumer's group.

Equivalent to the Kafka auto.offset.reset setting.

Required.

Type: object

Supported values: earliest, latest, none.

Default value: earliest

pollTimeout

The maximum time to block the receiving thread.

Type: object

Default value: 1s

maxCommitTimeout

Sets the maximum timeout for commit retries.

Type: object

Default value: 10s

maxCommitRetries

Configures the maximum number of retries for committing offsets.

Type: number

Default value: 3

properties

Adds or overrides specific Kafka client properties, allowing customization for advanced configurations.

Type: object
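
For orientation, the keys that belong in this map are ordinary Kafka consumer configuration names. The sketch below is illustrative only; how the map is supplied (the YAML properties block shown above, or a builder method) depends on your setup, and the specific keys and values are examples rather than recommendations.

import java.util.Map;

// Illustrative only: standard Kafka consumer settings that the "properties" map
// can pass through to the underlying client. Keys and values are examples.
Map<String, String> kafkaClientOverrides = Map.of(
    "max.poll.records", "500",        // cap the number of records returned per poll
    "fetch.min.bytes", "1048576",     // ask the broker to return at least 1 MiB per fetch
    "client.id", "volt-kafka-source"  // label this consumer in broker metrics and logs
);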

keyDeserializer

Kafka key deserializer. Required.

Type: class

Default value: org.apache.kafka.common.serialization.ByteBufferDeserializer

valueDeserializer

Kafka value deserializer. Required.

Type: class

Default value: org.apache.kafka.common.serialization.ByteBufferDeserializer
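
Both fields take standard Kafka Deserializer implementations, such as org.apache.kafka.common.serialization.StringDeserializer. As a minimal sketch, a custom value deserializer might look like the following; it is purely illustrative (Kafka already ships StringDeserializer for this exact case), and a real implementation would typically decode JSON, Avro, or another domain format.

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative custom deserializer: converts raw record bytes to a UTF-8 String.
public class Utf8ValueDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }
}

Such a class would then be passed in the same way as the built-in deserializers, for example withValueDeserializer(Utf8ValueDeserializer.class), mirroring the StringDeserializer usage in the example at the bottom of this page.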

exceptionHandler

Custom exception handler enabling interception of all errors related to this source.

Type: object
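
As a minimal sketch, the handler below reuses the three-argument lambda shape from the usage example at the bottom of this page (consumer, context, exception); whether a rethrown exception stops the pipeline depends on how the plugin invokes the handler, so treat the comments as assumptions.

// Sketch only: the (kafkaConsumer, context, ex) parameters mirror the handler
// lambda in the usage example below.
KafkaSourceConfigBuilder.<String>builder()
    .withGroupId("my-group")
    .withTopicNames("topicA")
    .withBootstrapServers("serverA:9092")
    .withExceptionHandler((kafkaConsumer, context, ex) -> {
        // Logging and returning keeps the source running; rethrowing may surface
        // the failure to the pipeline (assumption: depends on the plugin's behavior).
        System.err.println("Error while consuming data: " + ex.getMessage());
        ex.printStackTrace();
    });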

ssl

SSL configuration.

Type: object

Fields of ssl:

ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, and PEM.

Type: string

ssl.trustStorePassword

Truststore password.

Type: string

ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, and PEM.

Type: string

ssl.keyStorePassword

Keystore password.

Type: string

ssl.keyPassword

Private key password. Optional; if not set, the keystore password is used.

Type: string

ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.

Type: object
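
As a minimal sketch, assuming this field accepts a standard javax.net.ssl.HostnameVerifier instance, a custom verifier might look like the following; the allow-list of broker hostnames is illustrative.

import java.util.Set;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;

// Illustrative verifier that accepts only a fixed set of broker hostnames.
// Assumption: ssl.hostnameVerifier takes a javax.net.ssl.HostnameVerifier.
public class AllowListHostnameVerifier implements HostnameVerifier {
    private static final Set<String> ALLOWED = Set.of("serverA", "serverB");

    @Override
    public boolean verify(String hostname, SSLSession session) {
        return ALLOWED.contains(hostname);
    }
}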

schemaRegistryUrl

Schema registry URL.

Type: string

Usage Examples

KafkaSourceConfigBuilder.<String>builder()
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGroupId("my-group")
    .withTopicNames("topicA", "topicB")
    .withBootstrapServers(List.of("serverA:9092","serverB:9092"))
    .withStartingOffset(KafkaStartingOffset.LATEST)
    .withPollTimeout(Duration.ofMillis(500))
    .withMaxCommitRetries(5)
    .withMaxCommitTimeout(Duration.ofSeconds(15))
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password")
        .withKeyStoreFile("/path/to/keystore.jks")
        .withKeyPassword("keystore-password")
        .withKeyPassword("key-password"))
    .withExceptionHandler((kafkaConsumer, context, ex) -> 
        System.err.println("Error while consuming data: " + ex.getMessage())
    );

This configuration is designed for consuming real-time data from Kafka topics, with support for secure communication and custom error handling.