Kafka¶
The Kafka sink sends data to an Apache Kafka topic. It supports a range of configuration options, including the topic name, bootstrap servers, key and value serializers, SSL settings, and more.
Use it to integrate a pipeline with Kafka for real-time data streaming.
```java
KafkaSinkConfigBuilder.<String>builder()
    .withTopicName("my-topic")
    .withBootstrapServers("kafka.example.com:9092")
    .withSchemaRegistryUrl("http://registry.example.com")
    .addPropertiesEntry("key1", "value1")
    .withHeaderExtractor(string -> List.of(new RecordHeader("encoding", ...)))
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password")
        .withKeyStoreFile("/path/to/keystore.jks")
        .withKeyStorePassword("keystore-password")
        .withKeyPassword("key-password"))
    .withValueSerializer(IntegerSerializer.class)
```
```yaml
sink:
  kafka:
    topicName: "my-topic"
    bootstrapServers: "kafka.example.com:9092"
    schemaRegistryUrl: "http://registry.example.com"
    properties:
      key1: value1
```
Java dependency management¶
To use this plugin's configuration DSL in Java, add one of the following declarations (Maven or Gradle) to your build.
```xml
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-kafka-api</artifactId>
    <version>1.8.0</version>
</dependency>
```

```groovy
implementation group: 'org.voltdb', name: 'volt-stream-plugin-kafka-api', version: '1.8.0'
```
Properties¶
bootstrapServers¶
A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. Required.
Type: array
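The Kafka Java client accepts these addresses through its `bootstrap.servers` property, which takes a single comma-separated string. The sketch below shows that conversion. It is not the plugin's code: the class name and the validation rules are assumptions made for illustration.

```java
import java.util.List;

/**
 * Hypothetical sketch: joins a bootstrapServers-style list of host:port pairs
 * into the comma-separated string used by the Kafka client's
 * "bootstrap.servers" property. The validation rules are illustrative only.
 */
final class BootstrapServers {

    static String toKafkaProperty(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers is required");
        }
        for (String server : servers) {
            int colon = server.lastIndexOf(':');
            // Require a non-empty host before the last colon and something after it.
            if (colon <= 0 || colon == server.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got " + server);
            }
            // parseInt throws NumberFormatException (an IllegalArgumentException) for non-numeric ports.
            int port = Integer.parseInt(server.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in " + server);
            }
        }
        return String.join(",", servers);
    }

    public static void main(String[] args) {
        System.out.println(toKafkaProperty(
                List.of("kafka-1.example.com:9092", "kafka-2.example.com:9092")));
    }
}
```

Listing more than one broker allows the client to bootstrap when one of them is down. After the first connection succeeds, the client discovers the rest of the cluster on its own.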
topicName¶
Sets the Kafka topic name where data will be sent. Required.
Type: string
properties¶
Additional Kafka client properties as key-value pairs. Each entry adds or overrides a Kafka client property, allowing customization for advanced configurations.
Type: object
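The "adds or overrides" behavior can be pictured as user entries laid over a set of base producer settings. The sketch below assumes that layering order. The base values are hypothetical and are not the sink's real defaults. The keys `acks`, `linger.ms` and `compression.type` are real Kafka producer settings.

```java
import java.util.Map;
import java.util.Properties;

/**
 * Sketch of add-or-override semantics: user-supplied properties are layered
 * on top of base settings. The base values and the layering order are
 * assumptions for illustration, not the plugin's actual defaults.
 */
final class ProducerPropertiesSketch {

    static Properties merge(Map<String, String> base, Map<String, String> userProperties) {
        Properties merged = new Properties();
        merged.putAll(base);           // hypothetical settings chosen by the sink
        merged.putAll(userProperties); // user entries add new keys or replace existing ones
        return merged;
    }

    public static void main(String[] args) {
        Properties p = merge(
                Map.of("acks", "all", "linger.ms", "0"),
                Map.of("linger.ms", "20", "compression.type", "lz4"));
        System.out.println(p);
    }
}
```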
ssl¶
SSL configuration.
Type: object
Fields of ssl:
ssl.trustStoreFile¶
Truststore file or trusted CA certificate; supported formats are JKS, PKCS#12, and PEM.
Type: string
ssl.trustStorePassword¶
Truststore password.
Type: string
ssl.keyStoreFile¶
Keystore file; supported formats are JKS, PKCS#12, and PEM.
Type: string
ssl.keyStorePassword¶
Keystore password.
Type: string
ssl.keyPassword¶
Private key password. Optional; if not set, the keystore password is used.
Type: string
ssl.insecure¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
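These fields correspond closely to the Kafka client's own SSL settings. The sketch below shows one possible mapping. It assumes plain SSL without SASL, and `SslFields` is a hypothetical holder type. The property names it writes are real Kafka client configuration keys. The sketch also shows the `keyPassword` fallback described above.

```java
import java.util.Properties;

/**
 * Sketch: maps the documented ssl fields onto standard Kafka client
 * properties. SslFields is hypothetical; the property keys are Kafka's.
 * Assumes security.protocol SSL (no SASL).
 */
final class SslPropertiesSketch {

    record SslFields(String trustStoreFile, String trustStorePassword,
                     String keyStoreFile, String keyStorePassword,
                     String keyPassword, boolean insecure) {}

    static Properties toKafkaProperties(SslFields ssl) {
        Properties p = new Properties();
        p.setProperty("security.protocol", "SSL");
        putIfPresent(p, "ssl.truststore.location", ssl.trustStoreFile());
        putIfPresent(p, "ssl.truststore.password", ssl.trustStorePassword());
        putIfPresent(p, "ssl.keystore.location", ssl.keyStoreFile());
        putIfPresent(p, "ssl.keystore.password", ssl.keyStorePassword());
        // keyPassword is optional: fall back to the keystore password when unset.
        if (ssl.keyStoreFile() != null) {
            String keyPassword = ssl.keyPassword() != null ? ssl.keyPassword() : ssl.keyStorePassword();
            putIfPresent(p, "ssl.key.password", keyPassword);
        }
        if (ssl.insecure()) {
            // An empty value turns off hostname verification only. Skipping
            // certificate validation as well would need a custom trust manager,
            // which this sketch does not cover.
            p.setProperty("ssl.endpoint.identification.algorithm", "");
        }
        return p;
    }

    private static void putIfPresent(Properties p, String key, String value) {
        if (value != null) {
            p.setProperty(key, value);
        }
    }

    public static void main(String[] args) {
        SslFields fields = new SslFields("/path/to/truststore.jks", "truststore-password",
                "/path/to/keystore.jks", "keystore-password", null, false);
        // Print only the keys so passwords are not echoed.
        System.out.println(toKafkaProperties(fields).stringPropertyNames());
    }
}
```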
keySerializer¶
Kafka key serializer class. Required.
Type: class
Default value: org.apache.kafka.common.serialization.StringSerializer
valueSerializer¶
Kafka value serializer class. Required.
Type: class
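A serializer converts a key or value object into the bytes stored in the Kafka record. The sketch below mimics two serializers. `StringSerializer` encodes strings as UTF-8 by default, and `IntegerSerializer` writes an int as four big-endian bytes. The `Serializer` interface here is a simplified local stand-in for `org.apache.kafka.common.serialization.Serializer`, used so that the example has no dependencies.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/**
 * Sketch of what key/value serializers do. Serializer is a simplified
 * stand-in for Kafka's interface of the same name.
 */
final class SerializerSketch {

    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Like Kafka's StringSerializer with its default UTF-8 encoding.
    static final Serializer<String> STRING = (topic, data) ->
            data == null ? null : data.getBytes(StandardCharsets.UTF_8);

    // Like Kafka's IntegerSerializer: four bytes, most significant first.
    static final Serializer<Integer> INTEGER = (topic, data) -> {
        if (data == null) {
            return null;
        }
        int v = data;
        return new byte[] {(byte) (v >>> 24), (byte) (v >>> 16), (byte) (v >>> 8), (byte) v};
    };

    public static void main(String[] args) {
        System.out.println(Arrays.toString(INTEGER.serialize("my-topic", 258)));
        System.out.println(Arrays.toString(STRING.serialize("my-topic", "abc")));
    }
}
```

The value serializer must match the type of object the value extractor produces. For example, `IntegerSerializer` expects `Integer` values.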
exceptionHandler¶
Custom exception handler enabling interception of all errors related to this sink.
Type: object
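This page does not show the plugin's handler interface. The sketch below uses a hypothetical single-method interface, `SinkExceptionHandler`, to illustrate one common policy: count and skip records that fail individually, and rethrow every other error so that it stops the sink. Treating `IllegalArgumentException` as a per-record failure is an arbitrary choice made for the example.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical error-handling sketch. SinkExceptionHandler is NOT the
 * plugin's interface; it only illustrates intercepting sink errors.
 */
final class ExceptionHandlerSketch {

    @FunctionalInterface
    interface SinkExceptionHandler {
        void handle(Object record, Exception error);
    }

    /** Counts and skips per-record failures; rethrows anything else. */
    static final class SkipBadRecords implements SinkExceptionHandler {
        final AtomicLong skipped = new AtomicLong();

        @Override
        public void handle(Object record, Exception error) {
            if (error instanceof IllegalArgumentException) {
                // Stand-in for a poison record (e.g. one that cannot be serialized): drop it.
                skipped.incrementAndGet();
                return;
            }
            // Anything else (e.g. broker unavailable) is escalated to the caller.
            throw new RuntimeException("Kafka sink failed", error);
        }
    }

    public static void main(String[] args) {
        SkipBadRecords handler = new SkipBadRecords();
        handler.handle("not-serializable", new IllegalArgumentException("cannot serialize"));
        System.out.println("skipped=" + handler.skipped.get());
    }
}
```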
keyExtractor¶
A function that extracts the message key from the given output object.
The extracted key is used by Kafka for partitioning and message ordering within a topic.
Type: object
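For keyed records, the Kafka Java producer's built-in partitioning hashes the serialized key with murmur2, clears the sign bit, and takes the result modulo the partition count. As a result, equal keys land on the same partition as long as the partition count does not change, which is what provides per-key ordering. The sketch below reproduces that keyed path. It assumes the sink passes the extracted key to the producer without a custom partitioner and serializes it as UTF-8, the way `StringSerializer` does. The class name is hypothetical.

```java
import java.nio.charset.StandardCharsets;

/**
 * Sketch of key-based partitioning, modeled on the Kafka Java producer's
 * keyed path: murmur2(serialized key), made non-negative, mod partitions.
 */
final class KeyPartitioningSketch {

    // murmur2 as used by the Kafka client (seed 0x9747b28c).
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1 to 3 bytes; the cases intentionally fall through.
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return (murmur2(bytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("customer-42", 6));
    }
}
```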
valueExtractor¶
A function that extracts the message value (payload) from the output object.
This is the core content of the Kafka message.
Type: object
headerExtractor¶
A function that extracts Kafka headers from the output object.
Headers can carry metadata or additional context alongside the message and are represented
as key-value pairs (Header objects).
Type: object
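The three extractors can be pictured as plain functions from the sink's output object to the key, the value, and a list of headers. The sketch below uses a hypothetical `Order` output type and a local `Header` record in place of Kafka's `Header`/`RecordHeader`. The field names and the `content-type` header are illustrative and are not taken from the plugin.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

/**
 * Sketch: key, value and header extractors as functions over a hypothetical
 * Order type. Header is a local stand-in for Kafka's Header/RecordHeader.
 */
final class ExtractorSketch {

    record Order(String customerId, String orderId, int amountCents) {}

    record Header(String key, byte[] value) {}

    // Key: customer id, so one customer's orders share a partition and stay ordered.
    static final Function<Order, String> KEY_EXTRACTOR = Order::customerId;

    // Value: the payload, here hand-built JSON (no escaping; illustration only).
    static final Function<Order, String> VALUE_EXTRACTOR = o ->
            "{\"orderId\":\"" + o.orderId() + "\",\"amountCents\":" + o.amountCents() + "}";

    // Headers: metadata carried alongside the payload.
    static final Function<Order, List<Header>> HEADER_EXTRACTOR = o ->
            List.of(new Header("content-type", "application/json".getBytes(StandardCharsets.UTF_8)));

    public static void main(String[] args) {
        Order order = new Order("c-1", "o-9", 1250);
        System.out.println(KEY_EXTRACTOR.apply(order) + " -> " + VALUE_EXTRACTOR.apply(order));
    }
}
```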
schemaRegistryUrl¶
URL of the schema registry.
Type: string
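Schema-registry-aware serializers usually prefix each payload with a reference to its registered schema. The sketch below shows the framing used by Confluent-compatible serializers: a zero magic byte, then a 4-byte big-endian schema ID, then the serialized payload. This page does not say which serializers or wire format the sink uses, so treat this as an assumption for illustration.

```java
import java.nio.ByteBuffer;

/**
 * Sketch of Confluent-style schema registry framing:
 * [magic byte 0][4-byte big-endian schema id][payload].
 * Whether this sink uses that format is an assumption.
 */
final class SchemaRegistryFramingSketch {

    static final byte MAGIC_BYTE = 0;

    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(1 + 4 + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(payload)
                .array();
    }

    static int schemaIdOf(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        byte[] framed = frame(42, new byte[] {7, 8});
        System.out.println("schema id = " + schemaIdOf(framed));
    }
}
```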
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Usage Examples¶
```java
KafkaSinkConfig
    .withTopicName("my-topic")
    .withBootstrapServers("kafka.example.com:9092")
    .withSchemaRegistry("http://registry.example.com")
    .withProperty("key1", "value1")
    .withHeaders(string -> List.of(new RecordHeader("encoding", ...)))
    .withSSL(
        KafkaStreamSslConfiguration.builder()
            .withTruststore("/path/to/truststore.jks", "truststore-password")
            .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
            .build()
    );
```
This configuration sends real-time data to a Kafka topic over SSL-secured connections.
Metrics¶
Kafka metrics¶
Metric enum: org.voltdb.stream.plugin.kafka.KafkaMetric
| Prometheus name | Type | Description |
|---|---|---|
| voltsp_kafka_commit_rate_total | gauge | Kafka consumer commit rate. |
| voltsp_kafka_connection_count_total | gauge | Current Kafka client connection count. |
| voltsp_kafka_fetch_latency_max_seconds | gauge | Maximum Kafka fetch latency reported by the consumer. |
| voltsp_kafka_fetch_rate_total | gauge | Kafka consumer fetch rate. |
| voltsp_kafka_outgoing_byte_rate_bytes | gauge | Kafka producer outgoing byte rate. |
| voltsp_kafka_record_error_rate_total | gauge | Kafka producer record error rate. |
| voltsp_kafka_record_queue_time_avg_seconds | gauge | Average Kafka producer record queue time. |
| voltsp_kafka_request_in_flight_total | gauge | Number of in-flight Kafka producer requests. |
| voltsp_kafka_source_partition_assigned_total | gauge | Current number of Kafka partitions assigned to the source. |
| voltsp_kafka_waiting_threads_total | gauge | Number of Kafka producer threads waiting for buffer memory. |
| voltsp_kafka_batch_size_max_total | counter | Maximum Kafka producer batch size. |
| voltsp_kafka_buffer_exhausted_rate_total | counter | Kafka producer buffer exhausted rate. |
| voltsp_kafka_bytes_consumed_bytes | counter | Number of bytes consumed by the Kafka consumer. |
| voltsp_kafka_commit_sync_time_seconds | counter | Kafka consumer commitSync time reported by the native Kafka client. |
| voltsp_kafka_commits_total | counter | Number of Kafka consumer commits. |
| voltsp_kafka_connection_close_total | counter | Number of Kafka producer connection closes. |
| voltsp_kafka_fetch_size_max_total | counter | Maximum Kafka fetch size reported by the consumer. |
| voltsp_kafka_incoming_bytes | counter | Incoming bytes reported by the Kafka consumer. |
| voltsp_kafka_io_time_ns_seconds | counter | Kafka consumer I/O time reported by the native Kafka client. |
| voltsp_kafka_io_wait_time_ns_seconds | counter | Kafka producer I/O wait time reported by the native Kafka client. |
| voltsp_kafka_last_poll_seconds_ago_total | counter | Seconds since the last Kafka consumer poll. |
| voltsp_kafka_network_io_bytes | counter | Kafka consumer network I/O reported by the native Kafka client. |
| voltsp_kafka_record_retry_total | counter | Number of Kafka producer record retries. |
| voltsp_kafka_record_send_rate_total | counter | Kafka producer record send rate. |
| voltsp_kafka_record_send_total | counter | Number of Kafka producer records sent. |
| voltsp_kafka_records_consumed_total | counter | Number of records consumed by the Kafka consumer. |
| voltsp_kafka_records_per_request_avg_total | counter | Average number of Kafka records per fetch request. |
| voltsp_kafka_request_size_avg_bytes | counter | Average Kafka producer request size. |
| voltsp_kafka_sink_bytes_pushed_bytes | counter | Number of bytes pushed by the Kafka sink. |
| voltsp_kafka_sink_messages_consumed_total | counter | Number of records consumed by the Kafka sink. |
| voltsp_kafka_sink_messages_pushed_total | counter | Number of records pushed by the Kafka sink. |
| voltsp_kafka_source_bytes_pulled_bytes | counter | Number of bytes pulled by the Kafka source. |
| voltsp_kafka_source_messages_pulled_total | counter | Number of records pulled by the Kafka source. |
| voltsp_kafka_source_pull_errors_total | counter | Number of Kafka source poll errors. |
| voltsp_kafka_start_time_ms_total | counter | Kafka client start time in milliseconds. |
| voltsp_kafka_time_between_poll_max_seconds | counter | Maximum time between Kafka consumer polls. |
| voltsp_kafka_sink_pull_delay_seconds | histogram | Delay between Kafka sink pulls. |
| voltsp_kafka_sink_push_time_seconds | histogram | Kafka sink send duration. |
| voltsp_kafka_source_pull_delay_seconds | histogram | Delay between Kafka source polls. |
| voltsp_kafka_source_pull_time_seconds | histogram | Kafka source poll duration. |
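Cumulative series such as `voltsp_kafka_sink_messages_pushed_total` and the `voltsp_kafka_sink_push_time_seconds` histogram become useful as differences between scrapes. In PromQL, throughput is `rate(voltsp_kafka_sink_messages_pushed_total[5m])`. Mean send time is `rate(voltsp_kafka_sink_push_time_seconds_sum[5m]) / rate(voltsp_kafka_sink_push_time_seconds_count[5m])`, assuming the histogram is exposed with the standard Prometheus `_sum` and `_count` series. The sketch below does the same arithmetic in plain Java. It ignores counter resets, which PromQL `rate()` handles for you.

```java
/**
 * Sketch: deriving throughput and mean latency from two scrapes of
 * cumulative sink metrics. Assumes standard Prometheus histogram series
 * (_sum, _count) and no counter resets between the scrapes.
 */
final class SinkMetricsMath {

    /** Records per second from two readings of voltsp_kafka_sink_messages_pushed_total. */
    static double pushRate(double previous, double current, double elapsedSeconds) {
        return (current - previous) / elapsedSeconds;
    }

    /** Mean send duration over the interval, from the histogram's _sum and _count deltas. */
    static double meanPushSeconds(double sumBefore, double sumAfter,
                                  double countBefore, double countAfter) {
        double count = countAfter - countBefore;
        return count == 0 ? 0.0 : (sumAfter - sumBefore) / count;
    }

    public static void main(String[] args) {
        System.out.println(pushRate(1000, 1600, 60));
        System.out.println(meanPushSeconds(2.0, 5.0, 100, 400));
    }
}
```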
Kafka tags¶
Tag enum: org.voltdb.stream.plugin.kafka.KafkaTag
Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.
| Tag | Description |
|---|---|
| kafka_source_group_id | Kafka consumer group id used by the source. |
| kafka_topic | Kafka topic name. |