Kafka¶
The kafka source consumes data from one or more Apache Kafka topics. It allows configuration of consumer properties such as topic names, bootstrap servers, poll timeout, starting offsets, and more.
This source is suitable for integrating real-time data from Kafka into your streaming pipeline.
The record's key and headers are written to the stage-scoped org.voltdb.stream.api.StashContext and are available during event processing within the current stage, from this source through to the terminator (sink or emitter). Each time a new record is read from the kafka source, its key and headers replace the previous ones in the context.
Available stash entries:
| Entry | Java (typed key) | YAML / JS / Python (name) | Runtime type |
|---|---|---|---|
| Key | KafkaStashKeys.KEY | source.kafka.key | Object |
| Headers | KafkaStashKeys.HEADERS | source.kafka.headers | Map<String, String> |
Headers are off by default; set the includeHeaders option to enable them.
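The replace-per-record rule can be modelled in a few lines of plain Java. This is a hypothetical sketch, not the VoltSP StashContext API: StashModel and onRecord are illustrative names, decoding header values as UTF-8 is an assumption, and only the entry names source.kafka.key and source.kafka.headers come from this page.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the stage-scoped stash; NOT the VoltSP StashContext API.
final class StashModel {
    // Entry names taken from the stash table on this page.
    static final String KEY = "source.kafka.key";
    static final String HEADERS = "source.kafka.headers";

    private final boolean includeHeaders;
    private final Map<String, Object> entries = new LinkedHashMap<>();

    StashModel(boolean includeHeaders) {
        this.includeHeaders = includeHeaders;
    }

    // Called once per consumed record: the previous key and headers are replaced, not merged.
    void onRecord(Object key, List<Map.Entry<String, byte[]>> rawHeaders) {
        entries.put(KEY, key);
        if (includeHeaders) {
            Map<String, String> headers = new LinkedHashMap<>();
            for (Map.Entry<String, byte[]> header : rawHeaders) {
                byte[] value = header.getValue();
                // Assumption: values are UTF-8 text; a repeated header key keeps its last value.
                headers.put(header.getKey(), value == null ? null : new String(value, StandardCharsets.UTF_8));
            }
            entries.put(HEADERS, headers);
        }
    }

    Object get(String name) {
        return entries.get(name);
    }
}
```

In this model, reading source.kafka.key after a second record has been consumed returns the second record's key; nothing from the first record survives, and with includeHeaders off the headers entry is never written.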
```java
KafkaSourceConfigBuilder.<String>builder()
    .withGroupId("my-group")
    .withTopicNames("topicA", "topicB")
    .withBootstrapServers("serverA:9092", "serverB:9092")
    .withStartingOffset(KafkaStartingOffset.EARLIEST)
    .withPollTimeout(Duration.ofMillis(250))
    .withMaxCommitRetries(3)
    .withMaxCommitTimeout(Duration.ofSeconds(10));
```
```yaml
source:
  kafka:
    groupId: "my-group"
    bootstrapServers:
      - "serverA:9092"
      - "serverB:9092"
    topicNames:
      - "topicA"
      - "topicB"
    startingOffset: "EARLIEST"
    pollTimeout: "PT0.25S"
    maxCommitTimeout: "PT10S"
    maxCommitRetries: 3
    properties:
      key1: value1
      key2: value2
```
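The YAML example writes durations in ISO-8601 form. Assuming the YAML loader parses them the way java.time.Duration.parse does (the parser is not documented on this page), this check confirms that the YAML values match the Duration arguments in the Java builder example above.

```java
import java.time.Duration;

// Checks that the ISO-8601 strings from the YAML example equal the Java builder arguments.
final class DurationCheck {
    static boolean sameTimeouts() {
        Duration poll = Duration.parse("PT0.25S");    // YAML pollTimeout
        Duration commit = Duration.parse("PT10S");    // YAML maxCommitTimeout
        return poll.equals(Duration.ofMillis(250))    // Java withPollTimeout(...)
            && commit.equals(Duration.ofSeconds(10)); // Java withMaxCommitTimeout(...)
    }

    public static void main(String[] args) {
        System.out.println(Duration.parse("PT0.25S").toMillis()); // 250
        System.out.println(sameTimeouts());                        // true
    }
}
```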
Java dependency management¶
Add this declaration to your dependency management system to access the configuration DSL for this plugin in Java.
Maven:

```xml
<dependency>
    <groupId>org.voltdb</groupId>
    <artifactId>volt-stream-plugin-kafka-api</artifactId>
    <version>1.8.0</version>
</dependency>
```

Gradle:

```groovy
implementation group: 'org.voltdb', name: 'volt-stream-plugin-kafka-api', version: '1.8.0'
```
Properties¶
bootstrapServers¶
A list of host/port pairs used to establish the initial connection to the Kafka cluster. Clients use this list to bootstrap and discover the full set of Kafka brokers. Required.
Type: array
topicNames¶
The names of the Kafka topics to consume from. Required.
Type: array
groupId¶
A unique string that identifies the consumer group this consumer belongs to. Required.
Type: string
startingOffset¶
What to do when there is no initial offset in Kafka for the consumer group, or when the current offset no longer exists (for example, because that data has been deleted):
- earliest: automatically reset the offset to the earliest offset.
- latest: automatically reset the offset to the latest offset.
- none: throw an exception to the consumer if no previous offset is found for the consumer's group.

Equivalent to the Kafka auto.offset.reset setting.
Required.
Type: object
Supported values: earliest, latest, none.
Default value: earliest
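The three policies follow Kafka's auto.offset.reset semantics. The plain-Java model below shows that decision for a single partition; OffsetReset and startingPosition are hypothetical names, and in practice the decision is made inside the Kafka consumer rather than in pipeline code.

```java
import java.util.OptionalLong;

// Hypothetical model of Kafka's auto.offset.reset decision for one partition.
final class OffsetReset {
    enum Policy { EARLIEST, LATEST, NONE }

    // committed: the group's committed offset, if any.
    // earliest / latest: the partition's log-start and log-end offsets.
    static long startingPosition(Policy policy, OptionalLong committed, long earliest, long latest) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            // A committed offset that still exists in the log is always used as-is.
            if (offset >= earliest && offset <= latest) {
                return offset;
            }
        }
        // No committed offset, or it no longer exists (for example, removed by retention).
        switch (policy) {
            case EARLIEST:
                return earliest;
            case LATEST:
                return latest;
            default:
                throw new IllegalStateException("No valid offset for the group and policy is NONE");
        }
    }
}
```

For a new consumer group, latest skips records already in the topic, while earliest replays everything the topic still retains.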
pollTimeout¶
The maximum time a single poll blocks the receiving thread while waiting for records.
Type: object
Default value: 1s
maxCommitTimeout¶
Sets the maximum timeout for commit retries.
Type: object
Default value: 10s
maxCommitRetries¶
Configures the maximum number of retries for committing offsets.
Type: number
Default value: 3
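The sketch below shows one plausible way maxCommitRetries and maxCommitTimeout could bound offset commits. It assumes maxCommitRetries counts retries after a first attempt and maxCommitTimeout caps the total time spent retrying, with whichever limit is reached first ending the loop; CommitRetry and both interpretations are assumptions, not the plugin's documented behavior.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical bounded-retry loop for offset commits; not the plugin's implementation.
final class CommitRetry {
    // attemptCommit returns true when a commit succeeds.
    // Returns the number of attempts used, or -1 if the commit never succeeded.
    static int commitWithRetries(BooleanSupplier attemptCommit, int maxRetries, Duration maxTimeout) {
        long deadline = System.nanoTime() + maxTimeout.toNanos();
        // One initial attempt plus up to maxRetries retries (assumed interpretation).
        for (int attempt = 1; attempt <= maxRetries + 1; attempt++) {
            if (attemptCommit.getAsBoolean()) {
                return attempt;
            }
            // Overflow-safe deadline check; a production loop would usually also back off here.
            if (System.nanoTime() - deadline >= 0) {
                break;
            }
        }
        return -1;
    }
}
```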
maxPollRecords¶
Specifies the maximum number of records to retrieve in a single poll.
Type: number
Default value: 10000
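pollTimeout and maxPollRecords together bound a single poll: it waits at most pollTimeout for data and returns at most maxPollRecords records. The model below uses a BlockingQueue in place of records already fetched from Kafka; PollModel is hypothetical and simplifies the Kafka client's real fetch logic.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of one poll; not the Kafka client's fetch implementation.
final class PollModel {
    static <T> List<T> poll(BlockingQueue<T> fetched, Duration pollTimeout, int maxPollRecords) {
        List<T> batch = new ArrayList<>();
        T first;
        try {
            // Block for at most pollTimeout waiting for the first record.
            first = fetched.poll(pollTimeout.toNanos(), TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return batch;
        }
        if (first == null) {
            return batch; // timed out: an empty batch, not an error
        }
        batch.add(first);
        // Take whatever is already available, without waiting, up to the cap.
        fetched.drainTo(batch, maxPollRecords - 1);
        return batch;
    }
}
```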
properties¶
Additional Kafka client properties as key/value pairs. Each entry adds or overrides a Kafka client property, allowing customization for advanced configurations.
Type: object
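A minimal sketch of the add-or-override rule, assuming entries under properties are applied after any client settings the plugin derives from its own options. That ordering and the ClientProperties helper are assumptions; auto.offset.reset, max.poll.records and fetch.min.bytes are standard Kafka consumer property names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical merge of plugin-derived client settings with user-supplied properties.
final class ClientProperties {
    static Map<String, String> effectiveProperties(Map<String, String> derived,
                                                   Map<String, String> userProperties) {
        Map<String, String> merged = new LinkedHashMap<>(derived);
        // User entries add new keys or replace existing ones (assumed precedence).
        merged.putAll(userProperties);
        return merged;
    }
}
```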
keyDeserializer¶
Kafka key deserializer class. Required.
Type: class
Default value: org.apache.kafka.common.serialization.ByteBufferDeserializer
valueDeserializer¶
Kafka value deserializer class. Required.
Type: class
Default value: org.apache.kafka.common.serialization.ByteBufferDeserializer
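With the default ByteBufferDeserializer, keys and values arrive as java.nio.ByteBuffer. If the payload is UTF-8 text, the standard library can decode it, as sketched below; the Payloads helper and the UTF-8 assumption are illustrative. Configuring StringDeserializer, as the usage example on this page does, avoids the manual step.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: decodes a ByteBuffer payload, assuming it holds UTF-8 text.
final class Payloads {
    static String utf8(ByteBuffer payload) {
        if (payload == null) {
            return null; // Kafka keys, and tombstone values, may be null
        }
        // duplicate() shares the bytes but leaves the caller's buffer position untouched.
        return StandardCharsets.UTF_8.decode(payload.duplicate()).toString();
    }
}
```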
exceptionHandler¶
Custom exception handler enabling interception of all errors related to this source.
Type: object
ssl¶
SSL configuration.
Type: object
Fields of ssl:
ssl.trustStoreFile¶
Truststore file or trusted CA certificate; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.trustStorePassword¶
Truststore password.
Type: string
ssl.keyStoreFile¶
Keystore file; supported formats include JKS, PKCS#12, or PEM.
Type: string
ssl.keyStorePassword¶
Keystore password.
Type: string
ssl.keyPassword¶
Private key password. Optional; if not set, the keystore password is used.
Type: string
ssl.insecure¶
If set to true, disables SSL certificate and hostname validation. Intended for debugging purposes only. Doesn't work with mTLS.
Type: boolean
ssl.hostnameVerifier¶
Custom hostname verifier for SSL connections. If not specified and 'insecure' is true, hostname verification will be disabled.
Type: object
schemaRegistryUrl¶
Schema registry URL.
Type: string
includeHeaders¶
Whether to deserialize Kafka record headers into the stash context.
Type: boolean
Default value: false
JSON Schema¶
You can validate or explore the configuration using its JSON Schema.
Usage Examples¶
```java
import java.time.Duration;
import java.util.List;
import org.apache.kafka.common.serialization.StringDeserializer;
// KafkaSourceConfigBuilder and KafkaStartingOffset come from volt-stream-plugin-kafka-api.

KafkaSourceConfigBuilder.<String>builder()
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGroupId("my-group")
    .withTopicNames("topicA", "topicB")
    .withBootstrapServers(List.of("serverA:9092", "serverB:9092"))
    .withStartingOffset(KafkaStartingOffset.LATEST)
    .withPollTimeout(Duration.ofMillis(500))
    .withMaxCommitRetries(5)
    .withMaxCommitTimeout(Duration.ofSeconds(15))
    .withSslBuilder(builder -> builder
        .withTrustStoreFile("/path/to/truststore.jks")
        .withTrustStorePassword("truststore-password")
        .withKeyStoreFile("/path/to/keystore.jks")
        .withKeyStorePassword("keystore-password")   // keystore password
        .withKeyPassword("key-password"))            // private key password
    .withExceptionHandler((kafkaConsumer, context, ex) ->
        System.err.println("Error while consuming data: " + ex.getMessage())
    );
```
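This configuration consumes String keys and values from two topics, starts from the latest offset when the group has no committed offset, secures the connection with a truststore and keystore, and logs consumer errors through a custom exception handler.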
Metrics¶
Kafka metrics¶
Metric enum: org.voltdb.stream.plugin.kafka.KafkaMetric
| Prometheus name | Type | Description |
|---|---|---|
| voltsp_kafka_commit_rate_total | gauge | Kafka consumer commit rate. |
| voltsp_kafka_connection_count_total | gauge | Current Kafka client connection count. |
| voltsp_kafka_fetch_latency_max_seconds | gauge | Maximum Kafka fetch latency reported by the consumer. |
| voltsp_kafka_fetch_rate_total | gauge | Kafka consumer fetch rate. |
| voltsp_kafka_outgoing_byte_rate_bytes | gauge | Kafka producer outgoing byte rate. |
| voltsp_kafka_record_error_rate_total | gauge | Kafka producer record error rate. |
| voltsp_kafka_record_queue_time_avg_seconds | gauge | Average Kafka producer record queue time. |
| voltsp_kafka_request_in_flight_total | gauge | Number of in-flight Kafka producer requests. |
| voltsp_kafka_source_partition_assigned_total | gauge | Current number of Kafka partitions assigned to the source. |
| voltsp_kafka_waiting_threads_total | gauge | Number of Kafka producer threads waiting for buffer memory. |
| voltsp_kafka_batch_size_max_total | counter | Maximum Kafka producer batch size. |
| voltsp_kafka_buffer_exhausted_rate_total | counter | Kafka producer buffer exhausted rate. |
| voltsp_kafka_bytes_consumed_bytes | counter | Number of bytes consumed by the Kafka consumer. |
| voltsp_kafka_commit_sync_time_seconds | counter | Kafka consumer commitSync time reported by the native Kafka client. |
| voltsp_kafka_commits_total | counter | Number of Kafka consumer commits. |
| voltsp_kafka_connection_close_total | counter | Number of Kafka producer connection closes. |
| voltsp_kafka_fetch_size_max_total | counter | Maximum Kafka fetch size reported by the consumer. |
| voltsp_kafka_incoming_bytes | counter | Incoming bytes reported by the Kafka consumer. |
| voltsp_kafka_io_time_ns_seconds | counter | Kafka consumer I/O time reported by the native Kafka client. |
| voltsp_kafka_io_wait_time_ns_seconds | counter | Kafka producer I/O wait time reported by the native Kafka client. |
| voltsp_kafka_last_poll_seconds_ago_total | counter | Seconds since the last Kafka consumer poll. |
| voltsp_kafka_network_io_bytes | counter | Kafka consumer network I/O reported by the native Kafka client. |
| voltsp_kafka_record_retry_total | counter | Number of Kafka producer record retries. |
| voltsp_kafka_record_send_rate_total | counter | Kafka producer record send rate. |
| voltsp_kafka_record_send_total | counter | Number of Kafka producer records sent. |
| voltsp_kafka_records_consumed_total | counter | Number of records consumed by the Kafka consumer. |
| voltsp_kafka_records_per_request_avg_total | counter | Average number of Kafka records per fetch request. |
| voltsp_kafka_request_size_avg_bytes | counter | Average Kafka producer request size. |
| voltsp_kafka_sink_bytes_pushed_bytes | counter | Number of bytes pushed by the Kafka sink. |
| voltsp_kafka_sink_messages_consumed_total | counter | Number of records consumed by the Kafka sink. |
| voltsp_kafka_sink_messages_pushed_total | counter | Number of records pushed by the Kafka sink. |
| voltsp_kafka_source_bytes_pulled_bytes | counter | Number of bytes pulled by the Kafka source. |
| voltsp_kafka_source_messages_pulled_total | counter | Number of records pulled by the Kafka source. |
| voltsp_kafka_source_pull_errors_total | counter | Number of Kafka source poll errors. |
| voltsp_kafka_start_time_ms_total | counter | Kafka client start time in milliseconds. |
| voltsp_kafka_time_between_poll_max_seconds | counter | Maximum time between Kafka consumer polls. |
| voltsp_kafka_sink_pull_delay_seconds | histogram | Delay between Kafka sink pulls. |
| voltsp_kafka_sink_push_time_seconds | histogram | Kafka sink send duration. |
| voltsp_kafka_source_pull_delay_seconds | histogram | Delay between Kafka source polls. |
| voltsp_kafka_source_pull_time_seconds | histogram | Kafka source poll duration. |
Kafka tags¶
Tag enum: org.voltdb.stream.plugin.kafka.KafkaTag
Metrics reported by this component may include these tags. Not every metric includes every tag; tags are present only when they are relevant to the measurement.
| Tag | Description |
|---|---|
| kafka_source_group_id | Kafka consumer group id used by the source. |
| kafka_topic | Kafka topic name. |