Kafka

The kafka sink sends data to an Apache Kafka topic. Its configuration covers the topic name, bootstrap servers, key and value serializers, SSL settings, a schema registry URL, and arbitrary Kafka client properties. Use this sink to publish pipeline output to Kafka for real-time data streaming.

KafkaSinkConfigBuilder.<String>builder()
    .withTopicName("my-topic")
    .withBootstrapServers("kafka.example.com:9092")
    .withSchemaRegistryUrl("http://registry.example.com")
    .addPropertiesEntry("key1", "value1")
    .withHeaderExtractor(string -> List.of(new RecordHeader("encoding", ...)))
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password")
        .withKeyStoreFile("/path/to/keystore.jks")
        .withKeyStorePassword("keystore-password")
        .withKeyPassword("key-password"))
    .withValueSerializer(IntegerSerializer.class);

sink:
  kafka:
    topicName: "my-topic"
    bootstrapServers: "kafka.example.com:9092"
    schemaRegistryUrl: "http://registry.example.com"
    properties:
      key1: value1

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-kafka-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-kafka-api', version: '1.0-20250910-124207-release-1.5.3'

Properties

bootstrapServers

A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. Required.

Type: array
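The host/port format described above can be illustrated with a small parser. This is a minimal sketch, not the plugin's own code: the class name BootstrapServers and the second host kafka2.example.com are hypothetical, and it assumes the comma-separated host:port form that the Kafka client accepts for its bootstrap.servers setting. IPv6 literals in brackets are not handled.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
final class BootstrapServers {

    // Splits "host1:9092,host2:9093" into unresolved socket addresses.
    static List<InetSocketAddress> parse(String servers) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : servers.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup at parse time.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // kafka2.example.com is a made-up second broker.
        for (InetSocketAddress address : parse("kafka.example.com:9092, kafka2.example.com:9093")) {
            System.out.println(address.getHostString() + " port " + address.getPort());
        }
    }
}
```

Listing more than one broker matters only for the initial connection: if the first entry is unreachable, the client can still bootstrap from another one.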

topicName

Sets the Kafka topic name where data will be sent. Required.

Type: string

properties

Adds or overrides a specific Kafka client property, allowing customization for advanced configurations. Type: object

ssl

SSL configuration. Type: object

Fields of ssl:

ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM. Type: string

ssl.trustStorePassword

Truststore password. Type: string

ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, or PEM. Type: string

ssl.keyStorePassword

Keystore password. Type: string

ssl.keyPassword

Private key password. Optional. If not set, the keystore password is used. Type: string

ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Does not work with mutual TLS (mTLS).

Type: boolean

ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.

Type: object
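To make the ssl fields more concrete, the sketch below shows how the truststore and keystore settings could correspond to the standard Kafka client SSL properties, including the documented fallback from the key password to the keystore password. This is an assumption for illustration: how the plugin translates these fields internally is not stated here, and the class SslClientProperties is hypothetical. The property names themselves are standard Kafka client configuration keys.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only; not part of the plugin API.
final class SslClientProperties {

    static Properties build(String trustStoreFile, String trustStorePassword,
                            String keyStoreFile, String keyStorePassword,
                            String keyPassword) {
        Properties props = new Properties();
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", trustStoreFile);
        props.setProperty("ssl.truststore.password", trustStorePassword);
        props.setProperty("ssl.keystore.location", keyStoreFile);
        props.setProperty("ssl.keystore.password", keyStorePassword);
        // Fallback described for ssl.keyPassword: reuse the keystore password
        // when no separate private key password is given.
        props.setProperty("ssl.key.password",
                keyPassword != null ? keyPassword : keyStorePassword);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("/path/to/truststore.jks", "truststore-password",
                "/path/to/keystore.jks", "keystore-password", null);
        // Print only the keys so that passwords do not end up in logs.
        props.stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```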

keySerializer

Kafka key serializer. Required.

Type: class

Default value: org.apache.kafka.common.serialization.StringSerializer

valueSerializer

Kafka value serializer. Required.

Type: class

exceptionHandler

Custom exception handler enabling interception of all errors related to this sink. Type: object

keyExtractor

A function that extracts the message key from the given output object. The extracted key is used by Kafka for partitioning and message ordering within a topic. Type: object
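The link between the key and partitioning can be sketched as follows. This is a simplified illustration and an assumption, not Kafka's actual algorithm: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this sketch substitutes Arrays.hashCode to stay short. The class KeyPartitioning is hypothetical. The point it demonstrates is only that equal keys always map to the same partition, which is what preserves per-key ordering.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of key-based partitioning.
final class KeyPartitioning {

    // Maps a key to a partition index in [0, numPartitions).
    // NOTE: Arrays.hashCode stands in for Kafka's murmur2 hash.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);
        // Mask the sign bit so the result is never negative.
        return (hash & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int partitions = 6;
        for (String key : new String[] {"customer-1", "customer-2", "customer-1"}) {
            System.out.println(key + " goes to partition " + partitionFor(key, partitions));
        }
    }
}
```

Both occurrences of customer-1 land on the same partition, so a consumer of that partition sees them in the order they were sent.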

valueExtractor

A function that extracts the message value (payload) from the output object. This is the core content of the Kafka message. Type: object

headerExtractor

A function that extracts Kafka headers from the output object. Headers can carry metadata or additional context alongside the message and are represented as key-value pairs (Header objects). Type: object
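The three extractors each derive one part of the Kafka record from the same output object. The sketch below models that with plain java.util.function.Function values. Everything in it is an assumption for illustration: the plugin's real extractor interfaces are not shown here, the nested Header class is a stand-in for Kafka's header type, and the "key|value" input format and the "source" header are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

// Hypothetical illustration; not the plugin's extractor API.
final class ExtractorSketch {

    // Stand-in for a Kafka header: a name plus raw bytes.
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Assumes each output object is a "key|value" string containing the separator.
    static final Function<String, String> KEY_EXTRACTOR =
            line -> line.substring(0, line.indexOf('|'));

    static final Function<String, String> VALUE_EXTRACTOR =
            line -> line.substring(line.indexOf('|') + 1);

    // Attaches one fixed, made-up header to every record.
    static final Function<String, List<Header>> HEADER_EXTRACTOR =
            line -> List.of(new Header("source", "orders".getBytes(StandardCharsets.UTF_8)));

    public static void main(String[] args) {
        String output = "order-42|shipped";
        System.out.println("key    = " + KEY_EXTRACTOR.apply(output));
        System.out.println("value  = " + VALUE_EXTRACTOR.apply(output));
        for (Header header : HEADER_EXTRACTOR.apply(output)) {
            System.out.println("header = " + header.key + ":"
                    + new String(header.value, StandardCharsets.UTF_8));
        }
    }
}
```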

schemaRegistryUrl

Schema registry URL. Type: string

Usage Examples

KafkaStreamSinkConfigurator.accepting(String.class)
   .withTopicName("my-topic")
   .withBootstrapServers("kafka.example.com:9092")
   .withSchemaRegistry("http://registry.example.com")
   .withProperty("key1", "value1")
   .withHeaders(string -> List.of(new RecordHeader("encoding", ...)))
   .withSSL(
       KafkaStreamSslConfiguration.builder()
           .withTruststore("/path/to/truststore.jks", "truststore-password")
           .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
           .build()
       );

This configuration publishes real-time data to a Kafka topic, with SSL support for secure communication.