
Kafka

The kafka sink sends data to an Apache Kafka topic. It lets you configure the topic name, bootstrap servers, key and value serializers, SSL, and additional Kafka client properties, which makes it suitable for streaming data into Kafka in real time.

KafkaSinkConfigBuilder.<String>builder()
    .withTopicName("my-topic")
    .withBootstrapServers("kafka.example.com:9092")
    .withSchemaRegistryUrl("http://registry.example.com")
    .addPropertiesEntry("key1", "value1")
    .withHeaderExtractor(string -> List.of(new RecordHeader("encoding", ...)))
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password")
        .withKeyStoreFile("/path/to/keystore.jks")
        .withKeyStorePassword("keystore-password")
        .withKeyPassword("key-password"))
    .withValueSerializer(IntegerSerializer.class)

sink:
  kafka:
    topicName: "my-topic"
    bootstrapServers: "kafka.example.com:9092"
    schemaRegistryUrl: "http://registry.example.com"
    properties:
      key1: value1

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-kafka-api</artifactId>
    <version>1.8.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-kafka-api', version: '1.8.0'

Properties

bootstrapServers

A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. Required.

Type: array
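Kafka clients ultimately consume this list as the comma-separated bootstrap.servers client property. The helper below is a minimal sketch of that conversion, assuming each array entry is a "host:port" string; the class name BootstrapServers is hypothetical, and the plugin's own conversion and validation may differ.

```java
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper (not part of the plugin): turns a bootstrapServers array
// into the comma-separated form used by Kafka's "bootstrap.servers" property.
final class BootstrapServers {
    private BootstrapServers() {}

    static String join(List<String> hostPorts) {
        if (hostPorts.isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must not be empty");
        }
        StringJoiner joined = new StringJoiner(",");
        for (String entry : hostPorts) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            // Require "host:port"; lastIndexOf keeps bracketed IPv6 hosts intact.
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + entry);
            }
            // parseInt throws NumberFormatException (an IllegalArgumentException) for non-numeric ports.
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range in: " + entry);
            }
            joined.add(trimmed);
        }
        return joined.toString();
    }
}
```

Listing more than one broker only matters for the initial connection: if the first host is unreachable, the client tries the next, and once connected it discovers the rest of the cluster on its own.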

topicName

Sets the Kafka topic name where data will be sent. Required.

Type: string
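Kafka itself restricts topic names to ASCII letters, digits, '.', '_' and '-', with a maximum length of 249 characters, and reserves the names "." and "..". A minimal sketch of a pre-deployment check against those broker rules follows; the TopicNames class is hypothetical and the sink is not documented to perform this check itself.

```java
import java.util.regex.Pattern;

// Hypothetical fail-fast check mirroring Kafka's topic-name rules.
final class TopicNames {
    private static final int MAX_LENGTH = 249;
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

    private TopicNames() {}

    static boolean isValid(String topic) {
        return topic != null
                && !topic.isEmpty()
                && topic.length() <= MAX_LENGTH
                && !topic.equals(".")   // reserved
                && !topic.equals("..")  // reserved
                && LEGAL.matcher(topic).matches();
    }
}
```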

properties

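Additional Kafka client properties as key-value pairs. Each entry adds a property to, or overrides a property in, the underlying Kafka producer configuration, allowing customization for advanced configurations.

Type: object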
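The "adds or overrides" behavior amounts to layering the user's entries on top of the settings the sink derives from its other options, so a user-supplied key wins. The sketch below assumes exactly that ordering; ClientProperties is a hypothetical name, while acks and linger.ms are standard Kafka producer settings used only as sample keys.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of "adds or overrides": user entries from the
// sink's `properties` object are applied after the derived settings.
final class ClientProperties {
    private ClientProperties() {}

    static Properties merge(Map<String, String> derived, Map<String, String> overrides) {
        Properties result = new Properties();
        result.putAll(derived);    // settings computed from topicName, bootstrapServers, ssl, ...
        result.putAll(overrides);  // new keys are added, existing keys are replaced
        return result;
    }
}
```

For example, merging a derived acks=all with a user entry acks=1 yields acks=1, while derived keys the user did not mention, such as bootstrap.servers, pass through unchanged.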

ssl

SSL configuration.

Type: object

Fields of ssl:

ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, and PEM.

Type: string

ssl.trustStorePassword

Truststore password.

Type: string

ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, and PEM.

Type: string

ssl.keyStorePassword

Keystore password.

Type: string

ssl.keyPassword

Private key password. Optional; if not set, the keystore password is used.

Type: string
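To check a keyStoreFile/keyStorePassword/keyPassword combination before deploying, you can open the keystore with the standard java.security.KeyStore API. The sketch below applies the documented fallback (no keyPassword means the keystore password is used for the key); it covers JKS and PKCS#12 only, since the JDK has no built-in PEM keystore type. The class KeyStoreCheck and the alias parameter are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.Key;
import java.security.KeyStore;

// Hypothetical pre-flight check for the ssl.keyStore* / ssl.keyPassword settings.
final class KeyStoreCheck {
    private KeyStoreCheck() {}

    static Key loadKey(Path keyStoreFile, String type, String keyStorePassword,
                       String keyPassword, String alias)
            throws IOException, GeneralSecurityException {
        KeyStore keyStore = KeyStore.getInstance(type); // e.g. "PKCS12" or "JKS"
        try (InputStream in = Files.newInputStream(keyStoreFile)) {
            // Throws IOException when keyStorePassword is wrong.
            keyStore.load(in, keyStorePassword.toCharArray());
        }
        // Documented fallback: reuse the keystore password when keyPassword is unset.
        String effectiveKeyPassword = (keyPassword != null) ? keyPassword : keyStorePassword;
        Key key = keyStore.getKey(alias, effectiveKeyPassword.toCharArray());
        if (key == null) {
            throw new GeneralSecurityException("No key entry named " + alias);
        }
        return key;
    }
}
```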

ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.

Type: object
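This page does not say which type the plugin expects here; assuming it accepts a standard javax.net.ssl.HostnameVerifier, a custom verifier could look like the sketch below. AllowListHostnameVerifier is hypothetical. Note that it checks only the host name being connected to, not the names in the broker certificate, so it is a deliberate relaxation of normal verification rather than a stronger check.

```java
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;

// Hypothetical verifier that accepts only an explicit allow-list of broker hosts.
// It ignores the SSLSession, i.e. it does not inspect the peer certificate.
final class AllowListHostnameVerifier implements HostnameVerifier {
    private final Set<String> allowedHosts;

    AllowListHostnameVerifier(Set<String> allowedHosts) {
        // Normalize once; DNS names are case-insensitive.
        this.allowedHosts = allowedHosts.stream()
                .map(host -> host.toLowerCase(Locale.ROOT))
                .collect(Collectors.toUnmodifiableSet());
    }

    @Override
    public boolean verify(String hostname, SSLSession session) {
        return hostname != null && allowedHosts.contains(hostname.toLowerCase(Locale.ROOT));
    }
}
```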

keySerializer

Kafka key serializer class. Required.

Type: class

Default value: org.apache.kafka.common.serialization.StringSerializer

valueSerializer

Kafka value serializer class. Required.

Type: class
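The serializer class decides which bytes land in the topic, and consumers must use the matching deserializer. As a stdlib-only illustration (not the Kafka classes themselves), Kafka's StringSerializer writes UTF-8 by default and IntegerSerializer writes four big-endian bytes; both return null for a null input. WireFormats is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Reproduces the byte layouts of Kafka's StringSerializer (UTF-8 default)
// and IntegerSerializer (4 bytes, big-endian) without a Kafka dependency.
final class WireFormats {
    private WireFormats() {}

    static byte[] serializeString(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    static byte[] serializeInteger(Integer value) {
        // ByteBuffer uses big-endian byte order by default.
        return value == null ? null : ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
    }
}
```

With the top example's withValueSerializer(IntegerSerializer.class), a value of 258 would therefore be written as the bytes 00 00 01 02.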

exceptionHandler

Custom exception handler enabling interception of all errors related to this sink.

Type: object

keyExtractor

A function that extracts the message key from the given output object. The extracted key is used by Kafka for partitioning and message ordering within a topic.

Type: object
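Why the key matters for ordering: with the Kafka producer's built-in partitioning for keyed records, the partition is the murmur2 hash of the serialized key, made non-negative, modulo the partition count, so equal keys always land in the same partition. The sketch below ports that computation, assuming the key is serialized as a UTF-8 string (the StringSerializer default) and no custom partitioner.class is configured; KeyPartitioning and partitionFor are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Sketch of key-based partition selection as done by Kafka's built-in partitioner.
final class KeyPartitioning {
    private KeyPartitioning() {}

    // Port of the murmur2 variant used by the Kafka client (Utils.murmur2).
    static int murmur2(byte[] data) {
        final int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        final int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
                // fall through
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
                // fall through
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
                break;
            default:
                break;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Applies a key extractor, then maps the UTF-8 key bytes to a partition.
    static <T> int partitionFor(T output, Function<T, String> keyExtractor, int numPartitions) {
        byte[] keyBytes = keyExtractor.apply(output).getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }
}
```

A consequence of this design: adding partitions to a topic changes the modulus, so existing keys may map to different partitions afterwards, and per-key ordering is only guaranteed within a stable partition count.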

valueExtractor

A function that extracts the message value (payload) from the output object. This is the core content of the Kafka message.

Type: object

headerExtractor

A function that extracts Kafka headers from the output object. Headers can carry metadata or additional context alongside the message and are represented as key-value pairs (Header objects).

Type: object
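The three extractors can be thought of as plain functions over the sink's output type. The sketch below assumes a hypothetical Order record as that output type and uses java.util.function.Function throughout; the exact functional types the plugin expects are not documented on this page. SimpleHeader is a dependency-free stand-in for Kafka's Header (with kafka-clients on the classpath you would build RecordHeader instances, as the top example does with withHeaderExtractor).

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.function.Function;

// Hypothetical output type flowing into the sink.
record Order(String customerId, String orderId, int amountCents) {}

// Stand-in for Kafka's Header (a key plus a byte[] value), so no Kafka dependency is needed.
record SimpleHeader(String key, byte[] value) {}

final class OrderExtractors {
    private OrderExtractors() {}

    // keyExtractor: all orders of one customer share a key, and therefore a partition.
    static final Function<Order, String> KEY = Order::customerId;

    // valueExtractor: the payload, a small JSON document built naively
    // (no escaping; a JSON library is the safer choice in practice).
    static final Function<Order, String> VALUE = order ->
            "{\"orderId\":\"" + order.orderId() + "\",\"amountCents\":" + order.amountCents() + "}";

    // headerExtractor: metadata that travels alongside the payload.
    static final Function<Order, List<SimpleHeader>> HEADERS = order -> List.of(
            new SimpleHeader("content-type", "application/json".getBytes(StandardCharsets.UTF_8)),
            new SimpleHeader("order-id", order.orderId().getBytes(StandardCharsets.UTF_8)));
}
```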

schemaRegistryUrl

Schema registry URL.

Type: string

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

KafkaSinkConfig
   .withTopicName("my-topic")
   .withBootstrapServers("kafka.example.com:9092")
   .withSchemaRegistry("http://registry.example.com")
   .withProperty("key1", "value1")
   .withHeaders(string -> List.of(new RecordHeader("encoding", ...)))
   .withSSL(
       KafkaStreamSslConfiguration.builder()
           .withTruststore("/path/to/truststore.jks", "truststore-password")
           .withKeystore("/path/to/keystore.jks", "keystore-password", "key-password")
           .build()
       );

This configuration sends real-time data to a Kafka topic, using SSL to secure the connection to the brokers.

Metrics

Kafka metrics

Metric enum: org.voltdb.stream.plugin.kafka.KafkaMetric

Prometheus name Type Description
voltsp_kafka_commit_rate_total gauge Kafka consumer commit rate.
voltsp_kafka_connection_count_total gauge Current Kafka client connection count.
voltsp_kafka_fetch_latency_max_seconds gauge Maximum Kafka fetch latency reported by the consumer.
voltsp_kafka_fetch_rate_total gauge Kafka consumer fetch rate.
voltsp_kafka_outgoing_byte_rate_bytes gauge Kafka producer outgoing byte rate.
voltsp_kafka_record_error_rate_total gauge Kafka producer record error rate.
voltsp_kafka_record_queue_time_avg_seconds gauge Average Kafka producer record queue time.
voltsp_kafka_request_in_flight_total gauge Number of in-flight Kafka producer requests.
voltsp_kafka_source_partition_assigned_total gauge Current number of Kafka partitions assigned to the source.
voltsp_kafka_waiting_threads_total gauge Number of Kafka producer threads waiting for buffer memory.
voltsp_kafka_batch_size_max_total counter Maximum Kafka producer batch size.
voltsp_kafka_buffer_exhausted_rate_total counter Kafka producer buffer exhausted rate.
voltsp_kafka_bytes_consumed_bytes counter Number of bytes consumed by the Kafka consumer.
voltsp_kafka_commit_sync_time_seconds counter Kafka consumer commitSync time reported by the native Kafka client.
voltsp_kafka_commits_total counter Number of Kafka consumer commits.
voltsp_kafka_connection_close_total counter Number of Kafka producer connection closes.
voltsp_kafka_fetch_size_max_total counter Maximum Kafka fetch size reported by the consumer.
voltsp_kafka_incoming_bytes counter Incoming bytes reported by the Kafka consumer.
voltsp_kafka_io_time_ns_seconds counter Kafka consumer I/O time reported by the native Kafka client.
voltsp_kafka_io_wait_time_ns_seconds counter Kafka producer I/O wait time reported by the native Kafka client.
voltsp_kafka_last_poll_seconds_ago_total counter Seconds since the last Kafka consumer poll.
voltsp_kafka_network_io_bytes counter Kafka consumer network I/O reported by the native Kafka client.
voltsp_kafka_record_retry_total counter Number of Kafka producer record retries.
voltsp_kafka_record_send_rate_total counter Kafka producer record send rate.
voltsp_kafka_record_send_total counter Number of Kafka producer records sent.
voltsp_kafka_records_consumed_total counter Number of records consumed by the Kafka consumer.
voltsp_kafka_records_per_request_avg_total counter Average number of Kafka records per fetch request.
voltsp_kafka_request_size_avg_bytes counter Average Kafka producer request size.
voltsp_kafka_sink_bytes_pushed_bytes counter Number of bytes pushed by the Kafka sink.
voltsp_kafka_sink_messages_consumed_total counter Number of records consumed by the Kafka sink.
voltsp_kafka_sink_messages_pushed_total counter Number of records pushed by the Kafka sink.
voltsp_kafka_source_bytes_pulled_bytes counter Number of bytes pulled by the Kafka source.
voltsp_kafka_source_messages_pulled_total counter Number of records pulled by the Kafka source.
voltsp_kafka_source_pull_errors_total counter Number of Kafka source poll errors.
voltsp_kafka_start_time_ms_total counter Kafka client start time in milliseconds.
voltsp_kafka_time_between_poll_max_seconds counter Maximum time between Kafka consumer polls.
voltsp_kafka_sink_pull_delay_seconds histogram Delay between Kafka sink pulls.
voltsp_kafka_sink_push_time_seconds histogram Kafka sink send duration.
voltsp_kafka_source_pull_delay_seconds histogram Delay between Kafka source polls.
voltsp_kafka_source_pull_time_seconds histogram Kafka source poll duration.

Kafka tags

Tag enum: org.voltdb.stream.plugin.kafka.KafkaTag

Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.

Tag Description
kafka_source_group_id Kafka consumer group id used by the source.
kafka_topic Kafka topic name.
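Since the metrics are exported under Prometheus names, the tags presumably appear as Prometheus labels on each series. The sketch below sums one metric across all series whose kafka_topic label matches, reading lines in the Prometheus text exposition format. The sample lines in the usage are hypothetical (including the extra instance label), this page does not document the scrape endpoint, and the parser is deliberately simplified: it does not handle escaped quotes in label values, timestamps' edge cases, or exemplars.

```java
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Minimal reader for Prometheus text-format samples, filtered by the kafka_topic tag.
final class SinkMetrics {
    // metric_name{labels} value  (labels optional)
    private static final Pattern SAMPLE =
            Pattern.compile("^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\\{([^}]*)\\})?\\s+(\\S+)");
    private static final Pattern LABEL =
            Pattern.compile("([a-zA-Z_][a-zA-Z0-9_]*)=\"([^\"]*)\"");

    private SinkMetrics() {}

    static double sumForTopic(List<String> lines, String metric, String topic) {
        double total = 0;
        for (String line : lines) {
            if (line.isBlank() || line.startsWith("#")) {
                continue; // skip HELP/TYPE comments and blank lines
            }
            Matcher sample = SAMPLE.matcher(line.trim());
            if (!sample.find() || !sample.group(1).equals(metric)) {
                continue;
            }
            String labels = sample.group(2) == null ? "" : sample.group(2);
            Matcher label = LABEL.matcher(labels);
            boolean topicMatches = false;
            while (label.find()) {
                if (label.group(1).equals("kafka_topic") && label.group(2).equals(topic)) {
                    topicMatches = true;
                }
            }
            if (topicMatches) {
                total += Double.parseDouble(sample.group(3));
            }
        }
        return total;
    }
}
```

For example, given series for voltsp_kafka_sink_messages_pushed_total with kafka_topic="my-topic" from two instances reporting 42 and 8, sumForTopic returns 50.0 for "my-topic" and ignores series for other topics or other metrics.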