
Kafka

The kafka source consumes data from an Apache Kafka topic. It allows configuration of consumer properties, such as topic names, bootstrap servers, poll timeout, starting offsets, and more. This source is suitable for integrating real-time data from Kafka into your streaming pipeline.

The record's key and headers are written to the stage-scoped stash context (org.voltdb.stream.api.StashContext) and remain available throughout event processing in the current stage, from this source up to the terminator (sink or emitter). Each time a new record is read from the Kafka source, the key and headers in the context are replaced.

Available stash entries:

| Entry | Java (typed key) | YAML / JS / Python (name) | Runtime type |
|---|---|---|---|
| Key | KafkaStashKeys.KEY | source.kafka.key | Object |
| Headers | KafkaStashKeys.HEADERS | source.kafka.headers | Map<String, String> |

Headers are off by default; set the includeHeaders option to enable them.
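The replacement semantics above can be illustrated with a small, self-contained model. StageStash and onRecord are hypothetical stand-ins that only reuse the documented stash entry names; this is a sketch of the described behavior, not the VoltSP StashContext API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a stage-scoped stash; NOT the VoltSP StashContext API.
public class StageStash {
    // Documented stash entry names (YAML / JS / Python view).
    static final String KEY = "source.kafka.key";
    static final String HEADERS = "source.kafka.headers";

    private final Map<String, Object> entries = new HashMap<>();
    private final boolean includeHeaders;

    StageStash(boolean includeHeaders) {
        this.includeHeaders = includeHeaders;
    }

    // Called once per polled record: the previous key and headers are replaced, not merged.
    void onRecord(Object key, Map<String, String> headers) {
        entries.put(KEY, key);
        if (includeHeaders) {
            entries.put(HEADERS, Map.copyOf(headers));
        }
    }

    Object get(String name) {
        return entries.get(name);
    }

    public static void main(String[] args) {
        StageStash stash = new StageStash(false); // includeHeaders defaults to false
        stash.onRecord("order-1", Map.of("trace-id", "abc"));
        stash.onRecord("order-2", Map.of("trace-id", "def"));
        System.out.println(stash.get(KEY));     // order-2: the later record replaced the key
        System.out.println(stash.get(HEADERS)); // null: headers were never stashed
    }
}
```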

KafkaSourceConfigBuilder.<String>builder()
    .withGroupId("my-group")
    .withTopicNames("topicA", "topicB")
    .withBootstrapServers("serverA:9092", "serverB:9092")
    .withStartingOffset(KafkaStartingOffset.EARLIEST)
    .withPollTimeout(Duration.ofMillis(250))
    .withMaxCommitRetries(3)
    .withMaxCommitTimeout(Duration.ofSeconds(10));

source:
  kafka:
    groupId: "my-group"
    bootstrapServers:
      - "serverA:9092"
      - "serverB:9092"
    topicNames:
      - "topicA"
      - "topicB"
    startingOffset: "EARLIEST"
    pollTimeout: "PT0.25S"
    maxCommitTimeout: "PT10S"
    maxCommitRetries: 3
    properties:
      key1: value1
      key2: value2
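The YAML durations are written in ISO-8601 form, which java.time.Duration.parse accepts. Assuming the YAML loader parses them the same way, this quick check shows that the YAML and Java examples express the same timeouts.

```java
import java.time.Duration;

public class DurationCheck {
    public static void main(String[] args) {
        // YAML values from the example above, parsed as ISO-8601 durations.
        Duration pollTimeout = Duration.parse("PT0.25S");
        Duration maxCommitTimeout = Duration.parse("PT10S");

        // The same values as in the Java builder example.
        System.out.println(pollTimeout.equals(Duration.ofMillis(250)));      // true
        System.out.println(maxCommitTimeout.equals(Duration.ofSeconds(10))); // true
        System.out.println(pollTimeout.toMillis());                          // 250
    }
}
```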

Java dependency management

Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.

<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-kafka-api</artifactId>
    <version>1.8.0</version>
</dependency>

implementation group: 'org.voltdb', name: 'volt-stream-plugin-kafka-api', version: '1.8.0'

Properties

bootstrapServers

A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. Required.

Type: array

topicNames

A list of Kafka topic names to subscribe to. Required.

Type: array

groupId

A unique string that identifies the consumer group this consumer belongs to. Required.

Type: string

startingOffset

What to do when there is no initial offset in Kafka or when the current offset no longer exists (for example, because that data has been deleted):

- earliest: automatically reset the offset to the earliest offset.
- latest: automatically reset the offset to the latest offset.
- none: throw an exception to the consumer if no previous offset is found for the consumer's group.

Equivalent to Kafka auto.offset.reset setting.

Required.

Type: object

Supported values: earliest, latest, none.

Default value: earliest
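Because startingOffset mirrors Kafka's auto.offset.reset, its effect can be sketched as a small decision function. This is a simplified model of the Kafka consumer's behavior, not plugin code: OffsetResetSketch, OffsetReset and resolve are hypothetical names, and a real consumer applies this logic per partition.

```java
import java.util.OptionalLong;

// Simplified model of Kafka's auto.offset.reset; all names are hypothetical.
public class OffsetResetSketch {
    enum OffsetReset { EARLIEST, LATEST, NONE }

    // committed: offset committed for the group, if any
    // logStart:  earliest offset still retained in the partition
    // logEnd:    offset the next written record will get
    static long resolve(OptionalLong committed, long logStart, long logEnd, OffsetReset policy) {
        // A committed offset that still exists wins; the reset policy is not consulted.
        if (committed.isPresent()) {
            long c = committed.getAsLong();
            if (c >= logStart && c <= logEnd) {
                return c;
            }
        }
        // No committed offset, or it no longer exists (for example, removed by retention).
        switch (policy) {
            case EARLIEST:
                return logStart;
            case LATEST:
                return logEnd;
            default:
                throw new IllegalStateException("No valid committed offset and startingOffset is none");
        }
    }

    public static void main(String[] args) {
        System.out.println(resolve(OptionalLong.empty(), 100, 500, OffsetReset.EARLIEST)); // 100
        System.out.println(resolve(OptionalLong.empty(), 100, 500, OffsetReset.LATEST));   // 500
        System.out.println(resolve(OptionalLong.of(42), 100, 500, OffsetReset.LATEST));    // 500 (offset 42 was deleted)
        System.out.println(resolve(OptionalLong.of(250), 100, 500, OffsetReset.NONE));     // 250
    }
}
```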

pollTimeout

The maximum time to block the receiving thread while waiting for records in a single poll.

Type: object

Default value: 1s

maxCommitTimeout

Sets the maximum timeout for commit retries.

Type: object

Default value: 10s

maxCommitRetries

Configures the maximum number of retries for committing offsets.

Type: number

Default value: 3
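One plausible reading of how maxCommitRetries and maxCommitTimeout interact is a retry loop bounded by both an attempt count and an overall deadline. The sketch below assumes exactly that, and also assumes "retries" means attempts after the first; commitWithRetries and the BooleanSupplier standing in for a commit call are hypothetical, and the plugin's actual backoff and error handling may differ.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

// Hypothetical policy: one initial attempt plus up to maxRetries retries, all within maxTimeout.
public class CommitRetrySketch {
    static boolean commitWithRetries(BooleanSupplier commit, int maxRetries,
                                     Duration maxTimeout, LongSupplier nanoClock) {
        long deadline = nanoClock.getAsLong() + maxTimeout.toNanos();
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (commit.getAsBoolean()) {
                return true;  // commit succeeded
            }
            if (nanoClock.getAsLong() - deadline >= 0) {
                return false; // overall timeout exhausted
            }
        }
        return false;         // retries exhausted
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Fails twice, then succeeds: fits within 3 retries and the 10s deadline.
        boolean ok = commitWithRetries(() -> ++calls[0] >= 3, 3, Duration.ofSeconds(10), System::nanoTime);
        System.out.println(ok + " after " + calls[0] + " attempts"); // true after 3 attempts
    }
}
```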

maxPollRecords

Specifies the maximum number of records to retrieve in a single poll.

Type: number

Default value: 10000

properties

Additional Kafka client properties as key/value pairs. Each entry adds or overrides a Kafka client property, allowing customization for advanced configurations.

Type: object
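A minimal sketch of the add-or-override behavior, assuming the plugin first derives Kafka client settings from its typed options and then applies properties on top. The effectiveConfig helper and that ordering are assumptions; the keys shown (bootstrap.servers, group.id, auto.offset.reset, max.poll.records, fetch.min.bytes) are standard Kafka consumer configuration names.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical merge: typed options first, then user-supplied properties add or override keys.
public class PropertiesMergeSketch {
    static Map<String, String> effectiveConfig(List<String> bootstrapServers, String groupId,
                                               String startingOffset, int maxPollRecords,
                                               Map<String, String> properties) {
        Map<String, String> config = new LinkedHashMap<>();
        config.put("bootstrap.servers", String.join(",", bootstrapServers));
        config.put("group.id", groupId);
        config.put("auto.offset.reset", startingOffset.toLowerCase(Locale.ROOT));
        config.put("max.poll.records", Integer.toString(maxPollRecords));
        config.putAll(properties); // later entries win: new keys are added, existing ones overridden
        return config;
    }

    public static void main(String[] args) {
        Map<String, String> config = effectiveConfig(
                List.of("serverA:9092", "serverB:9092"), "my-group", "EARLIEST", 10000,
                Map.of("max.poll.records", "500", "fetch.min.bytes", "1024"));
        System.out.println(config.get("bootstrap.servers")); // serverA:9092,serverB:9092
        System.out.println(config.get("max.poll.records"));  // 500 (overridden)
        System.out.println(config.get("fetch.min.bytes"));   // 1024 (added)
    }
}
```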

keyDeserializer

Kafka key deserializer class. Required.

Type: class

Default value: org.apache.kafka.common.serialization.ByteBufferDeserializer

valueDeserializer

Kafka value deserializer class. Required.

Type: class

Default value: org.apache.kafka.common.serialization.ByteBufferDeserializer
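With the default ByteBufferDeserializer, keys and values reach the pipeline as java.nio.ByteBuffer instances. If the payload is UTF-8 text and you keep the default instead of configuring StringDeserializer, a helper like the hypothetical asUtf8 below can decode it. Decoding a duplicate() leaves the original buffer's position untouched for any other code reading the same buffer.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class ByteBufferPayload {
    // Decodes the remaining bytes as UTF-8 without moving the caller's buffer position.
    static String asUtf8(ByteBuffer buffer) {
        if (buffer == null) {
            return null; // Kafka keys (and tombstone values) may be null
        }
        return StandardCharsets.UTF_8.decode(buffer.duplicate()).toString();
    }

    public static void main(String[] args) {
        ByteBuffer value = ByteBuffer.wrap("{\"id\":1}".getBytes(StandardCharsets.UTF_8));
        System.out.println(asUtf8(value));     // {"id":1}
        System.out.println(value.remaining()); // 8: position unchanged
    }
}
```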

exceptionHandler

Custom exception handler enabling interception of all errors related to this source.

Type: object

ssl

SSL configuration.

Type: object

Fields of ssl:

ssl.trustStoreFile

Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.

Type: string

ssl.trustStorePassword

Truststore password.

Type: string

ssl.keyStoreFile

Keystore file; supported formats include JKS, PKCS#12, or PEM.

Type: string

ssl.keyStorePassword

Keystore password.

Type: string

ssl.keyPassword

Private key password. Optional; if not set, the keystore password is used.

Type: string

ssl.insecure

If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.

Type: boolean

ssl.hostnameVerifier

Custom hostname verifier for SSL connections. If not specified and ssl.insecure is true, hostname verification is disabled.

Type: object
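For orientation, the ssl fields correspond roughly to the standard Kafka client SSL settings (security.protocol, ssl.truststore.*, ssl.keystore.*, ssl.key.password). The translation below is an assumption about how such a mapping could look, not the plugin's code: toKafkaSslProperties and the extension-based store type detection are hypothetical, and PEM handling and ssl.insecure are omitted for brevity. It does reflect the documented keyPassword fallback to the keystore password.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical translation of the ssl block into standard Kafka client properties.
// Only JKS and PKCS#12 files are handled; PEM stores are omitted for brevity.
public class SslPropertiesSketch {
    static String storeType(String file) {
        String lower = file.toLowerCase(Locale.ROOT);
        return (lower.endsWith(".p12") || lower.endsWith(".pfx")) ? "PKCS12" : "JKS";
    }

    static Map<String, String> toKafkaSslProperties(String trustStoreFile, String trustStorePassword,
                                                    String keyStoreFile, String keyStorePassword,
                                                    String keyPassword) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("security.protocol", "SSL");
        props.put("ssl.truststore.location", trustStoreFile);
        props.put("ssl.truststore.password", trustStorePassword);
        props.put("ssl.truststore.type", storeType(trustStoreFile));
        if (keyStoreFile != null) { // client certificate, used for mTLS
            props.put("ssl.keystore.location", keyStoreFile);
            props.put("ssl.keystore.password", keyStorePassword);
            props.put("ssl.keystore.type", storeType(keyStoreFile));
            // Documented fallback: without keyPassword, the keystore password is used.
            props.put("ssl.key.password", keyPassword != null ? keyPassword : keyStorePassword);
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = toKafkaSslProperties(
                "/path/to/truststore.jks", "truststore-password",
                "/path/to/keystore.p12", "keystore-password", null);
        System.out.println(props.get("ssl.keystore.type")); // PKCS12
        System.out.println(props.get("ssl.key.password"));  // keystore-password (fallback)
    }
}
```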

schemaRegistryUrl

Schema registry URL.

Type: string

includeHeaders

Whether to deserialize Kafka record headers into the stash context.

Type: boolean

Default value: false
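Kafka record headers are key/byte[] pairs in which a key may repeat, whereas the stash exposes them as Map<String, String>. The sketch below shows one way to perform that conversion, assuming UTF-8 header values and last-value-wins for duplicate keys; both rules are assumptions, not documented plugin behavior. Header is a hypothetical stand-in for Kafka's org.apache.kafka.common.header.Header.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HeaderConversionSketch {
    // Hypothetical stand-in for a Kafka header: a String key and a raw, possibly null, value.
    static final class Header {
        final String key;
        final byte[] value;

        Header(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    static Map<String, String> toStashHeaders(List<Header> headers) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Header h : headers) {
            String value = h.value == null ? null : new String(h.value, StandardCharsets.UTF_8);
            result.put(h.key, value); // a repeated key overwrites the earlier value
        }
        return result;
    }

    public static void main(String[] args) {
        List<Header> headers = List.of(
                new Header("trace-id", "abc".getBytes(StandardCharsets.UTF_8)),
                new Header("trace-id", "def".getBytes(StandardCharsets.UTF_8)));
        System.out.println(toStashHeaders(headers)); // {trace-id=def}
    }
}
```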

JSON Schema

You can validate or explore the configuration using its JSON Schema.

Usage Examples

import java.time.Duration;
import java.util.List;
import org.apache.kafka.common.serialization.StringDeserializer;
// KafkaSourceConfigBuilder and KafkaStartingOffset come from volt-stream-plugin-kafka-api.

KafkaSourceConfigBuilder.<String>builder()
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGroupId("my-group")
    .withTopicNames("topicA", "topicB")
    .withBootstrapServers(List.of("serverA:9092", "serverB:9092"))
    .withStartingOffset(KafkaStartingOffset.LATEST)
    .withPollTimeout(Duration.ofMillis(500))
    .withMaxCommitRetries(5)
    .withMaxCommitTimeout(Duration.ofSeconds(15))
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password")
        .withKeyStoreFile("/path/to/keystore.jks")
        .withKeyStorePassword("keystore-password")
        .withKeyPassword("key-password"))
    // Log every error raised by this source to standard error.
    .withExceptionHandler((kafkaConsumer, context, ex) ->
        System.err.println("Error while consuming data: " + ex.getMessage()));

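This configuration consumes String keys and values from topicA and topicB, starts from the latest offset when the group has no committed offset, connects over TLS with a truststore and a client keystore, and logs consumption errors to standard error.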

Metrics

Kafka metrics

Metric enum: org.voltdb.stream.plugin.kafka.KafkaMetric

| Prometheus name | Type | Description |
|---|---|---|
| voltsp_kafka_commit_rate_total | gauge | Kafka consumer commit rate. |
| voltsp_kafka_connection_count_total | gauge | Current Kafka client connection count. |
| voltsp_kafka_fetch_latency_max_seconds | gauge | Maximum Kafka fetch latency reported by the consumer. |
| voltsp_kafka_fetch_rate_total | gauge | Kafka consumer fetch rate. |
| voltsp_kafka_outgoing_byte_rate_bytes | gauge | Kafka producer outgoing byte rate. |
| voltsp_kafka_record_error_rate_total | gauge | Kafka producer record error rate. |
| voltsp_kafka_record_queue_time_avg_seconds | gauge | Average Kafka producer record queue time. |
| voltsp_kafka_request_in_flight_total | gauge | Number of in-flight Kafka producer requests. |
| voltsp_kafka_source_partition_assigned_total | gauge | Current number of Kafka partitions assigned to the source. |
| voltsp_kafka_waiting_threads_total | gauge | Number of Kafka producer threads waiting for buffer memory. |
| voltsp_kafka_batch_size_max_total | counter | Maximum Kafka producer batch size. |
| voltsp_kafka_buffer_exhausted_rate_total | counter | Kafka producer buffer exhausted rate. |
| voltsp_kafka_bytes_consumed_bytes | counter | Number of bytes consumed by the Kafka consumer. |
| voltsp_kafka_commit_sync_time_seconds | counter | Kafka consumer commitSync time reported by the native Kafka client. |
| voltsp_kafka_commits_total | counter | Number of Kafka consumer commits. |
| voltsp_kafka_connection_close_total | counter | Number of Kafka producer connection closes. |
| voltsp_kafka_fetch_size_max_total | counter | Maximum Kafka fetch size reported by the consumer. |
| voltsp_kafka_incoming_bytes | counter | Incoming bytes reported by the Kafka consumer. |
| voltsp_kafka_io_time_ns_seconds | counter | Kafka consumer I/O time reported by the native Kafka client. |
| voltsp_kafka_io_wait_time_ns_seconds | counter | Kafka producer I/O wait time reported by the native Kafka client. |
| voltsp_kafka_last_poll_seconds_ago_total | counter | Seconds since the last Kafka consumer poll. |
| voltsp_kafka_network_io_bytes | counter | Kafka consumer network I/O reported by the native Kafka client. |
| voltsp_kafka_record_retry_total | counter | Number of Kafka producer record retries. |
| voltsp_kafka_record_send_rate_total | counter | Kafka producer record send rate. |
| voltsp_kafka_record_send_total | counter | Number of Kafka producer records sent. |
| voltsp_kafka_records_consumed_total | counter | Number of records consumed by the Kafka consumer. |
| voltsp_kafka_records_per_request_avg_total | counter | Average number of Kafka records per fetch request. |
| voltsp_kafka_request_size_avg_bytes | counter | Average Kafka producer request size. |
| voltsp_kafka_sink_bytes_pushed_bytes | counter | Number of bytes pushed by the Kafka sink. |
| voltsp_kafka_sink_messages_consumed_total | counter | Number of records consumed by the Kafka sink. |
| voltsp_kafka_sink_messages_pushed_total | counter | Number of records pushed by the Kafka sink. |
| voltsp_kafka_source_bytes_pulled_bytes | counter | Number of bytes pulled by the Kafka source. |
| voltsp_kafka_source_messages_pulled_total | counter | Number of records pulled by the Kafka source. |
| voltsp_kafka_source_pull_errors_total | counter | Number of Kafka source poll errors. |
| voltsp_kafka_start_time_ms_total | counter | Kafka client start time in milliseconds. |
| voltsp_kafka_time_between_poll_max_seconds | counter | Maximum time between Kafka consumer polls. |
| voltsp_kafka_sink_pull_delay_seconds | histogram | Delay between Kafka sink pulls. |
| voltsp_kafka_sink_push_time_seconds | histogram | Kafka sink send duration. |
| voltsp_kafka_source_pull_delay_seconds | histogram | Delay between Kafka source polls. |
| voltsp_kafka_source_pull_time_seconds | histogram | Kafka source poll duration. |

Kafka tags

Tag enum: org.voltdb.stream.plugin.kafka.KafkaTag

Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.

| Tag | Description |
|---|---|
| kafka_source_group_id | Kafka consumer group ID used by the source. |
| kafka_topic | Kafka topic name. |