Kafka¶
The kafka sink sends data to an Apache Kafka topic. It supports a variety of configuration options, including the topic name, bootstrap servers, key and value serializers, and SSL settings. This sink is ideal for integrating with Kafka for real-time data streaming.
KafkaSinkConfigBuilder.<String>builder()
.withTopicName("my-topic")
.withBootstrapServers("kafka.example.com:9092")
.withSchemaRegistryUrl("http://registry.example.com")
.addPropertiesEntry("key1", "value1")
.withHeaderExtractor(string -> List.of(new RecordHeader("encoding", ...)))
.withSslBuilder(builder -> builder
.withTrustStoreFile("/path/to/truststore.jks")
.withTrustStorePassword("truststore-password")
.withKeyStoreFile("/path/to/keystore.jks")
.withKeyStorePassword("keystore-password")
.withKeyPassword("key-password"))
.withValueSerializer(IntegerSerializer.class)
.build();
sink:
  kafka:
    topicName: "my-topic"
    bootstrapServers: "kafka.example.com:9092"
    schemaRegistry: "http://registry.example.com"
    properties:
      key1: value1
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-kafka-api</artifactId>
    <version>1.0-20250910-124207-release-1.5.3</version>
</dependency>
implementation group: 'org.voltdb', name: 'volt-stream-plugin-kafka-api', version: '1.0-20250910-124207-release-1.5.3'
Properties¶
bootstrapServers¶
A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. Required.
Type: array
topicName¶
Sets the Kafka topic name where data will be sent. Required.
Type: string
properties¶
Adds or overrides specific Kafka client properties, allowing customization for advanced configurations.
Type: object
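For example, standard Kafka producer client settings can be passed through this map. A sketch using the builder from the example above (the values chosen are illustrative; the keys are standard Kafka producer configs):

```java
KafkaSinkConfigBuilder.<String>builder()
        .withTopicName("my-topic")
        .withBootstrapServers("kafka.example.com:9092")
        // Standard Kafka producer settings, passed through to the client verbatim
        .addPropertiesEntry("acks", "all")
        .addPropertiesEntry("compression.type", "lz4")
        .addPropertiesEntry("linger.ms", "20")
```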
ssl¶
SSL configuration.
Type: object
Fields of ssl:
ssl.trustStoreFile¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.trustStorePassword¶
Truststore password.
Type: string
ssl.keyStoreFile¶
Keystore file; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.keyStorePassword¶
Keystore password.
Type: string
ssl.keyPassword¶
Private key password. Optional — if not set, the key store password will be used.
Type: string
ssl.insecure¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
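As a sketch, any `javax.net.ssl.HostnameVerifier` implementation can serve as a custom verifier; the allow-list policy below is purely illustrative:

```java
import javax.net.ssl.HostnameVerifier;

public class AllowListVerifier {
    // Illustrative policy: accept only brokers under a trusted domain.
    // The second argument (SSLSession) is ignored by this verifier.
    public static final HostnameVerifier VERIFIER =
            (hostname, session) -> hostname.endsWith(".kafka.example.com");

    public static void main(String[] args) {
        System.out.println(VERIFIER.verify("broker1.kafka.example.com", null)); // true
        System.out.println(VERIFIER.verify("broker1.elsewhere.org", null));     // false
    }
}
```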
keySerializer¶
Kafka key serializer. Required.
Type: class
Default value: org.apache.kafka.common.serialization.StringSerializer
valueSerializer¶
Kafka value serializer. Required.
Type: class
exceptionHandler¶
Custom exception handler enabling interception of all errors related to this sink.
Type: object
keyExtractor¶
A function that extracts the message key from the given output object.
The extracted key is used by Kafka for partitioning and message ordering within a topic.
Type: object
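Conceptually, a key extractor is just a function from the sink's output type to the Kafka key type. A minimal sketch with a hypothetical `Order` record (the record and its field names are illustrative, not part of this plugin's API):

```java
import java.util.function.Function;

public class KeyExtractorSketch {
    // Hypothetical output type; the record and its fields are illustrative.
    public record Order(String customerId, double amount) {}

    // Keying by customer ID routes all of a customer's orders to the same
    // partition, preserving per-customer ordering.
    public static final Function<Order, String> KEY_EXTRACTOR = Order::customerId;

    public static void main(String[] args) {
        System.out.println(KEY_EXTRACTOR.apply(new Order("cust-42", 19.99))); // cust-42
    }
}
```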
valueExtractor¶
A function that extracts the message value (payload) from the output object.
This is the core content of the Kafka message.
Type: object
headerExtractor¶
A function that extracts Kafka headers from the output object.
Headers can carry metadata or additional context alongside the message and are represented
as key-value pairs (Header objects).
Type: object
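For instance, an extractor might attach a fixed encoding header to every record. A sketch of such an extractor (`RecordHeader` is the Kafka client's standard `Header` implementation; the header value shown is illustrative):

```java
.withHeaderExtractor(value -> List.of(
        new RecordHeader("encoding", "utf-8".getBytes(StandardCharsets.UTF_8))))
```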
schemaRegistryUrl¶
Schema registry URL.
Type: string
Usage Examples¶
KafkaStreamSinkConfigurator.accepting(String.class)
.withTopicName("my-topic")
.withBootstrapServers("kafka.example.com:9092")
.withSchemaRegistry("http://registry.example.com")
.withProperty("key1", "value1")
.withHeaders(string -> List.of(new RecordHeader("encoding", ...)))
.withSSL(
KafkaStreamSslConfiguration.builder()
.withTruststore("/path/to/truststore.jks", "truststore-password")
.withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
.build()
);
This configuration is designed for producing real-time data to Kafka topics, with support for secure communication.